From 47a31c4ef183e11e9ba2028775ca5a0405bf7127 Mon Sep 17 00:00:00 2001 From: Laurent Date: Wed, 3 Nov 2021 12:26:26 +0000 Subject: [PATCH] All, Server: Add support for faster built-in sync locks (#5662) --- packages/lib/JoplinServerApi.ts | 8 +- packages/lib/Synchronizer.ts | 42 +++- packages/lib/file-api-driver-joplinServer.ts | 51 +++++ packages/lib/file-api.ts | 23 ++- .../lib/services/synchronizer/LockHandler.ts | 195 +++++++++++------- .../services/synchronizer/MigrationHandler.ts | 6 +- .../synchronizer_LockHandler.test.ts | 149 ++++++------- .../synchronizer_MigrationHandler.test.ts | 4 +- .../lib/testing/test-utils-synchronizer.ts | 2 +- packages/server/package.json | 2 +- packages/server/src/db.ts | 4 + packages/server/src/models/KeyValueModel.ts | 49 ++++- packages/server/src/models/LockModel.test.ts | 94 +++++++++ packages/server/src/models/LockModel.ts | 174 ++++++++++++++++ packages/server/src/models/factory.ts | 5 + packages/server/src/routes/api/debug.ts | 10 +- packages/server/src/routes/api/locks.ts | 32 +++ packages/server/src/routes/routes.ts | 2 + packages/server/src/utils/errors.ts | 4 +- 19 files changed, 675 insertions(+), 181 deletions(-) create mode 100644 packages/server/src/models/LockModel.test.ts create mode 100644 packages/server/src/models/LockModel.ts create mode 100644 packages/server/src/routes/api/locks.ts diff --git a/packages/lib/JoplinServerApi.ts b/packages/lib/JoplinServerApi.ts index 04578def1..2c8f313e6 100644 --- a/packages/lib/JoplinServerApi.ts +++ b/packages/lib/JoplinServerApi.ts @@ -142,7 +142,7 @@ export default class JoplinServerApi { } if (sessionId) headers['X-API-AUTH'] = sessionId; - headers['X-API-MIN-VERSION'] = '2.1.4'; + headers['X-API-MIN-VERSION'] = '2.6.0'; // Need server 2.6 for new lock support const fetchOptions: any = {}; fetchOptions.headers = headers; @@ -253,8 +253,12 @@ export default class JoplinServerApi { const output = await loadResponseJson(); return output; } catch (error) { - if (error.code !== 404) { + // Don't print error info for file not found (handled by the + // driver), or lock-acquisition errors because it's handled by + // LockHandler. + if (![404, 'hasExclusiveLock', 'hasSyncLock'].includes(error.code)) { logger.warn(this.requestToCurl_(url, fetchOptions)); + logger.warn('Code:', error.code); logger.warn(error); } diff --git a/packages/lib/Synchronizer.ts b/packages/lib/Synchronizer.ts index ae44c7d6d..d448e9511 100644 --- a/packages/lib/Synchronizer.ts +++ b/packages/lib/Synchronizer.ts @@ -1,6 +1,6 @@ import Logger from './Logger'; -import LockHandler, { LockType } from './services/synchronizer/LockHandler'; -import Setting from './models/Setting'; +import LockHandler, { hasActiveLock, LockClientType, LockType } from './services/synchronizer/LockHandler'; +import Setting, { AppType } from './models/Setting'; import shim from './shim'; import MigrationHandler from './services/synchronizer/MigrationHandler'; import eventManager from './eventManager'; @@ -66,6 +66,7 @@ export default class Synchronizer { private resourceService_: ResourceService = null; private syncTargetIsLocked_: boolean = false; private shareService_: ShareService = null; + private lockClientType_: LockClientType = null; // Debug flags are used to test certain hard-to-test conditions // such as cancelling in the middle of a loop. @@ -120,9 +121,21 @@ export default class Synchronizer { return this.lockHandler_; } + private lockClientType(): LockClientType { + if (this.lockClientType_) return this.lockClientType_; + + if (this.appType_ === AppType.Desktop) this.lockClientType_ = LockClientType.Desktop; + if (this.appType_ === AppType.Mobile) this.lockClientType_ = LockClientType.Mobile; + if (this.appType_ === AppType.Cli) this.lockClientType_ = LockClientType.Cli; + + if (!this.lockClientType_) throw new Error(`Invalid client type: ${this.appType_}`); + + return this.lockClientType_; + } + migrationHandler() { if (this.migrationHandler_) return this.migrationHandler_; - this.migrationHandler_ = new MigrationHandler(this.api(), this.db(), this.lockHandler(), this.appType_, this.clientId_); + this.migrationHandler_ = new MigrationHandler(this.api(), this.db(), this.lockHandler(), this.lockClientType(), this.clientId_); return this.migrationHandler_; } @@ -164,6 +177,12 @@ export default class Synchronizer { return !!report && !!report.errors && !!report.errors.length; } + private static completionTime(report: any): string { + const duration = report.completedTime - report.startTime; + if (duration > 1000) return `${Math.round(duration / 1000)}s`; + return `${duration}ms`; + } + static reportToLines(report: any) { const lines = []; if (report.createLocal) lines.push(_('Created local items: %d.', report.createLocal)); @@ -174,7 +193,7 @@ export default class Synchronizer { if (report.deleteRemote) lines.push(_('Deleted remote items: %d.', report.deleteRemote)); if (report.fetchingTotal && report.fetchingProcessed) lines.push(_('Fetched items: %d/%d.', report.fetchingProcessed, report.fetchingTotal)); if (report.cancelling && !report.completedTime) lines.push(_('Cancelling...')); - if (report.completedTime) lines.push(_('Completed: %s (%s)', time.formatMsToLocal(report.completedTime), `${Math.round((report.completedTime - report.startTime) / 1000)}s`)); + if (report.completedTime) lines.push(_('Completed: %s (%s)', time.formatMsToLocal(report.completedTime), this.completionTime(report))); if (this.reportHasErrors(report)) lines.push(_('Last error: %s', report.errors[report.errors.length - 1].toString().substr(0, 500))); return lines; @@ -298,10 +317,13 @@ export default class Synchronizer { } async lockErrorStatus_() { - const hasActiveExclusiveLock = await this.lockHandler().hasActiveLock(LockType.Exclusive); + const locks = await this.lockHandler().locks(); + const currentDate = await this.lockHandler().currentDate(); + + const hasActiveExclusiveLock = await hasActiveLock(locks, currentDate, this.lockHandler().lockTtl, LockType.Exclusive); if (hasActiveExclusiveLock) return 'hasExclusiveLock'; - const hasActiveSyncLock = await this.lockHandler().hasActiveLock(LockType.Sync, this.appType_, this.clientId_); + const hasActiveSyncLock = await hasActiveLock(locks, currentDate, this.lockHandler().lockTtl, LockType.Sync, this.lockClientType(), this.clientId_); if (!hasActiveSyncLock) return 'syncLockGone'; return ''; @@ -446,10 +468,10 @@ export default class Synchronizer { const previousE2EE = localInfo.e2ee; logger.info('Sync target info differs between local and remote - merging infos: ', newInfo.toObject()); - await this.lockHandler().acquireLock(LockType.Exclusive, this.appType_, this.clientId_, { clearExistingSyncLocksFromTheSameClient: true }); + await this.lockHandler().acquireLock(LockType.Exclusive, this.lockClientType(), this.clientId_, { clearExistingSyncLocksFromTheSameClient: true }); await uploadSyncInfo(this.api(), newInfo); await saveLocalSyncInfo(newInfo); - await this.lockHandler().releaseLock(LockType.Exclusive, this.appType_, this.clientId_); + await this.lockHandler().releaseLock(LockType.Exclusive, this.lockClientType(), this.clientId_); // console.info('NEW', newInfo); @@ -473,7 +495,7 @@ export default class Synchronizer { throw error; } - syncLock = await this.lockHandler().acquireLock(LockType.Sync, this.appType_, this.clientId_); + syncLock = await this.lockHandler().acquireLock(LockType.Sync, this.lockClientType(), this.clientId_); this.lockHandler().startAutoLockRefresh(syncLock, (error: any) => { logger.warn('Could not refresh lock - cancelling sync. Error was:', error); @@ -1084,7 +1106,7 @@ export default class Synchronizer { if (syncLock) { this.lockHandler().stopAutoLockRefresh(syncLock); - await this.lockHandler().releaseLock(LockType.Sync, this.appType_, this.clientId_); + await this.lockHandler().releaseLock(LockType.Sync, this.lockClientType(), this.clientId_); } this.syncTargetIsLocked_ = false; diff --git a/packages/lib/file-api-driver-joplinServer.ts b/packages/lib/file-api-driver-joplinServer.ts index 2f75526d7..fa3fddbd4 100644 --- a/packages/lib/file-api-driver-joplinServer.ts +++ b/packages/lib/file-api-driver-joplinServer.ts @@ -2,6 +2,7 @@ import { MultiPutItem } from './file-api'; import JoplinError from './JoplinError'; import JoplinServerApi from './JoplinServerApi'; import { trimSlashes } from './path-utils'; +import { Lock, LockClientType, LockType } from './services/synchronizer/LockHandler'; // All input paths should be in the format: "path/to/file". This is converted to // "root:/path/to/file:" when doing the API call. @@ -40,6 +41,10 @@ export default class FileApiDriverJoplinServer { return true; } + public get supportsLocks() { + return true; + } + public requestRepeatCount() { return 3; } @@ -196,6 +201,50 @@ export default class FileApiDriverJoplinServer { throw new Error('Not supported'); } + // private lockClientTypeToId(clientType:AppType):number { + // if (clientType === AppType.Desktop) return 1; + // if (clientType === AppType.Mobile) return 2; + // if (clientType === AppType.Cli) return 3; + // throw new Error('Invalid client type: ' + clientType); + // } + + // private lockTypeToId(lockType:LockType):number { + // if (lockType === LockType.None) return 0; // probably not possible? + // if (lockType === LockType.Sync) return 1; + // if (lockType === LockType.Exclusive) return 2; + // throw new Error('Invalid lock type: ' + lockType); + // } + + // private lockClientIdTypeToType(clientType:number):AppType { + // if (clientType === 1) return AppType.Desktop; + // if (clientType === 2) return AppType.Mobile; + // if (clientType === 3) return AppType.Cli; + // throw new Error('Invalid client type: ' + clientType); + // } + + // private lockIdToType(lockType:number):LockType { + // if (lockType === 0) return LockType.None; // probably not possible? + // if (lockType === 1) return LockType.Sync; + // if (lockType === 2) return LockType.Exclusive; + // throw new Error('Invalid lock type: ' + lockType); + // } + + public async acquireLock(type: LockType, clientType: LockClientType, clientId: string): Promise { + return this.api().exec('POST', 'api/locks', null, { + type, + clientType, + clientId: clientId, + }); + } + + public async releaseLock(type: LockType, clientType: LockClientType, clientId: string) { + await this.api().exec('DELETE', `api/locks/${type}_${clientType}_${clientId}`); + } + + public async listLocks() { + return this.api().exec('GET', 'api/locks'); + } + public async clearRoot(path: string) { const response = await this.list(path); @@ -203,6 +252,8 @@ export default class FileApiDriverJoplinServer { await this.delete(item.path); } + await this.api().exec('POST', 'api/debug', null, { action: 'clearKeyValues' }); + if (response.has_more) throw new Error('has_more support not implemented'); } } diff --git a/packages/lib/file-api.ts b/packages/lib/file-api.ts index 64d13e885..a06840ac4 100644 --- a/packages/lib/file-api.ts +++ b/packages/lib/file-api.ts @@ -5,6 +5,7 @@ import time from './time'; const { isHidden } = require('./path-utils'); import JoplinError from './JoplinError'; +import { Lock, LockClientType, LockType } from './services/synchronizer/LockHandler'; const ArrayUtils = require('./ArrayUtils'); const { sprintf } = require('sprintf-js'); const Mutex = require('async-mutex').Mutex; @@ -36,7 +37,7 @@ export interface RemoteItem { export interface PaginatedList { items: RemoteItem[]; - has_more: boolean; + hasMore: boolean; context: any; } @@ -130,6 +131,10 @@ class FileApi { return !!this.driver().supportsAccurateTimestamp; } + public get supportsLocks(): boolean { + return !!this.driver().supportsLocks; + } + async fetchRemoteDateOffset_() { const tempFile = `${this.tempDirName()}/timeCheck${Math.round(Math.random() * 1000000)}.txt`; const startTime = Date.now(); @@ -349,6 +354,22 @@ class FileApi { logger.debug(`delta ${this.fullPath(path)}`); return tryAndRepeat(() => this.driver_.delta(this.fullPath(path), options), this.requestRepeatCount()); } + + public async acquireLock(type: LockType, clientType: LockClientType, clientId: string): Promise { + if (!this.supportsLocks) throw new Error('Sync target does not support built-in locks'); + return tryAndRepeat(() => this.driver_.acquireLock(type, clientType, clientId), this.requestRepeatCount()); + } + + public async releaseLock(type: LockType, clientType: LockClientType, clientId: string) { + if (!this.supportsLocks) throw new Error('Sync target does not support built-in locks'); + return tryAndRepeat(() => this.driver_.releaseLock(type, clientType, clientId), this.requestRepeatCount()); + } + + public async listLocks() { + if (!this.supportsLocks) throw new Error('Sync target does not support built-in locks'); + return tryAndRepeat(() => this.driver_.listLocks(), this.requestRepeatCount()); + } + } function basicDeltaContextFromOptions_(options: any) { diff --git a/packages/lib/services/synchronizer/LockHandler.ts b/packages/lib/services/synchronizer/LockHandler.ts index 21b915c97..d0b720465 100644 --- a/packages/lib/services/synchronizer/LockHandler.ts +++ b/packages/lib/services/synchronizer/LockHandler.ts @@ -1,23 +1,91 @@ import { Dirnames } from './utils/types'; import shim from '../../shim'; - import JoplinError from '../../JoplinError'; import time from '../../time'; +import { FileApi } from '../../file-api'; const { fileExtension, filename } = require('../../path-utils'); export enum LockType { - None = '', - Sync = 'sync', - Exclusive = 'exclusive', + None = 0, + Sync = 1, + Exclusive = 2, +} + +export enum LockClientType { + Desktop = 1, + Mobile = 2, + Cli = 3, } export interface Lock { + id?: string; type: LockType; - clientType: string; + clientType: LockClientType; clientId: string; updatedTime?: number; } +function lockIsActive(lock: Lock, currentDate: Date, lockTtl: number): boolean { + return currentDate.getTime() - lock.updatedTime < lockTtl; +} + +export function lockNameToObject(name: string, updatedTime: number = null): Lock { + const p = name.split('_'); + + const lock: Lock = { + id: null, + type: Number(p[0]) as LockType, + clientType: Number(p[1]) as LockClientType, + clientId: p[2], + updatedTime, + }; + + if (isNaN(lock.clientType)) throw new Error(`Invalid lock client type: ${name}`); + if (isNaN(lock.type)) throw new Error(`Invalid lock type: ${name}`); + + return lock; +} + +export function hasActiveLock(locks: Lock[], currentDate: Date, lockTtl: number, lockType: LockType, clientType: LockClientType = null, clientId: string = null) { + const lock = activeLock(locks, currentDate, lockTtl, lockType, clientType, clientId); + return !!lock; +} + +// Finds if there's an active lock for this clientType and clientId and returns it. +// If clientType and clientId are not specified, returns the first active lock +// of that type instead. +export function activeLock(locks: Lock[], currentDate: Date, lockTtl: number, lockType: LockType, clientType: LockClientType = null, clientId: string = null) { + if (lockType === LockType.Exclusive) { + const activeLocks = locks + .slice() + .filter((lock: Lock) => lockIsActive(lock, currentDate, lockTtl) && lock.type === lockType) + .sort((a: Lock, b: Lock) => { + if (a.updatedTime === b.updatedTime) { + return a.clientId < b.clientId ? -1 : +1; + } + return a.updatedTime < b.updatedTime ? -1 : +1; + }); + + if (!activeLocks.length) return null; + const lock = activeLocks[0]; + + if (clientType && clientType !== lock.clientType) return null; + if (clientId && clientId !== lock.clientId) return null; + return lock; + } else if (lockType === LockType.Sync) { + for (const lock of locks) { + if (lock.type !== lockType) continue; + if (clientType && lock.clientType !== clientType) continue; + if (clientId && lock.clientId !== clientId) continue; + if (lockIsActive(lock, currentDate, lockTtl)) return lock; + } + return null; + } + + throw new Error(`Unsupported lock type: ${lockType}`); +} + + export interface AcquireLockOptions { // In theory, a client that tries to acquire an exclusive lock shouldn't // also have a sync lock. It can however happen when the app is closed @@ -55,14 +123,16 @@ export interface LockHandlerOptions { lockTtl?: number; } +export const defaultLockTtl = 1000 * 60 * 3; + export default class LockHandler { - private api_: any = null; + private api_: FileApi = null; private refreshTimers_: RefreshTimers = {}; private autoRefreshInterval_: number = 1000 * 60; - private lockTtl_: number = 1000 * 60 * 3; + private lockTtl_: number = defaultLockTtl; - constructor(api: any, options: LockHandlerOptions = null) { + public constructor(api: FileApi, options: LockHandlerOptions = null) { if (!options) options = {}; this.api_ = api; @@ -80,6 +150,10 @@ export default class LockHandler { this.lockTtl_ = v; } + public get useBuiltInLocks() { + return this.api_.supportsLocks; + } + private lockFilename(lock: Lock) { return `${[lock.type, lock.clientType, lock.clientId].join('_')}.json`; } @@ -87,8 +161,8 @@ export default class LockHandler { private lockTypeFromFilename(name: string): LockType { const ext = fileExtension(name); if (ext !== 'json') return LockType.None; - if (name.indexOf(LockType.Sync) === 0) return LockType.Sync; - if (name.indexOf(LockType.Exclusive) === 0) return LockType.Exclusive; + if (name.indexOf(LockType.Sync.toString()) === 0) return LockType.Sync; + if (name.indexOf(LockType.Exclusive.toString()) === 0) return LockType.Exclusive; return LockType.None; } @@ -97,17 +171,15 @@ export default class LockHandler { } private lockFileToObject(file: any): Lock { - const p = filename(file.path).split('_'); - - return { - type: p[0], - clientType: p[1], - clientId: p[2], - updatedTime: file.updated_time, - }; + return lockNameToObject(filename(file.path), file.updated_time); } async locks(lockType: LockType = null): Promise { + if (this.useBuiltInLocks) { + const locks = (await this.api_.listLocks()).items; + return locks; + } + const result = await this.api_.list(Dirnames.Locks); if (result.hasMore) throw new Error('hasMore not handled'); // Shouldn't happen anyway @@ -123,51 +195,6 @@ export default class LockHandler { return output; } - private lockIsActive(lock: Lock, currentDate: Date): boolean { - return currentDate.getTime() - lock.updatedTime < this.lockTtl; - } - - async hasActiveLock(lockType: LockType, clientType: string = null, clientId: string = null) { - const lock = await this.activeLock(lockType, clientType, clientId); - return !!lock; - } - - // Finds if there's an active lock for this clientType and clientId and returns it. - // If clientType and clientId are not specified, returns the first active lock - // of that type instead. - async activeLock(lockType: LockType, clientType: string = null, clientId: string = null) { - const locks = await this.locks(lockType); - const currentDate = await this.api_.remoteDate(); - - if (lockType === LockType.Exclusive) { - const activeLocks = locks - .slice() - .filter((lock: Lock) => this.lockIsActive(lock, currentDate)) - .sort((a: Lock, b: Lock) => { - if (a.updatedTime === b.updatedTime) { - return a.clientId < b.clientId ? -1 : +1; - } - return a.updatedTime < b.updatedTime ? -1 : +1; - }); - - if (!activeLocks.length) return null; - const activeLock = activeLocks[0]; - - if (clientType && clientType !== activeLock.clientType) return null; - if (clientId && clientId !== activeLock.clientId) return null; - return activeLock; - } else if (lockType === LockType.Sync) { - for (const lock of locks) { - if (clientType && lock.clientType !== clientType) continue; - if (clientId && lock.clientId !== clientId) continue; - if (this.lockIsActive(lock, currentDate)) return lock; - } - return null; - } - - throw new Error(`Unsupported lock type: ${lockType}`); - } - private async saveLock(lock: Lock) { await this.api_.put(this.lockFilePath(lock), JSON.stringify(lock)); } @@ -177,13 +204,18 @@ export default class LockHandler { return this.saveLock(lock); } - private async acquireSyncLock(clientType: string, clientId: string): Promise { + private async acquireSyncLock(clientType: LockClientType, clientId: string): Promise { + if (this.useBuiltInLocks) return this.api_.acquireLock(LockType.Sync, clientType, clientId); + try { let isFirstPass = true; while (true) { + const locks = await this.locks(); + const currentDate = await this.currentDate(); + const [exclusiveLock, syncLock] = await Promise.all([ - this.activeLock(LockType.Exclusive), - this.activeLock(LockType.Sync, clientType, clientId), + activeLock(locks, currentDate, this.lockTtl, LockType.Exclusive), + activeLock(locks, currentDate, this.lockTtl, LockType.Sync, clientType, clientId), ]); if (exclusiveLock) { @@ -221,7 +253,9 @@ export default class LockHandler { return `(${lock.clientType} #${lock.clientId})`; } - private async acquireExclusiveLock(clientType: string, clientId: string, options: AcquireLockOptions = null): Promise { + private async acquireExclusiveLock(clientType: LockClientType, clientId: string, options: AcquireLockOptions = null): Promise { + if (this.useBuiltInLocks) return this.api_.acquireLock(LockType.Exclusive, clientType, clientId); + // The logic to acquire an exclusive lock, while avoiding race conditions is as follow: // // - Check if there is a lock file present @@ -252,9 +286,12 @@ export default class LockHandler { try { while (true) { + const locks = await this.locks(); + const currentDate = await this.currentDate(); + const [activeSyncLock, activeExclusiveLock] = await Promise.all([ - this.activeLock(LockType.Sync), - this.activeLock(LockType.Exclusive), + activeLock(locks, currentDate, this.lockTtl, LockType.Sync), + activeLock(locks, currentDate, this.lockTtl, LockType.Exclusive), ]); if (activeSyncLock) { @@ -299,7 +336,11 @@ export default class LockHandler { return [lock.type, lock.clientType, lock.clientId].join('_'); } - startAutoLockRefresh(lock: Lock, errorHandler: Function): string { + public async currentDate() { + return this.api_.remoteDate(); + } + + public startAutoLockRefresh(lock: Lock, errorHandler: Function): string { const handle = this.autoLockRefreshHandle(lock); if (this.refreshTimers_[handle]) { throw new Error(`There is already a timer refreshing this lock: ${handle}`); @@ -321,10 +362,11 @@ export default class LockHandler { this.refreshTimers_[handle].inProgress = true; let error = null; - const hasActiveLock = await this.hasActiveLock(lock.type, lock.clientType, lock.clientId); if (!this.refreshTimers_[handle]) return defer(); // Timeout has been cleared - if (!hasActiveLock) { + const locks = await this.locks(lock.type); + + if (!hasActiveLock(locks, await this.currentDate(), this.lockTtl, lock.type, lock.clientType, lock.clientId)) { // If the previous lock has expired, we shouldn't try to acquire a new one. This is because other clients might have performed // in the meantime operations that invalidates the current operation. For example, another client might have upgraded the // sync target in the meantime, so any active operation should be cancelled here. Or if the current client was upgraded @@ -368,7 +410,7 @@ export default class LockHandler { delete this.refreshTimers_[handle]; } - public async acquireLock(lockType: LockType, clientType: string, clientId: string, options: AcquireLockOptions = null): Promise { + public async acquireLock(lockType: LockType, clientType: LockClientType, clientId: string, options: AcquireLockOptions = null): Promise { options = { ...defaultAcquireLockOptions(), ...options, @@ -383,7 +425,12 @@ export default class LockHandler { } } - public async releaseLock(lockType: LockType, clientType: string, clientId: string) { + public async releaseLock(lockType: LockType, clientType: LockClientType, clientId: string) { + if (this.useBuiltInLocks) { + await this.api_.releaseLock(lockType, clientType, clientId); + return; + } + await this.api_.delete(this.lockFilePath({ type: lockType, clientType: clientType, diff --git a/packages/lib/services/synchronizer/MigrationHandler.ts b/packages/lib/services/synchronizer/MigrationHandler.ts index ba12882f2..acd6e6ef9 100644 --- a/packages/lib/services/synchronizer/MigrationHandler.ts +++ b/packages/lib/services/synchronizer/MigrationHandler.ts @@ -1,4 +1,4 @@ -import LockHandler, { LockType } from './LockHandler'; +import LockHandler, { LockClientType, LockType } from './LockHandler'; import { Dirnames } from './utils/types'; import BaseService from '../BaseService'; import migration1 from './migrations/1'; @@ -33,11 +33,11 @@ export default class MigrationHandler extends BaseService { private api_: FileApi = null; private lockHandler_: LockHandler = null; - private clientType_: string; + private clientType_: LockClientType; private clientId_: string; private db_: JoplinDatabase; - public constructor(api: FileApi, db: JoplinDatabase, lockHandler: LockHandler, clientType: string, clientId: string) { + public constructor(api: FileApi, db: JoplinDatabase, lockHandler: LockHandler, clientType: LockClientType, clientId: string) { super(); this.api_ = api; this.db_ = db; diff --git a/packages/lib/services/synchronizer/synchronizer_LockHandler.test.ts b/packages/lib/services/synchronizer/synchronizer_LockHandler.test.ts index 8b5f2eaac..9f2800607 100644 --- a/packages/lib/services/synchronizer/synchronizer_LockHandler.test.ts +++ b/packages/lib/services/synchronizer/synchronizer_LockHandler.test.ts @@ -1,4 +1,4 @@ -import LockHandler, { LockType, LockHandlerOptions, Lock } from '../../services/synchronizer/LockHandler'; +import LockHandler, { LockType, LockHandlerOptions, Lock, activeLock, LockClientType } from '../../services/synchronizer/LockHandler'; import { isNetworkSyncTarget, fileApi, setupDatabaseAndSynchronizer, synchronizer, switchClient, msleep, expectThrow, expectNotThrow } from '../../testing/test-utils'; // For tests with memory of file system we can use low intervals to make the tests faster. @@ -34,38 +34,46 @@ describe('synchronizer_LockHandler', function() { }); it('should acquire and release a sync lock', (async () => { - await lockHandler().acquireLock(LockType.Sync, 'mobile', '123456'); + await lockHandler().acquireLock(LockType.Sync, LockClientType.Mobile, '123456'); const locks = await lockHandler().locks(LockType.Sync); expect(locks.length).toBe(1); expect(locks[0].type).toBe(LockType.Sync); expect(locks[0].clientId).toBe('123456'); - expect(locks[0].clientType).toBe('mobile'); + expect(locks[0].clientType).toBe(LockClientType.Mobile); - await lockHandler().releaseLock(LockType.Sync, 'mobile', '123456'); + await lockHandler().releaseLock(LockType.Sync, LockClientType.Mobile, '123456'); expect((await lockHandler().locks(LockType.Sync)).length).toBe(0); })); it('should not use files that are not locks', (async () => { + if (lockHandler().useBuiltInLocks) return; // Doesn't make sense with built-in locks + await fileApi().put('locks/desktop.ini', 'a'); await fileApi().put('locks/exclusive.json', 'a'); await fileApi().put('locks/garbage.json', 'a'); - await fileApi().put('locks/sync_mobile_72c4d1b7253a4475bfb2f977117d26ed.json', 'a'); + await fileApi().put('locks/1_2_72c4d1b7253a4475bfb2f977117d26ed.json', 'a'); + + // Check that it doesn't cause an error if it fetches an old style lock + await fileApi().put('locks/sync_desktop_82c4d1b7253a4475bfb2f977117d26ed.json', 'a'); const locks = await lockHandler().locks(LockType.Sync); expect(locks.length).toBe(1); + expect(locks[0].type).toBe(LockType.Sync); + expect(locks[0].clientType).toBe(LockClientType.Mobile); + expect(locks[0].clientId).toBe('72c4d1b7253a4475bfb2f977117d26ed'); })); it('should allow multiple sync locks', (async () => { - await lockHandler().acquireLock(LockType.Sync, 'mobile', '111'); + await lockHandler().acquireLock(LockType.Sync, LockClientType.Mobile, '111'); await switchClient(2); - await lockHandler().acquireLock(LockType.Sync, 'mobile', '222'); + await lockHandler().acquireLock(LockType.Sync, LockClientType.Mobile, '222'); expect((await lockHandler().locks(LockType.Sync)).length).toBe(2); { - await lockHandler().releaseLock(LockType.Sync, 'mobile', '222'); + await lockHandler().releaseLock(LockType.Sync, LockClientType.Mobile, '222'); const locks = await lockHandler().locks(LockType.Sync); expect(locks.length).toBe(1); expect(locks[0].clientId).toBe('111'); @@ -74,11 +82,11 @@ describe('synchronizer_LockHandler', function() { it('should auto-refresh a lock', (async () => { const handler = newLockHandler({ autoRefreshInterval: 100 * timeoutMultipler }); - const lock = await handler.acquireLock(LockType.Sync, 'desktop', '111'); - const lockBefore = await handler.activeLock(LockType.Sync, 'desktop', '111'); + const lock = await handler.acquireLock(LockType.Sync, LockClientType.Desktop, '111'); + const lockBefore = activeLock(await handler.locks(), new Date(), handler.lockTtl, LockType.Sync, LockClientType.Desktop, '111'); handler.startAutoLockRefresh(lock, () => {}); await msleep(500 * timeoutMultipler); - const lockAfter = await handler.activeLock(LockType.Sync, 'desktop', '111'); + const lockAfter = activeLock(await handler.locks(), new Date(), handler.lockTtl, LockType.Sync, LockClientType.Desktop, '111'); expect(lockAfter.updatedTime).toBeGreaterThan(lockBefore.updatedTime); handler.stopAutoLockRefresh(lock); })); @@ -89,48 +97,53 @@ describe('synchronizer_LockHandler', function() { autoRefreshInterval: 200 * timeoutMultipler, }); - const lock = await handler.acquireLock(LockType.Sync, 'desktop', '111'); + const lock = await handler.acquireLock(LockType.Sync, LockClientType.Desktop, '111'); let autoLockError: any = null; handler.startAutoLockRefresh(lock, (error: any) => { autoLockError = error; }); - await msleep(250 * timeoutMultipler); + try { + await msleep(250 * timeoutMultipler); - expect(autoLockError.code).toBe('lockExpired'); - - handler.stopAutoLockRefresh(lock); + expect(autoLockError).toBeTruthy(); + expect(autoLockError.code).toBe('lockExpired'); + } finally { + handler.stopAutoLockRefresh(lock); + } })); it('should not allow sync locks if there is an exclusive lock', (async () => { - await lockHandler().acquireLock(LockType.Exclusive, 'desktop', '111'); + await lockHandler().acquireLock(LockType.Exclusive, LockClientType.Desktop, '111'); await expectThrow(async () => { - await lockHandler().acquireLock(LockType.Sync, 'mobile', '222'); + await lockHandler().acquireLock(LockType.Sync, LockClientType.Mobile, '222'); }, 'hasExclusiveLock'); })); it('should not allow exclusive lock if there are sync locks', (async () => { const lockHandler = newLockHandler({ lockTtl: 1000 * 60 * 60 }); + if (lockHandler.useBuiltInLocks) return; // Tested server side - await lockHandler.acquireLock(LockType.Sync, 'mobile', '111'); - await lockHandler.acquireLock(LockType.Sync, 'mobile', '222'); + await lockHandler.acquireLock(LockType.Sync, LockClientType.Mobile, '111'); + await lockHandler.acquireLock(LockType.Sync, LockClientType.Mobile, '222'); await expectThrow(async () => { - await lockHandler.acquireLock(LockType.Exclusive, 'desktop', '333'); + await lockHandler.acquireLock(LockType.Exclusive, LockClientType.Desktop, '333'); }, 'hasSyncLock'); })); it('should allow exclusive lock if the sync locks have expired', (async () => { const lockHandler = newLockHandler({ lockTtl: 500 * timeoutMultipler }); + if (lockHandler.useBuiltInLocks) return; // Tested server side - await lockHandler.acquireLock(LockType.Sync, 'mobile', '111'); - await lockHandler.acquireLock(LockType.Sync, 'mobile', '222'); + await lockHandler.acquireLock(LockType.Sync, LockClientType.Mobile, '111'); + await lockHandler.acquireLock(LockType.Sync, LockClientType.Mobile, '222'); await msleep(600 * timeoutMultipler); await expectNotThrow(async () => { - await lockHandler.acquireLock(LockType.Exclusive, 'desktop', '333'); + await lockHandler.acquireLock(LockType.Exclusive, LockClientType.Desktop, '333'); }); })); @@ -138,74 +151,44 @@ describe('synchronizer_LockHandler', function() { const lockHandler = newLockHandler(); { - const lock1: Lock = { type: LockType.Exclusive, clientId: '1', clientType: 'd' }; - const lock2: Lock = { type: LockType.Exclusive, clientId: '2', clientType: 'd' }; - await lockHandler.saveLock_(lock1); - await msleep(100); - await lockHandler.saveLock_(lock2); + const locks: Lock[] = [ + { + type: LockType.Exclusive, + clientId: '1', + clientType: LockClientType.Desktop, + updatedTime: Date.now(), + }, + ]; - const activeLock = await lockHandler.activeLock(LockType.Exclusive); - expect(activeLock.clientId).toBe('1'); + await msleep(100); + + locks.push({ + type: LockType.Exclusive, + clientId: '2', + clientType: LockClientType.Desktop, + updatedTime: Date.now(), + }); + + const lock = activeLock(locks, new Date(), lockHandler.lockTtl, LockType.Exclusive); + expect(lock.clientId).toBe('1'); } })); - it('should ignore locks by same client when trying to acquire exclusive lock', (async () => { - const lockHandler = newLockHandler(); - - await lockHandler.acquireLock(LockType.Sync, 'desktop', '111'); - - await expectThrow(async () => { - await lockHandler.acquireLock(LockType.Exclusive, 'desktop', '111', { clearExistingSyncLocksFromTheSameClient: false }); - }, 'hasSyncLock'); - - await expectNotThrow(async () => { - await lockHandler.acquireLock(LockType.Exclusive, 'desktop', '111', { clearExistingSyncLocksFromTheSameClient: true }); - }); - - const activeLock = await lockHandler.activeLock(LockType.Exclusive); - expect(activeLock.clientId).toBe('111'); - })); - - // it('should not have race conditions', (async () => { + // it('should ignore locks by same client when trying to acquire exclusive lock', (async () => { // const lockHandler = newLockHandler(); - // const clients = []; - // for (let i = 0; i < 20; i++) { - // clients.push({ - // id: 'client' + i, - // type: 'desktop', - // }); - // } + // await lockHandler.acquireLock(LockType.Sync, LockClientType.Desktop, '111'); - // for (let loopIndex = 0; loopIndex < 1000; loopIndex++) { - // const promises:Promise[] = []; - // for (let clientIndex = 0; clientIndex < clients.length; clientIndex++) { - // const client = clients[clientIndex]; + // await expectThrow(async () => { + // await lockHandler.acquireLock(LockType.Exclusive, LockClientType.Desktop, '111', { clearExistingSyncLocksFromTheSameClient: false }); + // }, 'hasSyncLock'); - // promises.push( - // lockHandler.acquireLock(LockType.Exclusive, client.type, client.id).catch(() => {}) - // ); + // await expectNotThrow(async () => { + // await lockHandler.acquireLock(LockType.Exclusive, LockClientType.Desktop, '111', { clearExistingSyncLocksFromTheSameClient: true }); + // }); - // // if (gotLock) { - // // await msleep(100); - // // const locks = await lockHandler.locks(LockType.Exclusive); - // // console.info('======================================='); - // // console.info(locks); - // // lockHandler.releaseLock(LockType.Exclusive, client.type, client.id); - // // } - - // // await msleep(500); - // } - - // const result = await Promise.all(promises); - // const locks = result.filter((lock:any) => !!lock); - - // expect(locks.length).toBe(1); - // const lock:Lock = locks[0] as Lock; - // const allLocks = await lockHandler.locks(); - // console.info('================================', allLocks); - // lockHandler.releaseLock(LockType.Exclusive, lock.clientType, lock.clientId); - // } + // const lock = activeLock(await lockHandler.locks(), new Date(), lockHandler.lockTtl, LockType.Exclusive); + // expect(lock.clientId).toBe('111'); // })); }); diff --git a/packages/lib/services/synchronizer/synchronizer_MigrationHandler.test.ts b/packages/lib/services/synchronizer/synchronizer_MigrationHandler.test.ts index c729c4295..1cdec94e2 100644 --- a/packages/lib/services/synchronizer/synchronizer_MigrationHandler.test.ts +++ b/packages/lib/services/synchronizer/synchronizer_MigrationHandler.test.ts @@ -5,7 +5,7 @@ // // These tests work by a taking a sync target snapshot at a version n and upgrading it to n+1. -import LockHandler from './LockHandler'; +import LockHandler, { LockClientType } from './LockHandler'; import MigrationHandler from './MigrationHandler'; import { Dirnames } from './utils/types'; import { setSyncTargetName, fileApi, synchronizer, decryptionWorker, encryptionService, setupDatabaseAndSynchronizer, switchClient, expectThrow, expectNotThrow, db } from '../../testing/test-utils'; @@ -28,7 +28,7 @@ function lockHandler(): LockHandler { function migrationHandler(clientId: string = 'abcd'): MigrationHandler { if (migrationHandler_) return migrationHandler_; - migrationHandler_ = new MigrationHandler(fileApi(), db(), lockHandler(), 'desktop', clientId); + migrationHandler_ = new MigrationHandler(fileApi(), db(), lockHandler(), LockClientType.Desktop, clientId); return migrationHandler_; } diff --git a/packages/lib/testing/test-utils-synchronizer.ts b/packages/lib/testing/test-utils-synchronizer.ts index 2f5874194..4c1ef2908 100644 --- a/packages/lib/testing/test-utils-synchronizer.ts +++ b/packages/lib/testing/test-utils-synchronizer.ts @@ -12,7 +12,7 @@ export async function allNotesFolders() { async function remoteItemsByTypes(types: number[]) { const list = await fileApi().list('', { includeDirs: false, syncItemsOnly: true }); - if (list.has_more) throw new Error('Not implemented!!!'); + if (list.hasMore) throw new Error('Not implemented!!!'); const files = list.items; const output = []; diff --git a/packages/server/package.json b/packages/server/package.json index 73ef5e3fe..11ad0472b 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -11,7 +11,7 @@ "devDropTables": "node dist/app.js --env dev --drop-tables", "devDropDb": "node dist/app.js --env dev --drop-db", "start": "node dist/app.js", - "generateTypes": "rm -f db-buildTypes.sqlite && npm run start -- --migrate-latest --env buildTypes && node dist/tools/generateTypes.js && mv db-buildTypes.sqlite schema.sqlite", + "generateTypes": "rm -f db-buildTypes.sqlite && npm run start -- --env buildTypes migrate latest && node dist/tools/generateTypes.js && mv db-buildTypes.sqlite schema.sqlite", "tsc": "tsc --project tsconfig.json", "test": "jest --verbose=false", "test-ci": "npm run test", diff --git a/packages/server/src/db.ts b/packages/server/src/db.ts index 031997b94..5cea91648 100644 --- a/packages/server/src/db.ts +++ b/packages/server/src/db.ts @@ -116,6 +116,10 @@ export const clientType = (db: DbConnection): DatabaseConfigClient => { return db.client.config.client; }; +export const returningSupported = (db: DbConnection) => { + return clientType(db) === DatabaseConfigClient.PostgreSQL; +}; + export const isPostgres = (db: DbConnection) => { return clientType(db) === DatabaseConfigClient.PostgreSQL; }; diff --git a/packages/server/src/models/KeyValueModel.ts b/packages/server/src/models/KeyValueModel.ts index bace48ea9..92472b059 100644 --- a/packages/server/src/models/KeyValueModel.ts +++ b/packages/server/src/models/KeyValueModel.ts @@ -1,4 +1,6 @@ +import { returningSupported } from '../db'; import { KeyValue } from '../services/database/types'; +import { msleep } from '../utils/time'; import BaseModel from './BaseModel'; export enum ValueType { @@ -6,7 +8,9 @@ export enum ValueType { String = 2, } -type Value = number | string; +export type Value = number | string; + +export type ReadThenWriteHandler = (value: Value)=> Promise; export default class KeyValueModel extends BaseModel { @@ -57,6 +61,45 @@ export default class KeyValueModel extends BaseModel { return this.unserializeValue(row.type, row.value) as any; } + public async readThenWrite(key: string, handler: ReadThenWriteHandler) { + if (!returningSupported(this.db)) { + // While inside a transaction SQlite should lock the whole database + // file, which should allow atomic read then write. + await this.withTransaction(async () => { + const value: any = await this.value(key); + const newValue = await handler(value); + await this.setValue(key, newValue); + }, 'KeyValueModel::readThenWrite'); + return; + } + + let loopCount = 0; + while (true) { + const row: KeyValue = await this.db(this.tableName).where('key', '=', key).first(); + const newValue = await handler(row ? row.value : null); + + let previousValue: Value = null; + if (row) { + previousValue = row.value; + } else { + await this.setValue(key, newValue); + previousValue = newValue; + } + + const updatedRows = await this + .db(this.tableName) + .update({ value: newValue }, ['id']) + .where('key', '=', key) + .where('value', '=', previousValue); + + if (updatedRows.length) return; + + loopCount++; + if (loopCount >= 10) throw new Error(`Could not update key: ${key}`); + await msleep(10000 * Math.random()); + } + } + public async deleteValue(key: string): Promise { await this.db(this.tableName).where('key', '=', key).delete(); } @@ -65,4 +108,8 @@ export default class KeyValueModel extends BaseModel { throw new Error('Call ::deleteValue()'); } + public async deleteAll(): Promise { + await this.db(this.tableName).delete(); + } + } diff --git a/packages/server/src/models/LockModel.test.ts b/packages/server/src/models/LockModel.test.ts new file mode 100644 index 000000000..20f9daa23 --- /dev/null +++ b/packages/server/src/models/LockModel.test.ts @@ -0,0 +1,94 @@ +// Note that a lot of the testing logic is done from +// synchronizer_LockHandler.test so to fully test that it works, Joplin Server +// should be setup as a sync target for the test units. + +import { ErrorConflict, ErrorUnprocessableEntity } from '../utils/errors'; +import { beforeAllDb, afterAllTests, beforeEachDb, models, createUserAndSession, expectHttpError } from '../utils/testing/testUtils'; +import { LockType, LockClientType, defaultLockTtl } from '@joplin/lib/services/synchronizer/LockHandler'; + +describe('LockModel', function() { + + beforeAll(async () => { + await beforeAllDb('LockModel'); + }); + + afterAll(async () => { + await afterAllTests(); + }); + + beforeEach(async () => { + await beforeEachDb(); + }); + + test('should allow exclusive lock if the sync locks have expired', async function() { + jest.useFakeTimers('modern'); + + const { user } = await createUserAndSession(1); + + const t1 = new Date('2020-01-01').getTime(); + jest.setSystemTime(t1); + + await models().lock().acquireLock(user.id, LockType.Sync, LockClientType.Desktop, '1111'); + await models().lock().acquireLock(user.id, LockType.Sync, LockClientType.Desktop, '2222'); + + // First confirm that it's not possible to acquire an exclusive lock if + // there are sync locks. + await expectHttpError(async () => models().lock().acquireLock(user.id, LockType.Exclusive, LockClientType.Desktop, '3333'), ErrorConflict.httpCode); + + jest.setSystemTime(t1 + defaultLockTtl + 1); + + // Now that the sync locks have expired check that it's possible to + // acquire a sync lock. + const exclusiveLock = await models().lock().acquireLock(user.id, LockType.Exclusive, LockClientType.Desktop, '3333'); + expect(exclusiveLock).toBeTruthy(); + + jest.useRealTimers(); + }); + + test('should keep user locks separated', async function() { + const { user: user1 } = await createUserAndSession(1); + const { user: user2 } = await createUserAndSession(2); + + await models().lock().acquireLock(user1.id, LockType.Sync, LockClientType.Desktop, '1111'); + + // If user 1 tries to get an exclusive lock, it should fail + await expectHttpError(async () => models().lock().acquireLock(user1.id, LockType.Exclusive, LockClientType.Desktop, '3333'), ErrorConflict.httpCode); + + // But it should work for user 2 + const exclusiveLock = await models().lock().acquireLock(user2.id, LockType.Exclusive, LockClientType.Desktop, '3333'); + expect(exclusiveLock).toBeTruthy(); + }); + + test('should validate locks', async function() { + const { user: user1 } = await createUserAndSession(1); + + await expectHttpError(async () => models().lock().acquireLock(user1.id, 'wrongtype' as any, LockClientType.Desktop, '1111'), ErrorUnprocessableEntity.httpCode); + await expectHttpError(async () => models().lock().acquireLock(user1.id, LockType.Exclusive, 'wrongclienttype' as any, '1111'), ErrorUnprocessableEntity.httpCode); + await expectHttpError(async () => models().lock().acquireLock(user1.id, LockType.Exclusive, LockClientType.Desktop, 'veryverylongclientidveryverylongclientidveryverylongclientidveryverylongclientid'), ErrorUnprocessableEntity.httpCode); + }); + + test('should expire locks', async function() { + const { user } = await createUserAndSession(1); + + jest.useFakeTimers('modern'); + + const t1 = new Date('2020-01-01').getTime(); + jest.setSystemTime(t1); + + await models().lock().acquireLock(user.id, LockType.Sync, LockClientType.Desktop, '1111'); + const lock1 = (await models().lock().allLocks(user.id))[0]; + + jest.setSystemTime(t1 + models().lock().lockTtl + 1); + + // If we call this again, at the same time it should expire old timers. + await models().lock().acquireLock(user.id, LockType.Sync, LockClientType.Desktop, '2222'); + + expect((await models().lock().allLocks(user.id)).length).toBe(1); + const lock2 = (await models().lock().allLocks(user.id))[0]; + + expect(lock1.id).not.toBe(lock2.id); + + jest.useRealTimers(); + }); + +}); diff --git a/packages/server/src/models/LockModel.ts b/packages/server/src/models/LockModel.ts new file mode 100644 index 000000000..00e0528b6 --- /dev/null +++ b/packages/server/src/models/LockModel.ts @@ -0,0 +1,174 @@ +import BaseModel, { UuidType } from './BaseModel'; +import { Uuid } from '../services/database/types'; +import { LockType, Lock, LockClientType, defaultLockTtl, activeLock } from '@joplin/lib/services/synchronizer/LockHandler'; +import { Value } from './KeyValueModel'; +import { ErrorConflict, ErrorUnprocessableEntity } from '../utils/errors'; +import uuidgen from '../utils/uuidgen'; + +export default class LockModel extends BaseModel { + + private lockTtl_: number = defaultLockTtl; + + protected get tableName(): string { + return 'locks'; + } + + protected uuidType(): UuidType { + return UuidType.Native; + } + + public get lockTtl() { + return this.lockTtl_; + } + + public async allLocks(userId: Uuid): Promise { + const userKey = `locks::${userId}`; + const v = await this.models().keyValue().value(userKey); + return v ? JSON.parse(v) : []; + } + + protected async validate(lock: Lock): Promise { + if (![LockType.Sync, LockType.Exclusive].includes(lock.type)) throw new ErrorUnprocessableEntity(`Invalid lock type: ${lock.type}`); + if (![LockClientType.Desktop, LockClientType.Mobile, LockClientType.Cli].includes(lock.clientType)) throw new ErrorUnprocessableEntity(`Invalid lock client type: ${lock.clientType}`); + if (lock.clientId.length > 64) throw new ErrorUnprocessableEntity(`Invalid client ID length: ${lock.clientId}`); + return lock; + } + + private expireLocks(locks: Lock[]): Lock[] { + const cutOffTime = Date.now() - this.lockTtl; + + const output: Lock[] = []; + + for (const lock of locks) { + if (lock.updatedTime > cutOffTime) { + output.push(lock); + } + } + + return output; + } + + private async acquireSyncLock(userId: Uuid, clientType: LockClientType, clientId: string): Promise { + const userKey = `locks::${userId}`; + let output: Lock = null; + + await this.models().keyValue().readThenWrite(userKey, async (value: Value) => { + let locks: Lock[] = value ? JSON.parse(value as string) : []; + locks = this.expireLocks(locks); + + const exclusiveLock = activeLock(locks, new Date(), this.lockTtl, LockType.Exclusive); + + if (exclusiveLock) { + throw new ErrorConflict(`Cannot acquire lock because there is already an exclusive lock for client: ${exclusiveLock.clientType} #${exclusiveLock.clientId}`, 'hasExclusiveLock'); + } + + const syncLock = activeLock(locks, new Date(), this.lockTtl, LockType.Sync, clientType, clientId); + + if (syncLock) { + output = { + ...syncLock, + updatedTime: Date.now(), + }; + + locks = locks.map(l => l.id === syncLock.id ? output : l); + } else { + output = { + id: uuidgen(), + type: LockType.Sync, + clientId, + clientType, + updatedTime: Date.now(), + }; + + locks.push(output); + } + + return JSON.stringify(locks); + }); + + return output; + } + + private async acquireExclusiveLock(userId: Uuid, clientType: LockClientType, clientId: string): Promise { + const userKey = `locks::${userId}`; + let output: Lock = null; + + await this.models().keyValue().readThenWrite(userKey, async (value: Value) => { + let locks: Lock[] = value ? JSON.parse(value as string) : []; + locks = this.expireLocks(locks); + + const exclusiveLock = activeLock(locks, new Date(), this.lockTtl, LockType.Exclusive); + + if (exclusiveLock) { + if (exclusiveLock.clientId === clientId) { + locks = locks.filter(l => l.id !== exclusiveLock.id); + output = { + ...exclusiveLock, + updatedTime: Date.now(), + }; + + locks.push(output); + + return JSON.stringify(locks); + } else { + throw new ErrorConflict(`Cannot acquire lock because there is already an exclusive lock for client: ${exclusiveLock.clientType} #${exclusiveLock.clientId}`, 'hasExclusiveLock'); + } + } + + const syncLock = activeLock(locks, new Date(), this.lockTtl, LockType.Sync); + + if (syncLock) { + if (syncLock.clientId === clientId) { + locks = locks.filter(l => l.id !== syncLock.id); + } else { + throw new ErrorConflict(`Cannot acquire exclusive lock because there is an active sync lock for client: ${syncLock.clientType} #${syncLock.clientId}`, 'hasSyncLock'); + } + } + + output = { + id: uuidgen(), + type: LockType.Exclusive, + clientId, + clientType, + updatedTime: Date.now(), + }; + + locks.push(output); + + return JSON.stringify(locks); + }); + + return output; + } + + public async acquireLock(userId: Uuid, type: LockType, clientType: LockClientType, clientId: string): Promise { + await this.validate({ type, clientType, clientId }); + + if (type === LockType.Sync) { + return this.acquireSyncLock(userId, clientType, clientId); + } else { + return this.acquireExclusiveLock(userId, clientType, clientId); + } + } + + public async releaseLock(userId: Uuid, type: LockType, clientType: LockClientType, clientId: string) { + await this.validate({ type, clientType, clientId }); + + const userKey = `locks::${userId}`; + + await this.models().keyValue().readThenWrite(userKey, async (value: Value) => { + let locks: Lock[] = value ? JSON.parse(value as string) : []; + locks = this.expireLocks(locks); + + for (let i = locks.length - 1; i >= 0; i--) { + const lock = locks[i]; + if (lock.type === type && lock.clientType === clientType && lock.clientId === clientId) { + locks.splice(i, 1); + } + } + + return JSON.stringify(locks); + }); + } + +} diff --git a/packages/server/src/models/factory.ts b/packages/server/src/models/factory.ts index d763c4e1e..7d0acded7 100644 --- a/packages/server/src/models/factory.ts +++ b/packages/server/src/models/factory.ts @@ -72,6 +72,7 @@ import SubscriptionModel from './SubscriptionModel'; import UserFlagModel from './UserFlagModel'; import EventModel from './EventModel'; import { Config } from '../utils/types'; +import LockModel from './LockModel'; export class Models { @@ -147,6 +148,10 @@ export class Models { return new EventModel(this.db_, newModelFactory, this.config_); } + public lock() { + return new LockModel(this.db_, newModelFactory, this.config_); + } + } export default function newModelFactory(db: DbConnection, config: Config): Models { diff --git a/packages/server/src/routes/api/debug.ts b/packages/server/src/routes/api/debug.ts index 3b4282a24..1c2da24ef 100644 --- a/packages/server/src/routes/api/debug.ts +++ b/packages/server/src/routes/api/debug.ts @@ -2,9 +2,10 @@ import config from '../../config'; import { clearDatabase, createTestUsers, CreateTestUsersOptions } from '../../tools/debugTools'; import { bodyFields } from '../../utils/requestUtils'; import Router from '../../utils/Router'; -import { RouteType } from '../../utils/types'; +import { Env, RouteType } from '../../utils/types'; import { SubPath } from '../../utils/routeUtils'; import { AppContext } from '../../utils/types'; +import { ErrorForbidden } from '../../utils/errors'; const router = new Router(RouteType.Api); @@ -17,7 +18,10 @@ interface Query { } router.post('api/debug', async (_path: SubPath, ctx: AppContext) => { + if (config().env !== Env.Dev) throw new ErrorForbidden(); + const query: Query = (await bodyFields(ctx.req)) as Query; + const models = ctx.joplin.models; console.info(`Action: ${query.action}`); @@ -33,6 +37,10 @@ router.post('api/debug', async (_path: SubPath, ctx: AppContext) => { if (query.action === 'clearDatabase') { await clearDatabase(ctx.joplin.db); } + + if (query.action === 'clearKeyValues') { + await models.keyValue().deleteAll(); + } }); export default router; diff --git a/packages/server/src/routes/api/locks.ts b/packages/server/src/routes/api/locks.ts new file mode 100644 index 000000000..64ee4a838 --- /dev/null +++ b/packages/server/src/routes/api/locks.ts @@ -0,0 +1,32 @@ +import { LockType, LockClientType, lockNameToObject } from '@joplin/lib/services/synchronizer/LockHandler'; +import { bodyFields } from '../../utils/requestUtils'; +import Router from '../../utils/Router'; +import { SubPath } from '../../utils/routeUtils'; +import { AppContext, RouteType } from '../../utils/types'; + +const router = new Router(RouteType.Api); + +interface PostFields { + type: LockType; + clientType: LockClientType; + clientId: string; +} + +router.post('api/locks', async (_path: SubPath, ctx: AppContext) => { + const fields = await bodyFields(ctx.req); + return ctx.joplin.models.lock().acquireLock(ctx.joplin.owner.id, fields.type, fields.clientType, fields.clientId); +}); + +router.del('api/locks/:id', async (path: SubPath, ctx: AppContext) => { + const lock = lockNameToObject(path.id); + await ctx.joplin.models.lock().releaseLock(ctx.joplin.owner.id, lock.type, lock.clientType, lock.clientId); +}); + +router.get('api/locks', async (_path: SubPath, ctx: AppContext) => { + return { + items: await ctx.joplin.models.lock().allLocks(ctx.joplin.owner.id), + has_more: false, + }; +}); + +export default router; diff --git a/packages/server/src/routes/routes.ts b/packages/server/src/routes/routes.ts index 9246943a6..f6a6db1d1 100644 --- a/packages/server/src/routes/routes.ts +++ b/packages/server/src/routes/routes.ts @@ -10,6 +10,7 @@ import apiSessions from './api/sessions'; import apiShares from './api/shares'; import apiShareUsers from './api/share_users'; import apiUsers from './api/users'; +import apiLocks from './api/locks'; import indexChanges from './index/changes'; import indexHelp from './index/help'; @@ -41,6 +42,7 @@ const routes: Routers = { 'api/share_users': apiShareUsers, 'api/shares': apiShares, 'api/users': apiUsers, + 'api/locks': apiLocks, 'changes': indexChanges, 'home': indexHome, diff --git a/packages/server/src/utils/errors.ts b/packages/server/src/utils/errors.ts index 762801f72..7d7059f43 100644 --- a/packages/server/src/utils/errors.ts +++ b/packages/server/src/utils/errors.ts @@ -79,8 +79,8 @@ export class ErrorUnprocessableEntity extends ApiError { export class ErrorConflict extends ApiError { public static httpCode: number = 409; - public constructor(message: string = 'Conflict') { - super(message, ErrorConflict.httpCode); + public constructor(message: string = 'Conflict', code: string = undefined) { + super(message, ErrorConflict.httpCode, code); Object.setPrototypeOf(this, ErrorConflict.prototype); } }