diff --git a/client/options.go b/client/options.go index 102c8400..bcf4fdf7 100644 --- a/client/options.go +++ b/client/options.go @@ -16,6 +16,7 @@ type Options struct { Selector selector.Selector Transport transport.Transport Wrappers []Wrapper + Retries int // Other options to be used by client implementations Options map[string]string @@ -40,6 +41,44 @@ type RequestOptions struct { Options map[string]string } +func newOptions(options ...Option) Options { + opts := Options{ + Codecs: make(map[string]codec.NewCodec), + } + + for _, o := range options { + o(&opts) + } + + if opts.Retries == 0 { + opts.Retries = 1 + } + + if len(opts.ContentType) == 0 { + opts.ContentType = defaultContentType + } + + if opts.Broker == nil { + opts.Broker = broker.DefaultBroker + } + + if opts.Registry == nil { + opts.Registry = registry.DefaultRegistry + } + + if opts.Selector == nil { + opts.Selector = selector.NewSelector( + selector.Registry(opts.Registry), + ) + } + + if opts.Transport == nil { + opts.Transport = transport.DefaultTransport + } + + return opts +} + // Broker to be used for pub/sub func Broker(b broker.Broker) Option { return func(o *Options) { @@ -89,6 +128,13 @@ func Wrap(w Wrapper) Option { } } +// Number of retries when making the request +func Retries(i int) Option { + return func(o *Options) { + o.Retries = i + } +} + // Call Options func WithSelectOption(so selector.SelectOption) CallOption { diff --git a/client/rpc_client.go b/client/rpc_client.go index 77622c35..97219576 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -9,7 +9,6 @@ import ( "github.com/micro/go-micro/codec" c "github.com/micro/go-micro/context" "github.com/micro/go-micro/errors" - "github.com/micro/go-micro/registry" "github.com/micro/go-micro/selector" "github.com/micro/go-micro/transport" @@ -24,35 +23,7 @@ type rpcClient struct { func newRpcClient(opt ...Option) Client { var once sync.Once - opts := Options{ - Codecs: make(map[string]codec.NewCodec), - } - - for _, o := range opt { - o(&opts) - } - - if len(opts.ContentType) == 0 { - opts.ContentType = defaultContentType - } - - if opts.Broker == nil { - opts.Broker = broker.DefaultBroker - } - - if opts.Registry == nil { - opts.Registry = registry.DefaultRegistry - } - - if opts.Selector == nil { - opts.Selector = selector.NewSelector( - selector.Registry(opts.Registry), - ) - } - - if opts.Transport == nil { - opts.Transport = transport.DefaultTransport - } + opts := newOptions(opt...) rc := &rpcClient{ once: once, @@ -177,20 +148,28 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return errors.InternalServerError("go.micro.client", err.Error()) } - node, err := next() - if err != nil && err == selector.ErrNotFound { - return errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + for i := 0; i < r.opts.Retries; i++ { + node, err := next() + if err != nil && err == selector.ErrNotFound { + return errors.NotFound("go.micro.client", err.Error()) + } else if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + address := node.Address + if node.Port > 0 { + address = fmt.Sprintf("%s:%d", address, node.Port) + } + + err = r.call(ctx, address, request, response) + r.opts.Selector.Mark(request.Service(), node, err) + + // if the call succeeded lets bail early + if err == nil { + return nil + } } - address := node.Address - if node.Port > 0 { - address = fmt.Sprintf("%s:%d", address, node.Port) - } - - err = r.call(ctx, address, request, response) - r.opts.Selector.Mark(request.Service(), node, err) return err } @@ -211,20 +190,30 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt return nil, errors.InternalServerError("go.micro.client", err.Error()) } - node, err := next() - if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + var stream Streamer + + for i := 0; i < r.opts.Retries; i++ { + node, err := next() + if err != nil && err == selector.ErrNotFound { + return nil, errors.NotFound("go.micro.client", err.Error()) + } else if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + address := node.Address + if node.Port > 0 { + address = fmt.Sprintf("%s:%d", address, node.Port) + } + + stream, err = r.stream(ctx, address, request) + r.opts.Selector.Mark(request.Service(), node, err) + + // bail early if succeeds + if err == nil { + return stream, nil + } } - address := node.Address - if node.Port > 0 { - address = fmt.Sprintf("%s:%d", address, node.Port) - } - - stream, err := r.stream(ctx, address, request) - r.opts.Selector.Mark(request.Service(), node, err) return stream, err }