1
0
mirror of https://github.com/woodpecker-ci/woodpecker.git synced 2026-06-03 16:35:37 +02:00
Files
woodpecker/pipeline/backend/kubernetes/kubernetes.go
T
Simon C. Kemper a765cb885a fix(kubernetes): retry WaitStep when container terminated state not yet finalized (#6672)
## Problem

Kubelet sets `pod.Status.Phase = Succeeded` before finalizing `containerStatuses[0].state.terminated`. When the informer sees the phase change and `WaitStep` calls `Get()`, the container status may still show `Terminated == nil`, causing a hard error:

```
no terminated state found for container wp-XXX/wp-XXX
```

This is a known race in the Kubernetes API server/kubelet eventually-consistent model. The window is normally milliseconds but widens to seconds under load (apiserver latency spikes, ResourceQuota admission storms, node pressure).

## Fix

Wrap the post-informer `Get()` + `Terminated == nil` check in `backoff.Retry` with exponential backoff (200ms initial, 5s max interval, 15s total budget). This mirrors the retry pattern already used for `TailStep` log stream recovery (#5550).
2026-05-30 12:35:53 +02:00

545 lines
16 KiB
Go

// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
"errors"
"fmt"
"io"
"maps"
"os"
"runtime"
"slices"
"strconv"
"strings"
"sync"
"time"
"github.com/cenkalti/backoff/v5"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v3"
kube_core_v1 "k8s.io/api/core/v1"
kube_errors "k8s.io/apimachinery/pkg/api/errors"
kube_meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // To authenticate to GCP K8s clusters
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/yaml"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
)
const (
	// EngineName is the identifier this backend reports from Name.
	EngineName = "kubernetes"

	// defaultResyncDuration is the resync period handed to the shared
	// informer factories created in WaitStep and TailStep.
	// TODO: 5 seconds is against best practice, k3s didn't work otherwise
	defaultResyncDuration = time.Second * 5

	// maxRetryDuration caps the total time spent in the backoff retry loops
	// (it is passed to backoff.WithMaxElapsedTime).
	maxRetryDuration = time.Minute
)
// kube is the Kubernetes backend; New returns it as a types.Backend.
type kube struct {
	// client is the Kubernetes API client; Load assigns it.
	client kubernetes.Interface
	// config holds the backend settings parsed from the CLI context by Load.
	config *config
	// goos is set to runtime.GOOS by Load.
	goos string
}
// config holds the backend settings. configFromCliContext fills it from the
// backend-k8s-* CLI flags.
type config struct {
	// Namespace comes from backend-k8s-namespace; GetNamespace returns it
	// as-is or uses it as the prefix of a per-organization namespace.
	Namespace string
	// EnableNamespacePerOrg (backend-k8s-namespace-per-org) makes GetNamespace
	// append the organization ID to Namespace.
	EnableNamespacePerOrg bool
	// StorageClass comes from backend-k8s-storage-class.
	StorageClass string
	// VolumeSize comes from backend-k8s-volume-size.
	VolumeSize string
	// StorageRwx comes from backend-k8s-storage-rwx.
	StorageRwx bool
	// PodLabels is the YAML-decoded value of backend-k8s-pod-labels; it is
	// always a non-nil map.
	PodLabels map[string]string
	// PodLabelsAllowFromStep comes from backend-k8s-pod-labels-allow-from-step.
	PodLabelsAllowFromStep bool
	// PodAnnotations is the YAML-decoded value of backend-k8s-pod-annotations;
	// it is always a non-nil map.
	PodAnnotations map[string]string
	// PodAnnotationsAllowFromStep comes from
	// backend-k8s-pod-annotations-allow-from-step.
	PodAnnotationsAllowFromStep bool
	// PodNodeSelector is the YAML-decoded value of
	// backend-k8s-pod-node-selector; it is always a non-nil map.
	PodNodeSelector map[string]string
	// PodTolerationsAllowFromStep comes from
	// backend-k8s-pod-tolerations-allow-from-step.
	PodTolerationsAllowFromStep bool
	// PodTolerations is the YAML-decoded value of backend-k8s-pod-tolerations.
	PodTolerations []Toleration
	// PodAffinity is the YAML-decoded value of backend-k8s-pod-affinity; it
	// stays nil when the flag is empty.
	PodAffinity *kube_core_v1.Affinity
	// PodAffinityAllowFromStep comes from
	// backend-k8s-pod-affinity-allow-from-step.
	PodAffinityAllowFromStep bool
	// ImagePullSecretNames comes from backend-k8s-pod-image-pull-secret-names.
	ImagePullSecretNames []string
	// SecurityContext groups the security-context related settings.
	SecurityContext SecurityContextConfig
	// NativeSecretsAllowFromStep comes from backend-k8s-allow-native-secrets.
	NativeSecretsAllowFromStep bool
	// PriorityClassName comes from backend-k8s-priority-class.
	PriorityClassName string
	// StopTimeout comes from backend-k8s-stop-timeout and is used as the
	// GracePeriodSeconds of the default delete options.
	StopTimeout int64
}
// GetNamespace returns the namespace to use for the given organization.
//
// Without namespace-per-org the configured namespace is returned unchanged.
// Otherwise the result is the lower-cased "<namespace>-<orgID>".
func (c *config) GetNamespace(orgID int64) string {
	if !c.EnableNamespacePerOrg {
		return c.Namespace
	}
	perOrg := c.Namespace + "-" + strconv.FormatInt(orgID, 10)
	return strings.ToLower(perOrg)
}
// SecurityContextConfig holds the security-context settings of the backend
// configuration.
type SecurityContextConfig struct {
	// RunAsNonRoot comes from the backend-k8s-secctx-nonroot flag.
	RunAsNonRoot bool
	// FSGroup is initialized by configFromCliContext to
	// newInt64(defaultFSGroup).
	FSGroup *int64
}
// newDefaultDeleteOptions builds the delete options used when removing
// Kubernetes objects: background propagation, with the configured stop
// timeout as grace period.
//
// GracePeriodSeconds points at the config's StopTimeout field itself, not at
// a copy of it.
func (c *config) newDefaultDeleteOptions() kube_meta_v1.DeleteOptions {
	background := kube_meta_v1.DeletePropagationBackground

	var opts kube_meta_v1.DeleteOptions
	opts.GracePeriodSeconds = &c.StopTimeout
	opts.PropagationPolicy = &background
	return opts
}
// configFromCliContext builds the backend configuration from the *cli.Command
// stored in ctx under types.CliCommand.
//
// It returns types.ErrNoCliContextFound when ctx is nil or carries no command.
// The YAML-valued flags (labels, annotations, node selector, tolerations,
// affinity) are decoded here so that invalid values are reported at startup;
// a decode failure is logged and returned.
func configFromCliContext(ctx context.Context) (*config, error) {
	if ctx == nil {
		return nil, types.ErrNoCliContextFound
	}
	c, ok := ctx.Value(types.CliCommand).(*cli.Command)
	if !ok {
		return nil, types.ErrNoCliContextFound
	}

	cfg := &config{
		Namespace:                   c.String("backend-k8s-namespace"),
		EnableNamespacePerOrg:       c.Bool("backend-k8s-namespace-per-org"),
		StorageClass:                c.String("backend-k8s-storage-class"),
		VolumeSize:                  c.String("backend-k8s-volume-size"),
		StorageRwx:                  c.Bool("backend-k8s-storage-rwx"),
		PriorityClassName:           c.String("backend-k8s-priority-class"),
		PodLabels:                   make(map[string]string), // just init empty map to prevent nil panic
		PodLabelsAllowFromStep:      c.Bool("backend-k8s-pod-labels-allow-from-step"),
		PodAnnotations:              make(map[string]string), // just init empty map to prevent nil panic
		PodAnnotationsAllowFromStep: c.Bool("backend-k8s-pod-annotations-allow-from-step"),
		PodTolerationsAllowFromStep: c.Bool("backend-k8s-pod-tolerations-allow-from-step"),
		PodNodeSelector:             make(map[string]string), // just init empty map to prevent nil panic
		PodAffinityAllowFromStep:    c.Bool("backend-k8s-pod-affinity-allow-from-step"),
		ImagePullSecretNames:        c.StringSlice("backend-k8s-pod-image-pull-secret-names"),
		SecurityContext: SecurityContextConfig{
			RunAsNonRoot: c.Bool("backend-k8s-secctx-nonroot"), // cspell:words secctx nonroot
			FSGroup:      newInt64(defaultFSGroup),
		},
		NativeSecretsAllowFromStep: c.Bool("backend-k8s-allow-native-secrets"),
		StopTimeout:                c.Int64("backend-k8s-stop-timeout"),
	}

	// Unmarshal label and annotation settings here to ensure they're valid on startup
	labels := c.String("backend-k8s-pod-labels")
	if labels != "" {
		if err := yaml.Unmarshal([]byte(labels), &cfg.PodLabels); err != nil {
			log.Error().Err(err).Msgf("could not unmarshal pod labels '%s'", labels)
			return nil, err
		}
	}

	annotations := c.String("backend-k8s-pod-annotations")
	if annotations != "" {
		if err := yaml.Unmarshal([]byte(annotations), &cfg.PodAnnotations); err != nil {
			log.Error().Err(err).Msgf("could not unmarshal pod annotations '%s'", annotations)
			return nil, err
		}
	}

	nodeSelector := c.String("backend-k8s-pod-node-selector")
	if nodeSelector != "" {
		if err := yaml.Unmarshal([]byte(nodeSelector), &cfg.PodNodeSelector); err != nil {
			log.Error().Err(err).Msgf("could not unmarshal pod node selector '%s'", nodeSelector)
			return nil, err
		}
	}

	podTolerations := c.String("backend-k8s-pod-tolerations")
	if podTolerations != "" {
		if err := yaml.Unmarshal([]byte(podTolerations), &cfg.PodTolerations); err != nil {
			log.Error().Err(err).Msgf("could not unmarshal pod tolerations '%s'", podTolerations)
			return nil, err
		}
	}

	podAffinity := c.String("backend-k8s-pod-affinity")
	if podAffinity != "" {
		if err := yaml.Unmarshal([]byte(podAffinity), &cfg.PodAffinity); err != nil {
			log.Error().Err(err).Msgf("could not unmarshal pod affinity '%s'", podAffinity)
			return nil, err
		}
	}

	return cfg, nil
}
// New creates the Kubernetes backend. The returned value is empty; its client
// and configuration are filled in by Load.
func New() types.Backend {
	return new(kube)
}
// Name returns the backend's engine name, the EngineName constant.
func (e *kube) Name() string {
	return EngineName
}
// IsAvailable reports whether the KUBERNETES_SERVICE_HOST environment
// variable is set to a non-empty value. The context argument is unused.
func (e *kube) IsAvailable(context.Context) bool {
	return os.Getenv("KUBERNETES_SERVICE_HOST") != ""
}
// Flags returns the package-level Flags value as this backend's CLI flags.
func (e *kube) Flags() []cli.Flag {
	return Flags
}
// Load prepares the backend for use: it parses the configuration from the CLI
// context, creates the Kubernetes client and reports the platform.
//
// The in-cluster client is used when rest.InClusterConfig succeeds, the
// out-of-cluster client otherwise. The reported platform is the GOOS/GOARCH
// of the running process.
func (e *kube) Load(ctx context.Context) (*types.BackendInfo, error) {
	cfg, err := configFromCliContext(ctx)
	if err != nil {
		return nil, err
	}
	e.config = cfg

	var client kubernetes.Interface
	if _, probeErr := rest.InClusterConfig(); probeErr == nil {
		client, err = getClientInsideOfCluster()
	} else {
		client, err = getClientOutOfCluster()
	}
	if err != nil {
		return nil, err
	}
	e.client = client

	// TODO(2693): use info resp of kubeClient to define platform var
	e.goos = runtime.GOOS

	info := &types.BackendInfo{
		Platform: runtime.GOOS + "/" + runtime.GOARCH,
	}
	return info, nil
}
// getConfig returns a copy of the backend configuration that callers may
// modify without affecting the shared one, or nil when no configuration has
// been loaded.
//
// Every map, slice and the affinity pointer is duplicated. Previously
// PodTolerations and PodAffinity were still shared with the original, so
// writing through them on the copy changed the shared configuration.
//
// NOTE(review): the tolerations are cloned element-wise (shallow), and
// SecurityContext.FSGroup still points at the shared value; deep-copy them too
// if callers ever write through those pointers.
func (e *kube) getConfig() *config {
	if e.config == nil {
		return nil
	}
	c := *e.config
	c.PodLabels = maps.Clone(e.config.PodLabels)
	c.PodAnnotations = maps.Clone(e.config.PodAnnotations)
	c.PodNodeSelector = maps.Clone(e.config.PodNodeSelector)
	c.PodTolerations = slices.Clone(e.config.PodTolerations)
	// DeepCopy on a nil *Affinity returns nil, so an unset affinity stays unset.
	c.PodAffinity = e.config.PodAffinity.DeepCopy()
	c.ImagePullSecretNames = slices.Clone(e.config.ImagePullSecretNames)
	return &c
}
// SetupWorkflow sets up the pipeline environment: the organization namespace
// (only when namespace-per-org is enabled), the workflow volume and the
// workflow headless service.
//
// The namespace is derived from the OrgID of the first step of the first
// stage. A workflow without any stage or step yields an error instead of an
// index-out-of-range panic.
func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
	log.Trace().Str("taskUUID", taskUUID).Msgf("Setting up Kubernetes primitives")

	if conf == nil || len(conf.Stages) == 0 || len(conf.Stages[0].Steps) == 0 {
		return fmt.Errorf("workflow %s has no steps to derive the namespace from", taskUUID)
	}
	namespace := e.config.GetNamespace(conf.Stages[0].Steps[0].OrgID)

	if e.config.EnableNamespacePerOrg {
		log.Trace().Str("taskUUID", taskUUID).Msgf("Ensure organization namespace: %s", namespace)
		if err := mkNamespace(ctx, e.client.CoreV1().Namespaces(), namespace); err != nil {
			return err
		}
	}

	log.Trace().Str("taskUUID", taskUUID).Msgf("Creating workflow volume")
	if _, err := startVolume(ctx, e, conf.Volume, namespace); err != nil {
		return err
	}

	log.Trace().Str("taskUUID", taskUUID).Msgf("Creating workflow headless service")
	if _, err := startHeadlessService(ctx, e, namespace, taskUUID); err != nil {
		return err
	}

	return nil
}
// StartStep launches the pod for a pipeline step, creating the registry
// secret and the step secret first when the step needs them.
//
// A failure to parse the step's backend options is only logged; the step is
// still started with whatever options were returned.
func (e *kube) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
	options, parseErr := parseBackendOptions(step)
	if parseErr != nil {
		log.Error().Err(parseErr).Msg("could not parse backend options")
	}

	if needsRegistrySecret(step) {
		if err := startRegistrySecret(ctx, e, step); err != nil {
			return err
		}
	}

	if needsStepSecret(step) {
		if err := startStepSecret(ctx, e, step); err != nil {
			return err
		}
	}

	log.Trace().Str("taskUUID", taskUUID).Msgf("starting step: %s", step.Name)
	_, startErr := startPod(ctx, e, step, options, taskUUID)
	return startErr
}
// WaitStep waits for the pipeline step to complete and returns
// the completion results.
//
// A pod informer signals completion when the step's pod reaches the
// Succeeded, Failed or Unknown phase, is reported as failing to pull its
// image, or is deleted. The pod is then fetched again (with backoff) to read
// the exit code of its first container.
//
// Returns:
//   - a state with ExitCode 0 and Exited true when the pod no longer exists;
//   - an error when the image could not be pulled, when ctx is cancelled, or
//     when no terminated container state shows up within maxRetryDuration;
//   - otherwise the exit code of the first container.
func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string) (*types.State, error) {
	podName, err := stepToPodName(step)
	if err != nil {
		return nil, err
	}

	log.Trace().Str("taskUUID", taskUUID).Msgf("waiting for pod: %s", podName)

	namespace := e.config.GetNamespace(step.OrgID)

	// finished is closed exactly once, by whichever informer event first
	// reports that the pod is done.
	finished := make(chan struct{})
	var finishedOnce sync.Once
	signalFinished := func() {
		finishedOnce.Do(func() { close(finished) })
	}

	podUpdated := func(_, newPod any) {
		pod, ok := newPod.(*kube_core_v1.Pod)
		if !ok {
			log.Error().Msgf("could not parse pod: %v", newPod)
			return
		}

		if pod.Name == podName {
			if isImagePullBackOffState(pod) || isInvalidImageName(pod) {
				signalFinished()
			}

			switch pod.Status.Phase {
			case kube_core_v1.PodSucceeded, kube_core_v1.PodFailed, kube_core_v1.PodUnknown:
				signalFinished()
			}
		}
	}

	podDeleted := func(obj any) {
		pod, ok := obj.(*kube_core_v1.Pod)
		if !ok {
			// Deletions may also be delivered wrapped in a tombstone.
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				return
			}
			pod, ok = tombstone.Obj.(*kube_core_v1.Pod)
			if !ok {
				return
			}
		}
		if pod.Name == podName {
			signalFinished()
		}
	}

	si := informers.NewSharedInformerFactoryWithOptions(e.client, defaultResyncDuration, informers.WithNamespace(namespace))
	if _, err := si.Core().V1().Pods().Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			UpdateFunc: podUpdated,
			DeleteFunc: podDeleted,
		},
	); err != nil {
		return nil, err
	}

	stop := make(chan struct{})
	si.Start(stop)
	defer close(stop)

	// If the pod was deleted before the informer started, no events will
	// ever arrive. Check explicitly so we don't hang forever.
	if _, err := e.client.CoreV1().Pods(namespace).Get(ctx, podName, kube_meta_v1.GetOptions{}); kube_errors.IsNotFound(err) {
		return &types.State{ExitCode: 0, Exited: true}, nil
	}

	select {
	case <-finished:
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// After the informer signals completion, kubelet may not have finalized
	// containerStatuses yet (phase=Succeeded before state.terminated is set).
	// Retry with backoff to allow kubelet to catch up.
	pod, err := backoff.Retry(ctx,
		func() (*kube_core_v1.Pod, error) {
			p, err := e.client.CoreV1().Pods(namespace).Get(ctx, podName, kube_meta_v1.GetOptions{})
			if err != nil {
				if kube_errors.IsNotFound(err) {
					// The pod is gone; retrying cannot bring it back.
					return nil, backoff.Permanent(err)
				}
				return nil, err
			}

			// An image pull failure is one of the reasons the informer
			// signals completion. Hand such a pod back right away so the
			// check after the retry loop can report it; waiting for a
			// terminated state here would only exhaust the retry budget and
			// hide the real cause.
			if isImagePullBackOffState(p) || isInvalidImageName(p) {
				return p, nil
			}

			if len(p.Status.ContainerStatuses) == 0 {
				return nil, fmt.Errorf("no container statuses found for pod %s", podName)
			}

			if p.Status.ContainerStatuses[0].State.Terminated == nil {
				return nil, fmt.Errorf("container %s/%s terminated state not yet finalized", podName, p.Status.ContainerStatuses[0].Name)
			}

			return p, nil
		},
		backoff.WithBackOff(backoff.NewExponentialBackOff()),
		backoff.WithMaxElapsedTime(maxRetryDuration),
		backoff.WithNotify(func(err error, delay time.Duration) {
			log.Warn().Err(err).Str("pod", podName).Dur("backoff", delay).Msg("waiting for container terminated state, retrying with backoff")
		}),
	)
	if err != nil {
		if kube_errors.IsNotFound(err) {
			return &types.State{ExitCode: 0, Exited: true}, nil
		}
		return nil, err
	}

	if isImagePullBackOffState(pod) || isInvalidImageName(pod) {
		return nil, fmt.Errorf("could not pull image for pod %s", podName)
	}

	// From here on the retry loop guarantees at least one container status
	// with a terminated state.
	cs := pod.Status.ContainerStatuses[0]

	bs := &types.State{
		ExitCode:  int(cs.State.Terminated.ExitCode),
		Exited:    true,
		OOMKilled: false,
	}

	return bs, nil
}
// TailStep returns a reader that streams the logs of the step's pod.
//
// It waits until a pod informer reports the pod as Running, Succeeded or
// Failed (or as failing to pull its image), then opens a following log stream
// for the container named like the pod. Opening the stream is retried with
// exponential backoff for at most maxRetryDuration. The stream is forwarded
// through an in-memory pipe whose read end is returned.
func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) {
	podName, err := stepToPodName(step)
	if err != nil {
		return nil, err
	}

	log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of pod: %s", podName)

	// ready is closed once, as soon as the pod is in a state worth tailing.
	ready := make(chan struct{})
	var readyOnce sync.Once
	markReady := func() {
		readyOnce.Do(func() { close(ready) })
	}

	onUpdate := func(_, obj any) {
		pod, isPod := obj.(*kube_core_v1.Pod)
		if !isPod {
			log.Error().Msgf("could not parse pod: %v", obj)
			return
		}
		if pod.Name != podName {
			return
		}

		if isImagePullBackOffState(pod) || isInvalidImageName(pod) {
			markReady()
		}

		phase := pod.Status.Phase
		if phase == kube_core_v1.PodRunning || phase == kube_core_v1.PodSucceeded || phase == kube_core_v1.PodFailed {
			markReady()
		}
	}

	namespace := e.config.GetNamespace(step.OrgID)

	factory := informers.NewSharedInformerFactoryWithOptions(e.client, defaultResyncDuration, informers.WithNamespace(namespace))
	handlers := cache.ResourceEventHandlerFuncs{
		UpdateFunc: onUpdate,
	}
	if _, err := factory.Core().V1().Pods().Informer().AddEventHandler(handlers); err != nil {
		return nil, err
	}

	stop := make(chan struct{})
	factory.Start(stop)
	defer close(stop)

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-ready:
	}

	logOpts := &kube_core_v1.PodLogOptions{
		Follow:    true,
		Container: podName,
	}

	openStream := func() (io.ReadCloser, error) {
		return e.client.CoreV1().RESTClient().Get().
			Namespace(namespace).
			Name(podName).
			Resource("pods").
			SubResource("log").
			VersionedParams(logOpts, scheme.ParameterCodec).
			Stream(ctx)
	}
	onRetry := func(err error, delay time.Duration) {
		log.Warn().Err(err).Str("pod", podName).Dur("backoff", delay).Msg("failed to open pod log stream, retrying with backoff")
	}

	logs, err := backoff.Retry(
		ctx,
		openStream,
		backoff.WithBackOff(backoff.NewExponentialBackOff()),
		backoff.WithMaxElapsedTime(maxRetryDuration),
		backoff.WithNotify(onRetry),
	)
	if err != nil {
		return nil, err
	}

	pr, pw := io.Pipe()

	go func() {
		// Deferred in this order so the pipe writer is closed before the log
		// stream.
		defer logs.Close()
		defer pw.Close()

		// A copy error is not reported; either way the deferred closes end
		// the stream for the reader.
		_, _ = io.Copy(pw, logs)
	}()

	return pr, nil
}
// DestroyStep removes the Kubernetes objects of a step: its registry secret
// and step secret (when the step needs them) and its pod.
//
// Every removal is attempted even if an earlier one failed; all failures are
// returned together, joined into a single error.
func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID string) error {
	log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping step: %s", step.Name)

	var failures []error

	if needsRegistrySecret(step) {
		if err := stopRegistrySecret(ctx, e, step, e.config.newDefaultDeleteOptions()); err != nil {
			failures = append(failures, err)
		}
	}

	if needsStepSecret(step) {
		if err := stopStepSecret(ctx, e, step, e.config.newDefaultDeleteOptions()); err != nil {
			failures = append(failures, err)
		}
	}

	if err := stopPod(ctx, e, step, e.config.newDefaultDeleteOptions()); err != nil {
		failures = append(failures, err)
	}

	return errors.Join(failures...)
}
// DestroyWorkflow destroys the pipeline environment: the pods of all steps,
// the workflow headless service and the workflow volume.
//
// The first failure aborts the teardown and is returned. The namespace for
// the service and the volume is derived from the OrgID of the first step of
// the first stage; a workflow without any stage or step yields an error
// instead of an index-out-of-range panic.
func (e *kube) DestroyWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
	log.Trace().Str("taskUUID", taskUUID).Msg("deleting Kubernetes primitives")

	for _, stage := range conf.Stages {
		for _, step := range stage.Steps {
			if err := stopPod(ctx, e, step, e.config.newDefaultDeleteOptions()); err != nil {
				return err
			}
		}
	}

	if len(conf.Stages) == 0 || len(conf.Stages[0].Steps) == 0 {
		return fmt.Errorf("workflow %s has no steps to derive the namespace from", taskUUID)
	}
	namespace := e.config.GetNamespace(conf.Stages[0].Steps[0].OrgID)

	log.Trace().Str("taskUUID", taskUUID).Msgf("deleting workflow headless service")
	// stopHeadlessService is a package-level helper that takes the backend as
	// an argument, like startHeadlessService, stopPod and stopVolume.
	if err := stopHeadlessService(ctx, e, namespace, taskUUID); err != nil {
		return err
	}

	log.Trace().Str("taskUUID", taskUUID).Msgf("deleting workflow volume")
	if err := stopVolume(ctx, e, conf.Volume, namespace, e.config.newDefaultDeleteOptions()); err != nil {
		return err
	}

	return nil
}