1
0
mirror of https://github.com/woodpecker-ci/woodpecker.git synced 2026-06-03 16:35:37 +02:00

Fix approved gated pipeline scheduling (#6627)

Closes #6619

Co-authored-by: 6543 <6543@obermui.de>
This commit is contained in:
Akash Kumar
2026-05-26 18:13:05 +05:30
committed by GitHub
parent 7af1eef7e7
commit 32a7bf9748
6 changed files with 240 additions and 87 deletions
+86
View File
@@ -0,0 +1,86 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build test
package scenarios
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.woodpecker-ci.org/woodpecker/v3/e2e/setup"
forge_types "go.woodpecker-ci.org/woodpecker/v3/server/forge/types"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pipeline"
)
// TestGatedPipeline verifies the approval gate on a repo that requires
// approval for every event. The pipeline is created in StatusBlocked, and
// once pipeline.Approve releases it the pipeline runs to completion on an
// agent like any normal pipeline, steps included.
func TestGatedPipeline(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: simpleSuccessYAML},
})
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)
// Require approval for every event, gating every pipeline regardless of author.
env.Fixtures.Repo.RequireApproval = model.RequireApprovalAllEvents
require.NoError(t, env.Store.UpdateRepo(env.Fixtures.Repo), "enable repo approval")
// Pipeline must come back blocked, not running.
created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{
Event: model.EventPush,
Branch: "main",
Commit: "deadbeef",
Ref: "refs/heads/main",
Author: env.Fixtures.Owner.Login,
Sender: env.Fixtures.Owner.Login,
})
require.NoError(t, err, "create gated pipeline")
require.NotNil(t, created)
require.Equal(t, model.StatusBlocked, created.Status, "untrusted author pipeline must be blocked")
// Approve as the repo owner, releasing the gate.
approved, err := pipeline.Approve(t.Context(), env.Store, created, env.Fixtures.Owner, env.Fixtures.Repo)
require.NoError(t, err, "approve gated pipeline")
require.NotNil(t, approved)
assert.Equal(t, env.Fixtures.Owner.Login, approved.Reviewer, "reviewer should be the approver")
assert.NotZero(t, approved.Reviewed, "reviewed timestamp should be set")
// Wait for the agent to actually pick it up and run it to a terminal state.
finished := setup.WaitForPipeline(t, env.Store, approved.ID)
assert.Equal(t, model.StatusSuccess, finished.Status, "approved gated pipeline should succeed")
// Workflow outcome: one workflow, succeeded, assigned to an agent.
workflows, err := env.Store.WorkflowGetTree(finished)
require.NoError(t, err, "get workflow tree")
require.Len(t, workflows, 1, "approved pipeline should produce exactly one workflow")
assert.Equal(t, model.StatusSuccess, workflows[0].State, "workflow should succeed")
assert.Greater(t, workflows[0].AgentID, int64(0), "workflow should record the agent that ran it")
// Step outcome: every step from simpleSuccessYAML ran and exited cleanly.
steps, err := env.Store.StepList(finished.ID)
require.NoError(t, err, "list steps")
require.ElementsMatch(t, []string{"clone", "step-one", "step-two"}, modelStepsToName(steps),
"approved pipeline should run exactly the steps from the YAML")
for _, step := range steps {
assert.Equalf(t, model.StatusSuccess, step.State, "step %q status", step.Name)
assert.Equalf(t, 0, step.ExitCode, "step %q exit code", step.Name)
}
}
+17 -36
View File
@@ -52,48 +52,29 @@ func Approve(ctx context.Context, store store.Store, currentPipeline *model.Pipe
yamls = append(yamls, &forge_types.FileMeta{Data: y.Data, Name: y.Name})
}
if currentPipeline.Workflows, err = store.WorkflowGetTree(currentPipeline); err != nil {
return nil, fmt.Errorf("error: loading workflows. %w", err)
// Release the gate before building workflows: saveWorkflowsFromPipelineBuilder
// derives workflow and step state from the pipeline status, so the status
// must already be pending when the new workflows are persisted.
currentPipeline.Status = model.StatusPending
currentPipeline, pipelineItems, parseErr, err := createPipelineItems(ctx, forge, store, currentPipeline, user, repo, yamls, nil, true)
if handleParseErrors(currentPipeline, parseErr) {
if err := updatePipelineWithErr(ctx, forge, store, currentPipeline, repo, user, parseErr); err != nil {
log.Error().Err(err).Msgf("error setting error status of pipeline for %s#%d after approval", repo.FullName, currentPipeline.Number)
}
msg := fmt.Sprintf("failure to parse pipeline config for %s", repo.FullName)
log.Error().Err(parseErr).Msg(msg)
return nil, errors.New(msg)
}
if err != nil {
log.Error().Err(err).Str("repo", repo.FullName).Msgf("error persisting new steps for %s#%d after approval", repo.FullName, currentPipeline.Number)
return nil, err
}
if currentPipeline, err = UpdateToStatusPending(store, *currentPipeline, user.Login); err != nil {
return nil, fmt.Errorf("error updating pipeline. %w", err)
}
for _, wf := range currentPipeline.Workflows {
if wf.State != model.StatusBlocked {
continue
}
wf.State = model.StatusPending
if err := store.WorkflowUpdate(wf); err != nil {
return nil, fmt.Errorf("error updating workflow. %w", err)
}
for _, step := range wf.Children {
if step.State != model.StatusBlocked {
continue
}
step.State = model.StatusPending
if err := store.StepUpdate(step); err != nil {
return nil, fmt.Errorf("error updating step. %w", err)
}
}
}
currentPipeline, pipelineItems, err := createPipelineItems(ctx, forge, store, currentPipeline, user, repo, yamls, nil)
if err != nil {
msg := fmt.Sprintf("failure to createPipelineItems for %s", repo.FullName)
log.Error().Err(err).Msg(msg)
return nil, errors.New(msg)
}
// we have no way to link old workflows and steps in database to new engine generated steps,
// so we just delete the old and insert the new ones
if err := store.WorkflowsReplace(currentPipeline, currentPipeline.Workflows); err != nil {
log.Error().Err(err).Str("repo", repo.FullName).Msgf("error persisting new steps for %s#%d after approval", repo.FullName, currentPipeline.Number)
return nil, err
}
publishPipeline(ctx, forge, currentPipeline, repo, user)
currentPipeline, err = start(ctx, forge, store, currentPipeline, user, repo, pipelineItems)
+6 -11
View File
@@ -21,7 +21,6 @@ import (
"github.com/rs/zerolog/log"
pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/yaml/constraint"
"go.woodpecker-ci.org/woodpecker/v3/server"
@@ -94,12 +93,14 @@ func Create(ctx context.Context, _store store.Store, repo *model.Repo, pipeline
return nil, updatePipelineWithErr(ctx, _forge, _store, pipeline, repo, repoUser, fmt.Errorf("could not load config from forge: %w", configFetchErr))
}
pipelineItems, parseErr := parsePipeline(ctx, _forge, _store, pipeline, repoUser, repo, forgeYamlConfigs, nil)
if pipeline_errors.HasBlockingErrors(parseErr) {
currentPipeline, pipelineItems, parseErr, err := createPipelineItems(ctx, _forge, _store, pipeline, repoUser, repo, forgeYamlConfigs, nil, false)
*pipeline = *currentPipeline
if handleParseErrors(pipeline, parseErr) {
log.Debug().Str("repo", repo.FullName).Err(parseErr).Msg("failed to parse yaml")
return pipeline, updatePipelineWithErr(ctx, _forge, _store, pipeline, repo, repoUser, parseErr)
} else if parseErr != nil {
pipeline.Errors = pipeline_errors.GetPipelineErrors(parseErr)
}
if err != nil {
return nil, fmt.Errorf("createPipelineItems failed: %w", err)
}
if len(pipelineItems) == 0 {
@@ -111,12 +112,6 @@ func Create(ctx context.Context, _store store.Store, repo *model.Repo, pipeline
return nil, ErrFiltered
}
enrichPipelineItemSteps(pipelineItems, repo)
pipeline, err = saveWorkflowsFromPipelineBuilder(_store, pipeline, pipelineItems)
if err != nil {
return nil, fmt.Errorf("saveWorkflowsFromPipelineBuilder failed: %w", err)
}
// persist the pipeline config for historical correctness, restarts, etc
var configs []*model.Config
for _, forgeYamlConfig := range forgeYamlConfigs {
+76 -38
View File
@@ -155,31 +155,52 @@ func parsePipeline(ctx context.Context, forge forge.Forge, store store.Store, cu
return b.Build()
}
func createPipelineItems(c context.Context, forge forge.Forge, store store.Store,
currentPipeline *model.Pipeline, user *model.User, repo *model.Repo,
yamls []*forge_types.FileMeta, envs map[string]string,
) (*model.Pipeline, []*builder.Item, error) {
pipelineItems, err := parsePipeline(c, forge, store, currentPipeline, user, repo, yamls, envs)
if pipeline_errors.HasBlockingErrors(err) {
currentPipeline, uErr := UpdateToStatusError(store, *currentPipeline, err)
if uErr != nil {
log.Error().Err(uErr).Msgf("error setting error status of pipeline for %s#%d", repo.FullName, currentPipeline.Number)
} else {
updatePipelineStatus(c, forge, currentPipeline, repo, user)
}
// handleParseErrors classifies the error returned by parsePipeline. Blocking
// errors abort the run, so true is returned and the caller decides how to
// report and persist the failure. Non-blocking errors are recorded on the
// pipeline so they surface to the user without stopping the run.
func handleParseErrors(pipeline *model.Pipeline, parseErr error) (blocking bool) {
if pipeline_errors.HasBlockingErrors(parseErr) {
return true
}
if parseErr != nil {
pipeline.Errors = pipeline_errors.GetPipelineErrors(parseErr)
}
return false
}
return currentPipeline, nil, err
} else if err != nil {
currentPipeline.Errors = pipeline_errors.GetPipelineErrors(err)
if err := updatePipelinePending(c, forge, store, currentPipeline, repo, user); err != nil {
return nil, nil, err
}
// The createPipelineItems parses the pipeline config and persists the resulting
// workflows. It is the shared core of Create, Approve, and Restart.
//
// Returns two errors: parseErr carries pipeline config diagnostics, which
// callers classify with handleParseErrors and report as a blocking failure in
// their own way. The second error, err, signals a hard failure (e.g. persisting
// workflows) that always aborts the run. When the pipeline already has
// persisted workflows (a gated pipeline being approved), setting replaceExisting
// swaps them out for the freshly built ones.
func createPipelineItems(ctx context.Context, forge forge.Forge, store store.Store,
currentPipeline *model.Pipeline, user *model.User, repo *model.Repo,
yamls []*forge_types.FileMeta, envs map[string]string, replaceExisting bool,
) (pipeline *model.Pipeline, items []*builder.Item, parseErr, err error) {
pipelineItems, parseErr := parsePipeline(ctx, forge, store, currentPipeline, user, repo, yamls, envs)
if pipeline_errors.HasBlockingErrors(parseErr) {
return currentPipeline, nil, parseErr, nil
}
// An empty pipeline (e.g. everything filtered out) has no workflows to
// persist. Return early so the caller can filter it without us touching
// the store.
if len(pipelineItems) == 0 {
return currentPipeline, pipelineItems, parseErr, nil
}
enrichPipelineItemSteps(pipelineItems, repo)
currentPipeline, err = saveWorkflowsFromPipelineBuilder(store, currentPipeline, pipelineItems)
currentPipeline, err = saveWorkflowsFromPipelineBuilder(store, currentPipeline, pipelineItems, replaceExisting)
if err != nil {
return currentPipeline, nil, parseErr, err
}
return currentPipeline, pipelineItems, err
return currentPipeline, pipelineItems, parseErr, nil
}
// enrichPipelineItemSteps stamps server-side fields onto the backend step
@@ -199,10 +220,35 @@ func enrichPipelineItemSteps(items []*builder.Item, repo *model.Repo) {
}
}
// saveWorkflowsFromPipelineBuilder is the link between pipeline representation in "pipeline package" and server
// to be specific this func currently is used to convert the pipeline.Item list (crafted by PipelineBuilder.Build()) into
// a pipeline that can be stored in the database by the server and save converted workflows.
func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipeline, pipelineItems []*builder.Item) (*model.Pipeline, error) {
// saveWorkflowsFromPipelineBuilder converts the pipeline.Item list crafted by
// PipelineBuilder.Build() into model workflows and persists them.
//
// A freshly created pipeline has no workflows yet, so they are inserted. A
// gated pipeline already persisted its workflows when it was created, so on
// approval the stored workflows must be swapped for the freshly built ones:
// pass replaceExisting to delete the old workflows and steps before inserting.
func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipeline, pipelineItems []*builder.Item, replaceExisting bool) (*model.Pipeline, error) {
if pipeline.Workflows != nil && !replaceExisting {
return nil, errors.New("cannot save new workflows from pipeline builder: pipeline already has workflows loaded")
}
workflows := workflowsFromPipelineBuilder(pipeline, pipelineItems)
if replaceExisting {
if err := store.WorkflowsReplace(pipeline, workflows); err != nil {
return nil, err
}
} else if err := store.WorkflowsCreate(workflows); err != nil {
return nil, err
}
pipeline.Workflows = workflows
setPipelineItemWorkflowIDs(pipelineItems, pipeline.Workflows)
return pipeline, nil
}
func workflowsFromPipelineBuilder(pipeline *model.Pipeline, pipelineItems []*builder.Item) []*model.Workflow {
var pidSequence int
for _, item := range pipelineItems {
if pidSequence < item.Workflow.PID {
@@ -210,12 +256,7 @@ func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipelin
}
}
// The workflows in the pipeline should be empty, only we populate them.
// But if a pipeline was already loaded from the database and contains workflows,
// we error out to prevent harm.
if pipeline.Workflows != nil {
return nil, errors.New("cannot save new workflows from pipeline builder: pipeline already has workflows loaded")
}
workflows := make([]*model.Workflow, 0, len(pipelineItems))
for _, item := range pipelineItems {
workflow := &model.Workflow{
@@ -254,17 +295,14 @@ func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipelin
}
}
pipeline.Workflows = append(pipeline.Workflows, workflow)
workflows = append(workflows, workflow)
}
if err := store.WorkflowsCreate(pipeline.Workflows); err != nil {
return nil, err
}
return workflows
}
// now thread IDs back to the builder items
for i, wf := range pipeline.Workflows {
func setPipelineItemWorkflowIDs(pipelineItems []*builder.Item, workflows []*model.Workflow) {
for i, wf := range workflows {
pipelineItems[i].Workflow.ID = wf.ID
}
return pipeline, nil
}
+44 -1
View File
@@ -69,7 +69,7 @@ func TestSetPipelineStepsOnPipeline(t *testing.T) {
s := store_mocks.NewMockStore(t)
s.On("WorkflowsCreate", mock.Anything).Return(nil)
pipeline, err := saveWorkflowsFromPipelineBuilder(s, pipeline, pipelineItems)
pipeline, err := saveWorkflowsFromPipelineBuilder(s, pipeline, pipelineItems, false)
require.NoError(t, err)
if len(pipeline.Workflows) != 1 {
t.Fatal("Should generate three in total")
@@ -82,6 +82,49 @@ func TestSetPipelineStepsOnPipeline(t *testing.T) {
}
}
func TestSaveWorkflowsReplaceExisting(t *testing.T) {
t.Parallel()
pipeline := &model.Pipeline{
ID: 1,
Event: model.EventPush,
// a gated pipeline already carries persisted workflows on approval
Workflows: []*model.Workflow{{ID: 99, PID: 1}},
}
pipelineItems := []*builder.Item{{
Workflow: &builder.Workflow{ID: 1, PID: 1},
Config: &backend_types.Config{
Stages: []*backend_types.Stage{
{Steps: []*backend_types.Step{{Name: "clone"}}},
},
},
}}
s := store_mocks.NewMockStore(t)
s.On("WorkflowsReplace", mock.Anything, mock.Anything).Return(nil)
pipeline, err := saveWorkflowsFromPipelineBuilder(s, pipeline, pipelineItems, true)
require.NoError(t, err)
assert.Len(t, pipeline.Workflows, 1)
assert.Equal(t, int64(1), pipeline.Workflows[0].PipelineID)
}
func TestSaveWorkflowsRejectsExistingWithoutReplace(t *testing.T) {
t.Parallel()
pipeline := &model.Pipeline{
ID: 1,
Event: model.EventPush,
Workflows: []*model.Workflow{{ID: 99, PID: 1}},
}
s := store_mocks.NewMockStore(t)
_, err := saveWorkflowsFromPipelineBuilder(s, pipeline, nil, false)
require.Error(t, err)
}
func TestParsePipeline(t *testing.T) {
t.Parallel()
+11 -1
View File
@@ -89,7 +89,17 @@ func Restart(ctx context.Context, store store.Store, lastPipeline *model.Pipelin
return nil, errors.New(msg)
}
newPipeline, pipelineItems, err := createPipelineItems(ctx, forge, store, newPipeline, user, repo, pipelineFiles, envs)
newPipeline, pipelineItems, parseErr, err := createPipelineItems(ctx, forge, store, newPipeline, user, repo, pipelineFiles, envs, false)
if handleParseErrors(newPipeline, parseErr) {
if newPipeline, uErr := UpdateToStatusError(store, *newPipeline, parseErr); uErr != nil {
log.Error().Err(uErr).Msgf("error setting error status of pipeline for %s#%d", repo.FullName, newPipeline.Number)
} else {
updatePipelineStatus(ctx, forge, newPipeline, repo, user)
}
msg := fmt.Sprintf("failure to parse pipeline config for %s", repo.FullName)
log.Error().Err(parseErr).Msg(msg)
return nil, errors.New(msg)
}
if err != nil {
msg := fmt.Sprintf("failure to createPipelineItems for %s", repo.FullName)
log.Error().Err(err).Msg(msg)