You've already forked joplin
							
							
				mirror of
				https://github.com/laurent22/joplin.git
				synced 2025-10-31 00:07:48 +02:00 
			
		
		
		
	All: Allow uploading items in batch when synchronising with Joplin Server
This commit is contained in:
		| @@ -872,6 +872,9 @@ packages/lib/eventManager.js.map | ||||
| packages/lib/file-api-driver-joplinServer.d.ts | ||||
| packages/lib/file-api-driver-joplinServer.js | ||||
| packages/lib/file-api-driver-joplinServer.js.map | ||||
| packages/lib/file-api-driver-memory.d.ts | ||||
| packages/lib/file-api-driver-memory.js | ||||
| packages/lib/file-api-driver-memory.js.map | ||||
| packages/lib/file-api-driver.test.d.ts | ||||
| packages/lib/file-api-driver.test.js | ||||
| packages/lib/file-api-driver.test.js.map | ||||
| @@ -1391,6 +1394,12 @@ packages/lib/services/spellChecker/SpellCheckerService.js.map | ||||
| packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.d.ts | ||||
| packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js | ||||
| packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js.map | ||||
| packages/lib/services/synchronizer/ItemUploader.d.ts | ||||
| packages/lib/services/synchronizer/ItemUploader.js | ||||
| packages/lib/services/synchronizer/ItemUploader.js.map | ||||
| packages/lib/services/synchronizer/ItemUploader.test.d.ts | ||||
| packages/lib/services/synchronizer/ItemUploader.test.js | ||||
| packages/lib/services/synchronizer/ItemUploader.test.js.map | ||||
| packages/lib/services/synchronizer/LockHandler.d.ts | ||||
| packages/lib/services/synchronizer/LockHandler.js | ||||
| packages/lib/services/synchronizer/LockHandler.js.map | ||||
|   | ||||
							
								
								
									
										9
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										9
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -858,6 +858,9 @@ packages/lib/eventManager.js.map | ||||
| packages/lib/file-api-driver-joplinServer.d.ts | ||||
| packages/lib/file-api-driver-joplinServer.js | ||||
| packages/lib/file-api-driver-joplinServer.js.map | ||||
| packages/lib/file-api-driver-memory.d.ts | ||||
| packages/lib/file-api-driver-memory.js | ||||
| packages/lib/file-api-driver-memory.js.map | ||||
| packages/lib/file-api-driver.test.d.ts | ||||
| packages/lib/file-api-driver.test.js | ||||
| packages/lib/file-api-driver.test.js.map | ||||
| @@ -1377,6 +1380,12 @@ packages/lib/services/spellChecker/SpellCheckerService.js.map | ||||
| packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.d.ts | ||||
| packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js | ||||
| packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js.map | ||||
| packages/lib/services/synchronizer/ItemUploader.d.ts | ||||
| packages/lib/services/synchronizer/ItemUploader.js | ||||
| packages/lib/services/synchronizer/ItemUploader.js.map | ||||
| packages/lib/services/synchronizer/ItemUploader.test.d.ts | ||||
| packages/lib/services/synchronizer/ItemUploader.test.js | ||||
| packages/lib/services/synchronizer/ItemUploader.test.js.map | ||||
| packages/lib/services/synchronizer/LockHandler.d.ts | ||||
| packages/lib/services/synchronizer/LockHandler.js | ||||
| packages/lib/services/synchronizer/LockHandler.js.map | ||||
|   | ||||
| @@ -51,5 +51,6 @@ done | ||||
| cd "$ROOT_DIR/packages/app-cli" | ||||
| npm start -- --profile "$PROFILE_DIR" batch "$CMD_FILE" | ||||
| npm start -- --profile "$PROFILE_DIR" import ~/Desktop/Joplin_17_06_2021.jex | ||||
| # npm start -- --profile "$PROFILE_DIR" import ~/Desktop/Tout_18_06_2021.jex | ||||
| npm start -- --profile "$PROFILE_DIR" sync | ||||
|  | ||||
|   | ||||
| @@ -1,7 +1,7 @@ | ||||
| const BaseSyncTarget = require('./BaseSyncTarget').default; | ||||
| const Setting = require('./models/Setting').default; | ||||
| const { FileApi } = require('./file-api.js'); | ||||
| const { FileApiDriverMemory } = require('./file-api-driver-memory.js'); | ||||
| const FileApiDriverMemory = require('./file-api-driver-memory').default; | ||||
| const Synchronizer = require('./Synchronizer').default; | ||||
|  | ||||
| class SyncTargetMemory extends BaseSyncTarget { | ||||
|   | ||||
| @@ -19,6 +19,7 @@ import EncryptionService from './services/EncryptionService'; | ||||
| import JoplinError from './JoplinError'; | ||||
| import ShareService from './services/share/ShareService'; | ||||
| import TaskQueue from './TaskQueue'; | ||||
| import ItemUploader from './services/synchronizer/ItemUploader'; | ||||
| const { sprintf } = require('sprintf-js'); | ||||
| const { Dirnames } = require('./services/synchronizer/utils/types'); | ||||
|  | ||||
| @@ -73,7 +74,7 @@ export default class Synchronizer { | ||||
|  | ||||
| 	public dispatch: Function; | ||||
|  | ||||
| 	constructor(db: any, api: any, appType: string) { | ||||
| 	public constructor(db: any, api: any, appType: string) { | ||||
| 		this.db_ = db; | ||||
| 		this.api_ = api; | ||||
| 		this.appType_ = appType; | ||||
| @@ -83,6 +84,8 @@ export default class Synchronizer { | ||||
| 		this.progressReport_ = {}; | ||||
|  | ||||
| 		this.dispatch = function() {}; | ||||
|  | ||||
| 		this.apiCall = this.apiCall.bind(this); | ||||
| 	} | ||||
|  | ||||
| 	state() { | ||||
| @@ -300,7 +303,7 @@ export default class Synchronizer { | ||||
| 		return ''; | ||||
| 	} | ||||
|  | ||||
| 	async apiCall(fnName: string, ...args: any[]) { | ||||
| 	private async apiCall(fnName: string, ...args: any[]) { | ||||
| 		if (this.syncTargetIsLocked_) throw new JoplinError('Sync target is locked - aborting API call', 'lockError'); | ||||
|  | ||||
| 		try { | ||||
| @@ -389,6 +392,8 @@ export default class Synchronizer { | ||||
| 		// correctly so as to share/unshare the right items. | ||||
| 		await Folder.updateAllShareIds(); | ||||
|  | ||||
| 		const itemUploader = new ItemUploader(this.api(), this.apiCall); | ||||
|  | ||||
| 		let errorToThrow = null; | ||||
| 		let syncLock = null; | ||||
|  | ||||
| @@ -440,6 +445,8 @@ export default class Synchronizer { | ||||
| 					const result = await BaseItem.itemsThatNeedSync(syncTargetId); | ||||
| 					const locals = result.items; | ||||
|  | ||||
| 					await itemUploader.preUploadItems(result.items.filter((it: any) => result.neverSyncedItemIds.includes(it.id))); | ||||
|  | ||||
| 					for (let i = 0; i < locals.length; i++) { | ||||
| 						if (this.cancelling()) break; | ||||
|  | ||||
| @@ -588,8 +595,7 @@ export default class Synchronizer { | ||||
| 							let canSync = true; | ||||
| 							try { | ||||
| 								if (this.testingHooks_.indexOf('notesRejectedByTarget') >= 0 && local.type_ === BaseModel.TYPE_NOTE) throw new JoplinError('Testing rejectedByTarget', 'rejectedByTarget'); | ||||
| 								const content = await ItemClass.serializeForSync(local); | ||||
| 								await this.apiCall('put', path, content); | ||||
| 								await itemUploader.serializeAndUploadItem(ItemClass, path, local); | ||||
| 							} catch (error) { | ||||
| 								if (error && error.code === 'rejectedByTarget') { | ||||
| 									await handleCannotSyncItem(ItemClass, syncTargetId, local, error.message); | ||||
| @@ -619,7 +625,6 @@ export default class Synchronizer { | ||||
| 								// above also doesn't use it because it fetches the whole remote object and read the | ||||
| 								// more reliable 'updated_time' property. Basically remote.updated_time is deprecated. | ||||
|  | ||||
| 								// await this.api().setTimestamp(path, local.updated_time); | ||||
| 								await ItemClass.saveSyncTime(syncTargetId, local, local.updated_time); | ||||
| 							} | ||||
| 						} else if (action == 'itemConflict') { | ||||
|   | ||||
| @@ -1,3 +1,4 @@ | ||||
| import { MultiPutItem } from './file-api'; | ||||
| import JoplinError from './JoplinError'; | ||||
| import JoplinServerApi from './JoplinServerApi'; | ||||
| import { trimSlashes } from './path-utils'; | ||||
| @@ -31,6 +32,10 @@ export default class FileApiDriverJoplinServer { | ||||
| 		return this.api_; | ||||
| 	} | ||||
|  | ||||
| 	public get supportsMultiPut() { | ||||
| 		return true; | ||||
| 	} | ||||
|  | ||||
| 	public requestRepeatCount() { | ||||
| 		return 3; | ||||
| 	} | ||||
| @@ -174,6 +179,10 @@ export default class FileApiDriverJoplinServer { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	public async multiPut(items: MultiPutItem[], options: any = null) { | ||||
| 		return this.api().exec('PUT', 'api/batch_items', null, { items: items }, null, options); | ||||
| 	} | ||||
|  | ||||
| 	public async delete(path: string) { | ||||
| 		return this.api().exec('DELETE', this.apiFilePath_(path)); | ||||
| 	} | ||||
|   | ||||
| @@ -1,14 +1,18 @@ | ||||
| const time = require('./time').default; | ||||
| import time from './time'; | ||||
| const fs = require('fs-extra'); | ||||
| const { basicDelta } = require('./file-api'); | ||||
| import { basicDelta, MultiPutItem } from './file-api'; | ||||
| 
 | ||||
| export default class FileApiDriverMemory { | ||||
| 
 | ||||
| 	private items_: any[]; | ||||
| 	private deletedItems_: any[]; | ||||
| 
 | ||||
| class FileApiDriverMemory { | ||||
| 	constructor() { | ||||
| 		this.items_ = []; | ||||
| 		this.deletedItems_ = []; | ||||
| 	} | ||||
| 
 | ||||
| 	encodeContent_(content) { | ||||
| 	encodeContent_(content: any) { | ||||
| 		if (content instanceof Buffer) { | ||||
| 			return content.toString('base64'); | ||||
| 		} else { | ||||
| @@ -16,23 +20,27 @@ class FileApiDriverMemory { | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	decodeContent_(content) { | ||||
| 	public get supportsMultiPut() { | ||||
| 		return true; | ||||
| 	} | ||||
| 
 | ||||
| 	decodeContent_(content: any) { | ||||
| 		return Buffer.from(content, 'base64').toString('utf-8'); | ||||
| 	} | ||||
| 
 | ||||
| 	itemIndexByPath(path) { | ||||
| 	itemIndexByPath(path: string) { | ||||
| 		for (let i = 0; i < this.items_.length; i++) { | ||||
| 			if (this.items_[i].path == path) return i; | ||||
| 		} | ||||
| 		return -1; | ||||
| 	} | ||||
| 
 | ||||
| 	itemByPath(path) { | ||||
| 	itemByPath(path: string) { | ||||
| 		const index = this.itemIndexByPath(path); | ||||
| 		return index < 0 ? null : this.items_[index]; | ||||
| 	} | ||||
| 
 | ||||
| 	newItem(path, isDir = false) { | ||||
| 	newItem(path: string, isDir = false) { | ||||
| 		const now = time.unixMs(); | ||||
| 		return { | ||||
| 			path: path, | ||||
| @@ -43,18 +51,18 @@ class FileApiDriverMemory { | ||||
| 		}; | ||||
| 	} | ||||
| 
 | ||||
| 	stat(path) { | ||||
| 	stat(path: string) { | ||||
| 		const item = this.itemByPath(path); | ||||
| 		return Promise.resolve(item ? Object.assign({}, item) : null); | ||||
| 	} | ||||
| 
 | ||||
| 	async setTimestamp(path, timestampMs) { | ||||
| 	async setTimestamp(path: string, timestampMs: number): Promise<any> { | ||||
| 		const item = this.itemByPath(path); | ||||
| 		if (!item) return Promise.reject(new Error(`File not found: ${path}`)); | ||||
| 		item.updated_time = timestampMs; | ||||
| 	} | ||||
| 
 | ||||
| 	async list(path) { | ||||
| 	async list(path: string) { | ||||
| 		const output = []; | ||||
| 
 | ||||
| 		for (let i = 0; i < this.items_.length; i++) { | ||||
| @@ -77,7 +85,7 @@ class FileApiDriverMemory { | ||||
| 		}); | ||||
| 	} | ||||
| 
 | ||||
| 	async get(path, options) { | ||||
| 	async get(path: string, options: any) { | ||||
| 		const item = this.itemByPath(path); | ||||
| 		if (!item) return Promise.resolve(null); | ||||
| 		if (item.isDir) return Promise.reject(new Error(`${path} is a directory, not a file`)); | ||||
| @@ -93,13 +101,13 @@ class FileApiDriverMemory { | ||||
| 		return output; | ||||
| 	} | ||||
| 
 | ||||
| 	async mkdir(path) { | ||||
| 	async mkdir(path: string) { | ||||
| 		const index = this.itemIndexByPath(path); | ||||
| 		if (index >= 0) return; | ||||
| 		this.items_.push(this.newItem(path, true)); | ||||
| 	} | ||||
| 
 | ||||
| 	async put(path, content, options = null) { | ||||
| 	async put(path: string, content: any, options: any = null) { | ||||
| 		if (!options) options = {}; | ||||
| 
 | ||||
| 		if (options.source === 'file') content = await fs.readFile(options.path); | ||||
| @@ -109,13 +117,38 @@ class FileApiDriverMemory { | ||||
| 			const item = this.newItem(path, false); | ||||
| 			item.content = this.encodeContent_(content); | ||||
| 			this.items_.push(item); | ||||
| 			return item; | ||||
| 		} else { | ||||
| 			this.items_[index].content = this.encodeContent_(content); | ||||
| 			this.items_[index].updated_time = time.unixMs(); | ||||
| 			return this.items_[index]; | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	async delete(path) { | ||||
| 	public async multiPut(items: MultiPutItem[], options: any = null) { | ||||
| 		const output: any = { | ||||
| 			items: {}, | ||||
| 		}; | ||||
| 
 | ||||
| 		for (const item of items) { | ||||
| 			try { | ||||
| 				const processedItem = await this.put(`/root/${item.name}`, item.body, options); | ||||
| 				output.items[item.name] = { | ||||
| 					item: processedItem, | ||||
| 					error: null, | ||||
| 				}; | ||||
| 			} catch (error) { | ||||
| 				output.items[item.name] = { | ||||
| 					item: null, | ||||
| 					error: error, | ||||
| 				}; | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		return output; | ||||
| 	} | ||||
| 
 | ||||
| 	async delete(path: string) { | ||||
| 		const index = this.itemIndexByPath(path); | ||||
| 		if (index >= 0) { | ||||
| 			const item = Object.assign({}, this.items_[index]); | ||||
| @@ -126,10 +159,10 @@ class FileApiDriverMemory { | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	async move(oldPath, newPath) { | ||||
| 	async move(oldPath: string, newPath: string): Promise<any> { | ||||
| 		const sourceItem = this.itemByPath(oldPath); | ||||
| 		if (!sourceItem) return Promise.reject(new Error(`Path not found: ${oldPath}`)); | ||||
| 		this.delete(newPath); // Overwrite if newPath already exists
 | ||||
| 		await this.delete(newPath); // Overwrite if newPath already exists
 | ||||
| 		sourceItem.path = newPath; | ||||
| 	} | ||||
| 
 | ||||
| @@ -137,8 +170,8 @@ class FileApiDriverMemory { | ||||
| 		this.items_ = []; | ||||
| 	} | ||||
| 
 | ||||
| 	async delta(path, options = null) { | ||||
| 		const getStatFn = async path => { | ||||
| 	async delta(path: string, options: any = null) { | ||||
| 		const getStatFn = async (path: string) => { | ||||
| 			const output = this.items_.slice(); | ||||
| 			for (let i = 0; i < output.length; i++) { | ||||
| 				const item = Object.assign({}, output[i]); | ||||
| @@ -156,5 +189,3 @@ class FileApiDriverMemory { | ||||
| 		this.items_ = []; | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| module.exports = { FileApiDriverMemory }; | ||||
| @@ -11,6 +11,11 @@ const Mutex = require('async-mutex').Mutex; | ||||
|  | ||||
| const logger = Logger.create('FileApi'); | ||||
|  | ||||
| export interface MultiPutItem { | ||||
| 	name: string; | ||||
| 	body: string; | ||||
| } | ||||
|  | ||||
| function requestCanBeRepeated(error: any) { | ||||
| 	const errorCode = typeof error === 'object' && error.code ? error.code : null; | ||||
|  | ||||
| @@ -81,6 +86,10 @@ class FileApi { | ||||
| 		if (this.driver_.initialize) return this.driver_.initialize(this.fullPath('')); | ||||
| 	} | ||||
|  | ||||
| 	public get supportsMultiPut(): boolean { | ||||
| 		return !!this.driver().supportsMultiPut; | ||||
| 	} | ||||
|  | ||||
| 	async fetchRemoteDateOffset_() { | ||||
| 		const tempFile = `${this.tempDirName()}/timeCheck${Math.round(Math.random() * 1000000)}.txt`; | ||||
| 		const startTime = Date.now(); | ||||
| @@ -251,12 +260,6 @@ class FileApi { | ||||
| 		if (!output) return output; | ||||
| 		output.path = path; | ||||
| 		return output; | ||||
|  | ||||
| 		// return this.driver_.stat(this.fullPath(path)).then((output) => { | ||||
| 		// 	if (!output) return output; | ||||
| 		// 	output.path = path; | ||||
| 		// 	return output; | ||||
| 		// }); | ||||
| 	} | ||||
|  | ||||
| 	// Returns UTF-8 encoded string by default, or a Response if `options.target = 'file'` | ||||
| @@ -277,6 +280,11 @@ class FileApi { | ||||
| 		return tryAndRepeat(() => this.driver_.put(this.fullPath(path), content, options), this.requestRepeatCount()); | ||||
| 	} | ||||
|  | ||||
| 	public async multiPut(items: MultiPutItem[], options: any = null) { | ||||
| 		if (!this.driver().supportsMultiPut) throw new Error('Multi PUT not supported'); | ||||
| 		return tryAndRepeat(() => this.driver_.multiPut(items, options), this.requestRepeatCount()); | ||||
| 	} | ||||
|  | ||||
| 	delete(path: string) { | ||||
| 		logger.debug(`delete ${this.fullPath(path)}`); | ||||
| 		return tryAndRepeat(() => this.driver_.delete(this.fullPath(path)), this.requestRepeatCount()); | ||||
|   | ||||
| @@ -403,7 +403,7 @@ export default class BaseItem extends BaseModel { | ||||
| 		return this.shareService_; | ||||
| 	} | ||||
|  | ||||
| 	public static async serializeForSync(item: BaseItemEntity) { | ||||
| 	public static async serializeForSync(item: BaseItemEntity): Promise<string> { | ||||
| 		const ItemClass = this.itemClass(item); | ||||
| 		const shownKeys = ItemClass.fieldNames(); | ||||
| 		shownKeys.push('type_'); | ||||
|   | ||||
							
								
								
									
										167
									
								
								packages/lib/services/synchronizer/ItemUploader.test.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										167
									
								
								packages/lib/services/synchronizer/ItemUploader.test.ts
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,167 @@ | ||||
| import { FileApi } from '../../file-api'; | ||||
| import BaseItem from '../../models/BaseItem'; | ||||
| import Note from '../../models/Note'; | ||||
| import { expectNotThrow, expectThrow, setupDatabaseAndSynchronizer, switchClient } from '../../testing/test-utils'; | ||||
| import time from '../../time'; | ||||
| import ItemUploader, { ApiCallFunction } from './ItemUploader'; | ||||
|  | ||||
| interface ApiCall { | ||||
| 	name: string; | ||||
| 	args: any[]; | ||||
| } | ||||
|  | ||||
| function clearArray(a: any[]) { | ||||
| 	a.splice(0, a.length); | ||||
| } | ||||
|  | ||||
| function newFakeApi(): FileApi { | ||||
| 	return { supportsMultiPut: true } as any; | ||||
| } | ||||
|  | ||||
| function newFakeApiCall(callRecorder: ApiCall[], itemBodyCallback: Function = null): ApiCallFunction { | ||||
| 	const apiCall = async (callName: string, ...args: any[]): Promise<any> => { | ||||
| 		callRecorder.push({ name: callName, args }); | ||||
|  | ||||
| 		if (callName === 'multiPut') { | ||||
| 			const [batch] = args; | ||||
| 			const output: any = { items: {} }; | ||||
| 			for (const item of batch) { | ||||
| 				if (itemBodyCallback) { | ||||
| 					output.items[item.name] = itemBodyCallback(item); | ||||
| 				} else { | ||||
| 					output.items[item.name] = { | ||||
| 						item: item.body, | ||||
| 						error: null, | ||||
| 					}; | ||||
| 				} | ||||
| 			} | ||||
| 			return output; | ||||
| 		} | ||||
| 	}; | ||||
| 	return apiCall; | ||||
| } | ||||
|  | ||||
| describe('synchronizer_ItemUplader', function() { | ||||
|  | ||||
| 	beforeEach(async (done) => { | ||||
| 		await setupDatabaseAndSynchronizer(1); | ||||
| 		await setupDatabaseAndSynchronizer(2); | ||||
| 		await switchClient(1); | ||||
| 		done(); | ||||
| 	}); | ||||
|  | ||||
| 	it('should batch uploads and use the cache afterwards', (async () => { | ||||
| 		const callRecorder: ApiCall[] = []; | ||||
| 		const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder)); | ||||
|  | ||||
| 		const notes = [ | ||||
| 			await Note.save({ title: '1' }), | ||||
| 			await Note.save({ title: '2' }), | ||||
| 		]; | ||||
|  | ||||
| 		await itemUploader.preUploadItems(notes); | ||||
|  | ||||
| 		// There should be only one call to "multiPut" because the items have | ||||
| 		// been batched. | ||||
| 		expect(callRecorder.length).toBe(1); | ||||
| 		expect(callRecorder[0].name).toBe('multiPut'); | ||||
|  | ||||
| 		clearArray(callRecorder); | ||||
|  | ||||
| 		// Now if we try to upload the item it shouldn't call the API because it | ||||
| 		// will use the cached item. | ||||
| 		await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0]); | ||||
| 		expect(callRecorder.length).toBe(0); | ||||
|  | ||||
| 		// Now try to process a note that hasn't been cached. In that case, it | ||||
| 		// should call "PUT" directly. | ||||
| 		const note3 = await Note.save({ title: '3' }); | ||||
| 		await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(note3), note3); | ||||
| 		expect(callRecorder.length).toBe(1); | ||||
| 		expect(callRecorder[0].name).toBe('put'); | ||||
| 	})); | ||||
|  | ||||
| 	it('should not batch upload if the items are over the batch size limit', (async () => { | ||||
| 		const callRecorder: ApiCall[] = []; | ||||
| 		const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder)); | ||||
| 		itemUploader.maxBatchSize = 1; | ||||
|  | ||||
| 		const notes = [ | ||||
| 			await Note.save({ title: '1' }), | ||||
| 			await Note.save({ title: '2' }), | ||||
| 		]; | ||||
|  | ||||
| 		await itemUploader.preUploadItems(notes); | ||||
| 		expect(callRecorder.length).toBe(0); | ||||
| 	})); | ||||
|  | ||||
| 	it('should not use the cache if the note has changed since the pre-upload', (async () => { | ||||
| 		const callRecorder: ApiCall[] = []; | ||||
| 		const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder)); | ||||
|  | ||||
| 		const notes = [ | ||||
| 			await Note.save({ title: '1' }), | ||||
| 			await Note.save({ title: '2' }), | ||||
| 		]; | ||||
|  | ||||
| 		await itemUploader.preUploadItems(notes); | ||||
| 		clearArray(callRecorder); | ||||
|  | ||||
| 		await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0]); | ||||
| 		expect(callRecorder.length).toBe(0); | ||||
|  | ||||
| 		await time.msleep(1); | ||||
| 		notes[1] = await Note.save({ title: '22' }), | ||||
| 		await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[1]), notes[1]); | ||||
| 		expect(callRecorder.length).toBe(1); | ||||
| 	})); | ||||
|  | ||||
| 	it('should respect the max batch size', (async () => { | ||||
| 		const callRecorder: ApiCall[] = []; | ||||
| 		const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder)); | ||||
|  | ||||
| 		const notes = [ | ||||
| 			await Note.save({ title: '1' }), | ||||
| 			await Note.save({ title: '2' }), | ||||
| 			await Note.save({ title: '3' }), | ||||
| 		]; | ||||
|  | ||||
| 		const noteSize = BaseItem.systemPath(notes[0]).length + (await Note.serializeForSync(notes[0])).length; | ||||
| 		itemUploader.maxBatchSize = noteSize * 2; | ||||
|  | ||||
| 		// It should send two batches - one with two notes, and the second with | ||||
| 		// only one note. | ||||
| 		await itemUploader.preUploadItems(notes); | ||||
| 		expect(callRecorder.length).toBe(2); | ||||
| 		expect(callRecorder[0].args[0].length).toBe(2); | ||||
| 		expect(callRecorder[1].args[0].length).toBe(1); | ||||
| 	})); | ||||
|  | ||||
| 	it('should rethrow error for items within the batch', (async () => { | ||||
| 		const callRecorder: ApiCall[] = []; | ||||
|  | ||||
| 		const notes = [ | ||||
| 			await Note.save({ title: '1' }), | ||||
| 			await Note.save({ title: '2' }), | ||||
| 			await Note.save({ title: '3' }), | ||||
| 		]; | ||||
|  | ||||
| 		// Simulates throwing an error on note 2 | ||||
| 		const itemBodyCallback = (item: any): any => { | ||||
| 			if (item.name === BaseItem.systemPath(notes[1])) { | ||||
| 				return { error: new Error('Could not save item'), item: null }; | ||||
| 			} else { | ||||
| 				return { error: null, item: item.body }; | ||||
| 			} | ||||
| 		}; | ||||
|  | ||||
| 		const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder, itemBodyCallback)); | ||||
|  | ||||
| 		await itemUploader.preUploadItems(notes); | ||||
|  | ||||
| 		await expectNotThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0])); | ||||
| 		await expectThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[1]), notes[1])); | ||||
| 		await expectNotThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[2]), notes[2])); | ||||
| 	})); | ||||
|  | ||||
| }); | ||||
							
								
								
									
										110
									
								
								packages/lib/services/synchronizer/ItemUploader.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										110
									
								
								packages/lib/services/synchronizer/ItemUploader.ts
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,110 @@ | ||||
| import { ModelType } from '../../BaseModel'; | ||||
| import { FileApi, MultiPutItem } from '../../file-api'; | ||||
| import Logger from '../../Logger'; | ||||
| import BaseItem, { ItemThatNeedSync } from '../../models/BaseItem'; | ||||
|  | ||||
| const logger = Logger.create('ItemUploader'); | ||||
|  | ||||
| export type ApiCallFunction = (fnName: string, ...args: any[])=> Promise<any>; | ||||
|  | ||||
| interface BatchItem extends MultiPutItem { | ||||
| 	localItemUpdatedTime: number; | ||||
| } | ||||
|  | ||||
| export default class ItemUploader { | ||||
|  | ||||
| 	private api_: FileApi; | ||||
| 	private apiCall_: ApiCallFunction; | ||||
| 	private preUploadedItems_: Record<string, any> = {}; | ||||
| 	private preUploadedItemUpdatedTimes_: Record<string, number> = {}; | ||||
| 	private maxBatchSize_ = 1 * 1024 * 1024; // 1MB; | ||||
|  | ||||
| 	public constructor(api: FileApi, apiCall: ApiCallFunction) { | ||||
| 		this.api_ = api; | ||||
| 		this.apiCall_ = apiCall; | ||||
| 	} | ||||
|  | ||||
| 	public get maxBatchSize() { | ||||
| 		return this.maxBatchSize_; | ||||
| 	} | ||||
|  | ||||
| 	public set maxBatchSize(v: number) { | ||||
| 		this.maxBatchSize_ = v; | ||||
| 	} | ||||
|  | ||||
| 	public async serializeAndUploadItem(ItemClass: any, path: string, local: ItemThatNeedSync) { | ||||
| 		const preUploadItem = this.preUploadedItems_[path]; | ||||
| 		if (preUploadItem) { | ||||
| 			if (this.preUploadedItemUpdatedTimes_[path] !== local.updated_time) { | ||||
| 				// Normally this should be rare as it can only happen if the | ||||
| 				// item has been changed between the moment it was pre-uploaded | ||||
| 				// and the moment where it's being processed by the | ||||
| 				// synchronizer. It could happen for example for a note being | ||||
| 				// edited just at the same time. In that case, we proceed with | ||||
| 				// the regular upload. | ||||
| 				logger.warn(`Pre-uploaded item updated_time has changed. It is going to be re-uploaded again: ${path} (From ${this.preUploadedItemUpdatedTimes_[path]} to ${local.updated_time})`); | ||||
| 			} else { | ||||
| 				if (preUploadItem.error) throw new Error(preUploadItem.error.message ? preUploadItem.error.message : 'Unknown pre-upload error'); | ||||
| 				return; | ||||
| 			} | ||||
| 		} | ||||
| 		const content = await ItemClass.serializeForSync(local); | ||||
| 		await this.apiCall_('put', path, content); | ||||
| 	} | ||||
|  | ||||
| 	public async preUploadItems(items: ItemThatNeedSync[]) { | ||||
| 		if (!this.api_.supportsMultiPut) return; | ||||
|  | ||||
| 		const itemsToUpload: BatchItem[] = []; | ||||
|  | ||||
| 		for (const local of items) { | ||||
| 			// For resources, additional logic is necessary - in particular the blob | ||||
| 			// should be uploaded before the metadata, so we can't batch process. | ||||
| 			if (local.type_ === ModelType.Resource) continue; | ||||
|  | ||||
| 			const ItemClass = BaseItem.itemClass(local); | ||||
| 			itemsToUpload.push({ | ||||
| 				name: BaseItem.systemPath(local), | ||||
| 				body: await ItemClass.serializeForSync(local), | ||||
| 				localItemUpdatedTime: local.updated_time, | ||||
| 			}); | ||||
| 		} | ||||
|  | ||||
| 		let batchSize = 0; | ||||
| 		let currentBatch: BatchItem[] = []; | ||||
|  | ||||
| 		const uploadBatch = async (batch: BatchItem[]) => { | ||||
| 			for (const batchItem of batch) { | ||||
| 				this.preUploadedItemUpdatedTimes_[batchItem.name] = batchItem.localItemUpdatedTime; | ||||
| 			} | ||||
|  | ||||
| 			const response = await this.apiCall_('multiPut', batch); | ||||
| 			this.preUploadedItems_ = { | ||||
| 				...this.preUploadedItems_, | ||||
| 				...response.items, | ||||
| 			}; | ||||
| 		}; | ||||
|  | ||||
| 		while (itemsToUpload.length) { | ||||
| 			const itemToUpload = itemsToUpload.pop(); | ||||
| 			const itemSize = itemToUpload.name.length + itemToUpload.body.length; | ||||
|  | ||||
| 			// Although it should be rare, if the item itself is above the | ||||
| 			// batch max size, we skip it. In that case it will be uploaded the | ||||
| 			// regular way when the synchronizer calls `serializeAndUploadItem()` | ||||
| 			if (itemSize > this.maxBatchSize) continue; | ||||
|  | ||||
| 			if (batchSize + itemSize > this.maxBatchSize) { | ||||
| 				await uploadBatch(currentBatch); | ||||
| 				batchSize = itemSize; | ||||
| 				currentBatch = [itemToUpload]; | ||||
| 			} else { | ||||
| 				batchSize += itemSize; | ||||
| 				currentBatch.push(itemToUpload); | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if (currentBatch.length) await uploadBatch(currentBatch); | ||||
| 	} | ||||
|  | ||||
| } | ||||
| @@ -29,7 +29,7 @@ import Revision from '../models/Revision'; | ||||
| import MasterKey from '../models/MasterKey'; | ||||
| import BaseItem from '../models/BaseItem'; | ||||
| const { FileApi } = require('../file-api.js'); | ||||
| const { FileApiDriverMemory } = require('../file-api-driver-memory.js'); | ||||
| const FileApiDriverMemory = require('../file-api-driver-memory').default; | ||||
| const { FileApiDriverLocal } = require('../file-api-driver-local.js'); | ||||
| const { FileApiDriverWebDav } = require('../file-api-driver-webdav.js'); | ||||
| const { FileApiDriverDropbox } = require('../file-api-driver-dropbox.js'); | ||||
|   | ||||
| @@ -5,14 +5,17 @@ import Router from '../../utils/Router'; | ||||
| import { RouteType } from '../../utils/types'; | ||||
| import { AppContext } from '../../utils/types'; | ||||
| import * as fs from 'fs-extra'; | ||||
| import { ErrorForbidden, ErrorMethodNotAllowed, ErrorNotFound } from '../../utils/errors'; | ||||
| import { ErrorForbidden, ErrorMethodNotAllowed, ErrorNotFound, ErrorPayloadTooLarge } from '../../utils/errors'; | ||||
| import ItemModel, { ItemSaveOption, SaveFromRawContentItem } from '../../models/ItemModel'; | ||||
| import { requestDeltaPagination, requestPagination } from '../../models/utils/pagination'; | ||||
| import { AclAction } from '../../models/BaseModel'; | ||||
| import { safeRemove } from '../../utils/fileUtils'; | ||||
| import { formatBytes, MB } from '../../utils/bytes'; | ||||
|  | ||||
| const router = new Router(RouteType.Api); | ||||
|  | ||||
| const batchMaxSize = 1 * MB; | ||||
|  | ||||
| export async function putItemContents(path: SubPath, ctx: AppContext, isBatch: boolean) { | ||||
| 	if (!ctx.owner.can_upload) throw new ErrorForbidden('Uploading content is disabled'); | ||||
|  | ||||
| @@ -23,12 +26,16 @@ export async function putItemContents(path: SubPath, ctx: AppContext, isBatch: b | ||||
| 	let items: SaveFromRawContentItem[] = []; | ||||
|  | ||||
| 	if (isBatch) { | ||||
| 		let totalSize = 0; | ||||
| 		items = bodyFields.items.map((item: any) => { | ||||
| 			totalSize += item.name.length + (item.body ? item.body.length : 0); | ||||
| 			return { | ||||
| 				name: item.name, | ||||
| 				body: item.body ? Buffer.from(item.body, 'utf8') : Buffer.alloc(0), | ||||
| 			}; | ||||
| 		}); | ||||
|  | ||||
| 		if (totalSize > batchMaxSize) throw new ErrorPayloadTooLarge(`Size of items (${formatBytes(totalSize)}) is over the limit (${formatBytes(batchMaxSize)})`); | ||||
| 	} else { | ||||
| 		const filePath = parsedBody?.files?.file ? parsedBody.files.file.path : null; | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user