From e4ff04188234de498279b92eeb0065be92485237 Mon Sep 17 00:00:00 2001 From: Harry Pidcock Date: Fri, 18 Aug 2023 23:13:13 +1000 Subject: [PATCH] Improve agent rpc retry logic with exponential backoff (#2205) Existing retry logic was a simple second delay, replacing it with a exponential backoff. Initial delay is 10ms up to 10s for the max delay. In the future this should be made configurable. With an extended max delay it becomes important to notice context cancelation, so this now also selects on both the delay and context done. --- agent/rpc/client_grpc.go | 73 ++++++++++++++++++++++++++++++++++------ go.mod | 1 + go.sum | 2 ++ 3 files changed, 65 insertions(+), 11 deletions(-) diff --git a/agent/rpc/client_grpc.go b/agent/rpc/client_grpc.go index cda477132..4bc594318 100644 --- a/agent/rpc/client_grpc.go +++ b/agent/rpc/client_grpc.go @@ -20,6 +20,7 @@ import ( "strings" "time" + "github.com/cenkalti/backoff/v4" "github.com/rs/zerolog/log" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -30,8 +31,6 @@ import ( "github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto" ) -var backoff = time.Second - // set grpc version on compile time to compare against server version response const ClientGrpcVersion int32 = proto.Version @@ -52,6 +51,13 @@ func (c *client) Close() error { return c.conn.Close() } +func (c *client) newBackOff() backoff.BackOff { + b := backoff.NewExponentialBackOff() + b.MaxInterval = 10 * time.Second + b.InitialInterval = 10 * time.Millisecond + return b +} + // Version returns the server- & grpc-version func (c *client) Version(ctx context.Context) (*rpc.Version, error) { res, err := c.client.Version(ctx, &proto.Empty{}) @@ -68,6 +74,7 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) { func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) { var res *proto.NextResponse var err error + retry := c.newBackOff() req := new(proto.NextRequest) req.Filter = new(proto.Filter) req.Filter.Labels = f.Labels @@ -96,10 +103,12 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) default: return nil, err } - if ctx.Err() != nil { + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): return nil, ctx.Err() } - <-time.After(backoff) } if res.GetPipeline() == nil { @@ -118,6 +127,7 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) // Wait blocks until the pipeline is complete. func (c *client) Wait(ctx context.Context, id string) (err error) { + retry := c.newBackOff() req := new(proto.WaitRequest) req.Id = id for { @@ -139,13 +149,19 @@ func (c *client) Wait(ctx context.Context, id string) (err error) { default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } // Init signals the pipeline is initialized. func (c *client) Init(ctx context.Context, id string, state rpc.State) (err error) { + retry := c.newBackOff() req := new(proto.InitRequest) req.Id = id req.State = new(proto.State) @@ -174,13 +190,19 @@ func (c *client) Init(ctx context.Context, id string, state rpc.State) (err erro default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } // Done signals the pipeline is complete. func (c *client) Done(ctx context.Context, id string, state rpc.State) (err error) { + retry := c.newBackOff() req := new(proto.DoneRequest) req.Id = id req.State = new(proto.State) @@ -209,13 +231,19 @@ func (c *client) Done(ctx context.Context, id string, state rpc.State) (err erro default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } // Extend extends the pipeline deadline func (c *client) Extend(ctx context.Context, id string) (err error) { + retry := c.newBackOff() req := new(proto.ExtendRequest) req.Id = id for { @@ -237,13 +265,19 @@ func (c *client) Extend(ctx context.Context, id string) (err error) { default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } // Update updates the pipeline state. func (c *client) Update(ctx context.Context, id string, state rpc.State) (err error) { + retry := c.newBackOff() req := new(proto.UpdateRequest) req.Id = id req.State = new(proto.State) @@ -272,13 +306,19 @@ func (c *client) Update(ctx context.Context, id string, state rpc.State) (err er default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } // Log writes the pipeline log entry. func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) { + retry := c.newBackOff() req := new(proto.LogRequest) req.LogEntry = new(proto.LogEntry) req.LogEntry.StepUuid = logEntry.StepUUID @@ -305,7 +345,12 @@ func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) { default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } @@ -322,6 +367,7 @@ func (c *client) RegisterAgent(ctx context.Context, platform, backend, version s } func (c *client) ReportHealth(ctx context.Context) (err error) { + retry := c.newBackOff() req := new(proto.ReportHealthRequest) req.Status = "I am alive!" @@ -341,6 +387,11 @@ func (c *client) ReportHealth(ctx context.Context) (err error) { default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } } diff --git a/go.mod b/go.mod index 4d7ab1d87..ecc290882 100644 --- a/go.mod +++ b/go.mod @@ -69,6 +69,7 @@ require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect diff --git a/go.sum b/go.sum index 9b7770f71..a56e6edf1 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= github.com/caddyserver/certmagic v0.17.2 h1:o30seC1T/dBqBCNNGNHWwj2i5/I/FMjBbTAhjADP3nE= github.com/caddyserver/certmagic v0.17.2/go.mod h1:ouWUuC490GOLJzkyN35eXfV8bSbwMwSf4bdhkIxtdQE= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=