mirror of
https://github.com/uptrace/go-clickhouse.git
synced 2025-06-08 23:26:11 +02:00
128 lines
2.5 KiB
Go
128 lines
2.5 KiB
Go
package chproto
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/binary"
|
|
|
|
"github.com/pierrec/lz4/v4"
|
|
|
|
"github.com/uptrace/go-clickhouse/ch/internal"
|
|
"github.com/uptrace/go-clickhouse/ch/internal/cityhash102"
|
|
)
|
|
|
|
const (
|
|
noCompression = 0x02
|
|
lz4Compression = 0x82
|
|
zstdCompression = 0x90
|
|
)
|
|
|
|
const (
|
|
checksumSize = 16 // city hash 128
|
|
compressionHeaderSize = 1 + 4 + 4 // method + compressed + uncompressed
|
|
|
|
headerSize = checksumSize + compressionHeaderSize
|
|
blockSize = 1 << 20 // 1 MB
|
|
)
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
type lz4Writer struct {
|
|
wr *bufio.Writer
|
|
|
|
data []byte
|
|
pos int
|
|
zdata []byte
|
|
}
|
|
|
|
func newLZ4Writer(w *bufio.Writer) *lz4Writer {
|
|
return &lz4Writer{
|
|
wr: w,
|
|
data: make([]byte, blockSize),
|
|
}
|
|
}
|
|
|
|
func (w *lz4Writer) Close() error {
|
|
err := w.flush()
|
|
w.pos = 0
|
|
return err
|
|
}
|
|
|
|
func (w *lz4Writer) Flush() error {
|
|
return w.Close()
|
|
}
|
|
|
|
func (w *lz4Writer) WriteByte(c byte) error {
|
|
w.data[w.pos] = c
|
|
w.pos++
|
|
return w.checkFlush()
|
|
}
|
|
|
|
func (w *lz4Writer) WriteString(s string) (int, error) {
|
|
return w.Write(internal.Bytes(s))
|
|
}
|
|
|
|
func (w *lz4Writer) Write(data []byte) (int, error) {
|
|
var written int
|
|
for len(data) > 0 {
|
|
n := copy(w.data[w.pos:], data)
|
|
data = data[n:]
|
|
w.pos += n
|
|
if err := w.checkFlush(); err != nil {
|
|
return written, err
|
|
}
|
|
written += n
|
|
}
|
|
return written, nil
|
|
}
|
|
|
|
func (w *lz4Writer) checkFlush() error {
|
|
if w.pos < len(w.data) {
|
|
return nil
|
|
}
|
|
return w.flush()
|
|
}
|
|
|
|
func (w *lz4Writer) flush() error {
|
|
if w.pos == 0 {
|
|
return nil
|
|
}
|
|
|
|
zlen := headerSize + lz4.CompressBlockBound(w.pos)
|
|
w.zdata = grow(w.zdata, zlen)
|
|
|
|
compressedSize, err := compress(w.zdata[headerSize:], w.data[:w.pos])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
compressedSize += compressionHeaderSize
|
|
|
|
w.zdata[16] = lz4Compression
|
|
binary.LittleEndian.PutUint32(w.zdata[17:], uint32(compressedSize))
|
|
binary.LittleEndian.PutUint32(w.zdata[21:], uint32(w.pos))
|
|
|
|
checkSum := cityhash102.CityHash128(w.zdata[16:], uint32(compressedSize))
|
|
binary.LittleEndian.PutUint64(w.zdata[0:], checkSum.Lower64())
|
|
binary.LittleEndian.PutUint64(w.zdata[8:], checkSum.Higher64())
|
|
|
|
w.wr.Write(w.zdata[:checksumSize+compressedSize])
|
|
w.pos = 0
|
|
|
|
return nil
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
func compress(dest, src []byte) (int, error) {
|
|
if len(src) < 16 {
|
|
return uncompressable(dest, src), nil
|
|
}
|
|
var c lz4.Compressor
|
|
return c.CompressBlock(src, dest)
|
|
}
|
|
|
|
func uncompressable(dest, src []byte) int {
|
|
dest[0] = byte(len(src)) << 4
|
|
copy(dest[1:], src)
|
|
return len(src) + 1
|
|
}
|