diff --git a/ch/chproto/lz4_reader.go b/ch/chproto/lz4_reader.go index efaab5f..b922549 100644 --- a/ch/chproto/lz4_reader.go +++ b/ch/chproto/lz4_reader.go @@ -30,8 +30,6 @@ func newLZ4Reader(r *bufio.Reader) *lz4Reader { } } -func (r *lz4Reader) Init() {} - func (r *lz4Reader) Release() error { var err error if r.Buffered() > 0 { diff --git a/ch/chproto/reader.go b/ch/chproto/reader.go index 3d9bc72..84e372d 100644 --- a/ch/chproto/reader.go +++ b/ch/chproto/reader.go @@ -36,15 +36,18 @@ func NewReader(r io.Reader) *Reader { } } -func (r *Reader) WithCompression(fn func() error) error { - r.zr.Init() - r.rd = r.zr +func (r *Reader) WithCompression(enabled bool, fn func() error) error { + if enabled { + r.rd = r.zr + } firstErr := fn() - r.rd = r.br - if err := r.zr.Release(); err != nil && firstErr == nil { - firstErr = err + if enabled { + r.rd = r.br + if err := r.zr.Release(); err != nil && firstErr == nil { + firstErr = err + } } return firstErr diff --git a/ch/chproto/writer.go b/ch/chproto/writer.go index afd06a2..286c568 100644 --- a/ch/chproto/writer.go +++ b/ch/chproto/writer.go @@ -42,19 +42,23 @@ func NewWriter(w io.Writer) *Writer { } } -func (w *Writer) WithCompression(fn func() error) { +func (w *Writer) WithCompression(enabled bool, fn func() error) { if w.err != nil { return } - w.wr = w.zw + if enabled { + w.wr = w.zw + } w.err = fn() - if err := w.zw.Close(); err != nil && w.err == nil { - w.err = err + if enabled { + if err := w.zw.Close(); err != nil && w.err == nil { + w.err = err + } + w.wr = w.bw } - w.wr = w.bw } func (w *Writer) Flush() (err error) { diff --git a/ch/config.go b/ch/config.go index 2958bdd..f29e1d7 100644 --- a/ch/config.go +++ b/ch/config.go @@ -22,6 +22,8 @@ const ( type Config struct { chpool.Config + Compression bool + Network string Addr string User string @@ -51,6 +53,15 @@ func defaultConfig() *Config { var cfg *Config poolSize := 2 * runtime.GOMAXPROCS(0) cfg = &Config{ + Config: chpool.Config{ + PoolSize: poolSize, + PoolTimeout: 30 * time.Second, + MaxIdleConns: poolSize, + ConnMaxIdleTime: 30 * time.Minute, + }, + + Compression: true, + Network: "tcp", Addr: "localhost:9000", User: "default", @@ -63,13 +74,6 @@ func defaultConfig() *Config { MaxRetries: 2, MinRetryBackoff: 500 * time.Millisecond, MaxRetryBackoff: time.Second, - - Config: chpool.Config{ - PoolSize: poolSize, - PoolTimeout: 30 * time.Second, - MaxIdleConns: poolSize, - ConnMaxIdleTime: 30 * time.Minute, - }, } return cfg } @@ -82,6 +86,13 @@ func WithDiscardUnknownColumns() Option { } } +// WithCompression enables/disables LZ4 compression. +func WithCompression(enabled bool) Option { + return func(db *DB) { + db.cfg.Compression = enabled + } +} + // WithAddr configures TCP host:port or Unix socket depending on Network. func WithAddr(addr string) Option { return func(db *DB) { diff --git a/ch/db.go b/ch/db.go index e9279e8..f004291 100644 --- a/ch/db.go +++ b/ch/db.go @@ -254,13 +254,13 @@ func (db *DB) _exec(ctx context.Context, query string) (*result, error) { err := db.withConn(ctx, func(cn *chpool.Conn) error { if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { db.writeQuery(wr, query) - writeBlock(ctx, wr, nil) + db.writeBlock(ctx, wr, nil) }); err != nil { return err } return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { var err error - res, err = readDataBlocks(rd) + res, err = db.readDataBlocks(rd) return err }) }) @@ -324,7 +324,7 @@ func (db *DB) _query(ctx context.Context, query string) (*blockIter, error) { if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { db.writeQuery(wr, query) - writeBlock(ctx, wr, nil) + db.writeBlock(ctx, wr, nil) }); err != nil { return nil, err } @@ -364,21 +364,21 @@ func (db *DB) _insert( err := db.withConn(ctx, func(cn *chpool.Conn) error { if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { db.writeQuery(wr, query) - writeBlock(ctx, wr, nil) + db.writeBlock(ctx, wr, nil) }); err != nil { return err } if err := cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { - _, err := readSampleBlock(rd) + _, err := db.readSampleBlock(rd) return err }); err != nil { return err } if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { - writeBlock(ctx, wr, block) - writeBlock(ctx, wr, nil) + db.writeBlock(ctx, wr, block) + db.writeBlock(ctx, wr, nil) }); err != nil { return err } diff --git a/ch/proto.go b/ch/proto.go index cd7f926..b22f74e 100644 --- a/ch/proto.go +++ b/ch/proto.go @@ -79,7 +79,7 @@ func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, err switch packet { case chproto.ServerData: - if err := readBlock(rd, block); err != nil { + if err := it.db.readBlock(rd, block); err != nil { return false, err } return true, nil @@ -262,7 +262,7 @@ func (db *DB) writeQuery(wr *chproto.Writer, query string) { db.writeSettings(wr) wr.Uvarint(2) - wr.Uvarint(chproto.CompressionEnabled) + wr.Bool(db.cfg.Compression) wr.String(query) } @@ -290,14 +290,14 @@ func (db *DB) writeSettings(wr *chproto.Writer) { var emptyBlock chschema.Block -func writeBlock(ctx context.Context, wr *chproto.Writer, block *chschema.Block) { +func (db *DB) writeBlock(ctx context.Context, wr *chproto.Writer, block *chschema.Block) { if block == nil { block = &emptyBlock } wr.Uvarint(chproto.ClientData) wr.String("") - wr.WithCompression(func() error { + wr.WithCompression(db.cfg.Compression, func() error { writeBlockInfo(wr) return block.WriteTo(wr) }) @@ -313,7 +313,7 @@ func writeBlockInfo(wr *chproto.Writer) { wr.Uvarint(0) } -func readSampleBlock(rd *chproto.Reader) (*chschema.Block, error) { +func (db *DB) readSampleBlock(rd *chproto.Reader) (*chschema.Block, error) { for { packet, err := rd.Uvarint() if err != nil { @@ -323,7 +323,7 @@ func readSampleBlock(rd *chproto.Reader) (*chschema.Block, error) { switch packet { case chproto.ServerData: block := new(chschema.Block) - if err := readBlock(rd, block); err != nil { + if err := db.readBlock(rd, block); err != nil { return nil, err } return block, nil @@ -339,7 +339,7 @@ func readSampleBlock(rd *chproto.Reader) (*chschema.Block, error) { } } -func readDataBlocks(rd *chproto.Reader) (*result, error) { +func (db *DB) readDataBlocks(rd *chproto.Reader) (*result, error) { var res *result block := new(chschema.Block) for { @@ -350,7 +350,7 @@ func readDataBlocks(rd *chproto.Reader) (*result, error) { switch packet { case chproto.ServerData: - if err := readBlock(rd, block); err != nil { + if err := db.readBlock(rd, block); err != nil { return nil, err } @@ -412,12 +412,12 @@ func readPacket(rd *chproto.Reader) (*result, error) { } } -func readBlock(rd *chproto.Reader, block *chschema.Block) error { +func (db *DB) readBlock(rd *chproto.Reader, block *chschema.Block) error { if _, err := rd.String(); err != nil { return err } - return rd.WithCompression(func() error { + return rd.WithCompression(db.cfg.Compression, func() error { if err := readBlockInfo(rd); err != nil { return err }