From e1af2632f18f57855c4249e6b42de2cea383cd45 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Thu, 29 Jun 2023 14:34:31 +0300 Subject: [PATCH] chore: cleanup --- ch/db.go | 44 ++++---------------------------------------- ch/db_test.go | 7 ------- ch/proto.go | 4 ---- 3 files changed, 4 insertions(+), 51 deletions(-) diff --git a/ch/db.go b/ch/db.go index fe84290..b1bed78 100644 --- a/ch/db.go +++ b/ch/db.go @@ -194,50 +194,14 @@ func (db *DB) _withConn(ctx context.Context, fn func(*chpool.Conn) error) error return err } - var done chan struct{} - - if ctxDone := ctx.Done(); ctxDone != nil { - done = make(chan struct{}) - go func() { - select { - case <-done: - // fn has finished, skip cancel - case <-ctxDone: - db.cancelConn(ctx, cn) - // Signal end of conn use. - done <- struct{}{} - } - }() - } + var fnErr error defer func() { - if done != nil { - select { - case <-done: // wait for cancel to finish request - case done <- struct{}{}: // signal fn finish, skip cancel goroutine - } - } - db.releaseConn(cn, err) + db.releaseConn(cn, fnErr) }() - // err is used in releaseConn above - err = fn(cn) - - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - db.cancelConn(ctx, cn) - } - - return err -} - -func (db *DB) cancelConn(ctx context.Context, cn *chpool.Conn) { - if err := cn.WithWriter(ctx, db.conf.WriteTimeout, func(wr *chproto.Writer) { - writeCancel(wr) - }); err != nil { - internal.Logger.Printf("writeCancel failed: %s", err) - } - - _ = cn.Close() + fnErr = fn(cn) + return fnErr } func (db *DB) Ping(ctx context.Context) error { diff --git a/ch/db_test.go b/ch/db_test.go index 5f248e8..84c4654 100644 --- a/ch/db_test.go +++ b/ch/db_test.go @@ -85,13 +85,6 @@ func TestCHTimeout(t *testing.T) { ctx, "SELECT sleepEachRow(0.01) from numbers(10000) settings max_block_size=10") require.Error(t, err) require.Contains(t, err.Error(), "i/o timeout") - - require.Eventually(t, func() bool { - var num int - err := db.NewRaw("SELECT count() from system.processes").Scan(ctx, &num) - require.NoError(t, err) - return num == 1 - }, time.Second, 100*time.Millisecond) } func TestDSNSetting(t *testing.T) { diff --git a/ch/proto.go b/ch/proto.go index 6909baa..8280d7a 100644 --- a/ch/proto.go +++ b/ch/proto.go @@ -544,10 +544,6 @@ func readBlockInfo(rd *chproto.Reader) error { return nil } -func writeCancel(wr *chproto.Writer) { - wr.WriteByte(chproto.ClientCancel) -} - func readServerTableColumns(rd *chproto.Reader) error { _, err := rd.String() if err != nil {