1
0
mirror of https://github.com/axllent/mailpit.git synced 2025-03-11 14:59:57 +02:00

Feature: Option to ignore duplicate Message-IDs

This option (default off) silently ignores any new messages with duplicate Message-IDs. This update includes a new database structure and automatic rebuild of existing data.
This commit is contained in:
Ralph Slooten 2023-05-04 21:48:09 +12:00
parent 225a1e2e2a
commit 5f919cc9dd
8 changed files with 363 additions and 66 deletions

View File

@ -87,6 +87,7 @@ func init() {
rootCmd.Flags().StringVar(&config.Webroot, "webroot", config.Webroot, "Set the webroot for web UI & API")
rootCmd.Flags().StringVar(&server.AccessControlAllowOrigin, "api-cors", server.AccessControlAllowOrigin, "Set API CORS Access-Control-Allow-Origin header")
rootCmd.Flags().BoolVar(&config.UseMessageDates, "use-message-dates", config.UseMessageDates, "Use message dates as the received dates")
rootCmd.Flags().BoolVar(&config.IgnoreDuplicateIDs, "ignore-duplicate-ids", config.IgnoreDuplicateIDs, "Ignore duplicate messages (by Message-Id)")
rootCmd.Flags().StringVar(&config.UIAuthFile, "ui-auth-file", config.UIAuthFile, "A password file for web UI authentication")
rootCmd.Flags().StringVar(&config.UITLSCert, "ui-tls-cert", config.UITLSCert, "TLS certificate for web UI (HTTPS) - requires ui-tls-key")
@ -97,11 +98,11 @@ func init() {
rootCmd.Flags().StringVar(&config.SMTPTLSCert, "smtp-tls-cert", config.SMTPTLSCert, "TLS certificate for SMTP (STARTTLS) - requires smtp-tls-key")
rootCmd.Flags().StringVar(&config.SMTPTLSKey, "smtp-tls-key", config.SMTPTLSKey, "TLS key for SMTP (STARTTLS) - requires smtp-tls-cert")
rootCmd.Flags().BoolVar(&config.SMTPAuthAllowInsecure, "smtp-auth-allow-insecure", config.SMTPAuthAllowInsecure, "Enable insecure PLAIN & LOGIN authentication")
rootCmd.Flags().StringVarP(&config.SMTPCLITags, "tag", "t", config.SMTPCLITags, "Tag new messages matching filters")
rootCmd.Flags().StringVar(&config.SMTPRelayConfigFile, "smtp-relay-config", config.SMTPRelayConfigFile, "SMTP configuration file to allow releasing messages")
rootCmd.Flags().BoolVar(&config.SMTPRelayAllIncoming, "smtp-relay-all", config.SMTPRelayAllIncoming, "Relay all incoming messages via external SMTP server (caution!)")
rootCmd.Flags().StringVarP(&config.SMTPCLITags, "tag", "t", config.SMTPCLITags, "Tag new messages matching filters")
rootCmd.Flags().BoolVarP(&logger.QuietLogging, "quiet", "q", logger.QuietLogging, "Quiet logging (errors only)")
rootCmd.Flags().BoolVarP(&logger.VerboseLogging, "verbose", "v", logger.VerboseLogging, "Verbose logging")
@ -201,8 +202,8 @@ func initConfigFromEnv() {
if getEnabledFromEnv("MP_USE_MESSAGE_DATES") {
config.UseMessageDates = true
}
if getEnabledFromEnv("MP_USE_MESSAGE_DATES") {
config.UseMessageDates = true
if getEnabledFromEnv("MP_IGNORE_DUPLICATE_IDS") {
config.IgnoreDuplicateIDs = true
}
if getEnabledFromEnv("MP_QUIET") {
logger.QuietLogging = true

View File

@ -65,6 +65,9 @@ var (
// SMTPAuthAcceptAny accepts any username/password including none
SMTPAuthAcceptAny bool
// IgnoreDuplicateIDs will skip messages with the same ID
IgnoreDuplicateIDs bool
// SMTPCLITags is used to map the CLI args
SMTPCLITags string

View File

@ -24,12 +24,19 @@ func mailHandler(origin net.Addr, from string, to []string, data []byte) error {
return err
}
messageID := strings.Trim(msg.Header.Get("Message-Id"), "<>")
// add a message ID if not set
if msg.Header.Get("Message-Id") == "" {
if messageID == "" {
// generate unique ID
uid := uuid.NewV4().String() + "@mailpit"
messageID = uuid.NewV4().String() + "@mailpit"
// add unique ID
data = append([]byte("Message-Id: <"+uid+">\r\n"), data...)
data = append([]byte("Message-Id: <"+messageID+">\r\n"), data...)
} else if config.IgnoreDuplicateIDs {
if storage.MessageIDExists(messageID) {
logger.Log().Debugf("[smtpd] duplicate message found, ignoring %s", messageID)
return nil
}
}
// if enabled, this will route the email 1:1 through to the preconfigured smtp server
@ -81,7 +88,8 @@ func mailHandler(origin net.Addr, from string, to []string, data []byte) error {
logger.Log().Debugf("[smtpd] added missing addresses to Bcc header: %s", strings.Join(missingAddresses, ", "))
}
if _, err := storage.Store(data); err != nil {
_, err = storage.Store(data)
if err != nil {
logger.Log().Errorf("[db] error storing message: %d", err.Error())
return err

View File

@ -655,6 +655,10 @@
"$ref": "#/definitions/Attachment"
}
},
"MessageID": {
"description": "Message ID",
"type": "string"
},
"Read": {
"description": "Read status",
"type": "boolean"

View File

@ -72,20 +72,55 @@ var (
Script: `ALTER TABLE mailbox ADD COLUMN Tags Text NOT NULL DEFAULT '[]';
CREATE INDEX IF NOT EXISTS idx_tags ON mailbox (Tags);`,
},
{
Version: 1.2,
Description: "Creating new mailbox format",
Script: `CREATE TABLE IF NOT EXISTS mailboxtmp (
Created INTEGER NOT NULL,
ID TEXT NOT NULL,
MessageID TEXT NOT NULL,
Subject TEXT NOT NULL,
Metadata TEXT,
Size INTEGER NOT NULL,
Inline INTEGER NOT NULL,
Attachments INTEGER NOT NULL,
Read INTEGER,
Tags TEXT,
SearchText TEXT
);
INSERT INTO mailboxtmp
(Created, ID, MessageID, Subject, Metadata, Size, Inline, Attachments, SearchText, Read, Tags)
SELECT
Sort, ID, '', json_extract(Data, '$.Subject'),Data,
json_extract(Data, '$.Size'), json_extract(Data, '$.Inline'), json_extract(Data, '$.Attachments'),
Search, Read, Tags
FROM mailbox;
DROP TABLE IF EXISTS mailbox;
ALTER TABLE mailboxtmp RENAME TO mailbox;
CREATE INDEX IF NOT EXISTS idx_created ON mailbox (Created);
CREATE UNIQUE INDEX IF NOT EXISTS idx_id ON mailbox (ID);
CREATE INDEX IF NOT EXISTS idx_message_id ON mailbox (MessageID);
CREATE INDEX IF NOT EXISTS idx_subject ON mailbox (Subject);
CREATE INDEX IF NOT EXISTS idx_size ON mailbox (Size);
CREATE INDEX IF NOT EXISTS idx_inline ON mailbox (Inline);
CREATE INDEX IF NOT EXISTS idx_attachments ON mailbox (Attachments);
CREATE INDEX IF NOT EXISTS idx_read ON mailbox (Read);
CREATE INDEX IF NOT EXISTS idx_tags ON mailbox (Tags);`,
},
}
)
// DBMailSummary struct for storing mail summary
type DBMailSummary struct {
Created time.Time
From *mail.Address
To []*mail.Address
Cc []*mail.Address
Bcc []*mail.Address
Subject string
Size int
Inline int
Attachments int
From *mail.Address
To []*mail.Address
Cc []*mail.Address
Bcc []*mail.Address
// Subject string
// Size int
// Inline int
// Attachments int
}
// InitDB will initialise the database
@ -144,6 +179,8 @@ func InitDB() error {
// auto-prune & delete
go dbCron()
go dataMigrations()
return nil
}
@ -189,22 +226,21 @@ func Store(body []byte) (string, error) {
from = &mail.Address{Name: env.GetHeader("From")}
}
messageID := strings.Trim(env.Root.Header.Get("Message-ID"), "<>")
obj := DBMailSummary{
Created: time.Now(),
From: from,
To: addressToSlice(env, "To"),
Cc: addressToSlice(env, "Cc"),
Bcc: addressToSlice(env, "Bcc"),
Subject: env.GetHeader("Subject"),
Size: len(body),
Inline: len(env.Inlines),
Attachments: len(env.Attachments),
From: from,
To: addressToSlice(env, "To"),
Cc: addressToSlice(env, "Cc"),
Bcc: addressToSlice(env, "Bcc"),
}
created := time.Now()
// use message date instead of created date
if config.UseMessageDates {
if mDate, err := env.Date(); err == nil {
obj.Created = mDate
created = mDate
}
}
@ -237,8 +273,14 @@ func Store(body []byte) (string, error) {
// roll back if it fails
defer tx.Rollback()
subject := env.GetHeader("Subject")
size := len(body)
inline := len(env.Inlines)
attachments := len(env.Attachments)
// insert mail summary data
_, err = tx.Exec("INSERT INTO mailbox(ID, Data, Search, Tags, Read) values(?,?,?,?,0)", id, string(summaryJSON), searchText, string(tagJSON))
_, err = tx.Exec("INSERT INTO mailbox(Created, ID, MessageID, Subject, Metadata, Size, Inline, Attachments, SearchText, Tags, Read) values(?,?,?,?,?,?,?,?,?,?,0)",
created.UnixMilli(), id, messageID, subject, string(summaryJSON), size, inline, attachments, searchText, string(tagJSON))
if err != nil {
return "", err
}
@ -259,9 +301,12 @@ func Store(body []byte) (string, error) {
return "", err
}
c.Tags = tagData
c.Created = created
c.ID = id
c.Attachments = attachments
c.Subject = subject
c.Size = size
c.Tags = tagData
websockets.Broadcast("new", c)
@ -276,24 +321,28 @@ func List(start, limit int) ([]MessageSummary, error) {
results := []MessageSummary{}
q := sqlf.From("mailbox").
Select(`ID, Data, Tags, Read`).
OrderBy("Sort DESC").
Select(`Created, ID, Subject, Metadata, Size, Attachments, Read, Tags`).
OrderBy("Created DESC").
Limit(limit).
Offset(start)
if err := q.QueryAndClose(nil, db, func(row *sql.Rows) {
var created int64
var id string
var summary string
var subject string
var metadata string
var size int
var attachments int
var tags string
var read int
em := MessageSummary{}
if err := row.Scan(&id, &summary, &tags, &read); err != nil {
if err := row.Scan(&created, &id, &subject, &metadata, &size, &attachments, &read, &tags); err != nil {
logger.Log().Error(err)
return
}
if err := json.Unmarshal([]byte(summary), &em); err != nil {
if err := json.Unmarshal([]byte(metadata), &em); err != nil {
logger.Log().Error(err)
return
}
@ -303,11 +352,17 @@ func List(start, limit int) ([]MessageSummary, error) {
return
}
em.Created = time.UnixMilli(created)
em.ID = id
em.Subject = subject
em.Size = size
em.Attachments = attachments
em.Read = read == 1
results = append(results, em)
// logger.PrettyPrint(em)
}); err != nil {
return results, err
}
@ -342,19 +397,23 @@ func Search(search string, start, limit int) ([]MessageSummary, error) {
q := searchParser(args, start, limit)
if err := q.QueryAndClose(nil, db, func(row *sql.Rows) {
var created int64
var id string
var summary string
var subject string
var metadata string
var size int
var attachments int
var tags string
var read int
var ignore string
em := MessageSummary{}
if err := row.Scan(&id, &summary, &tags, &read, &ignore, &ignore, &ignore, &ignore, &ignore, &ignore); err != nil {
if err := row.Scan(&created, &id, &subject, &metadata, &size, &attachments, &read, &tags, &ignore, &ignore, &ignore, &ignore); err != nil {
logger.Log().Error(err)
return
}
if err := json.Unmarshal([]byte(summary), &em); err != nil {
if err := json.Unmarshal([]byte(metadata), &em); err != nil {
logger.Log().Error(err)
return
}
@ -364,7 +423,11 @@ func Search(search string, start, limit int) ([]MessageSummary, error) {
return
}
em.Created = time.UnixMilli(created)
em.ID = id
em.Subject = subject
em.Size = size
em.Attachments = attachments
em.Read = read == 1
results = append(results, em)
@ -404,6 +467,8 @@ func GetMessage(id string) (*Message, error) {
from = &mail.Address{Name: env.GetHeader("From")}
}
messageID := strings.Trim(env.GetHeader("Message-ID"), "<>")
returnPath := strings.Trim(env.GetHeader("Return-Path"), "<>")
if returnPath == "" {
returnPath = from.Address
@ -413,27 +478,20 @@ func GetMessage(id string) (*Message, error) {
if err != nil {
// return received datetime when message does not contain a date header
q := sqlf.From("mailbox").
Select(`Data`).
OrderBy("Sort DESC").
Select(`Created`).
Where(`ID = ?`, id)
if err := q.QueryAndClose(nil, db, func(row *sql.Rows) {
var summary string
em := MessageSummary{}
var created int64
if err := row.Scan(&summary); err != nil {
logger.Log().Error(err)
return
}
if err := json.Unmarshal([]byte(summary), &em); err != nil {
if err := row.Scan(&created); err != nil {
logger.Log().Error(err)
return
}
logger.Log().Debugf("[db] %s does not contain a date header, using received datetime", id)
date = em.Created
date = time.UnixMicro(created)
}); err != nil {
logger.Log().Error(err)
}
@ -441,6 +499,7 @@ func GetMessage(id string) (*Message, error) {
obj := Message{
ID: id,
MessageID: messageID,
Read: true,
From: from,
Date: date,
@ -821,3 +880,16 @@ func IsUnread(id string) bool {
return unread == 1
}
// MessageIDExists blaah
func MessageIDExists(id string) bool {
var total int
q := sqlf.From("mailbox").
Select("COUNT(*)").To(&total).
Where("MessageID = ?", id)
_ = q.QueryRowAndClose(nil, db)
return total != 0
}

200
storage/migrationTasks.go Normal file
View File

@ -0,0 +1,200 @@
package storage
import (
"bytes"
"context"
"database/sql"
"strings"
"time"
"github.com/axllent/mailpit/config"
"github.com/axllent/mailpit/utils/logger"
"github.com/jhillyerd/enmime"
"github.com/leporo/sqlf"
"golang.org/x/text/language"
"golang.org/x/text/message"
)
func dataMigrations() {
updateSortByCreatedTask()
assignMessageIDsTask()
}
// Update Sort column using Created datetime <= v1.6.5
// Migration task implemented 05/2023 - can be removed end 2023
func updateSortByCreatedTask() {
q := sqlf.From("mailbox").
Select("ID").
Select(`json_extract(Metadata, '$.Created') as Created`).
Where("Created < ?", 1155000600)
toUpdate := make(map[string]int64)
p := message.NewPrinter(language.English)
if err := q.QueryAndClose(nil, db, func(row *sql.Rows) {
var id string
var ts sql.NullString
if err := row.Scan(&id, &ts); err != nil {
logger.Log().Error("[migration]", err)
return
}
if !ts.Valid {
logger.Log().Errorf("[migration] cannot get Created timestamp from %s", id)
return
}
t, _ := time.Parse(time.RFC3339Nano, ts.String)
toUpdate[id] = t.UnixMilli()
}); err != nil {
logger.Log().Error("[migration]", err)
return
}
total := len(toUpdate)
if total == 0 {
return
}
logger.Log().Infof("[migration] updating timestamp for %s messages", p.Sprintf("%d", len(toUpdate)))
// begin a transaction
ctx := context.Background()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
logger.Log().Error("[migration]", err)
return
}
// roll back if it fails
defer tx.Rollback()
var blockTime = time.Now()
count := 0
for id, ts := range toUpdate {
count++
_, err := tx.Exec(`UPDATE mailbox SET Created = ? WHERE ID = ?`, ts, id)
if err != nil {
logger.Log().Error("[migration]", err)
}
if count%1000 == 0 {
percent := (100 * count) / total
logger.Log().Infof("[migration] updated timestamp for 1,000 messages [%d%%] in %s", percent, time.Since(blockTime))
blockTime = time.Now()
}
}
logger.Log().Infof("[migration] commit %s changes", p.Sprintf("%d", count))
if err := tx.Commit(); err != nil {
logger.Log().Error("[migration]", err)
return
}
logger.Log().Infof("[migration] complete")
}
// Find any messages without a stored Message-ID and update it <= v1.6.5
// Migration task implemented 05/2023 - can be removed end 2023
func assignMessageIDsTask() {
if !config.IgnoreDuplicateIDs {
return
}
q := sqlf.From("mailbox").
Select("ID").
Where("MessageID = ''")
missingIDS := make(map[string]string)
if err := q.QueryAndClose(nil, db, func(row *sql.Rows) {
var id string
if err := row.Scan(&id); err != nil {
logger.Log().Error("[migration]", err)
return
}
missingIDS[id] = ""
}); err != nil {
logger.Log().Error("[migration]", err)
}
if len(missingIDS) == 0 {
return
}
var count int
var blockTime = time.Now()
p := message.NewPrinter(language.English)
total := len(missingIDS)
logger.Log().Infof("[migration] extracting Message-IDs for %s messages", p.Sprintf("%d", total))
for id := range missingIDS {
raw, err := GetMessageRaw(id)
if err != nil {
logger.Log().Error("[migration]", err)
continue
}
r := bytes.NewReader(raw)
env, err := enmime.ReadEnvelope(r)
if err != nil {
logger.Log().Error("[migration]", err)
continue
}
messageID := strings.Trim(env.GetHeader("Message-ID"), "<>")
missingIDS[id] = messageID
count++
if count%1000 == 0 {
percent := (100 * count) / total
logger.Log().Infof("[migration] extracted 1,000 Message-IDs [%d%%] in %s", percent, time.Since(blockTime))
blockTime = time.Now()
}
}
// begin a transaction
ctx := context.Background()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
logger.Log().Error("[migration]", err)
return
}
// roll back if it fails
defer tx.Rollback()
count = 0
for id, mid := range missingIDS {
_, err = tx.Exec(`UPDATE mailbox SET MessageID = ? WHERE ID = ?`, mid, id)
if err != nil {
logger.Log().Error("[migration]", err)
}
count++
if count%1000 == 0 {
percent := (100 * count) / total
logger.Log().Infof("[migration] stored 1,000 Message-IDs [%d%%] in %s", percent, time.Since(blockTime))
blockTime = time.Now()
}
}
logger.Log().Infof("[migration] commit %s changes", p.Sprintf("%d", count))
if err := tx.Commit(); err != nil {
logger.Log().Error("[migration]", err)
return
}
logger.Log().Infof("[migration] complete")
}

View File

@ -14,15 +14,13 @@ func searchParser(args []string, start, limit int) *sqlf.Stmt {
}
q := sqlf.From("mailbox").
Select(`ID, Data, Tags, Read,
json_extract(Data, '$.To') as ToJSON,
json_extract(Data, '$.From') as FromJSON,
IFNULL(json_extract(Data, '$.Cc'), '{}') as CcJSON,
IFNULL(json_extract(Data, '$.Bcc'), '{}') as BccJSON,
json_extract(Data, '$.Subject') as Subject,
json_extract(Data, '$.Attachments') as Attachments
Select(`Created, ID, Subject, Metadata, Size, Attachments, Read, Tags,
IFNULL(json_extract(Metadata, '$.To'), '{}') as ToJSON,
IFNULL(json_extract(Metadata, '$.From'), '{}') as FromJSON,
IFNULL(json_extract(Metadata, '$.Cc'), '{}') as CcJSON,
IFNULL(json_extract(Metadata, '$.Bcc'), '{}') as BccJSON
`).
OrderBy("Sort DESC").
OrderBy("Created DESC").
Limit(limit).
Offset(start)
@ -92,6 +90,15 @@ func searchParser(args []string, start, limit int) *sqlf.Stmt {
q.Where("Subject LIKE ?", "%"+escPercentChar(w)+"%")
}
}
} else if strings.HasPrefix(w, "message-id:") {
w = cleanString(w[11:])
if w != "" {
if exclude {
q.Where("MessageID NOT LIKE ?", "%"+escPercentChar(w)+"%")
} else {
q.Where("MessageID LIKE ?", "%"+escPercentChar(w)+"%")
}
}
} else if strings.HasPrefix(w, "tag:") {
w = cleanString(w[4:])
if w != "" {
@ -122,9 +129,9 @@ func searchParser(args []string, start, limit int) *sqlf.Stmt {
} else {
// search text
if exclude {
q.Where("search NOT LIKE ?", "%"+cleanString(escPercentChar(w))+"%")
q.Where("SearchText NOT LIKE ?", "%"+cleanString(escPercentChar(w))+"%")
} else {
q.Where("search LIKE ?", "%"+cleanString(escPercentChar(w))+"%")
q.Where("SearchText LIKE ?", "%"+cleanString(escPercentChar(w))+"%")
}
}
}

View File

@ -11,8 +11,10 @@ import (
//
// swagger:model Message
type Message struct {
// Unique message database id
// Database ID
ID string
// Message ID
MessageID string
// Read status
Read bool
// From address
@ -25,7 +27,7 @@ type Message struct {
Bcc []*mail.Address
// ReplyTo addresses
ReplyTo []*mail.Address
// ReturnPath is the Return-Path
// Return-Path
ReturnPath string
// Message subject
Subject string
@ -49,15 +51,15 @@ type Message struct {
//
// swagger:model Attachment
type Attachment struct {
// attachment part id
// Attachment part ID
PartID string
// file name
// File name
FileName string
// content type
// Content type
ContentType string
// content id
// Content ID
ContentID string
// size in bytes
// Size in bytes
Size int
}
@ -65,7 +67,7 @@ type Attachment struct {
//
// swagger:model MessageSummary
type MessageSummary struct {
// Unique message database id
// Database ID
ID string
// Read status
Read bool