1
0
mirror of https://github.com/woodpecker-ci/woodpecker.git synced 2024-12-12 08:23:48 +02:00
woodpecker/pipeline/pipeline.go

205 lines
4.2 KiB
Go
Raw Normal View History

2017-03-05 09:56:08 +02:00
package pipeline
import (
"context"
"strings"
2017-03-05 09:56:08 +02:00
"time"
"github.com/rs/zerolog/log"
2017-03-05 09:56:08 +02:00
"golang.org/x/sync/errgroup"
backend "github.com/woodpecker-ci/woodpecker/pipeline/backend/types"
"github.com/woodpecker-ci/woodpecker/pipeline/multipart"
2017-03-05 09:56:08 +02:00
)
// State is the snapshot handed to a Tracer: pipeline-wide information plus
// the state of the process belonging to the step being traced.
type State struct {
	// Pipeline groups the information that applies to the pipeline as a whole.
	Pipeline struct {
		// Time is the moment the pipeline was started.
		Time int64 `json:"time"`
		// Step is the step this snapshot refers to.
		Step *backend.Step `json:"step"`
		// Error is the error state of the pipeline at snapshot time.
		Error error `json:"error"`
	}

	// Process is the state of the step's process.
	Process *backend.State
}
// Runtime is a configuration runtime. It holds everything needed to execute
// one pipeline configuration against a backend engine.
type Runtime struct {
	// err is the most recent non-nil error produced by a stage. Run stores it
	// after each stage, execAll reads it to decide whether a step should run
	// (OnFailure / OnSuccess), and traceStep reports it to the tracer.
	err error
	// spec is the pipeline configuration passed to New; its Stages are run
	// in order and it is handed to the engine for setup and teardown.
	spec *backend.Config
	// engine is the backend that sets up, executes, tails, waits for and
	// destroys the pipeline. New does not assign it itself.
	// NOTE(review): presumably supplied through an Option — confirm.
	engine backend.Engine
	// started is the Unix time (seconds) recorded at the beginning of Run.
	started int64
	// ctx is passed to every engine call; New defaults it to
	// context.Background(). When it is done, Run returns ErrCancel.
	ctx context.Context
	// tracer receives step state updates. Optional: when nil, traceStep
	// does nothing.
	tracer Tracer
	// logger receives the output of each step. Optional: when nil, exec
	// does not tail the step's output.
	logger Logger
}
// New builds a Runtime for the given pipeline configuration. The runtime
// starts out with a background context; every supplied option is then
// applied to it, in the order given, before it is returned.
func New(spec *backend.Config, opts ...Option) *Runtime {
	r := &Runtime{
		spec: spec,
		ctx:  context.Background(),
	}
	for i := range opts {
		opts[i](r)
	}
	return r
}
// Starts the execution of the pipeline and waits for it to complete
2017-03-05 09:56:08 +02:00
func (r *Runtime) Run() error {
defer func() {
if err := r.engine.Destroy(r.ctx, r.spec); err != nil {
log.Error().Err(err).Msg("could not destroy pipeline")
}
2017-03-05 09:56:08 +02:00
}()
r.started = time.Now().Unix()
2018-04-01 20:34:01 +02:00
if err := r.engine.Setup(r.ctx, r.spec); err != nil {
2017-03-05 09:56:08 +02:00
return err
}
for _, stage := range r.spec.Stages {
select {
case <-r.ctx.Done():
return ErrCancel
case err := <-r.execAll(stage.Steps):
if err != nil {
r.err = err
}
}
}
return r.err
}
// traceStep reports the current status of a step to the tracer.
//
// Without a tracer this is a no-op that returns nil. When the caller has no
// process state to report, one is created here: empty if err is nil,
// otherwise marked as exited with err and exit code 126. A process state
// supplied by the caller is forwarded untouched. The return value is
// whatever the tracer returns.
func (r *Runtime) traceStep(processState *backend.State, err error, step *backend.Step) error {
	if r.tracer == nil {
		// no tracer nothing to trace :)
		return nil
	}

	switch {
	case processState != nil:
		// The caller supplied a real process state; report it as is.
	case err != nil:
		processState = &backend.State{
			Error:     err,
			Exited:    true,
			OOMKilled: false,
			ExitCode:  126, // command invoked cannot be executed.
		}
	default:
		processState = &backend.State{}
	}

	snapshot := &State{Process: processState}
	snapshot.Pipeline.Time = r.started
	snapshot.Pipeline.Step = step
	snapshot.Pipeline.Error = r.err

	return r.tracer.Trace(snapshot)
}
2017-03-05 09:56:08 +02:00
// Executes a set of parallel steps
func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
2017-03-05 09:56:08 +02:00
var g errgroup.Group
done := make(chan error)
for _, step := range steps {
// required since otherwise the loop variable
// will be captured by the function. This will
// recreate the step "variable"
step := step
2017-03-05 09:56:08 +02:00
g.Go(func() error {
// Case the pipeline was already complete.
switch {
case r.err != nil && !step.OnFailure:
return nil
case r.err == nil && !step.OnSuccess:
return nil
}
// Trace started.
err := r.traceStep(nil, nil, step)
if err != nil {
return err
}
processState, err := r.exec(step)
// Return the error after tracing it.
traceErr := r.traceStep(processState, err, step)
if traceErr != nil {
return traceErr
}
return err
2017-03-05 09:56:08 +02:00
})
}
go func() {
done <- g.Wait()
close(done)
}()
return done
}
// Executes the step and returns the state and error.
func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
// TODO: using DRONE_ will be deprecated with 0.15.0. remove fallback with following release
for key, value := range step.Environment {
if strings.HasPrefix(key, "CI_") {
step.Environment[strings.Replace(key, "CI_", "DRONE_", 1)] = value
}
}
if err := r.engine.Exec(r.ctx, step); err != nil {
return nil, err
2017-03-05 09:56:08 +02:00
}
if r.logger != nil {
rc, err := r.engine.Tail(r.ctx, step)
2017-03-05 09:56:08 +02:00
if err != nil {
return nil, err
2017-03-05 09:56:08 +02:00
}
go func() {
if err := r.logger.Log(step, multipart.New(rc)); err != nil {
log.Error().Err(err).Msg("process logging failed")
}
_ = rc.Close()
2017-03-05 09:56:08 +02:00
}()
}
// nothing else to do, this is a detached process.
if step.Detached {
return nil, nil
2017-03-05 09:56:08 +02:00
}
waitState, err := r.engine.Wait(r.ctx, step)
2017-03-05 09:56:08 +02:00
if err != nil {
return nil, err
2017-03-05 09:56:08 +02:00
}
if waitState.OOMKilled {
return waitState, &OomError{
Name: step.Name,
Code: waitState.ExitCode,
2017-03-05 09:56:08 +02:00
}
} else if waitState.ExitCode != 0 {
return waitState, &ExitError{
Name: step.Name,
Code: waitState.ExitCode,
2017-03-05 09:56:08 +02:00
}
}
return waitState, nil
2017-03-05 09:56:08 +02:00
}