You've already forked woodpecker
							
							
				mirror of
				https://github.com/woodpecker-ci/woodpecker.git
				synced 2025-10-30 23:27:39 +02:00 
			
		
		
		
	Gracefully shutdown agent (#3895)
This commit is contained in:
		| @@ -23,6 +23,8 @@ import ( | ||||
| 	"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto" | ||||
| ) | ||||
|  | ||||
| const authClientTimeout = time.Second * 5 | ||||
|  | ||||
| type AuthClient struct { | ||||
| 	client     proto.WoodpeckerAuthClient | ||||
| 	conn       *grpc.ClientConn | ||||
| @@ -39,8 +41,8 @@ func NewAuthGrpcClient(conn *grpc.ClientConn, agentToken string, agentID int64) | ||||
| 	return client | ||||
| } | ||||
|  | ||||
| func (c *AuthClient) Auth() (string, int64, error) { | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) //nolint:mnd | ||||
| func (c *AuthClient) Auth(ctx context.Context) (string, int64, error) { | ||||
| 	ctx, cancel := context.WithTimeout(ctx, authClientTimeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	req := &proto.AuthRequest{ | ||||
|   | ||||
| @@ -30,15 +30,12 @@ type AuthInterceptor struct { | ||||
| } | ||||
|  | ||||
| // NewAuthInterceptor returns a new auth interceptor. | ||||
| func NewAuthInterceptor( | ||||
| 	authClient *AuthClient, | ||||
| 	refreshDuration time.Duration, | ||||
| ) (*AuthInterceptor, error) { | ||||
| func NewAuthInterceptor(ctx context.Context, authClient *AuthClient, refreshDuration time.Duration) (*AuthInterceptor, error) { | ||||
| 	interceptor := &AuthInterceptor{ | ||||
| 		authClient: authClient, | ||||
| 	} | ||||
|  | ||||
| 	err := interceptor.scheduleRefreshToken(refreshDuration) | ||||
| 	err := interceptor.scheduleRefreshToken(ctx, refreshDuration) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -78,21 +75,26 @@ func (interceptor *AuthInterceptor) attachToken(ctx context.Context) context.Con | ||||
| 	return metadata.AppendToOutgoingContext(ctx, "token", interceptor.accessToken) | ||||
| } | ||||
|  | ||||
| func (interceptor *AuthInterceptor) scheduleRefreshToken(refreshDuration time.Duration) error { | ||||
| 	err := interceptor.refreshToken() | ||||
| func (interceptor *AuthInterceptor) scheduleRefreshToken(ctx context.Context, refreshInterval time.Duration) error { | ||||
| 	err := interceptor.refreshToken(ctx) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	go func() { | ||||
| 		wait := refreshDuration | ||||
| 		wait := refreshInterval | ||||
|  | ||||
| 		for { | ||||
| 			time.Sleep(wait) | ||||
| 			err := interceptor.refreshToken() | ||||
| 			if err != nil { | ||||
| 				wait = time.Second | ||||
| 			} else { | ||||
| 				wait = refreshDuration | ||||
| 			select { | ||||
| 			case <-ctx.Done(): | ||||
| 				return | ||||
| 			case <-time.After(wait): | ||||
| 				err := interceptor.refreshToken(ctx) | ||||
| 				if err != nil { | ||||
| 					wait = time.Second | ||||
| 				} else { | ||||
| 					wait = refreshInterval | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| @@ -100,8 +102,8 @@ func (interceptor *AuthInterceptor) scheduleRefreshToken(refreshDuration time.Du | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (interceptor *AuthInterceptor) refreshToken() error { | ||||
| 	accessToken, _, err := interceptor.authClient.Auth() | ||||
| func (interceptor *AuthInterceptor) refreshToken(ctx context.Context) error { | ||||
| 	accessToken, _, err := interceptor.authClient.Auth(ctx) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|   | ||||
| @@ -17,7 +17,6 @@ package rpc | ||||
| import ( | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| @@ -90,8 +89,10 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error) | ||||
| 		case codes.Canceled: | ||||
| 			if ctx.Err() != nil { | ||||
| 				// expected as context was canceled | ||||
| 				log.Debug().Err(err).Msgf("grpc error: next(): context canceled") | ||||
| 				return nil, nil | ||||
| 			} | ||||
| 			log.Error().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err)) | ||||
| 			return nil, err | ||||
| 		case | ||||
| 			codes.Aborted, | ||||
| @@ -105,10 +106,11 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error) | ||||
| 				// https://github.com/woodpecker-ci/woodpecker/issues/717#issuecomment-1049365104 | ||||
| 				log.Trace().Err(err).Msg("grpc: to many keepalive pings without sending data") | ||||
| 			} else { | ||||
| 				log.Error().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err)) | ||||
| 				log.Warn().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err)) | ||||
| 			} | ||||
| 		default: | ||||
| 			return nil, fmt.Errorf("grpc error: next(): code: %v: %w", status.Code(err), err) | ||||
| 			log.Error().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err)) | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		select { | ||||
| @@ -143,9 +145,15 @@ func (c *client) Wait(ctx context.Context, id string) (err error) { | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) | ||||
|  | ||||
| 		switch status.Code(err) { | ||||
| 		case codes.Canceled: | ||||
| 			if ctx.Err() != nil { | ||||
| 				// expected as context was canceled | ||||
| 				log.Debug().Err(err).Msgf("grpc error: wait(): context canceled") | ||||
| 				return nil | ||||
| 			} | ||||
| 			log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		case | ||||
| 			codes.Aborted, | ||||
| 			codes.DataLoss, | ||||
| @@ -153,7 +161,9 @@ func (c *client) Wait(ctx context.Context, id string) (err error) { | ||||
| 			codes.Internal, | ||||
| 			codes.Unavailable: | ||||
| 			// non-fatal errors | ||||
| 			log.Warn().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) | ||||
| 		default: | ||||
| 			log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| @@ -184,6 +194,14 @@ func (c *client) Init(ctx context.Context, workflowID string, state rpc.Workflow | ||||
| 		log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err)) | ||||
|  | ||||
| 		switch status.Code(err) { | ||||
| 		case codes.Canceled: | ||||
| 			if ctx.Err() != nil { | ||||
| 				// expected as context was canceled | ||||
| 				log.Debug().Err(err).Msgf("grpc error: init(): context canceled") | ||||
| 				return nil | ||||
| 			} | ||||
| 			log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		case | ||||
| 			codes.Aborted, | ||||
| 			codes.DataLoss, | ||||
| @@ -191,7 +209,9 @@ func (c *client) Init(ctx context.Context, workflowID string, state rpc.Workflow | ||||
| 			codes.Internal, | ||||
| 			codes.Unavailable: | ||||
| 			// non-fatal errors | ||||
| 			log.Warn().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err)) | ||||
| 		default: | ||||
| 			log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| @@ -222,6 +242,14 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow | ||||
| 		log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err)) | ||||
|  | ||||
| 		switch status.Code(err) { | ||||
| 		case codes.Canceled: | ||||
| 			if ctx.Err() != nil { | ||||
| 				// expected as context was canceled | ||||
| 				log.Debug().Err(err).Msgf("grpc error: done(): context canceled") | ||||
| 				return nil | ||||
| 			} | ||||
| 			log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		case | ||||
| 			codes.Aborted, | ||||
| 			codes.DataLoss, | ||||
| @@ -229,7 +257,9 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow | ||||
| 			codes.Internal, | ||||
| 			codes.Unavailable: | ||||
| 			// non-fatal errors | ||||
| 			log.Warn().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err)) | ||||
| 		default: | ||||
| 			log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| @@ -256,6 +286,14 @@ func (c *client) Extend(ctx context.Context, id string) (err error) { | ||||
| 		log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err)) | ||||
|  | ||||
| 		switch status.Code(err) { | ||||
| 		case codes.Canceled: | ||||
| 			if ctx.Err() != nil { | ||||
| 				// expected as context was canceled | ||||
| 				log.Debug().Err(err).Msgf("grpc error: extend(): context canceled") | ||||
| 				return nil | ||||
| 			} | ||||
| 			log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		case | ||||
| 			codes.Aborted, | ||||
| 			codes.DataLoss, | ||||
| @@ -263,7 +301,9 @@ func (c *client) Extend(ctx context.Context, id string) (err error) { | ||||
| 			codes.Internal, | ||||
| 			codes.Unavailable: | ||||
| 			// non-fatal errors | ||||
| 			log.Warn().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err)) | ||||
| 		default: | ||||
| 			log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| @@ -297,6 +337,14 @@ func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (er | ||||
| 		log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err)) | ||||
|  | ||||
| 		switch status.Code(err) { | ||||
| 		case codes.Canceled: | ||||
| 			if ctx.Err() != nil { | ||||
| 				// expected as context was canceled | ||||
| 				log.Debug().Err(err).Msgf("grpc error: update(): context canceled") | ||||
| 				return nil | ||||
| 			} | ||||
| 			log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		case | ||||
| 			codes.Aborted, | ||||
| 			codes.DataLoss, | ||||
| @@ -304,7 +352,9 @@ func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (er | ||||
| 			codes.Internal, | ||||
| 			codes.Unavailable: | ||||
| 			// non-fatal errors | ||||
| 			log.Warn().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err)) | ||||
| 		default: | ||||
| 			log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| @@ -333,9 +383,15 @@ func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) { | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		log.Error().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err)) | ||||
|  | ||||
| 		switch status.Code(err) { | ||||
| 		case codes.Canceled: | ||||
| 			if ctx.Err() != nil { | ||||
| 				// expected as context was canceled | ||||
| 				log.Debug().Err(err).Msgf("grpc error: log(): context canceled") | ||||
| 				return nil | ||||
| 			} | ||||
| 			log.Error().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		case | ||||
| 			codes.Aborted, | ||||
| 			codes.DataLoss, | ||||
| @@ -343,7 +399,9 @@ func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) { | ||||
| 			codes.Internal, | ||||
| 			codes.Unavailable: | ||||
| 			// non-fatal errors | ||||
| 			log.Warn().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err)) | ||||
| 		default: | ||||
| 			log.Error().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| @@ -383,6 +441,14 @@ func (c *client) ReportHealth(ctx context.Context) (err error) { | ||||
| 			return nil | ||||
| 		} | ||||
| 		switch status.Code(err) { | ||||
| 		case codes.Canceled: | ||||
| 			if ctx.Err() != nil { | ||||
| 				// expected as context was canceled | ||||
| 				log.Debug().Err(err).Msgf("grpc error: report_health(): context canceled") | ||||
| 				return nil | ||||
| 			} | ||||
| 			log.Error().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		case | ||||
| 			codes.Aborted, | ||||
| 			codes.DataLoss, | ||||
| @@ -390,7 +456,9 @@ func (c *client) ReportHealth(ctx context.Context) (err error) { | ||||
| 			codes.Internal, | ||||
| 			codes.Unavailable: | ||||
| 			// non-fatal errors | ||||
| 			log.Warn().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err)) | ||||
| 		default: | ||||
| 			log.Error().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err)) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
|   | ||||
| @@ -49,7 +49,7 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck | ||||
| func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:contextcheck | ||||
| 	log.Debug().Msg("request next execution") | ||||
|  | ||||
| 	meta, _ := metadata.FromOutgoingContext(runnerCtx) | ||||
| @@ -178,7 +178,11 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck | ||||
| 		Str("error", state.Error). | ||||
| 		Msg("updating workflow status") | ||||
|  | ||||
| 	if err := r.client.Done(runnerCtx, workflow.ID, state); err != nil { | ||||
| 	doneCtx := runnerCtx | ||||
| 	if doneCtx.Err() != nil { | ||||
| 		doneCtx = shutdownCtx | ||||
| 	} | ||||
| 	if err := r.client.Done(doneCtx, workflow.ID, state); err != nil { | ||||
| 		logger.Error().Err(err).Msg("updating workflow status failed") | ||||
| 	} else { | ||||
| 		logger.Debug().Msg("updating workflow status complete") | ||||
|   | ||||
| @@ -23,12 +23,12 @@ import ( | ||||
| 	"net/http" | ||||
| 	"os" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/rs/zerolog/log" | ||||
| 	"github.com/tevino/abool/v2" | ||||
| 	"github.com/urfave/cli/v2" | ||||
| 	"golang.org/x/sync/errgroup" | ||||
| 	"google.golang.org/grpc" | ||||
| 	"google.golang.org/grpc/codes" | ||||
| 	grpc_credentials "google.golang.org/grpc/credentials" | ||||
| @@ -47,7 +47,43 @@ import ( | ||||
| 	"go.woodpecker-ci.org/woodpecker/v2/version" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	reportHealthInterval           = time.Second * 10 | ||||
| 	authInterceptorRefreshInterval = time.Minute * 30 | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	shutdownTimeout = time.Second * 5 | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	stopAgentFunc      context.CancelCauseFunc = func(error) {} | ||||
| 	shutdownCancelFunc context.CancelFunc      = func() {} | ||||
| 	shutdownCtx                                = context.Background() | ||||
| ) | ||||
|  | ||||
| func run(c *cli.Context, backends []types.Backend) error { | ||||
| 	ctx := utils.WithContextSigtermCallback(c.Context, func() { | ||||
| 		log.Info().Msg("termination signal is received, shutting down agent") | ||||
| 	}) | ||||
|  | ||||
| 	agentCtx, ctxCancel := context.WithCancelCause(ctx) | ||||
| 	stopAgentFunc = func(err error) { | ||||
| 		msg := "shutdown of whole agent" | ||||
| 		if err != nil { | ||||
| 			log.Error().Err(err).Msg(msg) | ||||
| 		} else { | ||||
| 			log.Info().Msg(msg) | ||||
| 		} | ||||
| 		stopAgentFunc = func(error) {} | ||||
| 		shutdownCtx, shutdownCancelFunc = context.WithTimeout(shutdownCtx, shutdownTimeout) | ||||
| 		ctxCancel(err) | ||||
| 	} | ||||
| 	defer stopAgentFunc(nil) | ||||
| 	defer shutdownCancelFunc() | ||||
|  | ||||
| 	serviceWaitingGroup := errgroup.Group{} | ||||
|  | ||||
| 	agentConfigPath := c.String("agent-config") | ||||
| 	hostname := c.String("hostname") | ||||
| 	if len(hostname) == 0 { | ||||
| @@ -58,11 +94,23 @@ func run(c *cli.Context, backends []types.Backend) error { | ||||
| 	counter.Running = 0 | ||||
|  | ||||
| 	if c.Bool("healthcheck") { | ||||
| 		go func() { | ||||
| 			if err := http.ListenAndServe(c.String("healthcheck-addr"), nil); err != nil { | ||||
| 				log.Error().Err(err).Msgf("cannot listen on address %s", c.String("healthcheck-addr")) | ||||
| 			} | ||||
| 		}() | ||||
| 		serviceWaitingGroup.Go( | ||||
| 			func() error { | ||||
| 				server := &http.Server{Addr: c.String("healthcheck-addr")} | ||||
| 				go func() { | ||||
| 					<-agentCtx.Done() | ||||
| 					log.Info().Msg("shutdown healthcheck server ...") | ||||
| 					if err := server.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck | ||||
| 						log.Error().Err(err).Msg("shutdown healthcheck server failed") | ||||
| 					} else { | ||||
| 						log.Info().Msg("healthcheck server stopped") | ||||
| 					} | ||||
| 				}() | ||||
| 				if err := server.ListenAndServe(); err != nil { | ||||
| 					log.Error().Err(err).Msgf("cannot listen on address %s", c.String("healthcheck-addr")) | ||||
| 				} | ||||
| 				return nil | ||||
| 			}) | ||||
| 	} | ||||
|  | ||||
| 	var transport grpc.DialOption | ||||
| @@ -88,8 +136,10 @@ func run(c *cli.Context, backends []types.Backend) error { | ||||
| 	agentConfig := readAgentConfig(agentConfigPath) | ||||
|  | ||||
| 	agentToken := c.String("grpc-token") | ||||
| 	grpcClientCtx, grpcClientCtxCancel := context.WithCancelCause(context.Background()) | ||||
| 	defer grpcClientCtxCancel(nil) | ||||
| 	authClient := agent_rpc.NewAuthGrpcClient(authConn, agentToken, agentConfig.AgentID) | ||||
| 	authInterceptor, err := agent_rpc.NewAuthInterceptor(authClient, 30*time.Minute) //nolint:mnd | ||||
| 	authInterceptor, err := agent_rpc.NewAuthInterceptor(grpcClientCtx, authClient, authInterceptorRefreshInterval) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -110,30 +160,12 @@ func run(c *cli.Context, backends []types.Backend) error { | ||||
| 	defer conn.Close() | ||||
|  | ||||
| 	client := agent_rpc.NewGrpcClient(conn) | ||||
| 	agentConfigPersisted := atomic.Bool{} | ||||
|  | ||||
| 	sigterm := abool.New() | ||||
| 	ctx := metadata.NewOutgoingContext( | ||||
| 		context.Background(), | ||||
| 		metadata.Pairs("hostname", hostname), | ||||
| 	) | ||||
|  | ||||
| 	agentConfigPersisted := abool.New() | ||||
| 	ctx = utils.WithContextSigtermCallback(ctx, func() { | ||||
| 		log.Info().Msg("termination signal is received, shutting down") | ||||
| 		sigterm.Set() | ||||
|  | ||||
| 		// Remove stateless agents from server | ||||
| 		if agentConfigPersisted.IsNotSet() { | ||||
| 			log.Debug().Msg("unregistering agent from server") | ||||
| 			err := client.UnregisterAgent(ctx) | ||||
| 			if err != nil { | ||||
| 				log.Err(err).Msg("failed to unregister agent from server") | ||||
| 			} | ||||
| 		} | ||||
| 	}) | ||||
| 	grpcCtx := metadata.NewOutgoingContext(grpcClientCtx, metadata.Pairs("hostname", hostname)) | ||||
|  | ||||
| 	// check if grpc server version is compatible with agent | ||||
| 	grpcServerVersion, err := client.Version(ctx) | ||||
| 	grpcServerVersion, err := client.Version(grpcCtx) | ||||
| 	if err != nil { | ||||
| 		log.Error().Err(err).Msg("could not get grpc server version") | ||||
| 		return err | ||||
| @@ -147,12 +179,8 @@ func run(c *cli.Context, backends []types.Backend) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	var wg sync.WaitGroup | ||||
| 	parallel := c.Int("max-workflows") | ||||
| 	wg.Add(parallel) | ||||
|  | ||||
| 	// new engine | ||||
| 	backendCtx := context.WithValue(ctx, types.CliContext, c) | ||||
| 	backendCtx := context.WithValue(agentCtx, types.CliContext, c) | ||||
| 	backendName := c.String("backend-engine") | ||||
| 	backendEngine, err := backend.FindBackend(backendCtx, backends, backendName) | ||||
| 	if err != nil { | ||||
| @@ -172,14 +200,34 @@ func run(c *cli.Context, backends []types.Backend) error { | ||||
| 	} | ||||
| 	log.Debug().Msgf("loaded %s backend engine", backendEngine.Name()) | ||||
|  | ||||
| 	agentConfig.AgentID, err = client.RegisterAgent(ctx, engInfo.Platform, backendEngine.Name(), version.String(), parallel) | ||||
| 	maxWorkflows := c.Int("max-workflows") | ||||
| 	agentConfig.AgentID, err = client.RegisterAgent(grpcCtx, engInfo.Platform, backendEngine.Name(), version.String(), maxWorkflows) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	serviceWaitingGroup.Go(func() error { | ||||
| 		// we close grpc client context once unregister was handled | ||||
| 		defer grpcClientCtxCancel(nil) | ||||
| 		// we wait till agent context is done | ||||
| 		<-agentCtx.Done() | ||||
| 		// Remove stateless agents from server | ||||
| 		if !agentConfigPersisted.Load() { | ||||
| 			log.Debug().Msg("unregistering agent from server ...") | ||||
| 		// we explicitly run the unregister even when the context got canceled, so run it in the background | ||||
| 			err := client.UnregisterAgent(grpcClientCtx) | ||||
| 			if err != nil { | ||||
| 				log.Err(err).Msg("failed to unregister agent from server") | ||||
| 			} else { | ||||
| 				log.Info().Msg("agent unregistered from server") | ||||
| 			} | ||||
| 		} | ||||
| 		return nil | ||||
| 	}) | ||||
|  | ||||
| 	if agentConfigPath != "" { | ||||
| 		if err := writeAgentConfig(agentConfig, agentConfigPath); err == nil { | ||||
| 			agentConfigPersisted.Set() | ||||
| 			agentConfigPersisted.Store(true) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @@ -200,66 +248,62 @@ func run(c *cli.Context, backends []types.Backend) error { | ||||
|  | ||||
| 	log.Debug().Msgf("agent registered with ID %d", agentConfig.AgentID) | ||||
|  | ||||
| 	go func() { | ||||
| 	serviceWaitingGroup.Go(func() error { | ||||
| 		for { | ||||
| 			if sigterm.IsSet() { | ||||
| 				log.Debug().Msg("terminating health reporting") | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			err := client.ReportHealth(ctx) | ||||
| 			err := client.ReportHealth(grpcCtx) | ||||
| 			if err != nil { | ||||
| 				log.Err(err).Msg("failed to report health") | ||||
| 			} | ||||
|  | ||||
| 			<-time.After(time.Second * 10) | ||||
| 			select { | ||||
| 			case <-agentCtx.Done(): | ||||
| 				log.Debug().Msg("terminating health reporting") | ||||
| 				return nil | ||||
| 			case <-time.After(reportHealthInterval): | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 	}) | ||||
|  | ||||
| 	for i := 0; i < parallel; i++ { | ||||
| 	for i := 0; i < maxWorkflows; i++ { | ||||
| 		i := i | ||||
| 		go func() { | ||||
| 			defer wg.Done() | ||||
|  | ||||
| 			r := agent.NewRunner(client, filter, hostname, counter, &backendEngine) | ||||
| 		serviceWaitingGroup.Go(func() error { | ||||
| 			runner := agent.NewRunner(client, filter, hostname, counter, &backendEngine) | ||||
| 			log.Debug().Msgf("created new runner %d", i) | ||||
|  | ||||
| 			for { | ||||
| 				if sigterm.IsSet() { | ||||
| 					log.Debug().Msgf("terminating runner %d", i) | ||||
| 					return | ||||
| 				if agentCtx.Err() != nil { | ||||
| 					return nil | ||||
| 				} | ||||
|  | ||||
| 				log.Debug().Msg("polling new steps") | ||||
| 				if err := r.Run(ctx); err != nil { | ||||
| 					log.Error().Err(err).Msg("pipeline done with error") | ||||
| 					return | ||||
| 				if err := runner.Run(agentCtx, shutdownCtx); err != nil { | ||||
| 					log.Error().Err(err).Msg("runner done with error") | ||||
| 					return err | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	log.Info().Msgf( | ||||
| 		"starting Woodpecker agent with version '%s' and backend '%s' using platform '%s' running up to %d pipelines in parallel", | ||||
| 		version.String(), backendEngine.Name(), engInfo.Platform, parallel) | ||||
| 		version.String(), backendEngine.Name(), engInfo.Platform, maxWorkflows) | ||||
|  | ||||
| 	wg.Wait() | ||||
| 	return nil | ||||
| 	return serviceWaitingGroup.Wait() | ||||
| } | ||||
|  | ||||
| func runWithRetry(backendEngines []types.Backend) func(context *cli.Context) error { | ||||
| 	return func(context *cli.Context) error { | ||||
| 		if err := logger.SetupGlobalLogger(context, true); err != nil { | ||||
| func runWithRetry(backendEngines []types.Backend) func(c *cli.Context) error { | ||||
| 	return func(c *cli.Context) error { | ||||
| 		if err := logger.SetupGlobalLogger(c, true); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		initHealth() | ||||
|  | ||||
| 		retryCount := context.Int("connect-retry-count") | ||||
| 		retryDelay := context.Duration("connect-retry-delay") | ||||
| 		retryCount := c.Int("connect-retry-count") | ||||
| 		retryDelay := c.Duration("connect-retry-delay") | ||||
| 		var err error | ||||
| 		for i := 0; i < retryCount; i++ { | ||||
| 			if err = run(context, backendEngines); status.Code(err) == codes.Unavailable { | ||||
| 			if err = run(c, backendEngines); status.Code(err) == codes.Unavailable { | ||||
| 				log.Warn().Err(err).Msg(fmt.Sprintf("cannot connect to server, retrying in %v", retryDelay)) | ||||
| 				time.Sleep(retryDelay) | ||||
| 			} else { | ||||
|   | ||||
| @@ -82,12 +82,18 @@ var counter = &agent.State{ | ||||
| // handles pinging the endpoint and returns an error if the | ||||
| // agent is in an unhealthy state. | ||||
| func pinger(c *cli.Context) error { | ||||
| 	ctx := c.Context | ||||
| 	healthcheckAddress := c.String("healthcheck-addr") | ||||
| 	if strings.HasPrefix(healthcheckAddress, ":") { | ||||
| 		// this seems sufficient according to https://pkg.go.dev/net#Dial | ||||
| 		healthcheckAddress = "localhost" + healthcheckAddress | ||||
| 	} | ||||
| 	resp, err := http.Get("http://" + healthcheckAddress + "/healthz") | ||||
|  | ||||
| 	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://"+healthcheckAddress+"/healthz", nil) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	resp, err := http.DefaultClient.Do(req) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|   | ||||
| @@ -105,7 +105,11 @@ func (r *Runtime) Run(runnerCtx context.Context) error { | ||||
| 	} | ||||
|  | ||||
| 	defer func() { | ||||
| 		if err := r.engine.DestroyWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil { | ||||
| 		ctx := runnerCtx //nolint:contextcheck | ||||
| 		if ctx.Err() != nil { | ||||
| 			ctx = GetShutdownCtx() | ||||
| 		} | ||||
| 		if err := r.engine.DestroyWorkflow(ctx, r.spec, r.taskUUID); err != nil { | ||||
| 			logger.Error().Err(err).Msg("could not destroy engine") | ||||
| 		} | ||||
| 	}() | ||||
|   | ||||
							
								
								
									
										48
									
								
								pipeline/shutdown.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								pipeline/shutdown.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,48 @@ | ||||
| // Copyright 2024 Woodpecker Authors | ||||
| // | ||||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| // you may not use this file except in compliance with the License. | ||||
| // You may obtain a copy of the License at | ||||
| // | ||||
| //     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| // | ||||
| // Unless required by applicable law or agreed to in writing, software | ||||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| // See the License for the specific language governing permissions and | ||||
| // limitations under the License. | ||||
|  | ||||
| package pipeline | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| const shutdownTimeout = time.Second * 5 | ||||
|  | ||||
| var ( | ||||
| 	shutdownCtx       context.Context | ||||
| 	shutdownCtxCancel context.CancelFunc | ||||
| 	shutdownCtxLock   sync.Mutex | ||||
| ) | ||||
|  | ||||
| func GetShutdownCtx() context.Context { | ||||
| 	shutdownCtxLock.Lock() | ||||
| 	defer shutdownCtxLock.Unlock() | ||||
| 	if shutdownCtx == nil { | ||||
| 		shutdownCtx, shutdownCtxCancel = context.WithTimeout(context.Background(), shutdownTimeout) | ||||
| 	} | ||||
| 	return shutdownCtx | ||||
| } | ||||
|  | ||||
| func CancelShutdown() { | ||||
| 	shutdownCtxLock.Lock() | ||||
| 	defer shutdownCtxLock.Unlock() | ||||
| 	if shutdownCtxCancel == nil { | ||||
| 	// we create a canceled context | ||||
| 		shutdownCtx, shutdownCtxCancel = context.WithCancel(context.Background()) //nolint:forbidigo | ||||
| 	} | ||||
| 	shutdownCtxCancel() | ||||
| } | ||||
		Reference in New Issue
	
	Block a user