1
0
mirror of https://github.com/uptrace/go-clickhouse.git synced 2025-06-27 00:21:13 +02:00

chore: bump default timeouts

This commit is contained in:
Vladimir Mihailenco
2022-10-26 13:38:40 +03:00
parent 0da7ce6381
commit 92d13a878d
3 changed files with 81 additions and 81 deletions

View File

@ -42,22 +42,22 @@ type Config struct {
MaxRetryBackoff time.Duration MaxRetryBackoff time.Duration
} }
func (cfg *Config) clone() *Config { func (conf *Config) clone() *Config {
clone := *cfg clone := *conf
return &clone return &clone
} }
func (cfg *Config) netDialer() *net.Dialer { func (conf *Config) netDialer() *net.Dialer {
return &net.Dialer{ return &net.Dialer{
Timeout: cfg.DialTimeout, Timeout: conf.DialTimeout,
KeepAlive: 5 * time.Minute, KeepAlive: 5 * time.Minute,
} }
} }
func defaultConfig() *Config { func defaultConfig() *Config {
var cfg *Config var conf *Config
poolSize := 2 * runtime.GOMAXPROCS(0) poolSize := 2 * runtime.GOMAXPROCS(0)
cfg = &Config{ conf = &Config{
Config: chpool.Config{ Config: chpool.Config{
PoolSize: poolSize, PoolSize: poolSize,
PoolTimeout: 30 * time.Second, PoolTimeout: 30 * time.Second,
@ -72,14 +72,14 @@ func defaultConfig() *Config {
Database: "default", Database: "default",
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
ReadTimeout: 5 * time.Second, ReadTimeout: 30 * time.Second,
WriteTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second,
MaxRetries: 2, MaxRetries: 2,
MinRetryBackoff: 500 * time.Millisecond, MinRetryBackoff: 500 * time.Millisecond,
MaxRetryBackoff: time.Second, MaxRetryBackoff: time.Second,
} }
return cfg return conf
} }
type Option func(db *DB) type Option func(db *DB)
@ -93,7 +93,7 @@ func WithDiscardUnknownColumns() Option {
// WithCompression enables/disables LZ4 compression. // WithCompression enables/disables LZ4 compression.
func WithCompression(enabled bool) Option { func WithCompression(enabled bool) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.Compression = enabled db.conf.Compression = enabled
} }
} }
@ -106,48 +106,48 @@ func WithAutoCreateDatabase(enabled bool) Option {
// WithAddr configures TCP host:port. // WithAddr configures TCP host:port.
func WithAddr(addr string) Option { func WithAddr(addr string) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.Addr = addr db.conf.Addr = addr
} }
} }
// WithTLSConfig configures TLS config for secure connections. // WithTLSConfig configures TLS config for secure connections.
func WithTLSConfig(cfg *tls.Config) Option { func WithTLSConfig(conf *tls.Config) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.TLSConfig = cfg db.conf.TLSConfig = conf
} }
} }
func WithQuerySettings(params map[string]any) Option { func WithQuerySettings(params map[string]any) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.QuerySettings = params db.conf.QuerySettings = params
} }
} }
func WithInsecure(on bool) Option { func WithInsecure(on bool) Option {
return func(db *DB) { return func(db *DB) {
if on { if on {
db.cfg.TLSConfig = nil db.conf.TLSConfig = nil
} else { } else {
db.cfg.TLSConfig = &tls.Config{InsecureSkipVerify: true} db.conf.TLSConfig = &tls.Config{InsecureSkipVerify: true}
} }
} }
} }
func WithUser(user string) Option { func WithUser(user string) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.User = user db.conf.User = user
} }
} }
func WithPassword(password string) Option { func WithPassword(password string) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.Password = password db.conf.Password = password
} }
} }
func WithDatabase(database string) Option { func WithDatabase(database string) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.Database = database db.conf.Database = database
} }
} }
@ -155,7 +155,7 @@ func WithDatabase(database string) Option {
// Default is 5 seconds. // Default is 5 seconds.
func WithDialTimeout(timeout time.Duration) Option { func WithDialTimeout(timeout time.Duration) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.DialTimeout = timeout db.conf.DialTimeout = timeout
} }
} }
@ -163,7 +163,7 @@ func WithDialTimeout(timeout time.Duration) Option {
// with a timeout instead of blocking. // with a timeout instead of blocking.
func WithReadTimeout(timeout time.Duration) Option { func WithReadTimeout(timeout time.Duration) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.ReadTimeout = timeout db.conf.ReadTimeout = timeout
} }
} }
@ -171,15 +171,15 @@ func WithReadTimeout(timeout time.Duration) Option {
// with a timeout instead of blocking. // with a timeout instead of blocking.
func WithWriteTimeout(timeout time.Duration) Option { func WithWriteTimeout(timeout time.Duration) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.WriteTimeout = timeout db.conf.WriteTimeout = timeout
} }
} }
func WithTimeout(timeout time.Duration) Option { func WithTimeout(timeout time.Duration) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.DialTimeout = timeout db.conf.DialTimeout = timeout
db.cfg.ReadTimeout = timeout db.conf.ReadTimeout = timeout
db.cfg.WriteTimeout = timeout db.conf.WriteTimeout = timeout
} }
} }
@ -187,7 +187,7 @@ func WithTimeout(timeout time.Duration) Option {
// Default is to retry query 2 times. // Default is to retry query 2 times.
func WithMaxRetries(maxRetries int) Option { func WithMaxRetries(maxRetries int) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.MaxRetries = maxRetries db.conf.MaxRetries = maxRetries
} }
} }
@ -195,7 +195,7 @@ func WithMaxRetries(maxRetries int) Option {
// Default is 250 milliseconds; -1 disables backoff. // Default is 250 milliseconds; -1 disables backoff.
func WithMinRetryBackoff(backoff time.Duration) Option { func WithMinRetryBackoff(backoff time.Duration) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.MinRetryBackoff = backoff db.conf.MinRetryBackoff = backoff
} }
} }
@ -203,7 +203,7 @@ func WithMinRetryBackoff(backoff time.Duration) Option {
// Default is 4 seconds; -1 disables backoff. // Default is 4 seconds; -1 disables backoff.
func WithMaxRetryBackoff(backoff time.Duration) Option { func WithMaxRetryBackoff(backoff time.Duration) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.MaxRetryBackoff = backoff db.conf.MaxRetryBackoff = backoff
} }
} }
@ -211,8 +211,8 @@ func WithMaxRetryBackoff(backoff time.Duration) Option {
// Default is 2 connections per every CPU as reported by runtime.NumCPU. // Default is 2 connections per every CPU as reported by runtime.NumCPU.
func WithPoolSize(poolSize int) Option { func WithPoolSize(poolSize int) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.PoolSize = poolSize db.conf.PoolSize = poolSize
db.cfg.MaxIdleConns = poolSize db.conf.MaxIdleConns = poolSize
} }
} }
@ -222,7 +222,7 @@ func WithPoolSize(poolSize int) Option {
// If d <= 0, connections are not closed due to a connection's age. // If d <= 0, connections are not closed due to a connection's age.
func WithConnMaxLifetime(d time.Duration) Option { func WithConnMaxLifetime(d time.Duration) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.ConnMaxLifetime = d db.conf.ConnMaxLifetime = d
} }
} }
@ -234,7 +234,7 @@ func WithConnMaxLifetime(d time.Duration) Option {
// ClickHouse closes idle connections after 1 hour (see idle_connection_timeout). // ClickHouse closes idle connections after 1 hour (see idle_connection_timeout).
func WithConnMaxIdleTime(d time.Duration) Option { func WithConnMaxIdleTime(d time.Duration) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.ConnMaxIdleTime = d db.conf.ConnMaxIdleTime = d
} }
} }
@ -244,7 +244,7 @@ func WithConnMaxIdleTime(d time.Duration) Option {
// ReadTimeout + 1 second. // ReadTimeout + 1 second.
func WithPoolTimeout(timeout time.Duration) Option { func WithPoolTimeout(timeout time.Duration) Option {
return func(db *DB) { return func(db *DB) {
db.cfg.PoolTimeout = timeout db.conf.PoolTimeout = timeout
} }
} }

View File

@ -24,7 +24,7 @@ type DBStats struct {
} }
type DB struct { type DB struct {
cfg *Config conf *Config
pool *chpool.ConnPool pool *chpool.ConnPool
queryHooks []QueryHook queryHooks []QueryHook
@ -42,31 +42,31 @@ func Connect(opts ...Option) *DB {
return db return db
} }
func newDB(cfg *Config, opts ...Option) *DB { func newDB(conf *Config, opts ...Option) *DB {
db := &DB{ db := &DB{
cfg: cfg, conf: conf,
} }
for _, opt := range opts { for _, opt := range opts {
opt(db) opt(db)
} }
db.pool = newConnPool(db.cfg) db.pool = newConnPool(db.conf)
return db return db
} }
func newConnPool(cfg *Config) *chpool.ConnPool { func newConnPool(conf *Config) *chpool.ConnPool {
poolcfg := cfg.Config poolconf := conf.Config
poolcfg.Dialer = func(ctx context.Context) (net.Conn, error) { poolconf.Dialer = func(ctx context.Context) (net.Conn, error) {
if cfg.TLSConfig != nil { if conf.TLSConfig != nil {
return tls.DialWithDialer( return tls.DialWithDialer(
cfg.netDialer(), conf.netDialer(),
"tcp", "tcp",
cfg.Addr, conf.Addr,
cfg.TLSConfig, conf.TLSConfig,
) )
} }
return cfg.netDialer().DialContext(ctx, "tcp", cfg.Addr) return conf.netDialer().DialContext(ctx, "tcp", conf.Addr)
} }
return chpool.New(&poolcfg) return chpool.New(&poolconf)
} }
// Close closes the database client, releasing any open resources. // Close closes the database client, releasing any open resources.
@ -78,20 +78,20 @@ func (db *DB) Close() error {
} }
func (db *DB) String() string { func (db *DB) String() string {
return fmt.Sprintf("DB<addr: %s>", db.cfg.Addr) return fmt.Sprintf("DB<addr: %s>", db.conf.Addr)
} }
func (db *DB) Config() *Config { func (db *DB) Config() *Config {
return db.cfg return db.conf
} }
func (db *DB) WithTimeout(d time.Duration) *DB { func (db *DB) WithTimeout(d time.Duration) *DB {
newcfg := *db.cfg newconf := *db.conf
newcfg.ReadTimeout = d newconf.ReadTimeout = d
newcfg.WriteTimeout = d newconf.WriteTimeout = d
clone := db.clone() clone := db.clone()
clone.cfg = &newcfg clone.conf = &newconf
return clone return clone
} }
@ -127,14 +127,14 @@ func (db *DB) autoCreateDatabase() {
return return
} }
cfg := db.cfg.clone() conf := db.conf.clone()
cfg.Database = "" conf.Database = ""
tmp := newDB(cfg) tmp := newDB(conf)
defer tmp.Close() defer tmp.Close()
if _, err := tmp.Exec("CREATE DATABASE IF NOT EXISTS ?", Ident(db.cfg.Database)); err != nil { if _, err := tmp.Exec("CREATE DATABASE IF NOT EXISTS ?", Ident(db.conf.Database)); err != nil {
internal.Logger.Printf("create database %q failed: %s", db.cfg.Database, err) internal.Logger.Printf("create database %q failed: %s", db.conf.Database, err)
} }
} }
@ -226,7 +226,7 @@ func (db *DB) _withConn(ctx context.Context, fn func(*chpool.Conn) error) error
} }
func (db *DB) cancelConn(ctx context.Context, cn *chpool.Conn) { func (db *DB) cancelConn(ctx context.Context, cn *chpool.Conn) {
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
writeCancel(wr) writeCancel(wr)
}); err != nil { }); err != nil {
internal.Logger.Printf("writeCancel failed: %s", err) internal.Logger.Printf("writeCancel failed: %s", err)
@ -237,12 +237,12 @@ func (db *DB) cancelConn(ctx context.Context, cn *chpool.Conn) {
func (db *DB) Ping(ctx context.Context) error { func (db *DB) Ping(ctx context.Context) error {
return db.withConn(ctx, func(cn *chpool.Conn) error { return db.withConn(ctx, func(cn *chpool.Conn) error {
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
writePing(wr) writePing(wr)
}); err != nil { }); err != nil {
return err return err
} }
return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { return cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error {
return readPong(rd) return readPong(rd)
}) })
}) })
@ -265,7 +265,7 @@ func (db *DB) ExecContext(
func (db *DB) exec(ctx context.Context, query string) (*result, error) { func (db *DB) exec(ctx context.Context, query string) (*result, error) {
var res *result var res *result
var lastErr error var lastErr error
for attempt := 0; attempt <= db.cfg.MaxRetries; attempt++ { for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ {
if attempt > 0 { if attempt > 0 {
lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1))
if lastErr != nil { if lastErr != nil {
@ -284,13 +284,13 @@ func (db *DB) exec(ctx context.Context, query string) (*result, error) {
func (db *DB) _exec(ctx context.Context, query string) (*result, error) { func (db *DB) _exec(ctx context.Context, query string) (*result, error) {
var res *result var res *result
err := db.withConn(ctx, func(cn *chpool.Conn) error { err := db.withConn(ctx, func(cn *chpool.Conn) error {
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
db.writeQuery(ctx, cn, wr, query) db.writeQuery(ctx, cn, wr, query)
db.writeBlock(ctx, wr, nil) db.writeBlock(ctx, wr, nil)
}); err != nil { }); err != nil {
return err return err
} }
return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { return cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error {
var err error var err error
res, err = db.readDataBlocks(cn, rd) res, err = db.readDataBlocks(cn, rd)
return err return err
@ -331,7 +331,7 @@ func (db *DB) query(ctx context.Context, query string) (*blockIter, error) {
var blocks *blockIter var blocks *blockIter
var lastErr error var lastErr error
for attempt := 0; attempt <= db.cfg.MaxRetries; attempt++ { for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ {
if attempt > 0 { if attempt > 0 {
lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1))
if lastErr != nil { if lastErr != nil {
@ -354,7 +354,7 @@ func (db *DB) _query(ctx context.Context, query string) (*blockIter, error) {
return nil, err return nil, err
} }
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
db.writeQuery(ctx, cn, wr, query) db.writeQuery(ctx, cn, wr, query)
db.writeBlock(ctx, wr, nil) db.writeBlock(ctx, wr, nil)
}); err != nil { }); err != nil {
@ -372,7 +372,7 @@ func (db *DB) insert(
var res *result var res *result
var lastErr error var lastErr error
for attempt := 0; attempt <= db.cfg.MaxRetries; attempt++ { for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ {
if attempt > 0 { if attempt > 0 {
lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1))
if lastErr != nil { if lastErr != nil {
@ -394,28 +394,28 @@ func (db *DB) _insert(
) (*result, error) { ) (*result, error) {
var res *result var res *result
err := db.withConn(ctx, func(cn *chpool.Conn) error { err := db.withConn(ctx, func(cn *chpool.Conn) error {
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
db.writeQuery(ctx, cn, wr, query) db.writeQuery(ctx, cn, wr, query)
db.writeBlock(ctx, wr, nil) db.writeBlock(ctx, wr, nil)
}); err != nil { }); err != nil {
return err return err
} }
if err := cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { if err := cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error {
_, err := db.readSampleBlock(rd) _, err := db.readSampleBlock(rd)
return err return err
}); err != nil { }); err != nil {
return err return err
} }
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
db.writeBlock(ctx, wr, block) db.writeBlock(ctx, wr, block)
db.writeBlock(ctx, wr, nil) db.writeBlock(ctx, wr, nil)
}); err != nil { }); err != nil {
return err return err
} }
return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { return cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error {
var err error var err error
res, err = readPacket(cn, rd) res, err = readPacket(cn, rd)
if err != nil { if err != nil {
@ -501,7 +501,7 @@ func (db *DB) shouldRetry(err error) bool {
func (db *DB) retryBackoff(attempt int) time.Duration { func (db *DB) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff( return internal.RetryBackoff(
attempt, db.cfg.MinRetryBackoff, db.cfg.MaxRetryBackoff) attempt, db.conf.MinRetryBackoff, db.conf.MaxRetryBackoff)
} }
func (db *DB) FormatQuery(query string, args ...any) string { func (db *DB) FormatQuery(query string, args ...any) string {

View File

@ -75,7 +75,7 @@ func (it *blockIter) Next(ctx context.Context, block *chschema.Block) bool {
} }
func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, error) { func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, error) {
rd := it.cn.Reader(ctx, it.db.cfg.ReadTimeout) rd := it.cn.Reader(ctx, it.db.conf.ReadTimeout)
for { for {
packet, err := rd.Uvarint() packet, err := rd.Uvarint()
if err != nil { if err != nil {
@ -116,19 +116,19 @@ func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, err
} }
func (db *DB) hello(ctx context.Context, cn *chpool.Conn) error { func (db *DB) hello(ctx context.Context, cn *chpool.Conn) error {
err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
wr.WriteByte(chproto.ClientHello) wr.WriteByte(chproto.ClientHello)
writeClientInfo(wr) writeClientInfo(wr)
wr.String(db.cfg.Database) wr.String(db.conf.Database)
wr.String(db.cfg.User) wr.String(db.conf.User)
wr.String(db.cfg.Password) wr.String(db.conf.Password)
}) })
if err != nil { if err != nil {
return err return err
} }
return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { return cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error {
packet, err := rd.Uvarint() packet, err := rd.Uvarint()
if err != nil { if err != nil {
return err return err
@ -303,7 +303,7 @@ func (db *DB) writeQuery(ctx context.Context, cn *chpool.Conn, wr *chproto.Write
wr.String("") wr.String("")
} }
wr.Uvarint(2) // state complete wr.Uvarint(2) // state complete
wr.Bool(db.cfg.Compression) wr.Bool(db.conf.Compression)
wr.String(query) wr.String(query)
} }
@ -315,7 +315,7 @@ func reverseBytes(b []byte) []byte {
} }
func (db *DB) writeSettings(cn *chpool.Conn, wr *chproto.Writer) { func (db *DB) writeSettings(cn *chpool.Conn, wr *chproto.Writer) {
for key, value := range db.cfg.QuerySettings { for key, value := range db.conf.QuerySettings {
wr.String(key) wr.String(key)
if cn.ServerInfo.Revision > chproto.DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS { if cn.ServerInfo.Revision > chproto.DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS {
@ -353,7 +353,7 @@ func (db *DB) writeBlock(ctx context.Context, wr *chproto.Writer, block *chschem
wr.WriteByte(chproto.ClientData) wr.WriteByte(chproto.ClientData)
wr.String("") wr.String("")
wr.WithCompression(db.cfg.Compression, func() error { wr.WithCompression(db.conf.Compression, func() error {
writeBlockInfo(wr) writeBlockInfo(wr)
return block.WriteTo(wr) return block.WriteTo(wr)
}) })
@ -478,7 +478,7 @@ func (db *DB) readBlock(rd *chproto.Reader, block *chschema.Block, compressible
return err return err
} }
return rd.WithCompression(compressible && db.cfg.Compression, func() error { return rd.WithCompression(compressible && db.conf.Compression, func() error {
if err := readBlockInfo(rd); err != nil { if err := readBlockInfo(rd); err != nil {
return err return err
} }