1
0
mirror of https://github.com/uptrace/go-clickhouse.git synced 2025-07-17 01:12:33 +02:00

feat: add proper Rows implementation and some optimizations

This commit is contained in:
Vladimir Mihailenco
2022-04-30 10:30:34 +03:00
parent c1e00ef235
commit 658ad14fc0
20 changed files with 1311 additions and 524 deletions

View File

@ -20,3 +20,6 @@ go_mod_tidy:
fmt: fmt:
gofmt -w -s ./ gofmt -w -s ./
goimports -w -local github.com/uptrace/go-clickhouse ./ goimports -w -local github.com/uptrace/go-clickhouse ./
codegen:
go run ./ch/internal/codegen/ -dir=ch/chschema

29
ch/bench_test.go Normal file
View File

@ -0,0 +1,29 @@
package ch_test
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
func BenchmarkNumbers(b *testing.B) {
ctx := context.Background()
db := chDB()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
rows, err := db.QueryContext(ctx, "SELECT number FROM system.numbers_mt LIMIT 1000000")
if err != nil {
b.Fatal(err)
}
var count int
for rows.Next() {
count++
}
require.Equal(b, 1000000, count)
}
}

View File

@ -49,6 +49,11 @@ func (cn *Conn) RemoteAddr() net.Addr {
return cn.netConn.RemoteAddr() return cn.netConn.RemoteAddr()
} }
func (cn *Conn) Reader(ctx context.Context, timeout time.Duration) *chproto.Reader {
_ = cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout))
return cn.rd
}
func (cn *Conn) WithReader( func (cn *Conn) WithReader(
ctx context.Context, ctx context.Context,
timeout time.Duration, timeout time.Duration,

View File

@ -17,8 +17,9 @@ type lz4Reader struct {
header []byte header []byte
data []byte zdata []byte
pos int data []byte
pos int
} }
func newLZ4Reader(r *bufio.Reader) *lz4Reader { func newLZ4Reader(r *bufio.Reader) *lz4Reader {
@ -37,8 +38,9 @@ func (r *lz4Reader) Release() error {
err = errUnreadData err = errUnreadData
} }
r.data = nil r.data = r.data[:0]
r.pos = 0 r.pos = 0
r.zdata = r.zdata[:0]
return err return err
} }
@ -102,13 +104,13 @@ func (r *lz4Reader) readData() error {
compressedSize := int(binary.LittleEndian.Uint32(r.header[17:])) - compressionHeaderSize compressedSize := int(binary.LittleEndian.Uint32(r.header[17:])) - compressionHeaderSize
uncompressedSize := int(binary.LittleEndian.Uint32(r.header[21:])) uncompressedSize := int(binary.LittleEndian.Uint32(r.header[21:]))
zdata := make([]byte, compressedSize) r.zdata = grow(r.zdata, compressedSize)
r.data = grow(r.data, uncompressedSize) r.data = grow(r.data, uncompressedSize)
if _, err := io.ReadFull(r.rd, zdata); err != nil { if _, err := io.ReadFull(r.rd, r.zdata); err != nil {
return err return err
} }
if _, err := lz4.UncompressBlock(zdata, r.data); err != nil { if _, err := lz4.UncompressBlock(r.zdata, r.data); err != nil {
return err return err
} }

View File

@ -3,7 +3,6 @@ package chproto
import ( import (
"bufio" "bufio"
"encoding/binary" "encoding/binary"
"sync"
"github.com/pierrec/lz4/v4" "github.com/pierrec/lz4/v4"
@ -25,50 +24,26 @@ const (
blockSize = 1 << 20 // 1 MB blockSize = 1 << 20 // 1 MB
) )
type writeBuffer struct {
buf []byte
}
var writeBufferPool = sync.Pool{
New: func() any {
return &writeBuffer{
buf: make([]byte, blockSize),
}
},
}
func getWriterBuffer() *writeBuffer {
return writeBufferPool.Get().(*writeBuffer)
}
func putWriterBuffer(db *writeBuffer) {
writeBufferPool.Put(db)
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type lz4Writer struct { type lz4Writer struct {
wr *bufio.Writer wr *bufio.Writer
data *writeBuffer data []byte
pos int pos int
zdata []byte
} }
func newLZ4Writer(w *bufio.Writer) *lz4Writer { func newLZ4Writer(w *bufio.Writer) *lz4Writer {
return &lz4Writer{ return &lz4Writer{
wr: w, wr: w,
data: make([]byte, blockSize),
} }
} }
func (w *lz4Writer) Init() {
w.data = getWriterBuffer()
w.pos = 0
}
func (w *lz4Writer) Close() error { func (w *lz4Writer) Close() error {
err := w.flush() err := w.flush()
putWriterBuffer(w.data) w.pos = 0
w.data = nil
return err return err
} }
@ -77,7 +52,7 @@ func (w *lz4Writer) Flush() error {
} }
func (w *lz4Writer) WriteByte(c byte) error { func (w *lz4Writer) WriteByte(c byte) error {
w.data.buf[w.pos] = c w.data[w.pos] = c
w.pos++ w.pos++
return w.checkFlush() return w.checkFlush()
} }
@ -89,7 +64,7 @@ func (w *lz4Writer) WriteString(s string) (int, error) {
func (w *lz4Writer) Write(data []byte) (int, error) { func (w *lz4Writer) Write(data []byte) (int, error) {
var written int var written int
for len(data) > 0 { for len(data) > 0 {
n := copy(w.data.buf[w.pos:], data) n := copy(w.data[w.pos:], data)
data = data[n:] data = data[n:]
w.pos += n w.pos += n
if err := w.checkFlush(); err != nil { if err := w.checkFlush(); err != nil {
@ -101,7 +76,7 @@ func (w *lz4Writer) Write(data []byte) (int, error) {
} }
func (w *lz4Writer) checkFlush() error { func (w *lz4Writer) checkFlush() error {
if w.pos < len(w.data.buf) { if w.pos < len(w.data) {
return nil return nil
} }
return w.flush() return w.flush()
@ -113,23 +88,23 @@ func (w *lz4Writer) flush() error {
} }
zlen := headerSize + lz4.CompressBlockBound(w.pos) zlen := headerSize + lz4.CompressBlockBound(w.pos)
zdata := make([]byte, zlen) w.zdata = grow(w.zdata, zlen)
compressedSize, err := compress(zdata[headerSize:], w.data.buf[:w.pos]) compressedSize, err := compress(w.zdata[headerSize:], w.data[:w.pos])
if err != nil { if err != nil {
return err return err
} }
compressedSize += compressionHeaderSize compressedSize += compressionHeaderSize
zdata[16] = lz4Compression w.zdata[16] = lz4Compression
binary.LittleEndian.PutUint32(zdata[17:], uint32(compressedSize)) binary.LittleEndian.PutUint32(w.zdata[17:], uint32(compressedSize))
binary.LittleEndian.PutUint32(zdata[21:], uint32(w.pos)) binary.LittleEndian.PutUint32(w.zdata[21:], uint32(w.pos))
checkSum := cityhash102.CityHash128(zdata[16:], uint32(compressedSize)) checkSum := cityhash102.CityHash128(w.zdata[16:], uint32(compressedSize))
binary.LittleEndian.PutUint64(zdata[0:], checkSum.Lower64()) binary.LittleEndian.PutUint64(w.zdata[0:], checkSum.Lower64())
binary.LittleEndian.PutUint64(zdata[8:], checkSum.Higher64()) binary.LittleEndian.PutUint64(w.zdata[8:], checkSum.Higher64())
w.wr.Write(zdata[:checksumSize+compressedSize]) w.wr.Write(w.zdata[:checksumSize+compressedSize])
w.pos = 0 w.pos = 0
return nil return nil

View File

@ -70,7 +70,7 @@ func (r *Reader) Uvarint() (uint64, error) {
return binary.ReadUvarint(r.rd) return binary.ReadUvarint(r.rd)
} }
func (r *Reader) Uint8() (uint8, error) { func (r *Reader) UInt8() (uint8, error) {
c, err := r.rd.ReadByte() c, err := r.rd.ReadByte()
if err != nil { if err != nil {
return 0, err return 0, err
@ -78,7 +78,7 @@ func (r *Reader) Uint8() (uint8, error) {
return c, nil return c, nil
} }
func (r *Reader) Uint16() (uint16, error) { func (r *Reader) UInt16() (uint16, error) {
b, err := r.readNTemp(2) b, err := r.readNTemp(2)
if err != nil { if err != nil {
return 0, err return 0, err
@ -86,7 +86,7 @@ func (r *Reader) Uint16() (uint16, error) {
return binary.LittleEndian.Uint16(b), nil return binary.LittleEndian.Uint16(b), nil
} }
func (r *Reader) Uint32() (uint32, error) { func (r *Reader) UInt32() (uint32, error) {
b, err := r.readNTemp(4) b, err := r.readNTemp(4)
if err != nil { if err != nil {
return 0, err return 0, err
@ -94,7 +94,7 @@ func (r *Reader) Uint32() (uint32, error) {
return binary.LittleEndian.Uint32(b), nil return binary.LittleEndian.Uint32(b), nil
} }
func (r *Reader) Uint64() (uint64, error) { func (r *Reader) UInt64() (uint64, error) {
b, err := r.readNTemp(8) b, err := r.readNTemp(8)
if err != nil { if err != nil {
return 0, err return 0, err
@ -103,27 +103,27 @@ func (r *Reader) Uint64() (uint64, error) {
} }
func (r *Reader) Int8() (int8, error) { func (r *Reader) Int8() (int8, error) {
num, err := r.Uint8() num, err := r.UInt8()
return int8(num), err return int8(num), err
} }
func (r *Reader) Int16() (int16, error) { func (r *Reader) Int16() (int16, error) {
num, err := r.Uint16() num, err := r.UInt16()
return int16(num), err return int16(num), err
} }
func (r *Reader) Int32() (int32, error) { func (r *Reader) Int32() (int32, error) {
num, err := r.Uint32() num, err := r.UInt32()
return int32(num), err return int32(num), err
} }
func (r *Reader) Int64() (int64, error) { func (r *Reader) Int64() (int64, error) {
num, err := r.Uint64() num, err := r.UInt64()
return int64(num), err return int64(num), err
} }
func (r *Reader) Float32() (float32, error) { func (r *Reader) Float32() (float32, error) {
num, err := r.Uint32() num, err := r.UInt32()
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -131,7 +131,7 @@ func (r *Reader) Float32() (float32, error) {
} }
func (r *Reader) Float64() (float64, error) { func (r *Reader) Float64() (float64, error) {
num, err := r.Uint64() num, err := r.UInt64()
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -180,7 +180,7 @@ func (r *Reader) readNTemp(n int) ([]byte, error) {
} }
func (r *Reader) DateTime() (time.Time, error) { func (r *Reader) DateTime() (time.Time, error) {
sec, err := r.Uint32() sec, err := r.UInt32()
if err != nil { if err != nil {
return time.Time{}, err return time.Time{}, err
} }
@ -191,7 +191,7 @@ func (r *Reader) DateTime() (time.Time, error) {
} }
func (r *Reader) Date() (time.Time, error) { func (r *Reader) Date() (time.Time, error) {
days, err := r.Uint16() days, err := r.UInt16()
if err != nil { if err != nil {
return time.Time{}, err return time.Time{}, err
} }

View File

@ -10,7 +10,10 @@ import (
"github.com/uptrace/go-clickhouse/ch/internal" "github.com/uptrace/go-clickhouse/ch/internal"
) )
const uuidLen = 16 const (
uuidLen = 16
secsInDay = 24 * 3600
)
type writer interface { type writer interface {
io.Writer io.Writer
@ -44,7 +47,6 @@ func (w *Writer) WithCompression(fn func() error) {
return return
} }
w.zw.Init()
w.wr = w.zw w.wr = w.zw
w.err = fn() w.err = fn()
@ -85,7 +87,7 @@ func (w *Writer) Bool(flag bool) {
if flag { if flag {
num = 1 num = 1
} }
w.Uint8(num) w.UInt8(num)
} }
func (w *Writer) Uvarint(num uint64) { func (w *Writer) Uvarint(num uint64) {
@ -93,47 +95,47 @@ func (w *Writer) Uvarint(num uint64) {
w.Write(w.buf[:n]) w.Write(w.buf[:n])
} }
func (w *Writer) Uint8(num uint8) { func (w *Writer) UInt8(num uint8) {
w.writeByte(num) w.writeByte(num)
} }
func (w *Writer) Uint16(num uint16) { func (w *Writer) UInt16(num uint16) {
binary.LittleEndian.PutUint16(w.buf, num) binary.LittleEndian.PutUint16(w.buf, num)
w.Write(w.buf[:2]) w.Write(w.buf[:2])
} }
func (w *Writer) Uint32(num uint32) { func (w *Writer) UInt32(num uint32) {
binary.LittleEndian.PutUint32(w.buf, num) binary.LittleEndian.PutUint32(w.buf, num)
w.Write(w.buf[:4]) w.Write(w.buf[:4])
} }
func (w *Writer) Uint64(num uint64) { func (w *Writer) UInt64(num uint64) {
binary.LittleEndian.PutUint64(w.buf, num) binary.LittleEndian.PutUint64(w.buf, num)
w.Write(w.buf[:8]) w.Write(w.buf[:8])
} }
func (w *Writer) Int8(num int8) { func (w *Writer) Int8(num int8) {
w.Uint8(uint8(num)) w.UInt8(uint8(num))
} }
func (w *Writer) Int16(num int16) { func (w *Writer) Int16(num int16) {
w.Uint16(uint16(num)) w.UInt16(uint16(num))
} }
func (w *Writer) Int32(num int32) { func (w *Writer) Int32(num int32) {
w.Uint32(uint32(num)) w.UInt32(uint32(num))
} }
func (w *Writer) Int64(num int64) { func (w *Writer) Int64(num int64) {
w.Uint64(uint64(num)) w.UInt64(uint64(num))
} }
func (w *Writer) Float32(num float32) { func (w *Writer) Float32(num float32) {
w.Uint32(math.Float32bits(num)) w.UInt32(math.Float32bits(num))
} }
func (w *Writer) Float64(num float64) { func (w *Writer) Float64(num float64) {
w.Uint64(math.Float64bits(num)) w.UInt64(math.Float64bits(num))
} }
func (w *Writer) String(s string) { func (w *Writer) String(s string) {
@ -172,13 +174,11 @@ func packUUID(b []byte) []byte {
} }
func (w *Writer) DateTime(tm time.Time) { func (w *Writer) DateTime(tm time.Time) {
w.Uint32(uint32(unixTime(tm))) w.UInt32(uint32(unixTime(tm)))
} }
const secsInDay = 24 * 3600
func (w *Writer) Date(tm time.Time) { func (w *Writer) Date(tm time.Time) {
w.Uint16(uint16(unixTime(tm) / secsInDay)) w.UInt16(uint16(unixTime(tm) / secsInDay))
} }
func unixTime(tm time.Time) int64 { func unixTime(tm time.Time) int64 {

View File

@ -37,7 +37,7 @@ type Columnar interface {
Set(v any) Set(v any)
AppendValue(v reflect.Value) AppendValue(v reflect.Value)
Value() any Value() any
Nullable(nulls Uint8Column) any Nullable(nulls UInt8Column) any
Len() int Len() int
Index(idx int) any Index(idx int) any
Slice(s, e int) any Slice(s, e int) any
@ -89,7 +89,7 @@ func (c ColumnOf[T]) Value() any {
return c.Column return c.Column
} }
func (c ColumnOf[T]) Nullable(nulls Uint8Column) any { func (c ColumnOf[T]) Nullable(nulls UInt8Column) any {
nullable := make([]*T, len(c.Column)) nullable := make([]*T, len(c.Column))
for i := range c.Column { for i := range c.Column {
if nulls.Column[i] == 0 { if nulls.Column[i] == 0 {
@ -221,27 +221,6 @@ func (c *Int8Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, int8(v.Int())) c.Column = append(c.Column, int8(v.Int()))
} }
func (c *Int8Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Int8()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c Int8Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Int8(n)
}
return nil
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Int16Column struct { type Int16Column struct {
@ -264,27 +243,6 @@ func (c *Int16Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, int16(v.Int())) c.Column = append(c.Column, int16(v.Int()))
} }
func (c *Int16Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Int16()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c Int16Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Int16(n)
}
return nil
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Int32Column struct { type Int32Column struct {
@ -307,27 +265,6 @@ func (c *Int32Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, int32(v.Int())) c.Column = append(c.Column, int32(v.Int()))
} }
func (c *Int32Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Int32()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c Int32Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Int32(n)
}
return nil
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Int64Column struct { type Int64Column struct {
@ -350,199 +287,94 @@ func (c *Int64Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, v.Int()) c.Column = append(c.Column, v.Int())
} }
func (c *Int64Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Int64()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c Int64Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Int64(n)
}
return nil
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Uint8Column struct { type UInt8Column struct {
NumericColumnOf[uint8] NumericColumnOf[uint8]
} }
var _ Columnar = (*Uint8Column)(nil) var _ Columnar = (*UInt8Column)(nil)
func NewUint8Column(typ reflect.Type, chType string, numRow int) Columnar { func NewUInt8Column(typ reflect.Type, chType string, numRow int) Columnar {
return &Uint8Column{ return &UInt8Column{
NumericColumnOf: NewNumericColumnOf[uint8](numRow), NumericColumnOf: NewNumericColumnOf[uint8](numRow),
} }
} }
func (c Uint8Column) Type() reflect.Type { func (c UInt8Column) Type() reflect.Type {
return uint8Type return uint8Type
} }
func (c *Uint8Column) AppendValue(v reflect.Value) { func (c *UInt8Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, uint8(v.Uint())) c.Column = append(c.Column, uint8(v.Uint()))
} }
func (c *Uint8Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Uint8()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c Uint8Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Uint8(n)
}
return nil
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Uint16Column struct { type UInt16Column struct {
NumericColumnOf[uint16] NumericColumnOf[uint16]
} }
var _ Columnar = (*Uint16Column)(nil) var _ Columnar = (*UInt16Column)(nil)
func NewUint16Column(typ reflect.Type, chType string, numRow int) Columnar { func NewUInt16Column(typ reflect.Type, chType string, numRow int) Columnar {
return &Uint16Column{ return &UInt16Column{
NumericColumnOf: NewNumericColumnOf[uint16](numRow), NumericColumnOf: NewNumericColumnOf[uint16](numRow),
} }
} }
func (c Uint16Column) Type() reflect.Type { func (c UInt16Column) Type() reflect.Type {
return uint16Type return uint16Type
} }
func (c *Uint16Column) AppendValue(v reflect.Value) { func (c *UInt16Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, uint16(v.Uint())) c.Column = append(c.Column, uint16(v.Uint()))
} }
func (c *Uint16Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Uint16()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c Uint16Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Uint16(n)
}
return nil
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Uint32Column struct { type UInt32Column struct {
NumericColumnOf[uint32] NumericColumnOf[uint32]
} }
var _ Columnar = (*Uint32Column)(nil) var _ Columnar = (*UInt32Column)(nil)
func NewUint32Column(typ reflect.Type, chType string, numRow int) Columnar { func NewUInt32Column(typ reflect.Type, chType string, numRow int) Columnar {
return &Uint32Column{ return &UInt32Column{
NumericColumnOf: NewNumericColumnOf[uint32](numRow), NumericColumnOf: NewNumericColumnOf[uint32](numRow),
} }
} }
func (c Uint32Column) Type() reflect.Type { func (c UInt32Column) Type() reflect.Type {
return uint32Type return uint32Type
} }
func (c *Uint32Column) AppendValue(v reflect.Value) { func (c *UInt32Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, uint32(v.Uint())) c.Column = append(c.Column, uint32(v.Uint()))
} }
func (c *Uint32Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Uint32()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c Uint32Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Uint32(n)
}
return nil
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Uint64Column struct { type UInt64Column struct {
NumericColumnOf[uint64] NumericColumnOf[uint64]
} }
var _ Columnar = (*Uint64Column)(nil) var _ Columnar = (*UInt64Column)(nil)
func NewUint64Column(typ reflect.Type, chType string, numRow int) Columnar { func NewUInt64Column(typ reflect.Type, chType string, numRow int) Columnar {
return &Uint64Column{ return &UInt64Column{
NumericColumnOf: NewNumericColumnOf[uint64](numRow), NumericColumnOf: NewNumericColumnOf[uint64](numRow),
} }
} }
func (c Uint64Column) Type() reflect.Type { func (c UInt64Column) Type() reflect.Type {
return uint64Type return uint64Type
} }
func (c *Uint64Column) AppendValue(v reflect.Value) { func (c *UInt64Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, v.Uint()) c.Column = append(c.Column, v.Uint())
} }
func (c *Uint64Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Uint64()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c Uint64Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Uint64(n)
}
return nil
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Float32Column struct { type Float32Column struct {
@ -565,27 +397,6 @@ func (c *Float32Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, float32(v.Float())) c.Column = append(c.Column, float32(v.Float()))
} }
func (c *Float32Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Float32()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c Float32Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Float32(n)
}
return nil
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Float64Column struct { type Float64Column struct {
@ -608,27 +419,6 @@ func (c *Float64Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, v.Float()) c.Column = append(c.Column, v.Float())
} }
func (c *Float64Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Float64()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c Float64Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Float64(n)
}
return nil
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type StringColumn struct { type StringColumn struct {
@ -921,7 +711,7 @@ func (c *Int64TimeColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow) c.Alloc(numRow)
for i := range c.Column { for i := range c.Column {
n, err := rd.Uint32() n, err := rd.UInt32()
if err != nil { if err != nil {
return err return err
} }
@ -933,7 +723,7 @@ func (c *Int64TimeColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
func (c Int64TimeColumn) WriteTo(wr *chproto.Writer) error { func (c Int64TimeColumn) WriteTo(wr *chproto.Writer) error {
for i := range c.Column { for i := range c.Column {
wr.Uint32(uint32(c.Column[i] / int64(time.Second))) wr.UInt32(uint32(c.Column[i] / int64(time.Second)))
} }
return nil return nil
} }
@ -1217,7 +1007,7 @@ func (c *LCStringColumn) readData(rd *chproto.Reader, numRow int) error {
} }
lcKey := newLCKeyType(flags & 0xf) lcKey := newLCKeyType(flags & 0xf)
dictSize, err := rd.Uint64() dictSize, err := rd.UInt64()
if err != nil { if err != nil {
return err return err
} }
@ -1232,7 +1022,7 @@ func (c *LCStringColumn) readData(rd *chproto.Reader, numRow int) error {
dict[i] = s dict[i] = s
} }
numKey, err := rd.Uint64() numKey, err := rd.UInt64()
if err != nil { if err != nil {
return err return err
} }
@ -1325,44 +1115,44 @@ func newLCKeyType(typ int64) lcKey {
return lcKey{ return lcKey{
typ: 0, typ: 0,
read: func(rd *chproto.Reader) (int, error) { read: func(rd *chproto.Reader) (int, error) {
n, err := rd.Uint8() n, err := rd.UInt8()
return int(n), err return int(n), err
}, },
write: func(wr *chproto.Writer, n int) { write: func(wr *chproto.Writer, n int) {
wr.Uint8(uint8(n)) wr.UInt8(uint8(n))
}, },
} }
case 1: case 1:
return lcKey{ return lcKey{
typ: int8(1), typ: int8(1),
read: func(rd *chproto.Reader) (int, error) { read: func(rd *chproto.Reader) (int, error) {
n, err := rd.Uint16() n, err := rd.UInt16()
return int(n), err return int(n), err
}, },
write: func(wr *chproto.Writer, n int) { write: func(wr *chproto.Writer, n int) {
wr.Uint16(uint16(n)) wr.UInt16(uint16(n))
}, },
} }
case 2: case 2:
return lcKey{ return lcKey{
typ: 2, typ: 2,
read: func(rd *chproto.Reader) (int, error) { read: func(rd *chproto.Reader) (int, error) {
n, err := rd.Uint32() n, err := rd.UInt32()
return int(n), err return int(n), err
}, },
write: func(wr *chproto.Writer, n int) { write: func(wr *chproto.Writer, n int) {
wr.Uint32(uint32(n)) wr.UInt32(uint32(n))
}, },
} }
case 3: case 3:
return lcKey{ return lcKey{
typ: 3, typ: 3,
read: func(rd *chproto.Reader) (int, error) { read: func(rd *chproto.Reader) (int, error) {
n, err := rd.Uint64() n, err := rd.UInt64()
return int(n), err return int(n), err
}, },
write: func(wr *chproto.Writer, n int) { write: func(wr *chproto.Writer, n int) {
wr.Uint64(uint64(n)) wr.UInt64(uint64(n))
}, },
} }
default: default:

View File

@ -3,6 +3,7 @@ package chschema
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"unsafe"
"github.com/uptrace/go-clickhouse/ch/chproto" "github.com/uptrace/go-clickhouse/ch/chproto"
) )
@ -12,114 +13,64 @@ type ArrayColumnar interface {
WriteData(wr *chproto.Writer) error WriteData(wr *chproto.Writer) error
} }
type ArrayLCStringColumn struct {
*LCStringColumn
}
func (c ArrayLCStringColumn) Type() reflect.Type {
return stringSliceType
}
func (c *ArrayLCStringColumn) WriteTo(wr *chproto.Writer) error {
c.writeData(wr)
return nil
}
func (c *ArrayLCStringColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
if numRow == 0 {
return nil
}
return c.readData(rd, numRow)
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type ArrayColumn struct { type ArrayColumnOf[T any] struct {
Column reflect.Value Column [][]T
elem Columnar
typ reflect.Type
elem Columnar
arrayElem ArrayColumnar
} }
var _ Columnar = (*ArrayColumn)(nil) func (c *ArrayColumnOf[T]) Reset(numRow int) {
if cap(c.Column) >= numRow {
func NewArrayColumn(typ reflect.Type, chType string, numRow int) Columnar { c.Column = c.Column[:0]
elemType := chArrayElemType(chType)
if elemType == "" {
panic(fmt.Errorf("invalid array type: %q (Go type is %s)",
chType, typ.String()))
}
elem := NewColumn(typ.Elem(), elemType, 0)
var arrayElem ArrayColumnar
if _, ok := elem.(*LCStringColumn); ok {
panic("not reached")
}
arrayElem, _ = elem.(ArrayColumnar)
c := &ArrayColumn{
typ: reflect.SliceOf(typ),
elem: elem,
arrayElem: arrayElem,
}
c.Column = reflect.MakeSlice(c.typ, 0, numRow)
return c
}
func (c ArrayColumn) Type() reflect.Type {
return c.typ.Elem()
}
func (c *ArrayColumn) Reset(numRow int) {
if c.Column.Cap() >= numRow {
c.Column = c.Column.Slice(0, 0)
} else { } else {
c.Column = reflect.MakeSlice(c.typ, 0, numRow) c.Column = make([][]T, 0, numRow)
} }
} }
func (c *ArrayColumn) Set(v any) { func (c *ArrayColumnOf[T]) Set(v any) {
c.Column = reflect.ValueOf(v) c.Column = v.([][]T)
} }
func (c *ArrayColumn) Value() any { func (c *ArrayColumnOf[T]) Value() any {
return c.Column.Interface() return c.Column
} }
func (c *ArrayColumn) Nullable(nulls Uint8Column) any { func (c *ArrayColumnOf[T]) Nullable(nulls UInt8Column) any {
panic("not implemented") panic("not implemented")
} }
func (c *ArrayColumn) Len() int { func (c *ArrayColumnOf[T]) Len() int {
return c.Column.Len() return len(c.Column)
} }
func (c *ArrayColumn) Index(idx int) any { func (c *ArrayColumnOf[T]) Index(idx int) any {
return c.Column.Index(idx).Interface() return c.Column[idx]
} }
func (c ArrayColumn) Slice(s, e int) any { func (c *ArrayColumnOf[T]) Slice(s, e int) any {
return c.Column.Slice(s, e).Interface() return c.Column[s:e]
} }
func (c *ArrayColumn) ConvertAssign(idx int, v reflect.Value) error { func (c *ArrayColumnOf[T]) ConvertAssign(idx int, v reflect.Value) error {
v.Set(c.Column.Index(idx)) v.Set(reflect.ValueOf(c.Column[idx]))
return nil return nil
} }
func (c *ArrayColumn) AppendValue(v reflect.Value) { func (c *ArrayColumnOf[T]) AppendValue(v reflect.Value) {
c.Column = reflect.Append(c.Column, v) ptr := unsafe.Pointer(v.UnsafeAddr())
c.AppendPointer(v.Type(), ptr)
} }
func (c *ArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error { func (c *ArrayColumnOf[T]) AppendPointer(typ reflect.Type, ptr unsafe.Pointer) {
if c.Column.Cap() >= numRow { c.Column = append(c.Column, *(*[]T)(ptr))
c.Column = c.Column.Slice(0, numRow) }
func (c *ArrayColumnOf[T]) ReadFrom(rd *chproto.Reader, numRow int) error {
if cap(c.Column) >= numRow {
c.Column = c.Column[:numRow]
} else { } else {
c.Column = reflect.MakeSlice(c.typ, numRow, numRow) c.Column = make([][]T, numRow)
} }
if numRow == 0 { if numRow == 0 {
@ -127,8 +78,8 @@ func (c *ArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
} }
offsets := make([]int, numRow) offsets := make([]int, numRow)
for i := 0; i < len(offsets); i++ { for i := 0; i < numRow; i++ {
offset, err := rd.Uint64() offset, err := rd.UInt64()
if err != nil { if err != nil {
return err return err
} }
@ -141,56 +92,99 @@ func (c *ArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
var prev int var prev int
for i, offset := range offsets { for i, offset := range offsets {
c.Column.Index(i).Set(reflect.ValueOf(c.elem.Slice(prev, offset))) c.Column[i] = c.elem.Slice(prev, offset).([]T)
prev = offset prev = offset
} }
return nil return nil
} }
func (c *ArrayColumn) WriteTo(wr *chproto.Writer) error { func (c *ArrayColumnOf[T]) WriteTo(wr *chproto.Writer) error {
_ = c.WriteOffset(wr, 0) _ = c.WriteOffset(wr, 0)
return c.WriteData(wr)
}
colLen := c.Column.Len() var _ ArrayColumnar = (*Int64ArrayColumn)(nil)
for i := 0; i < colLen; i++ {
// TODO: add SetValue or SetPointer
c.elem.Set(c.Column.Index(i).Interface())
var err error func (c *ArrayColumnOf[T]) WriteOffset(wr *chproto.Writer, offset int) int {
if c.arrayElem != nil { for _, el := range c.Column {
err = c.arrayElem.WriteData(wr) offset += len(el)
} else { wr.UInt64(uint64(offset))
err = c.elem.WriteTo(wr) }
} return offset
if err != nil { }
func (c *ArrayColumnOf[T]) WriteData(wr *chproto.Writer) error {
for _, ss := range c.Column {
c.elem.Set(ss)
if err := c.elem.WriteTo(wr); err != nil {
return err return err
} }
} }
return nil return nil
} }
func (c *ArrayColumn) WriteOffset(wr *chproto.Writer, offset int) int { //------------------------------------------------------------------------------
colLen := c.Column.Len()
for i := 0; i < colLen; i++ { type Int64ArrayColumn struct {
el := c.Column.Index(i) ArrayColumnOf[int64]
offset += el.Len() }
wr.Uint64(uint64(offset))
var _ Columnar = (*Int64ArrayColumn)(nil)
func NewInt64ArrayColumn(typ reflect.Type, chType string, numRow int) Columnar {
return &Int64ArrayColumn{
ArrayColumnOf: ArrayColumnOf[int64]{
Column: make([][]int64, 0, numRow),
elem: NewInt64Column(typ.Elem(), "", 0),
},
} }
}
if c.arrayElem == nil { func (c *Int64ArrayColumn) Type() reflect.Type {
return offset return int64SliceType
}
//------------------------------------------------------------------------------
type Uint64ArrayColumn struct {
ArrayColumnOf[uint64]
}
var _ Columnar = (*Uint64ArrayColumn)(nil)
func NewUint64ArrayColumn(typ reflect.Type, chType string, numRow int) Columnar {
return &Uint64ArrayColumn{
ArrayColumnOf: ArrayColumnOf[uint64]{
Column: make([][]uint64, 0, numRow),
elem: NewUInt64Column(typ.Elem(), "", 0),
},
} }
}
offset = 0 func (c *Uint64ArrayColumn) Type() reflect.Type {
for i := 0; i < colLen; i++ { return uint64SliceType
el := c.Column.Index(i) }
c.elem.Set(el.Interface()) // Use SetValue or SetPointer
offset = c.arrayElem.WriteOffset(wr, offset) //------------------------------------------------------------------------------
type Float64ArrayColumn struct {
ArrayColumnOf[float64]
}
var _ Columnar = (*Float64ArrayColumn)(nil)
func NewFloat64ArrayColumn(typ reflect.Type, chType string, numRow int) Columnar {
return &Float64ArrayColumn{
ArrayColumnOf: ArrayColumnOf[float64]{
Column: make([][]float64, 0, numRow),
elem: NewFloat64Column(typ.Elem(), "", 0),
},
} }
}
return offset func (c *Float64ArrayColumn) Type() reflect.Type {
return float64SliceType
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -259,7 +253,7 @@ func (c *StringArrayColumn) Value() any {
return c.Column return c.Column
} }
func (c *StringArrayColumn) Nullable(nulls Uint8Column) any { func (c *StringArrayColumn) Nullable(nulls UInt8Column) any {
panic("not implemented") panic("not implemented")
} }
@ -284,6 +278,10 @@ func (c *StringArrayColumn) AppendValue(v reflect.Value) {
c.Column = append(c.Column, v.Interface().([]string)) c.Column = append(c.Column, v.Interface().([]string))
} }
func (c *StringArrayColumn) AppendPointer(typ reflect.Type, ptr unsafe.Pointer) {
c.Column = append(c.Column, *(*[]string)(ptr))
}
func (c *StringArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error { func (c *StringArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
if numRow == 0 { if numRow == 0 {
return nil return nil
@ -304,7 +302,7 @@ func (c *StringArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
offsets := make([]int, numRow) offsets := make([]int, numRow)
for i := 0; i < len(offsets); i++ { for i := 0; i < len(offsets); i++ {
offset, err := rd.Uint64() offset, err := rd.UInt64()
if err != nil { if err != nil {
return err return err
} }
@ -338,7 +336,7 @@ var _ ArrayColumnar = (*StringArrayColumn)(nil)
func (c *StringArrayColumn) WriteOffset(wr *chproto.Writer, offset int) int { func (c *StringArrayColumn) WriteOffset(wr *chproto.Writer, offset int) int {
for _, el := range c.Column { for _, el := range c.Column {
offset += len(el) offset += len(el)
wr.Uint64(uint64(offset)) wr.UInt64(uint64(offset))
} }
return offset return offset
} }
@ -352,3 +350,190 @@ func (c *StringArrayColumn) WriteData(wr *chproto.Writer) error {
} }
return nil return nil
} }
//------------------------------------------------------------------------------
type ArrayLCStringColumn struct {
*LCStringColumn
}
func (c ArrayLCStringColumn) Type() reflect.Type {
return stringSliceType
}
func (c *ArrayLCStringColumn) WriteTo(wr *chproto.Writer) error {
c.writeData(wr)
return nil
}
func (c *ArrayLCStringColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
if numRow == 0 {
return nil
}
return c.readData(rd, numRow)
}
//------------------------------------------------------------------------------
type GenericArrayColumn struct {
Column reflect.Value
typ reflect.Type
elem Columnar
arrayElem ArrayColumnar
}
var _ Columnar = (*GenericArrayColumn)(nil)
func NewGenericArrayColumn(typ reflect.Type, chType string, numRow int) Columnar {
elemType := chArrayElemType(chType)
if elemType == "" {
panic(fmt.Errorf("invalid array type: %q (Go type is %s)",
chType, typ.String()))
}
elem := NewColumn(typ.Elem(), elemType, 0)
var arrayElem ArrayColumnar
if _, ok := elem.(*LCStringColumn); ok {
panic("not reached")
}
arrayElem, _ = elem.(ArrayColumnar)
c := &GenericArrayColumn{
typ: reflect.SliceOf(typ),
elem: elem,
arrayElem: arrayElem,
}
c.Column = reflect.MakeSlice(c.typ, 0, numRow)
return c
}
func (c GenericArrayColumn) Type() reflect.Type {
return c.typ.Elem()
}
func (c *GenericArrayColumn) Reset(numRow int) {
if c.Column.Cap() >= numRow {
c.Column = c.Column.Slice(0, 0)
} else {
c.Column = reflect.MakeSlice(c.typ, 0, numRow)
}
}
func (c *GenericArrayColumn) Set(v any) {
c.Column = reflect.ValueOf(v)
}
func (c *GenericArrayColumn) Value() any {
return c.Column.Interface()
}
func (c *GenericArrayColumn) Nullable(nulls UInt8Column) any {
panic("not implemented")
}
func (c *GenericArrayColumn) Len() int {
return c.Column.Len()
}
func (c *GenericArrayColumn) Index(idx int) any {
return c.Column.Index(idx).Interface()
}
func (c GenericArrayColumn) Slice(s, e int) any {
return c.Column.Slice(s, e).Interface()
}
func (c *GenericArrayColumn) ConvertAssign(idx int, v reflect.Value) error {
v.Set(c.Column.Index(idx))
return nil
}
func (c *GenericArrayColumn) AppendValue(v reflect.Value) {
c.Column = reflect.Append(c.Column, v)
}
func (c *GenericArrayColumn) AppendPointer(typ reflect.Type, ptr unsafe.Pointer) {
c.AppendValue(reflect.NewAt(typ.Elem(), ptr).Elem())
}
func (c *GenericArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
if c.Column.Cap() >= numRow {
c.Column = c.Column.Slice(0, numRow)
} else {
c.Column = reflect.MakeSlice(c.typ, numRow, numRow)
}
if numRow == 0 {
return nil
}
offsets := make([]int, numRow)
for i := 0; i < len(offsets); i++ {
offset, err := rd.UInt64()
if err != nil {
return err
}
offsets[i] = int(offset)
}
if err := c.elem.ReadFrom(rd, offsets[len(offsets)-1]); err != nil {
return err
}
var prev int
for i, offset := range offsets {
c.Column.Index(i).Set(reflect.ValueOf(c.elem.Slice(prev, offset)))
prev = offset
}
return nil
}
func (c *GenericArrayColumn) WriteTo(wr *chproto.Writer) error {
_ = c.WriteOffset(wr, 0)
colLen := c.Column.Len()
for i := 0; i < colLen; i++ {
// TODO: add SetValue or SetPointer
c.elem.Set(c.Column.Index(i).Interface())
var err error
if c.arrayElem != nil {
err = c.arrayElem.WriteData(wr)
} else {
err = c.elem.WriteTo(wr)
}
if err != nil {
return err
}
}
return nil
}
func (c *GenericArrayColumn) WriteOffset(wr *chproto.Writer, offset int) int {
colLen := c.Column.Len()
for i := 0; i < colLen; i++ {
el := c.Column.Index(i)
offset += el.Len()
wr.UInt64(uint64(offset))
}
if c.arrayElem == nil {
return offset
}
offset = 0
for i := 0; i < colLen; i++ {
el := c.Column.Index(i)
c.elem.Set(el.Interface()) // Use SetValue or SetPointer
offset = c.arrayElem.WriteOffset(wr, offset)
}
return offset
}

View File

@ -7,7 +7,7 @@ import (
) )
type NullableColumn struct { type NullableColumn struct {
Nulls Uint8Column Nulls UInt8Column
Values Columnar Values Columnar
nullable reflect.Value // reflect.Slice nullable reflect.Value // reflect.Slice
} }
@ -44,7 +44,7 @@ func (c *NullableColumn) Value() any {
return c.nullable.Interface() return c.nullable.Interface()
} }
func (c *NullableColumn) Nullable(nulls Uint8Column) any { func (c *NullableColumn) Nullable(nulls UInt8Column) any {
panic("not implemented") panic("not implemented")
} }

View File

@ -0,0 +1,217 @@
//go:build !amd64 && !arm64
package chschema
import (
"github.com/uptrace/go-clickhouse/ch/chproto"
)
func (c *Int8Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Int8()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c *Int8Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Int8(n)
}
return nil
}
func (c *UInt8Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.UInt8()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c *UInt8Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.UInt8(n)
}
return nil
}
func (c *Int16Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Int16()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c *Int16Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Int16(n)
}
return nil
}
func (c *UInt16Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.UInt16()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c *UInt16Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.UInt16(n)
}
return nil
}
func (c *Int32Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Int32()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c *Int32Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Int32(n)
}
return nil
}
func (c *UInt32Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.UInt32()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c *UInt32Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.UInt32(n)
}
return nil
}
func (c *Int64Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Int64()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c *Int64Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Int64(n)
}
return nil
}
func (c *UInt64Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.UInt64()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c *UInt64Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.UInt64(n)
}
return nil
}
func (c *Float32Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Float32()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c *Float32Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Float32(n)
}
return nil
}
func (c *Float64Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.Float64()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c *Float64Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.Float64(n)
}
return nil
}

View File

@ -0,0 +1,351 @@
//go:build amd64 || arm64
package chschema
import (
"io"
"reflect"
"unsafe"
"github.com/uptrace/go-clickhouse/ch/chproto"
)
func (c *Int8Column) ReadFrom(rd *chproto.Reader, numRow int) error {
const size = 8 / 8
if numRow == 0 {
return nil
}
c.Alloc(numRow)
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
dest := *(*[]byte)(unsafe.Pointer(&slice))
_, err := io.ReadFull(rd, dest)
return err
}
func (c *Int8Column) WriteTo(wr *chproto.Writer) error {
const size = 8 / 8
if len(c.Column) == 0 {
return nil
}
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
src := *(*[]byte)(unsafe.Pointer(&slice))
wr.Write(src)
return nil
}
func (c *UInt8Column) ReadFrom(rd *chproto.Reader, numRow int) error {
const size = 8 / 8
if numRow == 0 {
return nil
}
c.Alloc(numRow)
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
dest := *(*[]byte)(unsafe.Pointer(&slice))
_, err := io.ReadFull(rd, dest)
return err
}
func (c *UInt8Column) WriteTo(wr *chproto.Writer) error {
const size = 8 / 8
if len(c.Column) == 0 {
return nil
}
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
src := *(*[]byte)(unsafe.Pointer(&slice))
wr.Write(src)
return nil
}
func (c *Int16Column) ReadFrom(rd *chproto.Reader, numRow int) error {
const size = 16 / 8
if numRow == 0 {
return nil
}
c.Alloc(numRow)
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
dest := *(*[]byte)(unsafe.Pointer(&slice))
_, err := io.ReadFull(rd, dest)
return err
}
func (c *Int16Column) WriteTo(wr *chproto.Writer) error {
const size = 16 / 8
if len(c.Column) == 0 {
return nil
}
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
src := *(*[]byte)(unsafe.Pointer(&slice))
wr.Write(src)
return nil
}
func (c *UInt16Column) ReadFrom(rd *chproto.Reader, numRow int) error {
const size = 16 / 8
if numRow == 0 {
return nil
}
c.Alloc(numRow)
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
dest := *(*[]byte)(unsafe.Pointer(&slice))
_, err := io.ReadFull(rd, dest)
return err
}
func (c *UInt16Column) WriteTo(wr *chproto.Writer) error {
const size = 16 / 8
if len(c.Column) == 0 {
return nil
}
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
src := *(*[]byte)(unsafe.Pointer(&slice))
wr.Write(src)
return nil
}
func (c *Int32Column) ReadFrom(rd *chproto.Reader, numRow int) error {
const size = 32 / 8
if numRow == 0 {
return nil
}
c.Alloc(numRow)
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
dest := *(*[]byte)(unsafe.Pointer(&slice))
_, err := io.ReadFull(rd, dest)
return err
}
func (c *Int32Column) WriteTo(wr *chproto.Writer) error {
const size = 32 / 8
if len(c.Column) == 0 {
return nil
}
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
src := *(*[]byte)(unsafe.Pointer(&slice))
wr.Write(src)
return nil
}
func (c *UInt32Column) ReadFrom(rd *chproto.Reader, numRow int) error {
const size = 32 / 8
if numRow == 0 {
return nil
}
c.Alloc(numRow)
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
dest := *(*[]byte)(unsafe.Pointer(&slice))
_, err := io.ReadFull(rd, dest)
return err
}
func (c *UInt32Column) WriteTo(wr *chproto.Writer) error {
const size = 32 / 8
if len(c.Column) == 0 {
return nil
}
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
src := *(*[]byte)(unsafe.Pointer(&slice))
wr.Write(src)
return nil
}
func (c *Int64Column) ReadFrom(rd *chproto.Reader, numRow int) error {
const size = 64 / 8
if numRow == 0 {
return nil
}
c.Alloc(numRow)
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
dest := *(*[]byte)(unsafe.Pointer(&slice))
_, err := io.ReadFull(rd, dest)
return err
}
func (c *Int64Column) WriteTo(wr *chproto.Writer) error {
const size = 64 / 8
if len(c.Column) == 0 {
return nil
}
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
src := *(*[]byte)(unsafe.Pointer(&slice))
wr.Write(src)
return nil
}
func (c *UInt64Column) ReadFrom(rd *chproto.Reader, numRow int) error {
const size = 64 / 8
if numRow == 0 {
return nil
}
c.Alloc(numRow)
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
dest := *(*[]byte)(unsafe.Pointer(&slice))
_, err := io.ReadFull(rd, dest)
return err
}
func (c *UInt64Column) WriteTo(wr *chproto.Writer) error {
const size = 64 / 8
if len(c.Column) == 0 {
return nil
}
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
src := *(*[]byte)(unsafe.Pointer(&slice))
wr.Write(src)
return nil
}
func (c *Float32Column) ReadFrom(rd *chproto.Reader, numRow int) error {
const size = 32 / 8
if numRow == 0 {
return nil
}
c.Alloc(numRow)
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
dest := *(*[]byte)(unsafe.Pointer(&slice))
_, err := io.ReadFull(rd, dest)
return err
}
func (c *Float32Column) WriteTo(wr *chproto.Writer) error {
const size = 32 / 8
if len(c.Column) == 0 {
return nil
}
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
src := *(*[]byte)(unsafe.Pointer(&slice))
wr.Write(src)
return nil
}
func (c *Float64Column) ReadFrom(rd *chproto.Reader, numRow int) error {
const size = 64 / 8
if numRow == 0 {
return nil
}
c.Alloc(numRow)
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
dest := *(*[]byte)(unsafe.Pointer(&slice))
_, err := io.ReadFull(rd, dest)
return err
}
func (c *Float64Column) WriteTo(wr *chproto.Writer) error {
const size = 64 / 8
if len(c.Column) == 0 {
return nil
}
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
src := *(*[]byte)(unsafe.Pointer(&slice))
wr.Write(src)
return nil
}

View File

@ -94,11 +94,11 @@ var kindToColumn = [...]NewColumnFunc{
reflect.Int16: NewInt16Column, reflect.Int16: NewInt16Column,
reflect.Int32: NewInt32Column, reflect.Int32: NewInt32Column,
reflect.Int64: NewInt64Column, reflect.Int64: NewInt64Column,
reflect.Uint: NewUint64Column, reflect.Uint: NewUInt64Column,
reflect.Uint8: NewUint8Column, reflect.Uint8: NewUInt8Column,
reflect.Uint16: NewUint16Column, reflect.Uint16: NewUInt16Column,
reflect.Uint32: NewUint32Column, reflect.Uint32: NewUInt32Column,
reflect.Uint64: NewUint64Column, reflect.Uint64: NewUInt64Column,
reflect.Uintptr: nil, reflect.Uintptr: nil,
reflect.Float32: NewFloat32Column, reflect.Float32: NewFloat32Column,
reflect.Float64: NewFloat64Column, reflect.Float64: NewFloat64Column,
@ -171,6 +171,12 @@ func ColumnFactory(typ reflect.Type, chType string) NewColumnFunc {
if elem.Elem().Kind() == reflect.Struct { if elem.Elem().Kind() == reflect.Struct {
return NewJSONColumn return NewJSONColumn
} }
case reflect.Int64:
return NewInt64ArrayColumn
case reflect.Uint64:
return NewUint64ArrayColumn
case reflect.Float64:
return NewFloat64ArrayColumn
case reflect.Uint8: case reflect.Uint8:
if chType == chtype.String { if chType == chtype.String {
return NewBytesColumn return NewBytesColumn
@ -183,7 +189,7 @@ func ColumnFactory(typ reflect.Type, chType string) NewColumnFunc {
} }
} }
return NewArrayColumn return NewGenericArrayColumn
case reflect.Array: case reflect.Array:
if isUUID(typ) { if isUUID(typ) {
return NewUUIDColumn return NewUUIDColumn
@ -196,7 +202,7 @@ func ColumnFactory(typ reflect.Type, chType string) NewColumnFunc {
case chtype.DateTime: case chtype.DateTime:
switch typ { switch typ {
case uint32Type: case uint32Type:
return NewUint32Column return NewUInt32Column
case int64Type: case int64Type:
return NewInt64TimeColumn return NewInt64TimeColumn
default: default:
@ -227,13 +233,13 @@ func columnFromCHType(chType string) NewColumnFunc {
case chtype.Int64: case chtype.Int64:
return NewInt64Column return NewInt64Column
case chtype.UInt8: case chtype.UInt8:
return NewUint8Column return NewUInt8Column
case chtype.UInt16: case chtype.UInt16:
return NewUint16Column return NewUInt16Column
case chtype.UInt32: case chtype.UInt32:
return NewUint32Column return NewUInt32Column
case chtype.UInt64: case chtype.UInt64:
return NewUint64Column return NewUInt64Column
case chtype.Float32: case chtype.Float32:
return NewFloat32Column return NewFloat32Column
case chtype.Float64: case chtype.Float64:

156
ch/db.go
View File

@ -225,11 +225,48 @@ func (db *DB) ExecContext(
) (sql.Result, error) { ) (sql.Result, error) {
query = db.FormatQuery(query, args...) query = db.FormatQuery(query, args...)
ctx, evt := db.beforeQuery(ctx, nil, query, args, nil) ctx, evt := db.beforeQuery(ctx, nil, query, args, nil)
res, err := db.query(ctx, nil, query) res, err := db.exec(ctx, query)
db.afterQuery(ctx, evt, res, err) db.afterQuery(ctx, evt, res, err)
return res, err return res, err
} }
func (db *DB) exec(ctx context.Context, query string) (*result, error) {
var res *result
var lastErr error
for attempt := 0; attempt <= db.cfg.MaxRetries; attempt++ {
if attempt > 0 {
lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1))
if lastErr != nil {
break
}
}
res, lastErr = db._exec(ctx, query)
if !db.shouldRetry(lastErr) {
break
}
}
return res, lastErr
}
func (db *DB) _exec(ctx context.Context, query string) (*result, error) {
var res *result
err := db.withConn(ctx, func(cn *chpool.Conn) error {
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) {
db.writeQuery(wr, query)
writeBlock(ctx, wr, nil)
}); err != nil {
return err
}
return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error {
var err error
res, err = readDataBlocks(rd)
return err
})
})
return res, err
}
func (db *DB) Query(query string, args ...any) (*Rows, error) { func (db *DB) Query(query string, args ...any) (*Rows, error) {
return db.QueryContext(context.Background(), query, args...) return db.QueryContext(context.Background(), query, args...)
} }
@ -237,16 +274,16 @@ func (db *DB) Query(query string, args ...any) (*Rows, error) {
func (db *DB) QueryContext( func (db *DB) QueryContext(
ctx context.Context, query string, args ...any, ctx context.Context, query string, args ...any,
) (*Rows, error) { ) (*Rows, error) {
rows := newRows()
query = db.FormatQuery(query, args...) query = db.FormatQuery(query, args...)
ctx, evt := db.beforeQuery(ctx, nil, query, args, nil) ctx, evt := db.beforeQuery(ctx, nil, query, args, nil)
res, err := db.query(ctx, rows, query) blocks, err := db.query(ctx, query)
db.afterQuery(ctx, evt, res, err) db.afterQuery(ctx, evt, nil, err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return rows, nil
return newRows(ctx, blocks), nil
} }
func (db *DB) QueryRow(query string, args ...any) *Row { func (db *DB) QueryRow(query string, args ...any) *Row {
@ -258,9 +295,10 @@ func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *R
return &Row{rows: rows, err: err} return &Row{rows: rows, err: err}
} }
func (db *DB) query(ctx context.Context, model Model, query string) (*result, error) { func (db *DB) query(ctx context.Context, query string) (*blockIter, error) {
var res *result var blocks *blockIter
var lastErr error var lastErr error
for attempt := 0; attempt <= db.cfg.MaxRetries; attempt++ { for attempt := 0; attempt <= db.cfg.MaxRetries; attempt++ {
if attempt > 0 { if attempt > 0 {
lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1)) lastErr = internal.Sleep(ctx, db.retryBackoff(attempt-1))
@ -269,39 +307,29 @@ func (db *DB) query(ctx context.Context, model Model, query string) (*result, er
} }
} }
res, lastErr = db._query(ctx, model, query) blocks, lastErr = db._query(ctx, query)
if !db.shouldRetry(lastErr) { if !db.shouldRetry(lastErr) {
break break
} }
} }
if lastErr == nil { return blocks, lastErr
if model, ok := model.(AfterScanRowHook); ok {
if err := model.AfterScanRow(ctx); err != nil {
lastErr = err
}
}
}
return res, lastErr
} }
func (db *DB) _query(ctx context.Context, model Model, query string) (*result, error) { func (db *DB) _query(ctx context.Context, query string) (*blockIter, error) {
var res *result cn, err := db.getConn(ctx)
err := db.withConn(ctx, func(cn *chpool.Conn) error { if err != nil {
if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { return nil, err
db.writeQuery(wr, query) }
writeBlock(ctx, wr, nil)
}); err != nil { if err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) {
return err db.writeQuery(wr, query)
} writeBlock(ctx, wr, nil)
return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error { }); err != nil {
var err error return nil, err
res, err = readDataBlocks(rd, model) }
return err
}) return newBlockIter(db, cn), nil
})
return res, err
} }
func (db *DB) insert( func (db *DB) insert(
@ -454,21 +482,37 @@ func (db *DB) makeQueryBytes() []byte {
// Rows is the result of a query. Its cursor starts before the first row of the result set. // Rows is the result of a query. Its cursor starts before the first row of the result set.
// Use Next to advance from row to row. // Use Next to advance from row to row.
type Rows struct { type Rows struct {
blocks []*chschema.Block ctx context.Context
blocks *blockIter
block *chschema.Block
block *chschema.Block rowIndex int
blockIndex int hasNext bool
rowIndex int closed bool
} }
func newRows() *Rows { func newRows(ctx context.Context, blocks *blockIter) *Rows {
return new(Rows) return &Rows{
ctx: ctx,
blocks: blocks,
block: new(chschema.Block),
}
} }
func (rs *Rows) Close() error { func (rs *Rows) Close() error {
if !rs.closed {
for rs.blocks.Next(rs.ctx, rs.block) {
}
rs.close()
}
return nil return nil
} }
func (rs *Rows) close() {
rs.closed = true
_ = rs.blocks.Close()
}
func (rs *Rows) ColumnTypes() ([]*sql.ColumnType, error) { func (rs *Rows) ColumnTypes() ([]*sql.ColumnType, error) {
return nil, errors.New("not implemented") return nil, errors.New("not implemented")
} }
@ -478,25 +522,25 @@ func (rs *Rows) Columns() ([]string, error) {
} }
func (rs *Rows) Err() error { func (rs *Rows) Err() error {
return nil return rs.blocks.Err()
} }
func (rs *Rows) Next() bool { func (rs *Rows) Next() bool {
if rs.block != nil && rs.rowIndex < rs.block.NumRow { if rs.closed {
rs.rowIndex++ return false
return true
} }
for rs.blockIndex < len(rs.blocks) { for rs.rowIndex >= rs.block.NumRow {
rs.block = rs.blocks[rs.blockIndex] if !rs.blocks.Next(rs.ctx, rs.block) {
rs.blockIndex++ rs.close()
if rs.block.NumRow > 0 { return false
rs.rowIndex = 1
return true
} }
rs.rowIndex = 0
} }
return false rs.hasNext = true
rs.rowIndex++
return true
} }
func (rs *Rows) NextResultSet() bool { func (rs *Rows) NextResultSet() bool {
@ -504,9 +548,14 @@ func (rs *Rows) NextResultSet() bool {
} }
func (rs *Rows) Scan(dest ...any) error { func (rs *Rows) Scan(dest ...any) error {
if rs.block == nil { if rs.closed {
return rs.Err()
}
if !rs.hasNext {
return errors.New("ch: Scan called without calling Next") return errors.New("ch: Scan called without calling Next")
} }
rs.hasNext = false
if rs.block.NumColumn != len(dest) { if rs.block.NumColumn != len(dest) {
return fmt.Errorf("ch: got %d columns, but Scan has %d values", return fmt.Errorf("ch: got %d columns, but Scan has %d values",
@ -522,11 +571,6 @@ func (rs *Rows) Scan(dest ...any) error {
return nil return nil
} }
func (rs *Rows) ScanBlock(block *chschema.Block) error {
rs.blocks = append(rs.blocks, block)
return nil
}
type Row struct { type Row struct {
rows *Rows rows *Rows
err error err error

View File

@ -48,6 +48,8 @@ func TestCHError(t *testing.T) {
} }
func TestCHTimeout(t *testing.T) { func TestCHTimeout(t *testing.T) {
t.Skip()
ctx := context.Background() ctx := context.Background()
db := chDB(ch.WithTimeout(time.Second), ch.WithMaxRetries(0)) db := chDB(ch.WithTimeout(time.Second), ch.WithMaxRetries(0))

View File

@ -0,0 +1,32 @@
//go:build !amd64 && !arm64
package chschema
import (
"github.com/uptrace/go-clickhouse/ch/chproto"
)
{{- range . }}
func (c *{{ .CHType }}Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.Alloc(numRow)
for i := range c.Column {
n, err := rd.{{ .CHType }}()
if err != nil {
return err
}
c.Column[i] = n
}
return nil
}
func (c *{{ .CHType }}Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
wr.{{ .CHType }}(n)
}
return nil
}
{{- end }}

View File

@ -0,0 +1,49 @@
//go:build amd64 || arm64
package chschema
import (
"io"
"reflect"
"unsafe"
"github.com/uptrace/go-clickhouse/ch/chproto"
)
{{- range . }}
func (c *{{ .CHType }}Column) ReadFrom(rd *chproto.Reader, numRow int) error {
const size = {{ .Size }} / 8
if numRow == 0 {
return nil
}
c.Alloc(numRow)
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
dest := *(*[]byte)(unsafe.Pointer(&slice))
_, err := io.ReadFull(rd, dest)
return err
}
func (c *{{ .CHType }}Column) WriteTo(wr *chproto.Writer) error {
const size = {{ .Size }} / 8
if len(c.Column) == 0 {
return nil
}
slice := *(*reflect.SliceHeader)(unsafe.Pointer(&c.Column))
slice.Len *= size
slice.Cap *= size
src := *(*[]byte)(unsafe.Pointer(&slice))
wr.Write(src)
return nil
}
{{- end }}

View File

@ -20,6 +20,81 @@ const (
chRevision = 54428 chRevision = 54428
) )
type blockIter struct {
db *DB
cn *chpool.Conn
stickyErr error
}
func newBlockIter(db *DB, cn *chpool.Conn) *blockIter {
return &blockIter{
db: db,
cn: cn,
}
}
func (it *blockIter) Close() error {
if it.cn != nil {
it.db.releaseConn(it.cn, it.stickyErr)
it.cn = nil
}
return nil
}
func (it *blockIter) Err() error {
return it.stickyErr
}
func (it *blockIter) Next(ctx context.Context, block *chschema.Block) bool {
if it.stickyErr != nil {
return false
}
ok, err := it.read(ctx, block)
if err != nil {
it.stickyErr = err
return false
}
return ok
}
func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, error) {
rd := it.cn.Reader(ctx, it.db.cfg.ReadTimeout)
for {
packet, err := rd.Uvarint()
if err != nil {
return false, err
}
switch packet {
case chproto.ServerData:
if err := readBlock(rd, block); err != nil {
return false, err
}
return true, nil
case chproto.ServerException:
return false, readException(rd)
case chproto.ServerProgress:
if err := readProgress(rd); err != nil {
return false, err
}
case chproto.ServerProfileInfo:
if err := readProfileInfo(rd); err != nil {
return false, err
}
case chproto.ServerTableColumns:
if err := readServerTableColumns(rd); err != nil {
return false, err
}
case chproto.ServerEndOfStream:
return false, nil
default:
return false, fmt.Errorf("ch: blockIter.Next: unexpected packet: %d", packet)
}
}
}
func (db *DB) hello(ctx context.Context, cn *chpool.Conn) error { func (db *DB) hello(ctx context.Context, cn *chpool.Conn) error {
err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) { err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) {
wr.Uvarint(chproto.ClientHello) wr.Uvarint(chproto.ClientHello)
@ -254,8 +329,9 @@ func readSampleBlock(rd *chproto.Reader) (*chschema.Block, error) {
} }
} }
func readDataBlocks(rd *chproto.Reader, model Model) (*result, error) { func readDataBlocks(rd *chproto.Reader) (*result, error) {
var res *result var res *result
block := new(chschema.Block)
for { for {
packet, err := rd.Uvarint() packet, err := rd.Uvarint()
if err != nil { if err != nil {
@ -264,11 +340,6 @@ func readDataBlocks(rd *chproto.Reader, model Model) (*result, error) {
switch packet { switch packet {
case chproto.ServerData: case chproto.ServerData:
block := new(chschema.Block)
if model, ok := model.(TableModel); ok {
block.Table = model.Table()
}
if err := readBlock(rd, block); err != nil { if err := readBlock(rd, block); err != nil {
return nil, err return nil, err
} }
@ -277,12 +348,6 @@ func readDataBlocks(rd *chproto.Reader, model Model) (*result, error) {
res = new(result) res = new(result)
} }
res.affected += block.NumRow res.affected += block.NumRow
if model != nil {
if err := model.ScanBlock(block); err != nil {
return nil, err
}
}
case chproto.ServerException: case chproto.ServerException:
return nil, readException(rd) return nil, readException(rd)
case chproto.ServerProgress: case chproto.ServerProgress:
@ -337,7 +402,6 @@ func readPacket(rd *chproto.Reader) (*result, error) {
} }
} }
// TODO: return block
func readBlock(rd *chproto.Reader, block *chschema.Block) error { func readBlock(rd *chproto.Reader, block *chschema.Block) error {
if _, err := rd.String(); err != nil { if _, err := rd.String(); err != nil {
return err return err

View File

@ -94,13 +94,46 @@ func (q *baseQuery) newModel(values ...any) (Model, error) {
return q.tableModel, nil return q.tableModel, nil
} }
func (q *baseQuery) query(ctx context.Context, model Model, query string) (*result, error) {
blocks, err := q.db.query(ctx, query)
if err != nil {
return nil, err
}
res := &result{
model: model,
}
block := new(chschema.Block)
if model, ok := model.(TableModel); ok {
block.Table = model.Table()
}
for blocks.Next(ctx, block) {
if err := model.ScanBlock(block); err != nil {
return nil, err
}
res.affected += block.NumRow
}
if err := blocks.Err(); err != nil {
return nil, err
}
if model, ok := model.(AfterScanRowHook); ok {
if err := model.AfterScanRow(ctx); err != nil {
return nil, err
}
}
return res, nil
}
func (q *baseQuery) exec( func (q *baseQuery) exec(
ctx context.Context, ctx context.Context,
iquery Query, iquery Query,
query string, query string,
) (sql.Result, error) { ) (sql.Result, error) {
ctx, event := q.db.beforeQuery(ctx, iquery, query, nil, q.tableModel) ctx, event := q.db.beforeQuery(ctx, iquery, query, nil, q.tableModel)
res, err := q.db.query(ctx, nil, query) res, err := q.db.exec(ctx, query)
q.db.afterQuery(ctx, event, res, err) q.db.afterQuery(ctx, event, res, err)
return res, err return res, err
} }

View File

@ -496,7 +496,7 @@ func (q *SelectQuery) scan(ctx context.Context, columnar bool, values ...any) er
query := internal.String(queryBytes) query := internal.String(queryBytes)
ctx, evt := q.db.beforeQuery(ctx, q, query, nil, model) ctx, evt := q.db.beforeQuery(ctx, q, query, nil, model)
res, err := q.db.query(ctx, model, query) res, err := q.query(ctx, model, query)
q.db.afterQuery(ctx, evt, res, err) q.db.afterQuery(ctx, evt, res, err)
if err != nil { if err != nil {
return err return err