1
0
mirror of https://github.com/laurent22/joplin.git synced 2024-12-21 09:38:01 +02:00

Chore: Clean up synchronizer code and add types

This commit is contained in:
Laurent Cozic 2021-06-17 12:39:06 +01:00
parent 15ce5cdd6e
commit 1c597883ef
6 changed files with 85 additions and 40 deletions

View File

@ -842,6 +842,9 @@ packages/lib/SyncTargetOneDrive.js.map
packages/lib/Synchronizer.d.ts packages/lib/Synchronizer.d.ts
packages/lib/Synchronizer.js packages/lib/Synchronizer.js
packages/lib/Synchronizer.js.map packages/lib/Synchronizer.js.map
packages/lib/TaskQueue.d.ts
packages/lib/TaskQueue.js
packages/lib/TaskQueue.js.map
packages/lib/commands/historyBackward.d.ts packages/lib/commands/historyBackward.d.ts
packages/lib/commands/historyBackward.js packages/lib/commands/historyBackward.js
packages/lib/commands/historyBackward.js.map packages/lib/commands/historyBackward.js.map
@ -1436,6 +1439,9 @@ packages/lib/services/synchronizer/synchronizer_MigrationHandler.test.js.map
packages/lib/services/synchronizer/tools.d.ts packages/lib/services/synchronizer/tools.d.ts
packages/lib/services/synchronizer/tools.js packages/lib/services/synchronizer/tools.js
packages/lib/services/synchronizer/tools.js.map packages/lib/services/synchronizer/tools.js.map
packages/lib/services/synchronizer/uploadUtils.d.ts
packages/lib/services/synchronizer/uploadUtils.js
packages/lib/services/synchronizer/uploadUtils.js.map
packages/lib/services/synchronizer/utils/handleSyncStartupOperation.d.ts packages/lib/services/synchronizer/utils/handleSyncStartupOperation.d.ts
packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js
packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js.map packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js.map

6
.gitignore vendored
View File

@ -828,6 +828,9 @@ packages/lib/SyncTargetOneDrive.js.map
packages/lib/Synchronizer.d.ts packages/lib/Synchronizer.d.ts
packages/lib/Synchronizer.js packages/lib/Synchronizer.js
packages/lib/Synchronizer.js.map packages/lib/Synchronizer.js.map
packages/lib/TaskQueue.d.ts
packages/lib/TaskQueue.js
packages/lib/TaskQueue.js.map
packages/lib/commands/historyBackward.d.ts packages/lib/commands/historyBackward.d.ts
packages/lib/commands/historyBackward.js packages/lib/commands/historyBackward.js
packages/lib/commands/historyBackward.js.map packages/lib/commands/historyBackward.js.map
@ -1422,6 +1425,9 @@ packages/lib/services/synchronizer/synchronizer_MigrationHandler.test.js.map
packages/lib/services/synchronizer/tools.d.ts packages/lib/services/synchronizer/tools.d.ts
packages/lib/services/synchronizer/tools.js packages/lib/services/synchronizer/tools.js
packages/lib/services/synchronizer/tools.js.map packages/lib/services/synchronizer/tools.js.map
packages/lib/services/synchronizer/uploadUtils.d.ts
packages/lib/services/synchronizer/uploadUtils.js
packages/lib/services/synchronizer/uploadUtils.js.map
packages/lib/services/synchronizer/utils/handleSyncStartupOperation.d.ts packages/lib/services/synchronizer/utils/handleSyncStartupOperation.d.ts
packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js
packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js.map packages/lib/services/synchronizer/utils/handleSyncStartupOperation.js.map

View File

@ -18,8 +18,8 @@ import ResourceService from './services/ResourceService';
import EncryptionService from './services/EncryptionService'; import EncryptionService from './services/EncryptionService';
import JoplinError from './JoplinError'; import JoplinError from './JoplinError';
import ShareService from './services/share/ShareService'; import ShareService from './services/share/ShareService';
import TaskQueue from './TaskQueue';
const { sprintf } = require('sprintf-js'); const { sprintf } = require('sprintf-js');
const TaskQueue = require('./TaskQueue');
const { Dirnames } = require('./services/synchronizer/utils/types'); const { Dirnames } = require('./services/synchronizer/utils/types');
interface RemoteItem { interface RemoteItem {
@ -564,14 +564,15 @@ export default class Synchronizer {
try { try {
const remoteContentPath = resourceRemotePath(local.id); const remoteContentPath = resourceRemotePath(local.id);
const result = await Resource.fullPathForSyncUpload(local); const result = await Resource.fullPathForSyncUpload(local);
local = result.resource; const resource = result.resource;
local = resource as any;
const localResourceContentPath = result.path; const localResourceContentPath = result.path;
if (local.size >= 10 * 1000 * 1000) { if (resource.size >= 10 * 1000 * 1000) {
this.logger().warn(`Uploading a large resource (resourceId: ${local.id}, size:${local.size} bytes) which may tie up the sync process.`); this.logger().warn(`Uploading a large resource (resourceId: ${local.id}, size:${resource.size} bytes) which may tie up the sync process.`);
} }
await this.apiCall('put', remoteContentPath, null, { path: localResourceContentPath, source: 'file', shareId: local.share_id }); await this.apiCall('put', remoteContentPath, null, { path: localResourceContentPath, source: 'file', shareId: resource.share_id });
} catch (error) { } catch (error) {
if (isCannotSyncError(error)) { if (isCannotSyncError(error)) {
await handleCannotSyncItem(ItemClass, syncTargetId, local, error.message); await handleCannotSyncItem(ItemClass, syncTargetId, local, error.message);
@ -787,7 +788,7 @@ export default class Synchronizer {
if (!BaseItem.isSystemPath(remote.path)) continue; // The delta API might return things like the .sync, .resource or the root folder if (!BaseItem.isSystemPath(remote.path)) continue; // The delta API might return things like the .sync, .resource or the root folder
const loadContent = async () => { const loadContent = async () => {
const task = await this.downloadQueue_.waitForResult(path); // await this.apiCall('get', path); const task = await this.downloadQueue_.waitForResult(path);
if (task.error) throw task.error; if (task.error) throw task.error;
if (!task.result) return null; if (!task.result) return null;
return await BaseItem.unserialize(task.result); return await BaseItem.unserialize(task.result);

View File

@ -1,5 +1,5 @@
const { setupDatabaseAndSynchronizer, sleep, switchClient } = require('./testing/test-utils.js'); const { setupDatabaseAndSynchronizer, sleep, switchClient } = require('./testing/test-utils.js');
const TaskQueue = require('./TaskQueue.js'); const TaskQueue = require('./TaskQueue').default;
describe('TaskQueue', function() { describe('TaskQueue', function() {

View File

@ -1,23 +1,38 @@
const time = require('./time').default; import time from './time';
const Setting = require('./models/Setting').default; import Setting from './models/Setting';
const Logger = require('./Logger').default; import Logger from './Logger';
class TaskQueue { interface Task {
constructor(name) { id: string;
this.waitingTasks_ = []; callback: Function;
this.processingTasks_ = {}; }
this.processingQueue_ = false;
this.stopping_ = false; interface TaskResult {
this.results_ = {}; id: string;
result: any;
error?: Error;
}
export default class TaskQueue {
private waitingTasks_: Task[] = [];
private processingTasks_: Record<string, Task> = {};
private processingQueue_ = false;
private stopping_ = false;
private results_: Record<string, TaskResult> = {};
private name_: string;
private logger_: Logger;
constructor(name: string, logger: Logger = null) {
this.name_ = name; this.name_ = name;
this.logger_ = new Logger(); this.logger_ = logger ? logger : new Logger();
} }
concurrency() { concurrency() {
return Setting.value('sync.maxConcurrentConnections'); return Setting.value('sync.maxConcurrentConnections');
} }
push(id, callback) { push(id: string, callback: Function) {
if (this.stopping_) throw new Error('Cannot push task when queue is stopping'); if (this.stopping_) throw new Error('Cannot push task when queue is stopping');
this.waitingTasks_.push({ this.waitingTasks_.push({
@ -32,10 +47,10 @@ class TaskQueue {
this.processingQueue_ = true; this.processingQueue_ = true;
const completeTask = (task, result, error) => { const completeTask = (task: Task, result: any, error: Error) => {
delete this.processingTasks_[task.id]; delete this.processingTasks_[task.id];
const r = { const r: TaskResult = {
id: task.id, id: task.id,
result: result, result: result,
}; };
@ -55,10 +70,10 @@ class TaskQueue {
task task
.callback() .callback()
.then(result => { .then((result: any) => {
completeTask(task, result, null); completeTask(task, result, null);
}) })
.catch(error => { .catch((error: Error) => {
if (!error) error = new Error('Unknown error'); if (!error) error = new Error('Unknown error');
completeTask(task, null, error); completeTask(task, null, error);
}); });
@ -67,29 +82,42 @@ class TaskQueue {
this.processingQueue_ = false; this.processingQueue_ = false;
} }
isWaiting(taskId) { isWaiting(taskId: string) {
return this.waitingTasks_.find(task => task.id === taskId); return this.waitingTasks_.find(task => task.id === taskId);
} }
isProcessing(taskId) { isProcessing(taskId: string) {
return taskId in this.processingTasks_; return taskId in this.processingTasks_;
} }
isDone(taskId) { isDone(taskId: string) {
return taskId in this.results_; return taskId in this.results_;
} }
async waitForResult(taskId) { async waitForAll() {
if (!this.isWaiting(taskId) && !this.isProcessing(taskId) && !this.isDone(taskId)) throw new Error(`No such task: ${taskId}`); return new Promise((resolve) => {
const checkIID = setInterval(() => {
if (this.waitingTasks_.length) return;
if (this.processingTasks_.length) return;
clearInterval(checkIID);
resolve(null);
}, 100);
});
}
taskExists(taskId: string) {
return this.isWaiting(taskId) || this.isProcessing(taskId) || this.isDone(taskId);
}
taskResult(taskId: string) {
if (!this.taskExists(taskId)) throw new Error(`No such task: ${taskId}`);
return this.results_[taskId];
}
async waitForResult(taskId: string) {
if (!this.taskExists(taskId)) throw new Error(`No such task: ${taskId}`);
while (true) { while (true) {
// if (this.stopping_) {
// return {
// id: taskId,
// error: new JoplinError('Queue has been destroyed', 'destroyedQueue'),
// };
// }
const task = this.results_[taskId]; const task = this.results_[taskId];
if (task) return task; if (task) return task;
await time.sleep(0.1); await time.sleep(0.1);
@ -120,7 +148,3 @@ class TaskQueue {
return this.stopping_; return this.stopping_;
} }
} }
TaskQueue.CONCURRENCY = 5;
module.exports = TaskQueue;

View File

@ -18,9 +18,17 @@ export interface ItemsThatNeedDecryptionResult {
items: any[]; items: any[];
} }
export interface ItemThatNeedSync {
id: string;
sync_time: number;
type_: ModelType;
updated_time: number;
encryption_applied: number;
}
export interface ItemsThatNeedSyncResult { export interface ItemsThatNeedSyncResult {
hasMore: boolean; hasMore: boolean;
items: any[]; items: ItemThatNeedSync[];
neverSyncedItemIds: string[]; neverSyncedItemIds: string[];
} }