You've already forked go-clickhouse
mirror of
https://github.com/uptrace/go-clickhouse.git
synced 2025-06-27 00:21:13 +02:00
feat: split ch.In into ch.List, ch.Array, and ch.In
This commit is contained in:
89
ch/ch.go
89
ch/ch.go
@ -70,34 +70,89 @@ func isBadConn(err error, allowTimeout bool) bool {
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type ListValues struct {
|
||||
slice any
|
||||
}
|
||||
|
||||
var _ chschema.QueryAppender = ListValues{}
|
||||
|
||||
func List(slice any) ListValues {
|
||||
return ListValues{
|
||||
slice: slice,
|
||||
}
|
||||
}
|
||||
|
||||
func (in ListValues) AppendQuery(fmter chschema.Formatter, b []byte) (_ []byte, err error) {
|
||||
v := reflect.ValueOf(in.slice)
|
||||
if v.Kind() != reflect.Slice {
|
||||
return nil, fmt.Errorf("ch: In(non-slice %T)", in.slice)
|
||||
}
|
||||
|
||||
b = appendList(fmter, b, v)
|
||||
return b, nil
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type InValues struct {
|
||||
slice reflect.Value
|
||||
err error
|
||||
slice any
|
||||
}
|
||||
|
||||
var _ chschema.QueryAppender = InValues{}
|
||||
|
||||
func In(slice any) InValues {
|
||||
v := reflect.ValueOf(slice)
|
||||
if v.Kind() != reflect.Slice {
|
||||
return InValues{
|
||||
err: fmt.Errorf("ch: In(non-slice %T)", slice),
|
||||
}
|
||||
}
|
||||
return InValues{
|
||||
slice: v,
|
||||
slice: slice,
|
||||
}
|
||||
}
|
||||
|
||||
func (in InValues) AppendQuery(fmter chschema.Formatter, b []byte) (_ []byte, err error) {
|
||||
if in.err != nil {
|
||||
return nil, in.err
|
||||
v := reflect.ValueOf(in.slice)
|
||||
if v.Kind() != reflect.Slice {
|
||||
return nil, fmt.Errorf("ch: In(non-slice %T)", in.slice)
|
||||
}
|
||||
return appendIn(fmter, b, in.slice), nil
|
||||
|
||||
b = append(b, '(')
|
||||
b = appendList(fmter, b, v)
|
||||
b = append(b, ')')
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func appendIn(fmter chschema.Formatter, b []byte, slice reflect.Value) []byte {
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type ArrayValues struct {
|
||||
slice any
|
||||
}
|
||||
|
||||
var _ chschema.QueryAppender = ArrayValues{}
|
||||
|
||||
func Array(slice any) ArrayValues {
|
||||
return ArrayValues{
|
||||
slice: slice,
|
||||
}
|
||||
}
|
||||
|
||||
func (in ArrayValues) AppendQuery(fmter chschema.Formatter, b []byte) (_ []byte, err error) {
|
||||
v := reflect.ValueOf(in.slice)
|
||||
if v.Kind() != reflect.Slice {
|
||||
return nil, fmt.Errorf("ch: Array(non-slice %T)", in.slice)
|
||||
}
|
||||
|
||||
b = append(b, '[')
|
||||
b = appendList(fmter, b, v)
|
||||
b = append(b, ']')
|
||||
return b, nil
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
func appendList(fmter chschema.Formatter, b []byte, slice reflect.Value) []byte {
|
||||
sliceLen := slice.Len()
|
||||
|
||||
if sliceLen == 0 {
|
||||
return append(b, "NULL"...)
|
||||
}
|
||||
|
||||
for i := 0; i < sliceLen; i++ {
|
||||
if i > 0 {
|
||||
b = append(b, ", "...)
|
||||
@ -108,13 +163,7 @@ func appendIn(fmter chschema.Formatter, b []byte, slice reflect.Value) []byte {
|
||||
elem = elem.Elem()
|
||||
}
|
||||
|
||||
if elem.Kind() == reflect.Slice {
|
||||
b = append(b, '(')
|
||||
b = appendIn(fmter, b, elem)
|
||||
b = append(b, ')')
|
||||
} else {
|
||||
b = chschema.AppendValue(fmter, b, elem)
|
||||
}
|
||||
b = chschema.AppendValue(fmter, b, elem)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
@ -76,9 +76,9 @@ func defaultConfig() *Config {
|
||||
ReadTimeout: 30 * time.Second,
|
||||
WriteTimeout: 10 * time.Second,
|
||||
|
||||
MaxRetries: 2,
|
||||
MinRetryBackoff: 500 * time.Millisecond,
|
||||
MaxRetryBackoff: time.Second,
|
||||
MaxRetries: 3,
|
||||
MinRetryBackoff: time.Millisecond,
|
||||
MaxRetryBackoff: 3 * time.Second,
|
||||
}
|
||||
return conf
|
||||
}
|
||||
|
32
ch/db.go
32
ch/db.go
@ -272,9 +272,8 @@ func (db *DB) exec(ctx context.Context, query string) (*result, error) {
|
||||
var lastErr error
|
||||
for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ {
|
||||
if attempt > 0 {
|
||||
lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1))
|
||||
if lastErr != nil {
|
||||
break
|
||||
if err := internal.Sleep(ctx, db.retryBackoff()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@ -338,9 +337,8 @@ func (db *DB) query(ctx context.Context, query string) (*blockIter, error) {
|
||||
|
||||
for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ {
|
||||
if attempt > 0 {
|
||||
lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1))
|
||||
if lastErr != nil {
|
||||
break
|
||||
if err := internal.Sleep(ctx, db.retryBackoff()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@ -379,9 +377,8 @@ func (db *DB) insert(
|
||||
|
||||
for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ {
|
||||
if attempt > 0 {
|
||||
lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1))
|
||||
if lastErr != nil {
|
||||
break
|
||||
if err := internal.Sleep(ctx, db.retryBackoff()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@ -489,22 +486,28 @@ func (db *DB) WithFormatter(fmter chschema.Formatter) *DB {
|
||||
|
||||
func (db *DB) shouldRetry(err error) bool {
|
||||
switch err {
|
||||
case driver.ErrBadConn:
|
||||
return true
|
||||
case nil, context.Canceled, context.DeadlineExceeded:
|
||||
return false
|
||||
case driver.ErrBadConn:
|
||||
return true
|
||||
}
|
||||
|
||||
if err, ok := err.(*Error); ok {
|
||||
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp
|
||||
const (
|
||||
timeoutExceeded = 159
|
||||
tooSlow = 160
|
||||
tooManySimultaneousQueries = 202
|
||||
memoryLimitExceeded = 241
|
||||
cannotDecompress = 271
|
||||
)
|
||||
|
||||
switch err.Code {
|
||||
case timeoutExceeded, tooManySimultaneousQueries, memoryLimitExceeded:
|
||||
case timeoutExceeded,
|
||||
tooSlow,
|
||||
tooManySimultaneousQueries,
|
||||
memoryLimitExceeded,
|
||||
cannotDecompress:
|
||||
return true
|
||||
}
|
||||
}
|
||||
@ -512,9 +515,8 @@ func (db *DB) shouldRetry(err error) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (db *DB) retryBackoff(attempt int) time.Duration {
|
||||
return internal.RetryBackoff(
|
||||
attempt, db.conf.MinRetryBackoff, db.conf.MaxRetryBackoff)
|
||||
func (db *DB) retryBackoff() time.Duration {
|
||||
return internal.RetryBackoff(db.conf.MinRetryBackoff, db.conf.MaxRetryBackoff)
|
||||
}
|
||||
|
||||
func (db *DB) FormatQuery(query string, args ...any) string {
|
||||
|
@ -79,24 +79,10 @@ func MakeSliceNextElemFunc(v reflect.Value) func() reflect.Value {
|
||||
}
|
||||
}
|
||||
|
||||
func RetryBackoff(retry int, minBackoff, maxBackoff time.Duration) time.Duration {
|
||||
if retry < 0 {
|
||||
panic("not reached")
|
||||
}
|
||||
if minBackoff == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
d := minBackoff << uint(retry)
|
||||
if d < minBackoff {
|
||||
func RetryBackoff(minBackoff, maxBackoff time.Duration) time.Duration {
|
||||
backoff := minBackoff + time.Duration(rand.Int63n(int64(maxBackoff)))
|
||||
if backoff > maxBackoff {
|
||||
return maxBackoff
|
||||
}
|
||||
|
||||
d = minBackoff + time.Duration(rand.Int63n(int64(d)))
|
||||
|
||||
if d > maxBackoff || d < minBackoff {
|
||||
d = maxBackoff
|
||||
}
|
||||
|
||||
return d
|
||||
return backoff
|
||||
}
|
||||
|
Reference in New Issue
Block a user