1
0
mirror of https://github.com/uptrace/go-clickhouse.git synced 2025-06-08 23:26:11 +02:00
go-clickhouse/ch/chproto/lz4_writer.go

128 lines
2.5 KiB
Go
Raw Permalink Normal View History

2022-01-23 09:36:24 +02:00
package chproto
import (
"bufio"
"encoding/binary"
"github.com/pierrec/lz4/v4"
"github.com/uptrace/go-clickhouse/ch/internal"
"github.com/uptrace/go-clickhouse/ch/internal/cityhash102"
)
// Compression method identifiers. flush stores lz4Compression as the method
// byte of every block it writes. noCompression and zstdCompression are not
// referenced in this part of the file; presumably they are used by sibling
// readers/writers — confirm against the rest of the package.
const (
noCompression = 0x02
lz4Compression = 0x82
zstdCompression = 0x90
)
// Sizes used when framing a block: each block starts with a checksum,
// followed by a compression header, followed by the compressed payload.
const (
checksumSize = 16 // CityHash128 checksum, in bytes
compressionHeaderSize = 1 + 4 + 4 // method byte + compressed size (uint32) + uncompressed size (uint32)
headerSize = checksumSize + compressionHeaderSize // offset of the payload inside a framed block
blockSize = 1 << 20 // capacity of the staging buffer: 1 MiB (1048576 bytes)
)
//------------------------------------------------------------------------------
// lz4Writer stages written bytes in a fixed-size buffer and emits them to the
// underlying bufio.Writer as checksummed, LZ4-compressed blocks.
type lz4Writer struct {
	// wr receives the framed blocks.
	wr *bufio.Writer

	// data is the staging buffer for bytes that have not been compressed yet.
	data []byte
	// pos is the number of bytes of data currently in use.
	pos int

	// zdata is scratch space holding the block header and compressed payload;
	// it is (re)sized by flush.
	zdata []byte
}
func newLZ4Writer(w *bufio.Writer) *lz4Writer {
return &lz4Writer{
wr: w,
data: make([]byte, blockSize),
2022-01-23 09:36:24 +02:00
}
}
func (w *lz4Writer) Close() error {
err := w.flush()
w.pos = 0
2022-01-23 09:36:24 +02:00
return err
}
// Flush is an alias for Close: it emits the buffered bytes as a block and
// resets the staging position.
func (w *lz4Writer) Flush() error {
	err := w.Close()
	return err
}
func (w *lz4Writer) WriteByte(c byte) error {
w.data[w.pos] = c
2022-01-23 09:36:24 +02:00
w.pos++
return w.checkFlush()
}
// WriteString writes the bytes of s through Write and returns its result.
//
// The string is converted with internal.Bytes, which is defined in another
// package; presumably it avoids copying the string — confirm there.
func (w *lz4Writer) WriteString(s string) (int, error) {
	b := internal.Bytes(s)
	return w.Write(b)
}
func (w *lz4Writer) Write(data []byte) (int, error) {
var written int
for len(data) > 0 {
n := copy(w.data[w.pos:], data)
2022-01-23 09:36:24 +02:00
data = data[n:]
w.pos += n
if err := w.checkFlush(); err != nil {
return written, err
}
written += n
}
return written, nil
}
func (w *lz4Writer) checkFlush() error {
if w.pos < len(w.data) {
2022-01-23 09:36:24 +02:00
return nil
}
return w.flush()
}
func (w *lz4Writer) flush() error {
if w.pos == 0 {
return nil
}
zlen := headerSize + lz4.CompressBlockBound(w.pos)
w.zdata = grow(w.zdata, zlen)
2022-01-23 09:36:24 +02:00
compressedSize, err := compress(w.zdata[headerSize:], w.data[:w.pos])
2022-01-23 09:36:24 +02:00
if err != nil {
return err
}
compressedSize += compressionHeaderSize
w.zdata[16] = lz4Compression
binary.LittleEndian.PutUint32(w.zdata[17:], uint32(compressedSize))
binary.LittleEndian.PutUint32(w.zdata[21:], uint32(w.pos))
2022-01-23 09:36:24 +02:00
checkSum := cityhash102.CityHash128(w.zdata[16:], uint32(compressedSize))
binary.LittleEndian.PutUint64(w.zdata[0:], checkSum.Lower64())
binary.LittleEndian.PutUint64(w.zdata[8:], checkSum.Higher64())
2022-01-23 09:36:24 +02:00
w.wr.Write(w.zdata[:checksumSize+compressedSize])
2022-01-23 09:36:24 +02:00
w.pos = 0
return nil
}
//------------------------------------------------------------------------------
func compress(dest, src []byte) (int, error) {
if len(src) < 16 {
return uncompressable(dest, src), nil
}
var c lz4.Compressor
return c.CompressBlock(src, dest)
}
// uncompressable stores src in dest as a single literal-only LZ4 sequence and
// returns the number of bytes the sequence occupies.
//
// The sequence starts with a token byte whose high nibble is the literal
// length. A nibble of 0xF means the length continues in the following bytes:
// each 0xFF byte adds 255 and the first byte below 0xFF ends the length. So a
// run of exactly 15 literals needs a trailing 0x00 extension byte; without it
// a decoder would consume the first literal as part of the length.
//
// dest must be large enough for the token, the extension bytes and src; the
// function panics on an out-of-range index if it cannot hold the token or
// the extension bytes.
func uncompressable(dest, src []byte) int {
	const maxTokenLen = 0xF // largest literal length the token nibble can hold

	n := len(src)
	pos := 1
	if n < maxTokenLen {
		dest[0] = byte(n) << 4
	} else {
		dest[0] = maxTokenLen << 4
		rest := n - maxTokenLen
		for ; rest >= 0xFF; rest -= 0xFF {
			dest[pos] = 0xFF
			pos++
		}
		dest[pos] = byte(rest)
		pos++
	}

	copy(dest[pos:], src)
	return pos + n
}