1
0
mirror of https://github.com/woodpecker-ci/woodpecker.git synced 2025-01-23 17:53:23 +02:00

Improve agent rpc retry logic with exponential backoff (#2205)

Existing retry logic was a simple second delay, replacing it with a
exponential backoff.
Initial delay is 10ms up to 10s for the max delay. In the future this
should be made configurable.

With an extended max delay it becomes important to notice context
cancelation, so this now also selects on both the delay and context
done.
This commit is contained in:
Harry Pidcock 2023-08-18 23:13:13 +10:00 committed by GitHub
parent 0c282e86e8
commit e4ff041882
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 11 deletions

View File

@ -20,6 +20,7 @@ import (
"strings"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -30,8 +31,6 @@ import (
"github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto"
)
var backoff = time.Second
// set grpc version on compile time to compare against server version response
const ClientGrpcVersion int32 = proto.Version
@ -52,6 +51,13 @@ func (c *client) Close() error {
return c.conn.Close()
}
func (c *client) newBackOff() backoff.BackOff {
b := backoff.NewExponentialBackOff()
b.MaxInterval = 10 * time.Second
b.InitialInterval = 10 * time.Millisecond
return b
}
// Version returns the server- & grpc-version
func (c *client) Version(ctx context.Context) (*rpc.Version, error) {
res, err := c.client.Version(ctx, &proto.Empty{})
@ -68,6 +74,7 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) {
func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) {
var res *proto.NextResponse
var err error
retry := c.newBackOff()
req := new(proto.NextRequest)
req.Filter = new(proto.Filter)
req.Filter.Labels = f.Labels
@ -96,10 +103,12 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error)
default:
return nil, err
}
if ctx.Err() != nil {
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return nil, ctx.Err()
}
<-time.After(backoff)
}
if res.GetPipeline() == nil {
@ -118,6 +127,7 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error)
// Wait blocks until the pipeline is complete.
func (c *client) Wait(ctx context.Context, id string) (err error) {
retry := c.newBackOff()
req := new(proto.WaitRequest)
req.Id = id
for {
@ -139,13 +149,19 @@ func (c *client) Wait(ctx context.Context, id string) (err error) {
default:
return err
}
<-time.After(backoff)
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// Init signals the pipeline is initialized.
func (c *client) Init(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff()
req := new(proto.InitRequest)
req.Id = id
req.State = new(proto.State)
@ -174,13 +190,19 @@ func (c *client) Init(ctx context.Context, id string, state rpc.State) (err erro
default:
return err
}
<-time.After(backoff)
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// Done signals the pipeline is complete.
func (c *client) Done(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff()
req := new(proto.DoneRequest)
req.Id = id
req.State = new(proto.State)
@ -209,13 +231,19 @@ func (c *client) Done(ctx context.Context, id string, state rpc.State) (err erro
default:
return err
}
<-time.After(backoff)
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// Extend extends the pipeline deadline
func (c *client) Extend(ctx context.Context, id string) (err error) {
retry := c.newBackOff()
req := new(proto.ExtendRequest)
req.Id = id
for {
@ -237,13 +265,19 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
default:
return err
}
<-time.After(backoff)
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// Update updates the pipeline state.
func (c *client) Update(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff()
req := new(proto.UpdateRequest)
req.Id = id
req.State = new(proto.State)
@ -272,13 +306,19 @@ func (c *client) Update(ctx context.Context, id string, state rpc.State) (err er
default:
return err
}
<-time.After(backoff)
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// Log writes the pipeline log entry.
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
retry := c.newBackOff()
req := new(proto.LogRequest)
req.LogEntry = new(proto.LogEntry)
req.LogEntry.StepUuid = logEntry.StepUUID
@ -305,7 +345,12 @@ func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
default:
return err
}
<-time.After(backoff)
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
@ -322,6 +367,7 @@ func (c *client) RegisterAgent(ctx context.Context, platform, backend, version s
}
func (c *client) ReportHealth(ctx context.Context) (err error) {
retry := c.newBackOff()
req := new(proto.ReportHealthRequest)
req.Status = "I am alive!"
@ -341,6 +387,11 @@ func (c *client) ReportHealth(ctx context.Context) (err error) {
default:
return err
}
<-time.After(backoff)
select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
}
}
}

1
go.mod
View File

@ -69,6 +69,7 @@ require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect

2
go.sum
View File

@ -36,6 +36,8 @@ github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/caddyserver/certmagic v0.17.2 h1:o30seC1T/dBqBCNNGNHWwj2i5/I/FMjBbTAhjADP3nE=
github.com/caddyserver/certmagic v0.17.2/go.mod h1:ouWUuC490GOLJzkyN35eXfV8bSbwMwSf4bdhkIxtdQE=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=