From 6e9760d5d93bedb999c613b0cb1765e739a9bc63 Mon Sep 17 00:00:00 2001 From: Ralph Slooten Date: Fri, 28 Feb 2025 22:53:30 +1300 Subject: [PATCH] Feature: Add configuration to set message compression level in db (0-3) (#447 & #448) --- cmd/root.go | 5 +++ config/config.go | 8 +++++ internal/storage/database.go | 24 ++++++++++++-- internal/storage/messages.go | 49 ++++++++++++++++++----------- internal/storage/schemas/1.23.0.sql | 5 +++ 5 files changed, 70 insertions(+), 21 deletions(-) create mode 100644 internal/storage/schemas/1.23.0.sql diff --git a/cmd/root.go b/cmd/root.go index 29aa291..c020bd1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -83,6 +83,7 @@ func init() { initConfigFromEnv() rootCmd.Flags().StringVarP(&config.Database, "database", "d", config.Database, "Database to store persistent data") + rootCmd.Flags().IntVar(&config.Compression, "compression", config.Compression, "Compression level to store raw messages (0-3)") rootCmd.Flags().StringVar(&config.Label, "label", config.Label, "Optional label identify this Mailpit instance") rootCmd.Flags().StringVar(&config.TenantID, "tenant-id", config.TenantID, "Database tenant ID to isolate data") rootCmd.Flags().IntVarP(&config.MaxMessages, "max", "m", config.MaxMessages, "Max number of messages to store") @@ -181,6 +182,10 @@ func initConfigFromEnv() { config.Database = os.Getenv("MP_DATABASE") } + if len(os.Getenv("MP_COMPRESSION")) > 0 { + config.Compression, _ = strconv.Atoi(os.Getenv("MP_COMPRESSION")) + } + config.TenantID = os.Getenv("MP_TENANT_ID") config.Label = os.Getenv("MP_LABEL") diff --git a/config/config.go b/config/config.go index 1bcdcc3..6239408 100644 --- a/config/config.go +++ b/config/config.go @@ -28,6 +28,10 @@ var ( // Database for mail (optional) Database string + // Compression is the compression level used to store raw messages in the database: + // 0 = off, 1 = fastest (default), 2 = standard, 3 = best compression + Compression = 1 + // TenantID is an optional prefix to be applied to all database tables, // allowing multiple isolated instances of Mailpit to share a database. TenantID string @@ -250,6 +254,10 @@ func VerifyConfig() error { Database = filepath.Join(Database, "mailpit.db") } + if Compression < 0 || Compression > 3 { + return errors.New("[db] compression level must be between 0 and 3") + } + Label = tools.Normalize(Label) if err := parseMaxAge(); err != nil { diff --git a/internal/storage/database.go b/internal/storage/database.go index ff04be5..0f37775 100644 --- a/internal/storage/database.go +++ b/internal/storage/database.go @@ -32,7 +32,7 @@ var ( dbLastAction time.Time // zstd compression encoder & decoder - dbEncoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest)) + dbEncoder *zstd.Encoder dbDecoder, _ = zstd.NewReader(nil) temporaryFiles = []string{} @@ -40,11 +40,31 @@ var ( // InitDB will initialise the database func InitDB() error { + // dbEncoder var ( dsn string err error ) + if config.Compression > 0 { + var compression zstd.EncoderLevel + switch config.Compression { + case 1: + compression = zstd.SpeedFastest + case 2: + compression = zstd.SpeedDefault + case 3: + compression = zstd.SpeedBestCompression + } + dbEncoder, err = zstd.NewWriter(nil, zstd.WithEncoderLevel(compression)) + if err != nil { + return err + } + logger.Log().Debugf("[db] storing messages with compression: %s", compression.String()) + } else { + logger.Log().Debug("[db] storing messages with no compression") + } + p := config.Database if p == "" { @@ -101,7 +121,7 @@ func InitDB() error { if sqlDriver == "sqlite" { // SQLite performance tuning (https://phiresky.github.io/blog/2020/sqlite-performance-tuning/) - _, err = db.Exec("PRAGMA journal_mode = WAL; PRAGMA synchronous = normal;") + _, err = db.Exec("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;") if err != nil { return err } diff --git a/internal/storage/messages.go b/internal/storage/messages.go index 8eebe2e..757e0a1 100644 --- a/internal/storage/messages.go +++ b/internal/storage/messages.go @@ -102,17 +102,25 @@ func Store(body *[]byte) (string, error) { return "", err } - // insert compressed raw message - encoded := dbEncoder.EncodeAll(*body, make([]byte, 0, int(size))) + if config.Compression > 0 { + // insert compressed raw message + compressed := dbEncoder.EncodeAll(*body, make([]byte, 0, int(size))) - if sqlDriver == "rqlite" { - // rqlite does not support binary data in query, so we need to encode the compressed message into hexadecimal - // string and then generate the SQL query, which is more memory intensive especially with large messages - hexStr := hex.EncodeToString(encoded) - _, err = tx.Exec(fmt.Sprintf(`INSERT INTO %s (ID, Email) VALUES(?, x'%s')`, tenant("mailbox_data"), hexStr), id) // #nosec + if sqlDriver == "rqlite" { + // rqlite does not support binary data in query, so we need to encode the compressed message into hexadecimal + // string and then generate the SQL query, which is more memory intensive, especially with large messages + hexStr := hex.EncodeToString(compressed) + _, err = tx.Exec(fmt.Sprintf(`INSERT INTO %s (ID, Email, Compressed) VALUES(?, x'%s', 1)`, tenant("mailbox_data"), hexStr), id) // #nosec + } else { + _, err = tx.Exec(fmt.Sprintf(`INSERT INTO %s (ID, Email, Compressed) VALUES(?, ?, 1)`, tenant("mailbox_data")), id, compressed) // #nosec + } + + compressed = nil } else { - _, err = tx.Exec(fmt.Sprintf(`INSERT INTO %s (ID, Email) VALUES(?, ?)`, tenant("mailbox_data")), id, encoded) // #nosec + // insert uncompressed raw message + _, err = tx.Exec(fmt.Sprintf(`INSERT INTO %s (ID, Email, Compressed) VALUES(?, ?, 0)`, tenant("mailbox_data")), id, string(*body)) // #nosec } + if err != nil { return "", err } @@ -121,8 +129,6 @@ func Store(body *[]byte) (string, error) { return "", err } - encoded = nil - // extract tags using pre-set tag filters, empty slice if not set tags := findTagsInRawMessage(body) @@ -367,11 +373,12 @@ func GetMessage(id string) (*Message, error) { // GetMessageRaw returns an []byte of the full message func GetMessageRaw(id string) ([]byte, error) { - var i string - var msg string + var i, msg string + var compressed int q := sqlf.From(tenant("mailbox_data")). Select(`ID`).To(&i). Select(`Email`).To(&msg). + Select(`Compressed`).To(&compressed). Where(`ID = ?`, id) err := q.QueryRowAndClose(context.Background(), db) if err != nil { @@ -383,7 +390,7 @@ func GetMessageRaw(id string) ([]byte, error) { } var data []byte - if sqlDriver == "rqlite" { + if sqlDriver == "rqlite" && compressed == 1 { data, err = base64.StdEncoding.DecodeString(msg) if err != nil { return nil, fmt.Errorf("error decoding base64 message: %w", err) @@ -392,14 +399,18 @@ func GetMessageRaw(id string) ([]byte, error) { data = []byte(msg) } - raw, err := dbDecoder.DecodeAll(data, nil) - if err != nil { - return nil, fmt.Errorf("error decompressing message: %s", err.Error()) - } - dbLastAction = time.Now() - return raw, err + if compressed == 1 { + raw, err := dbDecoder.DecodeAll(data, nil) + if err != nil { + return nil, fmt.Errorf("error decompressing message: %s", err.Error()) + } + + return raw, err + } + + return data, nil } // GetAttachmentPart returns an *enmime.Part (attachment or inline) from a message diff --git a/internal/storage/schemas/1.23.0.sql b/internal/storage/schemas/1.23.0.sql new file mode 100644 index 0000000..319489c --- /dev/null +++ b/internal/storage/schemas/1.23.0.sql @@ -0,0 +1,5 @@ +-- CREATE Compressed COLUMN IN mailbox_data +ALTER TABLE {{ tenant "mailbox_data" }} ADD COLUMN Compressed INTEGER NOT NULL DEFAULT '0'; + +-- SET Compressed = 1 for all existing data +UPDATE {{ tenant "mailbox_data" }} SET Compressed = 1;