From 988091e532f120f08738f28fb8c620e086d97c41 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sat, 2 Jul 2022 14:39:22 +0300 Subject: [PATCH] feat(chmigrate): add WithReplicated option --- ch/query_select.go | 2 +- ch/query_table_create.go | 4 +++ chmigrate/migration.go | 65 ++++++++++++++++++++-------------------- chmigrate/migrator.go | 19 ++++++++++++ 4 files changed, 57 insertions(+), 33 deletions(-) diff --git a/ch/query_select.go b/ch/query_select.go index d5e724f..5e314d1 100644 --- a/ch/query_select.go +++ b/ch/query_select.go @@ -52,7 +52,7 @@ func (q *SelectQuery) Err(err error) *SelectQuery { return q } -func (q *SelectQuery) Apply(fn func(*SelectQuery) *SelectQuery) *SelectQuery { +func (q *SelectQuery) WithQuery(fn func(*SelectQuery) *SelectQuery) *SelectQuery { return fn(q) } diff --git a/ch/query_table_create.go b/ch/query_table_create.go index e06a804..03afeda 100644 --- a/ch/query_table_create.go +++ b/ch/query_table_create.go @@ -33,6 +33,10 @@ func (q *CreateTableQuery) Model(model any) *CreateTableQuery { return q } +func (q *CreateTableQuery) WithQuery(fn func(*CreateTableQuery) *CreateTableQuery) *CreateTableQuery { + return fn(q) +} + // ------------------------------------------------------------------------------ func (q *CreateTableQuery) Table(tables ...string) *CreateTableQuery { diff --git a/chmigrate/migration.go b/chmigrate/migration.go index 533e48e..57d1540 100644 --- a/chmigrate/migration.go +++ b/chmigrate/migration.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "fmt" + "io" "io/fs" "sort" "strings" @@ -14,8 +15,6 @@ import ( ) type Migration struct { - ch.CHModel `ch:"engine:CollapsingMergeTree(sign)"` - Name string `ch:",pk"` Comment string `ch:"-"` GroupID int64 @@ -42,45 +41,47 @@ func NewSQLMigrationFunc(fsys fs.FS, name string) MigrationFunc { if err != nil { return err } + return Exec(ctx, db, f) + } +} - scanner := bufio.NewScanner(f) - var queries []string +func Exec(ctx context.Context, db *ch.DB, f io.Reader) error { + scanner := bufio.NewScanner(f) + var queries []string - var query []byte - for scanner.Scan() { - b := scanner.Bytes() + var query []byte + for scanner.Scan() { + b := scanner.Bytes() - const prefix = "--migration:" - if bytes.HasPrefix(b, []byte(prefix)) { - b = b[len(prefix):] - if bytes.Equal(b, []byte("split")) { - queries = append(queries, string(query)) - query = query[:0] - continue - } - return fmt.Errorf("ch: unknown directive: %q", b) + const prefix = "--migration:" + if bytes.HasPrefix(b, []byte(prefix)) { + b = b[len(prefix):] + if bytes.Equal(b, []byte("split")) { + queries = append(queries, string(query)) + query = query[:0] + continue } - - query = append(query, b...) - query = append(query, '\n') + return fmt.Errorf("ch: unknown directive: %q", b) } - if len(query) > 0 { - queries = append(queries, string(query)) - } - if err := scanner.Err(); err != nil { + query = append(query, b...) + query = append(query, '\n') + } + + if len(query) > 0 { + queries = append(queries, string(query)) + } + if err := scanner.Err(); err != nil { + return err + } + + for _, q := range queries { + if _, err := db.ExecContext(ctx, q); err != nil { return err } - - for _, q := range queries { - _, err = db.ExecContext(ctx, q) - if err != nil { - return err - } - } - - return nil } + + return nil } const goTemplate = `package %s diff --git a/chmigrate/migrator.go b/chmigrate/migrator.go index c45cb9f..0777721 100644 --- a/chmigrate/migrator.go +++ b/chmigrate/migrator.go @@ -27,6 +27,12 @@ func WithLocksTableName(table string) MigratorOption { } } +func WithReplicated(on bool) MigratorOption { + return func(m *Migrator) { + m.replicated = on + } +} + // WithMarkAppliedOnSuccess sets the migrator to only mark migrations as applied/unapplied // when their up/down is successful func WithMarkAppliedOnSuccess(enabled bool) MigratorOption { @@ -43,6 +49,7 @@ type Migrator struct { table string locksTable string + replicated bool markAppliedOnSuccess bool } @@ -95,6 +102,12 @@ func (m *Migrator) migrationsWithStatus(ctx context.Context) (MigrationSlice, in func (m *Migrator) Init(ctx context.Context) error { if _, err := m.db.NewCreateTable(). Model((*Migration)(nil)). + WithQuery(func(q *ch.CreateTableQuery) *ch.CreateTableQuery { + if m.replicated { + return q.Engine("ReplicatedCollapsingMergeTree(sign)") + } + return q.Engine("CollapsingMergeTree(sign)") + }). ModelTableExpr(m.table). IfNotExists(). Exec(ctx); err != nil { @@ -102,6 +115,12 @@ func (m *Migrator) Init(ctx context.Context) error { } if _, err := m.db.NewCreateTable(). Model((*migrationLock)(nil)). + WithQuery(func(q *ch.CreateTableQuery) *ch.CreateTableQuery { + if m.replicated { + return q.Engine("ReplicatedMergeTree") + } + return q.Engine("MergeTree") + }). ModelTableExpr(m.locksTable). IfNotExists(). Exec(ctx); err != nil {