2022-01-23 09:36:24 +02:00
|
|
|
package ch
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"crypto/tls"
|
|
|
|
"database/sql"
|
|
|
|
"database/sql/driver"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"net"
|
|
|
|
"reflect"
|
|
|
|
"sync/atomic"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/uptrace/go-clickhouse/ch/chpool"
|
|
|
|
"github.com/uptrace/go-clickhouse/ch/chproto"
|
|
|
|
"github.com/uptrace/go-clickhouse/ch/chschema"
|
|
|
|
"github.com/uptrace/go-clickhouse/ch/internal"
|
|
|
|
)
|
|
|
|
|
|
|
|
// DBStats is a snapshot of the counters a DB keeps about its own activity.
type DBStats struct {
	// Queries counts calls that went through DB.withConn, successful or not.
	Queries uint64
	// Errors counts the subset of those calls that returned a non-nil error.
	Errors uint64
}
|
|
|
|
|
|
|
|
type DB struct {
|
2022-10-26 13:38:40 +03:00
|
|
|
conf *Config
|
2022-01-23 09:36:24 +02:00
|
|
|
pool *chpool.ConnPool
|
|
|
|
|
|
|
|
queryHooks []QueryHook
|
|
|
|
|
|
|
|
fmter chschema.Formatter
|
|
|
|
flags internal.Flag
|
|
|
|
stats DBStats
|
|
|
|
}
|
|
|
|
|
|
|
|
func Connect(opts ...Option) *DB {
|
2022-07-28 11:20:11 +03:00
|
|
|
db := newDB(defaultConfig(), opts...)
|
|
|
|
if db.flags.Has(autoCreateDatabaseFlag) {
|
|
|
|
db.autoCreateDatabase()
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
2022-07-28 11:20:11 +03:00
|
|
|
return db
|
|
|
|
}
|
2022-01-23 09:36:24 +02:00
|
|
|
|
2022-10-26 13:38:40 +03:00
|
|
|
func newDB(conf *Config, opts ...Option) *DB {
|
2022-07-28 11:20:11 +03:00
|
|
|
db := &DB{
|
2022-10-26 13:38:40 +03:00
|
|
|
conf: conf,
|
2022-07-28 11:20:11 +03:00
|
|
|
}
|
2022-01-23 09:36:24 +02:00
|
|
|
for _, opt := range opts {
|
|
|
|
opt(db)
|
|
|
|
}
|
2022-10-26 13:38:40 +03:00
|
|
|
db.pool = newConnPool(db.conf)
|
2022-01-23 09:36:24 +02:00
|
|
|
return db
|
|
|
|
}
|
|
|
|
|
2022-10-26 13:38:40 +03:00
|
|
|
func newConnPool(conf *Config) *chpool.ConnPool {
|
|
|
|
poolconf := conf.Config
|
|
|
|
poolconf.Dialer = func(ctx context.Context) (net.Conn, error) {
|
|
|
|
if conf.TLSConfig != nil {
|
2022-01-23 09:36:24 +02:00
|
|
|
return tls.DialWithDialer(
|
2022-10-26 13:38:40 +03:00
|
|
|
conf.netDialer(),
|
2022-07-28 11:20:11 +03:00
|
|
|
"tcp",
|
2022-10-26 13:38:40 +03:00
|
|
|
conf.Addr,
|
|
|
|
conf.TLSConfig,
|
2022-01-23 09:36:24 +02:00
|
|
|
)
|
|
|
|
}
|
2022-10-26 13:38:40 +03:00
|
|
|
return conf.netDialer().DialContext(ctx, "tcp", conf.Addr)
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
2022-10-26 13:38:40 +03:00
|
|
|
return chpool.New(&poolconf)
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes the database client, releasing any open resources.
|
|
|
|
//
|
|
|
|
// It is rare to Close a DB, as the DB handle is meant to be
|
|
|
|
// long-lived and shared between many goroutines.
|
|
|
|
func (db *DB) Close() error {
|
|
|
|
return db.pool.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) String() string {
|
2022-10-26 13:38:40 +03:00
|
|
|
return fmt.Sprintf("DB<addr: %s>", db.conf.Addr)
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) Config() *Config {
|
2022-10-26 13:38:40 +03:00
|
|
|
return db.conf
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) WithTimeout(d time.Duration) *DB {
|
2022-10-26 13:38:40 +03:00
|
|
|
newconf := *db.conf
|
|
|
|
newconf.ReadTimeout = d
|
|
|
|
newconf.WriteTimeout = d
|
2022-01-23 09:36:24 +02:00
|
|
|
|
|
|
|
clone := db.clone()
|
2022-10-26 13:38:40 +03:00
|
|
|
clone.conf = &newconf
|
2022-01-23 09:36:24 +02:00
|
|
|
|
|
|
|
return clone
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) clone() *DB {
|
|
|
|
clone := *db
|
|
|
|
|
|
|
|
l := len(db.queryHooks)
|
|
|
|
clone.queryHooks = db.queryHooks[:l:l]
|
|
|
|
|
|
|
|
return &clone
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) Stats() DBStats {
|
|
|
|
return DBStats{
|
|
|
|
Queries: atomic.LoadUint64(&db.stats.Queries),
|
|
|
|
Errors: atomic.LoadUint64(&db.stats.Errors),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-28 11:20:11 +03:00
|
|
|
func (db *DB) autoCreateDatabase() {
|
|
|
|
ctx := context.Background()
|
|
|
|
|
|
|
|
switch err := db.Ping(ctx); err := err.(type) {
|
|
|
|
case nil: // all is good
|
|
|
|
return
|
|
|
|
case *Error:
|
|
|
|
if err.Code != 81 { // 81 - database does not exist
|
|
|
|
return
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
// ignore the error
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-10-26 13:38:40 +03:00
|
|
|
conf := db.conf.clone()
|
|
|
|
conf.Database = ""
|
2022-07-28 11:20:11 +03:00
|
|
|
|
2022-10-26 13:38:40 +03:00
|
|
|
tmp := newDB(conf)
|
2022-07-28 11:20:11 +03:00
|
|
|
defer tmp.Close()
|
|
|
|
|
2022-10-26 13:38:40 +03:00
|
|
|
if _, err := tmp.Exec("CREATE DATABASE IF NOT EXISTS ?", Ident(db.conf.Database)); err != nil {
|
|
|
|
internal.Logger.Printf("create database %q failed: %s", db.conf.Database, err)
|
2022-07-28 11:20:11 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-01-23 09:36:24 +02:00
|
|
|
func (db *DB) getConn(ctx context.Context) (*chpool.Conn, error) {
|
|
|
|
cn, err := db.pool.Get(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := db.initConn(ctx, cn); err != nil {
|
|
|
|
db.pool.Remove(cn, err)
|
|
|
|
if err := internal.Unwrap(err); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return cn, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) initConn(ctx context.Context, cn *chpool.Conn) error {
|
|
|
|
if cn.Inited {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
cn.Inited = true
|
|
|
|
|
|
|
|
return db.hello(ctx, cn)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) releaseConn(cn *chpool.Conn, err error) {
|
|
|
|
if isBadConn(err, false) || cn.Closed() {
|
|
|
|
db.pool.Remove(cn, err)
|
|
|
|
} else {
|
|
|
|
db.pool.Put(cn)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) withConn(ctx context.Context, fn func(*chpool.Conn) error) error {
|
|
|
|
err := db._withConn(ctx, fn)
|
|
|
|
|
|
|
|
atomic.AddUint64(&db.stats.Queries, 1)
|
|
|
|
if err != nil {
|
|
|
|
atomic.AddUint64(&db.stats.Errors, 1)
|
|
|
|
}
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) _withConn(ctx context.Context, fn func(*chpool.Conn) error) error {
|
|
|
|
cn, err := db.getConn(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
var done chan struct{}
|
|
|
|
|
|
|
|
if ctxDone := ctx.Done(); ctxDone != nil {
|
|
|
|
done = make(chan struct{})
|
|
|
|
go func() {
|
|
|
|
select {
|
|
|
|
case <-done:
|
|
|
|
// fn has finished, skip cancel
|
|
|
|
case <-ctxDone:
|
|
|
|
db.cancelConn(ctx, cn)
|
|
|
|
// Signal end of conn use.
|
|
|
|
done <- struct{}{}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
if done != nil {
|
|
|
|
select {
|
|
|
|
case <-done: // wait for cancel to finish request
|
|
|
|
case done <- struct{}{}: // signal fn finish, skip cancel goroutine
|
|
|
|
}
|
|
|
|
}
|
|
|
|
db.releaseConn(cn, err)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// err is used in releaseConn above
|
|
|
|
err = fn(cn)
|
|
|
|
|
|
|
|
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
|
|
|
db.cancelConn(ctx, cn)
|
|
|
|
}
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) cancelConn(ctx context.Context, cn *chpool.Conn) {
|
2022-10-26 13:38:40 +03:00
|
|
|
if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
|
2022-01-23 09:36:24 +02:00
|
|
|
writeCancel(wr)
|
|
|
|
}); err != nil {
|
|
|
|
internal.Logger.Printf("writeCancel failed: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
_ = cn.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) Ping(ctx context.Context) error {
|
|
|
|
return db.withConn(ctx, func(cn *chpool.Conn) error {
|
2022-10-26 13:38:40 +03:00
|
|
|
if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
|
2022-01-23 09:36:24 +02:00
|
|
|
writePing(wr)
|
|
|
|
}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-10-26 13:38:40 +03:00
|
|
|
return cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error {
|
2022-01-23 09:36:24 +02:00
|
|
|
return readPong(rd)
|
|
|
|
})
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) Exec(query string, args ...any) (sql.Result, error) {
|
|
|
|
return db.ExecContext(context.Background(), query, args...)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) ExecContext(
|
|
|
|
ctx context.Context, query string, args ...any,
|
|
|
|
) (sql.Result, error) {
|
|
|
|
query = db.FormatQuery(query, args...)
|
|
|
|
ctx, evt := db.beforeQuery(ctx, nil, query, args, nil)
|
2022-04-30 10:30:34 +03:00
|
|
|
res, err := db.exec(ctx, query)
|
2022-01-23 09:36:24 +02:00
|
|
|
db.afterQuery(ctx, evt, res, err)
|
|
|
|
return res, err
|
|
|
|
}
|
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
func (db *DB) exec(ctx context.Context, query string) (*result, error) {
|
|
|
|
var res *result
|
|
|
|
var lastErr error
|
2022-10-26 13:38:40 +03:00
|
|
|
for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ {
|
2022-04-30 10:30:34 +03:00
|
|
|
if attempt > 0 {
|
|
|
|
lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1))
|
|
|
|
if lastErr != nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
res, lastErr = db._exec(ctx, query)
|
|
|
|
if !db.shouldRetry(lastErr) {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return res, lastErr
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) _exec(ctx context.Context, query string) (*result, error) {
|
|
|
|
var res *result
|
|
|
|
err := db.withConn(ctx, func(cn *chpool.Conn) error {
|
2022-10-26 13:38:40 +03:00
|
|
|
if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
|
2022-05-18 16:23:57 +03:00
|
|
|
db.writeQuery(ctx, cn, wr, query)
|
2022-05-02 09:31:14 +03:00
|
|
|
db.writeBlock(ctx, wr, nil)
|
2022-04-30 10:30:34 +03:00
|
|
|
}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-10-26 13:38:40 +03:00
|
|
|
return cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error {
|
2022-04-30 10:30:34 +03:00
|
|
|
var err error
|
2022-05-18 16:23:57 +03:00
|
|
|
res, err = db.readDataBlocks(cn, rd)
|
2022-04-30 10:30:34 +03:00
|
|
|
return err
|
|
|
|
})
|
|
|
|
})
|
|
|
|
return res, err
|
|
|
|
}
|
|
|
|
|
2022-01-23 09:36:24 +02:00
|
|
|
func (db *DB) Query(query string, args ...any) (*Rows, error) {
|
|
|
|
return db.QueryContext(context.Background(), query, args...)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) QueryContext(
|
|
|
|
ctx context.Context, query string, args ...any,
|
|
|
|
) (*Rows, error) {
|
|
|
|
query = db.FormatQuery(query, args...)
|
|
|
|
|
|
|
|
ctx, evt := db.beforeQuery(ctx, nil, query, args, nil)
|
2022-04-30 10:30:34 +03:00
|
|
|
blocks, err := db.query(ctx, query)
|
|
|
|
db.afterQuery(ctx, evt, nil, err)
|
2022-01-23 09:36:24 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2022-04-30 10:30:34 +03:00
|
|
|
|
|
|
|
return newRows(ctx, blocks), nil
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) QueryRow(query string, args ...any) *Row {
|
|
|
|
return db.QueryRowContext(context.Background(), query, args...)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row {
|
|
|
|
rows, err := db.QueryContext(ctx, query, args...)
|
|
|
|
return &Row{rows: rows, err: err}
|
|
|
|
}
|
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
func (db *DB) query(ctx context.Context, query string) (*blockIter, error) {
|
|
|
|
var blocks *blockIter
|
2022-01-23 09:36:24 +02:00
|
|
|
var lastErr error
|
2022-04-30 10:30:34 +03:00
|
|
|
|
2022-10-26 13:38:40 +03:00
|
|
|
for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ {
|
2022-01-23 09:36:24 +02:00
|
|
|
if attempt > 0 {
|
|
|
|
lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1))
|
|
|
|
if lastErr != nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
blocks, lastErr = db._query(ctx, query)
|
2022-01-23 09:36:24 +02:00
|
|
|
if !db.shouldRetry(lastErr) {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
return blocks, lastErr
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) _query(ctx context.Context, query string) (*blockIter, error) {
|
|
|
|
cn, err := db.getConn(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
2022-10-26 13:38:40 +03:00
|
|
|
if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
|
2022-05-18 16:23:57 +03:00
|
|
|
db.writeQuery(ctx, cn, wr, query)
|
2022-05-02 09:31:14 +03:00
|
|
|
db.writeBlock(ctx, wr, nil)
|
2022-04-30 10:30:34 +03:00
|
|
|
}); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2022-01-23 09:36:24 +02:00
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
return newBlockIter(db, cn), nil
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) insert(
|
|
|
|
ctx context.Context, model TableModel, query string, fields []*chschema.Field,
|
|
|
|
) (*result, error) {
|
|
|
|
block := model.Block(fields)
|
|
|
|
|
|
|
|
var res *result
|
|
|
|
var lastErr error
|
|
|
|
|
2022-10-26 13:38:40 +03:00
|
|
|
for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ {
|
2022-01-23 09:36:24 +02:00
|
|
|
if attempt > 0 {
|
|
|
|
lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1))
|
|
|
|
if lastErr != nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
res, lastErr = db._insert(ctx, model, query, block)
|
|
|
|
if !db.shouldRetry(lastErr) {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return res, lastErr
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) _insert(
|
|
|
|
ctx context.Context, model TableModel, query string, block *chschema.Block,
|
|
|
|
) (*result, error) {
|
|
|
|
var res *result
|
|
|
|
err := db.withConn(ctx, func(cn *chpool.Conn) error {
|
2022-10-26 13:38:40 +03:00
|
|
|
if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
|
2022-05-18 16:23:57 +03:00
|
|
|
db.writeQuery(ctx, cn, wr, query)
|
2022-05-02 09:31:14 +03:00
|
|
|
db.writeBlock(ctx, wr, nil)
|
2022-01-23 09:36:24 +02:00
|
|
|
}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-10-26 13:38:40 +03:00
|
|
|
if err := cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error {
|
2022-05-02 09:31:14 +03:00
|
|
|
_, err := db.readSampleBlock(rd)
|
2022-01-23 09:36:24 +02:00
|
|
|
return err
|
|
|
|
}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-10-26 13:38:40 +03:00
|
|
|
if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
|
2022-05-02 09:31:14 +03:00
|
|
|
db.writeBlock(ctx, wr, block)
|
|
|
|
db.writeBlock(ctx, wr, nil)
|
2022-01-23 09:36:24 +02:00
|
|
|
}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-10-26 13:38:40 +03:00
|
|
|
return cn.WithReader(ctx, db.conf.ReadTimeout, func(rd *chproto.Reader) error {
|
2022-01-23 09:36:24 +02:00
|
|
|
var err error
|
2022-05-18 16:23:57 +03:00
|
|
|
res, err = readPacket(cn, rd)
|
2022-01-23 09:36:24 +02:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
res.affected = block.NumRow
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
})
|
|
|
|
return res, err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) NewSelect() *SelectQuery {
|
|
|
|
return NewSelectQuery(db)
|
|
|
|
}
|
|
|
|
|
2022-08-30 09:22:39 +03:00
|
|
|
func (db *DB) NewRaw(query string, args ...any) *RawQuery {
|
|
|
|
return NewRawQuery(db, query, args...)
|
|
|
|
}
|
|
|
|
|
2022-01-23 09:36:24 +02:00
|
|
|
func (db *DB) NewInsert() *InsertQuery {
|
|
|
|
return NewInsertQuery(db)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) NewCreateTable() *CreateTableQuery {
|
|
|
|
return NewCreateTableQuery(db)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) NewDropTable() *DropTableQuery {
|
|
|
|
return NewDropTableQuery(db)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) NewTruncateTable() *TruncateTableQuery {
|
|
|
|
return NewTruncateTableQuery(db)
|
|
|
|
}
|
|
|
|
|
2022-11-03 11:58:19 +02:00
|
|
|
func (db *DB) NewCreateView() *CreateViewQuery {
|
|
|
|
return NewCreateViewQuery(db)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) NewDropView() *DropViewQuery {
|
|
|
|
return NewDropViewQuery(db)
|
|
|
|
}
|
|
|
|
|
2022-01-23 09:36:24 +02:00
|
|
|
func (db *DB) ResetModel(ctx context.Context, models ...any) error {
|
|
|
|
for _, model := range models {
|
|
|
|
if _, err := db.NewDropTable().Model(model).IfExists().Exec(ctx); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := db.NewCreateTable().Model(model).Exec(ctx); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) Formatter() chschema.Formatter {
|
|
|
|
return db.fmter
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) WithFormatter(fmter chschema.Formatter) *DB {
|
|
|
|
clone := db.clone()
|
|
|
|
clone.fmter = fmter
|
|
|
|
return clone
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) shouldRetry(err error) bool {
|
|
|
|
switch err {
|
|
|
|
case driver.ErrBadConn:
|
|
|
|
return true
|
|
|
|
case nil, context.Canceled, context.DeadlineExceeded:
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
if err, ok := err.(*Error); ok {
|
|
|
|
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp
|
|
|
|
const (
|
|
|
|
timeoutExceeded = 159
|
|
|
|
tooManySimultaneousQueries = 202
|
|
|
|
memoryLimitExceeded = 241
|
|
|
|
)
|
|
|
|
|
|
|
|
switch err.Code {
|
|
|
|
case timeoutExceeded, tooManySimultaneousQueries, memoryLimitExceeded:
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) retryBackoff(attempt int) time.Duration {
|
|
|
|
return internal.RetryBackoff(
|
2022-10-26 13:38:40 +03:00
|
|
|
attempt, db.conf.MinRetryBackoff, db.conf.MaxRetryBackoff)
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) FormatQuery(query string, args ...any) string {
|
|
|
|
return db.fmter.FormatQuery(query, args...)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) makeQueryBytes() []byte {
|
|
|
|
// TODO: make this configurable?
|
|
|
|
return make([]byte, 0, 4096)
|
|
|
|
}
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// Rows is the result of a query. Its cursor starts before the first row of the result set.
|
|
|
|
// Use Next to advance from row to row.
|
|
|
|
type Rows struct {
|
2022-04-30 10:30:34 +03:00
|
|
|
ctx context.Context
|
|
|
|
blocks *blockIter
|
|
|
|
block *chschema.Block
|
2022-01-23 09:36:24 +02:00
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
rowIndex int
|
|
|
|
hasNext bool
|
|
|
|
closed bool
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
func newRows(ctx context.Context, blocks *blockIter) *Rows {
|
|
|
|
return &Rows{
|
|
|
|
ctx: ctx,
|
|
|
|
blocks: blocks,
|
|
|
|
block: new(chschema.Block),
|
|
|
|
}
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (rs *Rows) Close() error {
|
2022-04-30 10:30:34 +03:00
|
|
|
if !rs.closed {
|
|
|
|
for rs.blocks.Next(rs.ctx, rs.block) {
|
|
|
|
}
|
|
|
|
rs.close()
|
|
|
|
}
|
2022-01-23 09:36:24 +02:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
func (rs *Rows) close() {
|
|
|
|
rs.closed = true
|
|
|
|
_ = rs.blocks.Close()
|
|
|
|
}
|
|
|
|
|
2022-01-23 09:36:24 +02:00
|
|
|
func (rs *Rows) ColumnTypes() ([]*sql.ColumnType, error) {
|
|
|
|
return nil, errors.New("not implemented")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (rs *Rows) Columns() ([]string, error) {
|
|
|
|
return nil, errors.New("not implemented")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (rs *Rows) Err() error {
|
2022-04-30 10:30:34 +03:00
|
|
|
return rs.blocks.Err()
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (rs *Rows) Next() bool {
|
2022-04-30 10:30:34 +03:00
|
|
|
if rs.closed {
|
|
|
|
return false
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
for rs.rowIndex >= rs.block.NumRow {
|
|
|
|
if !rs.blocks.Next(rs.ctx, rs.block) {
|
|
|
|
rs.close()
|
|
|
|
return false
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
2022-04-30 10:30:34 +03:00
|
|
|
rs.rowIndex = 0
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
rs.hasNext = true
|
|
|
|
rs.rowIndex++
|
|
|
|
return true
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (rs *Rows) NextResultSet() bool {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
func (rs *Rows) Scan(dest ...any) error {
|
2022-04-30 10:30:34 +03:00
|
|
|
if rs.closed {
|
|
|
|
return rs.Err()
|
|
|
|
}
|
|
|
|
|
|
|
|
if !rs.hasNext {
|
2022-01-23 09:36:24 +02:00
|
|
|
return errors.New("ch: Scan called without calling Next")
|
|
|
|
}
|
2022-04-30 10:30:34 +03:00
|
|
|
rs.hasNext = false
|
2022-01-23 09:36:24 +02:00
|
|
|
|
|
|
|
if rs.block.NumColumn != len(dest) {
|
|
|
|
return fmt.Errorf("ch: got %d columns, but Scan has %d values",
|
|
|
|
rs.block.NumColumn, len(dest))
|
|
|
|
}
|
|
|
|
|
|
|
|
for i, col := range rs.block.Columns {
|
|
|
|
if err := col.ConvertAssign(rs.rowIndex-1, reflect.ValueOf(dest[i]).Elem()); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type Row struct {
|
|
|
|
rows *Rows
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *Row) Err() error {
|
|
|
|
return r.err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *Row) Scan(dest ...any) error {
|
|
|
|
if r.err != nil {
|
|
|
|
return r.err
|
|
|
|
}
|
|
|
|
defer r.rows.Close()
|
|
|
|
if r.rows.Next() {
|
|
|
|
return r.rows.Scan(dest...)
|
|
|
|
}
|
|
|
|
return sql.ErrNoRows
|
|
|
|
}
|