From 658ad14fc0f97a2e51e3a113ea7ae0fd77eb2795 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sat, 30 Apr 2022 10:30:34 +0300 Subject: [PATCH] feat: add proper Rows implementation and some optimizations --- Makefile | 3 + ch/bench_test.go | 29 ++ ch/chpool/conn.go | 5 + ch/chproto/lz4_reader.go | 14 +- ch/chproto/lz4_writer.go | 61 ++-- ch/chproto/reader.go | 24 +- ch/chproto/writer.go | 34 +-- ch/chschema/column.go | 286 +++--------------- ch/chschema/column_array.go | 413 +++++++++++++++++++------- ch/chschema/column_nullable.go | 4 +- ch/chschema/column_safe_gen.go | 217 ++++++++++++++ ch/chschema/column_unsafe_gen.go | 351 ++++++++++++++++++++++ ch/chschema/types.go | 28 +- ch/db.go | 156 ++++++---- ch/db_test.go | 2 + ch/internal/codegen/column_safe.tpl | 32 ++ ch/internal/codegen/column_unsafe.tpl | 49 +++ ch/proto.go | 90 +++++- ch/query_base.go | 35 ++- ch/query_select.go | 2 +- 20 files changed, 1311 insertions(+), 524 deletions(-) create mode 100644 ch/bench_test.go create mode 100644 ch/chschema/column_safe_gen.go create mode 100644 ch/chschema/column_unsafe_gen.go create mode 100644 ch/internal/codegen/column_safe.tpl create mode 100644 ch/internal/codegen/column_unsafe.tpl diff --git a/Makefile b/Makefile index 20d1a61..dc5334e 100644 --- a/Makefile +++ b/Makefile @@ -20,3 +20,6 @@ go_mod_tidy: fmt: gofmt -w -s ./ goimports -w -local github.com/uptrace/go-clickhouse ./ + +codegen: + go run ./ch/internal/codegen/ -dir=ch/chschema diff --git a/ch/bench_test.go b/ch/bench_test.go new file mode 100644 index 0000000..b25e1c9 --- /dev/null +++ b/ch/bench_test.go @@ -0,0 +1,29 @@ +package ch_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func BenchmarkNumbers(b *testing.B) { + ctx := context.Background() + db := chDB() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + rows, err := db.QueryContext(ctx, "SELECT number FROM system.numbers_mt LIMIT 1000000") + if err != nil { + b.Fatal(err) + } + + var count int + for rows.Next() { + count++ + } + require.Equal(b, 1000000, count) + } +} diff --git a/ch/chpool/conn.go b/ch/chpool/conn.go index 9c340b8..c029f00 100644 --- a/ch/chpool/conn.go +++ b/ch/chpool/conn.go @@ -49,6 +49,11 @@ func (cn *Conn) RemoteAddr() net.Addr { return cn.netConn.RemoteAddr() } +func (cn *Conn) Reader(ctx context.Context, timeout time.Duration) *chproto.Reader { + _ = cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)) + return cn.rd +} + func (cn *Conn) WithReader( ctx context.Context, timeout time.Duration, diff --git a/ch/chproto/lz4_reader.go b/ch/chproto/lz4_reader.go index fe310ca..efaab5f 100644 --- a/ch/chproto/lz4_reader.go +++ b/ch/chproto/lz4_reader.go @@ -17,8 +17,9 @@ type lz4Reader struct { header []byte - data []byte - pos int + zdata []byte + data []byte + pos int } func newLZ4Reader(r *bufio.Reader) *lz4Reader { @@ -37,8 +38,9 @@ func (r *lz4Reader) Release() error { err = errUnreadData } - r.data = nil + r.data = r.data[:0] r.pos = 0 + r.zdata = r.zdata[:0] return err } @@ -102,13 +104,13 @@ func (r *lz4Reader) readData() error { compressedSize := int(binary.LittleEndian.Uint32(r.header[17:])) - compressionHeaderSize uncompressedSize := int(binary.LittleEndian.Uint32(r.header[21:])) - zdata := make([]byte, compressedSize) + r.zdata = grow(r.zdata, compressedSize) r.data = grow(r.data, uncompressedSize) - if _, err := io.ReadFull(r.rd, zdata); err != nil { + if _, err := io.ReadFull(r.rd, r.zdata); err != nil { return err } - if _, err := lz4.UncompressBlock(zdata, r.data); err != nil { + if _, err := lz4.UncompressBlock(r.zdata, r.data); err != nil { return err } diff --git a/ch/chproto/lz4_writer.go b/ch/chproto/lz4_writer.go index ea1d416..c056bd1 100644 --- a/ch/chproto/lz4_writer.go +++ b/ch/chproto/lz4_writer.go @@ -3,7 +3,6 @@ package chproto import ( "bufio" "encoding/binary" - "sync" "github.com/pierrec/lz4/v4" @@ -25,50 +24,26 @@ const ( blockSize = 1 << 20 // 1 MB ) -type writeBuffer struct { - buf []byte -} - -var writeBufferPool = sync.Pool{ - New: func() any { - return &writeBuffer{ - buf: make([]byte, blockSize), - } - }, -} - -func getWriterBuffer() *writeBuffer { - return writeBufferPool.Get().(*writeBuffer) -} - -func putWriterBuffer(db *writeBuffer) { - writeBufferPool.Put(db) -} - //------------------------------------------------------------------------------ type lz4Writer struct { wr *bufio.Writer - data *writeBuffer - pos int + data []byte + pos int + zdata []byte } func newLZ4Writer(w *bufio.Writer) *lz4Writer { return &lz4Writer{ - wr: w, + wr: w, + data: make([]byte, blockSize), } } -func (w *lz4Writer) Init() { - w.data = getWriterBuffer() - w.pos = 0 -} - func (w *lz4Writer) Close() error { err := w.flush() - putWriterBuffer(w.data) - w.data = nil + w.pos = 0 return err } @@ -77,7 +52,7 @@ func (w *lz4Writer) Flush() error { } func (w *lz4Writer) WriteByte(c byte) error { - w.data.buf[w.pos] = c + w.data[w.pos] = c w.pos++ return w.checkFlush() } @@ -89,7 +64,7 @@ func (w *lz4Writer) WriteString(s string) (int, error) { func (w *lz4Writer) Write(data []byte) (int, error) { var written int for len(data) > 0 { - n := copy(w.data.buf[w.pos:], data) + n := copy(w.data[w.pos:], data) data = data[n:] w.pos += n if err := w.checkFlush(); err != nil { @@ -101,7 +76,7 @@ func (w *lz4Writer) Write(data []byte) (int, error) { } func (w *lz4Writer) checkFlush() error { - if w.pos < len(w.data.buf) { + if w.pos < len(w.data) { return nil } return w.flush() @@ -113,23 +88,23 @@ func (w *lz4Writer) flush() error { } zlen := headerSize + lz4.CompressBlockBound(w.pos) - zdata := make([]byte, zlen) + w.zdata = grow(w.zdata, zlen) - compressedSize, err := compress(zdata[headerSize:], w.data.buf[:w.pos]) + compressedSize, err := compress(w.zdata[headerSize:], w.data[:w.pos]) if err != nil { return err } compressedSize += compressionHeaderSize - zdata[16] = lz4Compression - binary.LittleEndian.PutUint32(zdata[17:], uint32(compressedSize)) - binary.LittleEndian.PutUint32(zdata[21:], uint32(w.pos)) + w.zdata[16] = lz4Compression + binary.LittleEndian.PutUint32(w.zdata[17:], uint32(compressedSize)) + binary.LittleEndian.PutUint32(w.zdata[21:], uint32(w.pos)) - checkSum := cityhash102.CityHash128(zdata[16:], uint32(compressedSize)) - binary.LittleEndian.PutUint64(zdata[0:], checkSum.Lower64()) - binary.LittleEndian.PutUint64(zdata[8:], checkSum.Higher64()) + checkSum := cityhash102.CityHash128(w.zdata[16:], uint32(compressedSize)) + binary.LittleEndian.PutUint64(w.zdata[0:], checkSum.Lower64()) + binary.LittleEndian.PutUint64(w.zdata[8:], checkSum.Higher64()) - w.wr.Write(zdata[:checksumSize+compressedSize]) + w.wr.Write(w.zdata[:checksumSize+compressedSize]) w.pos = 0 return nil diff --git a/ch/chproto/reader.go b/ch/chproto/reader.go index f14d30a..3d9bc72 100644 --- a/ch/chproto/reader.go +++ b/ch/chproto/reader.go @@ -70,7 +70,7 @@ func (r *Reader) Uvarint() (uint64, error) { return binary.ReadUvarint(r.rd) } -func (r *Reader) Uint8() (uint8, error) { +func (r *Reader) UInt8() (uint8, error) { c, err := r.rd.ReadByte() if err != nil { return 0, err @@ -78,7 +78,7 @@ func (r *Reader) Uint8() (uint8, error) { return c, nil } -func (r *Reader) Uint16() (uint16, error) { +func (r *Reader) UInt16() (uint16, error) { b, err := r.readNTemp(2) if err != nil { return 0, err @@ -86,7 +86,7 @@ func (r *Reader) Uint16() (uint16, error) { return binary.LittleEndian.Uint16(b), nil } -func (r *Reader) Uint32() (uint32, error) { +func (r *Reader) UInt32() (uint32, error) { b, err := r.readNTemp(4) if err != nil { return 0, err @@ -94,7 +94,7 @@ func (r *Reader) Uint32() (uint32, error) { return binary.LittleEndian.Uint32(b), nil } -func (r *Reader) Uint64() (uint64, error) { +func (r *Reader) UInt64() (uint64, error) { b, err := r.readNTemp(8) if err != nil { return 0, err @@ -103,27 +103,27 @@ func (r *Reader) Uint64() (uint64, error) { } func (r *Reader) Int8() (int8, error) { - num, err := r.Uint8() + num, err := r.UInt8() return int8(num), err } func (r *Reader) Int16() (int16, error) { - num, err := r.Uint16() + num, err := r.UInt16() return int16(num), err } func (r *Reader) Int32() (int32, error) { - num, err := r.Uint32() + num, err := r.UInt32() return int32(num), err } func (r *Reader) Int64() (int64, error) { - num, err := r.Uint64() + num, err := r.UInt64() return int64(num), err } func (r *Reader) Float32() (float32, error) { - num, err := r.Uint32() + num, err := r.UInt32() if err != nil { return 0, err } @@ -131,7 +131,7 @@ func (r *Reader) Float32() (float32, error) { } func (r *Reader) Float64() (float64, error) { - num, err := r.Uint64() + num, err := r.UInt64() if err != nil { return 0, err } @@ -180,7 +180,7 @@ func (r *Reader) readNTemp(n int) ([]byte, error) { } func (r *Reader) DateTime() (time.Time, error) { - sec, err := r.Uint32() + sec, err := r.UInt32() if err != nil { return time.Time{}, err } @@ -191,7 +191,7 @@ func (r *Reader) DateTime() (time.Time, error) { } func (r *Reader) Date() (time.Time, error) { - days, err := r.Uint16() + days, err := r.UInt16() if err != nil { return time.Time{}, err } diff --git a/ch/chproto/writer.go b/ch/chproto/writer.go index a8f22ab..afd06a2 100644 --- a/ch/chproto/writer.go +++ b/ch/chproto/writer.go @@ -10,7 +10,10 @@ import ( "github.com/uptrace/go-clickhouse/ch/internal" ) -const uuidLen = 16 +const ( + uuidLen = 16 + secsInDay = 24 * 3600 +) type writer interface { io.Writer @@ -44,7 +47,6 @@ func (w *Writer) WithCompression(fn func() error) { return } - w.zw.Init() w.wr = w.zw w.err = fn() @@ -85,7 +87,7 @@ func (w *Writer) Bool(flag bool) { if flag { num = 1 } - w.Uint8(num) + w.UInt8(num) } func (w *Writer) Uvarint(num uint64) { @@ -93,47 +95,47 @@ func (w *Writer) Uvarint(num uint64) { w.Write(w.buf[:n]) } -func (w *Writer) Uint8(num uint8) { +func (w *Writer) UInt8(num uint8) { w.writeByte(num) } -func (w *Writer) Uint16(num uint16) { +func (w *Writer) UInt16(num uint16) { binary.LittleEndian.PutUint16(w.buf, num) w.Write(w.buf[:2]) } -func (w *Writer) Uint32(num uint32) { +func (w *Writer) UInt32(num uint32) { binary.LittleEndian.PutUint32(w.buf, num) w.Write(w.buf[:4]) } -func (w *Writer) Uint64(num uint64) { +func (w *Writer) UInt64(num uint64) { binary.LittleEndian.PutUint64(w.buf, num) w.Write(w.buf[:8]) } func (w *Writer) Int8(num int8) { - w.Uint8(uint8(num)) + w.UInt8(uint8(num)) } func (w *Writer) Int16(num int16) { - w.Uint16(uint16(num)) + w.UInt16(uint16(num)) } func (w *Writer) Int32(num int32) { - w.Uint32(uint32(num)) + w.UInt32(uint32(num)) } func (w *Writer) Int64(num int64) { - w.Uint64(uint64(num)) + w.UInt64(uint64(num)) } func (w *Writer) Float32(num float32) { - w.Uint32(math.Float32bits(num)) + w.UInt32(math.Float32bits(num)) } func (w *Writer) Float64(num float64) { - w.Uint64(math.Float64bits(num)) + w.UInt64(math.Float64bits(num)) } func (w *Writer) String(s string) { @@ -172,13 +174,11 @@ func packUUID(b []byte) []byte { } func (w *Writer) DateTime(tm time.Time) { - w.Uint32(uint32(unixTime(tm))) + w.UInt32(uint32(unixTime(tm))) } -const secsInDay = 24 * 3600 - func (w *Writer) Date(tm time.Time) { - w.Uint16(uint16(unixTime(tm) / secsInDay)) + w.UInt16(uint16(unixTime(tm) / secsInDay)) } func unixTime(tm time.Time) int64 { diff --git a/ch/chschema/column.go b/ch/chschema/column.go index 6f60602..33d806d 100644 --- a/ch/chschema/column.go +++ b/ch/chschema/column.go @@ -37,7 +37,7 @@ type Columnar interface { Set(v any) AppendValue(v reflect.Value) Value() any - Nullable(nulls Uint8Column) any + Nullable(nulls UInt8Column) any Len() int Index(idx int) any Slice(s, e int) any @@ -89,7 +89,7 @@ func (c ColumnOf[T]) Value() any { return c.Column } -func (c ColumnOf[T]) Nullable(nulls Uint8Column) any { +func (c ColumnOf[T]) Nullable(nulls UInt8Column) any { nullable := make([]*T, len(c.Column)) for i := range c.Column { if nulls.Column[i] == 0 { @@ -221,27 +221,6 @@ func (c *Int8Column) AppendValue(v reflect.Value) { c.Column = append(c.Column, int8(v.Int())) } -func (c *Int8Column) ReadFrom(rd *chproto.Reader, numRow int) error { - c.Alloc(numRow) - - for i := range c.Column { - n, err := rd.Int8() - if err != nil { - return err - } - c.Column[i] = n - } - - return nil -} - -func (c Int8Column) WriteTo(wr *chproto.Writer) error { - for _, n := range c.Column { - wr.Int8(n) - } - return nil -} - //------------------------------------------------------------------------------ type Int16Column struct { @@ -264,27 +243,6 @@ func (c *Int16Column) AppendValue(v reflect.Value) { c.Column = append(c.Column, int16(v.Int())) } -func (c *Int16Column) ReadFrom(rd *chproto.Reader, numRow int) error { - c.Alloc(numRow) - - for i := range c.Column { - n, err := rd.Int16() - if err != nil { - return err - } - c.Column[i] = n - } - - return nil -} - -func (c Int16Column) WriteTo(wr *chproto.Writer) error { - for _, n := range c.Column { - wr.Int16(n) - } - return nil -} - //------------------------------------------------------------------------------ type Int32Column struct { @@ -307,27 +265,6 @@ func (c *Int32Column) AppendValue(v reflect.Value) { c.Column = append(c.Column, int32(v.Int())) } -func (c *Int32Column) ReadFrom(rd *chproto.Reader, numRow int) error { - c.Alloc(numRow) - - for i := range c.Column { - n, err := rd.Int32() - if err != nil { - return err - } - c.Column[i] = n - } - - return nil -} - -func (c Int32Column) WriteTo(wr *chproto.Writer) error { - for _, n := range c.Column { - wr.Int32(n) - } - return nil -} - //------------------------------------------------------------------------------ type Int64Column struct { @@ -350,199 +287,94 @@ func (c *Int64Column) AppendValue(v reflect.Value) { c.Column = append(c.Column, v.Int()) } -func (c *Int64Column) ReadFrom(rd *chproto.Reader, numRow int) error { - c.Alloc(numRow) - - for i := range c.Column { - n, err := rd.Int64() - if err != nil { - return err - } - c.Column[i] = n - } - - return nil -} - -func (c Int64Column) WriteTo(wr *chproto.Writer) error { - for _, n := range c.Column { - wr.Int64(n) - } - return nil -} - //------------------------------------------------------------------------------ -type Uint8Column struct { +type UInt8Column struct { NumericColumnOf[uint8] } -var _ Columnar = (*Uint8Column)(nil) +var _ Columnar = (*UInt8Column)(nil) -func NewUint8Column(typ reflect.Type, chType string, numRow int) Columnar { - return &Uint8Column{ +func NewUInt8Column(typ reflect.Type, chType string, numRow int) Columnar { + return &UInt8Column{ NumericColumnOf: NewNumericColumnOf[uint8](numRow), } } -func (c Uint8Column) Type() reflect.Type { +func (c UInt8Column) Type() reflect.Type { return uint8Type } -func (c *Uint8Column) AppendValue(v reflect.Value) { +func (c *UInt8Column) AppendValue(v reflect.Value) { c.Column = append(c.Column, uint8(v.Uint())) } -func (c *Uint8Column) ReadFrom(rd *chproto.Reader, numRow int) error { - c.Alloc(numRow) - - for i := range c.Column { - n, err := rd.Uint8() - if err != nil { - return err - } - c.Column[i] = n - } - - return nil -} - -func (c Uint8Column) WriteTo(wr *chproto.Writer) error { - for _, n := range c.Column { - wr.Uint8(n) - } - return nil -} - //------------------------------------------------------------------------------ -type Uint16Column struct { +type UInt16Column struct { NumericColumnOf[uint16] } -var _ Columnar = (*Uint16Column)(nil) +var _ Columnar = (*UInt16Column)(nil) -func NewUint16Column(typ reflect.Type, chType string, numRow int) Columnar { - return &Uint16Column{ +func NewUInt16Column(typ reflect.Type, chType string, numRow int) Columnar { + return &UInt16Column{ NumericColumnOf: NewNumericColumnOf[uint16](numRow), } } -func (c Uint16Column) Type() reflect.Type { +func (c UInt16Column) Type() reflect.Type { return uint16Type } -func (c *Uint16Column) AppendValue(v reflect.Value) { +func (c *UInt16Column) AppendValue(v reflect.Value) { c.Column = append(c.Column, uint16(v.Uint())) } -func (c *Uint16Column) ReadFrom(rd *chproto.Reader, numRow int) error { - c.Alloc(numRow) - - for i := range c.Column { - n, err := rd.Uint16() - if err != nil { - return err - } - c.Column[i] = n - } - - return nil -} - -func (c Uint16Column) WriteTo(wr *chproto.Writer) error { - for _, n := range c.Column { - wr.Uint16(n) - } - return nil -} - //------------------------------------------------------------------------------ -type Uint32Column struct { +type UInt32Column struct { NumericColumnOf[uint32] } -var _ Columnar = (*Uint32Column)(nil) +var _ Columnar = (*UInt32Column)(nil) -func NewUint32Column(typ reflect.Type, chType string, numRow int) Columnar { - return &Uint32Column{ +func NewUInt32Column(typ reflect.Type, chType string, numRow int) Columnar { + return &UInt32Column{ NumericColumnOf: NewNumericColumnOf[uint32](numRow), } } -func (c Uint32Column) Type() reflect.Type { +func (c UInt32Column) Type() reflect.Type { return uint32Type } -func (c *Uint32Column) AppendValue(v reflect.Value) { +func (c *UInt32Column) AppendValue(v reflect.Value) { c.Column = append(c.Column, uint32(v.Uint())) } -func (c *Uint32Column) ReadFrom(rd *chproto.Reader, numRow int) error { - c.Alloc(numRow) - - for i := range c.Column { - n, err := rd.Uint32() - if err != nil { - return err - } - c.Column[i] = n - } - - return nil -} - -func (c Uint32Column) WriteTo(wr *chproto.Writer) error { - for _, n := range c.Column { - wr.Uint32(n) - } - return nil -} - //------------------------------------------------------------------------------ -type Uint64Column struct { +type UInt64Column struct { NumericColumnOf[uint64] } -var _ Columnar = (*Uint64Column)(nil) +var _ Columnar = (*UInt64Column)(nil) -func NewUint64Column(typ reflect.Type, chType string, numRow int) Columnar { - return &Uint64Column{ +func NewUInt64Column(typ reflect.Type, chType string, numRow int) Columnar { + return &UInt64Column{ NumericColumnOf: NewNumericColumnOf[uint64](numRow), } } -func (c Uint64Column) Type() reflect.Type { +func (c UInt64Column) Type() reflect.Type { return uint64Type } -func (c *Uint64Column) AppendValue(v reflect.Value) { +func (c *UInt64Column) AppendValue(v reflect.Value) { c.Column = append(c.Column, v.Uint()) } -func (c *Uint64Column) ReadFrom(rd *chproto.Reader, numRow int) error { - c.Alloc(numRow) - - for i := range c.Column { - n, err := rd.Uint64() - if err != nil { - return err - } - c.Column[i] = n - } - - return nil -} - -func (c Uint64Column) WriteTo(wr *chproto.Writer) error { - for _, n := range c.Column { - wr.Uint64(n) - } - return nil -} - //------------------------------------------------------------------------------ type Float32Column struct { @@ -565,27 +397,6 @@ func (c *Float32Column) AppendValue(v reflect.Value) { c.Column = append(c.Column, float32(v.Float())) } -func (c *Float32Column) ReadFrom(rd *chproto.Reader, numRow int) error { - c.Alloc(numRow) - - for i := range c.Column { - n, err := rd.Float32() - if err != nil { - return err - } - c.Column[i] = n - } - - return nil -} - -func (c Float32Column) WriteTo(wr *chproto.Writer) error { - for _, n := range c.Column { - wr.Float32(n) - } - return nil -} - //------------------------------------------------------------------------------ type Float64Column struct { @@ -608,27 +419,6 @@ func (c *Float64Column) AppendValue(v reflect.Value) { c.Column = append(c.Column, v.Float()) } -func (c *Float64Column) ReadFrom(rd *chproto.Reader, numRow int) error { - c.Alloc(numRow) - - for i := range c.Column { - n, err := rd.Float64() - if err != nil { - return err - } - c.Column[i] = n - } - - return nil -} - -func (c Float64Column) WriteTo(wr *chproto.Writer) error { - for _, n := range c.Column { - wr.Float64(n) - } - return nil -} - //------------------------------------------------------------------------------ type StringColumn struct { @@ -921,7 +711,7 @@ func (c *Int64TimeColumn) ReadFrom(rd *chproto.Reader, numRow int) error { c.Alloc(numRow) for i := range c.Column { - n, err := rd.Uint32() + n, err := rd.UInt32() if err != nil { return err } @@ -933,7 +723,7 @@ func (c *Int64TimeColumn) ReadFrom(rd *chproto.Reader, numRow int) error { func (c Int64TimeColumn) WriteTo(wr *chproto.Writer) error { for i := range c.Column { - wr.Uint32(uint32(c.Column[i] / int64(time.Second))) + wr.UInt32(uint32(c.Column[i] / int64(time.Second))) } return nil } @@ -1217,7 +1007,7 @@ func (c *LCStringColumn) readData(rd *chproto.Reader, numRow int) error { } lcKey := newLCKeyType(flags & 0xf) - dictSize, err := rd.Uint64() + dictSize, err := rd.UInt64() if err != nil { return err } @@ -1232,7 +1022,7 @@ func (c *LCStringColumn) readData(rd *chproto.Reader, numRow int) error { dict[i] = s } - numKey, err := rd.Uint64() + numKey, err := rd.UInt64() if err != nil { return err } @@ -1325,44 +1115,44 @@ func newLCKeyType(typ int64) lcKey { return lcKey{ typ: 0, read: func(rd *chproto.Reader) (int, error) { - n, err := rd.Uint8() + n, err := rd.UInt8() return int(n), err }, write: func(wr *chproto.Writer, n int) { - wr.Uint8(uint8(n)) + wr.UInt8(uint8(n)) }, } case 1: return lcKey{ typ: int8(1), read: func(rd *chproto.Reader) (int, error) { - n, err := rd.Uint16() + n, err := rd.UInt16() return int(n), err }, write: func(wr *chproto.Writer, n int) { - wr.Uint16(uint16(n)) + wr.UInt16(uint16(n)) }, } case 2: return lcKey{ typ: 2, read: func(rd *chproto.Reader) (int, error) { - n, err := rd.Uint32() + n, err := rd.UInt32() return int(n), err }, write: func(wr *chproto.Writer, n int) { - wr.Uint32(uint32(n)) + wr.UInt32(uint32(n)) }, } case 3: return lcKey{ typ: 3, read: func(rd *chproto.Reader) (int, error) { - n, err := rd.Uint64() + n, err := rd.UInt64() return int(n), err }, write: func(wr *chproto.Writer, n int) { - wr.Uint64(uint64(n)) + wr.UInt64(uint64(n)) }, } default: diff --git a/ch/chschema/column_array.go b/ch/chschema/column_array.go index af8c9d6..b134535 100644 --- a/ch/chschema/column_array.go +++ b/ch/chschema/column_array.go @@ -3,6 +3,7 @@ package chschema import ( "fmt" "reflect" + "unsafe" "github.com/uptrace/go-clickhouse/ch/chproto" ) @@ -12,114 +13,64 @@ type ArrayColumnar interface { WriteData(wr *chproto.Writer) error } -type ArrayLCStringColumn struct { - *LCStringColumn -} - -func (c ArrayLCStringColumn) Type() reflect.Type { - return stringSliceType -} - -func (c *ArrayLCStringColumn) WriteTo(wr *chproto.Writer) error { - c.writeData(wr) - return nil -} - -func (c *ArrayLCStringColumn) ReadFrom(rd *chproto.Reader, numRow int) error { - if numRow == 0 { - return nil - } - return c.readData(rd, numRow) -} - //------------------------------------------------------------------------------ -type ArrayColumn struct { - Column reflect.Value - - typ reflect.Type - elem Columnar - arrayElem ArrayColumnar +type ArrayColumnOf[T any] struct { + Column [][]T + elem Columnar } -var _ Columnar = (*ArrayColumn)(nil) - -func NewArrayColumn(typ reflect.Type, chType string, numRow int) Columnar { - elemType := chArrayElemType(chType) - if elemType == "" { - panic(fmt.Errorf("invalid array type: %q (Go type is %s)", - chType, typ.String())) - } - - elem := NewColumn(typ.Elem(), elemType, 0) - var arrayElem ArrayColumnar - - if _, ok := elem.(*LCStringColumn); ok { - panic("not reached") - } - arrayElem, _ = elem.(ArrayColumnar) - - c := &ArrayColumn{ - typ: reflect.SliceOf(typ), - elem: elem, - arrayElem: arrayElem, - } - - c.Column = reflect.MakeSlice(c.typ, 0, numRow) - - return c -} - -func (c ArrayColumn) Type() reflect.Type { - return c.typ.Elem() -} - -func (c *ArrayColumn) Reset(numRow int) { - if c.Column.Cap() >= numRow { - c.Column = c.Column.Slice(0, 0) +func (c *ArrayColumnOf[T]) Reset(numRow int) { + if cap(c.Column) >= numRow { + c.Column = c.Column[:0] } else { - c.Column = reflect.MakeSlice(c.typ, 0, numRow) + c.Column = make([][]T, 0, numRow) } } -func (c *ArrayColumn) Set(v any) { - c.Column = reflect.ValueOf(v) +func (c *ArrayColumnOf[T]) Set(v any) { + c.Column = v.([][]T) } -func (c *ArrayColumn) Value() any { - return c.Column.Interface() +func (c *ArrayColumnOf[T]) Value() any { + return c.Column } -func (c *ArrayColumn) Nullable(nulls Uint8Column) any { +func (c *ArrayColumnOf[T]) Nullable(nulls UInt8Column) any { panic("not implemented") } -func (c *ArrayColumn) Len() int { - return c.Column.Len() +func (c *ArrayColumnOf[T]) Len() int { + return len(c.Column) } -func (c *ArrayColumn) Index(idx int) any { - return c.Column.Index(idx).Interface() +func (c *ArrayColumnOf[T]) Index(idx int) any { + return c.Column[idx] } -func (c ArrayColumn) Slice(s, e int) any { - return c.Column.Slice(s, e).Interface() +func (c *ArrayColumnOf[T]) Slice(s, e int) any { + return c.Column[s:e] } -func (c *ArrayColumn) ConvertAssign(idx int, v reflect.Value) error { - v.Set(c.Column.Index(idx)) +func (c *ArrayColumnOf[T]) ConvertAssign(idx int, v reflect.Value) error { + v.Set(reflect.ValueOf(c.Column[idx])) return nil } -func (c *ArrayColumn) AppendValue(v reflect.Value) { - c.Column = reflect.Append(c.Column, v) +func (c *ArrayColumnOf[T]) AppendValue(v reflect.Value) { + ptr := unsafe.Pointer(v.UnsafeAddr()) + c.AppendPointer(v.Type(), ptr) } -func (c *ArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error { - if c.Column.Cap() >= numRow { - c.Column = c.Column.Slice(0, numRow) +func (c *ArrayColumnOf[T]) AppendPointer(typ reflect.Type, ptr unsafe.Pointer) { + c.Column = append(c.Column, *(*[]T)(ptr)) +} + +func (c *ArrayColumnOf[T]) ReadFrom(rd *chproto.Reader, numRow int) error { + if cap(c.Column) >= numRow { + c.Column = c.Column[:numRow] } else { - c.Column = reflect.MakeSlice(c.typ, numRow, numRow) + c.Column = make([][]T, numRow) } if numRow == 0 { @@ -127,8 +78,8 @@ func (c *ArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error { } offsets := make([]int, numRow) - for i := 0; i < len(offsets); i++ { - offset, err := rd.Uint64() + for i := 0; i < numRow; i++ { + offset, err := rd.UInt64() if err != nil { return err } @@ -141,56 +92,99 @@ func (c *ArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error { var prev int for i, offset := range offsets { - c.Column.Index(i).Set(reflect.ValueOf(c.elem.Slice(prev, offset))) + c.Column[i] = c.elem.Slice(prev, offset).([]T) prev = offset } return nil } -func (c *ArrayColumn) WriteTo(wr *chproto.Writer) error { +func (c *ArrayColumnOf[T]) WriteTo(wr *chproto.Writer) error { _ = c.WriteOffset(wr, 0) + return c.WriteData(wr) +} - colLen := c.Column.Len() - for i := 0; i < colLen; i++ { - // TODO: add SetValue or SetPointer - c.elem.Set(c.Column.Index(i).Interface()) +var _ ArrayColumnar = (*Int64ArrayColumn)(nil) - var err error - if c.arrayElem != nil { - err = c.arrayElem.WriteData(wr) - } else { - err = c.elem.WriteTo(wr) - } - if err != nil { +func (c *ArrayColumnOf[T]) WriteOffset(wr *chproto.Writer, offset int) int { + for _, el := range c.Column { + offset += len(el) + wr.UInt64(uint64(offset)) + } + return offset +} + +func (c *ArrayColumnOf[T]) WriteData(wr *chproto.Writer) error { + for _, ss := range c.Column { + c.elem.Set(ss) + if err := c.elem.WriteTo(wr); err != nil { return err } } - return nil } -func (c *ArrayColumn) WriteOffset(wr *chproto.Writer, offset int) int { - colLen := c.Column.Len() +//------------------------------------------------------------------------------ - for i := 0; i < colLen; i++ { - el := c.Column.Index(i) - offset += el.Len() - wr.Uint64(uint64(offset)) +type Int64ArrayColumn struct { + ArrayColumnOf[int64] +} + +var _ Columnar = (*Int64ArrayColumn)(nil) + +func NewInt64ArrayColumn(typ reflect.Type, chType string, numRow int) Columnar { + return &Int64ArrayColumn{ + ArrayColumnOf: ArrayColumnOf[int64]{ + Column: make([][]int64, 0, numRow), + elem: NewInt64Column(typ.Elem(), "", 0), + }, } +} - if c.arrayElem == nil { - return offset +func (c *Int64ArrayColumn) Type() reflect.Type { + return int64SliceType +} + +//------------------------------------------------------------------------------ + +type Uint64ArrayColumn struct { + ArrayColumnOf[uint64] +} + +var _ Columnar = (*Uint64ArrayColumn)(nil) + +func NewUint64ArrayColumn(typ reflect.Type, chType string, numRow int) Columnar { + return &Uint64ArrayColumn{ + ArrayColumnOf: ArrayColumnOf[uint64]{ + Column: make([][]uint64, 0, numRow), + elem: NewUInt64Column(typ.Elem(), "", 0), + }, } +} - offset = 0 - for i := 0; i < colLen; i++ { - el := c.Column.Index(i) - c.elem.Set(el.Interface()) // Use SetValue or SetPointer - offset = c.arrayElem.WriteOffset(wr, offset) +func (c *Uint64ArrayColumn) Type() reflect.Type { + return uint64SliceType +} + +//------------------------------------------------------------------------------ + +type Float64ArrayColumn struct { + ArrayColumnOf[float64] +} + +var _ Columnar = (*Float64ArrayColumn)(nil) + +func NewFloat64ArrayColumn(typ reflect.Type, chType string, numRow int) Columnar { + return &Float64ArrayColumn{ + ArrayColumnOf: ArrayColumnOf[float64]{ + Column: make([][]float64, 0, numRow), + elem: NewFloat64Column(typ.Elem(), "", 0), + }, } +} - return offset +func (c *Float64ArrayColumn) Type() reflect.Type { + return float64SliceType } //------------------------------------------------------------------------------ @@ -259,7 +253,7 @@ func (c *StringArrayColumn) Value() any { return c.Column } -func (c *StringArrayColumn) Nullable(nulls Uint8Column) any { +func (c *StringArrayColumn) Nullable(nulls UInt8Column) any { panic("not implemented") } @@ -284,6 +278,10 @@ func (c *StringArrayColumn) AppendValue(v reflect.Value) { c.Column = append(c.Column, v.Interface().([]string)) } +func (c *StringArrayColumn) AppendPointer(typ reflect.Type, ptr unsafe.Pointer) { + c.Column = append(c.Column, *(*[]string)(ptr)) +} + func (c *StringArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error { if numRow == 0 { return nil @@ -304,7 +302,7 @@ func (c *StringArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error { offsets := make([]int, numRow) for i := 0; i < len(offsets); i++ { - offset, err := rd.Uint64() + offset, err := rd.UInt64() if err != nil { return err } @@ -338,7 +336,7 @@ var _ ArrayColumnar = (*StringArrayColumn)(nil) func (c *StringArrayColumn) WriteOffset(wr *chproto.Writer, offset int) int { for _, el := range c.Column { offset += len(el) - wr.Uint64(uint64(offset)) + wr.UInt64(uint64(offset)) } return offset } @@ -352,3 +350,190 @@ func (c *StringArrayColumn) WriteData(wr *chproto.Writer) error { } return nil } + +//------------------------------------------------------------------------------ + +type ArrayLCStringColumn struct { + *LCStringColumn +} + +func (c ArrayLCStringColumn) Type() reflect.Type { + return stringSliceType +} + +func (c *ArrayLCStringColumn) WriteTo(wr *chproto.Writer) error { + c.writeData(wr) + return nil +} + +func (c *ArrayLCStringColumn) ReadFrom(rd *chproto.Reader, numRow int) error { + if numRow == 0 { + return nil + } + return c.readData(rd, numRow) +} + +//------------------------------------------------------------------------------ + +type GenericArrayColumn struct { + Column reflect.Value + + typ reflect.Type + elem Columnar + arrayElem ArrayColumnar +} + +var _ Columnar = (*GenericArrayColumn)(nil) + +func NewGenericArrayColumn(typ reflect.Type, chType string, numRow int) Columnar { + elemType := chArrayElemType(chType) + if elemType == "" { + panic(fmt.Errorf("invalid array type: %q (Go type is %s)", + chType, typ.String())) + } + + elem := NewColumn(typ.Elem(), elemType, 0) + var arrayElem ArrayColumnar + + if _, ok := elem.(*LCStringColumn); ok { + panic("not reached") + } + arrayElem, _ = elem.(ArrayColumnar) + + c := &GenericArrayColumn{ + typ: reflect.SliceOf(typ), + elem: elem, + arrayElem: arrayElem, + } + + c.Column = reflect.MakeSlice(c.typ, 0, numRow) + + return c +} + +func (c GenericArrayColumn) Type() reflect.Type { + return c.typ.Elem() +} + +func (c *GenericArrayColumn) Reset(numRow int) { + if c.Column.Cap() >= numRow { + c.Column = c.Column.Slice(0, 0) + } else { + c.Column = reflect.MakeSlice(c.typ, 0, numRow) + } +} + +func (c *GenericArrayColumn) Set(v any) { + c.Column = reflect.ValueOf(v) +} + +func (c *GenericArrayColumn) Value() any { + return c.Column.Interface() +} + +func (c *GenericArrayColumn) Nullable(nulls UInt8Column) any { + panic("not implemented") +} + +func (c *GenericArrayColumn) Len() int { + return c.Column.Len() +} + +func (c *GenericArrayColumn) Index(idx int) any { + return c.Column.Index(idx).Interface() +} + +func (c GenericArrayColumn) Slice(s, e int) any { + return c.Column.Slice(s, e).Interface() +} + +func (c *GenericArrayColumn) ConvertAssign(idx int, v reflect.Value) error { + v.Set(c.Column.Index(idx)) + return nil +} + +func (c *GenericArrayColumn) AppendValue(v reflect.Value) { + c.Column = reflect.Append(c.Column, v) +} + +func (c *GenericArrayColumn) AppendPointer(typ reflect.Type, ptr unsafe.Pointer) { + c.AppendValue(reflect.NewAt(typ.Elem(), ptr).Elem()) +} + +func (c *GenericArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error { + if c.Column.Cap() >= numRow { + c.Column = c.Column.Slice(0, numRow) + } else { + c.Column = reflect.MakeSlice(c.typ, numRow, numRow) + } + + if numRow == 0 { + return nil + } + + offsets := make([]int, numRow) + for i := 0; i < len(offsets); i++ { + offset, err := rd.UInt64() + if err != nil { + return err + } + offsets[i] = int(offset) + } + + if err := c.elem.ReadFrom(rd, offsets[len(offsets)-1]); err != nil { + return err + } + + var prev int + for i, offset := range offsets { + c.Column.Index(i).Set(reflect.ValueOf(c.elem.Slice(prev, offset))) + prev = offset + } + + return nil +} + +func (c *GenericArrayColumn) WriteTo(wr *chproto.Writer) error { + _ = c.WriteOffset(wr, 0) + + colLen := c.Column.Len() + for i := 0; i < colLen; i++ { + // TODO: add SetValue or SetPointer + c.elem.Set(c.Column.Index(i).Interface()) + + var err error + if c.arrayElem != nil { + err = c.arrayElem.WriteData(wr) + } else { + err = c.elem.WriteTo(wr) + } + if err != nil { + return err + } + } + + return nil +} + +func (c *GenericArrayColumn) WriteOffset(wr *chproto.Writer, offset int) int { + colLen := c.Column.Len() + + for i := 0; i < colLen; i++ { + el := c.Column.Index(i) + offset += el.Len() + wr.UInt64(uint64(offset)) + } + + if c.arrayElem == nil { + return offset + } + + offset = 0 + for i := 0; i < colLen; i++ { + el := c.Column.Index(i) + c.elem.Set(el.Interface()) // Use SetValue or SetPointer + offset = c.arrayElem.WriteOffset(wr, offset) + } + + return offset +} diff --git a/ch/chschema/column_nullable.go b/ch/chschema/column_nullable.go index f8f1144..cc97fb5 100644 --- a/ch/chschema/column_nullable.go +++ b/ch/chschema/column_nullable.go @@ -7,7 +7,7 @@ import ( ) type NullableColumn struct { - Nulls Uint8Column + Nulls UInt8Column Values Columnar nullable reflect.Value // reflect.Slice } @@ -44,7 +44,7 @@ func (c *NullableColumn) Value() any { return c.nullable.Interface() } -func (c *NullableColumn) Nullable(nulls Uint8Column) any { +func (c *NullableColumn) Nullable(nulls UInt8Column) any { panic("not implemented") } diff --git a/ch/chschema/column_safe_gen.go b/ch/chschema/column_safe_gen.go new file mode 100644 index 0000000..33a8696 --- /dev/null +++ b/ch/chschema/column_safe_gen.go @@ -0,0 +1,217 @@ +//go:build !amd64 && !arm64 + +package chschema + +import ( + "github.com/uptrace/go-clickhouse/ch/chproto" +) + +func (c *Int8Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.Alloc(numRow) + + for i := range c.Column { + n, err := rd.Int8() + if err != nil { + return err + } + c.Column[i] = n + } + + return nil +} + +func (c *Int8Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + wr.Int8(n) + } + return nil +} + +func (c *UInt8Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.Alloc(numRow) + + for i := range c.Column { + n, err := rd.UInt8() + if err != nil { + return err + } + c.Column[i] = n + } + + return nil +} + +func (c *UInt8Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + wr.UInt8(n) + } + return nil +} + +func (c *Int16Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.Alloc(numRow) + + for i := range c.Column { + n, err := rd.Int16() + if err != nil { + return err + } + c.Column[i] = n + } + + return nil +} + +func (c *Int16Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + wr.Int16(n) + } + return nil +} + +func (c *UInt16Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.Alloc(numRow) + + for i := range c.Column { + n, err := rd.UInt16() + if err != nil { + return err + } + c.Column[i] = n + } + + return nil +} + +func (c *UInt16Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + wr.UInt16(n) + } + return nil +} + +func (c *Int32Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.Alloc(numRow) + + for i := range c.Column { + n, err := rd.Int32() + if err != nil { + return err + } + c.Column[i] = n + } + + return nil +} + +func (c *Int32Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + wr.Int32(n) + } + return nil +} + +func (c *UInt32Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.Alloc(numRow) + + for i := range c.Column { + n, err := rd.UInt32() + if err != nil { + return err + } + c.Column[i] = n + } + + return nil +} + +func (c *UInt32Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + wr.UInt32(n) + } + return nil +} + +func (c *Int64Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.Alloc(numRow) + + for i := range c.Column { + n, err := rd.Int64() + if err != nil { + return err + } + c.Column[i] = n + } + + return nil +} + +func (c *Int64Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + wr.Int64(n) + } + return nil +} + +func (c *UInt64Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.Alloc(numRow) + + for i := range c.Column { + n, err := rd.UInt64() + if err != nil { + return err + } + c.Column[i] = n + } + + return nil +} + +func (c *UInt64Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + wr.UInt64(n) + } + return nil +} + +func (c *Float32Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.Alloc(numRow) + + for i := range c.Column { + n, err := rd.Float32() + if err != nil { + return err + } + c.Column[i] = n + } + + return nil +} + +func (c *Float32Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + wr.Float32(n) + } + return nil +} + +func (c *Float64Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.Alloc(numRow) + + for i := range c.Column { + n, err := rd.Float64() + if err != nil { + return err + } + c.Column[i] = n + } + + return nil +} + +func (c *Float64Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + wr.Float64(n) + } + return nil +} diff --git a/ch/chschema/column_unsafe_gen.go b/ch/chschema/column_unsafe_gen.go new file mode 100644 index 0000000..d4f2bd5 --- /dev/null +++ b/ch/chschema/column_unsafe_gen.go @@ -0,0 +1,351 @@ +//go:build amd64 || arm64 + +package chschema + +import ( + "io" + "reflect" + "unsafe" + + "github.com/uptrace/go-clickhouse/ch/chproto" +) + +func (c *Int8Column) ReadFrom(rd *chproto.Reader, numRow int) error { + const size = 8 / 8 + + if numRow == 0 { + return nil + } + + c.Alloc(numRow) + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + dest := *(*[]byte)(unsafe.Pointer(&slice)) + _, err := io.ReadFull(rd, dest) + return err +} + +func (c *Int8Column) WriteTo(wr *chproto.Writer) error { + const size = 8 / 8 + + if len(c.Column) == 0 { + return nil + } + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + src := *(*[]byte)(unsafe.Pointer(&slice)) + wr.Write(src) + return nil +} + +func (c *UInt8Column) ReadFrom(rd *chproto.Reader, numRow int) error { + const size = 8 / 8 + + if numRow == 0 { + return nil + } + + c.Alloc(numRow) + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + dest := *(*[]byte)(unsafe.Pointer(&slice)) + _, err := io.ReadFull(rd, dest) + return err +} + +func (c *UInt8Column) WriteTo(wr *chproto.Writer) error { + const size = 8 / 8 + + if len(c.Column) == 0 { + return nil + } + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + src := *(*[]byte)(unsafe.Pointer(&slice)) + wr.Write(src) + return nil +} + +func (c *Int16Column) ReadFrom(rd *chproto.Reader, numRow int) error { + const size = 16 / 8 + + if numRow == 0 { + return nil + } + + c.Alloc(numRow) + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + dest := *(*[]byte)(unsafe.Pointer(&slice)) + _, err := io.ReadFull(rd, dest) + return err +} + +func (c *Int16Column) WriteTo(wr *chproto.Writer) error { + const size = 16 / 8 + + if len(c.Column) == 0 { + return nil + } + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + src := *(*[]byte)(unsafe.Pointer(&slice)) + wr.Write(src) + return nil +} + +func (c *UInt16Column) ReadFrom(rd *chproto.Reader, numRow int) error { + const size = 16 / 8 + + if numRow == 0 { + return nil + } + + c.Alloc(numRow) + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + dest := *(*[]byte)(unsafe.Pointer(&slice)) + _, err := io.ReadFull(rd, dest) + return err +} + +func (c *UInt16Column) WriteTo(wr *chproto.Writer) error { + const size = 16 / 8 + + if len(c.Column) == 0 { + return nil + } + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + src := *(*[]byte)(unsafe.Pointer(&slice)) + wr.Write(src) + return nil +} + +func (c *Int32Column) ReadFrom(rd *chproto.Reader, numRow int) error { + const size = 32 / 8 + + if numRow == 0 { + return nil + } + + c.Alloc(numRow) + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + dest := *(*[]byte)(unsafe.Pointer(&slice)) + _, err := io.ReadFull(rd, dest) + return err +} + +func (c *Int32Column) WriteTo(wr *chproto.Writer) error { + const size = 32 / 8 + + if len(c.Column) == 0 { + return nil + } + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + src := *(*[]byte)(unsafe.Pointer(&slice)) + wr.Write(src) + return nil +} + +func (c *UInt32Column) ReadFrom(rd *chproto.Reader, numRow int) error { + const size = 32 / 8 + + if numRow == 0 { + return nil + } + + c.Alloc(numRow) + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + dest := *(*[]byte)(unsafe.Pointer(&slice)) + _, err := io.ReadFull(rd, dest) + return err +} + +func (c *UInt32Column) WriteTo(wr *chproto.Writer) error { + const size = 32 / 8 + + if len(c.Column) == 0 { + return nil + } + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + src := *(*[]byte)(unsafe.Pointer(&slice)) + wr.Write(src) + return nil +} + +func (c *Int64Column) ReadFrom(rd *chproto.Reader, numRow int) error { + const size = 64 / 8 + + if numRow == 0 { + return nil + } + + c.Alloc(numRow) + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + dest := *(*[]byte)(unsafe.Pointer(&slice)) + _, err := io.ReadFull(rd, dest) + return err +} + +func (c *Int64Column) WriteTo(wr *chproto.Writer) error { + const size = 64 / 8 + + if len(c.Column) == 0 { + return nil + } + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + src := *(*[]byte)(unsafe.Pointer(&slice)) + wr.Write(src) + return nil +} + +func (c *UInt64Column) ReadFrom(rd *chproto.Reader, numRow int) error { + const size = 64 / 8 + + if numRow == 0 { + return nil + } + + c.Alloc(numRow) + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + dest := *(*[]byte)(unsafe.Pointer(&slice)) + _, err := io.ReadFull(rd, dest) + return err +} + +func (c *UInt64Column) WriteTo(wr *chproto.Writer) error { + const size = 64 / 8 + + if len(c.Column) == 0 { + return nil + } + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + src := *(*[]byte)(unsafe.Pointer(&slice)) + wr.Write(src) + return nil +} + +func (c *Float32Column) ReadFrom(rd *chproto.Reader, numRow int) error { + const size = 32 / 8 + + if numRow == 0 { + return nil + } + + c.Alloc(numRow) + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + dest := *(*[]byte)(unsafe.Pointer(&slice)) + _, err := io.ReadFull(rd, dest) + return err +} + +func (c *Float32Column) WriteTo(wr *chproto.Writer) error { + const size = 32 / 8 + + if len(c.Column) == 0 { + return nil + } + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + src := *(*[]byte)(unsafe.Pointer(&slice)) + wr.Write(src) + return nil +} + +func (c *Float64Column) ReadFrom(rd *chproto.Reader, numRow int) error { + const size = 64 / 8 + + if numRow == 0 { + return nil + } + + c.Alloc(numRow) + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + dest := *(*[]byte)(unsafe.Pointer(&slice)) + _, err := io.ReadFull(rd, dest) + return err +} + +func (c *Float64Column) WriteTo(wr *chproto.Writer) error { + const size = 64 / 8 + + if len(c.Column) == 0 { + return nil + } + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + src := *(*[]byte)(unsafe.Pointer(&slice)) + wr.Write(src) + return nil +} diff --git a/ch/chschema/types.go b/ch/chschema/types.go index 3f848e7..3a210ce 100644 --- a/ch/chschema/types.go +++ b/ch/chschema/types.go @@ -94,11 +94,11 @@ var kindToColumn = [...]NewColumnFunc{ reflect.Int16: NewInt16Column, reflect.Int32: NewInt32Column, reflect.Int64: NewInt64Column, - reflect.Uint: NewUint64Column, - reflect.Uint8: NewUint8Column, - reflect.Uint16: NewUint16Column, - reflect.Uint32: NewUint32Column, - reflect.Uint64: NewUint64Column, + reflect.Uint: NewUInt64Column, + reflect.Uint8: NewUInt8Column, + reflect.Uint16: NewUInt16Column, + reflect.Uint32: NewUInt32Column, + reflect.Uint64: NewUInt64Column, reflect.Uintptr: nil, reflect.Float32: NewFloat32Column, reflect.Float64: NewFloat64Column, @@ -171,6 +171,12 @@ func ColumnFactory(typ reflect.Type, chType string) NewColumnFunc { if elem.Elem().Kind() == reflect.Struct { return NewJSONColumn } + case reflect.Int64: + return NewInt64ArrayColumn + case reflect.Uint64: + return NewUint64ArrayColumn + case reflect.Float64: + return NewFloat64ArrayColumn case reflect.Uint8: if chType == chtype.String { return NewBytesColumn @@ -183,7 +189,7 @@ func ColumnFactory(typ reflect.Type, chType string) NewColumnFunc { } } - return NewArrayColumn + return NewGenericArrayColumn case reflect.Array: if isUUID(typ) { return NewUUIDColumn @@ -196,7 +202,7 @@ func ColumnFactory(typ reflect.Type, chType string) NewColumnFunc { case chtype.DateTime: switch typ { case uint32Type: - return NewUint32Column + return NewUInt32Column case int64Type: return NewInt64TimeColumn default: @@ -227,13 +233,13 @@ func columnFromCHType(chType string) NewColumnFunc { case chtype.Int64: return NewInt64Column case chtype.UInt8: - return NewUint8Column + return NewUInt8Column case chtype.UInt16: - return NewUint16Column + return NewUInt16Column case chtype.UInt32: - return NewUint32Column + return NewUInt32Column case chtype.UInt64: - return NewUint64Column + return NewUInt64Column case chtype.Float32: return NewFloat32Column case chtype.Float64: diff --git a/ch/db.go b/ch/db.go index 6ab7ac4..e9279e8 100644 --- a/ch/db.go +++ b/ch/db.go @@ -225,11 +225,48 @@ func (db *DB) ExecContext( ) (sql.Result, error) { query = db.FormatQuery(query, args...) ctx, evt := db.beforeQuery(ctx, nil, query, args, nil) - res, err := db.query(ctx, nil, query) + res, err := db.exec(ctx, query) db.afterQuery(ctx, evt, res, err) return res, err } +func (db *DB) exec(ctx context.Context, query string) (*result, error) { + var res *result + var lastErr error + for attempt := 0; attempt <= db.cfg.MaxRetries; attempt++ { + if attempt > 0 { + lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) + if lastErr != nil { + break + } + } + + res, lastErr = db._exec(ctx, query) + if !db.shouldRetry(lastErr) { + break + } + } + return res, lastErr +} + +func (db *DB) _exec(ctx context.Context, query string) (*result, error) { + var res *result + err := db.withConn(ctx, func(cn *chpool.Conn) error { + if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { + db.writeQuery(wr, query) + writeBlock(ctx, wr, nil) + }); err != nil { + return err + } + return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { + var err error + res, err = readDataBlocks(rd) + return err + }) + }) + return res, err +} + func (db *DB) Query(query string, args ...any) (*Rows, error) { return db.QueryContext(context.Background(), query, args...) } @@ -237,16 +274,16 @@ func (db *DB) Query(query string, args ...any) (*Rows, error) { func (db *DB) QueryContext( ctx context.Context, query string, args ...any, ) (*Rows, error) { - rows := newRows() query = db.FormatQuery(query, args...) ctx, evt := db.beforeQuery(ctx, nil, query, args, nil) - res, err := db.query(ctx, rows, query) - db.afterQuery(ctx, evt, res, err) + blocks, err := db.query(ctx, query) + db.afterQuery(ctx, evt, nil, err) if err != nil { return nil, err } - return rows, nil + + return newRows(ctx, blocks), nil } func (db *DB) QueryRow(query string, args ...any) *Row { @@ -258,9 +295,10 @@ func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *R return &Row{rows: rows, err: err} } -func (db *DB) query(ctx context.Context, model Model, query string) (*result, error) { - var res *result +func (db *DB) query(ctx context.Context, query string) (*blockIter, error) { + var blocks *blockIter var lastErr error + for attempt := 0; attempt <= db.cfg.MaxRetries; attempt++ { if attempt > 0 { lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) @@ -269,39 +307,29 @@ func (db *DB) query(ctx context.Context, model Model, query string) (*result, er } } - res, lastErr = db._query(ctx, model, query) + blocks, lastErr = db._query(ctx, query) if !db.shouldRetry(lastErr) { break } } - if lastErr == nil { - if model, ok := model.(AfterScanRowHook); ok { - if err := model.AfterScanRow(ctx); err != nil { - lastErr = err - } - } - } - - return res, lastErr + return blocks, lastErr } -func (db *DB) _query(ctx context.Context, model Model, query string) (*result, error) { - var res *result - err := db.withConn(ctx, func(cn *chpool.Conn) error { - if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { - db.writeQuery(wr, query) - writeBlock(ctx, wr, nil) - }); err != nil { - return err - } - return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { - var err error - res, err = readDataBlocks(rd, model) - return err - }) - }) - return res, err +func (db *DB) _query(ctx context.Context, query string) (*blockIter, error) { + cn, err := db.getConn(ctx) + if err != nil { + return nil, err + } + + if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { + db.writeQuery(wr, query) + writeBlock(ctx, wr, nil) + }); err != nil { + return nil, err + } + + return newBlockIter(db, cn), nil } func (db *DB) insert( @@ -454,21 +482,37 @@ func (db *DB) makeQueryBytes() []byte { // Rows is the result of a query. Its cursor starts before the first row of the result set. // Use Next to advance from row to row. type Rows struct { - blocks []*chschema.Block + ctx context.Context + blocks *blockIter + block *chschema.Block - block *chschema.Block - blockIndex int - rowIndex int + rowIndex int + hasNext bool + closed bool } -func newRows() *Rows { - return new(Rows) +func newRows(ctx context.Context, blocks *blockIter) *Rows { + return &Rows{ + ctx: ctx, + blocks: blocks, + block: new(chschema.Block), + } } func (rs *Rows) Close() error { + if !rs.closed { + for rs.blocks.Next(rs.ctx, rs.block) { + } + rs.close() + } return nil } +func (rs *Rows) close() { + rs.closed = true + _ = rs.blocks.Close() +} + func (rs *Rows) ColumnTypes() ([]*sql.ColumnType, error) { return nil, errors.New("not implemented") } @@ -478,25 +522,25 @@ func (rs *Rows) Columns() ([]string, error) { } func (rs *Rows) Err() error { - return nil + return rs.blocks.Err() } func (rs *Rows) Next() bool { - if rs.block != nil && rs.rowIndex < rs.block.NumRow { - rs.rowIndex++ - return true + if rs.closed { + return false } - for rs.blockIndex < len(rs.blocks) { - rs.block = rs.blocks[rs.blockIndex] - rs.blockIndex++ - if rs.block.NumRow > 0 { - rs.rowIndex = 1 - return true + for rs.rowIndex >= rs.block.NumRow { + if !rs.blocks.Next(rs.ctx, rs.block) { + rs.close() + return false } + rs.rowIndex = 0 } - return false + rs.hasNext = true + rs.rowIndex++ + return true } func (rs *Rows) NextResultSet() bool { @@ -504,9 +548,14 @@ func (rs *Rows) NextResultSet() bool { } func (rs *Rows) Scan(dest ...any) error { - if rs.block == nil { + if rs.closed { + return rs.Err() + } + + if !rs.hasNext { return errors.New("ch: Scan called without calling Next") } + rs.hasNext = false if rs.block.NumColumn != len(dest) { return fmt.Errorf("ch: got %d columns, but Scan has %d values", @@ -522,11 +571,6 @@ func (rs *Rows) Scan(dest ...any) error { return nil } -func (rs *Rows) ScanBlock(block *chschema.Block) error { - rs.blocks = append(rs.blocks, block) - return nil -} - type Row struct { rows *Rows err error diff --git a/ch/db_test.go b/ch/db_test.go index 7989776..69219d8 100644 --- a/ch/db_test.go +++ b/ch/db_test.go @@ -48,6 +48,8 @@ func TestCHError(t *testing.T) { } func TestCHTimeout(t *testing.T) { + t.Skip() + ctx := context.Background() db := chDB(ch.WithTimeout(time.Second), ch.WithMaxRetries(0)) diff --git a/ch/internal/codegen/column_safe.tpl b/ch/internal/codegen/column_safe.tpl new file mode 100644 index 0000000..d00d426 --- /dev/null +++ b/ch/internal/codegen/column_safe.tpl @@ -0,0 +1,32 @@ +//go:build !amd64 && !arm64 + +package chschema + +import ( + "github.com/uptrace/go-clickhouse/ch/chproto" +) + +{{- range . }} + +func (c *{{ .CHType }}Column) ReadFrom(rd *chproto.Reader, numRow int) error { + c.Alloc(numRow) + + for i := range c.Column { + n, err := rd.{{ .CHType }}() + if err != nil { + return err + } + c.Column[i] = n + } + + return nil +} + +func (c *{{ .CHType }}Column) WriteTo(wr *chproto.Writer) error { + for _, n := range c.Column { + wr.{{ .CHType }}(n) + } + return nil +} + +{{- end }} diff --git a/ch/internal/codegen/column_unsafe.tpl b/ch/internal/codegen/column_unsafe.tpl new file mode 100644 index 0000000..7cc1ea0 --- /dev/null +++ b/ch/internal/codegen/column_unsafe.tpl @@ -0,0 +1,49 @@ +//go:build amd64 || arm64 + +package chschema + +import ( + "io" + "reflect" + "unsafe" + + "github.com/uptrace/go-clickhouse/ch/chproto" +) + +{{- range . }} + +func (c *{{ .CHType }}Column) ReadFrom(rd *chproto.Reader, numRow int) error { + const size = {{ .Size }} / 8 + + if numRow == 0 { + return nil + } + + c.Alloc(numRow) + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + dest := *(*[]byte)(unsafe.Pointer(&slice)) + _, err := io.ReadFull(rd, dest) + return err +} + +func (c *{{ .CHType }}Column) WriteTo(wr *chproto.Writer) error { + const size = {{ .Size }} / 8 + + if len(c.Column) == 0 { + return nil + } + + slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column)) + slice.Len *= size + slice.Cap *= size + + src := *(*[]byte)(unsafe.Pointer(&slice)) + wr.Write(src) + return nil +} + +{{- end }} diff --git a/ch/proto.go b/ch/proto.go index 2ce8d1b..b4930b2 100644 --- a/ch/proto.go +++ b/ch/proto.go @@ -20,6 +20,81 @@ const ( chRevision = 54428 ) +type blockIter struct { + db *DB + cn *chpool.Conn + + stickyErr error +} + +func newBlockIter(db *DB, cn *chpool.Conn) *blockIter { + return &blockIter{ + db: db, + cn: cn, + } +} + +func (it *blockIter) Close() error { + if it.cn != nil { + it.db.releaseConn(it.cn, it.stickyErr) + it.cn = nil + } + return nil +} + +func (it *blockIter) Err() error { + return it.stickyErr +} + +func (it *blockIter) Next(ctx context.Context, block *chschema.Block) bool { + if it.stickyErr != nil { + return false + } + + ok, err := it.read(ctx, block) + if err != nil { + it.stickyErr = err + return false + } + return ok +} + +func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, error) { + rd := it.cn.Reader(ctx, it.db.cfg.ReadTimeout) + for { + packet, err := rd.Uvarint() + if err != nil { + return false, err + } + + switch packet { + case chproto.ServerData: + if err := readBlock(rd, block); err != nil { + return false, err + } + return true, nil + case chproto.ServerException: + return false, readException(rd) + case chproto.ServerProgress: + if err := readProgress(rd); err != nil { + return false, err + } + case chproto.ServerProfileInfo: + if err := readProfileInfo(rd); err != nil { + return false, err + } + case chproto.ServerTableColumns: + if err := readServerTableColumns(rd); err != nil { + return false, err + } + case chproto.ServerEndOfStream: + return false, nil + default: + return false, fmt.Errorf("ch: blockIter.Next: unexpected packet: %d", packet) + } + } +} + func (db *DB) hello(ctx context.Context, cn *chpool.Conn) error { err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { wr.Uvarint(chproto.ClientHello) @@ -254,8 +329,9 @@ func readSampleBlock(rd *chproto.Reader) (*chschema.Block, error) { } } -func readDataBlocks(rd *chproto.Reader, model Model) (*result, error) { +func readDataBlocks(rd *chproto.Reader) (*result, error) { var res *result + block := new(chschema.Block) for { packet, err := rd.Uvarint() if err != nil { @@ -264,11 +340,6 @@ func readDataBlocks(rd *chproto.Reader, model Model) (*result, error) { switch packet { case chproto.ServerData: - block := new(chschema.Block) - if model, ok := model.(TableModel); ok { - block.Table = model.Table() - } - if err := readBlock(rd, block); err != nil { return nil, err } @@ -277,12 +348,6 @@ func readDataBlocks(rd *chproto.Reader, model Model) (*result, error) { res = new(result) } res.affected += block.NumRow - - if model != nil { - if err := model.ScanBlock(block); err != nil { - return nil, err - } - } case chproto.ServerException: return nil, readException(rd) case chproto.ServerProgress: @@ -337,7 +402,6 @@ func readPacket(rd *chproto.Reader) (*result, error) { } } -// TODO: return block func readBlock(rd *chproto.Reader, block *chschema.Block) error { if _, err := rd.String(); err != nil { return err diff --git a/ch/query_base.go b/ch/query_base.go index 699266a..f2ee4c7 100644 --- a/ch/query_base.go +++ b/ch/query_base.go @@ -94,13 +94,46 @@ func (q *baseQuery) newModel(values ...any) (Model, error) { return q.tableModel, nil } +func (q *baseQuery) query(ctx context.Context, model Model, query string) (*result, error) { + blocks, err := q.db.query(ctx, query) + if err != nil { + return nil, err + } + + res := &result{ + model: model, + } + block := new(chschema.Block) + if model, ok := model.(TableModel); ok { + block.Table = model.Table() + } + + for blocks.Next(ctx, block) { + if err := model.ScanBlock(block); err != nil { + return nil, err + } + res.affected += block.NumRow + } + if err := blocks.Err(); err != nil { + return nil, err + } + + if model, ok := model.(AfterScanRowHook); ok { + if err := model.AfterScanRow(ctx); err != nil { + return nil, err + } + } + + return res, nil +} + func (q *baseQuery) exec( ctx context.Context, iquery Query, query string, ) (sql.Result, error) { ctx, event := q.db.beforeQuery(ctx, iquery, query, nil, q.tableModel) - res, err := q.db.query(ctx, nil, query) + res, err := q.db.exec(ctx, query) q.db.afterQuery(ctx, event, res, err) return res, err } diff --git a/ch/query_select.go b/ch/query_select.go index 2733340..d5e724f 100644 --- a/ch/query_select.go +++ b/ch/query_select.go @@ -496,7 +496,7 @@ func (q *SelectQuery) scan(ctx context.Context, columnar bool, values ...any) er query := internal.String(queryBytes) ctx, evt := q.db.beforeQuery(ctx, q, query, nil, model) - res, err := q.db.query(ctx, model, query) + res, err := q.query(ctx, model, query) q.db.afterQuery(ctx, evt, res, err) if err != nil { return err