diff --git a/agent/rpc/client_grpc.go b/agent/rpc/client_grpc.go index cda477132..4bc594318 100644 --- a/agent/rpc/client_grpc.go +++ b/agent/rpc/client_grpc.go @@ -20,6 +20,7 @@ import ( "strings" "time" + "github.com/cenkalti/backoff/v4" "github.com/rs/zerolog/log" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -30,8 +31,6 @@ import ( "github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto" ) -var backoff = time.Second - // set grpc version on compile time to compare against server version response const ClientGrpcVersion int32 = proto.Version @@ -52,6 +51,13 @@ func (c *client) Close() error { return c.conn.Close() } +func (c *client) newBackOff() backoff.BackOff { + b := backoff.NewExponentialBackOff() + b.MaxInterval = 10 * time.Second + b.InitialInterval = 10 * time.Millisecond + return b +} + // Version returns the server- & grpc-version func (c *client) Version(ctx context.Context) (*rpc.Version, error) { res, err := c.client.Version(ctx, &proto.Empty{}) @@ -68,6 +74,7 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) { func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) { var res *proto.NextResponse var err error + retry := c.newBackOff() req := new(proto.NextRequest) req.Filter = new(proto.Filter) req.Filter.Labels = f.Labels @@ -96,10 +103,12 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) default: return nil, err } - if ctx.Err() != nil { + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): return nil, ctx.Err() } - <-time.After(backoff) } if res.GetPipeline() == nil { @@ -118,6 +127,7 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) // Wait blocks until the pipeline is complete. func (c *client) Wait(ctx context.Context, id string) (err error) { + retry := c.newBackOff() req := new(proto.WaitRequest) req.Id = id for { @@ -139,13 +149,19 @@ func (c *client) Wait(ctx context.Context, id string) (err error) { default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } // Init signals the pipeline is initialized. func (c *client) Init(ctx context.Context, id string, state rpc.State) (err error) { + retry := c.newBackOff() req := new(proto.InitRequest) req.Id = id req.State = new(proto.State) @@ -174,13 +190,19 @@ func (c *client) Init(ctx context.Context, id string, state rpc.State) (err erro default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } // Done signals the pipeline is complete. func (c *client) Done(ctx context.Context, id string, state rpc.State) (err error) { + retry := c.newBackOff() req := new(proto.DoneRequest) req.Id = id req.State = new(proto.State) @@ -209,13 +231,19 @@ func (c *client) Done(ctx context.Context, id string, state rpc.State) (err erro default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } // Extend extends the pipeline deadline func (c *client) Extend(ctx context.Context, id string) (err error) { + retry := c.newBackOff() req := new(proto.ExtendRequest) req.Id = id for { @@ -237,13 +265,19 @@ func (c *client) Extend(ctx context.Context, id string) (err error) { default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } // Update updates the pipeline state. func (c *client) Update(ctx context.Context, id string, state rpc.State) (err error) { + retry := c.newBackOff() req := new(proto.UpdateRequest) req.Id = id req.State = new(proto.State) @@ -272,13 +306,19 @@ func (c *client) Update(ctx context.Context, id string, state rpc.State) (err er default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } // Log writes the pipeline log entry. func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) { + retry := c.newBackOff() req := new(proto.LogRequest) req.LogEntry = new(proto.LogEntry) req.LogEntry.StepUuid = logEntry.StepUUID @@ -305,7 +345,12 @@ func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) { default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } return nil } @@ -322,6 +367,7 @@ func (c *client) RegisterAgent(ctx context.Context, platform, backend, version s } func (c *client) ReportHealth(ctx context.Context) (err error) { + retry := c.newBackOff() req := new(proto.ReportHealthRequest) req.Status = "I am alive!" @@ -341,6 +387,11 @@ func (c *client) ReportHealth(ctx context.Context) (err error) { default: return err } - <-time.After(backoff) + + select { + case <-time.After(retry.NextBackOff()): + case <-ctx.Done(): + return ctx.Err() + } } } diff --git a/go.mod b/go.mod index 4d7ab1d87..ecc290882 100644 --- a/go.mod +++ b/go.mod @@ -69,6 +69,7 @@ require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect diff --git a/go.sum b/go.sum index 9b7770f71..a56e6edf1 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= github.com/caddyserver/certmagic v0.17.2 h1:o30seC1T/dBqBCNNGNHWwj2i5/I/FMjBbTAhjADP3nE= github.com/caddyserver/certmagic v0.17.2/go.mod h1:ouWUuC490GOLJzkyN35eXfV8bSbwMwSf4bdhkIxtdQE= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=