From 2e9f4271a884b5fd940325f3238f4a15290c1e5e Mon Sep 17 00:00:00 2001 From: Asim Date: Fri, 13 May 2016 15:58:53 +0100 Subject: [PATCH 1/7] Pool first attempt --- client/rpc_client.go | 15 ++++++-- client/rpc_pool.go | 71 ++++++++++++++++++++++++++++++++++++ server/rpc_server.go | 87 +++++++++++++++++++++++--------------------- 3 files changed, 127 insertions(+), 46 deletions(-) create mode 100644 client/rpc_pool.go diff --git a/client/rpc_client.go b/client/rpc_client.go index e930ab61..47aedb94 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -19,16 +19,16 @@ import ( type rpcClient struct { once sync.Once opts Options + pool *pool } func newRpcClient(opt ...Option) Client { - var once sync.Once - opts := newOptions(opt...) rc := &rpcClient{ - once: once, + once: sync.Once{}, opts: opts, + pool: newPool(), } c := Client(rc) @@ -73,10 +73,15 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp return errors.InternalServerError("go.micro.client", err.Error()) } - c, err := r.opts.Transport.Dial(address, transport.WithTimeout(opts.DialTimeout)) + var grr error + c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout)) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } + defer func() { + // defer execution of release + r.pool.release(address, c, grr) + }() stream := &rpcStream{ context: ctx, @@ -107,8 +112,10 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp select { case err := <-ch: + grr = err return err case <-ctx.Done(): + grr = ctx.Err() return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) } } diff --git a/client/rpc_pool.go b/client/rpc_pool.go new file mode 100644 index 00000000..7fa0e720 --- /dev/null +++ b/client/rpc_pool.go @@ -0,0 +1,71 @@ +package client + +import ( + "sync" + + "github.com/micro/go-micro/transport" +) + +type pool struct { + tr transport.Transport + + sync.Mutex + conns map[string][]*poolConn +} + +type poolConn struct { + transport.Client +} + +var ( + maxIdleConn = 2 +) + +func newPool() *pool { + return &pool{ + conns: make(map[string][]*poolConn), + } +} + +// NoOp the Close since we manage it +func (p *poolConn) Close() error { + return nil +} + +func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.DialOption) (*poolConn, error) { + p.Lock() + conns, ok := p.conns[addr] + // no free conn + if !ok || len(conns) == 0 { + p.Unlock() + // create new conn + c, err := tr.Dial(addr, opts...) + if err != nil { + return nil, err + } + return &poolConn{c}, nil + } + + conn := conns[len(conns)-1] + p.conns[addr] = conns[:len(conns)-1] + p.Unlock() + return conn, nil +} + +func (p *pool) release(addr string, conn *poolConn, err error) { + // don't store the conn + if err != nil { + conn.Client.Close() + return + } + + // otherwise put it back + p.Lock() + conns := p.conns[addr] + if len(conns) >= maxIdleConn { + conn.Client.Close() + return + } + p.conns[addr] = append(conns, conn) + p.Unlock() +} diff --git a/server/rpc_server.go b/server/rpc_server.go index 3a70d54d..d6ca68aa 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -55,50 +55,53 @@ func (s *rpcServer) accept(sock transport.Socket) { } }() - var msg transport.Message - if err := sock.Recv(&msg); err != nil { - return - } - - // we use this Timeout header to set a server deadline - to := msg.Header["Timeout"] - // we use this Content-Type header to identify the codec needed - ct := msg.Header["Content-Type"] - - cf, err := s.newCodec(ct) - // TODO: needs better error handling - if err != nil { - sock.Send(&transport.Message{ - Header: map[string]string{ - "Content-Type": "text/plain", - }, - Body: []byte(err.Error()), - }) - return - } - - codec := newRpcPlusCodec(&msg, sock, cf) - - // strip our headers - hdr := make(map[string]string) - for k, v := range msg.Header { - hdr[k] = v - } - delete(hdr, "Content-Type") - delete(hdr, "Timeout") - - ctx := metadata.NewContext(context.Background(), hdr) - - // set the timeout if we have it - if len(to) > 0 { - if n, err := strconv.ParseUint(to, 10, 64); err == nil { - ctx, _ = context.WithTimeout(ctx, time.Duration(n)) + for { + var msg transport.Message + if err := sock.Recv(&msg); err != nil { + return } - } - // TODO: needs better error handling - if err := s.rpc.serveRequest(ctx, codec, ct); err != nil { - log.Printf("Unexpected error serving request, closing socket: %v", err) + // we use this Timeout header to set a server deadline + to := msg.Header["Timeout"] + // we use this Content-Type header to identify the codec needed + ct := msg.Header["Content-Type"] + + cf, err := s.newCodec(ct) + // TODO: needs better error handling + if err != nil { + sock.Send(&transport.Message{ + Header: map[string]string{ + "Content-Type": "text/plain", + }, + Body: []byte(err.Error()), + }) + return + } + + codec := newRpcPlusCodec(&msg, sock, cf) + + // strip our headers + hdr := make(map[string]string) + for k, v := range msg.Header { + hdr[k] = v + } + delete(hdr, "Content-Type") + delete(hdr, "Timeout") + + ctx := metadata.NewContext(context.Background(), hdr) + + // set the timeout if we have it + if len(to) > 0 { + if n, err := strconv.ParseUint(to, 10, 64); err == nil { + ctx, _ = context.WithTimeout(ctx, time.Duration(n)) + } + } + + // TODO: needs better error handling + if err := s.rpc.serveRequest(ctx, codec, ct); err != nil { + log.Printf("Unexpected error serving request, closing socket: %v", err) + return + } } } From 98f295b6b0db3886279504296ba2e9337684b58b Mon Sep 17 00:00:00 2001 From: Asim Date: Fri, 13 May 2016 18:24:01 +0100 Subject: [PATCH 2/7] Noobed the deadlock --- client/rpc_pool.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/rpc_pool.go b/client/rpc_pool.go index 7fa0e720..7bdf6929 100644 --- a/client/rpc_pool.go +++ b/client/rpc_pool.go @@ -63,6 +63,7 @@ func (p *pool) release(addr string, conn *poolConn, err error) { p.Lock() conns := p.conns[addr] if len(conns) >= maxIdleConn { + p.Unlock() conn.Client.Close() return } From 6669248291e4f032db377701bc9bf43427482806 Mon Sep 17 00:00:00 2001 From: Asim Date: Mon, 6 Jun 2016 23:37:07 +0100 Subject: [PATCH 3/7] gofmt --- client/mock/mock.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/mock/mock.go b/client/mock/mock.go index 337df041..63365bde 100644 --- a/client/mock/mock.go +++ b/client/mock/mock.go @@ -123,7 +123,7 @@ func (m *MockClient) CallRemote(ctx context.Context, addr string, req client.Req } v.Set(reflect.ValueOf(r.Response)) - + return nil } From 38a66817e61966dd34c754fe0c863b9d565087f9 Mon Sep 17 00:00:00 2001 From: Asim Date: Mon, 6 Jun 2016 23:37:52 +0100 Subject: [PATCH 4/7] Add a conn lifetime for the pool --- client/rpc_pool.go | 47 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/client/rpc_pool.go b/client/rpc_pool.go index 7bdf6929..a550c601 100644 --- a/client/rpc_pool.go +++ b/client/rpc_pool.go @@ -2,6 +2,7 @@ package client import ( "sync" + "time" "github.com/micro/go-micro/transport" ) @@ -15,10 +16,14 @@ type pool struct { type poolConn struct { transport.Client + created int64 } var ( + // only hold on to this many conns maxIdleConn = 2 + // only hold on to the conn for this period + maxLifeTime = int64(60) ) func newPool() *pool { @@ -34,32 +39,46 @@ func (p *poolConn) Close() error { func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.DialOption) (*poolConn, error) { p.Lock() - conns, ok := p.conns[addr] - // no free conn - if !ok || len(conns) == 0 { - p.Unlock() - // create new conn - c, err := tr.Dial(addr, opts...) - if err != nil { - return nil, err + conns := p.conns[addr] + now := time.Now().Unix() + + // while we have conns check age and then return one + // otherwise we'll create a new conn + for len(conns) > 0 { + conn := conns[len(conns)-1] + conns = conns[:len(conns)-1] + p.conns[addr] = conns + + // if conn is old kill it and move on + if d := now - conn.created; d > maxLifeTime { + conn.Client.Close() + continue } - return &poolConn{c}, nil + + // we got a good conn, lets unlock and return it + p.Unlock() + + return conn, nil } - conn := conns[len(conns)-1] - p.conns[addr] = conns[:len(conns)-1] p.Unlock() - return conn, nil + + // create new conn + c, err := tr.Dial(addr, opts...) + if err != nil { + return nil, err + } + return &poolConn{c, time.Now().Unix()}, nil } func (p *pool) release(addr string, conn *poolConn, err error) { - // don't store the conn + // don't store the conn if it has errored if err != nil { conn.Client.Close() return } - // otherwise put it back + // otherwise put it back for reuse p.Lock() conns := p.conns[addr] if len(conns) >= maxIdleConn { From 89401cbb957bdc7e055cb0c072a4002351a6a070 Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 7 Jun 2016 00:46:14 +0100 Subject: [PATCH 5/7] Make pool configurable --- client/client.go | 6 ++++-- client/options.go | 20 ++++++++++++++++++++ client/rpc_client.go | 11 ++++++++++- client/rpc_pool.go | 22 +++++++++++----------- cmd/cmd.go | 22 ++++++++++++++++++++++ 5 files changed, 67 insertions(+), 14 deletions(-) diff --git a/client/client.go b/client/client.go index 6f4859d2..723af2e0 100644 --- a/client/client.go +++ b/client/client.go @@ -66,14 +66,16 @@ type RequestOption func(*RequestOptions) var ( // DefaultClient is a default client to use out of the box DefaultClient Client = newRpcClient() - // DefaultBackoff is the default backoff function for retries DefaultBackoff = exponentialBackoff - // DefaultRetries is the default number of times a request is tried DefaultRetries = 1 // DefaultRequestTimeout is the default request timeout DefaultRequestTimeout = time.Second * 5 + // DefaultPoolSize sets the connection pool size + DefaultPoolSize = 0 + // DefaultPoolTTL sets the connection pool ttl + DefaultPoolTTL = time.Minute ) // Makes a synchronous call to a service using the default client diff --git a/client/options.go b/client/options.go index 0ab07aa5..7751d879 100644 --- a/client/options.go +++ b/client/options.go @@ -23,6 +23,10 @@ type Options struct { Selector selector.Selector Transport transport.Transport + // Connection Pool + PoolSize int + PoolTTL time.Duration + // Middleware for client Wrappers []Wrapper @@ -74,6 +78,8 @@ func newOptions(options ...Option) Options { RequestTimeout: DefaultRequestTimeout, DialTimeout: transport.DefaultDialTimeout, }, + PoolSize: DefaultPoolSize, + PoolTTL: DefaultPoolTTL, } for _, o := range options { @@ -126,6 +132,20 @@ func ContentType(ct string) Option { } } +// PoolSize sets the connection pool size +func PoolSize(d int) Option { + return func(o *Options) { + o.PoolSize = d + } +} + +// PoolSize sets the connection pool size +func PoolTTL(d time.Duration) Option { + return func(o *Options) { + o.PoolTTL = d + } +} + // Registry to find nodes for a given service func Registry(r registry.Registry) Option { return func(o *Options) { diff --git a/client/rpc_client.go b/client/rpc_client.go index 47aedb94..33503b34 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -28,7 +28,7 @@ func newRpcClient(opt ...Option) Client { rc := &rpcClient{ once: sync.Once{}, opts: opts, - pool: newPool(), + pool: newPool(opts.PoolSize, opts.PoolTTL), } c := Client(rc) @@ -178,9 +178,18 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt } func (r *rpcClient) Init(opts ...Option) error { + size := r.opts.PoolSize + ttl := r.opts.PoolTTL + for _, o := range opts { o(&r.opts) } + + // recreate the pool if the options changed + if size != r.opts.PoolSize || ttl != r.opts.PoolTTL { + r.pool = newPool(r.opts.PoolSize, r.opts.PoolTTL) + } + return nil } diff --git a/client/rpc_pool.go b/client/rpc_pool.go index a550c601..f4b32bad 100644 --- a/client/rpc_pool.go +++ b/client/rpc_pool.go @@ -1,6 +1,7 @@ package client import ( + "fmt" "sync" "time" @@ -8,7 +9,8 @@ import ( ) type pool struct { - tr transport.Transport + size int + ttl int64 sync.Mutex conns map[string][]*poolConn @@ -19,15 +21,10 @@ type poolConn struct { created int64 } -var ( - // only hold on to this many conns - maxIdleConn = 2 - // only hold on to the conn for this period - maxLifeTime = int64(60) -) - -func newPool() *pool { +func newPool(size int, ttl time.Duration) *pool { return &pool{ + size: size, + ttl: int64(ttl.Seconds()), conns: make(map[string][]*poolConn), } } @@ -50,7 +47,7 @@ func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.Di p.conns[addr] = conns // if conn is old kill it and move on - if d := now - conn.created; d > maxLifeTime { + if d := now - conn.created; d > p.ttl { conn.Client.Close() continue } @@ -58,11 +55,14 @@ func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.Di // we got a good conn, lets unlock and return it p.Unlock() + fmt.Println("old conn") return conn, nil } p.Unlock() + fmt.Println("new conn") + // create new conn c, err := tr.Dial(addr, opts...) if err != nil { @@ -81,7 +81,7 @@ func (p *pool) release(addr string, conn *poolConn, err error) { // otherwise put it back for reuse p.Lock() conns := p.conns[addr] - if len(conns) >= maxIdleConn { + if len(conns) >= p.size { p.Unlock() conn.Client.Close() return diff --git a/cmd/cmd.go b/cmd/cmd.go index 7fbb86c7..41ed0e48 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -62,6 +62,16 @@ var ( EnvVar: "MICRO_CLIENT_RETRIES", Usage: "Sets the client retries. Default: 1", }, + cli.IntFlag{ + Name: "client_pool_size", + EnvVar: "MICRO_CLIENT_POOL_SIZE", + Usage: "Sets the client connection pool size. Default: 0", + }, + cli.StringFlag{ + Name: "client_pool_ttl", + EnvVar: "MICRO_CLIENT_POOL_TTL", + Usage: "Sets the client connection pool ttl. e.g 500ms, 5s, 1m. Default: 1m", + }, cli.StringFlag{ Name: "server_name", EnvVar: "MICRO_SERVER_NAME", @@ -337,6 +347,18 @@ func (c *cmd) Before(ctx *cli.Context) error { clientOpts = append(clientOpts, client.RequestTimeout(d)) } + if r := ctx.Int("client_pool_size"); r > 0 { + clientOpts = append(clientOpts, client.PoolSize(r)) + } + + if t := ctx.String("client_pool_ttl"); len(t) > 0 { + d, err := time.ParseDuration(t) + if err != nil { + return fmt.Errorf("failed to parse client_pool_ttl: %v", t) + } + clientOpts = append(clientOpts, client.PoolTTL(d)) + } + // We have some command line opts for the server. // Lets set it up if len(serverOpts) > 0 { From 58ad01e3e13030d2e6a16dce4c74bc64c9c06109 Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 7 Jun 2016 00:48:34 +0100 Subject: [PATCH 6/7] Don't log that cruft --- client/rpc_pool.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/client/rpc_pool.go b/client/rpc_pool.go index f4b32bad..9fdd7736 100644 --- a/client/rpc_pool.go +++ b/client/rpc_pool.go @@ -1,7 +1,6 @@ package client import ( - "fmt" "sync" "time" @@ -55,14 +54,11 @@ func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.Di // we got a good conn, lets unlock and return it p.Unlock() - fmt.Println("old conn") return conn, nil } p.Unlock() - fmt.Println("new conn") - // create new conn c, err := tr.Dial(addr, opts...) if err != nil { From ff0bd76905429112646a9d8c3afa5d88487f96e2 Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 7 Jun 2016 01:18:54 +0100 Subject: [PATCH 7/7] Add a pool test --- client/rpc_pool_test.go | 84 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 client/rpc_pool_test.go diff --git a/client/rpc_pool_test.go b/client/rpc_pool_test.go new file mode 100644 index 00000000..cd08252d --- /dev/null +++ b/client/rpc_pool_test.go @@ -0,0 +1,84 @@ +package client + +import ( + "testing" + "time" + + "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/transport/mock" +) + +func testPool(t *testing.T, size int, ttl time.Duration) { + // zero pool + p := newPool(size, ttl) + + // mock transport + tr := mock.NewTransport() + + // listen + l, err := tr.Listen(":0") + if err != nil { + t.Fatal(err) + } + defer l.Close() + + // accept loop + go func() { + for { + if err := l.Accept(func(s transport.Socket) { + for { + var msg transport.Message + if err := s.Recv(&msg); err != nil { + return + } + if err := s.Send(&msg); err != nil { + return + } + } + }); err != nil { + return + } + } + }() + + for i := 0; i < 10; i++ { + // get a conn + c, err := p.getConn(l.Addr(), tr) + if err != nil { + t.Fatal(err) + } + + msg := &transport.Message{ + Body: []byte(`hello world`), + } + + if err := c.Send(msg); err != nil { + t.Fatal(err) + } + + var rcv transport.Message + + if err := c.Recv(&rcv); err != nil { + t.Fatal(err) + } + + if string(rcv.Body) != string(msg.Body) { + t.Fatalf("got %v, expected %v", rcv.Body, msg.Body) + } + + // release the conn + p.release(l.Addr(), c, nil) + + p.Lock() + if i := len(p.conns[l.Addr()]); i > size { + p.Unlock() + t.Fatal("pool size %d is greater than expected %d", i, size) + } + p.Unlock() + } +} + +func TestRPCPool(t *testing.T) { + testPool(t, 0, time.Minute) + testPool(t, 2, time.Minute) +}