1
0
mirror of https://github.com/axllent/mailpit.git synced 2025-03-17 21:18:19 +02:00

Chore: Better handling of automatic database compression (vacuuming) after deleting messages

This commit is contained in:
Ralph Slooten 2024-02-17 11:12:37 +13:00
parent 9861bf96e1
commit 400d5a36c1
6 changed files with 256 additions and 137 deletions

160
internal/storage/cron.go Normal file
View File

@ -0,0 +1,160 @@
package storage
import (
"context"
"database/sql"
"strings"
"time"
"github.com/axllent/mailpit/config"
"github.com/axllent/mailpit/internal/logger"
"github.com/axllent/mailpit/server/websockets"
"github.com/leporo/sqlf"
)
// Database cron runs every minute
func dbCron() {
for {
time.Sleep(60 * time.Second)
currentTime := time.Now()
sinceLastDbAction := currentTime.Sub(dbLastAction)
// only run the database has been idle for 5 minutes
if sinceLastDbAction.Minutes() >= 5 {
deletedSize := getDeletedSize()
if deletedSize > 0 {
total := totalMessagesSize()
deletedPercent := deletedSize * 100 / total
// only vacuum the DB if at least 1% of mail storage size has been deleted
if deletedPercent >= 1 {
logger.Log().Debugf("[db] deleted messages is %d%% of total size, reclaim space", deletedPercent)
vacuumDb()
}
}
}
pruneMessages()
}
}
// PruneMessages will auto-delete the oldest messages if messages > config.MaxMessages.
// Set config.MaxMessages to 0 to disable.
func pruneMessages() {
if config.MaxMessages < 1 {
return
}
start := time.Now()
q := sqlf.Select("ID, Size").
From("mailbox").
OrderBy("Created DESC").
Limit(5000).
Offset(config.MaxMessages)
ids := []string{}
var prunedSize int64
var size int
if err := q.Query(nil, db, func(row *sql.Rows) {
var id string
if err := row.Scan(&id, &size); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return
}
ids = append(ids, id)
prunedSize = prunedSize + int64(size)
}); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return
}
if len(ids) == 0 {
return
}
tx, err := db.BeginTx(context.Background(), nil)
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return
}
args := make([]interface{}, len(ids))
for i, id := range ids {
args[i] = id
}
_, err = tx.Query(`DELETE FROM mailbox WHERE ID IN (?`+strings.Repeat(",?", len(ids)-1)+`)`, args...) // #nosec
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return
}
_, err = tx.Query(`DELETE FROM mailbox_data WHERE ID IN (?`+strings.Repeat(",?", len(ids)-1)+`)`, args...) // #nosec
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return
}
_, err = tx.Query(`DELETE FROM message_tags WHERE ID IN (?`+strings.Repeat(",?", len(ids)-1)+`)`, args...) // #nosec
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return
}
err = tx.Commit()
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
if err := tx.Rollback(); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
}
}
if err := pruneUnusedTags(); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
}
addDeletedSize(prunedSize)
dbLastAction = time.Now()
elapsed := time.Since(start)
logger.Log().Debugf("[db] auto-pruned %d messages in %s", len(ids), elapsed)
logMessagesDeleted(len(ids))
websockets.Broadcast("prune", nil)
}
// Vacuum the database to reclaim space from deleted messages
func vacuumDb() {
start := time.Now()
// set WAL file checkpoint
if _, err := db.Exec("PRAGMA wal_checkpoint"); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return
}
// vacuum database
if _, err := db.Exec("VACUUM"); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return
}
// truncate WAL file
if _, err := db.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return
}
if err := SettingPut("DeletedSize", "0"); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
}
elapsed := time.Since(start)
logger.Log().Debugf("[db] vacuumed database in %s", elapsed)
}

View File

@ -37,8 +37,6 @@ var (
dbFile string
dbIsTemp bool
dbLastAction time.Time
dbIsIdle bool
deletedSize int64
// zstd compression encoder & decoder
dbEncoder, _ = zstd.NewWriter(nil)
@ -654,7 +652,7 @@ func DeleteOneMessage(id string) error {
}
dbLastAction = time.Now()
deletedSize = deletedSize + int64(m.Size)
addDeletedSize(int64(m.Size))
logMessagesDeleted(1)
@ -708,14 +706,13 @@ func DeleteAllMessages() error {
return err
}
_, err = db.Exec("VACUUM")
if err == nil {
elapsed := time.Since(start)
logger.Log().Debugf("[db] deleted %d messages in %s", total, elapsed)
}
elapsed := time.Since(start)
logger.Log().Debugf("[db] deleted %d messages in %s", total, elapsed)
vacuumDb()
dbLastAction = time.Now()
deletedSize = 0
SettingPut("DeletedSize", "0")
logMessagesDeleted(total)

View File

@ -88,6 +88,18 @@ var (
CREATE INDEX IF NOT EXISTS idx_message_tag_id ON message_tags (ID);
CREATE INDEX IF NOT EXISTS idx_message_tag_tagid ON message_tags (TagID);`,
},
{
// assume deleted messages account for 50% of storage
// to handle previously-deleted messages
Version: 1.5,
Description: "Create settings table",
Script: `CREATE TABLE IF NOT EXISTS settings (
Key TEXT,
Value TEXT
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_settings_key ON settings (Key);
INSERT INTO settings (Key, Value) VALUES("DeletedSize", (SELECT SUM(Size)/2 FROM mailbox));`,
},
}
)

View File

@ -193,7 +193,7 @@ func DeleteSearch(search string) error {
}
dbLastAction = time.Now()
deletedSize = deletedSize + int64(deleteSize)
addDeletedSize(int64(deleteSize))
logMessagesDeleted(total)

View File

@ -0,0 +1,75 @@
package storage
import (
"database/sql"
"github.com/axllent/mailpit/internal/logger"
"github.com/leporo/sqlf"
)
// SettingGet returns a setting string value, blank is it does not exist
func SettingGet(k string) string {
var result string
err := sqlf.From("settings").
Select("Value").To(&result).
Where("Key = ?", k).
Limit(1).
QueryAndClose(nil, db, func(row *sql.Rows) {})
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return result
}
return result
}
// SettingPut sets a setting string value, inserting if new
func SettingPut(k, v string) error {
_, err := db.Exec("INSERT INTO settings (Key, Value) VALUES(?, ?) ON CONFLICT(Key) DO UPDATE SET Value = ?", k, v, v)
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
}
return err
}
// The total deleted message size as an int64 value
func getDeletedSize() int64 {
var result int64
err := sqlf.From("settings").
Select("Value").To(&result).
Where("Key = ?", "DeletedSize").
Limit(1).
QueryAndClose(nil, db, func(row *sql.Rows) {})
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return result
}
return result
}
// The total raw non-compressed messages size in bytes of all messages in the database
func totalMessagesSize() int64 {
var result int64
err := sqlf.From("mailbox").
Select("SUM(Size)").To(&result).
QueryAndClose(nil, db, func(row *sql.Rows) {})
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return result
}
return result
}
// AddDeletedSize will add the value to the DeletedSize setting
func addDeletedSize(v int64) {
if _, err := db.Exec("INSERT OR IGNORE INTO settings (Key, Value) VALUES(?, ?)", "DeletedSize", 0); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
}
if _, err := db.Exec("UPDATE settings SET Value = Value + ? WHERE Key = ?", v, "DeletedSize"); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
}
}

View File

@ -1,21 +1,14 @@
package storage
import (
"context"
"database/sql"
"net/mail"
"os"
"regexp"
"strings"
"sync"
"time"
"github.com/axllent/mailpit/config"
"github.com/axllent/mailpit/internal/html2text"
"github.com/axllent/mailpit/internal/logger"
"github.com/axllent/mailpit/server/websockets"
"github.com/jhillyerd/enmime"
"github.com/leporo/sqlf"
)
var (
@ -77,124 +70,6 @@ func cleanString(str string) string {
return strings.ToLower(strings.Join(strings.Fields(strings.TrimSpace(str)), " "))
}
// Auto-prune runs every minute to automatically delete oldest messages
// if total is greater than the threshold
func dbCron() {
for {
time.Sleep(60 * time.Second)
start := time.Now()
// check if database contains deleted data and has not been in use
// for 5 minutes, if so VACUUM
currentTime := time.Now()
diff := currentTime.Sub(dbLastAction)
// get DB file size
fileInfo, err := os.Stat(config.DataFile)
if err != nil {
logger.Log().Errorf("[db] unable to stat database %s: %s", config.DataFile, err.Error())
continue
}
deletedPercent := deletedSize * 100 / fileInfo.Size()
// only vacuum DB when at least 2% of mail storage size has been deleted
// as this saves a lot of CPU on large databases
if deletedPercent >= 1 && diff.Minutes() > 5 {
logger.Log().Debugf("[db] compressing database as %d%% has been deleted", deletedPercent)
deletedSize = 0
_, err := db.Exec("VACUUM")
if err == nil {
elapsed := time.Since(start)
logger.Log().Debugf("[db] compressed idle database in %s", elapsed)
}
continue
}
if config.MaxMessages > 0 {
q := sqlf.Select("ID, Size").
From("mailbox").
OrderBy("Created DESC").
Limit(5000).
Offset(config.MaxMessages)
ids := []string{}
var prunedSize int64
var size int
if err := q.Query(nil, db, func(row *sql.Rows) {
var id string
if err := row.Scan(&id, &size); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
return
}
ids = append(ids, id)
prunedSize = prunedSize + int64(size)
}); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
continue
}
if len(ids) == 0 {
continue
}
tx, err := db.BeginTx(context.Background(), nil)
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
continue
}
args := make([]interface{}, len(ids))
for i, id := range ids {
args[i] = id
}
_, err = tx.Query(`DELETE FROM mailbox WHERE ID IN (?`+strings.Repeat(",?", len(ids)-1)+`)`, args...) // #nosec
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
continue
}
_, err = tx.Query(`DELETE FROM mailbox_data WHERE ID IN (?`+strings.Repeat(",?", len(ids)-1)+`)`, args...) // #nosec
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
continue
}
_, err = tx.Query(`DELETE FROM message_tags WHERE ID IN (?`+strings.Repeat(",?", len(ids)-1)+`)`, args...) // #nosec
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
continue
}
err = tx.Commit()
if err != nil {
logger.Log().Errorf("[db] %s", err.Error())
if err := tx.Rollback(); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
}
}
if err := pruneUnusedTags(); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
}
deletedSize = deletedSize + prunedSize
elapsed := time.Since(start)
logger.Log().Debugf("[db] auto-pruned %d messages in %s", len(ids), elapsed)
logMessagesDeleted(len(ids))
websockets.Broadcast("prune", nil)
}
}
}
// LogMessagesDeleted logs the number of messages deleted
func logMessagesDeleted(n int) {
mu.Lock()
@ -212,7 +87,7 @@ func isFile(path string) bool {
return true
}
// InArray tests if a string in within an array. It is not case sensitive.
// Tests if a string is within an array. It is not case sensitive.
func inArray(k string, arr []string) bool {
k = strings.ToLower(k)
for _, v := range arr {
@ -224,7 +99,7 @@ func inArray(k string, arr []string) bool {
return false
}
// escPercentChar replaces `%` with `%%` for SQL searches
// Convert `%` to `%%` for SQL searches
func escPercentChar(s string) string {
return strings.ReplaceAll(s, "%", "%%")
}