1
0
mirror of https://github.com/uptrace/go-clickhouse.git synced 2025-07-03 00:27:03 +02:00

Merge pull request #12 from uptrace/fix/conn-max-idle-time

feat: close idle connections after 30 minutes
This commit is contained in:
Vladimir Mihailenco
2022-03-23 10:53:59 +02:00
committed by GitHub
3 changed files with 36 additions and 93 deletions

View File

@ -19,7 +19,6 @@ type Conn struct {
ServerInfo chproto.ServerInfo ServerInfo chproto.ServerInfo
pooled bool
Inited bool Inited bool
createdAt time.Time createdAt time.Time
usedAt int64 // atomic usedAt int64 // atomic
@ -33,6 +32,7 @@ func NewConn(netConn net.Conn) *Conn {
wr: chproto.NewWriter(netConn), wr: chproto.NewWriter(netConn),
createdAt: time.Now(), createdAt: time.Now(),
} }
cn.SetUsedAt(time.Now())
return cn return cn
} }

View File

@ -76,11 +76,11 @@ type Config struct {
Dialer func(context.Context) (net.Conn, error) Dialer func(context.Context) (net.Conn, error)
OnClose func(*Conn) error OnClose func(*Conn) error
PoolSize int PoolSize int
PoolTimeout time.Duration PoolTimeout time.Duration
MinIdleConns int MaxIdleConns int
MaxIdleConns int ConnMaxIdleTime time.Duration
MaxConnAge time.Duration ConnMaxLifetime time.Duration
} }
type ConnPool struct { type ConnPool struct {
@ -100,9 +100,6 @@ type ConnPool struct {
connsMu sync.Mutex connsMu sync.Mutex
conns []*Conn conns []*Conn
idleConns []*Conn idleConns []*Conn
poolSize int
idleConnsLen int
} }
var _ Pooler = (*ConnPool)(nil) var _ Pooler = (*ConnPool)(nil)
@ -116,70 +113,22 @@ func New(cfg *Config) *ConnPool {
idleConns: make([]*Conn, 0, cfg.PoolSize), idleConns: make([]*Conn, 0, cfg.PoolSize),
} }
p.connsMu.Lock()
p.checkMinIdleConns()
p.connsMu.Unlock()
return p return p
} }
func (p *ConnPool) checkMinIdleConns() { func (p *ConnPool) NewConn(ctx context.Context) (*Conn, error) {
if p.cfg.MinIdleConns == 0 { cn, err := p.dialConn(ctx)
return
}
for p.poolSize < p.cfg.PoolSize && p.idleConnsLen < p.cfg.MinIdleConns {
p.poolSize++
p.idleConnsLen++
go func() {
err := p.addIdleConn()
if err != nil {
p.connsMu.Lock()
p.poolSize--
p.idleConnsLen--
p.connsMu.Unlock()
}
}()
}
}
func (p *ConnPool) addIdleConn() error {
cn, err := p.dialConn(context.TODO(), true)
if err != nil {
return err
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
p.idleConns = append(p.idleConns, cn)
p.connsMu.Unlock()
return nil
}
func (p *ConnPool) NewConn(c context.Context) (*Conn, error) {
return p.newConn(c, false)
}
func (p *ConnPool) newConn(c context.Context, pooled bool) (*Conn, error) {
cn, err := p.dialConn(c, pooled)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.connsMu.Lock() p.connsMu.Lock()
p.conns = append(p.conns, cn) p.conns = append(p.conns, cn)
if pooled {
// If pool is full remove the cn on next Put.
if p.poolSize >= p.cfg.PoolSize {
cn.pooled = false
} else {
p.poolSize++
}
}
p.connsMu.Unlock() p.connsMu.Unlock()
return cn, nil return cn, nil
} }
func (p *ConnPool) dialConn(c context.Context, pooled bool) (*Conn, error) { func (p *ConnPool) dialConn(ctx context.Context) (*Conn, error) {
if p.closed() { if p.closed() {
return nil, ErrClosed return nil, ErrClosed
} }
@ -188,7 +137,7 @@ func (p *ConnPool) dialConn(c context.Context, pooled bool) (*Conn, error) {
return nil, p.getLastDialError() return nil, p.getLastDialError()
} }
netConn, err := p.cfg.Dialer(c) netConn, err := p.cfg.Dialer(ctx)
if err != nil { if err != nil {
p.setLastDialError(err) p.setLastDialError(err)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) { if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) {
@ -198,7 +147,6 @@ func (p *ConnPool) dialConn(c context.Context, pooled bool) (*Conn, error) {
} }
cn := NewConn(netConn) cn := NewConn(netConn)
cn.pooled = pooled
return cn, nil return cn, nil
} }
@ -254,7 +202,11 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
break break
} }
if p.cfg.MaxConnAge > 0 && time.Since(cn.createdAt) >= p.cfg.MaxConnAge { if p.cfg.ConnMaxLifetime > 0 && time.Since(cn.createdAt) >= p.cfg.ConnMaxLifetime {
_ = p.CloseConn(cn)
continue
}
if p.cfg.ConnMaxIdleTime > 0 && time.Since(cn.UsedAt()) >= p.cfg.ConnMaxIdleTime {
_ = p.CloseConn(cn) _ = p.CloseConn(cn)
continue continue
} }
@ -265,7 +217,7 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
atomic.AddUint32(&p.stats.Misses, 1) atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p.newConn(ctx, true) newcn, err := p.NewConn(ctx)
if err != nil { if err != nil {
p.freeTurn() p.freeTurn()
return nil, err return nil, err
@ -326,8 +278,6 @@ func (p *ConnPool) popIdle() *Conn {
idx := len(p.idleConns) - 1 idx := len(p.idleConns) - 1
cn := p.idleConns[idx] cn := p.idleConns[idx]
p.idleConns = p.idleConns[:idx] p.idleConns = p.idleConns[:idx]
p.idleConnsLen--
p.checkMinIdleConns()
return cn return cn
} }
@ -338,18 +288,12 @@ func (p *ConnPool) Put(cn *Conn) {
return return
} }
if !cn.pooled {
p.Remove(cn, nil)
return
}
var atMaxCap bool var atMaxCap bool
p.connsMu.Lock() p.connsMu.Lock()
if len(p.idleConns) < p.cfg.MaxIdleConns { if len(p.idleConns) < p.cfg.MaxIdleConns {
p.idleConns = append(p.idleConns, cn) p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
} else { } else {
atMaxCap = true atMaxCap = true
} }
@ -384,10 +328,6 @@ func (p *ConnPool) removeConn(cn *Conn) {
for i, c := range p.conns { for i, c := range p.conns {
if c == cn { if c == cn {
p.conns = append(p.conns[:i], p.conns[i+1:]...) p.conns = append(p.conns[:i], p.conns[i+1:]...)
if cn.pooled {
p.poolSize--
p.checkMinIdleConns()
}
return return
} }
} }
@ -411,20 +351,19 @@ func (p *ConnPool) Len() int {
// IdleLen returns number of idle connections. // IdleLen returns number of idle connections.
func (p *ConnPool) IdleLen() int { func (p *ConnPool) IdleLen() int {
p.connsMu.Lock() p.connsMu.Lock()
n := p.idleConnsLen n := len(p.idleConns)
p.connsMu.Unlock() p.connsMu.Unlock()
return n return n
} }
func (p *ConnPool) Stats() *Stats { func (p *ConnPool) Stats() *Stats {
idleLen := p.IdleLen()
return &Stats{ return &Stats{
Hits: atomic.LoadUint32(&p.stats.Hits), Hits: atomic.LoadUint32(&p.stats.Hits),
Misses: atomic.LoadUint32(&p.stats.Misses), Misses: atomic.LoadUint32(&p.stats.Misses),
Timeouts: atomic.LoadUint32(&p.stats.Timeouts), Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()), TotalConns: uint32(p.Len()),
IdleConns: uint32(idleLen), IdleConns: uint32(p.IdleLen()),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns), StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
} }
} }
@ -446,9 +385,7 @@ func (p *ConnPool) Close() error {
} }
} }
p.conns = nil p.conns = nil
p.poolSize = 0
p.idleConns = nil p.idleConns = nil
p.idleConnsLen = 0
p.connsMu.Unlock() p.connsMu.Unlock()
return firstErr return firstErr

View File

@ -65,9 +65,10 @@ func defaultConfig() *Config {
MaxRetryBackoff: time.Second, MaxRetryBackoff: time.Second,
Config: chpool.Config{ Config: chpool.Config{
PoolSize: poolSize, PoolSize: poolSize,
MaxIdleConns: poolSize, PoolTimeout: 30 * time.Second,
PoolTimeout: 30 * time.Second, MaxIdleConns: poolSize,
ConnMaxIdleTime: 30 * time.Minute,
}, },
} }
return cfg return cfg
@ -194,20 +195,25 @@ func WithPoolSize(poolSize int) Option {
} }
} }
// WithMinIdleConns configures minimum number of idle connections which is useful when establishing // WithConnMaxLifetime sets the maximum amount of time a connection may be reused.
// new connection is slow. // Expired connections may be closed lazily before reuse.
func WithMinIdleConns(minIdleConns int) Option {
// If d <= 0, connections are not closed due to a connection's age.
func WithConnMaxLifetime(d time.Duration) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.MinIdleConns = minIdleConns db.cfg.ConnMaxLifetime = d
} }
} }
// WithMaxConnAge configures Connection age at which client retires (closes) the connection. // SetConnMaxIdleTime sets the maximum amount of time a connection may be idle.
// It is useful with proxies like HAProxy. // Expired connections may be closed lazily before reuse.
// Default is to not close aged connections. //
func WithMaxConnAge(timeout time.Duration) Option { // If d <= 0, connections are not closed due to a connection's idle time.
//
// ClickHouse closes idle connections after 1 hour (see idle_connection_timeout).
func WithConnMaxIdleTime(d time.Duration) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.MaxConnAge = timeout db.cfg.ConnMaxIdleTime = d
} }
} }