diff --git a/.eslintignore b/.eslintignore index 8895c4490..bc59af4ce 100644 --- a/.eslintignore +++ b/.eslintignore @@ -842,6 +842,9 @@ packages/lib/SyncTargetOneDrive.js.map packages/lib/Synchronizer.d.ts packages/lib/Synchronizer.js packages/lib/Synchronizer.js.map +packages/lib/TaskQueue.d.ts +packages/lib/TaskQueue.js +packages/lib/TaskQueue.js.map packages/lib/commands/historyBackward.d.ts packages/lib/commands/historyBackward.js packages/lib/commands/historyBackward.js.map @@ -1436,6 +1439,9 @@ packages/lib/services/synchronizer/synchronizer_MigrationHandler.test.js.map packages/lib/services/synchronizer/tools.d.ts packages/lib/services/synchronizer/tools.js packages/lib/services/synchronizer/tools.js.map +packages/lib/services/synchronizer/uploadUtils.d.ts +packages/lib/services/synchronizer/uploadUtils.js +packages/lib/services/synchronizer/uploadUtils.js.map packages/lib/services/synchronizer/utils/handleSyncStartupOperation.d.ts packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js.map diff --git a/.gitignore b/.gitignore index 0a3273521..ca459c68e 100644 --- a/.gitignore +++ b/.gitignore @@ -828,6 +828,9 @@ packages/lib/SyncTargetOneDrive.js.map packages/lib/Synchronizer.d.ts packages/lib/Synchronizer.js packages/lib/Synchronizer.js.map +packages/lib/TaskQueue.d.ts +packages/lib/TaskQueue.js +packages/lib/TaskQueue.js.map packages/lib/commands/historyBackward.d.ts packages/lib/commands/historyBackward.js packages/lib/commands/historyBackward.js.map @@ -1422,6 +1425,9 @@ packages/lib/services/synchronizer/synchronizer_MigrationHandler.test.js.map packages/lib/services/synchronizer/tools.d.ts packages/lib/services/synchronizer/tools.js packages/lib/services/synchronizer/tools.js.map +packages/lib/services/synchronizer/uploadUtils.d.ts +packages/lib/services/synchronizer/uploadUtils.js +packages/lib/services/synchronizer/uploadUtils.js.map packages/lib/services/synchronizer/utils/handleSyncStartupOperation.d.ts packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js.map diff --git a/packages/lib/Synchronizer.ts b/packages/lib/Synchronizer.ts index a8dd1fa41..59e138e75 100644 --- a/packages/lib/Synchronizer.ts +++ b/packages/lib/Synchronizer.ts @@ -18,8 +18,8 @@ import ResourceService from './services/ResourceService'; import EncryptionService from './services/EncryptionService'; import JoplinError from './JoplinError'; import ShareService from './services/share/ShareService'; +import TaskQueue from './TaskQueue'; const { sprintf } = require('sprintf-js'); -const TaskQueue = require('./TaskQueue'); const { Dirnames } = require('./services/synchronizer/utils/types'); interface RemoteItem { @@ -564,14 +564,15 @@ export default class Synchronizer { try { const remoteContentPath = resourceRemotePath(local.id); const result = await Resource.fullPathForSyncUpload(local); - local = result.resource; + const resource = result.resource; + local = resource as any; const localResourceContentPath = result.path; - if (local.size >= 10 * 1000 * 1000) { - this.logger().warn(`Uploading a large resource (resourceId: ${local.id}, size:${local.size} bytes) which may tie up the sync process.`); + if (resource.size >= 10 * 1000 * 1000) { + this.logger().warn(`Uploading a large resource (resourceId: ${local.id}, size:${resource.size} bytes) which may tie up the sync process.`); } - await this.apiCall('put', remoteContentPath, null, { path: localResourceContentPath, source: 'file', shareId: local.share_id }); + await this.apiCall('put', remoteContentPath, null, { path: localResourceContentPath, source: 'file', shareId: resource.share_id }); } catch (error) { if (isCannotSyncError(error)) { await handleCannotSyncItem(ItemClass, syncTargetId, local, error.message); @@ -787,7 +788,7 @@ export default class Synchronizer { if (!BaseItem.isSystemPath(remote.path)) continue; // The delta API might return things like the .sync, .resource or the root folder const loadContent = async () => { - const task = await this.downloadQueue_.waitForResult(path); // await this.apiCall('get', path); + const task = await this.downloadQueue_.waitForResult(path); if (task.error) throw task.error; if (!task.result) return null; return await BaseItem.unserialize(task.result); diff --git a/packages/lib/TaskQueue.test.js b/packages/lib/TaskQueue.test.js index 0912e1855..5f4978255 100644 --- a/packages/lib/TaskQueue.test.js +++ b/packages/lib/TaskQueue.test.js @@ -1,5 +1,5 @@ const { setupDatabaseAndSynchronizer, sleep, switchClient } = require('./testing/test-utils.js'); -const TaskQueue = require('./TaskQueue.js'); +const TaskQueue = require('./TaskQueue').default; describe('TaskQueue', function() { diff --git a/packages/lib/TaskQueue.js b/packages/lib/TaskQueue.ts similarity index 57% rename from packages/lib/TaskQueue.js rename to packages/lib/TaskQueue.ts index f452030ba..45c660232 100644 --- a/packages/lib/TaskQueue.js +++ b/packages/lib/TaskQueue.ts @@ -1,23 +1,38 @@ -const time = require('./time').default; -const Setting = require('./models/Setting').default; -const Logger = require('./Logger').default; +import time from './time'; +import Setting from './models/Setting'; +import Logger from './Logger'; -class TaskQueue { - constructor(name) { - this.waitingTasks_ = []; - this.processingTasks_ = {}; - this.processingQueue_ = false; - this.stopping_ = false; - this.results_ = {}; +interface Task { + id: string; + callback: Function; +} + +interface TaskResult { + id: string; + result: any; + error?: Error; +} + +export default class TaskQueue { + + private waitingTasks_: Task[] = []; + private processingTasks_: Record = {}; + private processingQueue_ = false; + private stopping_ = false; + private results_: Record = {}; + private name_: string; + private logger_: Logger; + + constructor(name: string, logger: Logger = null) { this.name_ = name; - this.logger_ = new Logger(); + this.logger_ = logger ? logger : new Logger(); } concurrency() { return Setting.value('sync.maxConcurrentConnections'); } - push(id, callback) { + push(id: string, callback: Function) { if (this.stopping_) throw new Error('Cannot push task when queue is stopping'); this.waitingTasks_.push({ @@ -32,10 +47,10 @@ class TaskQueue { this.processingQueue_ = true; - const completeTask = (task, result, error) => { + const completeTask = (task: Task, result: any, error: Error) => { delete this.processingTasks_[task.id]; - const r = { + const r: TaskResult = { id: task.id, result: result, }; @@ -55,10 +70,10 @@ class TaskQueue { task .callback() - .then(result => { + .then((result: any) => { completeTask(task, result, null); }) - .catch(error => { + .catch((error: Error) => { if (!error) error = new Error('Unknown error'); completeTask(task, null, error); }); @@ -67,29 +82,42 @@ class TaskQueue { this.processingQueue_ = false; } - isWaiting(taskId) { + isWaiting(taskId: string) { return this.waitingTasks_.find(task => task.id === taskId); } - isProcessing(taskId) { + isProcessing(taskId: string) { return taskId in this.processingTasks_; } - isDone(taskId) { + isDone(taskId: string) { return taskId in this.results_; } - async waitForResult(taskId) { - if (!this.isWaiting(taskId) && !this.isProcessing(taskId) && !this.isDone(taskId)) throw new Error(`No such task: ${taskId}`); + async waitForAll() { + return new Promise((resolve) => { + const checkIID = setInterval(() => { + if (this.waitingTasks_.length) return; + if (this.processingTasks_.length) return; + clearInterval(checkIID); + resolve(null); + }, 100); + }); + } + + taskExists(taskId: string) { + return this.isWaiting(taskId) || this.isProcessing(taskId) || this.isDone(taskId); + } + + taskResult(taskId: string) { + if (!this.taskExists(taskId)) throw new Error(`No such task: ${taskId}`); + return this.results_[taskId]; + } + + async waitForResult(taskId: string) { + if (!this.taskExists(taskId)) throw new Error(`No such task: ${taskId}`); while (true) { - // if (this.stopping_) { - // return { - // id: taskId, - // error: new JoplinError('Queue has been destroyed', 'destroyedQueue'), - // }; - // } - const task = this.results_[taskId]; if (task) return task; await time.sleep(0.1); @@ -120,7 +148,3 @@ class TaskQueue { return this.stopping_; } } - -TaskQueue.CONCURRENCY = 5; - -module.exports = TaskQueue; diff --git a/packages/lib/models/BaseItem.ts b/packages/lib/models/BaseItem.ts index 56fcfdd12..e29626c8a 100644 --- a/packages/lib/models/BaseItem.ts +++ b/packages/lib/models/BaseItem.ts @@ -18,9 +18,17 @@ export interface ItemsThatNeedDecryptionResult { items: any[]; } +export interface ItemThatNeedSync { + id: string; + sync_time: number; + type_: ModelType; + updated_time: number; + encryption_applied: number; +} + export interface ItemsThatNeedSyncResult { hasMore: boolean; - items: any[]; + items: ItemThatNeedSync[]; neverSyncedItemIds: string[]; }