1
0
mirror of https://github.com/laurent22/joplin.git synced 2025-02-01 19:15:01 +02:00

All: Improved: Allow using multiple connections to download items while synchronising (#1633)

* Task queue

* All: Improved sync speed by downloading items in parallel

* Improved download queue
This commit is contained in:
Laurent Cozic 2019-06-07 23:20:08 +01:00 committed by GitHub
parent df714c357d
commit 9f6b3ccf40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 207 additions and 3 deletions

View File

@ -0,0 +1,57 @@
require('app-module-path').addPath(__dirname);
const { asyncTest, fileContentEqual, setupDatabase, revisionService, setupDatabaseAndSynchronizer, db, synchronizer, fileApi, sleep, clearDatabase, switchClient, syncTargetId, objectsEqual, checkThrowAsync } = require('test-utils.js');
const TaskQueue = require('lib/TaskQueue.js');
process.on('unhandledRejection', (reason, p) => {
console.log('Unhandled Rejection at: Promise', p, 'reason:', reason);
});
describe('TaskQueue', function() {
beforeEach(async (done) => {
await setupDatabaseAndSynchronizer(1);
await switchClient(1);
done();
});
it('should queue and execute tasks', asyncTest(async () => {
const queue = new TaskQueue();
queue.push(1, async () => { await sleep(0.5); return 'a'; });
queue.push(2, async () => { await sleep(0.5); return 'b'; });
queue.push(3, async () => { await sleep(0.5); return 'c'; });
const results = [];
results.push(await queue.waitForResult(1));
results.push(await queue.waitForResult(2));
results.push(await queue.waitForResult(3));
expect(results[0].id).toBe(1);
expect(results[0].result).toBe('a');
expect(results[1].id).toBe(2);
expect(results[1].result).toBe('b');
expect(results[2].id).toBe(3);
expect(results[2].result).toBe('c');
}));
it('should handle errors', asyncTest(async () => {
const queue = new TaskQueue();
queue.push(1, async () => { await sleep(0.5); return 'a'; });
queue.push(2, async () => { await sleep(0.5); throw new Error('e'); });
const results = [];
results.push(await queue.waitForResult(1));
results.push(await queue.waitForResult(2));
expect(results[0].id).toBe(1);
expect(results[0].result).toBe('a');
expect(results[1].id).toBe(2);
expect(!results[1].result).toBe(true);
expect(results[1].error.message).toBe('e');
}));
});

View File

@ -0,0 +1,126 @@
const { time } = require('lib/time-utils.js');
const JoplinError = require('lib/JoplinError');
const Setting = require('lib/models/Setting');
const { Logger } = require('lib/logger.js');
class TaskQueue {
constructor(name) {
this.waitingTasks_ = [];
this.processingTasks_ = {};
this.processingQueue_ = false;
this.stopping_ = false;
this.results_ = {};
this.name_ = name;
this.logger_ = new Logger();
}
concurrency() {
return Setting.value('sync.maxConcurrentConnections');
}
push(id, callback) {
if (this.stopping_) throw new Error('Cannot push task when queue is stopping');
this.waitingTasks_.push({
id: id,
callback: callback,
});
this.processQueue_();
}
processQueue_() {
if (this.processingQueue_ || this.stopping_) return;
this.processingQueue_ = true;
const completeTask = (task, result, error) => {
delete this.processingTasks_[task.id];
const r = {
id: task.id,
result: result,
};
if (error) r.error = error;
this.results_[task.id] = r;
this.processQueue_();
}
while (this.waitingTasks_.length > 0 && Object.keys(this.processingTasks_).length < this.concurrency()) {
if (this.stopping_) break;
const task = this.waitingTasks_.splice(0, 1)[0];
this.processingTasks_[task.id] = task;
task.callback().then(result => {
completeTask(task, result, null);
}).catch(error => {
if (!error) error = new Error('Unknown error');
completeTask(task, null, error);
});
}
this.processingQueue_ = false;
}
isWaiting(taskId) {
return this.waitingTasks_.find(task => task.id === taskId)
}
isProcessing(taskId) {
return taskId in this.processingTasks_;
}
isDone(taskId) {
return taskId in this.results_;
}
async waitForResult(taskId) {
if (!this.isWaiting(taskId) && !this.isProcessing(taskId) && !this.isDone(taskId)) throw new Error('No such task: ' + taskId);
while (true) {
// if (this.stopping_) {
// return {
// id: taskId,
// error: new JoplinError('Queue has been destroyed', 'destroyedQueue'),
// };
// }
const task = this.results_[taskId];
if (task) return task;
await time.sleep(0.1);
}
}
async stop() {
this.stopping_ = true;
this.logger_.info('TaskQueue.stop: ' + this.name_ + ': waiting for tasks to complete: ' + Object.keys(this.processingTasks_).length);
// In general it's not a big issue if some tasks are still running because
// it won't call anything unexpected in caller code, since the caller has
// to explicitely retrieve the results
const startTime = Date.now();
while (Object.keys(this.processingTasks_).length) {
await time.sleep(0.1);
if (Date.now() - startTime >= 30000) {
this.logger_.warn('TaskQueue.stop: ' + this.name_ + ': timed out waiting for task to complete');
break;
}
}
this.logger_.info('TaskQueue.stop: ' + this.name_ + ': Done, waited for ' + (Date.now() - startTime));
}
isStopping() {
return this.stopping_;
}
}
TaskQueue.CONCURRENCY = 5;
module.exports = TaskQueue;

View File

@ -163,6 +163,8 @@ class Setting extends BaseModel {
}; };
}}, }},
'sync.maxConcurrentConnections': {value: 5, type: Setting.TYPE_INT, public: true, section: 'sync', label: () => _('Max concurrent connections'), minimum: 1, maximum: 20, step: 1},
'sync.2.path': { value: '', type: Setting.TYPE_STRING, section:'sync', show: (settings) => { 'sync.2.path': { value: '', type: Setting.TYPE_STRING, section:'sync', show: (settings) => {
try { try {
return settings['sync.target'] == SyncTargetRegistry.nameToId('filesystem') return settings['sync.target'] == SyncTargetRegistry.nameToId('filesystem')

View File

@ -14,6 +14,7 @@ const { _ } = require('lib/locale.js');
const { shim } = require('lib/shim.js'); const { shim } = require('lib/shim.js');
const JoplinError = require('lib/JoplinError'); const JoplinError = require('lib/JoplinError');
const BaseSyncTarget = require('lib/BaseSyncTarget'); const BaseSyncTarget = require('lib/BaseSyncTarget');
const TaskQueue = require('lib/TaskQueue');
class Synchronizer { class Synchronizer {
@ -28,6 +29,7 @@ class Synchronizer {
this.cancelling_ = false; this.cancelling_ = false;
this.autoStartDecryptionWorker_ = true; this.autoStartDecryptionWorker_ = true;
this.maxResourceSize_ = null; this.maxResourceSize_ = null;
this.downloadQueue_ = null;
// Debug flags are used to test certain hard-to-test conditions // Debug flags are used to test certain hard-to-test conditions
// such as cancelling in the middle of a loop. // such as cancelling in the middle of a loop.
@ -151,6 +153,10 @@ class Synchronizer {
async cancel() { async cancel() {
if (this.cancelling_ || this.state() == 'idle') return; if (this.cancelling_ || this.state() == 'idle') return;
// Stop queue but don't set it to null as it may be used to
// retrieve the last few downloads.
this.downloadQueue_.stop();
this.logSyncOperation('cancelling', null, null, ''); this.logSyncOperation('cancelling', null, null, '');
this.cancelling_ = true; this.cancelling_ = true;
@ -466,6 +472,10 @@ class Synchronizer {
// have been created or updated, and apply the changes to local. // have been created or updated, and apply the changes to local.
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
if (this.downloadQueue_) await this.downloadQueue_.stop();
this.downloadQueue_ = new TaskQueue('syncDownload');
this.downloadQueue_.logger_ = this.logger();
if (syncSteps.indexOf("delta") >= 0) { if (syncSteps.indexOf("delta") >= 0) {
// At this point all the local items that have changed have been pushed to remote // At this point all the local items that have changed have been pushed to remote
// or handled as conflicts, so no conflict is possible after this. // or handled as conflicts, so no conflict is possible after this.
@ -496,6 +506,14 @@ class Synchronizer {
this.logSyncOperation("fetchingTotal", null, null, "Fetching delta items from sync target", remotes.length); this.logSyncOperation("fetchingTotal", null, null, "Fetching delta items from sync target", remotes.length);
for (const remote of remotes) {
if (this.cancelling()) break;
this.downloadQueue_.push(remote.path, async () => {
return this.api().get(remote.path);
});
}
for (let i = 0; i < remotes.length; i++) { for (let i = 0; i < remotes.length; i++) {
if (this.cancelling() || this.testingHooks_.indexOf("cancelDeltaLoop2") >= 0) { if (this.cancelling() || this.testingHooks_.indexOf("cancelDeltaLoop2") >= 0) {
hasCancelled = true; hasCancelled = true;
@ -508,9 +526,10 @@ class Synchronizer {
if (!BaseItem.isSystemPath(remote.path)) continue; // The delta API might return things like the .sync, .resource or the root folder if (!BaseItem.isSystemPath(remote.path)) continue; // The delta API might return things like the .sync, .resource or the root folder
const loadContent = async () => { const loadContent = async () => {
let content = await this.api().get(path); let task = await this.downloadQueue_.waitForResult(path); //await this.api().get(path);
if (!content) return null; if (task.error) throw task.error;
return await BaseItem.unserialize(content); if (!task.result) return null;
return await BaseItem.unserialize(task.result);
}; };
let path = remote.path; let path = remote.path;