You've already forked joplin
mirror of
https://github.com/laurent22/joplin.git
synced 2025-06-12 22:57:38 +02:00
All: Add support for sync target lock
The goal is to allow locking a sync target so that maintenance operations, such as upgrading the target to a more efficient format, can be done. For now, only the lock mechanism is in place, as a way to evaluate it, and to see if it can cause any issue.
This commit is contained in:
@ -12,10 +12,11 @@ const { time } = require('lib/time-utils.js');
|
||||
const { Logger } = require('lib/logger.js');
|
||||
const { _ } = require('lib/locale.js');
|
||||
const { shim } = require('lib/shim.js');
|
||||
const { filename, fileExtension } = require('lib/path-utils');
|
||||
// const { filename, fileExtension } = require('lib/path-utils');
|
||||
const JoplinError = require('lib/JoplinError');
|
||||
const BaseSyncTarget = require('lib/BaseSyncTarget');
|
||||
const TaskQueue = require('lib/TaskQueue');
|
||||
const LockHandler = require('lib/services/synchronizer/LockHandler').default;
|
||||
|
||||
class Synchronizer {
|
||||
constructor(db, api, appType) {
|
||||
@ -23,7 +24,7 @@ class Synchronizer {
|
||||
this.db_ = db;
|
||||
this.api_ = api;
|
||||
this.syncDirName_ = '.sync';
|
||||
this.lockDirName_ = '.lock';
|
||||
this.lockDirName_ = 'locks';
|
||||
this.resourceDirName_ = BaseSyncTarget.resourceDirName();
|
||||
this.logger_ = new Logger();
|
||||
this.appType_ = appType;
|
||||
@ -66,6 +67,12 @@ class Synchronizer {
|
||||
return this.logger_;
|
||||
}
|
||||
|
||||
lockHandler() {
|
||||
if (this.lockHandler_) return this.lockHandler_;
|
||||
this.lockHandler_ = new LockHandler(this.api(), this.lockDirName_);
|
||||
return this.lockHandler_;
|
||||
}
|
||||
|
||||
maxResourceSize() {
|
||||
if (this.maxResourceSize_ !== null) return this.maxResourceSize_;
|
||||
return this.appType_ === 'mobile' ? 10 * 1000 * 1000 : Infinity;
|
||||
@ -205,64 +212,17 @@ class Synchronizer {
|
||||
return state;
|
||||
}
|
||||
|
||||
async acquireLock_() {
|
||||
await this.checkLock_();
|
||||
await this.api().put(`${this.lockDirName_}/${this.clientId()}_${Date.now()}.lock`, `${Date.now()}`);
|
||||
}
|
||||
|
||||
async releaseLock_() {
|
||||
const lockFiles = await this.lockFiles_();
|
||||
for (const lockFile of lockFiles) {
|
||||
const p = this.parseLockFilePath(lockFile.path);
|
||||
if (p.clientId === this.clientId()) {
|
||||
await this.api().delete(p.fullPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async lockFiles_() {
|
||||
const output = await this.api().list(this.lockDirName_);
|
||||
return output.items.filter((p) => {
|
||||
const ext = fileExtension(p.path);
|
||||
return ext === 'lock';
|
||||
});
|
||||
}
|
||||
|
||||
parseLockFilePath(path) {
|
||||
const splitted = filename(path).split('_');
|
||||
const fullPath = `${this.lockDirName_}/${path}`;
|
||||
if (splitted.length !== 2) throw new Error(`Sync target appears to be locked but lock filename is invalid: ${fullPath}. Please delete it on the sync target to continue.`);
|
||||
return {
|
||||
clientId: splitted[0],
|
||||
timestamp: Number(splitted[1]),
|
||||
fullPath: fullPath,
|
||||
};
|
||||
}
|
||||
|
||||
async checkLock_() {
|
||||
const lockFiles = await this.lockFiles_();
|
||||
if (lockFiles.length) {
|
||||
const lock = this.parseLockFilePath(lockFiles[0].path);
|
||||
|
||||
if (lock.clientId === this.clientId()) {
|
||||
await this.releaseLock_();
|
||||
} else {
|
||||
throw new Error(`The sync target was locked by client ${lock.clientId} on ${time.unixMsToLocalDateTime(lock.timestamp)} and cannot be accessed. If no app is currently operating on the sync target, you can delete the files in the "${this.lockDirName_}" directory on the sync target to resume.`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async checkSyncTargetVersion_() {
|
||||
const supportedSyncTargetVersion = Setting.value('syncVersion');
|
||||
const syncTargetVersion = await this.api().get('.sync/version.txt');
|
||||
const syncTargetVersion = await this.apiCall('get', '.sync/version.txt');
|
||||
|
||||
if (!syncTargetVersion) {
|
||||
await this.api().put('.sync/version.txt', `${supportedSyncTargetVersion}`);
|
||||
await this.apiCall('put', '.sync/version.txt', `${supportedSyncTargetVersion}`);
|
||||
} else {
|
||||
if (Number(syncTargetVersion) > supportedSyncTargetVersion) {
|
||||
throw new Error(sprintf('Sync version of the target (%d) does not match sync version supported by client (%d). Please upgrade your client.', Number(syncTargetVersion), supportedSyncTargetVersion));
|
||||
} else {
|
||||
await this.api().put('.sync/version.txt', `${supportedSyncTargetVersion}`);
|
||||
await this.apiCall('put', '.sync/version.txt', `${supportedSyncTargetVersion}`);
|
||||
// TODO: do upgrade job
|
||||
}
|
||||
}
|
||||
@ -272,6 +232,44 @@ class Synchronizer {
|
||||
return steps.includes('update_remote') && steps.includes('delete_remote') && steps.includes('delta');
|
||||
}
|
||||
|
||||
async lockErrorStatus_() {
|
||||
const hasActiveExclusiveLock = await this.lockHandler().hasActiveExclusiveLock();
|
||||
if (hasActiveExclusiveLock) return 'hasExclusiveLock';
|
||||
|
||||
const hasActiveSyncLock = await this.lockHandler().hasActiveSyncLock();
|
||||
if (!hasActiveSyncLock) return 'syncLockGone';
|
||||
|
||||
return '';
|
||||
}
|
||||
|
||||
async apiCall(fnName, ...args) {
|
||||
if (this.syncTargetIsLocked_) throw new JoplinError('Sync target is locked - aborting API call', 'lockError');
|
||||
|
||||
try {
|
||||
const output = await this.api()[fnName](...args);
|
||||
return output;
|
||||
} catch (error) {
|
||||
const lockStatus = await this.lockErrorStatus_();
|
||||
// When there's an error due to a lock, we re-wrap the error and change the error code so that error handling
|
||||
// does not do special processing on the original error. For example, if a resource could not be downloaded,
|
||||
// don't mark it as a "cannotSyncItem" since we don't know that.
|
||||
if (lockStatus) {
|
||||
throw new JoplinError(`Sync target lock error: ${lockStatus}. Original error was: ${error.message}`, 'lockError');
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async refreshLock() {
|
||||
if (this.state_ !== 'in_progress') {
|
||||
this.logger().warn('Trying to refresh lock, but sync is not in progress');
|
||||
return;
|
||||
}
|
||||
|
||||
await this.lockHandler().acquireLock('sync', this.appType_, this.clientId_);
|
||||
}
|
||||
|
||||
// Synchronisation is done in three major steps:
|
||||
//
|
||||
// 1. UPLOAD: Send to the sync target the items that have changed since the last sync.
|
||||
@ -300,6 +298,7 @@ class Synchronizer {
|
||||
|
||||
const syncTargetId = this.api().syncTargetId();
|
||||
|
||||
this.syncTargetIsLocked_ = false;
|
||||
this.cancelling_ = false;
|
||||
|
||||
const masterKeysBefore = await MasterKey.count();
|
||||
@ -325,12 +324,27 @@ class Synchronizer {
|
||||
let errorToThrow = null;
|
||||
|
||||
try {
|
||||
await this.api().mkdir(this.syncDirName_);
|
||||
await this.api().mkdir(this.lockDirName_);
|
||||
await this.apiCall('mkdir', this.syncDirName_);
|
||||
await this.apiCall('mkdir', this.lockDirName_);
|
||||
this.api().setTempDirName(this.syncDirName_);
|
||||
await this.api().mkdir(this.resourceDirName_);
|
||||
await this.apiCall('mkdir', this.resourceDirName_);
|
||||
|
||||
await this.lockHandler().acquireLock('sync', this.appType_, this.clientId_);
|
||||
|
||||
if (this.refreshLockIID_) clearInterval(this.refreshLockIID_);
|
||||
|
||||
this.refreshLockIID_ = setInterval(async () => {
|
||||
try {
|
||||
await this.refreshLock();
|
||||
} catch (error) {
|
||||
this.logger().warn('Could not refresh lock - cancelling sync. Error was:', error);
|
||||
clearInterval(this.refreshLockIID_);
|
||||
this.syncTargetIsLocked_ = true;
|
||||
this.refreshLockIID_ = null;
|
||||
this.cancel();
|
||||
}
|
||||
}, 1000 * 60);
|
||||
|
||||
await this.checkLock_();
|
||||
await this.checkSyncTargetVersion_();
|
||||
|
||||
// ========================================================================
|
||||
@ -369,7 +383,7 @@ class Synchronizer {
|
||||
// (by setting an updated_time less than current time).
|
||||
if (donePaths.indexOf(path) >= 0) throw new JoplinError(sprintf('Processing a path that has already been done: %s. sync_time was not updated? Remote item has an updated_time in the future?', path), 'processingPathTwice');
|
||||
|
||||
const remote = await this.api().stat(path);
|
||||
const remote = await this.apiCall('stat', path);
|
||||
let action = null;
|
||||
|
||||
let reason = '';
|
||||
@ -407,7 +421,7 @@ class Synchronizer {
|
||||
// OneDrive does not appear to have accurate timestamps as lastModifiedDateTime would occasionally be
|
||||
// a few seconds ahead of what it was set with setTimestamp()
|
||||
try {
|
||||
remoteContent = await this.api().get(path);
|
||||
remoteContent = await this.apiCall('get', path);
|
||||
} catch (error) {
|
||||
if (error.code === 'rejectedByTarget') {
|
||||
this.progressReport_.errors.push(error);
|
||||
@ -450,7 +464,7 @@ class Synchronizer {
|
||||
this.logger().warn(`Uploading a large resource (resourceId: ${local.id}, size:${local.size} bytes) which may tie up the sync process.`);
|
||||
}
|
||||
|
||||
await this.api().put(remoteContentPath, null, { path: localResourceContentPath, source: 'file' });
|
||||
await this.apiCall('put', remoteContentPath, null, { path: localResourceContentPath, source: 'file' });
|
||||
} catch (error) {
|
||||
if (error && ['rejectedByTarget', 'fileNotFound'].indexOf(error.code) >= 0) {
|
||||
await handleCannotSyncItem(ItemClass, syncTargetId, local, error.message);
|
||||
@ -467,7 +481,7 @@ class Synchronizer {
|
||||
try {
|
||||
if (this.testingHooks_.indexOf('notesRejectedByTarget') >= 0 && local.type_ === BaseModel.TYPE_NOTE) throw new JoplinError('Testing rejectedByTarget', 'rejectedByTarget');
|
||||
const content = await ItemClass.serializeForSync(local);
|
||||
await this.api().put(path, content);
|
||||
await this.apiCall('put', path, content);
|
||||
} catch (error) {
|
||||
if (error && error.code === 'rejectedByTarget') {
|
||||
await handleCannotSyncItem(ItemClass, syncTargetId, local, error.message);
|
||||
@ -593,11 +607,11 @@ class Synchronizer {
|
||||
const item = deletedItems[i];
|
||||
const path = BaseItem.systemPath(item.item_id);
|
||||
this.logSyncOperation('deleteRemote', null, { id: item.item_id }, 'local has been deleted');
|
||||
await this.api().delete(path);
|
||||
await this.apiCall('delete', path);
|
||||
|
||||
if (item.item_type === BaseModel.TYPE_RESOURCE) {
|
||||
const remoteContentPath = resourceRemotePath(item.item_id);
|
||||
await this.api().delete(remoteContentPath);
|
||||
await this.apiCall('delete', remoteContentPath);
|
||||
}
|
||||
|
||||
await BaseItem.remoteDeletedItem(syncTargetId, item.item_id);
|
||||
@ -628,7 +642,7 @@ class Synchronizer {
|
||||
while (true) {
|
||||
if (this.cancelling() || hasCancelled) break;
|
||||
|
||||
const listResult = await this.api().delta('', {
|
||||
const listResult = await this.apiCall('delta', '', {
|
||||
context: context,
|
||||
|
||||
// allItemIdsHandler() provides a way for drivers that don't have a delta API to
|
||||
@ -653,7 +667,7 @@ class Synchronizer {
|
||||
if (this.cancelling()) break;
|
||||
|
||||
this.downloadQueue_.push(remote.path, async () => {
|
||||
return this.api().get(remote.path);
|
||||
return this.apiCall('get', remote.path);
|
||||
});
|
||||
}
|
||||
|
||||
@ -669,7 +683,7 @@ class Synchronizer {
|
||||
if (!BaseItem.isSystemPath(remote.path)) continue; // The delta API might return things like the .sync, .resource or the root folder
|
||||
|
||||
const loadContent = async () => {
|
||||
const task = await this.downloadQueue_.waitForResult(path); // await this.api().get(path);
|
||||
const task = await this.downloadQueue_.waitForResult(path); // await this.apiCall('get', path);
|
||||
if (task.error) throw task.error;
|
||||
if (!task.result) return null;
|
||||
return await BaseItem.unserialize(task.result);
|
||||
@ -832,13 +846,13 @@ class Synchronizer {
|
||||
} catch (error) {
|
||||
if (throwOnError) {
|
||||
errorToThrow = error;
|
||||
} else if (error && ['cannotEncryptEncrypted', 'noActiveMasterKey', 'processingPathTwice', 'failSafe'].indexOf(error.code) >= 0) {
|
||||
} else if (error && ['cannotEncryptEncrypted', 'noActiveMasterKey', 'processingPathTwice', 'failSafe', 'lockError'].indexOf(error.code) >= 0) {
|
||||
// Only log an info statement for this since this is a common condition that is reported
|
||||
// in the application, and needs to be resolved by the user.
|
||||
// Or it's a temporary issue that will be resolved on next sync.
|
||||
this.logger().info(error.message);
|
||||
|
||||
if (error.code === 'failSafe') {
|
||||
if (error.code === 'failSafe' || error.code === 'lockError') {
|
||||
// Get the message to display on UI, but not in testing to avoid poluting stdout
|
||||
if (!shim.isTestingEnv()) this.progressReport_.errors.push(error.message);
|
||||
this.logLastRequests();
|
||||
@ -857,6 +871,15 @@ class Synchronizer {
|
||||
}
|
||||
}
|
||||
|
||||
await this.lockHandler().releaseLock('sync', this.appType_, this.clientId_);
|
||||
|
||||
if (this.refreshLockIID_) {
|
||||
clearInterval(this.refreshLockIID_);
|
||||
this.refreshLockIID_ = null;
|
||||
}
|
||||
|
||||
this.syncTargetIsLocked_ = false;
|
||||
|
||||
if (this.cancelling()) {
|
||||
this.logger().info('Synchronisation was cancelled.');
|
||||
this.cancelling_ = false;
|
||||
|
Reference in New Issue
Block a user