You've already forked go-clickhouse
mirror of
https://github.com/uptrace/go-clickhouse.git
synced 2025-08-06 22:12:48 +02:00
chore: cleanup
This commit is contained in:
44
ch/db.go
44
ch/db.go
@ -194,50 +194,14 @@ func (db *DB) _withConn(ctx context.Context, fn func(*chpool.Conn) error) error
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var done chan struct{}
|
var fnErr error
|
||||||
|
|
||||||
if ctxDone := ctx.Done(); ctxDone != nil {
|
|
||||||
done = make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// fn has finished, skip cancel
|
|
||||||
case <-ctxDone:
|
|
||||||
db.cancelConn(ctx, cn)
|
|
||||||
// Signal end of conn use.
|
|
||||||
done <- struct{}{}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if done != nil {
|
db.releaseConn(cn, fnErr)
|
||||||
select {
|
|
||||||
case <-done: // wait for cancel to finish request
|
|
||||||
case done <- struct{}{}: // signal fn finish, skip cancel goroutine
|
|
||||||
}
|
|
||||||
}
|
|
||||||
db.releaseConn(cn, err)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// err is used in releaseConn above
|
fnErr = fn(cn)
|
||||||
err = fn(cn)
|
return fnErr
|
||||||
|
|
||||||
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
|
||||||
db.cancelConn(ctx, cn)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) cancelConn(ctx context.Context, cn *chpool.Conn) {
|
|
||||||
if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) {
|
|
||||||
writeCancel(wr)
|
|
||||||
}); err != nil {
|
|
||||||
internal.Logger.Printf("writeCancel failed: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = cn.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Ping(ctx context.Context) error {
|
func (db *DB) Ping(ctx context.Context) error {
|
||||||
|
@ -85,13 +85,6 @@ func TestCHTimeout(t *testing.T) {
|
|||||||
ctx, "SELECT sleepEachRow(0.01) from numbers(10000) settings max_block_size=10")
|
ctx, "SELECT sleepEachRow(0.01) from numbers(10000) settings max_block_size=10")
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Contains(t, err.Error(), "i/o timeout")
|
require.Contains(t, err.Error(), "i/o timeout")
|
||||||
|
|
||||||
require.Eventually(t, func() bool {
|
|
||||||
var num int
|
|
||||||
err := db.NewRaw("SELECT count() from system.processes").Scan(ctx, &num)
|
|
||||||
require.NoError(t, err)
|
|
||||||
return num == 1
|
|
||||||
}, time.Second, 100*time.Millisecond)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDSNSetting(t *testing.T) {
|
func TestDSNSetting(t *testing.T) {
|
||||||
|
@ -544,10 +544,6 @@ func readBlockInfo(rd *chproto.Reader) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeCancel(wr *chproto.Writer) {
|
|
||||||
wr.WriteByte(chproto.ClientCancel)
|
|
||||||
}
|
|
||||||
|
|
||||||
func readServerTableColumns(rd *chproto.Reader) error {
|
func readServerTableColumns(rd *chproto.Reader) error {
|
||||||
_, err := rd.String()
|
_, err := rd.String()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Reference in New Issue
Block a user