diff --git a/ch/config.go b/ch/config.go index 2e4e1ec..861e25a 100644 --- a/ch/config.go +++ b/ch/config.go @@ -42,22 +42,22 @@ type Config struct { MaxRetryBackoff time.Duration } -func (cfg *Config) clone() *Config { - clone := *cfg +func (conf *Config) clone() *Config { + clone := *conf return &clone } -func (cfg *Config) netDialer() *net.Dialer { +func (conf *Config) netDialer() *net.Dialer { return &net.Dialer{ - Timeout: cfg.DialTimeout, + Timeout: conf.DialTimeout, KeepAlive: 5 * time.Minute, } } func defaultConfig() *Config { - var cfg *Config + var conf *Config poolSize := 2 * runtime.GOMAXPROCS(0) - cfg = &Config{ + conf = &Config{ Config: chpool.Config{ PoolSize: poolSize, PoolTimeout: 30 * time.Second, @@ -72,14 +72,14 @@ func defaultConfig() *Config { Database: "default", DialTimeout: 5 * time.Second, - ReadTimeout: 5 * time.Second, - WriteTimeout: 5 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 10 * time.Second, MaxRetries: 2, MinRetryBackoff: 500 * time.Millisecond, MaxRetryBackoff: time.Second, } - return cfg + return conf } type Option func(db *DB) @@ -93,7 +93,7 @@ func WithDiscardUnknownColumns() Option { // WithCompression enables/disables LZ4 compression. func WithCompression(enabled bool) Option { return func(db *DB) { - db.cfg.Compression = enabled + db.conf.Compression = enabled } } @@ -106,48 +106,48 @@ func WithAutoCreateDatabase(enabled bool) Option { // WithAddr configures TCP host:port. func WithAddr(addr string) Option { return func(db *DB) { - db.cfg.Addr = addr + db.conf.Addr = addr } } // WithTLSConfig configures TLS config for secure connections. -func WithTLSConfig(cfg *tls.Config) Option { +func WithTLSConfig(conf *tls.Config) Option { return func(db *DB) { - db.cfg.TLSConfig = cfg + db.conf.TLSConfig = conf } } func WithQuerySettings(params map[string]any) Option { return func(db *DB) { - db.cfg.QuerySettings = params + db.conf.QuerySettings = params } } func WithInsecure(on bool) Option { return func(db *DB) { if on { - db.cfg.TLSConfig = nil + db.conf.TLSConfig = nil } else { - db.cfg.TLSConfig = &tls.Config{InsecureSkipVerify: true} + db.conf.TLSConfig = &tls.Config{InsecureSkipVerify: true} } } } func WithUser(user string) Option { return func(db *DB) { - db.cfg.User = user + db.conf.User = user } } func WithPassword(password string) Option { return func(db *DB) { - db.cfg.Password = password + db.conf.Password = password } } func WithDatabase(database string) Option { return func(db *DB) { - db.cfg.Database = database + db.conf.Database = database } } @@ -155,7 +155,7 @@ func WithDatabase(database string) Option { // Default is 5 seconds. func WithDialTimeout(timeout time.Duration) Option { return func(db *DB) { - db.cfg.DialTimeout = timeout + db.conf.DialTimeout = timeout } } @@ -163,7 +163,7 @@ func WithDialTimeout(timeout time.Duration) Option { // with a timeout instead of blocking. func WithReadTimeout(timeout time.Duration) Option { return func(db *DB) { - db.cfg.ReadTimeout = timeout + db.conf.ReadTimeout = timeout } } @@ -171,15 +171,15 @@ func WithReadTimeout(timeout time.Duration) Option { // with a timeout instead of blocking. func WithWriteTimeout(timeout time.Duration) Option { return func(db *DB) { - db.cfg.WriteTimeout = timeout + db.conf.WriteTimeout = timeout } } func WithTimeout(timeout time.Duration) Option { return func(db *DB) { - db.cfg.DialTimeout = timeout - db.cfg.ReadTimeout = timeout - db.cfg.WriteTimeout = timeout + db.conf.DialTimeout = timeout + db.conf.ReadTimeout = timeout + db.conf.WriteTimeout = timeout } } @@ -187,7 +187,7 @@ func WithTimeout(timeout time.Duration) Option { // Default is to retry query 2 times. func WithMaxRetries(maxRetries int) Option { return func(db *DB) { - db.cfg.MaxRetries = maxRetries + db.conf.MaxRetries = maxRetries } } @@ -195,7 +195,7 @@ func WithMaxRetries(maxRetries int) Option { // Default is 250 milliseconds; -1 disables backoff. func WithMinRetryBackoff(backoff time.Duration) Option { return func(db *DB) { - db.cfg.MinRetryBackoff = backoff + db.conf.MinRetryBackoff = backoff } } @@ -203,7 +203,7 @@ func WithMinRetryBackoff(backoff time.Duration) Option { // Default is 4 seconds; -1 disables backoff. func WithMaxRetryBackoff(backoff time.Duration) Option { return func(db *DB) { - db.cfg.MaxRetryBackoff = backoff + db.conf.MaxRetryBackoff = backoff } } @@ -211,8 +211,8 @@ func WithMaxRetryBackoff(backoff time.Duration) Option { // Default is 2 connections per every CPU as reported by runtime.NumCPU. func WithPoolSize(poolSize int) Option { return func(db *DB) { - db.cfg.PoolSize = poolSize - db.cfg.MaxIdleConns = poolSize + db.conf.PoolSize = poolSize + db.conf.MaxIdleConns = poolSize } } @@ -222,7 +222,7 @@ func WithPoolSize(poolSize int) Option { // If d <= 0, connections are not closed due to a connection's age. func WithConnMaxLifetime(d time.Duration) Option { return func(db *DB) { - db.cfg.ConnMaxLifetime = d + db.conf.ConnMaxLifetime = d } } @@ -234,7 +234,7 @@ func WithConnMaxLifetime(d time.Duration) Option { // ClickHouse closes idle connections after 1 hour (see idle_connection_timeout). func WithConnMaxIdleTime(d time.Duration) Option { return func(db *DB) { - db.cfg.ConnMaxIdleTime = d + db.conf.ConnMaxIdleTime = d } } @@ -244,7 +244,7 @@ func WithConnMaxIdleTime(d time.Duration) Option { // ReadTimeout + 1 second. func WithPoolTimeout(timeout time.Duration) Option { return func(db *DB) { - db.cfg.PoolTimeout = timeout + db.conf.PoolTimeout = timeout } } diff --git a/ch/db.go b/ch/db.go index 2a38a99..97d4838 100644 --- a/ch/db.go +++ b/ch/db.go @@ -24,7 +24,7 @@ type DBStats struct { } type DB struct { - cfg *Config + conf *Config pool *chpool.ConnPool queryHooks []QueryHook @@ -42,31 +42,31 @@ func Connect(opts ...Option) *DB { return db } -func newDB(cfg *Config, opts ...Option) *DB { +func newDB(conf *Config, opts ...Option) *DB { db := &DB{ - cfg: cfg, + conf: conf, } for _, opt := range opts { opt(db) } - db.pool = newConnPool(db.cfg) + db.pool = newConnPool(db.conf) return db } -func newConnPool(cfg *Config) *chpool.ConnPool { - poolcfg := cfg.Config - poolcfg.Dialer = func(ctx context.Context) (net.Conn, error) { - if cfg.TLSConfig != nil { +func newConnPool(conf *Config) *chpool.ConnPool { + poolconf := conf.Config + poolconf.Dialer = func(ctx context.Context) (net.Conn, error) { + if conf.TLSConfig != nil { return tls.DialWithDialer( - cfg.netDialer(), + conf.netDialer(), "tcp", - cfg.Addr, - cfg.TLSConfig, + conf.Addr, + conf.TLSConfig, ) } - return cfg.netDialer().DialContext(ctx, "tcp", cfg.Addr) + return conf.netDialer().DialContext(ctx, "tcp", conf.Addr) } - return chpool.New(&poolcfg) + return chpool.New(&poolconf) } // Close closes the database client, releasing any open resources. @@ -78,20 +78,20 @@ func (db *DB) Close() error { } func (db *DB) String() string { - return fmt.Sprintf("DB", db.cfg.Addr) + return fmt.Sprintf("DB", db.conf.Addr) } func (db *DB) Config() *Config { - return db.cfg + return db.conf } func (db *DB) WithTimeout(d time.Duration) *DB { - newcfg := *db.cfg - newcfg.ReadTimeout = d - newcfg.WriteTimeout = d + newconf := *db.conf + newconf.ReadTimeout = d + newconf.WriteTimeout = d clone := db.clone() - clone.cfg = &newcfg + clone.conf = &newconf return clone } @@ -127,14 +127,14 @@ func (db *DB) autoCreateDatabase() { return } - cfg := db.cfg.clone() - cfg.Database = "" + conf := db.conf.clone() + conf.Database = "" - tmp := newDB(cfg) + tmp := newDB(conf) defer tmp.Close() - if _, err := tmp.Exec("CREATE DATABASE IF NOT EXISTS ?", Ident(db.cfg.Database)); err != nil { - internal.Logger.Printf("create database %q failed: %s", db.cfg.Database, err) + if _, err := tmp.Exec("CREATE DATABASE IF NOT EXISTS ?", Ident(db.conf.Database)); err != nil { + internal.Logger.Printf("create database %q failed: %s", db.conf.Database, err) } } @@ -226,7 +226,7 @@ func (db *DB) _withConn(ctx context.Context, fn func(*chpool.Conn) error) error } func (db *DB) cancelConn(ctx context.Context, cn *chpool.Conn) { - if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { + if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) { writeCancel(wr) }); err != nil { internal.Logger.Printf("writeCancel failed: %s", err) @@ -237,12 +237,12 @@ func (db *DB) cancelConn(ctx context.Context, cn *chpool.Conn) { func (db *DB) Ping(ctx context.Context) error { return db.withConn(ctx, func(cn *chpool.Conn) error { - if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { + if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) { writePing(wr) }); err != nil { return err } - return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { + return cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error { return readPong(rd) }) }) @@ -265,7 +265,7 @@ func (db *DB) ExecContext( func (db *DB) exec(ctx context.Context, query string) (*result, error) { var res *result var lastErr error - for attempt := 0; attempt <= db.cfg.MaxRetries; attempt++ { + for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ { if attempt > 0 { lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) if lastErr != nil { @@ -284,13 +284,13 @@ func (db *DB) exec(ctx context.Context, query string) (*result, error) { func (db *DB) _exec(ctx context.Context, query string) (*result, error) { var res *result err := db.withConn(ctx, func(cn *chpool.Conn) error { - if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { + if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) { db.writeQuery(ctx, cn, wr, query) db.writeBlock(ctx, wr, nil) }); err != nil { return err } - return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { + return cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error { var err error res, err = db.readDataBlocks(cn, rd) return err @@ -331,7 +331,7 @@ func (db *DB) query(ctx context.Context, query string) (*blockIter, error) { var blocks *blockIter var lastErr error - for attempt := 0; attempt <= db.cfg.MaxRetries; attempt++ { + for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ { if attempt > 0 { lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) if lastErr != nil { @@ -354,7 +354,7 @@ func (db *DB) _query(ctx context.Context, query string) (*blockIter, error) { return nil, err } - if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { + if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) { db.writeQuery(ctx, cn, wr, query) db.writeBlock(ctx, wr, nil) }); err != nil { @@ -372,7 +372,7 @@ func (db *DB) insert( var res *result var lastErr error - for attempt := 0; attempt <= db.cfg.MaxRetries; attempt++ { + for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ { if attempt > 0 { lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) if lastErr != nil { @@ -394,28 +394,28 @@ func (db *DB) _insert( ) (*result, error) { var res *result err := db.withConn(ctx, func(cn *chpool.Conn) error { - if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { + if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) { db.writeQuery(ctx, cn, wr, query) db.writeBlock(ctx, wr, nil) }); err != nil { return err } - if err := cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { + if err := cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error { _, err := db.readSampleBlock(rd) return err }); err != nil { return err } - if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { + if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) { db.writeBlock(ctx, wr, block) db.writeBlock(ctx, wr, nil) }); err != nil { return err } - return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { + return cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error { var err error res, err = readPacket(cn, rd) if err != nil { @@ -501,7 +501,7 @@ func (db *DB) shouldRetry(err error) bool { func (db *DB) retryBackoff(attempt int) time.Duration { return internal.RetryBackoff( - attempt, db.cfg.MinRetryBackoff, db.cfg.MaxRetryBackoff) + attempt, db.conf.MinRetryBackoff, db.conf.MaxRetryBackoff) } func (db *DB) FormatQuery(query string, args ...any) string { diff --git a/ch/proto.go b/ch/proto.go index 774f9cc..6909baa 100644 --- a/ch/proto.go +++ b/ch/proto.go @@ -75,7 +75,7 @@ func (it *blockIter) Next(ctx context.Context, block *chschema.Block) bool { } func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, error) { - rd := it.cn.Reader(ctx, it.db.cfg.ReadTimeout) + rd := it.cn.Reader(ctx, it.db.conf.ReadTimeout) for { packet, err := rd.Uvarint() if err != nil { @@ -116,19 +116,19 @@ func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, err } func (db *DB) hello(ctx context.Context, cn *chpool.Conn) error { - err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { + err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) { wr.WriteByte(chproto.ClientHello) writeClientInfo(wr) - wr.String(db.cfg.Database) - wr.String(db.cfg.User) - wr.String(db.cfg.Password) + wr.String(db.conf.Database) + wr.String(db.conf.User) + wr.String(db.conf.Password) }) if err != nil { return err } - return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { + return cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error { packet, err := rd.Uvarint() if err != nil { return err @@ -303,7 +303,7 @@ func (db *DB) writeQuery(ctx context.Context, cn *chpool.Conn, wr *chproto.Write wr.String("") } wr.Uvarint(2) // state complete - wr.Bool(db.cfg.Compression) + wr.Bool(db.conf.Compression) wr.String(query) } @@ -315,7 +315,7 @@ func reverseBytes(b []byte) []byte { } func (db *DB) writeSettings(cn *chpool.Conn, wr *chproto.Writer) { - for key, value := range db.cfg.QuerySettings { + for key, value := range db.conf.QuerySettings { wr.String(key) if cn.ServerInfo.Revision > chproto.DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS { @@ -353,7 +353,7 @@ func (db *DB) writeBlock(ctx context.Context, wr *chproto.Writer, block *chschem wr.WriteByte(chproto.ClientData) wr.String("") - wr.WithCompression(db.cfg.Compression, func() error { + wr.WithCompression(db.conf.Compression, func() error { writeBlockInfo(wr) return block.WriteTo(wr) }) @@ -478,7 +478,7 @@ func (db *DB) readBlock(rd *chproto.Reader, block *chschema.Block, compressible return err } - return rd.WithCompression(compressible && db.cfg.Compression, func() error { + return rd.WithCompression(compressible && db.conf.Compression, func() error { if err := readBlockInfo(rd); err != nil { return err }