1
0
mirror of https://github.com/axllent/mailpit.git synced 2025-01-24 03:47:38 +02:00
mailpit/storage/migrationTasks.go

201 lines
4.4 KiB
Go
Raw Normal View History

package storage
import (
"bytes"
"context"
"database/sql"
"strings"
"time"
"github.com/axllent/mailpit/config"
"github.com/axllent/mailpit/utils/logger"
"github.com/jhillyerd/enmime"
"github.com/leporo/sqlf"
"golang.org/x/text/language"
"golang.org/x/text/message"
)
func dataMigrations() {
updateOrderByCreatedTask()
assignMessageIDsTask()
}
// Update Created column using Created metadata datetime <= v1.6.5
// Migration task implemented 05/2023 - can be removed end 2023
func updateOrderByCreatedTask() {
q := sqlf.From("mailbox").
Select("ID").
Select(`json_extract(Metadata, '$.Created') as Created`).
Where("Created < ?", 1155000600)
toUpdate := make(map[string]int64)
p := message.NewPrinter(language.English)
if err := q.QueryAndClose(nil, db, func(row *sql.Rows) {
var id string
var ts sql.NullString
if err := row.Scan(&id, &ts); err != nil {
logger.Log().Error("[migration]", err)
return
}
if !ts.Valid {
logger.Log().Errorf("[migration] cannot get Created timestamp from %s", id)
return
}
t, _ := time.Parse(time.RFC3339Nano, ts.String)
toUpdate[id] = t.UnixMilli()
}); err != nil {
logger.Log().Error("[migration]", err)
return
}
total := len(toUpdate)
if total == 0 {
return
}
logger.Log().Infof("[migration] updating timestamp for %s messages", p.Sprintf("%d", len(toUpdate)))
// begin a transaction
ctx := context.Background()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
logger.Log().Error("[migration]", err)
return
}
// roll back if it fails
defer tx.Rollback()
var blockTime = time.Now()
count := 0
for id, ts := range toUpdate {
count++
_, err := tx.Exec(`UPDATE mailbox SET Created = ? WHERE ID = ?`, ts, id)
if err != nil {
logger.Log().Error("[migration]", err)
}
if count%1000 == 0 {
percent := (100 * count) / total
logger.Log().Infof("[migration] updated timestamp for 1,000 messages [%d%%] in %s", percent, time.Since(blockTime))
blockTime = time.Now()
}
}
logger.Log().Infof("[migration] commit %s changes", p.Sprintf("%d", count))
if err := tx.Commit(); err != nil {
logger.Log().Error("[migration]", err)
return
}
logger.Log().Infof("[migration] complete")
}
// Find any messages without a stored Message-ID and update it <= v1.6.5
// Migration task implemented 05/2023 - can be removed end 2023
func assignMessageIDsTask() {
if !config.IgnoreDuplicateIDs {
return
}
q := sqlf.From("mailbox").
Select("ID").
Where("MessageID = ''")
missingIDS := make(map[string]string)
if err := q.QueryAndClose(nil, db, func(row *sql.Rows) {
var id string
if err := row.Scan(&id); err != nil {
logger.Log().Error("[migration]", err)
return
}
missingIDS[id] = ""
}); err != nil {
logger.Log().Error("[migration]", err)
}
if len(missingIDS) == 0 {
return
}
var count int
var blockTime = time.Now()
p := message.NewPrinter(language.English)
total := len(missingIDS)
logger.Log().Infof("[migration] extracting Message-IDs for %s messages", p.Sprintf("%d", total))
for id := range missingIDS {
raw, err := GetMessageRaw(id)
if err != nil {
logger.Log().Error("[migration]", err)
continue
}
r := bytes.NewReader(raw)
env, err := enmime.ReadEnvelope(r)
if err != nil {
logger.Log().Error("[migration]", err)
continue
}
messageID := strings.Trim(env.GetHeader("Message-ID"), "<>")
missingIDS[id] = messageID
count++
if count%1000 == 0 {
percent := (100 * count) / total
logger.Log().Infof("[migration] extracted 1,000 Message-IDs [%d%%] in %s", percent, time.Since(blockTime))
blockTime = time.Now()
}
}
// begin a transaction
ctx := context.Background()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
logger.Log().Error("[migration]", err)
return
}
// roll back if it fails
defer tx.Rollback()
count = 0
for id, mid := range missingIDS {
_, err = tx.Exec(`UPDATE mailbox SET MessageID = ? WHERE ID = ?`, mid, id)
if err != nil {
logger.Log().Error("[migration]", err)
}
count++
if count%1000 == 0 {
percent := (100 * count) / total
logger.Log().Infof("[migration] stored 1,000 Message-IDs [%d%%] in %s", percent, time.Since(blockTime))
blockTime = time.Now()
}
}
logger.Log().Infof("[migration] commit %s changes", p.Sprintf("%d", count))
if err := tx.Commit(); err != nil {
logger.Log().Error("[migration]", err)
return
}
logger.Log().Infof("[migration] complete")
}