mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-30 10:10:44 +02:00
Better connection pool handling (#2725)
* [fix] etcd config source prefix issue (#2389) * http transport data race issue (#2436) * [fix] #2431 http transport data race issue * [feature] Ability to close connection while receiving. Ability to send messages while receiving. Icreased r channel limit to 100 to more fluently communication. Do not dropp sent request if r channel is full. * [fix] Use pool connection close timeout * [fix] replace Close with private function * [fix] Do not close the transport client twice in stream connection , the transport client is closed in the rpc codec * [fix] tests --------- Co-authored-by: Johnson C <chengqiaosheng@gmail.com>
This commit is contained in:
parent
1c6c1ff1a3
commit
0433e98dbc
@ -27,6 +27,8 @@ var (
|
||||
DefaultPoolSize = 100
|
||||
// DefaultPoolTTL sets the connection pool ttl.
|
||||
DefaultPoolTTL = time.Minute
|
||||
// DefaultPoolCloseTimeout sets the connection pool colse timeout.
|
||||
DefaultPoolCloseTimeout = time.Second
|
||||
)
|
||||
|
||||
// Options are the Client options.
|
||||
@ -63,8 +65,9 @@ type Options struct {
|
||||
Wrappers []Wrapper
|
||||
|
||||
// Connection Pool
|
||||
PoolSize int
|
||||
PoolTTL time.Duration
|
||||
PoolSize int
|
||||
PoolTTL time.Duration
|
||||
PoolCloseTimeout time.Duration
|
||||
}
|
||||
|
||||
// CallOptions are options used to make calls to a server.
|
||||
@ -140,13 +143,14 @@ func NewOptions(options ...Option) Options {
|
||||
ConnectionTimeout: DefaultConnectionTimeout,
|
||||
DialTimeout: transport.DefaultDialTimeout,
|
||||
},
|
||||
PoolSize: DefaultPoolSize,
|
||||
PoolTTL: DefaultPoolTTL,
|
||||
Broker: broker.DefaultBroker,
|
||||
Selector: selector.DefaultSelector,
|
||||
Registry: registry.DefaultRegistry,
|
||||
Transport: transport.DefaultTransport,
|
||||
Logger: logger.DefaultLogger,
|
||||
PoolSize: DefaultPoolSize,
|
||||
PoolTTL: DefaultPoolTTL,
|
||||
PoolCloseTimeout: DefaultPoolCloseTimeout,
|
||||
Broker: broker.DefaultBroker,
|
||||
Selector: selector.DefaultSelector,
|
||||
Registry: registry.DefaultRegistry,
|
||||
Transport: transport.DefaultTransport,
|
||||
Logger: logger.DefaultLogger,
|
||||
}
|
||||
|
||||
for _, o := range options {
|
||||
@ -191,6 +195,13 @@ func PoolTTL(d time.Duration) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// PoolCloseTimeout sets the connection pool close timeout.
|
||||
func PoolCloseTimeout(d time.Duration) Option {
|
||||
return func(o *Options) {
|
||||
o.PoolCloseTimeout = d
|
||||
}
|
||||
}
|
||||
|
||||
// Registry to find nodes for a given service.
|
||||
func Registry(r registry.Registry) Option {
|
||||
return func(o *Options) {
|
||||
|
@ -46,6 +46,7 @@ func newRPCClient(opt ...Option) Client {
|
||||
pool.Size(opts.PoolSize),
|
||||
pool.TTL(opts.PoolTTL),
|
||||
pool.Transport(opts.Transport),
|
||||
pool.CloseTimeout(opts.PoolCloseTimeout),
|
||||
)
|
||||
|
||||
rc := &rpcClient{
|
||||
@ -148,7 +149,10 @@ func (r *rpcClient) call(
|
||||
|
||||
c, err := r.pool.Get(address, dOpts...)
|
||||
if err != nil {
|
||||
return merrors.InternalServerError("go.micro.client", "connection error: %v", err)
|
||||
if c == nil {
|
||||
return merrors.InternalServerError("go.micro.client", "connection error: %v", err)
|
||||
}
|
||||
logger.Log(log.ErrorLevel, "failed to close pool", err)
|
||||
}
|
||||
|
||||
seq := atomic.AddUint64(&r.seq, 1) - 1
|
||||
|
@ -565,6 +565,14 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
clientOpts = append(clientOpts, client.PoolTTL(d))
|
||||
}
|
||||
|
||||
if t := ctx.String("client_pool_close_timeout"); len(t) > 0 {
|
||||
d, err := time.ParseDuration(t)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse client_pool_close_timeout: %v", t)
|
||||
}
|
||||
clientOpts = append(clientOpts, client.PoolCloseTimeout(d))
|
||||
}
|
||||
|
||||
// We have some command line opts for the server.
|
||||
// Lets set it up
|
||||
if len(serverOpts) > 0 {
|
||||
|
@ -1,6 +1,7 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -12,37 +13,40 @@ import (
|
||||
type pool struct {
|
||||
tr transport.Transport
|
||||
|
||||
conns map[string][]*poolConn
|
||||
size int
|
||||
ttl time.Duration
|
||||
|
||||
sync.Mutex
|
||||
closeTimeout time.Duration
|
||||
conns map[string][]*poolConn
|
||||
mu sync.Mutex
|
||||
size int
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
type poolConn struct {
|
||||
created time.Time
|
||||
transport.Client
|
||||
id string
|
||||
|
||||
closeTimeout time.Duration
|
||||
created time.Time
|
||||
id string
|
||||
}
|
||||
|
||||
func newPool(options Options) *pool {
|
||||
return &pool{
|
||||
size: options.Size,
|
||||
tr: options.Transport,
|
||||
ttl: options.TTL,
|
||||
conns: make(map[string][]*poolConn),
|
||||
size: options.Size,
|
||||
tr: options.Transport,
|
||||
ttl: options.TTL,
|
||||
closeTimeout: options.CloseTimeout,
|
||||
conns: make(map[string][]*poolConn),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *pool) Close() error {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
var err error
|
||||
|
||||
for k, c := range p.conns {
|
||||
for _, conn := range c {
|
||||
if nerr := conn.Client.Close(); nerr != nil {
|
||||
if nerr := conn.close(); nerr != nil {
|
||||
err = nerr
|
||||
}
|
||||
}
|
||||
@ -67,7 +71,7 @@ func (p *poolConn) Created() time.Time {
|
||||
}
|
||||
|
||||
func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) {
|
||||
p.Lock()
|
||||
p.mu.Lock()
|
||||
conns := p.conns[addr]
|
||||
|
||||
// While we have conns check age and then return one
|
||||
@ -79,22 +83,30 @@ func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) {
|
||||
|
||||
// If conn is old kill it and move on
|
||||
if d := time.Since(conn.Created()); d > p.ttl {
|
||||
if err := conn.Client.Close(); err != nil {
|
||||
p.Unlock()
|
||||
return nil, err
|
||||
if err := conn.close(); err != nil {
|
||||
p.mu.Unlock()
|
||||
c, errConn := p.newConn(addr, opts)
|
||||
if errConn != nil {
|
||||
return nil, errConn
|
||||
}
|
||||
return c, err
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// We got a good conn, lets unlock and return it
|
||||
p.Unlock()
|
||||
p.mu.Unlock()
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
p.Unlock()
|
||||
p.mu.Unlock()
|
||||
|
||||
return p.newConn(addr, opts)
|
||||
}
|
||||
|
||||
func (p *pool) newConn(addr string, opts []transport.DialOption) (Conn, error) {
|
||||
// create new conn
|
||||
c, err := p.tr.Dial(addr, opts...)
|
||||
if err != nil {
|
||||
@ -102,28 +114,46 @@ func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) {
|
||||
}
|
||||
|
||||
return &poolConn{
|
||||
Client: c,
|
||||
id: uuid.New().String(),
|
||||
created: time.Now(),
|
||||
Client: c,
|
||||
id: uuid.New().String(),
|
||||
closeTimeout: p.closeTimeout,
|
||||
created: time.Now(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *pool) Release(conn Conn, err error) error {
|
||||
// don't store the conn if it has errored
|
||||
if err != nil {
|
||||
return conn.(*poolConn).Client.Close()
|
||||
return conn.(*poolConn).close()
|
||||
}
|
||||
|
||||
// otherwise put it back for reuse
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
conns := p.conns[conn.Remote()]
|
||||
if len(conns) >= p.size {
|
||||
return conn.(*poolConn).Client.Close()
|
||||
return conn.(*poolConn).close()
|
||||
}
|
||||
|
||||
p.conns[conn.Remote()] = append(conns, conn.(*poolConn))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *poolConn) close() error {
|
||||
ch := make(chan error)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
ch <- p.Client.Close()
|
||||
}()
|
||||
t := time.NewTimer(p.closeTimeout)
|
||||
var err error
|
||||
select {
|
||||
case <-t.C:
|
||||
err = errors.New("unable to close in time")
|
||||
case err = <-ch:
|
||||
t.Stop()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -73,12 +73,12 @@ func testPool(t *testing.T, size int, ttl time.Duration) {
|
||||
// release the conn
|
||||
p.Release(c, nil)
|
||||
|
||||
p.Lock()
|
||||
p.mu.Lock()
|
||||
if i := len(p.conns[l.Addr()]); i > size {
|
||||
p.Unlock()
|
||||
p.mu.Unlock()
|
||||
t.Fatalf("pool size %d is greater than expected %d", i, size)
|
||||
}
|
||||
p.Unlock()
|
||||
p.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,9 +7,10 @@ import (
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
Transport transport.Transport
|
||||
TTL time.Duration
|
||||
Size int
|
||||
Transport transport.Transport
|
||||
TTL time.Duration
|
||||
CloseTimeout time.Duration
|
||||
Size int
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
@ -31,3 +32,9 @@ func TTL(t time.Duration) Option {
|
||||
o.TTL = t
|
||||
}
|
||||
}
|
||||
|
||||
func CloseTimeout(t time.Duration) Option {
|
||||
return func(o *Options) {
|
||||
o.CloseTimeout = t
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user