1
0
mirror of https://github.com/uptrace/go-clickhouse.git synced 2025-06-14 23:44:59 +02:00

Merge pull request #63 from uptrace/feat/on-cluster-option

feat: add on cluster option
This commit is contained in:
Vladimir Mihailenco
2023-03-08 13:35:01 +02:00
committed by GitHub
3 changed files with 40 additions and 1 deletions

View File

@ -12,6 +12,7 @@ type CreateTableQuery struct {
baseQuery baseQuery
ifNotExists bool ifNotExists bool
onCluster chschema.QueryWithArgs
engine chschema.QueryWithArgs engine chschema.QueryWithArgs
ttl chschema.QueryWithArgs ttl chschema.QueryWithArgs
partition chschema.QueryWithArgs partition chschema.QueryWithArgs
@ -68,6 +69,11 @@ func (q *CreateTableQuery) IfNotExists() *CreateTableQuery {
return q return q
} }
func (q *CreateTableQuery) OnCluster(query string, args ...any) *CreateTableQuery {
q.onCluster = chschema.SafeQuery(query, args)
return q
}
func (q *CreateTableQuery) Engine(query string, args ...any) *CreateTableQuery { func (q *CreateTableQuery) Engine(query string, args ...any) *CreateTableQuery {
q.engine = chschema.SafeQuery(query, args) q.engine = chschema.SafeQuery(query, args)
return q return q
@ -119,6 +125,14 @@ func (q *CreateTableQuery) AppendQuery(fmter chschema.Formatter, b []byte) (_ []
return nil, err return nil, err
} }
if !q.onCluster.IsEmpty() {
b = append(b, " ON CLUSTER "...)
b, err = q.onCluster.AppendQuery(fmter, b)
if err != nil {
return nil, err
}
}
b = append(b, " ("...) b = append(b, " ("...)
for i, field := range q.table.Fields { for i, field := range q.table.Fields {

View File

@ -11,7 +11,8 @@ import (
type DropTableQuery struct { type DropTableQuery struct {
baseQuery baseQuery
ifExists bool ifExists bool
onCluster chschema.QueryWithArgs
} }
var _ Query = (*DropTableQuery)(nil) var _ Query = (*DropTableQuery)(nil)
@ -56,6 +57,11 @@ func (q *DropTableQuery) IfExists() *DropTableQuery {
return q return q
} }
func (q *DropTableQuery) OnCluster(query string, args ...any) *DropTableQuery {
q.onCluster = chschema.SafeQuery(query, args)
return q
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func (q *DropTableQuery) Operation() string { func (q *DropTableQuery) Operation() string {
@ -77,6 +83,14 @@ func (q *DropTableQuery) AppendQuery(fmter chschema.Formatter, b []byte) (_ []by
return nil, err return nil, err
} }
if !q.onCluster.IsEmpty() {
b = append(b, " ON CLUSTER "...)
b, err = q.onCluster.AppendQuery(fmter, b)
if err != nil {
return nil, err
}
}
return b, nil return b, nil
} }

View File

@ -33,6 +33,12 @@ func WithReplicated(on bool) MigratorOption {
} }
} }
func WithOnCluster(cluster string) MigratorOption {
return func(m *Migrator) {
m.onCluster = cluster
}
}
// WithMarkAppliedOnSuccess sets the migrator to only mark migrations as applied/unapplied // WithMarkAppliedOnSuccess sets the migrator to only mark migrations as applied/unapplied
// when their up/down is successful // when their up/down is successful
func WithMarkAppliedOnSuccess(enabled bool) MigratorOption { func WithMarkAppliedOnSuccess(enabled bool) MigratorOption {
@ -50,6 +56,7 @@ type Migrator struct {
table string table string
locksTable string locksTable string
replicated bool replicated bool
onCluster string
markAppliedOnSuccess bool markAppliedOnSuccess bool
} }
@ -109,6 +116,7 @@ func (m *Migrator) Init(ctx context.Context) error {
return q.Engine("CollapsingMergeTree(sign)") return q.Engine("CollapsingMergeTree(sign)")
}). }).
ModelTableExpr(m.table). ModelTableExpr(m.table).
OnCluster(m.onCluster).
IfNotExists(). IfNotExists().
Exec(ctx); err != nil { Exec(ctx); err != nil {
return err return err
@ -122,6 +130,7 @@ func (m *Migrator) Init(ctx context.Context) error {
return q.Engine("MergeTree") return q.Engine("MergeTree")
}). }).
ModelTableExpr(m.locksTable). ModelTableExpr(m.locksTable).
OnCluster(m.onCluster).
IfNotExists(). IfNotExists().
Exec(ctx); err != nil { Exec(ctx); err != nil {
return err return err
@ -133,6 +142,7 @@ func (m *Migrator) Reset(ctx context.Context) error {
if _, err := m.db.NewDropTable(). if _, err := m.db.NewDropTable().
Model((*Migration)(nil)). Model((*Migration)(nil)).
ModelTableExpr(m.table). ModelTableExpr(m.table).
OnCluster(m.onCluster).
IfExists(). IfExists().
Exec(ctx); err != nil { Exec(ctx); err != nil {
return err return err
@ -140,6 +150,7 @@ func (m *Migrator) Reset(ctx context.Context) error {
if _, err := m.db.NewDropTable(). if _, err := m.db.NewDropTable().
Model((*migrationLock)(nil)). Model((*migrationLock)(nil)).
ModelTableExpr(m.locksTable). ModelTableExpr(m.locksTable).
OnCluster(m.onCluster).
IfExists(). IfExists().
Exec(ctx); err != nil { Exec(ctx); err != nil {
return err return err