1
0
mirror of https://github.com/uptrace/go-clickhouse.git synced 2025-06-08 23:26:11 +02:00
2022-05-18 16:23:57 +03:00

133 lines
2.2 KiB
Go

package chpool
import (
"context"
"net"
"sync/atomic"
"time"
"github.com/uptrace/go-clickhouse/ch/chproto"
)
// noDeadline is the zero time.Time. deadline returns it when neither a
// positive timeout nor a context deadline applies; per the net.Conn
// documentation a zero deadline means I/O operations do not time out.
var noDeadline time.Time
// Conn wraps a net.Conn together with the chproto reader and writer created
// on top of it, plus bookkeeping: creation time, last-use time and a closed
// flag.
type Conn struct {
	// usedAt is the last-use time in Unix seconds, accessed via sync/atomic.
	// It is deliberately the first field: on 32-bit platforms only the first
	// word of an allocated struct is guaranteed to be 64-bit aligned, and
	// 64-bit atomic operations require that alignment.
	usedAt int64 // atomic

	// netConn is the underlying network connection.
	netConn net.Conn

	// rd and wr are the protocol reader and writer built over netConn.
	rd *chproto.Reader
	wr *chproto.Writer

	// ServerInfo and Inited are not assigned by the code visible in this
	// file. NOTE(review): presumably filled in by the caller after the
	// server handshake; confirm against the pool/handshake code.
	ServerInfo chproto.ServerInfo
	Inited     bool

	// createdAt is the time at which NewConn built this Conn.
	createdAt time.Time

	// closed is 1 once Close has been called, 0 otherwise.
	closed uint32 // atomic
}
// NewConn wraps netConn in a Conn, creating a chproto reader and writer over
// it and stamping both the creation time and the last-used time with the
// current time.
func NewConn(netConn net.Conn) *Conn {
	cn := new(Conn)
	cn.netConn = netConn
	cn.rd = chproto.NewReader(netConn)
	cn.wr = chproto.NewWriter(netConn)
	cn.createdAt = time.Now()

	cn.SetUsedAt(time.Now())
	return cn
}
// UsedAt returns the time the connection was last marked as used. The value
// is stored as Unix seconds, so the result has one-second precision.
func (cn *Conn) UsedAt() time.Time {
	return time.Unix(atomic.LoadInt64(&cn.usedAt), 0)
}
// SetUsedAt atomically records tm, truncated to Unix seconds, as the time the
// connection was last used.
func (cn *Conn) SetUsedAt(tm time.Time) {
	secs := tm.Unix()
	atomic.StoreInt64(&cn.usedAt, secs)
}
// LocalAddr returns the local address reported by the underlying net.Conn.
func (cn *Conn) LocalAddr() net.Addr {
	addr := cn.netConn.LocalAddr()
	return addr
}
// RemoteAddr returns the remote address reported by the underlying net.Conn.
func (cn *Conn) RemoteAddr() net.Addr {
	addr := cn.netConn.RemoteAddr()
	return addr
}
// Reader applies a read deadline derived from ctx and timeout to the
// underlying connection and returns the connection's protocol reader.
//
// Any error from SetReadDeadline is discarded: the signature has no error
// result, so setting the deadline is best-effort here. Use WithReader when
// that error must be observed.
func (cn *Conn) Reader(ctx context.Context, timeout time.Duration) *chproto.Reader {
	readDeadline := cn.deadline(ctx, timeout)
	_ = cn.netConn.SetReadDeadline(readDeadline)
	return cn.rd
}
// WithReader applies a read deadline derived from ctx and timeout to the
// underlying connection and then invokes fn with the connection's protocol
// reader.
//
// It returns the error from setting the deadline, in which case fn is not
// called, or otherwise whatever fn returns.
func (cn *Conn) WithReader(
	ctx context.Context,
	timeout time.Duration,
	fn func(rd *chproto.Reader) error,
) error {
	readDeadline := cn.deadline(ctx, timeout)
	if err := cn.netConn.SetReadDeadline(readDeadline); err != nil {
		return err
	}
	return fn(cn.rd)
}
// WithWriter applies a write deadline derived from ctx and timeout to the
// underlying connection, invokes fn with the connection's protocol writer,
// and then flushes that writer.
//
// It returns the error from setting the deadline, in which case fn is not
// called, or otherwise the result of the flush.
func (cn *Conn) WithWriter(
	ctx context.Context,
	timeout time.Duration,
	fn func(wb *chproto.Writer),
) error {
	writeDeadline := cn.deadline(ctx, timeout)
	if err := cn.netConn.SetWriteDeadline(writeDeadline); err != nil {
		return err
	}

	fn(cn.wr)
	return cn.wr.Flush()
}
// Close marks the connection as closed and closes the underlying net.Conn.
// Only the call that flips the closed flag from 0 to 1 closes the network
// connection; every later call returns nil without touching it.
func (cn *Conn) Close() error {
	if atomic.CompareAndSwapUint32(&cn.closed, 0, 1) {
		return cn.netConn.Close()
	}
	return nil
}
// Closed reports whether Close has been called on this connection.
func (cn *Conn) Closed() bool {
	state := atomic.LoadUint32(&cn.closed)
	return state == 1
}
// deadline computes the absolute I/O deadline for the next operation and, as
// a side effect, records the current time as the connection's last-used time.
//
// The result is the earlier of now+timeout and the context deadline:
//   - timeout > 0 and ctx has a deadline: whichever of the two comes first;
//   - timeout > 0 only: now+timeout;
//   - ctx deadline only: the context deadline;
//   - neither: noDeadline (the zero time).
//
// A non-positive timeout means "no per-operation timeout". Negative values
// are treated like zero so that they never produce an already-expired
// deadline when the context happens to carry a deadline. A nil ctx is
// tolerated and treated as having no deadline.
func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
	now := time.Now()
	cn.SetUsedAt(now)

	var (
		ctxDeadline    time.Time
		hasCtxDeadline bool
	)
	if ctx != nil {
		ctxDeadline, hasCtxDeadline = ctx.Deadline()
	}

	if timeout <= 0 {
		// No per-operation timeout: only the context can bound the call.
		if hasCtxDeadline {
			return ctxDeadline
		}
		return noDeadline
	}

	opDeadline := now.Add(timeout)
	if hasCtxDeadline && ctxDeadline.Before(opDeadline) {
		return ctxDeadline
	}
	return opDeadline
}