From f2100be620af466e00740ebfd0835347540d61ce Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 8 Mar 2023 13:29:55 +0200 Subject: [PATCH] feat: add on cluster option --- ch/query_table_create.go | 14 ++++++++++++++ ch/query_table_drop.go | 16 +++++++++++++++- chmigrate/migrator.go | 11 +++++++++++ 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/ch/query_table_create.go b/ch/query_table_create.go index 919fd30..4e2bcc0 100644 --- a/ch/query_table_create.go +++ b/ch/query_table_create.go @@ -12,6 +12,7 @@ type CreateTableQuery struct { baseQuery ifNotExists bool + onCluster chschema.QueryWithArgs engine chschema.QueryWithArgs ttl chschema.QueryWithArgs partition chschema.QueryWithArgs @@ -68,6 +69,11 @@ func (q *CreateTableQuery) IfNotExists() *CreateTableQuery { return q } +func (q *CreateTableQuery) OnCluster(query string, args ...any) *CreateTableQuery { + q.onCluster = chschema.SafeQuery(query, args) + return q +} + func (q *CreateTableQuery) Engine(query string, args ...any) *CreateTableQuery { q.engine = chschema.SafeQuery(query, args) return q @@ -119,6 +125,14 @@ func (q *CreateTableQuery) AppendQuery(fmter chschema.Formatter, b []byte) (_ [] return nil, err } + if !q.onCluster.IsEmpty() { + b = append(b, " ON CLUSTER "...) + b, err = q.onCluster.AppendQuery(fmter, b) + if err != nil { + return nil, err + } + } + b = append(b, " ("...) for i, field := range q.table.Fields { diff --git a/ch/query_table_drop.go b/ch/query_table_drop.go index 010238b..f9feca7 100644 --- a/ch/query_table_drop.go +++ b/ch/query_table_drop.go @@ -11,7 +11,8 @@ import ( type DropTableQuery struct { baseQuery - ifExists bool + ifExists bool + onCluster chschema.QueryWithArgs } var _ Query = (*DropTableQuery)(nil) @@ -56,6 +57,11 @@ func (q *DropTableQuery) IfExists() *DropTableQuery { return q } +func (q *DropTableQuery) OnCluster(query string, args ...any) *DropTableQuery { + q.onCluster = chschema.SafeQuery(query, args) + return q +} + //------------------------------------------------------------------------------ func (q *DropTableQuery) Operation() string { @@ -77,6 +83,14 @@ func (q *DropTableQuery) AppendQuery(fmter chschema.Formatter, b []byte) (_ []by return nil, err } + if !q.onCluster.IsEmpty() { + b = append(b, " ON CLUSTER "...) + b, err = q.onCluster.AppendQuery(fmter, b) + if err != nil { + return nil, err + } + } + return b, nil } diff --git a/chmigrate/migrator.go b/chmigrate/migrator.go index 5dcbeae..045317c 100644 --- a/chmigrate/migrator.go +++ b/chmigrate/migrator.go @@ -33,6 +33,12 @@ func WithReplicated(on bool) MigratorOption { } } +func WithOnCluster(cluster string) MigratorOption { + return func(m *Migrator) { + m.onCluster = cluster + } +} + // WithMarkAppliedOnSuccess sets the migrator to only mark migrations as applied/unapplied // when their up/down is successful func WithMarkAppliedOnSuccess(enabled bool) MigratorOption { @@ -50,6 +56,7 @@ type Migrator struct { table string locksTable string replicated bool + onCluster string markAppliedOnSuccess bool } @@ -109,6 +116,7 @@ func (m *Migrator) Init(ctx context.Context) error { return q.Engine("CollapsingMergeTree(sign)") }). ModelTableExpr(m.table). + OnCluster(m.onCluster). IfNotExists(). Exec(ctx); err != nil { return err @@ -122,6 +130,7 @@ func (m *Migrator) Init(ctx context.Context) error { return q.Engine("MergeTree") }). ModelTableExpr(m.locksTable). + OnCluster(m.onCluster). IfNotExists(). Exec(ctx); err != nil { return err @@ -133,6 +142,7 @@ func (m *Migrator) Reset(ctx context.Context) error { if _, err := m.db.NewDropTable(). Model((*Migration)(nil)). ModelTableExpr(m.table). + OnCluster(m.onCluster). IfExists(). Exec(ctx); err != nil { return err @@ -140,6 +150,7 @@ func (m *Migrator) Reset(ctx context.Context) error { if _, err := m.db.NewDropTable(). Model((*migrationLock)(nil)). ModelTableExpr(m.locksTable). + OnCluster(m.onCluster). IfExists(). Exec(ctx); err != nil { return err