From 7167f998ce634395332c32105ce5f26de81a6f16 Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 5 Apr 2016 20:04:37 +0100 Subject: [PATCH] First crack at backoff function --- client/backoff.go | 18 ++++++++++++++++++ client/backoff_test.go | 26 ++++++++++++++++++++++++++ client/client.go | 1 + client/options.go | 19 +++++++++++++++++++ client/rpc_client.go | 22 ++++++++++++++++++++++ 5 files changed, 86 insertions(+) create mode 100644 client/backoff.go create mode 100644 client/backoff_test.go diff --git a/client/backoff.go b/client/backoff.go new file mode 100644 index 00000000..f92fdbd3 --- /dev/null +++ b/client/backoff.go @@ -0,0 +1,18 @@ +package client + +import ( + "math" + "time" + + "golang.org/x/net/context" +) + +type BackoffFunc func(ctx context.Context, req Request, attempts int) (time.Duration, error) + +// exponential backoff +func exponentialBackoff(ctx context.Context, req Request, attempts int) (time.Duration, error) { + if attempts == 0 { + return time.Duration(0), nil + } + return time.Duration(math.Pow(10, float64(attempts))) * time.Millisecond, nil +} diff --git a/client/backoff_test.go b/client/backoff_test.go new file mode 100644 index 00000000..70cb40f7 --- /dev/null +++ b/client/backoff_test.go @@ -0,0 +1,26 @@ +package client + +import ( + "math" + "testing" + "time" + + "golang.org/x/net/context" +) + +func TestBackoff(t *testing.T) { + delta := time.Duration(0) + + for i := 0; i < 5; i++ { + d, err := exponentialBackoff(context.TODO(), NewJsonRequest("test", "test", nil), i) + if err != nil { + t.Fatal(err) + } + + if d < delta { + t.Fatalf("Expected greater than %v, got %v", delta, d) + } + + delta = time.Millisecond * time.Duration(math.Pow(10, float64(i+1))) + } +} diff --git a/client/client.go b/client/client.go index 1993e8c2..39c26e60 100644 --- a/client/client.go +++ b/client/client.go @@ -77,6 +77,7 @@ type RequestOption func(*RequestOptions) var ( DefaultClient Client = newRpcClient() + DefaultBackoff = exponentialBackoff DefaultRetries = 1 DefaultRequestTimeout = time.Second * 5 ) diff --git a/client/options.go b/client/options.go index 9d310943..28a4e9c4 100644 --- a/client/options.go +++ b/client/options.go @@ -37,6 +37,8 @@ type Options struct { type CallOptions struct { SelectOptions []selector.SelectOption + // Backoff func + Backoff BackoffFunc // Transport Dial Timeout DialTimeout time.Duration // Number of Call attempts @@ -67,6 +69,7 @@ func newOptions(options ...Option) Options { opts := Options{ Codecs: make(map[string]codec.NewCodec), CallOptions: CallOptions{ + Backoff: DefaultBackoff, Retries: DefaultRetries, RequestTimeout: DefaultRequestTimeout, DialTimeout: transport.DefaultDialTimeout, @@ -151,6 +154,14 @@ func Wrap(w Wrapper) Option { } } +// Backoff is used to set the backoff function used +// when retrying Calls +func Backoff(fn BackoffFunc) Option { + return func(o *Options) { + o.CallOptions.Backoff = fn + } +} + // Number of retries when making the request. // Should this be a Call Option? func Retries(i int) Option { @@ -182,6 +193,14 @@ func WithSelectOption(so selector.SelectOption) CallOption { } } +// WithBackoff is a CallOption which overrides that which +// set in Options.CallOptions +func WithBackoff(fn BackoffFunc) CallOption { + return func(o *CallOptions) { + o.Backoff = fn + } +} + // WithRetries is a CallOption which overrides that which // set in Options.CallOptions func WithRetries(i int) CallOption { diff --git a/client/rpc_client.go b/client/rpc_client.go index d51dfc2f..0bce57fb 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -203,6 +203,17 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac var grr error for i := 0; i < callOpts.Retries; i++ { + // call backoff first. Someone may want an initial start delay + t, err := callOpts.Backoff(ctx, request, i) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // only sleep if greater than 0 + if t.Seconds() > 0 { + time.Sleep(t) + } + node, err := next() if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) @@ -257,6 +268,17 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt var grr error for i := 0; i < callOpts.Retries; i++ { + // call backoff first. Someone may want an initial start delay + t, err := callOpts.Backoff(ctx, request, i) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + // only sleep if greater than 0 + if t.Seconds() > 0 { + time.Sleep(t) + } + node, err := next() if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", err.Error())