1
0
mirror of https://github.com/uptrace/go-clickhouse.git synced 2025-06-12 23:37:29 +02:00

feat(chmigrate): add WithReplicated option

This commit is contained in:
Vladimir Mihailenco 2022-07-02 14:39:22 +03:00
parent 87e8ceb2a0
commit 988091e532
4 changed files with 57 additions and 33 deletions

View File

@ -52,7 +52,7 @@ func (q *SelectQuery) Err(err error) *SelectQuery {
return q return q
} }
func (q *SelectQuery) Apply(fn func(*SelectQuery) *SelectQuery) *SelectQuery { func (q *SelectQuery) WithQuery(fn func(*SelectQuery) *SelectQuery) *SelectQuery {
return fn(q) return fn(q)
} }

View File

@ -33,6 +33,10 @@ func (q *CreateTableQuery) Model(model any) *CreateTableQuery {
return q return q
} }
func (q *CreateTableQuery) WithQuery(fn func(*CreateTableQuery) *CreateTableQuery) *CreateTableQuery {
return fn(q)
}
// ------------------------------------------------------------------------------ // ------------------------------------------------------------------------------
func (q *CreateTableQuery) Table(tables ...string) *CreateTableQuery { func (q *CreateTableQuery) Table(tables ...string) *CreateTableQuery {

View File

@ -5,6 +5,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io"
"io/fs" "io/fs"
"sort" "sort"
"strings" "strings"
@ -14,8 +15,6 @@ import (
) )
type Migration struct { type Migration struct {
ch.CHModel `ch:"engine:CollapsingMergeTree(sign)"`
Name string `ch:",pk"` Name string `ch:",pk"`
Comment string `ch:"-"` Comment string `ch:"-"`
GroupID int64 GroupID int64
@ -42,45 +41,47 @@ func NewSQLMigrationFunc(fsys fs.FS, name string) MigrationFunc {
if err != nil { if err != nil {
return err return err
} }
return Exec(ctx, db, f)
}
}
scanner := bufio.NewScanner(f) func Exec(ctx context.Context, db *ch.DB, f io.Reader) error {
var queries []string scanner := bufio.NewScanner(f)
var queries []string
var query []byte var query []byte
for scanner.Scan() { for scanner.Scan() {
b := scanner.Bytes() b := scanner.Bytes()
const prefix = "--migration:" const prefix = "--migration:"
if bytes.HasPrefix(b, []byte(prefix)) { if bytes.HasPrefix(b, []byte(prefix)) {
b = b[len(prefix):] b = b[len(prefix):]
if bytes.Equal(b, []byte("split")) { if bytes.Equal(b, []byte("split")) {
queries = append(queries, string(query)) queries = append(queries, string(query))
query = query[:0] query = query[:0]
continue continue
}
return fmt.Errorf("ch: unknown directive: %q", b)
} }
return fmt.Errorf("ch: unknown directive: %q", b)
query = append(query, b...)
query = append(query, '\n')
} }
if len(query) > 0 { query = append(query, b...)
queries = append(queries, string(query)) query = append(query, '\n')
} }
if err := scanner.Err(); err != nil {
if len(query) > 0 {
queries = append(queries, string(query))
}
if err := scanner.Err(); err != nil {
return err
}
for _, q := range queries {
if _, err := db.ExecContext(ctx, q); err != nil {
return err return err
} }
for _, q := range queries {
_, err = db.ExecContext(ctx, q)
if err != nil {
return err
}
}
return nil
} }
return nil
} }
const goTemplate = `package %s const goTemplate = `package %s

View File

@ -27,6 +27,12 @@ func WithLocksTableName(table string) MigratorOption {
} }
} }
func WithReplicated(on bool) MigratorOption {
return func(m *Migrator) {
m.replicated = on
}
}
// WithMarkAppliedOnSuccess sets the migrator to only mark migrations as applied/unapplied // WithMarkAppliedOnSuccess sets the migrator to only mark migrations as applied/unapplied
// when their up/down is successful // when their up/down is successful
func WithMarkAppliedOnSuccess(enabled bool) MigratorOption { func WithMarkAppliedOnSuccess(enabled bool) MigratorOption {
@ -43,6 +49,7 @@ type Migrator struct {
table string table string
locksTable string locksTable string
replicated bool
markAppliedOnSuccess bool markAppliedOnSuccess bool
} }
@ -95,6 +102,12 @@ func (m *Migrator) migrationsWithStatus(ctx context.Context) (MigrationSlice, in
func (m *Migrator) Init(ctx context.Context) error { func (m *Migrator) Init(ctx context.Context) error {
if _, err := m.db.NewCreateTable(). if _, err := m.db.NewCreateTable().
Model((*Migration)(nil)). Model((*Migration)(nil)).
WithQuery(func(q *ch.CreateTableQuery) *ch.CreateTableQuery {
if m.replicated {
return q.Engine("ReplicatedCollapsingMergeTree(sign)")
}
return q.Engine("CollapsingMergeTree(sign)")
}).
ModelTableExpr(m.table). ModelTableExpr(m.table).
IfNotExists(). IfNotExists().
Exec(ctx); err != nil { Exec(ctx); err != nil {
@ -102,6 +115,12 @@ func (m *Migrator) Init(ctx context.Context) error {
} }
if _, err := m.db.NewCreateTable(). if _, err := m.db.NewCreateTable().
Model((*migrationLock)(nil)). Model((*migrationLock)(nil)).
WithQuery(func(q *ch.CreateTableQuery) *ch.CreateTableQuery {
if m.replicated {
return q.Engine("ReplicatedMergeTree")
}
return q.Engine("MergeTree")
}).
ModelTableExpr(m.locksTable). ModelTableExpr(m.locksTable).
IfNotExists(). IfNotExists().
Exec(ctx); err != nil { Exec(ctx); err != nil {