1
0
mirror of https://github.com/uptrace/go-clickhouse.git synced 2025-08-08 22:16:32 +02:00

feat: allow disabling compression for benchmarks

This commit is contained in:
Vladimir Mihailenco
2022-05-02 09:31:14 +03:00
parent fa44842a34
commit ef260678ec
6 changed files with 53 additions and 37 deletions

View File

@ -30,8 +30,6 @@ func newLZ4Reader(r *bufio.Reader) *lz4Reader {
}
}
func (r *lz4Reader) Init() {}
func (r *lz4Reader) Release() error {
var err error
if r.Buffered() > 0 {

View File

@ -36,16 +36,19 @@ func NewReader(r io.Reader) *Reader {
}
}
func (r *Reader) WithCompression(fn func() error) error {
r.zr.Init()
func (r *Reader) WithCompression(enabled bool, fn func() error) error {
if enabled {
r.rd = r.zr
}
firstErr := fn()
if enabled {
r.rd = r.br
if err := r.zr.Release(); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}

View File

@ -42,19 +42,23 @@ func NewWriter(w io.Writer) *Writer {
}
}
func (w *Writer) WithCompression(fn func() error) {
func (w *Writer) WithCompression(enabled bool, fn func() error) {
if w.err != nil {
return
}
if enabled {
w.wr = w.zw
}
w.err = fn()
if enabled {
if err := w.zw.Close(); err != nil && w.err == nil {
w.err = err
}
w.wr = w.bw
}
}
func (w *Writer) Flush() (err error) {

View File

@ -22,6 +22,8 @@ const (
type Config struct {
chpool.Config
Compression bool
Network string
Addr string
User string
@ -51,6 +53,15 @@ func defaultConfig() *Config {
var cfg *Config
poolSize := 2 * runtime.GOMAXPROCS(0)
cfg = &Config{
Config: chpool.Config{
PoolSize: poolSize,
PoolTimeout: 30 * time.Second,
MaxIdleConns: poolSize,
ConnMaxIdleTime: 30 * time.Minute,
},
Compression: true,
Network: "tcp",
Addr: "localhost:9000",
User: "default",
@ -63,13 +74,6 @@ func defaultConfig() *Config {
MaxRetries: 2,
MinRetryBackoff: 500 * time.Millisecond,
MaxRetryBackoff: time.Second,
Config: chpool.Config{
PoolSize: poolSize,
PoolTimeout: 30 * time.Second,
MaxIdleConns: poolSize,
ConnMaxIdleTime: 30 * time.Minute,
},
}
return cfg
}
@ -82,6 +86,13 @@ func WithDiscardUnknownColumns() Option {
}
}
// WithCompression enables/disables LZ4 compression.
func WithCompression(enabled bool) Option {
return func(db *DB) {
db.cfg.Compression = enabled
}
}
// WithAddr configures TCP host:port or Unix socket depending on Network.
func WithAddr(addr string) Option {
return func(db *DB) {

View File

@ -254,13 +254,13 @@ func (db *DB) _exec(ctx context.Context, query string) (*result, error) {
err := db.withConn(ctx, func(cn *chpool.Conn) error {
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) {
db.writeQuery(wr, query)
writeBlock(ctx, wr, nil)
db.writeBlock(ctx, wr, nil)
}); err != nil {
return err
}
return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error {
var err error
res, err = readDataBlocks(rd)
res, err = db.readDataBlocks(rd)
return err
})
})
@ -324,7 +324,7 @@ func (db *DB) _query(ctx context.Context, query string) (*blockIter, error) {
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) {
db.writeQuery(wr, query)
writeBlock(ctx, wr, nil)
db.writeBlock(ctx, wr, nil)
}); err != nil {
return nil, err
}
@ -364,21 +364,21 @@ func (db *DB) _insert(
err := db.withConn(ctx, func(cn *chpool.Conn) error {
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) {
db.writeQuery(wr, query)
writeBlock(ctx, wr, nil)
db.writeBlock(ctx, wr, nil)
}); err != nil {
return err
}
if err := cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error {
_, err := readSampleBlock(rd)
_, err := db.readSampleBlock(rd)
return err
}); err != nil {
return err
}
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) {
writeBlock(ctx, wr, block)
writeBlock(ctx, wr, nil)
db.writeBlock(ctx, wr, block)
db.writeBlock(ctx, wr, nil)
}); err != nil {
return err
}

View File

@ -79,7 +79,7 @@ func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, err
switch packet {
case chproto.ServerData:
if err := readBlock(rd, block); err != nil {
if err := it.db.readBlock(rd, block); err != nil {
return false, err
}
return true, nil
@ -262,7 +262,7 @@ func (db *DB) writeQuery(wr *chproto.Writer, query string) {
db.writeSettings(wr)
wr.Uvarint(2)
wr.Uvarint(chproto.CompressionEnabled)
wr.Bool(db.cfg.Compression)
wr.String(query)
}
@ -290,14 +290,14 @@ func (db *DB) writeSettings(wr *chproto.Writer) {
var emptyBlock chschema.Block
func writeBlock(ctx context.Context, wr *chproto.Writer, block *chschema.Block) {
func (db *DB) writeBlock(ctx context.Context, wr *chproto.Writer, block *chschema.Block) {
if block == nil {
block = &emptyBlock
}
wr.Uvarint(chproto.ClientData)
wr.String("")
wr.WithCompression(func() error {
wr.WithCompression(db.cfg.Compression, func() error {
writeBlockInfo(wr)
return block.WriteTo(wr)
})
@ -313,7 +313,7 @@ func writeBlockInfo(wr *chproto.Writer) {
wr.Uvarint(0)
}
func readSampleBlock(rd *chproto.Reader) (*chschema.Block, error) {
func (db *DB) readSampleBlock(rd *chproto.Reader) (*chschema.Block, error) {
for {
packet, err := rd.Uvarint()
if err != nil {
@ -323,7 +323,7 @@ func readSampleBlock(rd *chproto.Reader) (*chschema.Block, error) {
switch packet {
case chproto.ServerData:
block := new(chschema.Block)
if err := readBlock(rd, block); err != nil {
if err := db.readBlock(rd, block); err != nil {
return nil, err
}
return block, nil
@ -339,7 +339,7 @@ func readSampleBlock(rd *chproto.Reader) (*chschema.Block, error) {
}
}
func readDataBlocks(rd *chproto.Reader) (*result, error) {
func (db *DB) readDataBlocks(rd *chproto.Reader) (*result, error) {
var res *result
block := new(chschema.Block)
for {
@ -350,7 +350,7 @@ func readDataBlocks(rd *chproto.Reader) (*result, error) {
switch packet {
case chproto.ServerData:
if err := readBlock(rd, block); err != nil {
if err := db.readBlock(rd, block); err != nil {
return nil, err
}
@ -412,12 +412,12 @@ func readPacket(rd *chproto.Reader) (*result, error) {
}
}
func readBlock(rd *chproto.Reader, block *chschema.Block) error {
func (db *DB) readBlock(rd *chproto.Reader, block *chschema.Block) error {
if _, err := rd.String(); err != nil {
return err
}
return rd.WithCompression(func() error {
return rd.WithCompression(db.cfg.Compression, func() error {
if err := readBlockInfo(rd); err != nil {
return err
}