diff --git a/ch/ch.go b/ch/ch.go index 55587dc..dd2b37a 100644 --- a/ch/ch.go +++ b/ch/ch.go @@ -70,34 +70,89 @@ func isBadConn(err error, allowTimeout bool) bool { //------------------------------------------------------------------------------ +type ListValues struct { + slice any +} + +var _ chschema.QueryAppender = ListValues{} + +func List(slice any) ListValues { + return ListValues{ + slice: slice, + } +} + +func (in ListValues) AppendQuery(fmter chschema.Formatter, b []byte) (_ []byte, err error) { + v := reflect.ValueOf(in.slice) + if v.Kind() != reflect.Slice { + return nil, fmt.Errorf("ch: In(non-slice %T)", in.slice) + } + + b = appendList(fmter, b, v) + return b, nil +} + +//------------------------------------------------------------------------------ + type InValues struct { - slice reflect.Value - err error + slice any } var _ chschema.QueryAppender = InValues{} func In(slice any) InValues { - v := reflect.ValueOf(slice) - if v.Kind() != reflect.Slice { - return InValues{ - err: fmt.Errorf("ch: In(non-slice %T)", slice), - } - } return InValues{ - slice: v, + slice: slice, } } func (in InValues) AppendQuery(fmter chschema.Formatter, b []byte) (_ []byte, err error) { - if in.err != nil { - return nil, in.err + v := reflect.ValueOf(in.slice) + if v.Kind() != reflect.Slice { + return nil, fmt.Errorf("ch: In(non-slice %T)", in.slice) } - return appendIn(fmter, b, in.slice), nil + + b = append(b, '(') + b = appendList(fmter, b, v) + b = append(b, ')') + return b, nil } -func appendIn(fmter chschema.Formatter, b []byte, slice reflect.Value) []byte { +//------------------------------------------------------------------------------ + +type ArrayValues struct { + slice any +} + +var _ chschema.QueryAppender = ArrayValues{} + +func Array(slice any) ArrayValues { + return ArrayValues{ + slice: slice, + } +} + +func (in ArrayValues) AppendQuery(fmter chschema.Formatter, b []byte) (_ []byte, err error) { + v := reflect.ValueOf(in.slice) + if v.Kind() != reflect.Slice { + return nil, fmt.Errorf("ch: Array(non-slice %T)", in.slice) + } + + b = append(b, '[') + b = appendList(fmter, b, v) + b = append(b, ']') + return b, nil +} + +//------------------------------------------------------------------------------ + +func appendList(fmter chschema.Formatter, b []byte, slice reflect.Value) []byte { sliceLen := slice.Len() + + if sliceLen == 0 { + return append(b, "NULL"...) + } + for i := 0; i < sliceLen; i++ { if i > 0 { b = append(b, ", "...) @@ -108,13 +163,7 @@ func appendIn(fmter chschema.Formatter, b []byte, slice reflect.Value) []byte { elem = elem.Elem() } - if elem.Kind() == reflect.Slice { - b = append(b, '(') - b = appendIn(fmter, b, elem) - b = append(b, ')') - } else { - b = chschema.AppendValue(fmter, b, elem) - } + b = chschema.AppendValue(fmter, b, elem) } return b } diff --git a/ch/config.go b/ch/config.go index 76acc20..236d802 100644 --- a/ch/config.go +++ b/ch/config.go @@ -76,9 +76,9 @@ func defaultConfig() *Config { ReadTimeout: 30 * time.Second, WriteTimeout: 10 * time.Second, - MaxRetries: 2, - MinRetryBackoff: 500 * time.Millisecond, - MaxRetryBackoff: time.Second, + MaxRetries: 3, + MinRetryBackoff: time.Millisecond, + MaxRetryBackoff: 3 * time.Second, } return conf } diff --git a/ch/db.go b/ch/db.go index 4d7a594..fe84290 100644 --- a/ch/db.go +++ b/ch/db.go @@ -272,9 +272,8 @@ func (db *DB) exec(ctx context.Context, query string) (*result, error) { var lastErr error for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ { if attempt > 0 { - lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) - if lastErr != nil { - break + if err := internal.Sleep(ctx, db.retryBackoff()); err != nil { + return nil, err } } @@ -338,9 +337,8 @@ func (db *DB) query(ctx context.Context, query string) (*blockIter, error) { for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ { if attempt > 0 { - lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) - if lastErr != nil { - break + if err := internal.Sleep(ctx, db.retryBackoff()); err != nil { + return nil, err } } @@ -379,9 +377,8 @@ func (db *DB) insert( for attempt := 0; attempt <= db.conf.MaxRetries; attempt++ { if attempt > 0 { - lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) - if lastErr != nil { - break + if err := internal.Sleep(ctx, db.retryBackoff()); err != nil { + return nil, err } } @@ -489,22 +486,28 @@ func (db *DB) WithFormatter(fmter chschema.Formatter) *DB { func (db *DB) shouldRetry(err error) bool { switch err { - case driver.ErrBadConn: - return true case nil, context.Canceled, context.DeadlineExceeded: return false + case driver.ErrBadConn: + return true } if err, ok := err.(*Error); ok { // https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp const ( timeoutExceeded = 159 + tooSlow = 160 tooManySimultaneousQueries = 202 memoryLimitExceeded = 241 + cannotDecompress = 271 ) switch err.Code { - case timeoutExceeded, tooManySimultaneousQueries, memoryLimitExceeded: + case timeoutExceeded, + tooSlow, + tooManySimultaneousQueries, + memoryLimitExceeded, + cannotDecompress: return true } } @@ -512,9 +515,8 @@ func (db *DB) shouldRetry(err error) bool { return false } -func (db *DB) retryBackoff(attempt int) time.Duration { - return internal.RetryBackoff( - attempt, db.conf.MinRetryBackoff, db.conf.MaxRetryBackoff) +func (db *DB) retryBackoff() time.Duration { + return internal.RetryBackoff(db.conf.MinRetryBackoff, db.conf.MaxRetryBackoff) } func (db *DB) FormatQuery(query string, args ...any) string { diff --git a/ch/internal/util.go b/ch/internal/util.go index cf67bed..8d5d0c4 100644 --- a/ch/internal/util.go +++ b/ch/internal/util.go @@ -79,24 +79,10 @@ func MakeSliceNextElemFunc(v reflect.Value) func() reflect.Value { } } -func RetryBackoff(retry int, minBackoff, maxBackoff time.Duration) time.Duration { - if retry < 0 { - panic("not reached") - } - if minBackoff == 0 { - return 0 - } - - d := minBackoff << uint(retry) - if d < minBackoff { +func RetryBackoff(minBackoff, maxBackoff time.Duration) time.Duration { + backoff := minBackoff + time.Duration(rand.Int63n(int64(maxBackoff))) + if backoff > maxBackoff { return maxBackoff } - - d = minBackoff + time.Duration(rand.Int63n(int64(d))) - - if d > maxBackoff || d < minBackoff { - d = maxBackoff - } - - return d + return backoff }