2020-10-16 22:27:16 +02:00
package sqlstore
import (
2021-04-17 09:06:57 +02:00
"bytes"
2022-03-22 15:24:34 +01:00
"context"
2021-05-24 19:06:11 +02:00
"database/sql"
2022-03-22 15:24:34 +01:00
"embed"
2021-04-17 09:06:57 +02:00
"fmt"
2022-03-22 15:24:34 +01:00
"path/filepath"
2021-04-17 09:06:57 +02:00
"text/template"
2020-10-22 13:34:42 +02:00
2022-03-22 15:24:34 +01:00
"github.com/mattermost/morph/models"
"github.com/mattermost/mattermost-server/v6/shared/mlog"
"github.com/mattermost/morph"
drivers "github.com/mattermost/morph/drivers"
mysql "github.com/mattermost/morph/drivers/mysql"
postgres "github.com/mattermost/morph/drivers/postgres"
sqlite "github.com/mattermost/morph/drivers/sqlite"
mbindata "github.com/mattermost/morph/sources/go_bindata"
2021-05-24 19:06:11 +02:00
mysqldriver "github.com/go-sql-driver/mysql"
2021-07-08 21:09:02 -04:00
_ "github.com/lib/pq" // postgres driver
2021-11-11 17:01:43 +01:00
2022-03-22 15:24:34 +01:00
sq "github.com/Masterminds/squirrel"
"github.com/mattermost/focalboard/server/model"
2021-11-11 17:01:43 +01:00
"github.com/mattermost/mattermost-plugin-api/cluster"
)
2022-03-22 15:24:34 +01:00
//go:embed migrations
var assets embed . FS
2021-11-11 17:01:43 +01:00
const (
uniqueIDsMigrationRequiredVersion = 14
2021-04-17 09:06:57 +02:00
2022-03-22 15:24:34 +01:00
tempSchemaMigrationTableName = "temp_schema_migration"
)
2021-04-17 09:06:57 +02:00
2021-06-11 11:18:11 +02:00
func appendMultipleStatementsFlag ( connectionString string ) ( string , error ) {
config , err := mysqldriver . ParseDSN ( connectionString )
if err != nil {
return "" , err
}
if config . Params == nil {
config . Params = map [ string ] string { }
}
config . Params [ "multiStatements" ] = "true"
return config . FormatDSN ( ) , nil
}
2021-05-24 19:06:11 +02:00
// migrations in MySQL need to run with the multiStatements flag
// enabled, so this method creates a new connection ensuring that it's
2021-06-21 05:21:42 -04:00
// enabled.
2021-08-02 16:48:15 +02:00
func ( s * SQLStore ) getMigrationConnection ( ) ( * sql . DB , error ) {
connectionString := s . connectionString
2022-03-22 15:24:34 +01:00
if s . dbType == model . MysqlDBType {
2021-08-02 16:48:15 +02:00
var err error
connectionString , err = appendMultipleStatementsFlag ( s . connectionString )
if err != nil {
return nil , err
}
2021-05-24 19:06:11 +02:00
}
db , err := sql . Open ( s . dbType , connectionString )
if err != nil {
return nil , err
}
if err = db . Ping ( ) ; err != nil {
return nil , err
}
return db , nil
}
2020-10-16 22:27:16 +02:00
func ( s * SQLStore ) Migrate ( ) error {
2022-03-22 15:24:34 +01:00
var driver drivers . Driver
2020-10-16 22:27:16 +02:00
var err error
2020-10-22 13:34:42 +02:00
2022-03-22 15:24:34 +01:00
migrationConfig := drivers . Config {
StatementTimeoutInSecs : 1000000 ,
MigrationsTable : fmt . Sprintf ( "%sschema_migrations" , s . tablePrefix ) ,
}
if s . dbType == model . SqliteDBType {
driver , err = sqlite . WithInstance ( s . db , & sqlite . Config { Config : migrationConfig } )
2020-10-18 02:07:35 +02:00
if err != nil {
return err
}
2020-10-16 22:27:16 +02:00
}
2020-10-22 13:34:42 +02:00
2022-03-22 15:24:34 +01:00
var db * sql . DB
if s . dbType != model . SqliteDBType {
db , err = s . getMigrationConnection ( )
if err != nil {
return err
}
defer db . Close ( )
2021-08-02 16:48:15 +02:00
}
2022-03-22 15:24:34 +01:00
if s . dbType == model . PostgresDBType {
driver , err = postgres . WithInstance ( db , & postgres . Config { Config : migrationConfig } )
2020-10-18 02:07:35 +02:00
if err != nil {
return err
}
2020-10-18 01:09:12 +02:00
}
2022-03-22 15:24:34 +01:00
if s . dbType == model . MysqlDBType {
driver , err = mysql . WithInstance ( db , & mysql . Config { Config : migrationConfig } )
2021-04-22 22:53:01 +02:00
if err != nil {
return err
}
}
2022-03-22 15:24:34 +01:00
assetsList , err := assets . ReadDir ( "migrations" )
if err != nil {
return err
}
assetNamesForDriver := make ( [ ] string , len ( assetsList ) )
for i , dirEntry := range assetsList {
assetNamesForDriver [ i ] = dirEntry . Name ( )
}
params := map [ string ] interface { } {
"prefix" : s . tablePrefix ,
"postgres" : s . dbType == model . PostgresDBType ,
"sqlite" : s . dbType == model . SqliteDBType ,
"mysql" : s . dbType == model . MysqlDBType ,
"plugin" : s . isPlugin ,
}
migrationAssets := & mbindata . AssetSource {
Names : assetNamesForDriver ,
AssetFunc : func ( name string ) ( [ ] byte , error ) {
asset , mErr := assets . ReadFile ( filepath . Join ( "migrations" , name ) )
if mErr != nil {
return nil , mErr
}
tmpl , pErr := template . New ( "sql" ) . Parse ( string ( asset ) )
if pErr != nil {
return nil , pErr
}
buffer := bytes . NewBufferString ( "" )
err = tmpl . Execute ( buffer , params )
if err != nil {
return nil , err
}
return buffer . Bytes ( ) , nil
} ,
}
2021-04-20 11:27:20 +02:00
2022-03-22 15:24:34 +01:00
src , err := mbindata . WithInstance ( migrationAssets )
2020-10-18 01:09:12 +02:00
if err != nil {
return err
2020-10-16 22:27:16 +02:00
}
2022-03-22 15:24:34 +01:00
defer src . Close ( )
2021-09-08 10:22:03 +05:30
2022-03-22 15:24:34 +01:00
opts := [ ] morph . EngineOption {
morph . WithLock ( "mm-lock-key" ) ,
2021-04-17 09:06:57 +02:00
}
2020-10-18 01:09:12 +02:00
2022-03-22 15:24:34 +01:00
if s . dbType == model . SqliteDBType {
opts = opts [ : 0 ] // sqlite driver does not support locking, it doesn't need to anyway.
}
engine , err := morph . New ( context . Background ( ) , driver , src , opts ... )
2020-10-16 22:27:16 +02:00
if err != nil {
return err
}
2022-03-22 15:24:34 +01:00
defer engine . Close ( )
2021-11-11 17:01:43 +01:00
var mutex * cluster . Mutex
if s . isPlugin {
var mutexErr error
mutex , mutexErr = s . NewMutexFn ( "Boards_dbMutex" )
if mutexErr != nil {
return fmt . Errorf ( "error creating database mutex: %w" , mutexErr )
}
}
if s . isPlugin {
s . logger . Debug ( "Acquiring cluster lock for Unique IDs migration" )
mutex . Lock ( )
}
2022-03-22 15:24:34 +01:00
if err := s . migrateSchemaVersionTable ( src . Migrations ( ) ) ; err != nil {
return err
}
if err := ensureMigrationsAppliedUpToVersion ( engine , driver , uniqueIDsMigrationRequiredVersion ) ; err != nil {
return err
}
2021-11-11 17:01:43 +01:00
if err := s . runUniqueIDsMigration ( ) ; err != nil {
if s . isPlugin {
s . logger . Debug ( "Releasing cluster lock for Unique IDs migration" )
mutex . Unlock ( )
}
return fmt . Errorf ( "error running unique IDs migration: %w" , err )
}
2022-03-22 15:24:34 +01:00
if err := ensureMigrationsAppliedUpToVersion ( engine , driver , categoriesUUIDIDMigrationRequiredVersion ) ; err != nil {
return err
}
if err := s . runCategoryUUIDIDMigration ( ) ; err != nil {
if s . isPlugin {
s . logger . Debug ( "Releasing cluster lock for Unique IDs migration" )
mutex . Unlock ( )
}
return fmt . Errorf ( "error running categoryID migration: %w" , err )
}
if err := s . deleteOldSchemaMigrationTable ( ) ; err != nil {
return err
}
2021-11-11 17:01:43 +01:00
if s . isPlugin {
s . logger . Debug ( "Releasing cluster lock for Unique IDs migration" )
mutex . Unlock ( )
}
2022-03-22 15:24:34 +01:00
return engine . ApplyAll ( )
}
// migrateSchemaVersionTable converts the schema version table from
// the old format used by go-migrate to the new format used by
// gomorph.
// When running the Focalboard with go-migrate's schema version table
// existing in the database, gomorph is unable to make sense of it as it's
// not in the format required by gomorph.
func ( s * SQLStore ) migrateSchemaVersionTable ( migrations [ ] * models . Migration ) error {
migrationNeeded , err := s . isSchemaMigrationNeeded ( )
if err != nil {
return err
}
if ! migrationNeeded {
return nil
}
s . logger . Info ( "Migrating schema migration to new format" )
legacySchemaVersion , err := s . getLegacySchemaVersion ( )
if err != nil {
return err
}
if err := s . createTempSchemaTable ( ) ; err != nil {
return err
}
if err := s . populateTempSchemaTable ( migrations , legacySchemaVersion ) ; err != nil {
return err
}
if err := s . useNewSchemaTable ( ) ; err != nil {
2021-11-11 17:01:43 +01:00
return err
}
return nil
}
2022-03-22 15:24:34 +01:00
func ( s * SQLStore ) isSchemaMigrationNeeded ( ) ( bool , error ) {
// Check if `dirty` column exists on schema version table.
// This column exists only for the old schema version table.
// SQLite needs a bit of a special handling
if s . dbType == model . SqliteDBType {
return s . isSchemaMigrationNeededSQLite ( )
}
query := s . getQueryBuilder ( s . db ) .
Select ( "count(*)" ) .
From ( "information_schema.COLUMNS" ) .
Where ( sq . Eq {
"TABLE_NAME" : s . tablePrefix + "schema_migrations" ,
"COLUMN_NAME" : "dirty" ,
} )
row := query . QueryRow ( )
var count int
if err := row . Scan ( & count ) ; err != nil {
s . logger . Error ( "failed to check for columns of schema_migrations table" , mlog . Err ( err ) )
return false , err
}
return count == 1 , nil
}
func ( s * SQLStore ) isSchemaMigrationNeededSQLite ( ) ( bool , error ) {
// the way to check presence of a column is different
// for SQLite. Hence, the separate function
query := fmt . Sprintf ( "PRAGMA table_info(\"%sschema_migrations\");" , s . tablePrefix )
rows , err := s . db . Query ( query )
if err != nil {
s . logger . Error ( "SQLite - failed to check for columns in schema_migrations table" , mlog . Err ( err ) )
return false , err
}
defer s . CloseRows ( rows )
data := [ ] [ ] * string { }
for rows . Next ( ) {
// PRAGMA returns 6 columns
row := make ( [ ] * string , 6 )
err := rows . Scan (
& row [ 0 ] ,
& row [ 1 ] ,
& row [ 2 ] ,
& row [ 3 ] ,
& row [ 4 ] ,
& row [ 5 ] ,
)
if err != nil {
s . logger . Error ( "error scanning rows from SQLite schema_migrations table definition" , mlog . Err ( err ) )
return false , err
}
data = append ( data , row )
}
nameColumnFound := false
for _ , row := range data {
if len ( row ) >= 2 && * row [ 1 ] == "dirty" {
nameColumnFound = true
break
}
}
return nameColumnFound , nil
}
func ( s * SQLStore ) getLegacySchemaVersion ( ) ( uint32 , error ) {
query := s . getQueryBuilder ( s . db ) .
Select ( "version" ) .
From ( s . tablePrefix + "schema_migrations" )
row := query . QueryRow ( )
var version uint32
if err := row . Scan ( & version ) ; err != nil {
s . logger . Error ( "error fetching legacy schema version" , mlog . Err ( err ) )
s . logger . Error ( "getLegacySchemaVersion err " + err . Error ( ) )
return version , err
}
return version , nil
}
func ( s * SQLStore ) createTempSchemaTable ( ) error {
// squirrel doesn't support DDL query in query builder
// so, we need to use a plain old string
query := fmt . Sprintf ( "CREATE TABLE IF NOT EXISTS %s (Version bigint NOT NULL, Name varchar(64) NOT NULL, PRIMARY KEY (Version))" , s . tablePrefix + tempSchemaMigrationTableName )
if _ , err := s . db . Exec ( query ) ; err != nil {
s . logger . Error ( "failed to create temporary schema migration table" , mlog . Err ( err ) )
s . logger . Error ( "createTempSchemaTable error " + err . Error ( ) )
return err
}
return nil
}
func ( s * SQLStore ) populateTempSchemaTable ( migrations [ ] * models . Migration , legacySchemaVersion uint32 ) error {
query := s . getQueryBuilder ( s . db ) .
Insert ( s . tablePrefix + tempSchemaMigrationTableName ) .
Columns ( "Version" , "Name" )
for _ , migration := range migrations {
// migrations param contains both up and down variant for
// each migration. Skipping for either one (down in this case)
// to process a migration only a single time.
if migration . Direction == models . Down {
continue
}
if migration . Version > legacySchemaVersion {
break
}
query = query . Values ( migration . Version , migration . Name )
}
if _ , err := query . Exec ( ) ; err != nil {
s . logger . Error ( "failed to insert migration records into temporary schema table" , mlog . Err ( err ) )
return err
}
return nil
}
func ( s * SQLStore ) useNewSchemaTable ( ) error {
// first delete the old table, then
// rename the new table to old table's name
// renaming old schema migration table. Will delete later once the migration is
// complete, just in case.
var query string
if s . dbType == model . MysqlDBType {
query = fmt . Sprintf ( "RENAME TABLE `%sschema_migrations` TO `%sschema_migrations_old_temp`" , s . tablePrefix , s . tablePrefix )
} else {
query = fmt . Sprintf ( "ALTER TABLE %sschema_migrations RENAME TO %sschema_migrations_old_temp" , s . tablePrefix , s . tablePrefix )
}
if _ , err := s . db . Exec ( query ) ; err != nil {
s . logger . Error ( "failed to rename old schema migration table" , mlog . Err ( err ) )
return err
}
// renaming new temp table to old table's name
if s . dbType == model . MysqlDBType {
query = fmt . Sprintf ( "RENAME TABLE `%s%s` TO `%sschema_migrations`" , s . tablePrefix , tempSchemaMigrationTableName , s . tablePrefix )
} else {
query = fmt . Sprintf ( "ALTER TABLE %s%s RENAME TO %sschema_migrations" , s . tablePrefix , tempSchemaMigrationTableName , s . tablePrefix )
}
if _ , err := s . db . Exec ( query ) ; err != nil {
s . logger . Error ( "failed to rename temp schema table" , mlog . Err ( err ) )
return err
}
return nil
}
func ( s * SQLStore ) deleteOldSchemaMigrationTable ( ) error {
query := "DROP TABLE IF EXISTS " + s . tablePrefix + "schema_migrations_old_temp"
if _ , err := s . db . Exec ( query ) ; err != nil {
s . logger . Error ( "failed to delete old temp schema migrations table" , mlog . Err ( err ) )
return err
}
return nil
}
func ensureMigrationsAppliedUpToVersion ( engine * morph . Morph , driver drivers . Driver , version int ) error {
applied , err := driver . AppliedMigrations ( )
if err != nil {
2021-11-11 17:01:43 +01:00
return err
}
2022-03-22 15:24:34 +01:00
currentVersion := len ( applied )
2021-11-11 17:01:43 +01:00
// if the target version is below or equal to the current one, do
// not migrate either because is not needed (both are equal) or
// because it would downgrade the database (is below)
if version <= currentVersion {
return nil
}
2022-03-22 15:24:34 +01:00
if _ , err = engine . Apply ( version - currentVersion ) ; err != nil {
2020-10-16 22:27:16 +02:00
return err
}
2020-10-22 13:34:42 +02:00
2020-10-16 22:27:16 +02:00
return nil
}