mirror of
https://github.com/uptrace/go-clickhouse.git
synced 2025-06-08 23:26:11 +02:00
463 lines
9.4 KiB
Go
463 lines
9.4 KiB
Go
package ch
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/uptrace/go-clickhouse/ch/chschema"
|
|
"github.com/uptrace/go-clickhouse/ch/internal"
|
|
)
|
|
|
|
type withQuery struct {
|
|
name string
|
|
query chschema.QueryAppender
|
|
cte bool
|
|
}
|
|
|
|
type baseQuery struct {
|
|
db *DB
|
|
err error
|
|
|
|
tableModel TableModel
|
|
table *chschema.Table
|
|
|
|
with []withQuery
|
|
modelTableName chschema.QueryWithArgs
|
|
tables []chschema.QueryWithArgs
|
|
columns []chschema.QueryWithArgs
|
|
settings []chschema.QueryWithArgs
|
|
|
|
flags internal.Flag
|
|
}
|
|
|
|
func (q *baseQuery) DB() *DB {
|
|
return q.db
|
|
}
|
|
|
|
func (q *baseQuery) GetModel() Model {
|
|
return q.tableModel
|
|
}
|
|
|
|
func (q *baseQuery) GetTableName() string {
|
|
if q.table != nil {
|
|
return q.table.Name
|
|
}
|
|
|
|
for _, wq := range q.with {
|
|
if v, ok := wq.query.(Query); ok {
|
|
if model := v.GetModel(); model != nil {
|
|
return v.GetTableName()
|
|
}
|
|
}
|
|
}
|
|
|
|
if q.modelTableName.Query != "" {
|
|
return q.modelTableName.Query
|
|
}
|
|
|
|
if len(q.tables) > 0 {
|
|
b, _ := q.tables[0].AppendQuery(q.db.fmter, nil)
|
|
if len(b) < 64 {
|
|
return string(b)
|
|
}
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
func (q *baseQuery) setConn(db *DB) {
|
|
q.db = db
|
|
}
|
|
|
|
func (q *baseQuery) setErr(err error) {
|
|
if q.err == nil {
|
|
q.err = err
|
|
}
|
|
}
|
|
|
|
func (q *baseQuery) setTableModel(model any) {
|
|
tm, err := newTableModel(q.db, model)
|
|
if err != nil {
|
|
q.setErr(err)
|
|
return
|
|
}
|
|
q.tableModel = tm
|
|
q.table = tm.Table()
|
|
}
|
|
|
|
func (q *baseQuery) newModel(values ...any) (Model, error) {
|
|
if len(values) > 0 {
|
|
return newModel(q.db, values...)
|
|
}
|
|
return q.tableModel, nil
|
|
}
|
|
|
|
func (q *baseQuery) query(ctx context.Context, model Model, query string) (*result, error) {
|
|
blocks, err := q.db.query(ctx, query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res := &result{
|
|
model: model,
|
|
}
|
|
block := new(chschema.Block)
|
|
if model, ok := model.(TableModel); ok {
|
|
block.Table = model.Table()
|
|
}
|
|
|
|
for blocks.Next(ctx, block) {
|
|
if err := model.ScanBlock(block); err != nil {
|
|
return nil, err
|
|
}
|
|
res.affected += block.NumRow
|
|
}
|
|
if err := blocks.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if model, ok := model.(AfterScanRowHook); ok {
|
|
if err := model.AfterScanRow(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (q *baseQuery) exec(
|
|
ctx context.Context,
|
|
iquery Query,
|
|
query string,
|
|
) (sql.Result, error) {
|
|
ctx, event := q.db.beforeQuery(ctx, iquery, query, nil, q.tableModel)
|
|
res, err := q.db.exec(ctx, query)
|
|
q.db.afterQuery(ctx, event, res, err)
|
|
return res, err
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
func (q *baseQuery) AppendNamedArg(fmter chschema.Formatter, b []byte, name string) ([]byte, bool) {
|
|
return b, false
|
|
}
|
|
|
|
func appendColumns(b []byte, table Safe, fields []*chschema.Field) []byte {
|
|
for i, f := range fields {
|
|
if i > 0 {
|
|
b = append(b, ", "...)
|
|
}
|
|
|
|
if len(table) > 0 {
|
|
b = append(b, table...)
|
|
b = append(b, '.')
|
|
}
|
|
b = append(b, f.Column...)
|
|
}
|
|
return b
|
|
}
|
|
|
|
func formatterWithModel(
|
|
fmter chschema.Formatter, model chschema.NamedArgAppender,
|
|
) chschema.Formatter {
|
|
return fmter.WithArg(model)
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
func (q *baseQuery) addTable(table chschema.QueryWithArgs) {
|
|
q.tables = append(q.tables, table)
|
|
}
|
|
|
|
func (q *baseQuery) modelHasTableName() bool {
|
|
if !q.modelTableName.IsZero() {
|
|
return q.modelTableName.Query != ""
|
|
}
|
|
return q.table != nil
|
|
}
|
|
|
|
func (q *baseQuery) hasTables() bool {
|
|
return q.modelHasTableName() || len(q.tables) > 0
|
|
}
|
|
|
|
func (q *baseQuery) appendTables(fmter chschema.Formatter, b []byte) (_ []byte, err error) {
|
|
return q._appendTables(fmter, b, false)
|
|
}
|
|
|
|
func (q *baseQuery) appendTablesWithAlias(fmter chschema.Formatter, b []byte) (_ []byte, err error) {
|
|
return q._appendTables(fmter, b, true)
|
|
}
|
|
|
|
func (q *baseQuery) _appendTables(
|
|
fmter chschema.Formatter, b []byte, withAlias bool,
|
|
) (_ []byte, err error) {
|
|
startLen := len(b)
|
|
|
|
if q.modelHasTableName() {
|
|
if !q.modelTableName.IsZero() {
|
|
b, err = q.modelTableName.AppendQuery(fmter, b)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
b = fmter.AppendQuery(b, string(q.table.CHName))
|
|
if withAlias && q.table.CHAlias != q.table.CHName {
|
|
b = append(b, " AS "...)
|
|
b = append(b, q.table.CHAlias...)
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, table := range q.tables {
|
|
if len(b) > startLen {
|
|
b = append(b, ", "...)
|
|
}
|
|
b, err = table.AppendQuery(fmter, b)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return b, nil
|
|
}
|
|
|
|
func (q *baseQuery) appendFirstTable(fmter chschema.Formatter, b []byte) ([]byte, error) {
|
|
return q._appendFirstTable(fmter, b, false)
|
|
}
|
|
|
|
func (q *baseQuery) appendFirstTableWithAlias(
|
|
fmter chschema.Formatter, b []byte,
|
|
) ([]byte, error) {
|
|
return q._appendFirstTable(fmter, b, true)
|
|
}
|
|
|
|
func (q *baseQuery) _appendFirstTable(
|
|
fmter chschema.Formatter, b []byte, withAlias bool,
|
|
) ([]byte, error) {
|
|
if !q.modelTableName.IsZero() {
|
|
return q.modelTableName.AppendQuery(fmter, b)
|
|
}
|
|
|
|
if q.table != nil {
|
|
b = fmter.AppendQuery(b, string(q.table.CHName))
|
|
if withAlias {
|
|
b = append(b, " AS "...)
|
|
b = append(b, q.table.CHAlias...)
|
|
}
|
|
return b, nil
|
|
}
|
|
|
|
if len(q.tables) > 0 {
|
|
return q.tables[0].AppendQuery(fmter, b)
|
|
}
|
|
|
|
return nil, errors.New("ch: query does not have a table")
|
|
}
|
|
|
|
func (q *baseQuery) hasMultiTables() bool {
|
|
if q.modelHasTableName() {
|
|
return len(q.tables) >= 1
|
|
}
|
|
return len(q.tables) >= 2
|
|
}
|
|
|
|
func (q *baseQuery) appendOtherTables(fmter chschema.Formatter, b []byte) (_ []byte, err error) {
|
|
tables := q.tables
|
|
if !q.modelHasTableName() {
|
|
tables = tables[1:]
|
|
}
|
|
for i, table := range tables {
|
|
if i > 0 {
|
|
b = append(b, ", "...)
|
|
}
|
|
b, err = table.AppendQuery(fmter, b)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return b, nil
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
func (q *baseQuery) addColumn(column chschema.QueryWithArgs) {
|
|
q.columns = append(q.columns, column)
|
|
}
|
|
|
|
func (q *baseQuery) excludeColumn(columns []string) {
|
|
if q.columns == nil {
|
|
for _, f := range q.table.Fields {
|
|
q.columns = append(q.columns, chschema.UnsafeIdent(f.CHName))
|
|
}
|
|
}
|
|
|
|
if len(columns) == 1 && columns[0] == "*" {
|
|
q.columns = make([]chschema.QueryWithArgs, 0)
|
|
return
|
|
}
|
|
|
|
for _, column := range columns {
|
|
if !q._excludeColumn(column) {
|
|
q.setErr(fmt.Errorf("ch: can't find column=%q", column))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *baseQuery) _excludeColumn(column string) bool {
|
|
for i, col := range q.columns {
|
|
if col.Args == nil && col.Query == column {
|
|
q.columns = append(q.columns[:i], q.columns[i+1:]...)
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (q *baseQuery) getFields() ([]*chschema.Field, error) {
|
|
if len(q.columns) == 0 {
|
|
if q.table == nil {
|
|
return nil, nil
|
|
}
|
|
return q.table.Fields, nil
|
|
}
|
|
|
|
fields := make([]*chschema.Field, 0, len(q.columns))
|
|
for _, col := range q.columns {
|
|
if col.Args != nil {
|
|
continue
|
|
}
|
|
|
|
field, err := q.table.Field(col.Query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fields = append(fields, field)
|
|
}
|
|
return fields, nil
|
|
}
|
|
|
|
func (q *baseQuery) appendSettings(fmter chschema.Formatter, b []byte) (_ []byte, err error) {
|
|
if len(q.settings) > 0 {
|
|
b = append(b, " SETTINGS "...)
|
|
for i, opt := range q.settings {
|
|
if i > 0 {
|
|
b = append(b, ", "...)
|
|
}
|
|
b, err = opt.AppendQuery(fmter, b)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
return b, nil
|
|
}
|
|
|
|
func (q *baseQuery) appendColumns(fmter chschema.Formatter, b []byte) (_ []byte, err error) {
|
|
switch {
|
|
case q.columns != nil:
|
|
for i, f := range q.columns {
|
|
if i > 0 {
|
|
b = append(b, ", "...)
|
|
}
|
|
b, err = f.AppendQuery(fmter, b)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
case q.table != nil:
|
|
b = appendTableColumns(b, q.table.CHAlias, q.table.Fields)
|
|
default:
|
|
b = append(b, '*')
|
|
}
|
|
return b, nil
|
|
}
|
|
|
|
func appendTableColumns(b []byte, table chschema.Safe, fields []*chschema.Field) []byte {
|
|
for i, f := range fields {
|
|
if i > 0 {
|
|
b = append(b, ", "...)
|
|
}
|
|
if len(table) > 0 {
|
|
b = append(b, table...)
|
|
b = append(b, '.')
|
|
}
|
|
b = append(b, f.Column...)
|
|
}
|
|
return b
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
type whereBaseQuery struct {
|
|
baseQuery
|
|
|
|
where []chschema.QueryWithSep
|
|
}
|
|
|
|
func (q *whereBaseQuery) addWhere(where chschema.QueryWithSep) {
|
|
q.where = append(q.where, where)
|
|
}
|
|
|
|
func (q *whereBaseQuery) addWhereGroup(sep string, where []chschema.QueryWithSep) {
|
|
if len(where) == 0 {
|
|
return
|
|
}
|
|
|
|
q.addWhere(chschema.SafeQueryWithSep("", nil, sep))
|
|
q.addWhere(chschema.SafeQueryWithSep("", nil, "("))
|
|
|
|
where[0].Sep = ""
|
|
q.where = append(q.where, where...)
|
|
|
|
q.addWhere(chschema.SafeQueryWithSep("", nil, ")"))
|
|
}
|
|
|
|
func (q *whereBaseQuery) mustAppendWhere(fmter chschema.Formatter, b []byte) ([]byte, error) {
|
|
if len(q.where) == 0 {
|
|
err := errors.New("ch: Update and Delete queries require at least one Where")
|
|
return nil, err
|
|
}
|
|
return q.appendWhere(fmter, b)
|
|
}
|
|
|
|
func (q *whereBaseQuery) appendWhere(fmter chschema.Formatter, b []byte) (_ []byte, err error) {
|
|
if len(q.where) == 0 {
|
|
return b, nil
|
|
}
|
|
|
|
b = append(b, " WHERE "...)
|
|
|
|
b, err = appendWhere(fmter, b, q.where)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return b, nil
|
|
}
|
|
|
|
func appendWhere(
|
|
fmter chschema.Formatter, b []byte, where []chschema.QueryWithSep,
|
|
) (_ []byte, err error) {
|
|
for i, where := range where {
|
|
if i > 0 || where.Sep == "(" {
|
|
b = append(b, where.Sep...)
|
|
}
|
|
|
|
if where.Query == "" {
|
|
continue
|
|
}
|
|
|
|
b = append(b, '(')
|
|
b, err = where.AppendQuery(fmter, b)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
b = append(b, ')')
|
|
}
|
|
return b, nil
|
|
}
|