1
0
mirror of https://github.com/uptrace/go-clickhouse.git synced 2025-06-08 23:26:11 +02:00

309 lines
6.3 KiB
Go
Raw Normal View History

2022-01-23 09:36:24 +02:00
package chschema
import (
"fmt"
"reflect"
"github.com/codemodus/kace"
"github.com/jinzhu/inflection"
"github.com/uptrace/go-clickhouse/ch/chtype"
"github.com/uptrace/go-clickhouse/ch/internal"
"github.com/uptrace/go-clickhouse/ch/internal/tagparser"
)
const (
columnarFlag = internal.Flag(1) << iota
2022-01-23 09:36:24 +02:00
afterScanBlockHookFlag
)
var (
	// chModelType is the reflect.Type of CHModel; addFields compares the type
	// of embedded fields against it.
	chModelType = reflect.TypeOf(CHModel{})

	// tableNameInflector converts a model name into a table name. It defaults
	// to inflection.Plural and is replaced by SetTableNameInflector.
	tableNameInflector = inflection.Plural
)
// CHModel is an empty marker type. When it is embedded anonymously in a
// top-level model struct, its `ch` struct tag is read by processCHModelField
// to configure table-level options (name, alias, engine, ...).
type CHModel struct{}
// SetTableNameInflector replaces the function that derives a table name from
// a snake-cased model name. The default pluralizes the name, so my_article
// becomes my_articles.
func SetTableNameInflector(fn func(string) string) {
	tableNameInflector = fn
}
// Table holds the schema metadata that newTable derives from a Go struct type.
type Table struct {
	// Type is the struct type the table was built from.
	Type reflect.Type

	// ModelName is the snake-cased type name; an embedded struct tagged with
	// the "inherit" option overrides it.
	ModelName string
	// Name is the unquoted table name.
	Name string

	// CHName is the quoted table name.
	CHName Safe
	// CHInsertName is the quoted name set by setName or by the "insert"
	// option of the CHModel tag.
	CHInsertName Safe
	// CHAlias is the quoted alias; it defaults to the quoted model name.
	CHAlias Safe
	// CHEngine is the value of the "engine" option of the CHModel tag.
	CHEngine string
	// CHPartition is the value of the "partition" option of the CHModel tag.
	CHPartition string

	Fields     []*Field // PKs + DataFields
	PKs        []*Field // fields tagged with the "pk" option
	DataFields []*Field // all other fields
	// FieldMap indexes fields by column name. It also contains "alt" names
	// and "scanonly" fields, which are not part of Fields.
	FieldMap map[string]*Field

	// flags holds columnarFlag / afterScanBlockHookFlag bits.
	flags internal.Flag
}
// newTable builds the Table metadata for the struct type typ.
//
// The model name is the snake-cased type name. The default table name is the
// model name passed through tableNameInflector, and the default alias is the
// quoted model name. initFields then collects the fields and applies any
// options found on an embedded CHModel.
func newTable(typ reflect.Type) *Table {
	t := new(Table)
	t.Type = typ
	t.ModelName = kace.Snake(t.Type.Name())

	tableName := tableNameInflector(t.ModelName)
	t.setName(tableName)
	// setName falls back to the quoted table name for an empty alias; the
	// default alias is the model name instead.
	t.CHAlias = quoteColumnName(t.ModelName)

	t.initFields()

	// The hook is looked up on the pointer type.
	if reflect.PtrTo(t.Type).Implements(afterScanBlockHookType) {
		// OR the bit in directly, exactly as processCHModelField does for
		// columnarFlag. This does not depend on whether Flag.Set mutates its
		// receiver or merely returns the updated value (in which case a bare
		// t.flags.Set(...) call would silently drop the flag).
		t.flags |= afterScanBlockHookFlag
	}

	return t
}
// String implements fmt.Stringer and identifies the table by its model name.
func (t *Table) String() string {
	const prefix = "model="
	return prefix + t.ModelName
}
// IsColumnar reports whether columnarFlag is set, i.e. whether the CHModel
// tag carried the "columnar" option.
func (t *Table) IsColumnar() bool {
	columnar := t.flags.Has(columnarFlag)
	return columnar
}
// setName stores name as the table name and its quoted form as both the
// select and the insert name. The alias is only filled in when it is still
// empty, so an alias chosen earlier is preserved.
func (t *Table) setName(name string) {
	chName := quoteTableName(name)

	t.Name = name
	t.CHName, t.CHInsertName = chName, chName

	if t.CHAlias != "" {
		return
	}
	t.CHAlias = chName
}
// Field looks up a field by the name it is registered under in FieldMap.
// It returns an *UnknownColumnError when no such entry exists.
func (t *Table) Field(name string) (*Field, error) {
	if f, found := t.FieldMap[name]; found {
		return f, nil
	}
	return nil, &UnknownColumnError{Table: t, Column: name}
}
// initFields allocates Fields and FieldMap, sized by the number of struct
// fields of the model type, and populates them via addFields.
func (t *Table) initFields() {
	n := t.Type.NumField()

	t.Fields = make([]*Field, 0, n)
	t.FieldMap = make(map[string]*Field, n)

	t.addFields(t.Type, nil)
}
func (t *Table) addFields(typ reflect.Type, baseIndex []int) {
for i := 0; i < typ.NumField(); i++ {
f := typ.Field(i)
tag := tagparser.Parse(f.Tag.Get("ch"))
if tag.Name == "-" {
continue
}
// Make a copy so slice is not shared between fields.
index := make([]int, len(baseIndex))
copy(index, baseIndex)
if f.Anonymous {
if f.Name == "CHModel" && f.Type == chModelType {
if len(index) == 0 {
t.processCHModelField(f)
}
continue
}
fieldType := indirectType(f.Type)
if fieldType.Kind() != reflect.Struct {
continue
}
t.addFields(fieldType, append(index, f.Index...))
if _, ok := tag.Options["inherit"]; ok {
embeddedTable := globalTables.Get(fieldType)
t.ModelName = embeddedTable.ModelName
t.CHName = embeddedTable.CHName
t.CHAlias = embeddedTable.CHAlias
}
continue
}
if field := t.newField(f, index, tag); field != nil {
t.addField(field)
}
}
for _, f := range t.FieldMap {
if t.IsColumnar() {
f.Type = f.Type.Elem()
if !f.hasFlag(customTypeFlag) {
if s := chArrayElemType(f.CHType); s != "" {
f.CHType = s
}
}
}
if f.NewColumn == nil {
2023-01-21 12:14:00 +02:00
f.NewColumn = ColumnFactory(f.CHType, f.Type)
2022-01-23 09:36:24 +02:00
}
}
}
// processCHModelField applies the table-level options found in the `ch` tag
// of an embedded CHModel field.
//
// The tag name and the "table" option both rename the table via setName
// ("table" is applied last and therefore wins). "alias", "insert", "engine"
// and "partition" set the corresponding Table fields, and "columnar" sets
// columnarFlag.
func (t *Table) processCHModelField(f reflect.StructField) {
	tag := tagparser.Parse(f.Tag.Get("ch"))

	if name := tag.Name; name != "" {
		t.setName(name)
	}
	if table, found := tag.Option("table"); found {
		t.setName(table)
	}

	if alias, found := tag.Option("alias"); found {
		t.CHAlias = quoteColumnName(alias)
	}
	if insert, found := tag.Option("insert"); found {
		t.CHInsertName = quoteTableName(insert)
	}

	if engine, found := tag.Option("engine"); found {
		t.CHEngine = engine
	}
	if partition, found := tag.Option("partition"); found {
		t.CHPartition = partition
	}

	if tag.HasOption("columnar") {
		t.flags |= columnarFlag
	}
}
func (t *Table) newField(f reflect.StructField, index []int, tag tagparser.Tag) *Field {
if f.PkgPath != "" {
return nil
}
if tag.Name == "" {
tag.Name = kace.Snake(f.Name)
}
field := &Field{
Field: f,
Type: f.Type,
GoName: f.Name,
CHName: tag.Name,
Column: quoteColumnName(tag.Name),
Index: append(index, f.Index...),
}
field.NotNull = tag.HasOption("notnull")
field.IsPK = tag.HasOption("pk")
if s, ok := tag.Option("type"); ok {
field.CHType = s
field.setFlag(customTypeFlag)
} else {
2023-01-21 12:14:00 +02:00
field.CHType = chType(f.Type)
2022-01-23 09:36:24 +02:00
}
if tag.HasOption("lc") {
if s := chSubType(field.CHType, "Array("); s != "" && s == chtype.String {
field.CHType = "Array(LowCardinality(String))"
} else if field.CHType == chtype.String {
field.CHType = "LowCardinality(String)"
} else {
panic(fmt.Errorf("unsupported lc option on %s type", field.CHType))
}
}
if s, ok := tag.Option("default"); ok {
field.CHDefault = Safe(s)
}
field.appendValue = Appender(f.Type)
if s, ok := tag.Option("alt"); ok {
t.FieldMap[s] = field
}
if tag.HasOption("scanonly") {
t.FieldMap[field.CHName] = field
return nil
}
return field
}
// addField appends field to Fields and to either PKs or DataFields depending
// on IsPK, then registers it in FieldMap under its column name.
func (t *Table) addField(field *Field) {
	t.Fields = append(t.Fields, field)

	bucket := &t.DataFields
	if field.IsPK {
		bucket = &t.PKs
	}
	*bucket = append(*bucket, field)

	t.FieldMap[field.CHName] = field
}
2023-01-21 12:14:00 +02:00
func (t *Table) NewColumn(colName, colType string) *Column {
2022-01-23 09:36:24 +02:00
field, ok := t.FieldMap[colName]
if !ok {
internal.Logger.Printf("ch: %s has no column=%q", t, colName)
return nil
}
if colType != field.CHType {
return &Column{
Name: colName,
Type: colType,
2023-01-21 12:14:00 +02:00
Columnar: NewColumn(colType, field.Type),
2022-01-23 09:36:24 +02:00
}
}
2023-01-21 12:14:00 +02:00
col := field.NewColumn()
col.Init(field.CHType)
2022-01-23 09:36:24 +02:00
return &Column{
Name: colName,
Type: field.CHType,
2023-01-21 12:14:00 +02:00
Columnar: col,
2022-01-23 09:36:24 +02:00
}
}
// HasAfterScanRowHook reports whether afterScanBlockHookFlag is set on the
// table. Note that, despite the "Row" in its name, the flag it checks is the
// block hook flag.
func (t *Table) HasAfterScanRowHook() bool {
	return t.flags.Has(afterScanBlockHookFlag)
}
// AppendNamedArg appends the value of the field registered under name, read
// from strct, to b using fmter. The boolean result is false, and b is
// returned unchanged, when FieldMap has no entry for name.
func (t *Table) AppendNamedArg(
	fmter Formatter, b []byte, name string, strct reflect.Value,
) ([]byte, bool) {
	field, found := t.FieldMap[name]
	if !found {
		return b, false
	}
	return field.AppendValue(fmter, b, strct), true
}
// quoteTableName formats s with appendFQN and returns the result as Safe.
func quoteTableName(s string) Safe {
	quoted := appendFQN(nil, internal.Bytes(s))
	return Safe(quoted)
}
// quoteColumnName formats s with appendIdent and returns the result as Safe.
func quoteColumnName(s string) Safe {
	quoted := appendIdent(nil, internal.Bytes(s))
	return Safe(quoted)
}
//------------------------------------------------------------------------------
// UnknownColumnError is returned by Table.Field when the table has no field
// registered under the requested name.
type UnknownColumnError struct {
	// Table is the table that was searched.
	Table *Table
	// Column is the name that could not be found.
	Column string
}
// Error implements the error interface; the table is rendered through its
// String method.
func (err *UnknownColumnError) Error() string {
	const format = "ch: %s does not have column=%q"
	return fmt.Sprintf(format, err.Table, err.Column)
}