From 9f6b3ccf4056468378f2cf991a33929052498ea0 Mon Sep 17 00:00:00 2001 From: Laurent Cozic Date: Fri, 7 Jun 2019 23:20:08 +0100 Subject: [PATCH] All: Improved: Allow using multiple connections to download items while synchronising (#1633) * Task queue * All: Improved sync speed by downloading items in parallel * Improved download queue --- CliClient/tests/TaskQueue.js | 57 +++++++++++ ReactNativeClient/lib/TaskQueue.js | 126 ++++++++++++++++++++++++ ReactNativeClient/lib/models/Setting.js | 2 + ReactNativeClient/lib/synchronizer.js | 25 ++++- 4 files changed, 207 insertions(+), 3 deletions(-) create mode 100644 CliClient/tests/TaskQueue.js create mode 100644 ReactNativeClient/lib/TaskQueue.js diff --git a/CliClient/tests/TaskQueue.js b/CliClient/tests/TaskQueue.js new file mode 100644 index 0000000000..f2ddd6af25 --- /dev/null +++ b/CliClient/tests/TaskQueue.js @@ -0,0 +1,57 @@ +require('app-module-path').addPath(__dirname); + +const { asyncTest, fileContentEqual, setupDatabase, revisionService, setupDatabaseAndSynchronizer, db, synchronizer, fileApi, sleep, clearDatabase, switchClient, syncTargetId, objectsEqual, checkThrowAsync } = require('test-utils.js'); +const TaskQueue = require('lib/TaskQueue.js'); + +process.on('unhandledRejection', (reason, p) => { + console.log('Unhandled Rejection at: Promise', p, 'reason:', reason); +}); + +describe('TaskQueue', function() { + + beforeEach(async (done) => { + await setupDatabaseAndSynchronizer(1); + await switchClient(1); + done(); + }); + + it('should queue and execute tasks', asyncTest(async () => { + const queue = new TaskQueue(); + + queue.push(1, async () => { await sleep(0.5); return 'a'; }); + queue.push(2, async () => { await sleep(0.5); return 'b'; }); + queue.push(3, async () => { await sleep(0.5); return 'c'; }); + + const results = []; + + results.push(await queue.waitForResult(1)); + results.push(await queue.waitForResult(2)); + results.push(await queue.waitForResult(3)); + + expect(results[0].id).toBe(1); + expect(results[0].result).toBe('a'); + expect(results[1].id).toBe(2); + expect(results[1].result).toBe('b'); + expect(results[2].id).toBe(3); + expect(results[2].result).toBe('c'); + })); + + it('should handle errors', asyncTest(async () => { + const queue = new TaskQueue(); + + queue.push(1, async () => { await sleep(0.5); return 'a'; }); + queue.push(2, async () => { await sleep(0.5); throw new Error('e'); }); + + const results = []; + + results.push(await queue.waitForResult(1)); + results.push(await queue.waitForResult(2)); + + expect(results[0].id).toBe(1); + expect(results[0].result).toBe('a'); + expect(results[1].id).toBe(2); + expect(!results[1].result).toBe(true); + expect(results[1].error.message).toBe('e'); + })); + +}); \ No newline at end of file diff --git a/ReactNativeClient/lib/TaskQueue.js b/ReactNativeClient/lib/TaskQueue.js new file mode 100644 index 0000000000..4e06637d26 --- /dev/null +++ b/ReactNativeClient/lib/TaskQueue.js @@ -0,0 +1,126 @@ +const { time } = require('lib/time-utils.js'); +const JoplinError = require('lib/JoplinError'); +const Setting = require('lib/models/Setting'); +const { Logger } = require('lib/logger.js'); + +class TaskQueue { + + constructor(name) { + this.waitingTasks_ = []; + this.processingTasks_ = {}; + this.processingQueue_ = false; + this.stopping_ = false; + this.results_ = {}; + this.name_ = name; + this.logger_ = new Logger(); + } + + concurrency() { + return Setting.value('sync.maxConcurrentConnections'); + } + + push(id, callback) { + if (this.stopping_) throw new Error('Cannot push task when queue is stopping'); + + this.waitingTasks_.push({ + id: id, + callback: callback, + }); + this.processQueue_(); + } + + processQueue_() { + if (this.processingQueue_ || this.stopping_) return; + + this.processingQueue_ = true; + + const completeTask = (task, result, error) => { + delete this.processingTasks_[task.id]; + + const r = { + id: task.id, + result: result, + }; + + if (error) r.error = error; + + this.results_[task.id] = r; + + this.processQueue_(); + } + + while (this.waitingTasks_.length > 0 && Object.keys(this.processingTasks_).length < this.concurrency()) { + if (this.stopping_) break; + + const task = this.waitingTasks_.splice(0, 1)[0]; + this.processingTasks_[task.id] = task; + + task.callback().then(result => { + completeTask(task, result, null); + }).catch(error => { + if (!error) error = new Error('Unknown error'); + completeTask(task, null, error); + }); + } + + this.processingQueue_ = false; + } + + isWaiting(taskId) { + return this.waitingTasks_.find(task => task.id === taskId) + } + + isProcessing(taskId) { + return taskId in this.processingTasks_; + } + + isDone(taskId) { + return taskId in this.results_; + } + + async waitForResult(taskId) { + if (!this.isWaiting(taskId) && !this.isProcessing(taskId) && !this.isDone(taskId)) throw new Error('No such task: ' + taskId); + + while (true) { + // if (this.stopping_) { + // return { + // id: taskId, + // error: new JoplinError('Queue has been destroyed', 'destroyedQueue'), + // }; + // } + + const task = this.results_[taskId]; + if (task) return task; + await time.sleep(0.1); + } + } + + async stop() { + this.stopping_ = true; + + this.logger_.info('TaskQueue.stop: ' + this.name_ + ': waiting for tasks to complete: ' + Object.keys(this.processingTasks_).length); + + // In general it's not a big issue if some tasks are still running because + // it won't call anything unexpected in caller code, since the caller has + // to explicitely retrieve the results + const startTime = Date.now(); + while (Object.keys(this.processingTasks_).length) { + await time.sleep(0.1); + if (Date.now() - startTime >= 30000) { + this.logger_.warn('TaskQueue.stop: ' + this.name_ + ': timed out waiting for task to complete'); + break; + } + } + + this.logger_.info('TaskQueue.stop: ' + this.name_ + ': Done, waited for ' + (Date.now() - startTime)); + } + + isStopping() { + return this.stopping_; + } + +} + +TaskQueue.CONCURRENCY = 5; + +module.exports = TaskQueue; \ No newline at end of file diff --git a/ReactNativeClient/lib/models/Setting.js b/ReactNativeClient/lib/models/Setting.js index ce17cd7544..d39bfdbf8c 100644 --- a/ReactNativeClient/lib/models/Setting.js +++ b/ReactNativeClient/lib/models/Setting.js @@ -163,6 +163,8 @@ class Setting extends BaseModel { }; }}, + 'sync.maxConcurrentConnections': {value: 5, type: Setting.TYPE_INT, public: true, section: 'sync', label: () => _('Max concurrent connections'), minimum: 1, maximum: 20, step: 1}, + 'sync.2.path': { value: '', type: Setting.TYPE_STRING, section:'sync', show: (settings) => { try { return settings['sync.target'] == SyncTargetRegistry.nameToId('filesystem') diff --git a/ReactNativeClient/lib/synchronizer.js b/ReactNativeClient/lib/synchronizer.js index e6321818ae..7566e060a2 100644 --- a/ReactNativeClient/lib/synchronizer.js +++ b/ReactNativeClient/lib/synchronizer.js @@ -14,6 +14,7 @@ const { _ } = require('lib/locale.js'); const { shim } = require('lib/shim.js'); const JoplinError = require('lib/JoplinError'); const BaseSyncTarget = require('lib/BaseSyncTarget'); +const TaskQueue = require('lib/TaskQueue'); class Synchronizer { @@ -28,6 +29,7 @@ class Synchronizer { this.cancelling_ = false; this.autoStartDecryptionWorker_ = true; this.maxResourceSize_ = null; + this.downloadQueue_ = null; // Debug flags are used to test certain hard-to-test conditions // such as cancelling in the middle of a loop. @@ -151,6 +153,10 @@ class Synchronizer { async cancel() { if (this.cancelling_ || this.state() == 'idle') return; + // Stop queue but don't set it to null as it may be used to + // retrieve the last few downloads. + this.downloadQueue_.stop(); + this.logSyncOperation('cancelling', null, null, ''); this.cancelling_ = true; @@ -466,6 +472,10 @@ class Synchronizer { // have been created or updated, and apply the changes to local. // ------------------------------------------------------------------------ + if (this.downloadQueue_) await this.downloadQueue_.stop(); + this.downloadQueue_ = new TaskQueue('syncDownload'); + this.downloadQueue_.logger_ = this.logger(); + if (syncSteps.indexOf("delta") >= 0) { // At this point all the local items that have changed have been pushed to remote // or handled as conflicts, so no conflict is possible after this. @@ -496,6 +506,14 @@ class Synchronizer { this.logSyncOperation("fetchingTotal", null, null, "Fetching delta items from sync target", remotes.length); + for (const remote of remotes) { + if (this.cancelling()) break; + + this.downloadQueue_.push(remote.path, async () => { + return this.api().get(remote.path); + }); + } + for (let i = 0; i < remotes.length; i++) { if (this.cancelling() || this.testingHooks_.indexOf("cancelDeltaLoop2") >= 0) { hasCancelled = true; @@ -508,9 +526,10 @@ class Synchronizer { if (!BaseItem.isSystemPath(remote.path)) continue; // The delta API might return things like the .sync, .resource or the root folder const loadContent = async () => { - let content = await this.api().get(path); - if (!content) return null; - return await BaseItem.unserialize(content); + let task = await this.downloadQueue_.waitForResult(path); //await this.api().get(path); + if (task.error) throw task.error; + if (!task.result) return null; + return await BaseItem.unserialize(task.result); }; let path = remote.path;