diff --git a/CHANGELOG.md b/CHANGELOG.md index ecaf85ff..5d66d8c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,20 @@ ## (WIP) v0.13.0 -- Allowed overwriting the default file serve headers if an explicit response header is set. +- Added new "View" collection type (@todo document) -- Changed `System.GetFile()` to return directly `*blob.Reader` instead of the `io.ReadCloser` interface. +- Added auto fail/retry for the `SELECT` queries to gracefully handle the `database is locked` errors ([#1795](https://github.com/pocketbase/pocketbase/discussions/1795#discussioncomment-4882169)). + +- Added default max query executation timeout (120s). + +- Added support for `dao.RecordQuery(collection)` to scan directly the `One()` and `All()` results in `*models.Record` or `[]*models.Record` without the need of explicit `NullStringMap`. + +- Added support to overwrite the default file serve headers if an explicit response header is set. + +- Added file thumbs when visualizing `relation` display file fields. + +- Enabled `process.env` in JS migrations to allow accessing `os.Environ()`. + +- **!** Changed `System.GetFile()` to return directly `*blob.Reader` instead of the `io.ReadCloser` interface. - **!** Changed `To`, `Cc` and `Bcc` of `mailer.Message` to `[]mail.Address` for consistency and to allow multiple recipients and optional name. @@ -24,11 +36,7 @@ } ``` -- Enabled `process.env` in js migrations to allow accessing `os.Environ()`. - -- Enabled file thumbs when visualizing `relation` display file fields. - -- Added new "View" collection type (@todo document) +- **!** Removed the previously deprecated `Dao.Block()` and `Dao.Continue()` helpers in favor of `Dao.NonconcurrentDB()`. ## v0.12.3 diff --git a/apis/record_crud.go b/apis/record_crud.go index 90b00db6..15015522 100644 --- a/apis/record_crud.go +++ b/apis/record_crud.go @@ -72,16 +72,13 @@ func (api *recordApi) list(c echo.Context) error { searchProvider.AddFilter(search.FilterData(*collection.ListRule)) } - var rawRecords = []dbx.NullStringMap{} - result, err := searchProvider.ParseAndExec(c.QueryParams().Encode(), &rawRecords) + records := []*models.Record{} + + result, err := searchProvider.ParseAndExec(c.QueryParams().Encode(), &records) if err != nil { return NewBadRequestError("Invalid filter parameters.", err) } - records := models.NewRecordsFromNullStringMaps(collection, rawRecords) - - result.Items = records - event := new(core.RecordsListEvent) event.HttpContext = c event.Collection = collection diff --git a/core/base.go b/core/base.go index a6f0f858..bc82df3c 100644 --- a/core/base.go +++ b/core/base.go @@ -23,7 +23,7 @@ import ( ) const ( - DefaultDataMaxOpenConns int = 100 + DefaultDataMaxOpenConns int = 120 DefaultDataMaxIdleConns int = 20 DefaultLogsMaxOpenConns int = 10 DefaultLogsMaxIdleConns int = 2 diff --git a/daos/base.go b/daos/base.go index a15d7f43..08b4e5a9 100644 --- a/daos/base.go +++ b/daos/base.go @@ -4,19 +4,12 @@ package daos import ( - "context" "errors" - "strings" - "sync" - "time" "github.com/pocketbase/dbx" "github.com/pocketbase/pocketbase/models" - "golang.org/x/sync/semaphore" ) -const DefaultMaxFailRetries = 5 - // New creates a new Dao instance with the provided db builder // (for both async and sync db operations). func New(db dbx.Builder) *Dao { @@ -39,10 +32,6 @@ type Dao struct { concurrentDB dbx.Builder nonconcurrentDB dbx.Builder - // @todo delete after removing Block and Continue - sem *semaphore.Weighted - mux sync.RWMutex - BeforeCreateFunc func(eventDao *Dao, m models.Model) error AfterCreateFunc func(eventDao *Dao, m models.Model) BeforeUpdateFunc func(eventDao *Dao, m models.Model) error @@ -74,55 +63,15 @@ func (dao *Dao) NonconcurrentDB() dbx.Builder { return dao.nonconcurrentDB } -// Deprecated: Will be removed in the next releases. Use [Dao.NonconcurrentDB()] instead. -// -// Block acquires a lock and blocks all other go routines that uses -// the Dao instance until dao.Continue() is called, effectively making -// the concurrent requests to perform synchronous db operations. -// -// This method should be used only as a last resort and as a workaround -// for the SQLITE_BUSY error when mixing read&write in a transaction. -// -// Example: -// -// func someLongRunningTransaction() error { -// ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) -// defer cancel() -// if err := app.Dao().Block(ctx); err != nil { -// return err -// } -// defer app.Dao().Continue() -// -// return app.Dao().RunInTransaction(func (txDao *daos.Dao) error { -// // some long running read&write transaction... -// }) -// } -func (dao *Dao) Block(ctx context.Context) error { - if dao.sem == nil { - dao.mux.Lock() - dao.sem = semaphore.NewWeighted(1) - dao.mux.Unlock() - } - - return dao.sem.Acquire(ctx, 1) -} - -// Deprecated: Will be removed in the next releases. Use [Dao.NonconcurrentDB()] instead. -// -// Continue releases the previously acquired Block() lock. -func (dao *Dao) Continue() { - if dao.sem == nil { - return - } - - dao.sem.Release(1) -} - // ModelQuery creates a new query with preset Select and From fields // based on the provided model argument. func (dao *Dao) ModelQuery(m models.Model) *dbx.SelectQuery { tableName := m.TableName() - return dao.DB().Select("{{" + tableName + "}}.*").From(tableName) + + return dao.DB(). + Select("{{" + tableName + "}}.*"). + From(tableName). + WithExecHook(onLockErrorRetry) } // FindById finds a single db record with the specified id and @@ -224,7 +173,7 @@ func (dao *Dao) Delete(m models.Model) error { return errors.New("ID is not set") } - return dao.failRetry(func(retryDao *Dao) error { + return dao.lockRetry(func(retryDao *Dao) error { if retryDao.BeforeDeleteFunc != nil { if err := retryDao.BeforeDeleteFunc(retryDao, m); err != nil { return err @@ -240,20 +189,20 @@ func (dao *Dao) Delete(m models.Model) error { } return nil - }, DefaultMaxFailRetries) + }, defaultMaxRetries) } // Save upserts (update or create if primary key is not set) the provided model. func (dao *Dao) Save(m models.Model) error { if m.IsNew() { - return dao.failRetry(func(retryDao *Dao) error { + return dao.lockRetry(func(retryDao *Dao) error { return retryDao.create(m) - }, DefaultMaxFailRetries) + }, defaultMaxRetries) } - return dao.failRetry(func(retryDao *Dao) error { + return dao.lockRetry(func(retryDao *Dao) error { return retryDao.update(m) - }, DefaultMaxFailRetries) + }, defaultMaxRetries) } func (dao *Dao) update(m models.Model) error { @@ -347,32 +296,19 @@ func (dao *Dao) create(m models.Model) error { return nil } -func (dao *Dao) failRetry(op func(retryDao *Dao) error, maxRetries int) error { +func (dao *Dao) lockRetry(op func(retryDao *Dao) error, maxRetries int) error { retryDao := dao - attempts := 1 -Retry: - if attempts == 2 { - // assign new Dao without the before hooks to avoid triggering - // the already fired before callbacks multiple times - retryDao = NewMultiDB(dao.concurrentDB, dao.nonconcurrentDB) - retryDao.AfterCreateFunc = dao.AfterCreateFunc - retryDao.AfterUpdateFunc = dao.AfterUpdateFunc - retryDao.AfterDeleteFunc = dao.AfterDeleteFunc - } + return baseLockRetry(func(attempt int) error { + if attempt == 2 { + // assign new Dao without the before hooks to avoid triggering + // the already fired before callbacks multiple times + retryDao = NewMultiDB(dao.concurrentDB, dao.nonconcurrentDB) + retryDao.AfterCreateFunc = dao.AfterCreateFunc + retryDao.AfterUpdateFunc = dao.AfterUpdateFunc + retryDao.AfterDeleteFunc = dao.AfterDeleteFunc + } - // execute - err := op(retryDao) - - if err != nil && - attempts < maxRetries && - // note: we are checking the err message so that we can handle both the cgo and noncgo errors - strings.Contains(err.Error(), "database is locked") { - // wait and retry - time.Sleep(time.Duration(200*attempts) * time.Millisecond) - attempts++ - goto Retry - } - - return err + return op(retryDao) + }, maxRetries) } diff --git a/daos/base_retry.go b/daos/base_retry.go new file mode 100644 index 00000000..9c0f9e51 --- /dev/null +++ b/daos/base_retry.go @@ -0,0 +1,58 @@ +package daos + +import ( + "context" + "strings" + "time" + + "github.com/pocketbase/dbx" +) + +const defaultQueryTimeout time.Duration = 2 * time.Minute + +const defaultMaxRetries int = 10 + +var defaultRetryIntervals = []int{100, 250, 350, 500, 700, 1000, 1200, 1500} + +func onLockErrorRetry(s *dbx.SelectQuery, op func() error) error { + return baseLockRetry(func(attempt int) error { + // load a default timeout context if not set explicitly + if s.Context() == nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultQueryTimeout) + defer func() { + cancel() + s.WithContext(nil) // reset + }() + s.WithContext(ctx) + } + + return op() + }, defaultMaxRetries) +} + +func baseLockRetry(op func(attempt int) error, maxRetries int) error { + attempt := 1 + +Retry: + err := op(attempt) + + if err != nil && + attempt <= maxRetries && + // we are checking the err message to handle both the cgo and noncgo errors + strings.Contains(err.Error(), "database is locked") { + // wait and retry + time.Sleep(getDefaultRetryInterval(attempt)) + attempt++ + goto Retry + } + + return err +} + +func getDefaultRetryInterval(attempt int) time.Duration { + if attempt < 0 || attempt > len(defaultRetryIntervals)-1 { + return time.Duration(defaultRetryIntervals[len(defaultRetryIntervals)-1]) * time.Millisecond + } + + return time.Duration(defaultRetryIntervals[attempt]) * time.Millisecond +} diff --git a/daos/base_retry_test.go b/daos/base_retry_test.go new file mode 100644 index 00000000..e4232f91 --- /dev/null +++ b/daos/base_retry_test.go @@ -0,0 +1,60 @@ +package daos + +import ( + "errors" + "testing" +) + +func TestGetDefaultRetryInterval(t *testing.T) { + if i := getDefaultRetryInterval(-1); i.Milliseconds() != 1500 { + t.Fatalf("Expected 1500ms, got %v", i) + } + + if i := getDefaultRetryInterval(999); i.Milliseconds() != 1500 { + t.Fatalf("Expected 1500ms, got %v", i) + } + + if i := getDefaultRetryInterval(3); i.Milliseconds() != 500 { + t.Fatalf("Expected 500ms, got %v", i) + } +} + +func TestBaseLockRetry(t *testing.T) { + scenarios := []struct { + err error + failUntilAttempt int + expectedAttempts int + }{ + {nil, 3, 1}, + {errors.New("test"), 3, 1}, + {errors.New("database is locked"), 3, 3}, + } + + for i, s := range scenarios { + lastAttempt := 0 + + err := baseLockRetry(func(attempt int) error { + lastAttempt = attempt + + if attempt < s.failUntilAttempt { + return s.err + } + + return nil + }, s.failUntilAttempt+2) + + if lastAttempt != s.expectedAttempts { + t.Errorf("[%d] Expected lastAttempt to be %d, got %d", i, s.expectedAttempts, lastAttempt) + } + + if s.failUntilAttempt == s.expectedAttempts && err != nil { + t.Errorf("[%d] Expected nil, got err %v", i, err) + continue + } + + if s.failUntilAttempt != s.expectedAttempts && s.err != nil && err == nil { + t.Errorf("[%d] Expected error %q, got nil", i, s.err) + continue + } + } +} diff --git a/daos/record.go b/daos/record.go index bbf53891..a7aea19f 100644 --- a/daos/record.go +++ b/daos/record.go @@ -20,7 +20,72 @@ func (dao *Dao) RecordQuery(collection *models.Collection) *dbx.SelectQuery { tableName := collection.Name selectCols := fmt.Sprintf("%s.*", dao.DB().QuoteSimpleColumnName(tableName)) - return dao.DB().Select(selectCols).From(tableName) + return dao.DB(). + Select(selectCols). + From(tableName). + WithExecHook(onLockErrorRetry). + WithOneHook(func(s *dbx.SelectQuery, a any, op func(b any) error) error { + switch v := a.(type) { + case *models.Record: + if v == nil { + return op(a) + } + + row := dbx.NullStringMap{} + if err := op(&row); err != nil { + return err + } + + record := models.NewRecordFromNullStringMap(collection, row) + + *v = *record + + return nil + default: + return op(a) + } + }). + WithAllHook(func(s *dbx.SelectQuery, sliceA any, op func(sliceB any) error) error { + switch v := sliceA.(type) { + case *[]*models.Record: + if v == nil { + return op(sliceA) + } + + rows := []dbx.NullStringMap{} + if err := op(&rows); err != nil { + return err + } + + records := models.NewRecordsFromNullStringMaps(collection, rows) + + *v = records + + return nil + case *[]models.Record: + if v == nil { + return op(sliceA) + } + + rows := []dbx.NullStringMap{} + if err := op(&rows); err != nil { + return err + } + + records := models.NewRecordsFromNullStringMaps(collection, rows) + + nonPointers := make([]models.Record, len(records)) + for i, r := range records { + nonPointers[i] = *r + } + + *v = nonPointers + + return nil + default: + return op(sliceA) + } + }) } // FindRecordById finds the Record model by its id. @@ -34,10 +99,8 @@ func (dao *Dao) FindRecordById( return nil, err } - tableName := collection.Name - query := dao.RecordQuery(collection). - AndWhere(dbx.HashExp{tableName + ".id": recordId}) + AndWhere(dbx.HashExp{collection.Name + ".id": recordId}) for _, filter := range optFilters { if filter == nil { @@ -48,12 +111,13 @@ func (dao *Dao) FindRecordById( } } - row := dbx.NullStringMap{} - if err := query.Limit(1).One(row); err != nil { + record := &models.Record{} + + if err := query.Limit(1).One(record); err != nil { return nil, err } - return models.NewRecordFromNullStringMap(collection, row), nil + return record, nil } // FindRecordsByIds finds all Record models by the provided ids. @@ -83,12 +147,13 @@ func (dao *Dao) FindRecordsByIds( } } - rows := make([]dbx.NullStringMap, 0, len(recordIds)) - if err := query.All(&rows); err != nil { + records := make([]*models.Record, 0, len(recordIds)) + + if err := query.All(&records); err != nil { return nil, err } - return models.NewRecordsFromNullStringMaps(collection, rows), nil + return records, nil } // FindRecordsByExpr finds all records by the specified db expression. @@ -117,13 +182,13 @@ func (dao *Dao) FindRecordsByExpr(collectionNameOrId string, exprs ...dbx.Expres } } - rows := []dbx.NullStringMap{} + var records []*models.Record - if err := query.All(&rows); err != nil { + if err := query.All(&records); err != nil { return nil, err } - return models.NewRecordsFromNullStringMaps(collection, rows), nil + return records, nil } // FindFirstRecordByData returns the first found record matching @@ -138,18 +203,17 @@ func (dao *Dao) FindFirstRecordByData( return nil, err } - row := dbx.NullStringMap{} + record := &models.Record{} err = dao.RecordQuery(collection). AndWhere(dbx.HashExp{inflector.Columnify(key): value}). Limit(1). - One(row) - + One(record) if err != nil { return nil, err } - return models.NewRecordFromNullStringMap(collection, row), nil + return record, nil } // IsRecordValueUnique checks if the provided key-value pair is a unique Record value. @@ -252,18 +316,17 @@ func (dao *Dao) FindAuthRecordByEmail(collectionNameOrId string, email string) ( return nil, fmt.Errorf("%q is not an auth collection", collectionNameOrId) } - row := dbx.NullStringMap{} + record := &models.Record{} err = dao.RecordQuery(collection). AndWhere(dbx.HashExp{schema.FieldNameEmail: email}). Limit(1). - One(row) - + One(record) if err != nil { return nil, err } - return models.NewRecordFromNullStringMap(collection, row), nil + return record, nil } // FindAuthRecordByUsername finds the auth record associated with the provided username (case insensitive). @@ -278,20 +341,19 @@ func (dao *Dao) FindAuthRecordByUsername(collectionNameOrId string, username str return nil, fmt.Errorf("%q is not an auth collection", collectionNameOrId) } - row := dbx.NullStringMap{} + record := &models.Record{} err = dao.RecordQuery(collection). AndWhere(dbx.NewExp("LOWER([["+schema.FieldNameUsername+"]])={:username}", dbx.Params{ "username": strings.ToLower(username), })). Limit(1). - One(row) - + One(record) if err != nil { return nil, err } - return models.NewRecordFromNullStringMap(collection, row), nil + return record, nil } // SuggestUniqueAuthRecordUsername checks if the provided username is unique diff --git a/daos/record_test.go b/daos/record_test.go index 28664db4..cc8ffd4e 100644 --- a/daos/record_test.go +++ b/daos/record_test.go @@ -36,6 +36,91 @@ func TestRecordQuery(t *testing.T) { } } +func TestRecordQueryOneWithRecord(t *testing.T) { + app, _ := tests.NewTestApp() + defer app.Cleanup() + + collection, err := app.Dao().FindCollectionByNameOrId("demo1") + if err != nil { + t.Fatal(err) + } + + id := "84nmscqy84lsi1t" + + q := app.Dao().RecordQuery(collection). + Where(dbx.HashExp{"id": id}) + + record := &models.Record{} + if err := q.One(record); err != nil { + t.Fatal(err) + } + + if record.GetString("id") != id { + t.Fatalf("Expected record with id %q, got %q", id, record.GetString("id")) + } +} + +func TestRecordQueryAllWithRecordsSlices(t *testing.T) { + app, _ := tests.NewTestApp() + defer app.Cleanup() + + collection, err := app.Dao().FindCollectionByNameOrId("demo1") + if err != nil { + t.Fatal(err) + } + + id1 := "84nmscqy84lsi1t" + id2 := "al1h9ijdeojtsjy" + + { + records := []models.Record{} + + q := app.Dao().RecordQuery(collection). + Where(dbx.HashExp{"id": []any{id1, id2}}). + OrderBy("created asc") + + if err := q.All(&records); err != nil { + t.Fatal(err) + } + + if len(records) != 2 { + t.Fatalf("Expected %d records, got %d", 2, len(records)) + } + + if records[0].Id != id1 { + t.Fatalf("Expected record with id %q, got %q", id1, records[0].Id) + } + + if records[1].Id != id2 { + t.Fatalf("Expected record with id %q, got %q", id2, records[1].Id) + } + } + + { + records := []*models.Record{} + + q := app.Dao().RecordQuery(collection). + Where(dbx.HashExp{"id": []any{id1, id2}}). + OrderBy("created asc") + + if err := q.All(&records); err != nil { + t.Fatal(err) + } + + if len(records) != 2 { + t.Fatalf("Expected %d records, got %d", 2, len(records)) + } + + if records[0].Id != id1 { + t.Fatalf("Expected record with id %q, got %q", id1, records[0].Id) + } + + if records[1].Id != id2 { + t.Fatalf("Expected record with id %q, got %q", id2, records[1].Id) + } + } +} + func TestFindRecordById(t *testing.T) { app, _ := tests.NewTestApp() defer app.Cleanup() diff --git a/daos/view.go b/daos/view.go index 41c8c2b2..dc3dbb23 100644 --- a/daos/view.go +++ b/daos/view.go @@ -187,7 +187,7 @@ func (dao *Dao) FindRecordByViewFile( cleanFieldName := inflector.Columnify(qf.original.Name) - row := dbx.NullStringMap{} + record := &models.Record{} err = dao.RecordQuery(qf.collection). InnerJoin(fmt.Sprintf( @@ -196,12 +196,12 @@ func (dao *Dao) FindRecordByViewFile( cleanFieldName, cleanFieldName, cleanFieldName, ), dbx.HashExp{"_je_file.value": filename}). Limit(1). - One(row) + One(record) if err != nil { return nil, err } - return models.NewRecordFromNullStringMap(qf.collection, row), nil + return record, nil } // -------------------------------------------------------------------