mirror of
https://github.com/uptrace/go-clickhouse.git
synced 2025-06-12 23:37:29 +02:00
chore: automatically close block iterator
This commit is contained in:
parent
94da9c644c
commit
c580ef9f5a
@ -5,6 +5,9 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"reflect"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -305,12 +308,13 @@ func TestORM(t *testing.T) {
|
|||||||
testORMSlice,
|
testORMSlice,
|
||||||
testORMColumnarStruct,
|
testORMColumnarStruct,
|
||||||
testORMInvalidEnumValue,
|
testORMInvalidEnumValue,
|
||||||
|
testORMInsertSelect,
|
||||||
}
|
}
|
||||||
for _, fn := range tests {
|
for _, fn := range tests {
|
||||||
_, err := db.NewTruncateTable().Model((*Event)(nil)).Exec(ctx)
|
_, err := db.NewTruncateTable().Model((*Event)(nil)).Exec(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
t.Run("", func(t *testing.T) {
|
t.Run(funcName(fn), func(t *testing.T) {
|
||||||
fn(t, db)
|
fn(t, db)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -493,6 +497,37 @@ func testORMInvalidEnumValue(t *testing.T, db *ch.DB) {
|
|||||||
require.Equal(t, "invalid", dest.Kind)
|
require.Equal(t, "invalid", dest.Kind)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testORMInsertSelect(t *testing.T, db *ch.DB) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
src := &Event{
|
||||||
|
ID: 1,
|
||||||
|
Name: "hello",
|
||||||
|
Count: 42,
|
||||||
|
Keys: []string{"foo", "bar"},
|
||||||
|
Values: [][]string{{}, {"hello", "world"}},
|
||||||
|
Kind: "hello",
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
}
|
||||||
|
_, err := db.NewInsert().Model(src).Exec(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var dest []Event
|
||||||
|
err := db.NewSelect().Model(&dest).Scan(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 100, len(dest))
|
||||||
|
}
|
||||||
|
|
||||||
|
func funcName(x interface{}) string {
|
||||||
|
s := runtime.FuncForPC(reflect.ValueOf(x).Pointer()).Name()
|
||||||
|
if i := strings.LastIndexByte(s, '.'); i >= 0 {
|
||||||
|
return s[i+1:]
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
func strptr(s string) *string {
|
func strptr(s string) *string {
|
||||||
return &s
|
return &s
|
||||||
}
|
}
|
||||||
|
18
ch/proto.go
18
ch/proto.go
@ -36,27 +36,37 @@ func newBlockIter(db *DB, cn *chpool.Conn) *blockIter {
|
|||||||
|
|
||||||
func (it *blockIter) Close() error {
|
func (it *blockIter) Close() error {
|
||||||
if it.cn != nil {
|
if it.cn != nil {
|
||||||
it.db.releaseConn(it.cn, it.stickyErr)
|
it.close()
|
||||||
it.cn = nil
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (it *blockIter) close() {
|
||||||
|
it.db.releaseConn(it.cn, it.stickyErr)
|
||||||
|
it.cn = nil
|
||||||
|
}
|
||||||
|
|
||||||
func (it *blockIter) Err() error {
|
func (it *blockIter) Err() error {
|
||||||
return it.stickyErr
|
return it.stickyErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *blockIter) Next(ctx context.Context, block *chschema.Block) bool {
|
func (it *blockIter) Next(ctx context.Context, block *chschema.Block) bool {
|
||||||
if it.stickyErr != nil {
|
if it.cn == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
ok, err := it.read(ctx, block)
|
ok, err := it.read(ctx, block)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
it.stickyErr = err
|
it.stickyErr = err
|
||||||
|
it.close()
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return ok
|
|
||||||
|
if !ok {
|
||||||
|
it.close()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, error) {
|
func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, error) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user