From 844981bf1a831ab476e8854d413d2ea31c087d42 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 23 Mar 2022 10:48:38 +0200 Subject: [PATCH] feat: close idle connections after 30 minutes --- ch/chpool/conn.go | 2 +- ch/chpool/pool.go | 97 +++++++++-------------------------------------- ch/config.go | 30 +++++++++------ 3 files changed, 36 insertions(+), 93 deletions(-) diff --git a/ch/chpool/conn.go b/ch/chpool/conn.go index 29259ca..9c340b8 100644 --- a/ch/chpool/conn.go +++ b/ch/chpool/conn.go @@ -19,7 +19,6 @@ type Conn struct { ServerInfo chproto.ServerInfo - pooled bool Inited bool createdAt time.Time usedAt int64 // atomic @@ -33,6 +32,7 @@ func NewConn(netConn net.Conn) *Conn { wr: chproto.NewWriter(netConn), createdAt: time.Now(), } + cn.SetUsedAt(time.Now()) return cn } diff --git a/ch/chpool/pool.go b/ch/chpool/pool.go index c405a49..75f7514 100644 --- a/ch/chpool/pool.go +++ b/ch/chpool/pool.go @@ -76,11 +76,11 @@ type Config struct { Dialer func(context.Context) (net.Conn, error) OnClose func(*Conn) error - PoolSize int - PoolTimeout time.Duration - MinIdleConns int - MaxIdleConns int - MaxConnAge time.Duration + PoolSize int + PoolTimeout time.Duration + MaxIdleConns int + ConnMaxIdleTime time.Duration + ConnMaxLifetime time.Duration } type ConnPool struct { @@ -100,9 +100,6 @@ type ConnPool struct { connsMu sync.Mutex conns []*Conn idleConns []*Conn - - poolSize int - idleConnsLen int } var _ Pooler = (*ConnPool)(nil) @@ -116,70 +113,22 @@ func New(cfg *Config) *ConnPool { idleConns: make([]*Conn, 0, cfg.PoolSize), } - p.connsMu.Lock() - p.checkMinIdleConns() - p.connsMu.Unlock() - return p } -func (p *ConnPool) checkMinIdleConns() { - if p.cfg.MinIdleConns == 0 { - return - } - for p.poolSize < p.cfg.PoolSize && p.idleConnsLen < p.cfg.MinIdleConns { - p.poolSize++ - p.idleConnsLen++ - go func() { - err := p.addIdleConn() - if err != nil { - p.connsMu.Lock() - p.poolSize-- - p.idleConnsLen-- - p.connsMu.Unlock() - } - }() - } -} - -func (p *ConnPool) addIdleConn() error { - cn, err := p.dialConn(context.TODO(), true) - if err != nil { - return err - } - - p.connsMu.Lock() - p.conns = append(p.conns, cn) - p.idleConns = append(p.idleConns, cn) - p.connsMu.Unlock() - return nil -} - -func (p *ConnPool) NewConn(c context.Context) (*Conn, error) { - return p.newConn(c, false) -} - -func (p *ConnPool) newConn(c context.Context, pooled bool) (*Conn, error) { - cn, err := p.dialConn(c, pooled) +func (p *ConnPool) NewConn(ctx context.Context) (*Conn, error) { + cn, err := p.dialConn(ctx) if err != nil { return nil, err } p.connsMu.Lock() p.conns = append(p.conns, cn) - if pooled { - // If pool is full remove the cn on next Put. - if p.poolSize >= p.cfg.PoolSize { - cn.pooled = false - } else { - p.poolSize++ - } - } p.connsMu.Unlock() return cn, nil } -func (p *ConnPool) dialConn(c context.Context, pooled bool) (*Conn, error) { +func (p *ConnPool) dialConn(ctx context.Context) (*Conn, error) { if p.closed() { return nil, ErrClosed } @@ -188,7 +137,7 @@ func (p *ConnPool) dialConn(c context.Context, pooled bool) (*Conn, error) { return nil, p.getLastDialError() } - netConn, err := p.cfg.Dialer(c) + netConn, err := p.cfg.Dialer(ctx) if err != nil { p.setLastDialError(err) if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) { @@ -198,7 +147,6 @@ func (p *ConnPool) dialConn(c context.Context, pooled bool) (*Conn, error) { } cn := NewConn(netConn) - cn.pooled = pooled return cn, nil } @@ -254,7 +202,11 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) { break } - if p.cfg.MaxConnAge > 0 && time.Since(cn.createdAt) >= p.cfg.MaxConnAge { + if p.cfg.ConnMaxLifetime > 0 && time.Since(cn.createdAt) >= p.cfg.ConnMaxLifetime { + _ = p.CloseConn(cn) + continue + } + if p.cfg.ConnMaxIdleTime > 0 && time.Since(cn.UsedAt()) >= p.cfg.ConnMaxIdleTime { _ = p.CloseConn(cn) continue } @@ -265,7 +217,7 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) { atomic.AddUint32(&p.stats.Misses, 1) - newcn, err := p.newConn(ctx, true) + newcn, err := p.NewConn(ctx) if err != nil { p.freeTurn() return nil, err @@ -326,8 +278,6 @@ func (p *ConnPool) popIdle() *Conn { idx := len(p.idleConns) - 1 cn := p.idleConns[idx] p.idleConns = p.idleConns[:idx] - p.idleConnsLen-- - p.checkMinIdleConns() return cn } @@ -338,18 +288,12 @@ func (p *ConnPool) Put(cn *Conn) { return } - if !cn.pooled { - p.Remove(cn, nil) - return - } - var atMaxCap bool p.connsMu.Lock() if len(p.idleConns) < p.cfg.MaxIdleConns { p.idleConns = append(p.idleConns, cn) - p.idleConnsLen++ } else { atMaxCap = true } @@ -384,10 +328,6 @@ func (p *ConnPool) removeConn(cn *Conn) { for i, c := range p.conns { if c == cn { p.conns = append(p.conns[:i], p.conns[i+1:]...) - if cn.pooled { - p.poolSize-- - p.checkMinIdleConns() - } return } } @@ -411,20 +351,19 @@ func (p *ConnPool) Len() int { // IdleLen returns number of idle connections. func (p *ConnPool) IdleLen() int { p.connsMu.Lock() - n := p.idleConnsLen + n := len(p.idleConns) p.connsMu.Unlock() return n } func (p *ConnPool) Stats() *Stats { - idleLen := p.IdleLen() return &Stats{ Hits: atomic.LoadUint32(&p.stats.Hits), Misses: atomic.LoadUint32(&p.stats.Misses), Timeouts: atomic.LoadUint32(&p.stats.Timeouts), TotalConns: uint32(p.Len()), - IdleConns: uint32(idleLen), + IdleConns: uint32(p.IdleLen()), StaleConns: atomic.LoadUint32(&p.stats.StaleConns), } } @@ -446,9 +385,7 @@ func (p *ConnPool) Close() error { } } p.conns = nil - p.poolSize = 0 p.idleConns = nil - p.idleConnsLen = 0 p.connsMu.Unlock() return firstErr diff --git a/ch/config.go b/ch/config.go index 83c1fa4..2958bdd 100644 --- a/ch/config.go +++ b/ch/config.go @@ -65,9 +65,10 @@ func defaultConfig() *Config { MaxRetryBackoff: time.Second, Config: chpool.Config{ - PoolSize: poolSize, - MaxIdleConns: poolSize, - PoolTimeout: 30 * time.Second, + PoolSize: poolSize, + PoolTimeout: 30 * time.Second, + MaxIdleConns: poolSize, + ConnMaxIdleTime: 30 * time.Minute, }, } return cfg @@ -194,20 +195,25 @@ func WithPoolSize(poolSize int) Option { } } -// WithMinIdleConns configures minimum number of idle connections which is useful when establishing -// new connection is slow. -func WithMinIdleConns(minIdleConns int) Option { +// WithConnMaxLifetime sets the maximum amount of time a connection may be reused. +// Expired connections may be closed lazily before reuse. + +// If d <= 0, connections are not closed due to a connection's age. +func WithConnMaxLifetime(d time.Duration) Option { return func(db *DB) { - db.cfg.MinIdleConns = minIdleConns + db.cfg.ConnMaxLifetime = d } } -// WithMaxConnAge configures Connection age at which client retires (closes) the connection. -// It is useful with proxies like HAProxy. -// Default is to not close aged connections. -func WithMaxConnAge(timeout time.Duration) Option { +// SetConnMaxIdleTime sets the maximum amount of time a connection may be idle. +// Expired connections may be closed lazily before reuse. +// +// If d <= 0, connections are not closed due to a connection's idle time. +// +// ClickHouse closes idle connections after 1 hour (see idle_connection_timeout). +func WithConnMaxIdleTime(d time.Duration) Option { return func(db *DB) { - db.cfg.MaxConnAge = timeout + db.cfg.ConnMaxIdleTime = d } }