From b6399c0a08a424d0296ece4e0cd703940416b2d4 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 23 Nov 2022 15:35:24 +0100 Subject: [PATCH] Refactor agent: split code in subfunctions (#1441) logs of a killed pipeline are stored, with this pull --- agent/logger.go | 132 +++++++++++++++++++++++++ agent/runner.go | 227 ++++++------------------------------------- agent/tracer.go | 100 +++++++++++++++++++ cli/exec/exec.go | 2 +- pipeline/pipeline.go | 10 +- server/grpc/rpc.go | 36 +++---- 6 files changed, 286 insertions(+), 221 deletions(-) create mode 100644 agent/logger.go create mode 100644 agent/tracer.go diff --git a/agent/logger.go b/agent/logger.go new file mode 100644 index 000000000..7ed688370 --- /dev/null +++ b/agent/logger.go @@ -0,0 +1,132 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "context" + "encoding/json" + "io" + "sync" + "time" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "github.com/woodpecker-ci/woodpecker/pipeline" + backend "github.com/woodpecker-ci/woodpecker/pipeline/backend/types" + "github.com/woodpecker-ci/woodpecker/pipeline/multipart" + "github.com/woodpecker-ci/woodpecker/pipeline/rpc" +) + +func (r *Runner) createLogger(ctxmeta context.Context, logger zerolog.Logger, uploads *sync.WaitGroup, work *rpc.Pipeline) pipeline.LogFunc { + return func(step *backend.Step, rc multipart.Reader) error { + loglogger := logger.With(). + Str("image", step.Image). + Str("stage", step.Alias). + Logger() + + part, rerr := rc.NextPart() + if rerr != nil { + return rerr + } + uploads.Add(1) + + var secrets []string + for _, secret := range work.Config.Secrets { + if secret.Mask { + secrets = append(secrets, secret.Value) + } + } + + loglogger.Debug().Msg("log stream opened") + + limitedPart := io.LimitReader(part, maxLogsUpload) + logStream := rpc.NewLineWriter(r.client, work.ID, step.Alias, secrets...) + if _, err := io.Copy(logStream, limitedPart); err != nil { + log.Error().Err(err).Msg("copy limited logStream part") + } + + loglogger.Debug().Msg("log stream copied") + + data, err := json.Marshal(logStream.Lines()) + if err != nil { + loglogger.Err(err).Msg("could not marshal logstream") + } + + file := &rpc.File{ + Mime: "application/json+logs", + Step: step.Alias, + Name: "logs.json", + Data: data, + Size: len(data), + Time: time.Now().Unix(), + } + + loglogger.Debug().Msg("log stream uploading") + if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil { + loglogger.Error().Err(serr).Msg("log stream upload error") + } else { + loglogger.Debug().Msg("log stream upload complete") + } + + defer func() { + loglogger.Debug().Msg("log stream closed") + uploads.Done() + }() + + part, rerr = rc.NextPart() + if rerr != nil { + return nil + } + // TODO should be configurable + limitedPart = io.LimitReader(part, maxFileUpload) + data, err = io.ReadAll(limitedPart) + if err != nil { + loglogger.Err(err).Msg("could not read limited part") + } + + file = &rpc.File{ + Mime: part.Header().Get("Content-Type"), + Step: step.Alias, + Name: part.FileName(), + Data: data, + Size: len(data), + Time: time.Now().Unix(), + Meta: make(map[string]string), + } + for key, value := range part.Header() { + file.Meta[key] = value[0] + } + + loglogger.Debug(). + Str("file", file.Name). + Str("mime", file.Mime). + Msg("file stream uploading") + + if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil { + loglogger.Error(). + Err(serr). + Str("file", file.Name). + Str("mime", file.Mime). + Msg("file stream upload error") + } + + loglogger.Debug(). + Str("file", file.Name). + Str("mime", file.Mime). + Msg("file stream upload complete") + return nil + } +} diff --git a/agent/runner.go b/agent/runner.go index a4d7081bc..4ef759146 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -17,10 +17,7 @@ package agent import ( "context" - "encoding/json" - "io" - "runtime" - "strconv" + "errors" "sync" "time" @@ -30,7 +27,6 @@ import ( "github.com/woodpecker-ci/woodpecker/pipeline" backend "github.com/woodpecker-ci/woodpecker/pipeline/backend/types" - "github.com/woodpecker-ci/woodpecker/pipeline/multipart" "github.com/woodpecker-ci/woodpecker/pipeline/rpc" "github.com/woodpecker-ci/woodpecker/shared/utils" ) @@ -61,14 +57,14 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen } } -func (r *Runner) Run(ctx context.Context) error { +func (r *Runner) Run(runnerCtx context.Context) error { log.Debug().Msg("request next execution") - meta, _ := metadata.FromOutgoingContext(ctx) + meta, _ := metadata.FromOutgoingContext(runnerCtx) ctxmeta := metadata.NewOutgoingContext(context.Background(), meta) // get the next workflow from the queue - work, err := r.client.Next(ctx, r.filter) + work, err := r.client.Next(runnerCtx, r.filter) if err != nil { return err } @@ -100,13 +96,13 @@ func (r *Runner) Run(ctx context.Context) error { logger.Debug().Msg("received execution") - ctx, cancel := context.WithTimeout(ctxmeta, timeout) + workflowCtx, cancel := context.WithTimeout(ctxmeta, timeout) defer cancel() // Add sigterm support for internal context. // Required when the pipeline is terminated by external signals // like kubernetes. - ctx = utils.WithContextSigtermCallback(ctx, func() { + workflowCtx = utils.WithContextSigtermCallback(workflowCtx, func() { logger.Error().Msg("Received sigterm termination signal") }) @@ -114,7 +110,7 @@ func (r *Runner) Run(ctx context.Context) error { go func() { logger.Debug().Msg("listen for cancel signal") - if werr := r.client.Wait(ctx, work.ID); werr != nil { + if werr := r.client.Wait(workflowCtx, work.ID); werr != nil { canceled.SetTo(true) logger.Warn().Err(werr).Msg("cancel signal received") @@ -127,14 +123,14 @@ func (r *Runner) Run(ctx context.Context) error { go func() { for { select { - case <-ctx.Done(): + case <-workflowCtx.Done(): logger.Debug().Msg("pipeline done") return case <-time.After(time.Minute): logger.Debug().Msg("pipeline lease renewed") - if err := r.client.Extend(ctx, work.ID); err != nil { + if err := r.client.Extend(workflowCtx, work.ID); err != nil { log.Error().Err(err).Msg("extending pipeline deadline failed") } } @@ -150,212 +146,46 @@ func (r *Runner) Run(ctx context.Context) error { } var uploads sync.WaitGroup - defaultLogger := pipeline.LogFunc(func(step *backend.Step, rc multipart.Reader) error { - loglogger := logger.With(). - Str("image", step.Image). - Str("stage", step.Alias). - Logger() - - part, rerr := rc.NextPart() - if rerr != nil { - return rerr - } - uploads.Add(1) - - var secrets []string - for _, secret := range work.Config.Secrets { - if secret.Mask { - secrets = append(secrets, secret.Value) - } - } - - loglogger.Debug().Msg("log stream opened") - - limitedPart := io.LimitReader(part, maxLogsUpload) - logStream := rpc.NewLineWriter(r.client, work.ID, step.Alias, secrets...) - if _, err := io.Copy(logStream, limitedPart); err != nil { - log.Error().Err(err).Msg("copy limited logStream part") - } - - loglogger.Debug().Msg("log stream copied") - - data, err := json.Marshal(logStream.Lines()) - if err != nil { - loglogger.Err(err).Msg("could not marshal logstream") - } - - file := &rpc.File{ - Mime: "application/json+logs", - Step: step.Alias, - Name: "logs.json", - Data: data, - Size: len(data), - Time: time.Now().Unix(), - } - - loglogger.Debug().Msg("log stream uploading") - if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil { - loglogger.Error().Err(serr).Msg("log stream upload error") - } else { - loglogger.Debug().Msg("log stream upload complete") - } - - defer func() { - loglogger.Debug().Msg("log stream closed") - uploads.Done() - }() - - part, rerr = rc.NextPart() - if rerr != nil { - return nil - } - // TODO should be configurable - limitedPart = io.LimitReader(part, maxFileUpload) - data, err = io.ReadAll(limitedPart) - if err != nil { - loglogger.Err(err).Msg("could not read limited part") - } - - file = &rpc.File{ - Mime: part.Header().Get("Content-Type"), - Step: step.Alias, - Name: part.FileName(), - Data: data, - Size: len(data), - Time: time.Now().Unix(), - Meta: make(map[string]string), - } - for key, value := range part.Header() { - file.Meta[key] = value[0] - } - - loglogger.Debug(). - Str("file", file.Name). - Str("mime", file.Mime). - Msg("file stream uploading") - - if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil { - loglogger.Error(). - Err(serr). - Str("file", file.Name). - Str("mime", file.Mime). - Msg("file stream upload error") - } - - loglogger.Debug(). - Str("file", file.Name). - Str("mime", file.Mime). - Msg("file stream upload complete") - return nil - }) - - defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error { - steplogger := logger.With(). - Str("image", state.Pipeline.Step.Image). - Str("stage", state.Pipeline.Step.Alias). - Err(state.Process.Error). - Int("exit_code", state.Process.ExitCode). - Bool("exited", state.Process.Exited). - Logger() - - stepState := rpc.State{ - Step: state.Pipeline.Step.Alias, - Exited: state.Process.Exited, - ExitCode: state.Process.ExitCode, - Started: time.Now().Unix(), // TODO do not do this - Finished: time.Now().Unix(), - } - if state.Process.Error != nil { - stepState.Error = state.Process.Error.Error() - } - - defer func() { - steplogger.Debug().Msg("update step status") - - if uerr := r.client.Update(ctxmeta, work.ID, stepState); uerr != nil { - steplogger.Debug(). - Err(uerr). - Msg("update step status error") - } - - steplogger.Debug().Msg("update step status complete") - }() - if state.Process.Exited { - return nil - } - if state.Pipeline.Step.Environment == nil { - state.Pipeline.Step.Environment = map[string]string{} - } - - // TODO: find better way to update this state and move it to pipeline to have the same env in cli-exec - state.Pipeline.Step.Environment["CI_MACHINE"] = r.hostname - - state.Pipeline.Step.Environment["CI_PIPELINE_STATUS"] = "success" - state.Pipeline.Step.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["CI_PIPELINE_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - - state.Pipeline.Step.Environment["CI_STEP_STATUS"] = "success" - state.Pipeline.Step.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["CI_STEP_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - - state.Pipeline.Step.Environment["CI_SYSTEM_ARCH"] = runtime.GOOS + "/" + runtime.GOARCH - - // DEPRECATED - state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success" - state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success" - state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - - if state.Pipeline.Error != nil { - state.Pipeline.Step.Environment["CI_PIPELINE_STATUS"] = "failure" - state.Pipeline.Step.Environment["CI_STEP_STATUS"] = "failure" - - // DEPRECATED - state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure" - state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure" - } - - return nil - }) - err = pipeline.New(work.Config, - pipeline.WithContext(ctx), - pipeline.WithLogger(defaultLogger), - pipeline.WithTracer(defaultTracer), + pipeline.WithContext(workflowCtx), + pipeline.WithLogger(r.createLogger(ctxmeta, logger, &uploads, work)), + pipeline.WithTracer(r.createTracer(ctxmeta, logger, work)), pipeline.WithEngine(*r.engine), pipeline.WithDescription(map[string]string{ "ID": work.ID, "Repo": repoName, "Pipeline": pipelineNumber, }), - ).Run() + ).Run(runnerCtx) state.Finished = time.Now().Unix() state.Exited = true - if err != nil { - switch xerr := err.(type) { - case *pipeline.ExitError: - state.ExitCode = xerr.Code - default: + + if canceled.IsSet() { + state.Error = "" + state.ExitCode = 137 + } else if err != nil { + pExitError := &pipeline.ExitError{} + if errors.As(err, &pExitError) { + state.ExitCode = pExitError.Code + } else if errors.Is(err, pipeline.ErrCancel) { + state.Error = "" + state.ExitCode = 137 + canceled.SetTo(true) + } else { state.ExitCode = 1 state.Error = err.Error() } - if canceled.IsSet() { - state.ExitCode = 137 - } } logger.Debug(). Str("error", state.Error). Int("exit_code", state.ExitCode). + Bool("canceled", canceled.IsSet()). Msg("pipeline complete") logger.Debug().Msg("uploading logs") - uploads.Wait() - logger.Debug().Msg("uploading logs complete") logger.Debug(). @@ -363,8 +193,7 @@ func (r *Runner) Run(ctx context.Context) error { Int("exit_code", state.ExitCode). Msg("updating pipeline status") - err = r.client.Done(ctxmeta, work.ID, state) - if err != nil { + if err := r.client.Done(ctxmeta, work.ID, state); err != nil { logger.Error().Err(err).Msg("updating pipeline status failed") } else { logger.Debug().Msg("updating pipeline status complete") diff --git a/agent/tracer.go b/agent/tracer.go new file mode 100644 index 000000000..91fcdbc8d --- /dev/null +++ b/agent/tracer.go @@ -0,0 +1,100 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "context" + "runtime" + "strconv" + "time" + + "github.com/rs/zerolog" + + "github.com/woodpecker-ci/woodpecker/pipeline" + "github.com/woodpecker-ci/woodpecker/pipeline/rpc" +) + +func (r *Runner) createTracer(ctxmeta context.Context, logger zerolog.Logger, work *rpc.Pipeline) pipeline.TraceFunc { + return func(state *pipeline.State) error { + steplogger := logger.With(). + Str("image", state.Pipeline.Step.Image). + Str("stage", state.Pipeline.Step.Alias). + Err(state.Process.Error). + Int("exit_code", state.Process.ExitCode). + Bool("exited", state.Process.Exited). + Logger() + + stepState := rpc.State{ + Step: state.Pipeline.Step.Alias, + Exited: state.Process.Exited, + ExitCode: state.Process.ExitCode, + Started: time.Now().Unix(), // TODO do not do this + Finished: time.Now().Unix(), + } + if state.Process.Error != nil { + stepState.Error = state.Process.Error.Error() + } + + defer func() { + steplogger.Debug().Msg("update step status") + + if uerr := r.client.Update(ctxmeta, work.ID, stepState); uerr != nil { + steplogger.Debug(). + Err(uerr). + Msg("update step status error") + } + + steplogger.Debug().Msg("update step status complete") + }() + if state.Process.Exited { + return nil + } + if state.Pipeline.Step.Environment == nil { + state.Pipeline.Step.Environment = map[string]string{} + } + + // TODO: find better way to update this state and move it to pipeline to have the same env in cli-exec + state.Pipeline.Step.Environment["CI_MACHINE"] = r.hostname + + state.Pipeline.Step.Environment["CI_PIPELINE_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_PIPELINE_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + state.Pipeline.Step.Environment["CI_STEP_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_STEP_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + state.Pipeline.Step.Environment["CI_SYSTEM_ARCH"] = runtime.GOOS + "/" + runtime.GOARCH + + // DEPRECATED + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + if state.Pipeline.Error != nil { + state.Pipeline.Step.Environment["CI_PIPELINE_STATUS"] = "failure" + state.Pipeline.Step.Environment["CI_STEP_STATUS"] = "failure" + + // DEPRECATED + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure" + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure" + } + + return nil + } +} diff --git a/cli/exec/exec.go b/cli/exec/exec.go index 66273d3bc..9c7bd4852 100644 --- a/cli/exec/exec.go +++ b/cli/exec/exec.go @@ -228,7 +228,7 @@ func execWithAxis(c *cli.Context, file, repoPath string, axis matrix.Axis) error pipeline.WithDescription(map[string]string{ "CLI": "exec", }), - ).Run() + ).Run(c.Context) } // return the metadata from the cli context. diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 1ae1a2eea..f1584f5e3 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -2,6 +2,7 @@ package pipeline import ( "context" + "errors" "strings" "sync" "time" @@ -69,7 +70,7 @@ func (r *Runtime) MakeLogger() zerolog.Logger { } // Starts the execution of the pipeline and waits for it to complete -func (r *Runtime) Run() error { +func (r *Runtime) Run(runnerCtx context.Context) error { logger := r.MakeLogger() logger.Debug().Msgf("Executing %d stages, in order of:", len(r.spec.Stages)) for _, stage := range r.spec.Stages { @@ -85,7 +86,7 @@ func (r *Runtime) Run() error { } defer func() { - if err := r.engine.Destroy(r.ctx, r.spec); err != nil { + if err := r.engine.Destroy(runnerCtx, r.spec); err != nil { logger.Error().Err(err).Msg("could not destroy engine") } }() @@ -190,7 +191,7 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error { // if we got a nil process but an error state // then we need to log the internal error to the step. - if r.logger != nil && err != nil && processState == nil { + if r.logger != nil && err != nil && !errors.Is(err, ErrCancel) && processState == nil { _ = r.logger.Log(step, multipart.New(strings.NewReader( "Backend engine error while running step: "+err.Error(), ))) @@ -247,6 +248,9 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) { wg.Wait() waitState, err := r.engine.Wait(r.ctx, step) if err != nil { + if errors.Is(err, context.Canceled) { + return waitState, ErrCancel + } return nil, err } diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 7cc74919e..22befa5ae 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -296,20 +296,20 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error { // Done implements the rpc.Done function func (s *RPC) Done(c context.Context, id string, state rpc.State) error { - stepID, err := strconv.ParseInt(id, 10, 64) + workflowID, err := strconv.ParseInt(id, 10, 64) if err != nil { return err } - step, err := s.store.StepLoad(stepID) + workflow, err := s.store.StepLoad(workflowID) if err != nil { - log.Error().Msgf("error: cannot find step with id %d: %s", stepID, err) + log.Error().Msgf("error: cannot find step with id %d: %s", workflowID, err) return err } - currentPipeline, err := s.store.GetPipeline(step.PipelineID) + currentPipeline, err := s.store.GetPipeline(workflow.PipelineID) if err != nil { - log.Error().Msgf("error: cannot find pipeline with id %d: %s", step.PipelineID, err) + log.Error().Msgf("error: cannot find pipeline with id %d: %s", workflow.PipelineID, err) return err } @@ -325,36 +325,36 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { Str("step_id", id). Msgf("gRPC Done with state: %#v", state) - if step, err = pipeline.UpdateStepStatusToDone(s.store, *step, state); err != nil { - log.Error().Msgf("error: done: cannot update step_id %d state: %s", step.ID, err) + if workflow, err = pipeline.UpdateStepStatusToDone(s.store, *workflow, state); err != nil { + log.Error().Msgf("error: done: cannot update step_id %d state: %s", workflow.ID, err) } var queueErr error - if step.Failing() { + if workflow.Failing() { queueErr = s.queue.Error(c, id, fmt.Errorf("Step finished with exitcode %d, %s", state.ExitCode, state.Error)) } else { - queueErr = s.queue.Done(c, id, step.State) + queueErr = s.queue.Done(c, id, workflow.State) } if queueErr != nil { - log.Error().Msgf("error: done: cannot ack step_id %d: %s", stepID, err) + log.Error().Msgf("error: done: cannot ack step_id %d: %s", workflowID, err) } steps, err := s.store.StepList(currentPipeline) if err != nil { return err } - s.completeChildrenIfParentCompleted(steps, step) + s.completeChildrenIfParentCompleted(steps, workflow) if !model.IsThereRunningStage(steps) { - if currentPipeline, err = pipeline.UpdateStatusToDone(s.store, *currentPipeline, model.PipelineStatus(steps), step.Stopped); err != nil { + if currentPipeline, err = pipeline.UpdateStatusToDone(s.store, *currentPipeline, model.PipelineStatus(steps), workflow.Stopped); err != nil { log.Error().Err(err).Msgf("error: done: cannot update build_id %d final state", currentPipeline.ID) } } - s.updateForgeStatus(c, repo, currentPipeline, step) + s.updateForgeStatus(c, repo, currentPipeline, workflow) if err := s.logger.Close(c, id); err != nil { - log.Error().Err(err).Msgf("done: cannot close build_id %d logger", step.ID) + log.Error().Err(err).Msgf("done: cannot close build_id %d logger", workflow.ID) } if err := s.notify(c, repo, currentPipeline, steps); err != nil { @@ -366,7 +366,7 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { s.pipelineTime.WithLabelValues(repo.FullName, currentPipeline.Branch, string(currentPipeline.Status), "total").Set(float64(currentPipeline.Finished - currentPipeline.Started)) } if model.IsMultiPipeline(steps) { - s.pipelineTime.WithLabelValues(repo.FullName, currentPipeline.Branch, string(step.State), step.Name).Set(float64(step.Stopped - step.Started)) + s.pipelineTime.WithLabelValues(repo.FullName, currentPipeline.Branch, string(workflow.State), workflow.Name).Set(float64(workflow.Stopped - workflow.Started)) } return nil @@ -382,10 +382,10 @@ func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error { return nil } -func (s *RPC) completeChildrenIfParentCompleted(steps []*model.Step, completedStep *model.Step) { +func (s *RPC) completeChildrenIfParentCompleted(steps []*model.Step, completedWorkflow *model.Step) { for _, p := range steps { - if p.Running() && p.PPID == completedStep.PID { - if _, err := pipeline.UpdateStepToStatusSkipped(s.store, *p, completedStep.Stopped); err != nil { + if p.Running() && p.PPID == completedWorkflow.PID { + if _, err := pipeline.UpdateStepToStatusSkipped(s.store, *p, completedWorkflow.Stopped); err != nil { log.Error().Msgf("error: done: cannot update step_id %d child state: %s", p.ID, err) } }