1
0
mirror of https://github.com/axllent/mailpit.git synced 2025-01-26 03:52:09 +02:00
mailpit/storage/database.go

624 lines
13 KiB
Go
Raw Normal View History

2022-07-29 23:23:08 +12:00
package storage
import (
"bytes"
"errors"
"fmt"
"net/mail"
"os"
"os/signal"
"regexp"
2022-07-30 19:58:31 +12:00
"strings"
2022-07-29 23:23:08 +12:00
"syscall"
"time"
"github.com/axllent/mailpit/config"
"github.com/axllent/mailpit/data"
"github.com/axllent/mailpit/logger"
"github.com/axllent/mailpit/server/websockets"
"github.com/jhillyerd/enmime"
"github.com/klauspost/compress/zstd"
2022-07-30 08:39:58 +12:00
"github.com/ostafen/clover/v2"
2022-07-29 23:23:08 +12:00
)
var (
// db is the package-wide clover database handle; it is assigned by InitDB.
db *clover.DB
// DefaultMailbox is the name of the catch-all mailbox, kept as a variable
// to allow for potential expansion in the future.
DefaultMailbox = "catchall"
// count is incremented by Store for every stored message and is used to
// log how long each run of 100 messages took.
count int
// per100start marks when the current run of 100 messages started.
per100start = time.Now()
// zstd encoder & decoder used to compress and decompress raw messages.
// NOTE(review): the constructor errors are discarded here - confirm they
// cannot occur when no options are passed.
encoder, _ = zstd.NewWriter(nil)
decoder, _ = zstd.NewReader(nil)
2022-07-29 23:23:08 +12:00
)
// CloverStore is the message summary document that Store inserts into a
// mailbox collection. The raw email itself is kept separately in the
// "{mailbox}_data" collection.
type CloverStore struct {
// Created is the time the message was stored (set to time.Now() by Store).
Created time.Time
// Read reports whether the message has been opened; GetMessage and
// MarkAllRead set it to true, UnreadMessage sets it back to false.
Read bool
// From is the first parsed From address, or the raw From header stored in
// the Name field when the header could not be parsed into an address.
From *mail.Address
// To, Cc and Bcc are the parsed recipient addresses of the message.
To []*mail.Address
Cc []*mail.Address
Bcc []*mail.Address
// Subject is the value of the Subject header.
Subject string
// Size is the length of the raw message in bytes.
Size int
// Inline is the number of inline parts found in the message.
Inline int
// Attachments is the number of attachments found in the message.
Attachments int
// SearchText is the text produced by createSearchText; Search matches
// against this field.
SearchText string
}
// InitDB will initialise the database.
// If config.DataDir is empty then it will be in memory.
func InitDB() error {
var err error
if config.DataDir != "" {
logger.Log().Infof("[db] initialising data storage: %s", config.DataDir)
db, err = clover.Open(config.DataDir)
if err != nil {
return err
}
sigs := make(chan os.Signal, 1)
// catch all signals since not explicitly listing
// Program that will listen to the SIGINT and SIGTERM
// SIGINT will listen to CTRL-C.
// SIGTERM will be caught if kill command executed
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
// method invoked upon seeing signal
go func() {
s := <-sigs
2022-08-07 22:35:42 +12:00
logger.Log().Infof("[db] got %s signal, saving persistent data & shutting down", s)
2022-07-29 23:23:08 +12:00
if err := db.Close(); err != nil {
logger.Log().Errorf("[db] %s", err.Error())
}
os.Exit(0)
}()
} else {
logger.Log().Debug("[db] initialising memory data storage")
db, err = clover.Open("", clover.InMemoryMode(true))
if err != nil {
return err
}
}
// auto-prune
if config.MaxMessages > 0 {
go pruneCron()
}
// create catch-all collection
return CreateMailbox(DefaultMailbox)
}
// ListMailboxes returns a slice of mailboxes (collections)
func ListMailboxes() ([]data.MailboxSummary, error) {
mailboxes, err := db.ListCollections()
if err != nil {
return nil, err
}
results := []data.MailboxSummary{}
for _, m := range mailboxes {
2022-07-30 19:58:31 +12:00
// ignore *_data collections
if strings.HasSuffix(m, "_data") {
continue
2022-07-29 23:23:08 +12:00
}
2022-07-30 19:58:31 +12:00
stats := StatsGet(m)
2022-07-29 23:23:08 +12:00
mb := data.MailboxSummary{}
mb.Name = m
mb.Slug = m
2022-07-30 19:58:31 +12:00
mb.Total = stats.Total
mb.Unread = stats.Unread
2022-07-29 23:23:08 +12:00
2022-07-30 19:58:31 +12:00
if mb.Total > 0 {
2022-07-29 23:23:08 +12:00
q, err := db.FindFirst(
clover.NewQuery(m).Sort(clover.SortOption{Field: "Created", Direction: -1}),
)
if err != nil {
return nil, err
}
mb.LastMessage = q.Get("Created").(time.Time)
}
results = append(results, mb)
}
return results, nil
}
// MailboxExists reports whether a collection (aka: mailbox) with the given
// name exists. A lookup error is treated as "does not exist".
func MailboxExists(name string) bool {
	found, err := db.HasCollection(name)
	return err == nil && found
}
// CreateMailbox will create a collection if it does not exist
2022-08-06 23:53:15 +12:00
func CreateMailbox(mailbox string) error {
mailbox = sanitizeMailboxName(mailbox)
2022-07-29 23:23:08 +12:00
2022-08-06 23:53:15 +12:00
if !MailboxExists(mailbox) {
logger.Log().Infof("[db] creating mailbox: %s", mailbox)
if err := db.CreateCollection(mailbox); err != nil {
2022-07-29 23:23:08 +12:00
return err
}
// create Created index
2022-08-06 23:53:15 +12:00
if err := db.CreateIndex(mailbox, "Created"); err != nil {
2022-07-29 23:23:08 +12:00
return err
}
// create Read index
2022-08-06 23:53:15 +12:00
if err := db.CreateIndex(mailbox, "Read"); err != nil {
2022-07-29 23:23:08 +12:00
return err
}
// create separate collection for data
2022-08-06 23:53:15 +12:00
if err := db.CreateCollection(mailbox + "_data"); err != nil {
2022-07-29 23:23:08 +12:00
return err
}
// create Created index
2022-08-06 23:53:15 +12:00
if err := db.CreateIndex(mailbox+"_data", "Created"); err != nil {
2022-07-29 23:23:08 +12:00
return err
}
}
2022-08-06 23:53:15 +12:00
return statsRefresh(mailbox)
2022-07-29 23:23:08 +12:00
}
// Store will store a message in the database and return the unique ID
func Store(mailbox string, b []byte) (string, error) {
2022-08-06 23:53:15 +12:00
mailbox = sanitizeMailboxName(mailbox)
2022-07-29 23:23:08 +12:00
r := bytes.NewReader(b)
// Parse message body with enmime.
env, err := enmime.ReadEnvelope(r)
if err != nil {
return "", err
}
var from *mail.Address
fromData := addressToSlice(env, "From")
if len(fromData) > 0 {
from = fromData[0]
} else if env.GetHeader("From") != "" {
from = &mail.Address{Name: env.GetHeader("From")}
2022-07-29 23:23:08 +12:00
}
obj := CloverStore{
Created: time.Now(),
From: from,
To: addressToSlice(env, "To"),
Cc: addressToSlice(env, "Cc"),
Bcc: addressToSlice(env, "Bcc"),
Subject: env.GetHeader("Subject"),
Size: len(b),
Inline: len(env.Inlines),
Attachments: len(env.Attachments),
SearchText: createSearchText(env),
}
doc := clover.NewDocumentOf(obj)
id, err := db.InsertOne(mailbox, doc)
if err != nil {
return "", err
}
statsAddNewMessage(mailbox)
2022-07-29 23:23:08 +12:00
// save the raw email in a separate collection
raw := clover.NewDocument()
raw.Set("_id", id)
raw.Set("Created", time.Now())
compressed := encoder.EncodeAll(b, make([]byte, 0, len(b)))
raw.Set("Email", string(compressed))
2022-07-29 23:23:08 +12:00
_, err = db.InsertOne(mailbox+"_data", raw)
if err != nil {
// delete the summary because the data insert failed
logger.Log().Debugf("[db] error inserting raw message, rolling back")
2022-08-07 00:09:32 +12:00
_ = DeleteOneMessage(mailbox, id)
2022-07-29 23:23:08 +12:00
return "", err
}
count++
if count%100 == 0 {
2022-07-30 22:33:20 +12:00
logger.Log().Infof("100 messages added in %s", time.Since(per100start))
2022-07-29 23:23:08 +12:00
per100start = time.Now()
}
d, err := db.FindById(DefaultMailbox, id)
if err != nil {
return "", err
}
c := &data.Summary{}
if err := d.Unmarshal(c); err != nil {
return "", err
}
c.ID = id
websockets.Broadcast("new", c)
return id, nil
}
// List returns a summary of messages.
// For pertformance reasons we manually paginate over queries of 100 results
// as clover's `Skip()` returns a subset of all results which is much slower.
// @see https://github.com/ostafen/clover/issues/73
func List(mailbox string, start, limit int) ([]data.Summary, error) {
2022-08-06 23:53:15 +12:00
mailbox = sanitizeMailboxName(mailbox)
2022-07-29 23:23:08 +12:00
var lastDoc *clover.Document
count := 0
startAddingAt := start + 1
adding := false
results := []data.Summary{}
for {
var instant time.Time
if lastDoc == nil {
instant = time.Now()
} else {
instant = lastDoc.Get("Created").(time.Time)
}
all, err := db.FindAll(
clover.NewQuery(mailbox).
Where(clover.Field("Created").Lt(instant)).
Sort(clover.SortOption{Field: "Created", Direction: -1}).
Limit(100),
)
if err != nil {
return nil, err
}
for _, d := range all {
count++
if count == startAddingAt {
adding = true
}
resultsLen := len(results)
if adding && resultsLen < limit {
cs := &data.Summary{}
if err := d.Unmarshal(cs); err != nil {
return nil, err
}
cs.ID = d.ObjectId()
results = append(results, *cs)
}
}
// we have enough resuts
if len(results) == limit {
return results, nil
}
if len(all) > 0 {
lastDoc = all[len(all)-1]
} else {
break
}
}
return results, nil
}
// Search returns a summary of items mathing a search. It searched the SearchText field.
func Search(mailbox, search string, start, limit int) ([]data.Summary, error) {
2022-08-06 23:53:15 +12:00
mailbox = sanitizeMailboxName(mailbox)
sq := fmt.Sprintf("(?i)%s", cleanString(regexp.QuoteMeta(search)))
2022-07-29 23:23:08 +12:00
q, err := db.FindAll(clover.NewQuery(mailbox).
Skip(start).
Limit(limit).
Sort(clover.SortOption{Field: "Created", Direction: -1}).
Where(clover.Field("SearchText").Like(sq)))
if err != nil {
return nil, err
}
results := []data.Summary{}
for _, d := range q {
cs := &data.Summary{}
2022-07-29 23:23:08 +12:00
if err := d.Unmarshal(cs); err != nil {
return nil, err
}
cs.ID = d.ObjectId()
results = append(results, *cs)
2022-07-29 23:23:08 +12:00
}
return results, nil
}
// Count returns the total number of messages in a mailbox
func Count(mailbox string) (int, error) {
2022-08-06 23:53:15 +12:00
mailbox = sanitizeMailboxName(mailbox)
2022-07-29 23:23:08 +12:00
return db.Count(clover.NewQuery(mailbox))
}
// CountUnread returns the unread number of messages in a mailbox
func CountUnread(mailbox string) (int, error) {
2022-08-06 23:53:15 +12:00
mailbox = sanitizeMailboxName(mailbox)
2022-07-29 23:23:08 +12:00
return db.Count(
clover.NewQuery(mailbox).
Where(clover.Field("Read").IsFalse()),
)
}
// GetMessage returns a data.Message generated from the {mailbox}_data collection.
// ID must be supplied as this is not stored within the CloverStore but rather the
// *clover.Document
func GetMessage(mailbox, id string) (*data.Message, error) {
2022-08-06 23:53:15 +12:00
mailbox = sanitizeMailboxName(mailbox)
raw, err := GetMessageRaw(mailbox, id)
if err != nil || raw == nil {
2022-07-29 23:23:08 +12:00
return nil, errors.New("message not found")
}
r := bytes.NewReader(raw)
2022-07-29 23:23:08 +12:00
env, err := enmime.ReadEnvelope(r)
if err != nil {
return nil, err
}
var from *mail.Address
fromData := addressToSlice(env, "From")
if len(fromData) > 0 {
from = fromData[0]
} else if env.GetHeader("From") != "" {
from = &mail.Address{Name: env.GetHeader("From")}
2022-07-29 23:23:08 +12:00
}
date, _ := env.Date()
2022-07-29 23:23:08 +12:00
obj := data.Message{
ID: id,
2022-07-29 23:23:08 +12:00
Read: true,
From: from,
Date: date,
To: addressToSlice(env, "To"),
Cc: addressToSlice(env, "Cc"),
Bcc: addressToSlice(env, "Bcc"),
Subject: env.GetHeader("Subject"),
Size: len(raw),
Text: env.Text,
}
html := env.HTML
// strip base tags
var re = regexp.MustCompile(`(?U)<base .*>`)
html = re.ReplaceAllString(html, "")
for _, i := range env.Inlines {
if i.FileName != "" || i.ContentID != "" {
obj.Inline = append(obj.Inline, data.AttachmentSummary(i))
}
}
for _, i := range env.OtherParts {
if i.FileName != "" || i.ContentID != "" {
obj.Inline = append(obj.Inline, data.AttachmentSummary(i))
}
}
for _, a := range env.Attachments {
if a.FileName != "" || a.ContentID != "" {
obj.Attachments = append(obj.Attachments, data.AttachmentSummary(a))
}
}
obj.HTML = html
2022-07-30 19:58:31 +12:00
msg, err := db.FindById(mailbox, id)
if err == nil && !msg.Get("Read").(bool) {
updates := make(map[string]interface{})
updates["Read"] = true
2022-07-29 23:23:08 +12:00
2022-07-30 19:58:31 +12:00
if err := db.UpdateById(mailbox, id, updates); err != nil {
return nil, err
}
statsReadOneMessage(mailbox)
2022-07-29 23:23:08 +12:00
}
return &obj, nil
}
// GetAttachmentPart returns an *enmime.Part (attachment or inline) from a message
func GetAttachmentPart(mailbox, id, partID string) (*enmime.Part, error) {
2022-08-06 23:53:15 +12:00
mailbox = sanitizeMailboxName(mailbox)
raw, err := GetMessageRaw(mailbox, id)
2022-07-29 23:23:08 +12:00
if err != nil {
return nil, err
}
r := bytes.NewReader(raw)
2022-07-29 23:23:08 +12:00
env, err := enmime.ReadEnvelope(r)
if err != nil {
return nil, err
}
for _, a := range env.Inlines {
if a.PartID == partID {
return a, nil
}
}
for _, a := range env.OtherParts {
if a.PartID == partID {
return a, nil
}
}
for _, a := range env.Attachments {
if a.PartID == partID {
return a, nil
}
}
return nil, errors.New("attachment not found")
}
// GetMessageRaw returns an []byte of the full message
func GetMessageRaw(mailbox, id string) ([]byte, error) {
2022-08-06 23:53:15 +12:00
mailbox = sanitizeMailboxName(mailbox)
2022-07-29 23:23:08 +12:00
q, err := db.FindById(mailbox+"_data", id)
if err != nil {
return nil, err
}
if q == nil {
return nil, errors.New("message not found")
}
var raw []byte
if q.Has("Email") {
msg := q.Get("Email").(string)
raw, err = decoder.DecodeAll([]byte(msg), nil)
if err != nil {
return nil, fmt.Errorf("error decompressing message: %s", err.Error())
}
} else {
// deprecated 2022/08/10 - can be eventually removed
raw = []byte(q.Get("Data").(string))
}
2022-07-29 23:23:08 +12:00
return raw, err
2022-07-29 23:23:08 +12:00
}
// UnreadMessage will delete all messages from a mailbox
func UnreadMessage(mailbox, id string) error {
2022-08-06 23:53:15 +12:00
mailbox = sanitizeMailboxName(mailbox)
2022-07-29 23:23:08 +12:00
updates := make(map[string]interface{})
updates["Read"] = false
2022-07-30 19:58:31 +12:00
statsUnreadOneMessage(mailbox)
2022-07-29 23:23:08 +12:00
return db.UpdateById(mailbox, id, updates)
}
// DeleteOneMessage will delete a single message from a mailbox
func DeleteOneMessage(mailbox, id string) error {
2022-08-06 23:53:15 +12:00
mailbox = sanitizeMailboxName(mailbox)
q, err := db.FindById(mailbox, id)
if err != nil {
return err
}
unreadStatus := !q.Get("Read").(bool)
2022-07-29 23:23:08 +12:00
if err := db.DeleteById(mailbox, id); err != nil {
return err
}
statsDeleteOneMessage(mailbox, unreadStatus)
2022-07-30 19:58:31 +12:00
2022-07-29 23:23:08 +12:00
return db.DeleteById(mailbox+"_data", id)
}
// DeleteAllMessages will delete all messages from a mailbox
func DeleteAllMessages(mailbox string) error {
2022-08-06 23:53:15 +12:00
mailbox = sanitizeMailboxName(mailbox)
2022-07-29 23:23:08 +12:00
totalStart := time.Now()
totalMessages, err := db.Count(clover.NewQuery(mailbox))
if err != nil {
return err
}
for {
toDelete, err := db.Count(clover.NewQuery(mailbox))
if err != nil {
return err
}
if toDelete == 0 {
break
}
if err := db.Delete(clover.NewQuery(mailbox).Limit(2500)); err != nil {
return err
}
if err := db.Delete(clover.NewQuery(mailbox + "_data").Limit(2500)); err != nil {
return err
}
}
2022-07-30 19:58:31 +12:00
// resets stats for mailbox
2022-08-07 00:09:32 +12:00
_ = statsRefresh(mailbox)
2022-07-29 23:23:08 +12:00
elapsed := time.Since(totalStart)
logger.Log().Infof("Deleted %d messages from %s in %s", totalMessages, mailbox, elapsed)
return nil
}
2022-08-07 09:34:06 +12:00
// MarkAllRead sets the Read flag on every unread message in a mailbox and
// then refreshes the mailbox statistics. The first failing update is logged
// and returned, leaving the remaining messages untouched.
func MarkAllRead(mailbox string) error {
	mailbox = sanitizeMailboxName(mailbox)

	began := time.Now()

	unreadQuery := clover.NewQuery(mailbox).
		Where(clover.Field("Read").IsFalse())

	unread, err := db.FindAll(unreadQuery)
	if err != nil {
		return err
	}

	markRead := map[string]interface{}{"Read": true}

	for _, doc := range unread {
		err := db.UpdateById(mailbox, doc.ObjectId(), markRead)
		if err != nil {
			logger.Log().Error(err)
			return err
		}
	}

	if err := statsRefresh(mailbox); err != nil {
		return err
	}

	logger.Log().Debugf("[db] marked %d messages in %s as read in %s", len(unread), mailbox, time.Since(began))

	return nil
}