diff --git a/.eslintignore b/.eslintignore index 717e862722..bfd49f2cbd 100644 --- a/.eslintignore +++ b/.eslintignore @@ -872,6 +872,9 @@ packages/lib/eventManager.js.map packages/lib/file-api-driver-joplinServer.d.ts packages/lib/file-api-driver-joplinServer.js packages/lib/file-api-driver-joplinServer.js.map +packages/lib/file-api-driver-memory.d.ts +packages/lib/file-api-driver-memory.js +packages/lib/file-api-driver-memory.js.map packages/lib/file-api-driver.test.d.ts packages/lib/file-api-driver.test.js packages/lib/file-api-driver.test.js.map @@ -1391,6 +1394,12 @@ packages/lib/services/spellChecker/SpellCheckerService.js.map packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.d.ts packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js.map +packages/lib/services/synchronizer/ItemUploader.d.ts +packages/lib/services/synchronizer/ItemUploader.js +packages/lib/services/synchronizer/ItemUploader.js.map +packages/lib/services/synchronizer/ItemUploader.test.d.ts +packages/lib/services/synchronizer/ItemUploader.test.js +packages/lib/services/synchronizer/ItemUploader.test.js.map packages/lib/services/synchronizer/LockHandler.d.ts packages/lib/services/synchronizer/LockHandler.js packages/lib/services/synchronizer/LockHandler.js.map diff --git a/.gitignore b/.gitignore index fb887551d5..7ca11ebdd2 100644 --- a/.gitignore +++ b/.gitignore @@ -858,6 +858,9 @@ packages/lib/eventManager.js.map packages/lib/file-api-driver-joplinServer.d.ts packages/lib/file-api-driver-joplinServer.js packages/lib/file-api-driver-joplinServer.js.map +packages/lib/file-api-driver-memory.d.ts +packages/lib/file-api-driver-memory.js +packages/lib/file-api-driver-memory.js.map packages/lib/file-api-driver.test.d.ts packages/lib/file-api-driver.test.js packages/lib/file-api-driver.test.js.map @@ -1377,6 +1380,12 @@ packages/lib/services/spellChecker/SpellCheckerService.js.map packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.d.ts packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js.map +packages/lib/services/synchronizer/ItemUploader.d.ts +packages/lib/services/synchronizer/ItemUploader.js +packages/lib/services/synchronizer/ItemUploader.js.map +packages/lib/services/synchronizer/ItemUploader.test.d.ts +packages/lib/services/synchronizer/ItemUploader.test.js +packages/lib/services/synchronizer/ItemUploader.test.js.map packages/lib/services/synchronizer/LockHandler.d.ts packages/lib/services/synchronizer/LockHandler.js packages/lib/services/synchronizer/LockHandler.js.map diff --git a/packages/app-cli/tests/support/serverPerformances/testPerf.sh b/packages/app-cli/tests/support/serverPerformances/testPerf.sh index 0b94bfc07c..daffb22565 100755 --- a/packages/app-cli/tests/support/serverPerformances/testPerf.sh +++ b/packages/app-cli/tests/support/serverPerformances/testPerf.sh @@ -51,5 +51,6 @@ done cd "$ROOT_DIR/packages/app-cli" npm start -- --profile "$PROFILE_DIR" batch "$CMD_FILE" npm start -- --profile "$PROFILE_DIR" import ~/Desktop/Joplin_17_06_2021.jex +# npm start -- --profile "$PROFILE_DIR" import ~/Desktop/Tout_18_06_2021.jex npm start -- --profile "$PROFILE_DIR" sync diff --git a/packages/lib/SyncTargetMemory.js b/packages/lib/SyncTargetMemory.js index 5f98ff2db5..1bc44ed395 100644 --- a/packages/lib/SyncTargetMemory.js +++ b/packages/lib/SyncTargetMemory.js @@ -1,7 +1,7 @@ const BaseSyncTarget = require('./BaseSyncTarget').default; const Setting = require('./models/Setting').default; const { FileApi } = require('./file-api.js'); -const { FileApiDriverMemory } = require('./file-api-driver-memory.js'); +const FileApiDriverMemory = require('./file-api-driver-memory').default; const Synchronizer = require('./Synchronizer').default; class SyncTargetMemory extends BaseSyncTarget { diff --git a/packages/lib/Synchronizer.ts b/packages/lib/Synchronizer.ts index 59e138e75d..c93dbcb39f 100644 --- a/packages/lib/Synchronizer.ts +++ b/packages/lib/Synchronizer.ts @@ -19,6 +19,7 @@ import EncryptionService from './services/EncryptionService'; import JoplinError from './JoplinError'; import ShareService from './services/share/ShareService'; import TaskQueue from './TaskQueue'; +import ItemUploader from './services/synchronizer/ItemUploader'; const { sprintf } = require('sprintf-js'); const { Dirnames } = require('./services/synchronizer/utils/types'); @@ -73,7 +74,7 @@ export default class Synchronizer { public dispatch: Function; - constructor(db: any, api: any, appType: string) { + public constructor(db: any, api: any, appType: string) { this.db_ = db; this.api_ = api; this.appType_ = appType; @@ -83,6 +84,8 @@ export default class Synchronizer { this.progressReport_ = {}; this.dispatch = function() {}; + + this.apiCall = this.apiCall.bind(this); } state() { @@ -300,7 +303,7 @@ export default class Synchronizer { return ''; } - async apiCall(fnName: string, ...args: any[]) { + private async apiCall(fnName: string, ...args: any[]) { if (this.syncTargetIsLocked_) throw new JoplinError('Sync target is locked - aborting API call', 'lockError'); try { @@ -389,6 +392,8 @@ export default class Synchronizer { // correctly so as to share/unshare the right items. await Folder.updateAllShareIds(); + const itemUploader = new ItemUploader(this.api(), this.apiCall); + let errorToThrow = null; let syncLock = null; @@ -440,6 +445,8 @@ export default class Synchronizer { const result = await BaseItem.itemsThatNeedSync(syncTargetId); const locals = result.items; + await itemUploader.preUploadItems(result.items.filter((it: any) => result.neverSyncedItemIds.includes(it.id))); + for (let i = 0; i < locals.length; i++) { if (this.cancelling()) break; @@ -588,8 +595,7 @@ export default class Synchronizer { let canSync = true; try { if (this.testingHooks_.indexOf('notesRejectedByTarget') >= 0 && local.type_ === BaseModel.TYPE_NOTE) throw new JoplinError('Testing rejectedByTarget', 'rejectedByTarget'); - const content = await ItemClass.serializeForSync(local); - await this.apiCall('put', path, content); + await itemUploader.serializeAndUploadItem(ItemClass, path, local); } catch (error) { if (error && error.code === 'rejectedByTarget') { await handleCannotSyncItem(ItemClass, syncTargetId, local, error.message); @@ -619,7 +625,6 @@ export default class Synchronizer { // above also doesn't use it because it fetches the whole remote object and read the // more reliable 'updated_time' property. Basically remote.updated_time is deprecated. - // await this.api().setTimestamp(path, local.updated_time); await ItemClass.saveSyncTime(syncTargetId, local, local.updated_time); } } else if (action == 'itemConflict') { diff --git a/packages/lib/file-api-driver-joplinServer.ts b/packages/lib/file-api-driver-joplinServer.ts index 5d39177872..97e32b8bb4 100644 --- a/packages/lib/file-api-driver-joplinServer.ts +++ b/packages/lib/file-api-driver-joplinServer.ts @@ -1,3 +1,4 @@ +import { MultiPutItem } from './file-api'; import JoplinError from './JoplinError'; import JoplinServerApi from './JoplinServerApi'; import { trimSlashes } from './path-utils'; @@ -31,6 +32,10 @@ export default class FileApiDriverJoplinServer { return this.api_; } + public get supportsMultiPut() { + return true; + } + public requestRepeatCount() { return 3; } @@ -174,6 +179,10 @@ export default class FileApiDriverJoplinServer { } } + public async multiPut(items: MultiPutItem[], options: any = null) { + return this.api().exec('PUT', 'api/batch_items', null, { items: items }, null, options); + } + public async delete(path: string) { return this.api().exec('DELETE', this.apiFilePath_(path)); } diff --git a/packages/lib/file-api-driver-memory.js b/packages/lib/file-api-driver-memory.ts similarity index 69% rename from packages/lib/file-api-driver-memory.js rename to packages/lib/file-api-driver-memory.ts index 9d8168092e..b0886b7409 100644 --- a/packages/lib/file-api-driver-memory.js +++ b/packages/lib/file-api-driver-memory.ts @@ -1,14 +1,18 @@ -const time = require('./time').default; +import time from './time'; const fs = require('fs-extra'); -const { basicDelta } = require('./file-api'); +import { basicDelta, MultiPutItem } from './file-api'; + +export default class FileApiDriverMemory { + + private items_: any[]; + private deletedItems_: any[]; -class FileApiDriverMemory { constructor() { this.items_ = []; this.deletedItems_ = []; } - encodeContent_(content) { + encodeContent_(content: any) { if (content instanceof Buffer) { return content.toString('base64'); } else { @@ -16,23 +20,27 @@ class FileApiDriverMemory { } } - decodeContent_(content) { + public get supportsMultiPut() { + return true; + } + + decodeContent_(content: any) { return Buffer.from(content, 'base64').toString('utf-8'); } - itemIndexByPath(path) { + itemIndexByPath(path: string) { for (let i = 0; i < this.items_.length; i++) { if (this.items_[i].path == path) return i; } return -1; } - itemByPath(path) { + itemByPath(path: string) { const index = this.itemIndexByPath(path); return index < 0 ? null : this.items_[index]; } - newItem(path, isDir = false) { + newItem(path: string, isDir = false) { const now = time.unixMs(); return { path: path, @@ -43,18 +51,18 @@ class FileApiDriverMemory { }; } - stat(path) { + stat(path: string) { const item = this.itemByPath(path); return Promise.resolve(item ? Object.assign({}, item) : null); } - async setTimestamp(path, timestampMs) { + async setTimestamp(path: string, timestampMs: number): Promise { const item = this.itemByPath(path); if (!item) return Promise.reject(new Error(`File not found: ${path}`)); item.updated_time = timestampMs; } - async list(path) { + async list(path: string) { const output = []; for (let i = 0; i < this.items_.length; i++) { @@ -77,7 +85,7 @@ class FileApiDriverMemory { }); } - async get(path, options) { + async get(path: string, options: any) { const item = this.itemByPath(path); if (!item) return Promise.resolve(null); if (item.isDir) return Promise.reject(new Error(`${path} is a directory, not a file`)); @@ -93,13 +101,13 @@ class FileApiDriverMemory { return output; } - async mkdir(path) { + async mkdir(path: string) { const index = this.itemIndexByPath(path); if (index >= 0) return; this.items_.push(this.newItem(path, true)); } - async put(path, content, options = null) { + async put(path: string, content: any, options: any = null) { if (!options) options = {}; if (options.source === 'file') content = await fs.readFile(options.path); @@ -109,13 +117,38 @@ class FileApiDriverMemory { const item = this.newItem(path, false); item.content = this.encodeContent_(content); this.items_.push(item); + return item; } else { this.items_[index].content = this.encodeContent_(content); this.items_[index].updated_time = time.unixMs(); + return this.items_[index]; } } - async delete(path) { + public async multiPut(items: MultiPutItem[], options: any = null) { + const output: any = { + items: {}, + }; + + for (const item of items) { + try { + const processedItem = await this.put(`/root/${item.name}`, item.body, options); + output.items[item.name] = { + item: processedItem, + error: null, + }; + } catch (error) { + output.items[item.name] = { + item: null, + error: error, + }; + } + } + + return output; + } + + async delete(path: string) { const index = this.itemIndexByPath(path); if (index >= 0) { const item = Object.assign({}, this.items_[index]); @@ -126,10 +159,10 @@ class FileApiDriverMemory { } } - async move(oldPath, newPath) { + async move(oldPath: string, newPath: string): Promise { const sourceItem = this.itemByPath(oldPath); if (!sourceItem) return Promise.reject(new Error(`Path not found: ${oldPath}`)); - this.delete(newPath); // Overwrite if newPath already exists + await this.delete(newPath); // Overwrite if newPath already exists sourceItem.path = newPath; } @@ -137,8 +170,8 @@ class FileApiDriverMemory { this.items_ = []; } - async delta(path, options = null) { - const getStatFn = async path => { + async delta(path: string, options: any = null) { + const getStatFn = async (path: string) => { const output = this.items_.slice(); for (let i = 0; i < output.length; i++) { const item = Object.assign({}, output[i]); @@ -156,5 +189,3 @@ class FileApiDriverMemory { this.items_ = []; } } - -module.exports = { FileApiDriverMemory }; diff --git a/packages/lib/file-api.ts b/packages/lib/file-api.ts index 6be94accdf..64c7a24590 100644 --- a/packages/lib/file-api.ts +++ b/packages/lib/file-api.ts @@ -11,6 +11,11 @@ const Mutex = require('async-mutex').Mutex; const logger = Logger.create('FileApi'); +export interface MultiPutItem { + name: string; + body: string; +} + function requestCanBeRepeated(error: any) { const errorCode = typeof error === 'object' && error.code ? error.code : null; @@ -81,6 +86,10 @@ class FileApi { if (this.driver_.initialize) return this.driver_.initialize(this.fullPath('')); } + public get supportsMultiPut(): boolean { + return !!this.driver().supportsMultiPut; + } + async fetchRemoteDateOffset_() { const tempFile = `${this.tempDirName()}/timeCheck${Math.round(Math.random() * 1000000)}.txt`; const startTime = Date.now(); @@ -251,12 +260,6 @@ class FileApi { if (!output) return output; output.path = path; return output; - - // return this.driver_.stat(this.fullPath(path)).then((output) => { - // if (!output) return output; - // output.path = path; - // return output; - // }); } // Returns UTF-8 encoded string by default, or a Response if `options.target = 'file'` @@ -277,6 +280,11 @@ class FileApi { return tryAndRepeat(() => this.driver_.put(this.fullPath(path), content, options), this.requestRepeatCount()); } + public async multiPut(items: MultiPutItem[], options: any = null) { + if (!this.driver().supportsMultiPut) throw new Error('Multi PUT not supported'); + return tryAndRepeat(() => this.driver_.multiPut(items, options), this.requestRepeatCount()); + } + delete(path: string) { logger.debug(`delete ${this.fullPath(path)}`); return tryAndRepeat(() => this.driver_.delete(this.fullPath(path)), this.requestRepeatCount()); diff --git a/packages/lib/models/BaseItem.ts b/packages/lib/models/BaseItem.ts index e29626c8a2..8c1e05ed99 100644 --- a/packages/lib/models/BaseItem.ts +++ b/packages/lib/models/BaseItem.ts @@ -403,7 +403,7 @@ export default class BaseItem extends BaseModel { return this.shareService_; } - public static async serializeForSync(item: BaseItemEntity) { + public static async serializeForSync(item: BaseItemEntity): Promise { const ItemClass = this.itemClass(item); const shownKeys = ItemClass.fieldNames(); shownKeys.push('type_'); diff --git a/packages/lib/services/synchronizer/ItemUploader.test.ts b/packages/lib/services/synchronizer/ItemUploader.test.ts new file mode 100644 index 0000000000..9cb541b099 --- /dev/null +++ b/packages/lib/services/synchronizer/ItemUploader.test.ts @@ -0,0 +1,167 @@ +import { FileApi } from '../../file-api'; +import BaseItem from '../../models/BaseItem'; +import Note from '../../models/Note'; +import { expectNotThrow, expectThrow, setupDatabaseAndSynchronizer, switchClient } from '../../testing/test-utils'; +import time from '../../time'; +import ItemUploader, { ApiCallFunction } from './ItemUploader'; + +interface ApiCall { + name: string; + args: any[]; +} + +function clearArray(a: any[]) { + a.splice(0, a.length); +} + +function newFakeApi(): FileApi { + return { supportsMultiPut: true } as any; +} + +function newFakeApiCall(callRecorder: ApiCall[], itemBodyCallback: Function = null): ApiCallFunction { + const apiCall = async (callName: string, ...args: any[]): Promise => { + callRecorder.push({ name: callName, args }); + + if (callName === 'multiPut') { + const [batch] = args; + const output: any = { items: {} }; + for (const item of batch) { + if (itemBodyCallback) { + output.items[item.name] = itemBodyCallback(item); + } else { + output.items[item.name] = { + item: item.body, + error: null, + }; + } + } + return output; + } + }; + return apiCall; +} + +describe('synchronizer_ItemUplader', function() { + + beforeEach(async (done) => { + await setupDatabaseAndSynchronizer(1); + await setupDatabaseAndSynchronizer(2); + await switchClient(1); + done(); + }); + + it('should batch uploads and use the cache afterwards', (async () => { + const callRecorder: ApiCall[] = []; + const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder)); + + const notes = [ + await Note.save({ title: '1' }), + await Note.save({ title: '2' }), + ]; + + await itemUploader.preUploadItems(notes); + + // There should be only one call to "multiPut" because the items have + // been batched. + expect(callRecorder.length).toBe(1); + expect(callRecorder[0].name).toBe('multiPut'); + + clearArray(callRecorder); + + // Now if we try to upload the item it shouldn't call the API because it + // will use the cached item. + await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0]); + expect(callRecorder.length).toBe(0); + + // Now try to process a note that hasn't been cached. In that case, it + // should call "PUT" directly. + const note3 = await Note.save({ title: '3' }); + await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(note3), note3); + expect(callRecorder.length).toBe(1); + expect(callRecorder[0].name).toBe('put'); + })); + + it('should not batch upload if the items are over the batch size limit', (async () => { + const callRecorder: ApiCall[] = []; + const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder)); + itemUploader.maxBatchSize = 1; + + const notes = [ + await Note.save({ title: '1' }), + await Note.save({ title: '2' }), + ]; + + await itemUploader.preUploadItems(notes); + expect(callRecorder.length).toBe(0); + })); + + it('should not use the cache if the note has changed since the pre-upload', (async () => { + const callRecorder: ApiCall[] = []; + const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder)); + + const notes = [ + await Note.save({ title: '1' }), + await Note.save({ title: '2' }), + ]; + + await itemUploader.preUploadItems(notes); + clearArray(callRecorder); + + await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0]); + expect(callRecorder.length).toBe(0); + + await time.msleep(1); + notes[1] = await Note.save({ title: '22' }), + await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[1]), notes[1]); + expect(callRecorder.length).toBe(1); + })); + + it('should respect the max batch size', (async () => { + const callRecorder: ApiCall[] = []; + const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder)); + + const notes = [ + await Note.save({ title: '1' }), + await Note.save({ title: '2' }), + await Note.save({ title: '3' }), + ]; + + const noteSize = BaseItem.systemPath(notes[0]).length + (await Note.serializeForSync(notes[0])).length; + itemUploader.maxBatchSize = noteSize * 2; + + // It should send two batches - one with two notes, and the second with + // only one note. + await itemUploader.preUploadItems(notes); + expect(callRecorder.length).toBe(2); + expect(callRecorder[0].args[0].length).toBe(2); + expect(callRecorder[1].args[0].length).toBe(1); + })); + + it('should rethrow error for items within the batch', (async () => { + const callRecorder: ApiCall[] = []; + + const notes = [ + await Note.save({ title: '1' }), + await Note.save({ title: '2' }), + await Note.save({ title: '3' }), + ]; + + // Simulates throwing an error on note 2 + const itemBodyCallback = (item: any): any => { + if (item.name === BaseItem.systemPath(notes[1])) { + return { error: new Error('Could not save item'), item: null }; + } else { + return { error: null, item: item.body }; + } + }; + + const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder, itemBodyCallback)); + + await itemUploader.preUploadItems(notes); + + await expectNotThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0])); + await expectThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[1]), notes[1])); + await expectNotThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[2]), notes[2])); + })); + +}); diff --git a/packages/lib/services/synchronizer/ItemUploader.ts b/packages/lib/services/synchronizer/ItemUploader.ts new file mode 100644 index 0000000000..8cd1d4e82b --- /dev/null +++ b/packages/lib/services/synchronizer/ItemUploader.ts @@ -0,0 +1,110 @@ +import { ModelType } from '../../BaseModel'; +import { FileApi, MultiPutItem } from '../../file-api'; +import Logger from '../../Logger'; +import BaseItem, { ItemThatNeedSync } from '../../models/BaseItem'; + +const logger = Logger.create('ItemUploader'); + +export type ApiCallFunction = (fnName: string, ...args: any[])=> Promise; + +interface BatchItem extends MultiPutItem { + localItemUpdatedTime: number; +} + +export default class ItemUploader { + + private api_: FileApi; + private apiCall_: ApiCallFunction; + private preUploadedItems_: Record = {}; + private preUploadedItemUpdatedTimes_: Record = {}; + private maxBatchSize_ = 1 * 1024 * 1024; // 1MB; + + public constructor(api: FileApi, apiCall: ApiCallFunction) { + this.api_ = api; + this.apiCall_ = apiCall; + } + + public get maxBatchSize() { + return this.maxBatchSize_; + } + + public set maxBatchSize(v: number) { + this.maxBatchSize_ = v; + } + + public async serializeAndUploadItem(ItemClass: any, path: string, local: ItemThatNeedSync) { + const preUploadItem = this.preUploadedItems_[path]; + if (preUploadItem) { + if (this.preUploadedItemUpdatedTimes_[path] !== local.updated_time) { + // Normally this should be rare as it can only happen if the + // item has been changed between the moment it was pre-uploaded + // and the moment where it's being processed by the + // synchronizer. It could happen for example for a note being + // edited just at the same time. In that case, we proceed with + // the regular upload. + logger.warn(`Pre-uploaded item updated_time has changed. It is going to be re-uploaded again: ${path} (From ${this.preUploadedItemUpdatedTimes_[path]} to ${local.updated_time})`); + } else { + if (preUploadItem.error) throw new Error(preUploadItem.error.message ? preUploadItem.error.message : 'Unknown pre-upload error'); + return; + } + } + const content = await ItemClass.serializeForSync(local); + await this.apiCall_('put', path, content); + } + + public async preUploadItems(items: ItemThatNeedSync[]) { + if (!this.api_.supportsMultiPut) return; + + const itemsToUpload: BatchItem[] = []; + + for (const local of items) { + // For resources, additional logic is necessary - in particular the blob + // should be uploaded before the metadata, so we can't batch process. + if (local.type_ === ModelType.Resource) continue; + + const ItemClass = BaseItem.itemClass(local); + itemsToUpload.push({ + name: BaseItem.systemPath(local), + body: await ItemClass.serializeForSync(local), + localItemUpdatedTime: local.updated_time, + }); + } + + let batchSize = 0; + let currentBatch: BatchItem[] = []; + + const uploadBatch = async (batch: BatchItem[]) => { + for (const batchItem of batch) { + this.preUploadedItemUpdatedTimes_[batchItem.name] = batchItem.localItemUpdatedTime; + } + + const response = await this.apiCall_('multiPut', batch); + this.preUploadedItems_ = { + ...this.preUploadedItems_, + ...response.items, + }; + }; + + while (itemsToUpload.length) { + const itemToUpload = itemsToUpload.pop(); + const itemSize = itemToUpload.name.length + itemToUpload.body.length; + + // Although it should be rare, if the item itself is above the + // batch max size, we skip it. In that case it will be uploaded the + // regular way when the synchronizer calls `serializeAndUploadItem()` + if (itemSize > this.maxBatchSize) continue; + + if (batchSize + itemSize > this.maxBatchSize) { + await uploadBatch(currentBatch); + batchSize = itemSize; + currentBatch = [itemToUpload]; + } else { + batchSize += itemSize; + currentBatch.push(itemToUpload); + } + } + + if (currentBatch.length) await uploadBatch(currentBatch); + } + +} diff --git a/packages/lib/testing/test-utils.ts b/packages/lib/testing/test-utils.ts index 6073a05e84..c608dc2e86 100644 --- a/packages/lib/testing/test-utils.ts +++ b/packages/lib/testing/test-utils.ts @@ -29,7 +29,7 @@ import Revision from '../models/Revision'; import MasterKey from '../models/MasterKey'; import BaseItem from '../models/BaseItem'; const { FileApi } = require('../file-api.js'); -const { FileApiDriverMemory } = require('../file-api-driver-memory.js'); +const FileApiDriverMemory = require('../file-api-driver-memory').default; const { FileApiDriverLocal } = require('../file-api-driver-local.js'); const { FileApiDriverWebDav } = require('../file-api-driver-webdav.js'); const { FileApiDriverDropbox } = require('../file-api-driver-dropbox.js'); diff --git a/packages/server/src/routes/api/items.ts b/packages/server/src/routes/api/items.ts index a987ab20c2..55558e8788 100644 --- a/packages/server/src/routes/api/items.ts +++ b/packages/server/src/routes/api/items.ts @@ -5,14 +5,17 @@ import Router from '../../utils/Router'; import { RouteType } from '../../utils/types'; import { AppContext } from '../../utils/types'; import * as fs from 'fs-extra'; -import { ErrorForbidden, ErrorMethodNotAllowed, ErrorNotFound } from '../../utils/errors'; +import { ErrorForbidden, ErrorMethodNotAllowed, ErrorNotFound, ErrorPayloadTooLarge } from '../../utils/errors'; import ItemModel, { ItemSaveOption, SaveFromRawContentItem } from '../../models/ItemModel'; import { requestDeltaPagination, requestPagination } from '../../models/utils/pagination'; import { AclAction } from '../../models/BaseModel'; import { safeRemove } from '../../utils/fileUtils'; +import { formatBytes, MB } from '../../utils/bytes'; const router = new Router(RouteType.Api); +const batchMaxSize = 1 * MB; + export async function putItemContents(path: SubPath, ctx: AppContext, isBatch: boolean) { if (!ctx.owner.can_upload) throw new ErrorForbidden('Uploading content is disabled'); @@ -23,12 +26,16 @@ export async function putItemContents(path: SubPath, ctx: AppContext, isBatch: b let items: SaveFromRawContentItem[] = []; if (isBatch) { + let totalSize = 0; items = bodyFields.items.map((item: any) => { + totalSize += item.name.length + (item.body ? item.body.length : 0); return { name: item.name, body: item.body ? Buffer.from(item.body, 'utf8') : Buffer.alloc(0), }; }); + + if (totalSize > batchMaxSize) throw new ErrorPayloadTooLarge(`Size of items (${formatBytes(totalSize)}) is over the limit (${formatBytes(batchMaxSize)})`); } else { const filePath = parsedBody?.files?.file ? parsedBody.files.file.path : null;