You've already forked joplin
mirror of
https://github.com/laurent22/joplin.git
synced 2025-09-05 20:56:22 +02:00
Compare commits
6 Commits
v2.13.1
...
sync_multi
Author | SHA1 | Date | |
---|---|---|---|
|
7db7dc4957 | ||
|
1aa96af4db | ||
|
92dadd7509 | ||
|
000185bfb4 | ||
|
e81427a1f2 | ||
|
958e9163b6 |
@@ -872,6 +872,9 @@ packages/lib/eventManager.js.map
|
|||||||
packages/lib/file-api-driver-joplinServer.d.ts
|
packages/lib/file-api-driver-joplinServer.d.ts
|
||||||
packages/lib/file-api-driver-joplinServer.js
|
packages/lib/file-api-driver-joplinServer.js
|
||||||
packages/lib/file-api-driver-joplinServer.js.map
|
packages/lib/file-api-driver-joplinServer.js.map
|
||||||
|
packages/lib/file-api-driver-memory.d.ts
|
||||||
|
packages/lib/file-api-driver-memory.js
|
||||||
|
packages/lib/file-api-driver-memory.js.map
|
||||||
packages/lib/file-api-driver.test.d.ts
|
packages/lib/file-api-driver.test.d.ts
|
||||||
packages/lib/file-api-driver.test.js
|
packages/lib/file-api-driver.test.js
|
||||||
packages/lib/file-api-driver.test.js.map
|
packages/lib/file-api-driver.test.js.map
|
||||||
@@ -1391,6 +1394,12 @@ packages/lib/services/spellChecker/SpellCheckerService.js.map
|
|||||||
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.d.ts
|
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.d.ts
|
||||||
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js
|
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js
|
||||||
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js.map
|
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js.map
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.d.ts
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.js
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.js.map
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.test.d.ts
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.test.js
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.test.js.map
|
||||||
packages/lib/services/synchronizer/LockHandler.d.ts
|
packages/lib/services/synchronizer/LockHandler.d.ts
|
||||||
packages/lib/services/synchronizer/LockHandler.js
|
packages/lib/services/synchronizer/LockHandler.js
|
||||||
packages/lib/services/synchronizer/LockHandler.js.map
|
packages/lib/services/synchronizer/LockHandler.js.map
|
||||||
|
9
.gitignore
vendored
9
.gitignore
vendored
@@ -858,6 +858,9 @@ packages/lib/eventManager.js.map
|
|||||||
packages/lib/file-api-driver-joplinServer.d.ts
|
packages/lib/file-api-driver-joplinServer.d.ts
|
||||||
packages/lib/file-api-driver-joplinServer.js
|
packages/lib/file-api-driver-joplinServer.js
|
||||||
packages/lib/file-api-driver-joplinServer.js.map
|
packages/lib/file-api-driver-joplinServer.js.map
|
||||||
|
packages/lib/file-api-driver-memory.d.ts
|
||||||
|
packages/lib/file-api-driver-memory.js
|
||||||
|
packages/lib/file-api-driver-memory.js.map
|
||||||
packages/lib/file-api-driver.test.d.ts
|
packages/lib/file-api-driver.test.d.ts
|
||||||
packages/lib/file-api-driver.test.js
|
packages/lib/file-api-driver.test.js
|
||||||
packages/lib/file-api-driver.test.js.map
|
packages/lib/file-api-driver.test.js.map
|
||||||
@@ -1377,6 +1380,12 @@ packages/lib/services/spellChecker/SpellCheckerService.js.map
|
|||||||
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.d.ts
|
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.d.ts
|
||||||
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js
|
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js
|
||||||
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js.map
|
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js.map
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.d.ts
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.js
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.js.map
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.test.d.ts
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.test.js
|
||||||
|
packages/lib/services/synchronizer/ItemUploader.test.js.map
|
||||||
packages/lib/services/synchronizer/LockHandler.d.ts
|
packages/lib/services/synchronizer/LockHandler.d.ts
|
||||||
packages/lib/services/synchronizer/LockHandler.js
|
packages/lib/services/synchronizer/LockHandler.js
|
||||||
packages/lib/services/synchronizer/LockHandler.js.map
|
packages/lib/services/synchronizer/LockHandler.js.map
|
||||||
|
@@ -51,5 +51,6 @@ done
|
|||||||
cd "$ROOT_DIR/packages/app-cli"
|
cd "$ROOT_DIR/packages/app-cli"
|
||||||
npm start -- --profile "$PROFILE_DIR" batch "$CMD_FILE"
|
npm start -- --profile "$PROFILE_DIR" batch "$CMD_FILE"
|
||||||
npm start -- --profile "$PROFILE_DIR" import ~/Desktop/Joplin_17_06_2021.jex
|
npm start -- --profile "$PROFILE_DIR" import ~/Desktop/Joplin_17_06_2021.jex
|
||||||
|
# npm start -- --profile "$PROFILE_DIR" import ~/Desktop/Tout_18_06_2021.jex
|
||||||
npm start -- --profile "$PROFILE_DIR" sync
|
npm start -- --profile "$PROFILE_DIR" sync
|
||||||
|
|
||||||
|
@@ -1,7 +1,7 @@
|
|||||||
const BaseSyncTarget = require('./BaseSyncTarget').default;
|
const BaseSyncTarget = require('./BaseSyncTarget').default;
|
||||||
const Setting = require('./models/Setting').default;
|
const Setting = require('./models/Setting').default;
|
||||||
const { FileApi } = require('./file-api.js');
|
const { FileApi } = require('./file-api.js');
|
||||||
const { FileApiDriverMemory } = require('./file-api-driver-memory.js');
|
const FileApiDriverMemory = require('./file-api-driver-memory').default;
|
||||||
const Synchronizer = require('./Synchronizer').default;
|
const Synchronizer = require('./Synchronizer').default;
|
||||||
|
|
||||||
class SyncTargetMemory extends BaseSyncTarget {
|
class SyncTargetMemory extends BaseSyncTarget {
|
||||||
|
@@ -19,6 +19,7 @@ import EncryptionService from './services/EncryptionService';
|
|||||||
import JoplinError from './JoplinError';
|
import JoplinError from './JoplinError';
|
||||||
import ShareService from './services/share/ShareService';
|
import ShareService from './services/share/ShareService';
|
||||||
import TaskQueue from './TaskQueue';
|
import TaskQueue from './TaskQueue';
|
||||||
|
import ItemUploader from './services/synchronizer/ItemUploader';
|
||||||
const { sprintf } = require('sprintf-js');
|
const { sprintf } = require('sprintf-js');
|
||||||
const { Dirnames } = require('./services/synchronizer/utils/types');
|
const { Dirnames } = require('./services/synchronizer/utils/types');
|
||||||
|
|
||||||
@@ -73,7 +74,7 @@ export default class Synchronizer {
|
|||||||
|
|
||||||
public dispatch: Function;
|
public dispatch: Function;
|
||||||
|
|
||||||
constructor(db: any, api: any, appType: string) {
|
public constructor(db: any, api: any, appType: string) {
|
||||||
this.db_ = db;
|
this.db_ = db;
|
||||||
this.api_ = api;
|
this.api_ = api;
|
||||||
this.appType_ = appType;
|
this.appType_ = appType;
|
||||||
@@ -83,6 +84,8 @@ export default class Synchronizer {
|
|||||||
this.progressReport_ = {};
|
this.progressReport_ = {};
|
||||||
|
|
||||||
this.dispatch = function() {};
|
this.dispatch = function() {};
|
||||||
|
|
||||||
|
this.apiCall = this.apiCall.bind(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
state() {
|
state() {
|
||||||
@@ -300,7 +303,7 @@ export default class Synchronizer {
|
|||||||
return '';
|
return '';
|
||||||
}
|
}
|
||||||
|
|
||||||
async apiCall(fnName: string, ...args: any[]) {
|
private async apiCall(fnName: string, ...args: any[]) {
|
||||||
if (this.syncTargetIsLocked_) throw new JoplinError('Sync target is locked - aborting API call', 'lockError');
|
if (this.syncTargetIsLocked_) throw new JoplinError('Sync target is locked - aborting API call', 'lockError');
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -389,6 +392,8 @@ export default class Synchronizer {
|
|||||||
// correctly so as to share/unshare the right items.
|
// correctly so as to share/unshare the right items.
|
||||||
await Folder.updateAllShareIds();
|
await Folder.updateAllShareIds();
|
||||||
|
|
||||||
|
const itemUploader = new ItemUploader(this.api(), this.apiCall);
|
||||||
|
|
||||||
let errorToThrow = null;
|
let errorToThrow = null;
|
||||||
let syncLock = null;
|
let syncLock = null;
|
||||||
|
|
||||||
@@ -440,6 +445,8 @@ export default class Synchronizer {
|
|||||||
const result = await BaseItem.itemsThatNeedSync(syncTargetId);
|
const result = await BaseItem.itemsThatNeedSync(syncTargetId);
|
||||||
const locals = result.items;
|
const locals = result.items;
|
||||||
|
|
||||||
|
await itemUploader.preUploadItems(result.items.filter((it: any) => result.neverSyncedItemIds.includes(it.id)));
|
||||||
|
|
||||||
for (let i = 0; i < locals.length; i++) {
|
for (let i = 0; i < locals.length; i++) {
|
||||||
if (this.cancelling()) break;
|
if (this.cancelling()) break;
|
||||||
|
|
||||||
@@ -588,8 +595,7 @@ export default class Synchronizer {
|
|||||||
let canSync = true;
|
let canSync = true;
|
||||||
try {
|
try {
|
||||||
if (this.testingHooks_.indexOf('notesRejectedByTarget') >= 0 && local.type_ === BaseModel.TYPE_NOTE) throw new JoplinError('Testing rejectedByTarget', 'rejectedByTarget');
|
if (this.testingHooks_.indexOf('notesRejectedByTarget') >= 0 && local.type_ === BaseModel.TYPE_NOTE) throw new JoplinError('Testing rejectedByTarget', 'rejectedByTarget');
|
||||||
const content = await ItemClass.serializeForSync(local);
|
await itemUploader.serializeAndUploadItem(ItemClass, path, local);
|
||||||
await this.apiCall('put', path, content);
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error && error.code === 'rejectedByTarget') {
|
if (error && error.code === 'rejectedByTarget') {
|
||||||
await handleCannotSyncItem(ItemClass, syncTargetId, local, error.message);
|
await handleCannotSyncItem(ItemClass, syncTargetId, local, error.message);
|
||||||
@@ -619,7 +625,6 @@ export default class Synchronizer {
|
|||||||
// above also doesn't use it because it fetches the whole remote object and read the
|
// above also doesn't use it because it fetches the whole remote object and read the
|
||||||
// more reliable 'updated_time' property. Basically remote.updated_time is deprecated.
|
// more reliable 'updated_time' property. Basically remote.updated_time is deprecated.
|
||||||
|
|
||||||
// await this.api().setTimestamp(path, local.updated_time);
|
|
||||||
await ItemClass.saveSyncTime(syncTargetId, local, local.updated_time);
|
await ItemClass.saveSyncTime(syncTargetId, local, local.updated_time);
|
||||||
}
|
}
|
||||||
} else if (action == 'itemConflict') {
|
} else if (action == 'itemConflict') {
|
||||||
|
@@ -1,3 +1,4 @@
|
|||||||
|
import { MultiPutItem } from './file-api';
|
||||||
import JoplinError from './JoplinError';
|
import JoplinError from './JoplinError';
|
||||||
import JoplinServerApi from './JoplinServerApi';
|
import JoplinServerApi from './JoplinServerApi';
|
||||||
import { trimSlashes } from './path-utils';
|
import { trimSlashes } from './path-utils';
|
||||||
@@ -31,6 +32,10 @@ export default class FileApiDriverJoplinServer {
|
|||||||
return this.api_;
|
return this.api_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public get supportsMultiPut() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
public requestRepeatCount() {
|
public requestRepeatCount() {
|
||||||
return 3;
|
return 3;
|
||||||
}
|
}
|
||||||
@@ -174,6 +179,10 @@ export default class FileApiDriverJoplinServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async multiPut(items: MultiPutItem[], options: any = null) {
|
||||||
|
return this.api().exec('PUT', 'api/batch_items', null, { items: items }, null, options);
|
||||||
|
}
|
||||||
|
|
||||||
public async delete(path: string) {
|
public async delete(path: string) {
|
||||||
return this.api().exec('DELETE', this.apiFilePath_(path));
|
return this.api().exec('DELETE', this.apiFilePath_(path));
|
||||||
}
|
}
|
||||||
|
@@ -1,14 +1,18 @@
|
|||||||
const time = require('./time').default;
|
import time from './time';
|
||||||
const fs = require('fs-extra');
|
const fs = require('fs-extra');
|
||||||
const { basicDelta } = require('./file-api');
|
import { basicDelta, MultiPutItem } from './file-api';
|
||||||
|
|
||||||
|
export default class FileApiDriverMemory {
|
||||||
|
|
||||||
|
private items_: any[];
|
||||||
|
private deletedItems_: any[];
|
||||||
|
|
||||||
class FileApiDriverMemory {
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.items_ = [];
|
this.items_ = [];
|
||||||
this.deletedItems_ = [];
|
this.deletedItems_ = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
encodeContent_(content) {
|
encodeContent_(content: any) {
|
||||||
if (content instanceof Buffer) {
|
if (content instanceof Buffer) {
|
||||||
return content.toString('base64');
|
return content.toString('base64');
|
||||||
} else {
|
} else {
|
||||||
@@ -16,23 +20,27 @@ class FileApiDriverMemory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
decodeContent_(content) {
|
public get supportsMultiPut() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
decodeContent_(content: any) {
|
||||||
return Buffer.from(content, 'base64').toString('utf-8');
|
return Buffer.from(content, 'base64').toString('utf-8');
|
||||||
}
|
}
|
||||||
|
|
||||||
itemIndexByPath(path) {
|
itemIndexByPath(path: string) {
|
||||||
for (let i = 0; i < this.items_.length; i++) {
|
for (let i = 0; i < this.items_.length; i++) {
|
||||||
if (this.items_[i].path == path) return i;
|
if (this.items_[i].path == path) return i;
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
itemByPath(path) {
|
itemByPath(path: string) {
|
||||||
const index = this.itemIndexByPath(path);
|
const index = this.itemIndexByPath(path);
|
||||||
return index < 0 ? null : this.items_[index];
|
return index < 0 ? null : this.items_[index];
|
||||||
}
|
}
|
||||||
|
|
||||||
newItem(path, isDir = false) {
|
newItem(path: string, isDir = false) {
|
||||||
const now = time.unixMs();
|
const now = time.unixMs();
|
||||||
return {
|
return {
|
||||||
path: path,
|
path: path,
|
||||||
@@ -43,18 +51,18 @@ class FileApiDriverMemory {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
stat(path) {
|
stat(path: string) {
|
||||||
const item = this.itemByPath(path);
|
const item = this.itemByPath(path);
|
||||||
return Promise.resolve(item ? Object.assign({}, item) : null);
|
return Promise.resolve(item ? Object.assign({}, item) : null);
|
||||||
}
|
}
|
||||||
|
|
||||||
async setTimestamp(path, timestampMs) {
|
async setTimestamp(path: string, timestampMs: number): Promise<any> {
|
||||||
const item = this.itemByPath(path);
|
const item = this.itemByPath(path);
|
||||||
if (!item) return Promise.reject(new Error(`File not found: ${path}`));
|
if (!item) return Promise.reject(new Error(`File not found: ${path}`));
|
||||||
item.updated_time = timestampMs;
|
item.updated_time = timestampMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
async list(path) {
|
async list(path: string) {
|
||||||
const output = [];
|
const output = [];
|
||||||
|
|
||||||
for (let i = 0; i < this.items_.length; i++) {
|
for (let i = 0; i < this.items_.length; i++) {
|
||||||
@@ -77,7 +85,7 @@ class FileApiDriverMemory {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async get(path, options) {
|
async get(path: string, options: any) {
|
||||||
const item = this.itemByPath(path);
|
const item = this.itemByPath(path);
|
||||||
if (!item) return Promise.resolve(null);
|
if (!item) return Promise.resolve(null);
|
||||||
if (item.isDir) return Promise.reject(new Error(`${path} is a directory, not a file`));
|
if (item.isDir) return Promise.reject(new Error(`${path} is a directory, not a file`));
|
||||||
@@ -93,13 +101,13 @@ class FileApiDriverMemory {
|
|||||||
return output;
|
return output;
|
||||||
}
|
}
|
||||||
|
|
||||||
async mkdir(path) {
|
async mkdir(path: string) {
|
||||||
const index = this.itemIndexByPath(path);
|
const index = this.itemIndexByPath(path);
|
||||||
if (index >= 0) return;
|
if (index >= 0) return;
|
||||||
this.items_.push(this.newItem(path, true));
|
this.items_.push(this.newItem(path, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
async put(path, content, options = null) {
|
async put(path: string, content: any, options: any = null) {
|
||||||
if (!options) options = {};
|
if (!options) options = {};
|
||||||
|
|
||||||
if (options.source === 'file') content = await fs.readFile(options.path);
|
if (options.source === 'file') content = await fs.readFile(options.path);
|
||||||
@@ -109,13 +117,38 @@ class FileApiDriverMemory {
|
|||||||
const item = this.newItem(path, false);
|
const item = this.newItem(path, false);
|
||||||
item.content = this.encodeContent_(content);
|
item.content = this.encodeContent_(content);
|
||||||
this.items_.push(item);
|
this.items_.push(item);
|
||||||
|
return item;
|
||||||
} else {
|
} else {
|
||||||
this.items_[index].content = this.encodeContent_(content);
|
this.items_[index].content = this.encodeContent_(content);
|
||||||
this.items_[index].updated_time = time.unixMs();
|
this.items_[index].updated_time = time.unixMs();
|
||||||
|
return this.items_[index];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async delete(path) {
|
public async multiPut(items: MultiPutItem[], options: any = null) {
|
||||||
|
const output: any = {
|
||||||
|
items: {},
|
||||||
|
};
|
||||||
|
|
||||||
|
for (const item of items) {
|
||||||
|
try {
|
||||||
|
const processedItem = await this.put(`/root/${item.name}`, item.body, options);
|
||||||
|
output.items[item.name] = {
|
||||||
|
item: processedItem,
|
||||||
|
error: null,
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
output.items[item.name] = {
|
||||||
|
item: null,
|
||||||
|
error: error,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return output;
|
||||||
|
}
|
||||||
|
|
||||||
|
async delete(path: string) {
|
||||||
const index = this.itemIndexByPath(path);
|
const index = this.itemIndexByPath(path);
|
||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
const item = Object.assign({}, this.items_[index]);
|
const item = Object.assign({}, this.items_[index]);
|
||||||
@@ -126,10 +159,10 @@ class FileApiDriverMemory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async move(oldPath, newPath) {
|
async move(oldPath: string, newPath: string): Promise<any> {
|
||||||
const sourceItem = this.itemByPath(oldPath);
|
const sourceItem = this.itemByPath(oldPath);
|
||||||
if (!sourceItem) return Promise.reject(new Error(`Path not found: ${oldPath}`));
|
if (!sourceItem) return Promise.reject(new Error(`Path not found: ${oldPath}`));
|
||||||
this.delete(newPath); // Overwrite if newPath already exists
|
await this.delete(newPath); // Overwrite if newPath already exists
|
||||||
sourceItem.path = newPath;
|
sourceItem.path = newPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,8 +170,8 @@ class FileApiDriverMemory {
|
|||||||
this.items_ = [];
|
this.items_ = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
async delta(path, options = null) {
|
async delta(path: string, options: any = null) {
|
||||||
const getStatFn = async path => {
|
const getStatFn = async (path: string) => {
|
||||||
const output = this.items_.slice();
|
const output = this.items_.slice();
|
||||||
for (let i = 0; i < output.length; i++) {
|
for (let i = 0; i < output.length; i++) {
|
||||||
const item = Object.assign({}, output[i]);
|
const item = Object.assign({}, output[i]);
|
||||||
@@ -156,5 +189,3 @@ class FileApiDriverMemory {
|
|||||||
this.items_ = [];
|
this.items_ = [];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = { FileApiDriverMemory };
|
|
@@ -11,6 +11,11 @@ const Mutex = require('async-mutex').Mutex;
|
|||||||
|
|
||||||
const logger = Logger.create('FileApi');
|
const logger = Logger.create('FileApi');
|
||||||
|
|
||||||
|
export interface MultiPutItem {
|
||||||
|
name: string;
|
||||||
|
body: string;
|
||||||
|
}
|
||||||
|
|
||||||
function requestCanBeRepeated(error: any) {
|
function requestCanBeRepeated(error: any) {
|
||||||
const errorCode = typeof error === 'object' && error.code ? error.code : null;
|
const errorCode = typeof error === 'object' && error.code ? error.code : null;
|
||||||
|
|
||||||
@@ -81,6 +86,10 @@ class FileApi {
|
|||||||
if (this.driver_.initialize) return this.driver_.initialize(this.fullPath(''));
|
if (this.driver_.initialize) return this.driver_.initialize(this.fullPath(''));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public get supportsMultiPut(): boolean {
|
||||||
|
return !!this.driver().supportsMultiPut;
|
||||||
|
}
|
||||||
|
|
||||||
async fetchRemoteDateOffset_() {
|
async fetchRemoteDateOffset_() {
|
||||||
const tempFile = `${this.tempDirName()}/timeCheck${Math.round(Math.random() * 1000000)}.txt`;
|
const tempFile = `${this.tempDirName()}/timeCheck${Math.round(Math.random() * 1000000)}.txt`;
|
||||||
const startTime = Date.now();
|
const startTime = Date.now();
|
||||||
@@ -251,12 +260,6 @@ class FileApi {
|
|||||||
if (!output) return output;
|
if (!output) return output;
|
||||||
output.path = path;
|
output.path = path;
|
||||||
return output;
|
return output;
|
||||||
|
|
||||||
// return this.driver_.stat(this.fullPath(path)).then((output) => {
|
|
||||||
// if (!output) return output;
|
|
||||||
// output.path = path;
|
|
||||||
// return output;
|
|
||||||
// });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns UTF-8 encoded string by default, or a Response if `options.target = 'file'`
|
// Returns UTF-8 encoded string by default, or a Response if `options.target = 'file'`
|
||||||
@@ -277,6 +280,11 @@ class FileApi {
|
|||||||
return tryAndRepeat(() => this.driver_.put(this.fullPath(path), content, options), this.requestRepeatCount());
|
return tryAndRepeat(() => this.driver_.put(this.fullPath(path), content, options), this.requestRepeatCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async multiPut(items: MultiPutItem[], options: any = null) {
|
||||||
|
if (!this.driver().supportsMultiPut) throw new Error('Multi PUT not supported');
|
||||||
|
return tryAndRepeat(() => this.driver_.multiPut(items, options), this.requestRepeatCount());
|
||||||
|
}
|
||||||
|
|
||||||
delete(path: string) {
|
delete(path: string) {
|
||||||
logger.debug(`delete ${this.fullPath(path)}`);
|
logger.debug(`delete ${this.fullPath(path)}`);
|
||||||
return tryAndRepeat(() => this.driver_.delete(this.fullPath(path)), this.requestRepeatCount());
|
return tryAndRepeat(() => this.driver_.delete(this.fullPath(path)), this.requestRepeatCount());
|
||||||
|
@@ -403,7 +403,7 @@ export default class BaseItem extends BaseModel {
|
|||||||
return this.shareService_;
|
return this.shareService_;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static async serializeForSync(item: BaseItemEntity) {
|
public static async serializeForSync(item: BaseItemEntity): Promise<string> {
|
||||||
const ItemClass = this.itemClass(item);
|
const ItemClass = this.itemClass(item);
|
||||||
const shownKeys = ItemClass.fieldNames();
|
const shownKeys = ItemClass.fieldNames();
|
||||||
shownKeys.push('type_');
|
shownKeys.push('type_');
|
||||||
|
167
packages/lib/services/synchronizer/ItemUploader.test.ts
Normal file
167
packages/lib/services/synchronizer/ItemUploader.test.ts
Normal file
@@ -0,0 +1,167 @@
|
|||||||
|
import { FileApi } from '../../file-api';
|
||||||
|
import BaseItem from '../../models/BaseItem';
|
||||||
|
import Note from '../../models/Note';
|
||||||
|
import { expectNotThrow, expectThrow, setupDatabaseAndSynchronizer, switchClient } from '../../testing/test-utils';
|
||||||
|
import time from '../../time';
|
||||||
|
import ItemUploader, { ApiCallFunction } from './ItemUploader';
|
||||||
|
|
||||||
|
interface ApiCall {
|
||||||
|
name: string;
|
||||||
|
args: any[];
|
||||||
|
}
|
||||||
|
|
||||||
|
function clearArray(a: any[]) {
|
||||||
|
a.splice(0, a.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
function newFakeApi(): FileApi {
|
||||||
|
return { supportsMultiPut: true } as any;
|
||||||
|
}
|
||||||
|
|
||||||
|
function newFakeApiCall(callRecorder: ApiCall[], itemBodyCallback: Function = null): ApiCallFunction {
|
||||||
|
const apiCall = async (callName: string, ...args: any[]): Promise<any> => {
|
||||||
|
callRecorder.push({ name: callName, args });
|
||||||
|
|
||||||
|
if (callName === 'multiPut') {
|
||||||
|
const [batch] = args;
|
||||||
|
const output: any = { items: {} };
|
||||||
|
for (const item of batch) {
|
||||||
|
if (itemBodyCallback) {
|
||||||
|
output.items[item.name] = itemBodyCallback(item);
|
||||||
|
} else {
|
||||||
|
output.items[item.name] = {
|
||||||
|
item: item.body,
|
||||||
|
error: null,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return output;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return apiCall;
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('synchronizer_ItemUplader', function() {
|
||||||
|
|
||||||
|
beforeEach(async (done) => {
|
||||||
|
await setupDatabaseAndSynchronizer(1);
|
||||||
|
await setupDatabaseAndSynchronizer(2);
|
||||||
|
await switchClient(1);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should batch uploads and use the cache afterwards', (async () => {
|
||||||
|
const callRecorder: ApiCall[] = [];
|
||||||
|
const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder));
|
||||||
|
|
||||||
|
const notes = [
|
||||||
|
await Note.save({ title: '1' }),
|
||||||
|
await Note.save({ title: '2' }),
|
||||||
|
];
|
||||||
|
|
||||||
|
await itemUploader.preUploadItems(notes);
|
||||||
|
|
||||||
|
// There should be only one call to "multiPut" because the items have
|
||||||
|
// been batched.
|
||||||
|
expect(callRecorder.length).toBe(1);
|
||||||
|
expect(callRecorder[0].name).toBe('multiPut');
|
||||||
|
|
||||||
|
clearArray(callRecorder);
|
||||||
|
|
||||||
|
// Now if we try to upload the item it shouldn't call the API because it
|
||||||
|
// will use the cached item.
|
||||||
|
await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0]);
|
||||||
|
expect(callRecorder.length).toBe(0);
|
||||||
|
|
||||||
|
// Now try to process a note that hasn't been cached. In that case, it
|
||||||
|
// should call "PUT" directly.
|
||||||
|
const note3 = await Note.save({ title: '3' });
|
||||||
|
await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(note3), note3);
|
||||||
|
expect(callRecorder.length).toBe(1);
|
||||||
|
expect(callRecorder[0].name).toBe('put');
|
||||||
|
}));
|
||||||
|
|
||||||
|
it('should not batch upload if the items are over the batch size limit', (async () => {
|
||||||
|
const callRecorder: ApiCall[] = [];
|
||||||
|
const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder));
|
||||||
|
itemUploader.maxBatchSize = 1;
|
||||||
|
|
||||||
|
const notes = [
|
||||||
|
await Note.save({ title: '1' }),
|
||||||
|
await Note.save({ title: '2' }),
|
||||||
|
];
|
||||||
|
|
||||||
|
await itemUploader.preUploadItems(notes);
|
||||||
|
expect(callRecorder.length).toBe(0);
|
||||||
|
}));
|
||||||
|
|
||||||
|
it('should not use the cache if the note has changed since the pre-upload', (async () => {
|
||||||
|
const callRecorder: ApiCall[] = [];
|
||||||
|
const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder));
|
||||||
|
|
||||||
|
const notes = [
|
||||||
|
await Note.save({ title: '1' }),
|
||||||
|
await Note.save({ title: '2' }),
|
||||||
|
];
|
||||||
|
|
||||||
|
await itemUploader.preUploadItems(notes);
|
||||||
|
clearArray(callRecorder);
|
||||||
|
|
||||||
|
await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0]);
|
||||||
|
expect(callRecorder.length).toBe(0);
|
||||||
|
|
||||||
|
await time.msleep(1);
|
||||||
|
notes[1] = await Note.save({ title: '22' }),
|
||||||
|
await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[1]), notes[1]);
|
||||||
|
expect(callRecorder.length).toBe(1);
|
||||||
|
}));
|
||||||
|
|
||||||
|
it('should respect the max batch size', (async () => {
|
||||||
|
const callRecorder: ApiCall[] = [];
|
||||||
|
const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder));
|
||||||
|
|
||||||
|
const notes = [
|
||||||
|
await Note.save({ title: '1' }),
|
||||||
|
await Note.save({ title: '2' }),
|
||||||
|
await Note.save({ title: '3' }),
|
||||||
|
];
|
||||||
|
|
||||||
|
const noteSize = BaseItem.systemPath(notes[0]).length + (await Note.serializeForSync(notes[0])).length;
|
||||||
|
itemUploader.maxBatchSize = noteSize * 2;
|
||||||
|
|
||||||
|
// It should send two batches - one with two notes, and the second with
|
||||||
|
// only one note.
|
||||||
|
await itemUploader.preUploadItems(notes);
|
||||||
|
expect(callRecorder.length).toBe(2);
|
||||||
|
expect(callRecorder[0].args[0].length).toBe(2);
|
||||||
|
expect(callRecorder[1].args[0].length).toBe(1);
|
||||||
|
}));
|
||||||
|
|
||||||
|
it('should rethrow error for items within the batch', (async () => {
|
||||||
|
const callRecorder: ApiCall[] = [];
|
||||||
|
|
||||||
|
const notes = [
|
||||||
|
await Note.save({ title: '1' }),
|
||||||
|
await Note.save({ title: '2' }),
|
||||||
|
await Note.save({ title: '3' }),
|
||||||
|
];
|
||||||
|
|
||||||
|
// Simulates throwing an error on note 2
|
||||||
|
const itemBodyCallback = (item: any): any => {
|
||||||
|
if (item.name === BaseItem.systemPath(notes[1])) {
|
||||||
|
return { error: new Error('Could not save item'), item: null };
|
||||||
|
} else {
|
||||||
|
return { error: null, item: item.body };
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder, itemBodyCallback));
|
||||||
|
|
||||||
|
await itemUploader.preUploadItems(notes);
|
||||||
|
|
||||||
|
await expectNotThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0]));
|
||||||
|
await expectThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[1]), notes[1]));
|
||||||
|
await expectNotThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[2]), notes[2]));
|
||||||
|
}));
|
||||||
|
|
||||||
|
});
|
110
packages/lib/services/synchronizer/ItemUploader.ts
Normal file
110
packages/lib/services/synchronizer/ItemUploader.ts
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
import { ModelType } from '../../BaseModel';
|
||||||
|
import { FileApi, MultiPutItem } from '../../file-api';
|
||||||
|
import Logger from '../../Logger';
|
||||||
|
import BaseItem, { ItemThatNeedSync } from '../../models/BaseItem';
|
||||||
|
|
||||||
|
const logger = Logger.create('ItemUploader');
|
||||||
|
|
||||||
|
export type ApiCallFunction = (fnName: string, ...args: any[])=> Promise<any>;
|
||||||
|
|
||||||
|
interface BatchItem extends MultiPutItem {
|
||||||
|
localItemUpdatedTime: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export default class ItemUploader {
|
||||||
|
|
||||||
|
private api_: FileApi;
|
||||||
|
private apiCall_: ApiCallFunction;
|
||||||
|
private preUploadedItems_: Record<string, any> = {};
|
||||||
|
private preUploadedItemUpdatedTimes_: Record<string, number> = {};
|
||||||
|
private maxBatchSize_ = 1 * 1024 * 1024; // 1MB;
|
||||||
|
|
||||||
|
public constructor(api: FileApi, apiCall: ApiCallFunction) {
|
||||||
|
this.api_ = api;
|
||||||
|
this.apiCall_ = apiCall;
|
||||||
|
}
|
||||||
|
|
||||||
|
public get maxBatchSize() {
|
||||||
|
return this.maxBatchSize_;
|
||||||
|
}
|
||||||
|
|
||||||
|
public set maxBatchSize(v: number) {
|
||||||
|
this.maxBatchSize_ = v;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async serializeAndUploadItem(ItemClass: any, path: string, local: ItemThatNeedSync) {
|
||||||
|
const preUploadItem = this.preUploadedItems_[path];
|
||||||
|
if (preUploadItem) {
|
||||||
|
if (this.preUploadedItemUpdatedTimes_[path] !== local.updated_time) {
|
||||||
|
// Normally this should be rare as it can only happen if the
|
||||||
|
// item has been changed between the moment it was pre-uploaded
|
||||||
|
// and the moment where it's being processed by the
|
||||||
|
// synchronizer. It could happen for example for a note being
|
||||||
|
// edited just at the same time. In that case, we proceed with
|
||||||
|
// the regular upload.
|
||||||
|
logger.warn(`Pre-uploaded item updated_time has changed. It is going to be re-uploaded again: ${path} (From ${this.preUploadedItemUpdatedTimes_[path]} to ${local.updated_time})`);
|
||||||
|
} else {
|
||||||
|
if (preUploadItem.error) throw new Error(preUploadItem.error.message ? preUploadItem.error.message : 'Unknown pre-upload error');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const content = await ItemClass.serializeForSync(local);
|
||||||
|
await this.apiCall_('put', path, content);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async preUploadItems(items: ItemThatNeedSync[]) {
|
||||||
|
if (!this.api_.supportsMultiPut) return;
|
||||||
|
|
||||||
|
const itemsToUpload: BatchItem[] = [];
|
||||||
|
|
||||||
|
for (const local of items) {
|
||||||
|
// For resources, additional logic is necessary - in particular the blob
|
||||||
|
// should be uploaded before the metadata, so we can't batch process.
|
||||||
|
if (local.type_ === ModelType.Resource) continue;
|
||||||
|
|
||||||
|
const ItemClass = BaseItem.itemClass(local);
|
||||||
|
itemsToUpload.push({
|
||||||
|
name: BaseItem.systemPath(local),
|
||||||
|
body: await ItemClass.serializeForSync(local),
|
||||||
|
localItemUpdatedTime: local.updated_time,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let batchSize = 0;
|
||||||
|
let currentBatch: BatchItem[] = [];
|
||||||
|
|
||||||
|
const uploadBatch = async (batch: BatchItem[]) => {
|
||||||
|
for (const batchItem of batch) {
|
||||||
|
this.preUploadedItemUpdatedTimes_[batchItem.name] = batchItem.localItemUpdatedTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
const response = await this.apiCall_('multiPut', batch);
|
||||||
|
this.preUploadedItems_ = {
|
||||||
|
...this.preUploadedItems_,
|
||||||
|
...response.items,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
while (itemsToUpload.length) {
|
||||||
|
const itemToUpload = itemsToUpload.pop();
|
||||||
|
const itemSize = itemToUpload.name.length + itemToUpload.body.length;
|
||||||
|
|
||||||
|
// Although it should be rare, if the item itself is above the
|
||||||
|
// batch max size, we skip it. In that case it will be uploaded the
|
||||||
|
// regular way when the synchronizer calls `serializeAndUploadItem()`
|
||||||
|
if (itemSize > this.maxBatchSize) continue;
|
||||||
|
|
||||||
|
if (batchSize + itemSize > this.maxBatchSize) {
|
||||||
|
await uploadBatch(currentBatch);
|
||||||
|
batchSize = itemSize;
|
||||||
|
currentBatch = [itemToUpload];
|
||||||
|
} else {
|
||||||
|
batchSize += itemSize;
|
||||||
|
currentBatch.push(itemToUpload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (currentBatch.length) await uploadBatch(currentBatch);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@@ -29,7 +29,7 @@ import Revision from '../models/Revision';
|
|||||||
import MasterKey from '../models/MasterKey';
|
import MasterKey from '../models/MasterKey';
|
||||||
import BaseItem from '../models/BaseItem';
|
import BaseItem from '../models/BaseItem';
|
||||||
const { FileApi } = require('../file-api.js');
|
const { FileApi } = require('../file-api.js');
|
||||||
const { FileApiDriverMemory } = require('../file-api-driver-memory.js');
|
const FileApiDriverMemory = require('../file-api-driver-memory').default;
|
||||||
const { FileApiDriverLocal } = require('../file-api-driver-local.js');
|
const { FileApiDriverLocal } = require('../file-api-driver-local.js');
|
||||||
const { FileApiDriverWebDav } = require('../file-api-driver-webdav.js');
|
const { FileApiDriverWebDav } = require('../file-api-driver-webdav.js');
|
||||||
const { FileApiDriverDropbox } = require('../file-api-driver-dropbox.js');
|
const { FileApiDriverDropbox } = require('../file-api-driver-dropbox.js');
|
||||||
|
@@ -5,14 +5,17 @@ import Router from '../../utils/Router';
|
|||||||
import { RouteType } from '../../utils/types';
|
import { RouteType } from '../../utils/types';
|
||||||
import { AppContext } from '../../utils/types';
|
import { AppContext } from '../../utils/types';
|
||||||
import * as fs from 'fs-extra';
|
import * as fs from 'fs-extra';
|
||||||
import { ErrorForbidden, ErrorMethodNotAllowed, ErrorNotFound } from '../../utils/errors';
|
import { ErrorForbidden, ErrorMethodNotAllowed, ErrorNotFound, ErrorPayloadTooLarge } from '../../utils/errors';
|
||||||
import ItemModel, { ItemSaveOption, SaveFromRawContentItem } from '../../models/ItemModel';
|
import ItemModel, { ItemSaveOption, SaveFromRawContentItem } from '../../models/ItemModel';
|
||||||
import { requestDeltaPagination, requestPagination } from '../../models/utils/pagination';
|
import { requestDeltaPagination, requestPagination } from '../../models/utils/pagination';
|
||||||
import { AclAction } from '../../models/BaseModel';
|
import { AclAction } from '../../models/BaseModel';
|
||||||
import { safeRemove } from '../../utils/fileUtils';
|
import { safeRemove } from '../../utils/fileUtils';
|
||||||
|
import { formatBytes, MB } from '../../utils/bytes';
|
||||||
|
|
||||||
const router = new Router(RouteType.Api);
|
const router = new Router(RouteType.Api);
|
||||||
|
|
||||||
|
const batchMaxSize = 1 * MB;
|
||||||
|
|
||||||
export async function putItemContents(path: SubPath, ctx: AppContext, isBatch: boolean) {
|
export async function putItemContents(path: SubPath, ctx: AppContext, isBatch: boolean) {
|
||||||
if (!ctx.owner.can_upload) throw new ErrorForbidden('Uploading content is disabled');
|
if (!ctx.owner.can_upload) throw new ErrorForbidden('Uploading content is disabled');
|
||||||
|
|
||||||
@@ -23,12 +26,16 @@ export async function putItemContents(path: SubPath, ctx: AppContext, isBatch: b
|
|||||||
let items: SaveFromRawContentItem[] = [];
|
let items: SaveFromRawContentItem[] = [];
|
||||||
|
|
||||||
if (isBatch) {
|
if (isBatch) {
|
||||||
|
let totalSize = 0;
|
||||||
items = bodyFields.items.map((item: any) => {
|
items = bodyFields.items.map((item: any) => {
|
||||||
|
totalSize += item.name.length + (item.body ? item.body.length : 0);
|
||||||
return {
|
return {
|
||||||
name: item.name,
|
name: item.name,
|
||||||
body: item.body ? Buffer.from(item.body, 'utf8') : Buffer.alloc(0),
|
body: item.body ? Buffer.from(item.body, 'utf8') : Buffer.alloc(0),
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (totalSize > batchMaxSize) throw new ErrorPayloadTooLarge(`Size of items (${formatBytes(totalSize)}) is over the limit (${formatBytes(batchMaxSize)})`);
|
||||||
} else {
|
} else {
|
||||||
const filePath = parsedBody?.files?.file ? parsedBody.files.file.path : null;
|
const filePath = parsedBody?.files?.file ? parsedBody.files.file.path : null;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user