2022-01-23 09:36:24 +02:00
|
|
|
package ch
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"os"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
"github.com/uptrace/go-clickhouse/ch/chpool"
|
|
|
|
"github.com/uptrace/go-clickhouse/ch/chproto"
|
|
|
|
"github.com/uptrace/go-clickhouse/ch/chschema"
|
2022-05-18 16:23:57 +03:00
|
|
|
"go.opentelemetry.io/otel/trace"
|
2022-01-23 09:36:24 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
clientName = "go-clickhouse"
|
2022-05-18 16:23:57 +03:00
|
|
|
chVersionMajor = 1
|
|
|
|
chVersionMinor = 1
|
|
|
|
chProtoVersion = chproto.DBMS_TCP_PROTOCOL_VERSION
|
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
|
|
|
osUser = os.Getenv("USER")
|
|
|
|
hostname, _ = os.Hostname()
|
2022-01-23 09:36:24 +02:00
|
|
|
)
|
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
type blockIter struct {
|
|
|
|
db *DB
|
|
|
|
cn *chpool.Conn
|
|
|
|
|
|
|
|
stickyErr error
|
|
|
|
}
|
|
|
|
|
|
|
|
func newBlockIter(db *DB, cn *chpool.Conn) *blockIter {
|
|
|
|
return &blockIter{
|
|
|
|
db: db,
|
|
|
|
cn: cn,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (it *blockIter) Close() error {
|
|
|
|
if it.cn != nil {
|
2022-04-30 16:54:06 +03:00
|
|
|
it.close()
|
2022-04-30 10:30:34 +03:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-04-30 16:54:06 +03:00
|
|
|
func (it *blockIter) close() {
|
|
|
|
it.db.releaseConn(it.cn, it.stickyErr)
|
|
|
|
it.cn = nil
|
|
|
|
}
|
|
|
|
|
2022-04-30 10:30:34 +03:00
|
|
|
func (it *blockIter) Err() error {
|
|
|
|
return it.stickyErr
|
|
|
|
}
|
|
|
|
|
|
|
|
func (it *blockIter) Next(ctx context.Context, block *chschema.Block) bool {
|
2022-04-30 16:54:06 +03:00
|
|
|
if it.cn == nil {
|
2022-04-30 10:30:34 +03:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
ok, err := it.read(ctx, block)
|
|
|
|
if err != nil {
|
|
|
|
it.stickyErr = err
|
2022-04-30 16:54:06 +03:00
|
|
|
it.close()
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
if !ok {
|
|
|
|
it.close()
|
2022-04-30 10:30:34 +03:00
|
|
|
return false
|
|
|
|
}
|
2022-04-30 16:54:06 +03:00
|
|
|
return true
|
2022-04-30 10:30:34 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (it *blockIter) read(ctx context.Context, block *chschema.Block) (bool, error) {
|
|
|
|
rd := it.cn.Reader(ctx, it.db.cfg.ReadTimeout)
|
|
|
|
for {
|
|
|
|
packet, err := rd.Uvarint()
|
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
|
|
|
|
switch packet {
|
|
|
|
case chproto.ServerData:
|
2022-05-18 16:23:57 +03:00
|
|
|
if err := it.db.readBlock(rd, block, true); err != nil {
|
2022-04-30 10:30:34 +03:00
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
return true, nil
|
|
|
|
case chproto.ServerException:
|
|
|
|
return false, readException(rd)
|
|
|
|
case chproto.ServerProgress:
|
2022-05-18 16:23:57 +03:00
|
|
|
if err := readProgress(it.cn, rd); err != nil {
|
2022-04-30 10:30:34 +03:00
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
case chproto.ServerProfileInfo:
|
|
|
|
if err := readProfileInfo(rd); err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
case chproto.ServerTableColumns:
|
|
|
|
if err := readServerTableColumns(rd); err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
2022-05-18 16:23:57 +03:00
|
|
|
case chproto.ServerProfileEvents:
|
|
|
|
block := new(chschema.Block)
|
|
|
|
if err := it.db.readBlock(rd, block, false); err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
2022-04-30 10:30:34 +03:00
|
|
|
case chproto.ServerEndOfStream:
|
|
|
|
return false, nil
|
|
|
|
default:
|
|
|
|
return false, fmt.Errorf("ch: blockIter.Next: unexpected packet: %d", packet)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-01-23 09:36:24 +02:00
|
|
|
func (db *DB) hello(ctx context.Context, cn *chpool.Conn) error {
|
|
|
|
err := cn.WithWriter(ctx, db.cfg.WriteTimeout, func(wr *chproto.Writer) {
|
2022-05-18 16:23:57 +03:00
|
|
|
wr.WriteByte(chproto.ClientHello)
|
2022-01-23 09:36:24 +02:00
|
|
|
writeClientInfo(wr)
|
|
|
|
|
|
|
|
wr.String(db.cfg.Database)
|
|
|
|
wr.String(db.cfg.User)
|
|
|
|
wr.String(db.cfg.Password)
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return cn.WithReader(ctx, db.cfg.ReadTimeout, func(rd *chproto.Reader) error {
|
|
|
|
packet, err := rd.Uvarint()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
switch packet {
|
|
|
|
case chproto.ServerHello:
|
|
|
|
return cn.ServerInfo.ReadFrom(rd)
|
|
|
|
case chproto.ServerException:
|
|
|
|
return readException(rd)
|
|
|
|
default:
|
|
|
|
return fmt.Errorf("ch: hello: unexpected packet: %d", packet)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func writeClientInfo(wr *chproto.Writer) {
|
|
|
|
wr.String(clientName)
|
|
|
|
wr.Uvarint(chVersionMajor)
|
|
|
|
wr.Uvarint(chVersionMinor)
|
2022-05-18 16:23:57 +03:00
|
|
|
wr.Uvarint(chProtoVersion)
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func readException(rd *chproto.Reader) (err error) {
|
|
|
|
var exc Error
|
|
|
|
|
|
|
|
if exc.Code, err = rd.Int32(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if exc.Name, err = rd.String(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if exc.Message, err = rd.String(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
exc.Message = strings.TrimSpace(strings.TrimPrefix(exc.Message, exc.Name+":"))
|
|
|
|
|
|
|
|
if exc.StackTrace, err = rd.String(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
hasNested, err := rd.Bool()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if hasNested {
|
|
|
|
exc.nested = readException(rd)
|
|
|
|
}
|
|
|
|
|
|
|
|
return &exc
|
|
|
|
}
|
|
|
|
|
|
|
|
func readProfileInfo(rd *chproto.Reader) error {
|
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := rd.Bool(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := rd.Bool(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-05-18 16:23:57 +03:00
|
|
|
func readProgress(cn *chpool.Conn, rd *chproto.Reader) error {
|
2022-01-23 09:36:24 +02:00
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-05-18 16:23:57 +03:00
|
|
|
if cn.ServerInfo.Revision >= chproto.DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO {
|
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func writePing(wr *chproto.Writer) {
|
2022-05-18 16:23:57 +03:00
|
|
|
wr.WriteByte(chproto.ClientPing)
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func readPong(rd *chproto.Reader) error {
|
|
|
|
for {
|
|
|
|
packet, err := rd.Uvarint()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
switch packet {
|
|
|
|
case chproto.ServerPong:
|
|
|
|
return nil
|
|
|
|
case chproto.ServerException:
|
|
|
|
return readException(rd)
|
|
|
|
case chproto.ServerEndOfStream:
|
|
|
|
return nil
|
|
|
|
default:
|
|
|
|
return fmt.Errorf("ch: readPong: unexpected packet: %d", packet)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-18 16:23:57 +03:00
|
|
|
func (db *DB) writeQuery(ctx context.Context, cn *chpool.Conn, wr *chproto.Writer, query string) {
|
|
|
|
wr.WriteByte(chproto.ClientQuery)
|
|
|
|
wr.String("") // query id
|
2022-01-23 09:36:24 +02:00
|
|
|
|
|
|
|
// TODO: use QuerySecondary - https://github.com/ClickHouse/ClickHouse/blob/master/dbms/src/Client/Connection.cpp#L388-L404
|
2022-05-18 16:23:57 +03:00
|
|
|
wr.WriteByte(chproto.QueryInitial)
|
2022-01-23 09:36:24 +02:00
|
|
|
wr.String("") // initial user
|
|
|
|
wr.String("") // initial query id
|
2022-05-18 16:23:57 +03:00
|
|
|
wr.String(cn.LocalAddr().String())
|
|
|
|
if cn.ServerInfo.Revision >= chproto.DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME {
|
|
|
|
wr.Int64(0) // initial_query_start_time_microseconds
|
|
|
|
}
|
|
|
|
wr.WriteByte(1) // interface [tcp - 1, http - 2]
|
|
|
|
wr.String(osUser)
|
2022-01-23 09:36:24 +02:00
|
|
|
wr.String(hostname)
|
|
|
|
writeClientInfo(wr)
|
2022-05-18 16:23:57 +03:00
|
|
|
if cn.ServerInfo.Revision >= chproto.DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO {
|
|
|
|
wr.String("") // quota key
|
|
|
|
}
|
|
|
|
if cn.ServerInfo.Revision >= chproto.DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH {
|
|
|
|
wr.Uvarint(0)
|
|
|
|
}
|
|
|
|
if cn.ServerInfo.Revision >= chproto.DBMS_MIN_REVISION_WITH_VERSION_PATCH {
|
|
|
|
wr.Uvarint(0) // client version patch
|
|
|
|
}
|
|
|
|
if cn.ServerInfo.Revision >= chproto.DBMS_MIN_REVISION_WITH_OPENTELEMETRY {
|
|
|
|
if spanCtx := trace.SpanContextFromContext(ctx); spanCtx.IsValid() {
|
|
|
|
wr.WriteByte(1)
|
|
|
|
{
|
|
|
|
v := spanCtx.TraceID()
|
|
|
|
wr.UUID(v[:])
|
|
|
|
}
|
|
|
|
{
|
|
|
|
v := spanCtx.SpanID()
|
|
|
|
wr.Write(reverseBytes(v[:]))
|
|
|
|
}
|
|
|
|
wr.String(spanCtx.TraceState().String())
|
|
|
|
wr.WriteByte(byte(spanCtx.TraceFlags()))
|
|
|
|
} else {
|
|
|
|
wr.WriteByte(0)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if cn.ServerInfo.Revision >= chproto.DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS {
|
|
|
|
wr.Uvarint(0) // collaborate_with_initiator
|
|
|
|
wr.Uvarint(0) // count_participating_replicas
|
|
|
|
wr.Uvarint(0) // number_of_current_replica
|
|
|
|
}
|
2022-01-23 09:36:24 +02:00
|
|
|
|
2022-05-18 16:23:57 +03:00
|
|
|
db.writeSettings(cn, wr)
|
2022-01-23 09:36:24 +02:00
|
|
|
|
2022-05-18 16:23:57 +03:00
|
|
|
if cn.ServerInfo.Revision >= chproto.DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET {
|
|
|
|
wr.String("")
|
|
|
|
}
|
|
|
|
wr.Uvarint(2) // state complete
|
2022-05-02 09:31:14 +03:00
|
|
|
wr.Bool(db.cfg.Compression)
|
2022-01-23 09:36:24 +02:00
|
|
|
wr.String(query)
|
|
|
|
}
|
|
|
|
|
2022-05-18 16:23:57 +03:00
|
|
|
func reverseBytes(b []byte) []byte {
|
|
|
|
for i, j := 0, len(b)-1; i < j; i, j = i+1, j-1 {
|
|
|
|
b[i], b[j] = b[j], b[i]
|
|
|
|
}
|
|
|
|
return b
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *DB) writeSettings(cn *chpool.Conn, wr *chproto.Writer) {
|
2022-01-23 09:36:24 +02:00
|
|
|
for key, value := range db.cfg.QuerySettings {
|
|
|
|
wr.String(key)
|
2022-05-18 16:23:57 +03:00
|
|
|
|
|
|
|
if cn.ServerInfo.Revision > chproto.DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS {
|
|
|
|
wr.Bool(true) // is_important
|
|
|
|
wr.String(fmt.Sprint(value))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2022-01-23 09:36:24 +02:00
|
|
|
switch value := value.(type) {
|
|
|
|
case string:
|
|
|
|
wr.String(value)
|
|
|
|
case int:
|
|
|
|
wr.Uvarint(uint64(value))
|
|
|
|
case int64:
|
|
|
|
wr.Uvarint(uint64(value))
|
|
|
|
case uint64:
|
|
|
|
wr.Uvarint(value)
|
|
|
|
case bool:
|
|
|
|
wr.Bool(value)
|
|
|
|
default:
|
|
|
|
panic(fmt.Errorf("%s setting has unsupported type: %T", key, value))
|
|
|
|
}
|
2022-05-18 16:23:57 +03:00
|
|
|
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
2022-05-18 16:23:57 +03:00
|
|
|
wr.String("") // end of settings
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
var emptyBlock chschema.Block
|
|
|
|
|
2022-05-02 09:31:14 +03:00
|
|
|
func (db *DB) writeBlock(ctx context.Context, wr *chproto.Writer, block *chschema.Block) {
|
2022-01-23 09:36:24 +02:00
|
|
|
if block == nil {
|
|
|
|
block = &emptyBlock
|
|
|
|
}
|
2022-05-18 16:23:57 +03:00
|
|
|
wr.WriteByte(chproto.ClientData)
|
2022-01-23 09:36:24 +02:00
|
|
|
wr.String("")
|
|
|
|
|
2022-05-02 09:31:14 +03:00
|
|
|
wr.WithCompression(db.cfg.Compression, func() error {
|
2022-01-23 09:36:24 +02:00
|
|
|
writeBlockInfo(wr)
|
|
|
|
return block.WriteTo(wr)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func writeBlockInfo(wr *chproto.Writer) {
|
|
|
|
wr.Uvarint(1)
|
|
|
|
wr.Bool(false)
|
|
|
|
|
|
|
|
wr.Uvarint(2)
|
|
|
|
wr.Int32(-1)
|
|
|
|
|
|
|
|
wr.Uvarint(0)
|
|
|
|
}
|
|
|
|
|
2022-05-02 09:31:14 +03:00
|
|
|
func (db *DB) readSampleBlock(rd *chproto.Reader) (*chschema.Block, error) {
|
2022-01-23 09:36:24 +02:00
|
|
|
for {
|
|
|
|
packet, err := rd.Uvarint()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
switch packet {
|
|
|
|
case chproto.ServerData:
|
|
|
|
block := new(chschema.Block)
|
2022-05-18 16:23:57 +03:00
|
|
|
if err := db.readBlock(rd, block, true); err != nil {
|
2022-01-23 09:36:24 +02:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return block, nil
|
|
|
|
case chproto.ServerTableColumns:
|
|
|
|
if err := readServerTableColumns(rd); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
case chproto.ServerException:
|
|
|
|
return nil, readException(rd)
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("ch: readSampleBlock: unexpected packet: %d", packet)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-18 16:23:57 +03:00
|
|
|
func (db *DB) readDataBlocks(cn *chpool.Conn, rd *chproto.Reader) (*result, error) {
|
2022-01-23 09:36:24 +02:00
|
|
|
var res *result
|
2022-04-30 10:30:34 +03:00
|
|
|
block := new(chschema.Block)
|
2022-01-23 09:36:24 +02:00
|
|
|
for {
|
|
|
|
packet, err := rd.Uvarint()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
switch packet {
|
2022-05-18 16:23:57 +03:00
|
|
|
case chproto.ServerData, chproto.ServerTotals, chproto.ServerExtremes:
|
|
|
|
if err := db.readBlock(rd, block, true); err != nil {
|
2022-01-23 09:36:24 +02:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if res == nil {
|
|
|
|
res = new(result)
|
|
|
|
}
|
|
|
|
res.affected += block.NumRow
|
|
|
|
case chproto.ServerException:
|
|
|
|
return nil, readException(rd)
|
|
|
|
case chproto.ServerProgress:
|
2022-05-18 16:23:57 +03:00
|
|
|
if err := readProgress(cn, rd); err != nil {
|
2022-01-23 09:36:24 +02:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
case chproto.ServerProfileInfo:
|
|
|
|
if err := readProfileInfo(rd); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
case chproto.ServerTableColumns:
|
|
|
|
if err := readServerTableColumns(rd); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2022-05-18 16:23:57 +03:00
|
|
|
case chproto.ServerProfileEvents:
|
|
|
|
block := new(chschema.Block)
|
|
|
|
if err := db.readBlock(rd, block, false); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2022-01-23 09:36:24 +02:00
|
|
|
case chproto.ServerEndOfStream:
|
|
|
|
return res, nil
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("ch: readDataBlocks: unexpected packet: %d", packet)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-18 16:23:57 +03:00
|
|
|
func readPacket(cn *chpool.Conn, rd *chproto.Reader) (*result, error) {
|
2022-01-23 09:36:24 +02:00
|
|
|
packet, err := rd.Uvarint()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
res := new(result)
|
|
|
|
switch packet {
|
|
|
|
case chproto.ServerException:
|
|
|
|
return nil, readException(rd)
|
|
|
|
case chproto.ServerProgress:
|
2022-05-18 16:23:57 +03:00
|
|
|
if err := readProgress(cn, rd); err != nil {
|
2022-01-23 09:36:24 +02:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return res, nil
|
|
|
|
case chproto.ServerProfileInfo:
|
|
|
|
if err := readProfileInfo(rd); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return res, nil
|
|
|
|
case chproto.ServerTableColumns:
|
|
|
|
if err := readServerTableColumns(rd); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return res, nil
|
|
|
|
case chproto.ServerEndOfStream:
|
|
|
|
return res, nil
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("ch: readPacket: unexpected packet: %d", packet)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-18 16:23:57 +03:00
|
|
|
func (db *DB) readBlock(rd *chproto.Reader, block *chschema.Block, compressible bool) error {
|
2022-01-23 09:36:24 +02:00
|
|
|
if _, err := rd.String(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-05-18 16:23:57 +03:00
|
|
|
return rd.WithCompression(compressible && db.cfg.Compression, func() error {
|
2022-01-23 09:36:24 +02:00
|
|
|
if err := readBlockInfo(rd); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
numColumn, err := rd.Uvarint()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
numRow, err := rd.Uvarint()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
block.NumColumn = int(numColumn)
|
|
|
|
block.NumRow = int(numRow)
|
|
|
|
|
|
|
|
for i := 0; i < int(numColumn); i++ {
|
|
|
|
colName, err := rd.String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if colName == "" {
|
|
|
|
return errors.New("ch: column has empty name")
|
|
|
|
}
|
|
|
|
|
|
|
|
colType, err := rd.String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if colType == "" {
|
|
|
|
return fmt.Errorf("ch: column=%s has empty type", colName)
|
|
|
|
}
|
|
|
|
|
|
|
|
col := block.Column(colName, colType)
|
|
|
|
if err := col.ReadFrom(rd, int(numRow)); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func readBlockInfo(rd *chproto.Reader) error {
|
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := rd.Bool(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := rd.Int32(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, err := rd.Uvarint(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func writeCancel(wr *chproto.Writer) {
|
2022-05-18 16:23:57 +03:00
|
|
|
wr.WriteByte(chproto.ClientCancel)
|
2022-01-23 09:36:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func readServerTableColumns(rd *chproto.Reader) error {
|
|
|
|
_, err := rd.String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
_, err = rd.String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|