1
0
mirror of https://github.com/uptrace/go-clickhouse.git synced 2025-06-08 23:26:11 +02:00
go-clickhouse/ch/chschema/column_array.go
2022-04-30 14:56:46 +03:00

540 lines
11 KiB
Go

package chschema
import (
"fmt"
"reflect"
"unsafe"
"github.com/uptrace/go-clickhouse/ch/chproto"
)
type ArrayColumnar interface {
WriteOffset(wr *chproto.Writer, offset int) int
WriteData(wr *chproto.Writer) error
}
//------------------------------------------------------------------------------
type ArrayColumnOf[T any] struct {
Column [][]T
elem Columnar
}
func (c *ArrayColumnOf[T]) Reset(numRow int) {
if cap(c.Column) >= numRow {
c.Column = c.Column[:0]
} else {
c.Column = make([][]T, 0, numRow)
}
}
func (c *ArrayColumnOf[T]) Set(v any) {
c.Column = v.([][]T)
}
func (c *ArrayColumnOf[T]) Value() any {
return c.Column
}
func (c *ArrayColumnOf[T]) Nullable(nulls UInt8Column) any {
panic("not implemented")
}
func (c *ArrayColumnOf[T]) Len() int {
return len(c.Column)
}
func (c *ArrayColumnOf[T]) Index(idx int) any {
return c.Column[idx]
}
func (c *ArrayColumnOf[T]) Slice(s, e int) any {
return c.Column[s:e]
}
func (c *ArrayColumnOf[T]) ConvertAssign(idx int, v reflect.Value) error {
v.Set(reflect.ValueOf(c.Column[idx]))
return nil
}
func (c *ArrayColumnOf[T]) AppendValue(v reflect.Value) {
ptr := unsafe.Pointer(v.UnsafeAddr())
c.AppendPointer(v.Type(), ptr)
}
func (c *ArrayColumnOf[T]) AppendPointer(typ reflect.Type, ptr unsafe.Pointer) {
c.Column = append(c.Column, *(*[]T)(ptr))
}
func (c *ArrayColumnOf[T]) ReadFrom(rd *chproto.Reader, numRow int) error {
if cap(c.Column) >= numRow {
c.Column = c.Column[:numRow]
} else {
c.Column = make([][]T, numRow)
}
if numRow == 0 {
return nil
}
offsets := make([]int, numRow)
for i := 0; i < numRow; i++ {
offset, err := rd.UInt64()
if err != nil {
return err
}
offsets[i] = int(offset)
}
if err := c.elem.ReadFrom(rd, offsets[len(offsets)-1]); err != nil {
return err
}
var prev int
for i, offset := range offsets {
c.Column[i] = c.elem.Slice(prev, offset).([]T)
prev = offset
}
return nil
}
func (c *ArrayColumnOf[T]) WriteTo(wr *chproto.Writer) error {
_ = c.WriteOffset(wr, 0)
return c.WriteData(wr)
}
var _ ArrayColumnar = (*Int64ArrayColumn)(nil)
func (c *ArrayColumnOf[T]) WriteOffset(wr *chproto.Writer, offset int) int {
for _, el := range c.Column {
offset += len(el)
wr.UInt64(uint64(offset))
}
return offset
}
func (c *ArrayColumnOf[T]) WriteData(wr *chproto.Writer) error {
for _, ss := range c.Column {
c.elem.Set(ss)
if err := c.elem.WriteTo(wr); err != nil {
return err
}
}
return nil
}
//------------------------------------------------------------------------------
type Int64ArrayColumn struct {
ArrayColumnOf[int64]
}
var _ Columnar = (*Int64ArrayColumn)(nil)
func NewInt64ArrayColumn(typ reflect.Type, chType string, numRow int) Columnar {
return &Int64ArrayColumn{
ArrayColumnOf: ArrayColumnOf[int64]{
Column: make([][]int64, 0, numRow),
elem: NewInt64Column(typ.Elem(), "", 0),
},
}
}
func (c *Int64ArrayColumn) Type() reflect.Type {
return int64SliceType
}
//------------------------------------------------------------------------------
type Uint64ArrayColumn struct {
ArrayColumnOf[uint64]
}
var _ Columnar = (*Uint64ArrayColumn)(nil)
func NewUint64ArrayColumn(typ reflect.Type, chType string, numRow int) Columnar {
return &Uint64ArrayColumn{
ArrayColumnOf: ArrayColumnOf[uint64]{
Column: make([][]uint64, 0, numRow),
elem: NewUInt64Column(typ.Elem(), "", 0),
},
}
}
func (c *Uint64ArrayColumn) Type() reflect.Type {
return uint64SliceType
}
//------------------------------------------------------------------------------
type Float64ArrayColumn struct {
ArrayColumnOf[float64]
}
var _ Columnar = (*Float64ArrayColumn)(nil)
func NewFloat64ArrayColumn(typ reflect.Type, chType string, numRow int) Columnar {
return &Float64ArrayColumn{
ArrayColumnOf: ArrayColumnOf[float64]{
Column: make([][]float64, 0, numRow),
elem: NewFloat64Column(typ.Elem(), "", 0),
},
}
}
func (c *Float64ArrayColumn) Type() reflect.Type {
return float64SliceType
}
//------------------------------------------------------------------------------
type StringArrayColumn struct {
Column [][]string
elem Columnar
stringElem *StringColumn
lcElem *LCStringColumn
}
var _ Columnar = (*StringArrayColumn)(nil)
func NewStringArrayColumn(typ reflect.Type, chType string, numRow int) Columnar {
if _, funcType := aggFuncNameAndType(chType); funcType != "" {
chType = funcType
}
elemType := chArrayElemType(chType)
if elemType == "" {
panic(fmt.Errorf("invalid array type: %q (Go type is %s)",
chType, typ.String()))
}
columnar := NewColumn(typ.Elem(), elemType, 0)
var stringElem *StringColumn
var lcElem *LCStringColumn
switch v := columnar.(type) {
case *StringColumn:
stringElem = v
case *LCStringColumn:
stringElem = &v.StringColumn
lcElem = v
columnar = &ArrayLCStringColumn{v}
case *EnumColumn:
stringElem = &v.StringColumn
default:
panic(fmt.Errorf("unsupported column: %T", v))
}
return &StringArrayColumn{
Column: make([][]string, 0, numRow),
elem: columnar,
stringElem: stringElem,
lcElem: lcElem,
}
}
func (c *StringArrayColumn) Reset(numRow int) {
if cap(c.Column) >= numRow {
c.Column = c.Column[:0]
} else {
c.Column = make([][]string, 0, numRow)
}
}
func (c *StringArrayColumn) Type() reflect.Type {
return stringSliceType
}
func (c *StringArrayColumn) Set(v any) {
c.Column = v.([][]string)
}
func (c *StringArrayColumn) Value() any {
return c.Column
}
func (c *StringArrayColumn) Nullable(nulls UInt8Column) any {
panic("not implemented")
}
func (c *StringArrayColumn) Len() int {
return len(c.Column)
}
func (c *StringArrayColumn) Index(idx int) any {
return c.Column[idx]
}
func (c StringArrayColumn) Slice(s, e int) any {
return c.Column[s:e]
}
func (c *StringArrayColumn) ConvertAssign(idx int, v reflect.Value) error {
v.Set(reflect.ValueOf(c.Column[idx]))
return nil
}
func (c *StringArrayColumn) AppendValue(v reflect.Value) {
c.Column = append(c.Column, v.Interface().([]string))
}
func (c *StringArrayColumn) AppendPointer(typ reflect.Type, ptr unsafe.Pointer) {
c.Column = append(c.Column, *(*[]string)(ptr))
}
func (c *StringArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
if numRow == 0 {
return nil
}
if cap(c.Column) >= numRow {
c.Column = c.Column[:numRow]
} else {
c.Column = make([][]string, numRow)
}
if c.lcElem != nil {
if err := c.lcElem.readPrefix(rd, numRow); err != nil {
return err
}
}
offsets := make([]int, numRow)
for i := 0; i < len(offsets); i++ {
offset, err := rd.UInt64()
if err != nil {
return err
}
offsets[i] = int(offset)
}
if err := c.elem.ReadFrom(rd, offsets[len(offsets)-1]); err != nil {
return err
}
var prev int
for i, offset := range offsets {
c.Column[i] = c.stringElem.Column[prev:offset]
prev = offset
}
return nil
}
func (c *StringArrayColumn) WriteTo(wr *chproto.Writer) error {
if c.lcElem != nil {
c.lcElem.writePrefix(wr)
}
_ = c.WriteOffset(wr, 0)
return c.WriteData(wr)
}
var _ ArrayColumnar = (*StringArrayColumn)(nil)
func (c *StringArrayColumn) WriteOffset(wr *chproto.Writer, offset int) int {
for _, el := range c.Column {
offset += len(el)
wr.UInt64(uint64(offset))
}
return offset
}
func (c *StringArrayColumn) WriteData(wr *chproto.Writer) error {
for _, ss := range c.Column {
c.stringElem.Column = ss
if err := c.elem.WriteTo(wr); err != nil {
return err
}
}
return nil
}
//------------------------------------------------------------------------------
type ArrayLCStringColumn struct {
*LCStringColumn
}
func (c ArrayLCStringColumn) Type() reflect.Type {
return stringSliceType
}
func (c *ArrayLCStringColumn) WriteTo(wr *chproto.Writer) error {
c.writeData(wr)
return nil
}
func (c *ArrayLCStringColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
if numRow == 0 {
return nil
}
return c.readData(rd, numRow)
}
//------------------------------------------------------------------------------
type GenericArrayColumn struct {
Column reflect.Value
typ reflect.Type
elem Columnar
arrayElem ArrayColumnar
}
var _ Columnar = (*GenericArrayColumn)(nil)
func NewGenericArrayColumn(typ reflect.Type, chType string, numRow int) Columnar {
elemType := chArrayElemType(chType)
if elemType == "" {
panic(fmt.Errorf("invalid array type: %q (Go type is %s)",
chType, typ.String()))
}
elem := NewColumn(typ.Elem(), elemType, 0)
var arrayElem ArrayColumnar
if _, ok := elem.(*LCStringColumn); ok {
panic("not reached")
}
arrayElem, _ = elem.(ArrayColumnar)
c := &GenericArrayColumn{
typ: reflect.SliceOf(typ),
elem: elem,
arrayElem: arrayElem,
}
c.Column = reflect.MakeSlice(c.typ, 0, numRow)
return c
}
func (c GenericArrayColumn) Type() reflect.Type {
return c.typ.Elem()
}
func (c *GenericArrayColumn) Reset(numRow int) {
if c.Column.Cap() >= numRow {
c.Column = c.Column.Slice(0, 0)
} else {
c.Column = reflect.MakeSlice(c.typ, 0, numRow)
}
}
func (c *GenericArrayColumn) Set(v any) {
c.Column = reflect.ValueOf(v)
}
func (c *GenericArrayColumn) Value() any {
return c.Column.Interface()
}
func (c *GenericArrayColumn) Nullable(nulls UInt8Column) any {
panic("not implemented")
}
func (c *GenericArrayColumn) Len() int {
return c.Column.Len()
}
func (c *GenericArrayColumn) Index(idx int) any {
return c.Column.Index(idx).Interface()
}
func (c GenericArrayColumn) Slice(s, e int) any {
return c.Column.Slice(s, e).Interface()
}
func (c *GenericArrayColumn) ConvertAssign(idx int, v reflect.Value) error {
v.Set(c.Column.Index(idx))
return nil
}
func (c *GenericArrayColumn) AppendValue(v reflect.Value) {
c.Column = reflect.Append(c.Column, v)
}
func (c *GenericArrayColumn) AppendPointer(typ reflect.Type, ptr unsafe.Pointer) {
c.AppendValue(reflect.NewAt(typ.Elem(), ptr).Elem())
}
func (c *GenericArrayColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
if c.Column.Cap() >= numRow {
c.Column = c.Column.Slice(0, numRow)
} else {
c.Column = reflect.MakeSlice(c.typ, numRow, numRow)
}
if numRow == 0 {
return nil
}
offsets := make([]int, numRow)
for i := 0; i < len(offsets); i++ {
offset, err := rd.UInt64()
if err != nil {
return err
}
offsets[i] = int(offset)
}
if err := c.elem.ReadFrom(rd, offsets[len(offsets)-1]); err != nil {
return err
}
var prev int
for i, offset := range offsets {
c.Column.Index(i).Set(reflect.ValueOf(c.elem.Slice(prev, offset)))
prev = offset
}
return nil
}
func (c *GenericArrayColumn) WriteTo(wr *chproto.Writer) error {
_ = c.WriteOffset(wr, 0)
colLen := c.Column.Len()
for i := 0; i < colLen; i++ {
// TODO: add SetValue or SetPointer
c.elem.Set(c.Column.Index(i).Interface())
var err error
if c.arrayElem != nil {
err = c.arrayElem.WriteData(wr)
} else {
err = c.elem.WriteTo(wr)
}
if err != nil {
return err
}
}
return nil
}
func (c *GenericArrayColumn) WriteOffset(wr *chproto.Writer, offset int) int {
colLen := c.Column.Len()
for i := 0; i < colLen; i++ {
el := c.Column.Index(i)
offset += el.Len()
wr.UInt64(uint64(offset))
}
if c.arrayElem == nil {
return offset
}
offset = 0
for i := 0; i < colLen; i++ {
el := c.Column.Index(i)
c.elem.Set(el.Interface()) // Use SetValue or SetPointer
offset = c.arrayElem.WriteOffset(wr, offset)
}
return offset
}