1
0
mirror of https://github.com/laurent22/joplin.git synced 2026-06-18 20:16:34 +02:00

Compare commits

...

7 Commits

Author SHA1 Message Date
Laurent Cozic 7db7dc4957 multiput 2021-06-18 16:47:57 +01:00
Laurent Cozic 1aa96af4db fix tests 2021-06-18 16:13:22 +01:00
Laurent Cozic 92dadd7509 tests 2021-06-18 15:39:24 +01:00
Laurent Cozic 000185bfb4 multiput 2021-06-18 15:19:51 +01:00
Laurent Cozic e81427a1f2 Merge branch 'dev' into sync_batch_upload 2021-06-18 11:50:41 +01:00
Laurent Cozic 3b9c02e92d Server: Add support for uploading multiple items in one request 2021-06-18 11:50:06 +01:00
Laurent Cozic 958e9163b6 All: Batch upload during initial sync 2021-06-17 12:45:34 +01:00
19 changed files with 653 additions and 126 deletions
+9
View File
@@ -872,6 +872,9 @@ packages/lib/eventManager.js.map
packages/lib/file-api-driver-joplinServer.d.ts
packages/lib/file-api-driver-joplinServer.js
packages/lib/file-api-driver-joplinServer.js.map
packages/lib/file-api-driver-memory.d.ts
packages/lib/file-api-driver-memory.js
packages/lib/file-api-driver-memory.js.map
packages/lib/file-api-driver.test.d.ts
packages/lib/file-api-driver.test.js
packages/lib/file-api-driver.test.js.map
@@ -1391,6 +1394,12 @@ packages/lib/services/spellChecker/SpellCheckerService.js.map
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.d.ts
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js.map
packages/lib/services/synchronizer/ItemUploader.d.ts
packages/lib/services/synchronizer/ItemUploader.js
packages/lib/services/synchronizer/ItemUploader.js.map
packages/lib/services/synchronizer/ItemUploader.test.d.ts
packages/lib/services/synchronizer/ItemUploader.test.js
packages/lib/services/synchronizer/ItemUploader.test.js.map
packages/lib/services/synchronizer/LockHandler.d.ts
packages/lib/services/synchronizer/LockHandler.js
packages/lib/services/synchronizer/LockHandler.js.map
+9
View File
@@ -858,6 +858,9 @@ packages/lib/eventManager.js.map
packages/lib/file-api-driver-joplinServer.d.ts
packages/lib/file-api-driver-joplinServer.js
packages/lib/file-api-driver-joplinServer.js.map
packages/lib/file-api-driver-memory.d.ts
packages/lib/file-api-driver-memory.js
packages/lib/file-api-driver-memory.js.map
packages/lib/file-api-driver.test.d.ts
packages/lib/file-api-driver.test.js
packages/lib/file-api-driver.test.js.map
@@ -1377,6 +1380,12 @@ packages/lib/services/spellChecker/SpellCheckerService.js.map
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.d.ts
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js
packages/lib/services/spellChecker/SpellCheckerServiceDriverBase.js.map
packages/lib/services/synchronizer/ItemUploader.d.ts
packages/lib/services/synchronizer/ItemUploader.js
packages/lib/services/synchronizer/ItemUploader.js.map
packages/lib/services/synchronizer/ItemUploader.test.d.ts
packages/lib/services/synchronizer/ItemUploader.test.js
packages/lib/services/synchronizer/ItemUploader.test.js.map
packages/lib/services/synchronizer/LockHandler.d.ts
packages/lib/services/synchronizer/LockHandler.js
packages/lib/services/synchronizer/LockHandler.js.map
@@ -51,5 +51,6 @@ done
cd "$ROOT_DIR/packages/app-cli"
npm start -- --profile "$PROFILE_DIR" batch "$CMD_FILE"
npm start -- --profile "$PROFILE_DIR" import ~/Desktop/Joplin_17_06_2021.jex
# npm start -- --profile "$PROFILE_DIR" import ~/Desktop/Tout_18_06_2021.jex
npm start -- --profile "$PROFILE_DIR" sync
+1 -1
View File
@@ -1,7 +1,7 @@
const BaseSyncTarget = require('./BaseSyncTarget').default;
const Setting = require('./models/Setting').default;
const { FileApi } = require('./file-api.js');
const { FileApiDriverMemory } = require('./file-api-driver-memory.js');
const FileApiDriverMemory = require('./file-api-driver-memory').default;
const Synchronizer = require('./Synchronizer').default;
class SyncTargetMemory extends BaseSyncTarget {
+10 -5
View File
@@ -19,6 +19,7 @@ import EncryptionService from './services/EncryptionService';
import JoplinError from './JoplinError';
import ShareService from './services/share/ShareService';
import TaskQueue from './TaskQueue';
import ItemUploader from './services/synchronizer/ItemUploader';
const { sprintf } = require('sprintf-js');
const { Dirnames } = require('./services/synchronizer/utils/types');
@@ -73,7 +74,7 @@ export default class Synchronizer {
public dispatch: Function;
constructor(db: any, api: any, appType: string) {
public constructor(db: any, api: any, appType: string) {
this.db_ = db;
this.api_ = api;
this.appType_ = appType;
@@ -83,6 +84,8 @@ export default class Synchronizer {
this.progressReport_ = {};
this.dispatch = function() {};
this.apiCall = this.apiCall.bind(this);
}
state() {
@@ -300,7 +303,7 @@ export default class Synchronizer {
return '';
}
async apiCall(fnName: string, ...args: any[]) {
private async apiCall(fnName: string, ...args: any[]) {
if (this.syncTargetIsLocked_) throw new JoplinError('Sync target is locked - aborting API call', 'lockError');
try {
@@ -389,6 +392,8 @@ export default class Synchronizer {
// correctly so as to share/unshare the right items.
await Folder.updateAllShareIds();
const itemUploader = new ItemUploader(this.api(), this.apiCall);
let errorToThrow = null;
let syncLock = null;
@@ -440,6 +445,8 @@ export default class Synchronizer {
const result = await BaseItem.itemsThatNeedSync(syncTargetId);
const locals = result.items;
await itemUploader.preUploadItems(result.items.filter((it: any) => result.neverSyncedItemIds.includes(it.id)));
for (let i = 0; i < locals.length; i++) {
if (this.cancelling()) break;
@@ -588,8 +595,7 @@ export default class Synchronizer {
let canSync = true;
try {
if (this.testingHooks_.indexOf('notesRejectedByTarget') >= 0 && local.type_ === BaseModel.TYPE_NOTE) throw new JoplinError('Testing rejectedByTarget', 'rejectedByTarget');
const content = await ItemClass.serializeForSync(local);
await this.apiCall('put', path, content);
await itemUploader.serializeAndUploadItem(ItemClass, path, local);
} catch (error) {
if (error && error.code === 'rejectedByTarget') {
await handleCannotSyncItem(ItemClass, syncTargetId, local, error.message);
@@ -619,7 +625,6 @@ export default class Synchronizer {
// above also doesn't use it because it fetches the whole remote object and read the
// more reliable 'updated_time' property. Basically remote.updated_time is deprecated.
// await this.api().setTimestamp(path, local.updated_time);
await ItemClass.saveSyncTime(syncTargetId, local, local.updated_time);
}
} else if (action == 'itemConflict') {
@@ -1,3 +1,4 @@
import { MultiPutItem } from './file-api';
import JoplinError from './JoplinError';
import JoplinServerApi from './JoplinServerApi';
import { trimSlashes } from './path-utils';
@@ -31,6 +32,10 @@ export default class FileApiDriverJoplinServer {
return this.api_;
}
public get supportsMultiPut() {
return true;
}
public requestRepeatCount() {
return 3;
}
@@ -174,6 +179,10 @@ export default class FileApiDriverJoplinServer {
}
}
public async multiPut(items: MultiPutItem[], options: any = null) {
return this.api().exec('PUT', 'api/batch_items', null, { items: items }, null, options);
}
public async delete(path: string) {
return this.api().exec('DELETE', this.apiFilePath_(path));
}
@@ -1,14 +1,18 @@
const time = require('./time').default;
import time from './time';
const fs = require('fs-extra');
const { basicDelta } = require('./file-api');
import { basicDelta, MultiPutItem } from './file-api';
export default class FileApiDriverMemory {
private items_: any[];
private deletedItems_: any[];
class FileApiDriverMemory {
constructor() {
this.items_ = [];
this.deletedItems_ = [];
}
encodeContent_(content) {
encodeContent_(content: any) {
if (content instanceof Buffer) {
return content.toString('base64');
} else {
@@ -16,23 +20,27 @@ class FileApiDriverMemory {
}
}
decodeContent_(content) {
public get supportsMultiPut() {
return true;
}
decodeContent_(content: any) {
return Buffer.from(content, 'base64').toString('utf-8');
}
itemIndexByPath(path) {
itemIndexByPath(path: string) {
for (let i = 0; i < this.items_.length; i++) {
if (this.items_[i].path == path) return i;
}
return -1;
}
itemByPath(path) {
itemByPath(path: string) {
const index = this.itemIndexByPath(path);
return index < 0 ? null : this.items_[index];
}
newItem(path, isDir = false) {
newItem(path: string, isDir = false) {
const now = time.unixMs();
return {
path: path,
@@ -43,18 +51,18 @@ class FileApiDriverMemory {
};
}
stat(path) {
stat(path: string) {
const item = this.itemByPath(path);
return Promise.resolve(item ? Object.assign({}, item) : null);
}
async setTimestamp(path, timestampMs) {
async setTimestamp(path: string, timestampMs: number): Promise<any> {
const item = this.itemByPath(path);
if (!item) return Promise.reject(new Error(`File not found: ${path}`));
item.updated_time = timestampMs;
}
async list(path) {
async list(path: string) {
const output = [];
for (let i = 0; i < this.items_.length; i++) {
@@ -77,7 +85,7 @@ class FileApiDriverMemory {
});
}
async get(path, options) {
async get(path: string, options: any) {
const item = this.itemByPath(path);
if (!item) return Promise.resolve(null);
if (item.isDir) return Promise.reject(new Error(`${path} is a directory, not a file`));
@@ -93,13 +101,13 @@ class FileApiDriverMemory {
return output;
}
async mkdir(path) {
async mkdir(path: string) {
const index = this.itemIndexByPath(path);
if (index >= 0) return;
this.items_.push(this.newItem(path, true));
}
async put(path, content, options = null) {
async put(path: string, content: any, options: any = null) {
if (!options) options = {};
if (options.source === 'file') content = await fs.readFile(options.path);
@@ -109,13 +117,38 @@ class FileApiDriverMemory {
const item = this.newItem(path, false);
item.content = this.encodeContent_(content);
this.items_.push(item);
return item;
} else {
this.items_[index].content = this.encodeContent_(content);
this.items_[index].updated_time = time.unixMs();
return this.items_[index];
}
}
async delete(path) {
public async multiPut(items: MultiPutItem[], options: any = null) {
const output: any = {
items: {},
};
for (const item of items) {
try {
const processedItem = await this.put(`/root/${item.name}`, item.body, options);
output.items[item.name] = {
item: processedItem,
error: null,
};
} catch (error) {
output.items[item.name] = {
item: null,
error: error,
};
}
}
return output;
}
async delete(path: string) {
const index = this.itemIndexByPath(path);
if (index >= 0) {
const item = Object.assign({}, this.items_[index]);
@@ -126,10 +159,10 @@ class FileApiDriverMemory {
}
}
async move(oldPath, newPath) {
async move(oldPath: string, newPath: string): Promise<any> {
const sourceItem = this.itemByPath(oldPath);
if (!sourceItem) return Promise.reject(new Error(`Path not found: ${oldPath}`));
this.delete(newPath); // Overwrite if newPath already exists
await this.delete(newPath); // Overwrite if newPath already exists
sourceItem.path = newPath;
}
@@ -137,8 +170,8 @@ class FileApiDriverMemory {
this.items_ = [];
}
async delta(path, options = null) {
const getStatFn = async path => {
async delta(path: string, options: any = null) {
const getStatFn = async (path: string) => {
const output = this.items_.slice();
for (let i = 0; i < output.length; i++) {
const item = Object.assign({}, output[i]);
@@ -156,5 +189,3 @@ class FileApiDriverMemory {
this.items_ = [];
}
}
module.exports = { FileApiDriverMemory };
+14 -6
View File
@@ -11,6 +11,11 @@ const Mutex = require('async-mutex').Mutex;
const logger = Logger.create('FileApi');
export interface MultiPutItem {
name: string;
body: string;
}
function requestCanBeRepeated(error: any) {
const errorCode = typeof error === 'object' && error.code ? error.code : null;
@@ -81,6 +86,10 @@ class FileApi {
if (this.driver_.initialize) return this.driver_.initialize(this.fullPath(''));
}
public get supportsMultiPut(): boolean {
return !!this.driver().supportsMultiPut;
}
async fetchRemoteDateOffset_() {
const tempFile = `${this.tempDirName()}/timeCheck${Math.round(Math.random() * 1000000)}.txt`;
const startTime = Date.now();
@@ -251,12 +260,6 @@ class FileApi {
if (!output) return output;
output.path = path;
return output;
// return this.driver_.stat(this.fullPath(path)).then((output) => {
// if (!output) return output;
// output.path = path;
// return output;
// });
}
// Returns UTF-8 encoded string by default, or a Response if `options.target = 'file'`
@@ -277,6 +280,11 @@ class FileApi {
return tryAndRepeat(() => this.driver_.put(this.fullPath(path), content, options), this.requestRepeatCount());
}
public async multiPut(items: MultiPutItem[], options: any = null) {
if (!this.driver().supportsMultiPut) throw new Error('Multi PUT not supported');
return tryAndRepeat(() => this.driver_.multiPut(items, options), this.requestRepeatCount());
}
delete(path: string) {
logger.debug(`delete ${this.fullPath(path)}`);
return tryAndRepeat(() => this.driver_.delete(this.fullPath(path)), this.requestRepeatCount());
+1 -1
View File
@@ -403,7 +403,7 @@ export default class BaseItem extends BaseModel {
return this.shareService_;
}
public static async serializeForSync(item: BaseItemEntity) {
public static async serializeForSync(item: BaseItemEntity): Promise<string> {
const ItemClass = this.itemClass(item);
const shownKeys = ItemClass.fieldNames();
shownKeys.push('type_');
@@ -0,0 +1,167 @@
import { FileApi } from '../../file-api';
import BaseItem from '../../models/BaseItem';
import Note from '../../models/Note';
import { expectNotThrow, expectThrow, setupDatabaseAndSynchronizer, switchClient } from '../../testing/test-utils';
import time from '../../time';
import ItemUploader, { ApiCallFunction } from './ItemUploader';
interface ApiCall {
name: string;
args: any[];
}
function clearArray(a: any[]) {
a.splice(0, a.length);
}
function newFakeApi(): FileApi {
return { supportsMultiPut: true } as any;
}
function newFakeApiCall(callRecorder: ApiCall[], itemBodyCallback: Function = null): ApiCallFunction {
const apiCall = async (callName: string, ...args: any[]): Promise<any> => {
callRecorder.push({ name: callName, args });
if (callName === 'multiPut') {
const [batch] = args;
const output: any = { items: {} };
for (const item of batch) {
if (itemBodyCallback) {
output.items[item.name] = itemBodyCallback(item);
} else {
output.items[item.name] = {
item: item.body,
error: null,
};
}
}
return output;
}
};
return apiCall;
}
describe('synchronizer_ItemUplader', function() {
beforeEach(async (done) => {
await setupDatabaseAndSynchronizer(1);
await setupDatabaseAndSynchronizer(2);
await switchClient(1);
done();
});
it('should batch uploads and use the cache afterwards', (async () => {
const callRecorder: ApiCall[] = [];
const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder));
const notes = [
await Note.save({ title: '1' }),
await Note.save({ title: '2' }),
];
await itemUploader.preUploadItems(notes);
// There should be only one call to "multiPut" because the items have
// been batched.
expect(callRecorder.length).toBe(1);
expect(callRecorder[0].name).toBe('multiPut');
clearArray(callRecorder);
// Now if we try to upload the item it shouldn't call the API because it
// will use the cached item.
await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0]);
expect(callRecorder.length).toBe(0);
// Now try to process a note that hasn't been cached. In that case, it
// should call "PUT" directly.
const note3 = await Note.save({ title: '3' });
await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(note3), note3);
expect(callRecorder.length).toBe(1);
expect(callRecorder[0].name).toBe('put');
}));
it('should not batch upload if the items are over the batch size limit', (async () => {
const callRecorder: ApiCall[] = [];
const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder));
itemUploader.maxBatchSize = 1;
const notes = [
await Note.save({ title: '1' }),
await Note.save({ title: '2' }),
];
await itemUploader.preUploadItems(notes);
expect(callRecorder.length).toBe(0);
}));
it('should not use the cache if the note has changed since the pre-upload', (async () => {
const callRecorder: ApiCall[] = [];
const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder));
const notes = [
await Note.save({ title: '1' }),
await Note.save({ title: '2' }),
];
await itemUploader.preUploadItems(notes);
clearArray(callRecorder);
await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0]);
expect(callRecorder.length).toBe(0);
await time.msleep(1);
notes[1] = await Note.save({ title: '22' }),
await itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[1]), notes[1]);
expect(callRecorder.length).toBe(1);
}));
it('should respect the max batch size', (async () => {
const callRecorder: ApiCall[] = [];
const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder));
const notes = [
await Note.save({ title: '1' }),
await Note.save({ title: '2' }),
await Note.save({ title: '3' }),
];
const noteSize = BaseItem.systemPath(notes[0]).length + (await Note.serializeForSync(notes[0])).length;
itemUploader.maxBatchSize = noteSize * 2;
// It should send two batches - one with two notes, and the second with
// only one note.
await itemUploader.preUploadItems(notes);
expect(callRecorder.length).toBe(2);
expect(callRecorder[0].args[0].length).toBe(2);
expect(callRecorder[1].args[0].length).toBe(1);
}));
it('should rethrow error for items within the batch', (async () => {
const callRecorder: ApiCall[] = [];
const notes = [
await Note.save({ title: '1' }),
await Note.save({ title: '2' }),
await Note.save({ title: '3' }),
];
// Simulates throwing an error on note 2
const itemBodyCallback = (item: any): any => {
if (item.name === BaseItem.systemPath(notes[1])) {
return { error: new Error('Could not save item'), item: null };
} else {
return { error: null, item: item.body };
}
};
const itemUploader = new ItemUploader(newFakeApi(), newFakeApiCall(callRecorder, itemBodyCallback));
await itemUploader.preUploadItems(notes);
await expectNotThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[0]), notes[0]));
await expectThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[1]), notes[1]));
await expectNotThrow(async () => itemUploader.serializeAndUploadItem(Note, BaseItem.systemPath(notes[2]), notes[2]));
}));
});
@@ -0,0 +1,110 @@
import { ModelType } from '../../BaseModel';
import { FileApi, MultiPutItem } from '../../file-api';
import Logger from '../../Logger';
import BaseItem, { ItemThatNeedSync } from '../../models/BaseItem';
const logger = Logger.create('ItemUploader');
export type ApiCallFunction = (fnName: string, ...args: any[])=> Promise<any>;
interface BatchItem extends MultiPutItem {
localItemUpdatedTime: number;
}
export default class ItemUploader {
private api_: FileApi;
private apiCall_: ApiCallFunction;
private preUploadedItems_: Record<string, any> = {};
private preUploadedItemUpdatedTimes_: Record<string, number> = {};
private maxBatchSize_ = 1 * 1024 * 1024; // 1MB;
public constructor(api: FileApi, apiCall: ApiCallFunction) {
this.api_ = api;
this.apiCall_ = apiCall;
}
public get maxBatchSize() {
return this.maxBatchSize_;
}
public set maxBatchSize(v: number) {
this.maxBatchSize_ = v;
}
public async serializeAndUploadItem(ItemClass: any, path: string, local: ItemThatNeedSync) {
const preUploadItem = this.preUploadedItems_[path];
if (preUploadItem) {
if (this.preUploadedItemUpdatedTimes_[path] !== local.updated_time) {
// Normally this should be rare as it can only happen if the
// item has been changed between the moment it was pre-uploaded
// and the moment where it's being processed by the
// synchronizer. It could happen for example for a note being
// edited just at the same time. In that case, we proceed with
// the regular upload.
logger.warn(`Pre-uploaded item updated_time has changed. It is going to be re-uploaded again: ${path} (From ${this.preUploadedItemUpdatedTimes_[path]} to ${local.updated_time})`);
} else {
if (preUploadItem.error) throw new Error(preUploadItem.error.message ? preUploadItem.error.message : 'Unknown pre-upload error');
return;
}
}
const content = await ItemClass.serializeForSync(local);
await this.apiCall_('put', path, content);
}
public async preUploadItems(items: ItemThatNeedSync[]) {
if (!this.api_.supportsMultiPut) return;
const itemsToUpload: BatchItem[] = [];
for (const local of items) {
// For resources, additional logic is necessary - in particular the blob
// should be uploaded before the metadata, so we can't batch process.
if (local.type_ === ModelType.Resource) continue;
const ItemClass = BaseItem.itemClass(local);
itemsToUpload.push({
name: BaseItem.systemPath(local),
body: await ItemClass.serializeForSync(local),
localItemUpdatedTime: local.updated_time,
});
}
let batchSize = 0;
let currentBatch: BatchItem[] = [];
const uploadBatch = async (batch: BatchItem[]) => {
for (const batchItem of batch) {
this.preUploadedItemUpdatedTimes_[batchItem.name] = batchItem.localItemUpdatedTime;
}
const response = await this.apiCall_('multiPut', batch);
this.preUploadedItems_ = {
...this.preUploadedItems_,
...response.items,
};
};
while (itemsToUpload.length) {
const itemToUpload = itemsToUpload.pop();
const itemSize = itemToUpload.name.length + itemToUpload.body.length;
// Although it should be rare, if the item itself is above the
// batch max size, we skip it. In that case it will be uploaded the
// regular way when the synchronizer calls `serializeAndUploadItem()`
if (itemSize > this.maxBatchSize) continue;
if (batchSize + itemSize > this.maxBatchSize) {
await uploadBatch(currentBatch);
batchSize = itemSize;
currentBatch = [itemToUpload];
} else {
batchSize += itemSize;
currentBatch.push(itemToUpload);
}
}
if (currentBatch.length) await uploadBatch(currentBatch);
}
}
+1 -1
View File
@@ -29,7 +29,7 @@ import Revision from '../models/Revision';
import MasterKey from '../models/MasterKey';
import BaseItem from '../models/BaseItem';
const { FileApi } = require('../file-api.js');
const { FileApiDriverMemory } = require('../file-api-driver-memory.js');
const FileApiDriverMemory = require('../file-api-driver-memory').default;
const { FileApiDriverLocal } = require('../file-api-driver-local.js');
const { FileApiDriverWebDav } = require('../file-api-driver-webdav.js');
const { FileApiDriverDropbox } = require('../file-api-driver-dropbox.js');
+116 -44
View File
@@ -12,6 +12,18 @@ const mimeUtils = require('@joplin/lib/mime-utils.js').mime;
// Converts "root:/myfile.txt:" to "myfile.txt"
const extractNameRegex = /^root:\/(.*):$/;
export interface SaveFromRawContentItem {
name: string;
body: Buffer;
}
export interface SaveFromRawContentResultItem {
item: Item;
error: any;
}
export type SaveFromRawContentResult = Record<string, SaveFromRawContentResultItem>;
export interface PaginatedItems extends PaginatedResults {
items: Item[];
}
@@ -282,62 +294,122 @@ export default class ItemModel extends BaseModel<Item> {
return this.itemToJoplinItem(raw);
}
public async saveFromRawContent(user: User, name: string, buffer: Buffer, options: ItemSaveOption = null): Promise<Item> {
public async saveFromRawContent(user: User, rawContentItems: SaveFromRawContentItem[], options: ItemSaveOption = null): Promise<SaveFromRawContentResult> {
options = options || {};
const existingItem = await this.loadByName(user.id, name);
// In this function, first we process the input items, which may be
// serialized Joplin items or actual buffers (for resources) and convert
// them to database items. Once it's done those db items are saved in
// batch at the end.
const isJoplinItem = isJoplinItemName(name);
let isNote = false;
const item: Item = {
name,
};
let joplinItem: any = null;
let resourceIds: string[] = [];
if (isJoplinItem) {
joplinItem = await unserializeJoplinItem(buffer.toString());
isNote = joplinItem.type_ === ModelType.Note;
resourceIds = isNote ? linkedResourceIds(joplinItem.body) : [];
item.jop_id = joplinItem.id;
item.jop_parent_id = joplinItem.parent_id || '';
item.jop_type = joplinItem.type_;
item.jop_encryption_applied = joplinItem.encryption_applied || 0;
item.jop_share_id = joplinItem.share_id || '';
const joplinItemToSave = { ...joplinItem };
delete joplinItemToSave.id;
delete joplinItemToSave.parent_id;
delete joplinItemToSave.share_id;
delete joplinItemToSave.type_;
delete joplinItemToSave.encryption_applied;
item.content = Buffer.from(JSON.stringify(joplinItemToSave));
} else {
item.content = buffer;
interface ItemToProcess {
item: Item;
error: Error;
resourceIds?: string[];
isNote?: boolean;
joplinItem?: any;
}
if (existingItem) item.id = existingItem.id;
const existingItems = await this.loadByNames(user.id, rawContentItems.map(i => i.name));
const itemsToProcess: Record<string, ItemToProcess> = {};
if (options.shareId) item.jop_share_id = options.shareId;
for (const rawItem of rawContentItems) {
try {
const isJoplinItem = isJoplinItemName(rawItem.name);
let isNote = false;
await this.models().user().checkMaxItemSizeLimit(user, buffer, item, joplinItem);
const item: Item = {
name: rawItem.name,
};
return this.withTransaction<Item>(async () => {
const savedItem = await this.saveForUser(user.id, item);
let joplinItem: any = null;
if (isNote) {
await this.models().itemResource().deleteByItemId(savedItem.id);
await this.models().itemResource().addResourceIds(savedItem.id, resourceIds);
let resourceIds: string[] = [];
if (isJoplinItem) {
joplinItem = await unserializeJoplinItem(rawItem.body.toString());
isNote = joplinItem.type_ === ModelType.Note;
resourceIds = isNote ? linkedResourceIds(joplinItem.body) : [];
item.jop_id = joplinItem.id;
item.jop_parent_id = joplinItem.parent_id || '';
item.jop_type = joplinItem.type_;
item.jop_encryption_applied = joplinItem.encryption_applied || 0;
item.jop_share_id = joplinItem.share_id || '';
const joplinItemToSave = { ...joplinItem };
delete joplinItemToSave.id;
delete joplinItemToSave.parent_id;
delete joplinItemToSave.share_id;
delete joplinItemToSave.type_;
delete joplinItemToSave.encryption_applied;
item.content = Buffer.from(JSON.stringify(joplinItemToSave));
} else {
item.content = rawItem.body;
}
const existingItem = existingItems.find(i => i.name === rawItem.name);
if (existingItem) item.id = existingItem.id;
if (options.shareId) item.jop_share_id = options.shareId;
await this.models().user().checkMaxItemSizeLimit(user, rawItem.body, item, joplinItem);
itemsToProcess[rawItem.name] = {
item: item,
error: null,
resourceIds,
isNote,
joplinItem,
};
} catch (error) {
itemsToProcess[rawItem.name] = {
item: null,
error: error,
};
}
}
return savedItem;
const output: SaveFromRawContentResult = {};
await this.withTransaction(async () => {
for (const name of Object.keys(itemsToProcess)) {
const o = itemsToProcess[name];
if (o.error) {
output[name] = {
item: null,
error: o.error,
};
continue;
}
const itemToSave = o.item;
try {
const savedItem = await this.saveForUser(user.id, itemToSave);
if (o.isNote) {
await this.models().itemResource().deleteByItemId(savedItem.id);
await this.models().itemResource().addResourceIds(savedItem.id, o.resourceIds);
}
output[name] = {
item: savedItem,
error: null,
};
} catch (error) {
output[name] = {
item: null,
error: error,
};
}
}
});
return output;
}
protected async validate(item: Item, options: ValidateOptions = {}): Promise<Item> {
@@ -0,0 +1,19 @@
import { SubPath } from '../../utils/routeUtils';
import Router from '../../utils/Router';
import { RouteType } from '../../utils/types';
import { AppContext } from '../../utils/types';
import { putItemContents } from './items';
import { PaginatedResults } from '../../models/utils/pagination';
const router = new Router(RouteType.Api);
router.put('api/batch_items', async (path: SubPath, ctx: AppContext) => {
const output: PaginatedResults = {
items: await putItemContents(path, ctx, true) as any,
has_more: false,
};
return output;
});
export default router;
+52 -1
View File
@@ -3,10 +3,11 @@ import { NoteEntity } from '@joplin/lib/services/database/types';
import { ModelType } from '@joplin/lib/BaseModel';
import { deleteApi, getApi, putApi } from '../../utils/testing/apiUtils';
import { Item } from '../../db';
import { PaginatedItems } from '../../models/ItemModel';
import { PaginatedItems, SaveFromRawContentResult } from '../../models/ItemModel';
import { shareFolderWithUser } from '../../utils/testing/shareApiUtils';
import { resourceBlobPath } from '../../utils/joplinUtils';
import { ErrorForbidden, ErrorPayloadTooLarge } from '../../utils/errors';
import { PaginatedResults } from '../../models/utils/pagination';
describe('api_items', function() {
@@ -149,6 +150,56 @@ describe('api_items', function() {
expect(result.name).toBe(`${noteId}.md`);
});
test('should batch upload items', async function() {
const { session: session1 } = await createUserAndSession(1, false);
const result: PaginatedResults = await putApi(session1.id, 'batch_items', {
items: [
{
name: '00000000000000000000000000000001.md',
body: makeNoteSerializedBody({ id: '00000000000000000000000000000001' }),
},
{
name: '00000000000000000000000000000002.md',
body: makeNoteSerializedBody({ id: '00000000000000000000000000000002' }),
},
],
});
expect(Object.keys(result.items).length).toBe(2);
expect(Object.keys(result.items).sort()).toEqual(['00000000000000000000000000000001.md', '00000000000000000000000000000002.md']);
});
test('should report errors when batch uploading', async function() {
const { user: user1,session: session1 } = await createUserAndSession(1, false);
const note1 = makeNoteSerializedBody({ id: '00000000000000000000000000000001' });
await models().user().save({ id: user1.id, max_item_size: note1.length });
const result: PaginatedResults = await putApi(session1.id, 'batch_items', {
items: [
{
name: '00000000000000000000000000000001.md',
body: note1,
},
{
name: '00000000000000000000000000000002.md',
body: makeNoteSerializedBody({ id: '00000000000000000000000000000002', body: 'too large' }),
},
],
});
const items: SaveFromRawContentResult = result.items as any;
expect(Object.keys(items).length).toBe(2);
expect(Object.keys(items).sort()).toEqual(['00000000000000000000000000000001.md', '00000000000000000000000000000002.md']);
expect(items['00000000000000000000000000000001.md'].item).toBeTruthy();
expect(items['00000000000000000000000000000001.md'].error).toBeFalsy();
expect(items['00000000000000000000000000000002.md'].item).toBeFalsy();
expect(items['00000000000000000000000000000002.md'].error.httpCode).toBe(ErrorPayloadTooLarge.httpCode);
});
test('should list children', async function() {
const { session } = await createUserAndSession(1, true);
+63 -32
View File
@@ -5,14 +5,71 @@ import Router from '../../utils/Router';
import { RouteType } from '../../utils/types';
import { AppContext } from '../../utils/types';
import * as fs from 'fs-extra';
import { ErrorForbidden, ErrorMethodNotAllowed, ErrorNotFound } from '../../utils/errors';
import ItemModel, { ItemSaveOption } from '../../models/ItemModel';
import { ErrorForbidden, ErrorMethodNotAllowed, ErrorNotFound, ErrorPayloadTooLarge } from '../../utils/errors';
import ItemModel, { ItemSaveOption, SaveFromRawContentItem } from '../../models/ItemModel';
import { requestDeltaPagination, requestPagination } from '../../models/utils/pagination';
import { AclAction } from '../../models/BaseModel';
import { safeRemove } from '../../utils/fileUtils';
import { formatBytes, MB } from '../../utils/bytes';
const router = new Router(RouteType.Api);
const batchMaxSize = 1 * MB;
export async function putItemContents(path: SubPath, ctx: AppContext, isBatch: boolean) {
if (!ctx.owner.can_upload) throw new ErrorForbidden('Uploading content is disabled');
const parsedBody = await formParse(ctx.req);
const bodyFields = parsedBody.fields;
const saveOptions: ItemSaveOption = {};
let items: SaveFromRawContentItem[] = [];
if (isBatch) {
let totalSize = 0;
items = bodyFields.items.map((item: any) => {
totalSize += item.name.length + (item.body ? item.body.length : 0);
return {
name: item.name,
body: item.body ? Buffer.from(item.body, 'utf8') : Buffer.alloc(0),
};
});
if (totalSize > batchMaxSize) throw new ErrorPayloadTooLarge(`Size of items (${formatBytes(totalSize)}) is over the limit (${formatBytes(batchMaxSize)})`);
} else {
const filePath = parsedBody?.files?.file ? parsedBody.files.file.path : null;
try {
const buffer = filePath ? await fs.readFile(filePath) : Buffer.alloc(0);
// This end point can optionally set the associated jop_share_id field. It
// is only useful when uploading resource blob (under .resource folder)
// since they can't have metadata. Note, Folder and Resource items all
// include the "share_id" field property so it doesn't need to be set via
// query parameter.
if (ctx.query['share_id']) {
saveOptions.shareId = ctx.query['share_id'];
await ctx.models.item().checkIfAllowed(ctx.owner, AclAction.Create, { jop_share_id: saveOptions.shareId });
}
items = [
{
name: ctx.models.item().pathToName(path.id),
body: buffer,
},
];
} finally {
if (filePath) await safeRemove(filePath);
}
}
const output = await ctx.models.item().saveFromRawContent(ctx.owner, items, saveOptions);
for (const [name] of Object.entries(output)) {
if (output[name].item) output[name].item = ctx.models.item().toApiOutput(output[name].item) as Item;
}
return output;
}
// Note about access control:
//
// - All these calls are scoped to a user, which is derived from the session
@@ -66,36 +123,10 @@ router.get('api/items/:id/content', async (path: SubPath, ctx: AppContext) => {
});
router.put('api/items/:id/content', async (path: SubPath, ctx: AppContext) => {
if (!ctx.owner.can_upload) throw new ErrorForbidden('Uploading content is disabled');
const itemModel = ctx.models.item();
const name = itemModel.pathToName(path.id);
const parsedBody = await formParse(ctx.req);
const filePath = parsedBody?.files?.file ? parsedBody.files.file.path : null;
let outputItem: Item = null;
try {
const buffer = filePath ? await fs.readFile(filePath) : Buffer.alloc(0);
const saveOptions: ItemSaveOption = {};
// This end point can optionally set the associated jop_share_id field. It
// is only useful when uploading resource blob (under .resource folder)
// since they can't have metadata. Note, Folder and Resource items all
// include the "share_id" field property so it doesn't need to be set via
// query parameter.
if (ctx.query['share_id']) {
saveOptions.shareId = ctx.query['share_id'];
await itemModel.checkIfAllowed(ctx.owner, AclAction.Create, { jop_share_id: saveOptions.shareId });
}
const item = await itemModel.saveFromRawContent(ctx.owner, name, buffer, saveOptions);
outputItem = itemModel.toApiOutput(item) as Item;
} finally {
if (filePath) await safeRemove(filePath);
}
return outputItem;
const results = await putItemContents(path, ctx, false);
const result = results[Object.keys(results)[0]];
if (result.error) throw result.error;
return result.item;
});
router.get('api/items/:id/delta', async (_path: SubPath, ctx: AppContext) => {
+2
View File
@@ -3,6 +3,7 @@ import { Routers } from '../utils/routeUtils';
import apiBatch from './api/batch';
import apiDebug from './api/debug';
import apiEvents from './api/events';
import apiBatchItems from './api/batch_items';
import apiItems from './api/items';
import apiPing from './api/ping';
import apiSessions from './api/sessions';
@@ -27,6 +28,7 @@ import defaultRoute from './default';
const routes: Routers = {
'api/batch': apiBatch,
'api/batch_items': apiBatchItems,
'api/debug': apiDebug,
'api/events': apiEvents,
'api/items': apiItems,
@@ -60,7 +60,8 @@ async function createItemTree3(sessionId: Uuid, userId: Uuid, parentFolderId: st
}
}
const newItem = await models().item().saveFromRawContent(user, `${jopItem.id}.md`, Buffer.from(serializedBody));
const result = await models().item().saveFromRawContent(user, [{ name: `${jopItem.id}.md`, body: Buffer.from(serializedBody) }]);
const newItem = result[`${jopItem.id}.md`].item;
if (isFolder && jopItem.children.length) await createItemTree3(sessionId, userId, newItem.jop_id, shareId, jopItem.children);
}
}
+15 -13
View File
@@ -275,19 +275,20 @@ export async function createItemTree(userId: Uuid, parentFolderId: string, tree:
}
}
export async function createItemTree2(userId: Uuid, parentFolderId: string, tree: any[]): Promise<void> {
const itemModel = models().item();
const user = await models().user().load(userId);
// export async function createItemTree2(userId: Uuid, parentFolderId: string, tree: any[]): Promise<void> {
// const itemModel = models().item();
// const user = await models().user().load(userId);
for (const jopItem of tree) {
const isFolder = !!jopItem.children;
const serializedBody = isFolder ?
makeFolderSerializedBody({ ...jopItem, parent_id: parentFolderId }) :
makeNoteSerializedBody({ ...jopItem, parent_id: parentFolderId });
const newItem = await itemModel.saveFromRawContent(user, `${jopItem.id}.md`, Buffer.from(serializedBody));
if (isFolder && jopItem.children.length) await createItemTree2(userId, newItem.jop_id, jopItem.children);
}
}
// for (const jopItem of tree) {
// const isFolder = !!jopItem.children;
// const serializedBody = isFolder ?
// makeFolderSerializedBody({ ...jopItem, parent_id: parentFolderId }) :
// makeNoteSerializedBody({ ...jopItem, parent_id: parentFolderId });
// const result = await itemModel.saveFromRawContent(user, [{ name: `${jopItem.id}.md`, body: Buffer.from(serializedBody) }]);
// const newItem = result[`${jopItem.id}.md`].item;
// if (isFolder && jopItem.children.length) await createItemTree2(userId, newItem.jop_id, jopItem.children);
// }
// }
export async function createItemTree3(userId: Uuid, parentFolderId: string, shareId: Uuid, tree: any[]): Promise<void> {
const itemModel = models().item();
@@ -298,7 +299,8 @@ export async function createItemTree3(userId: Uuid, parentFolderId: string, shar
const serializedBody = isFolder ?
makeFolderSerializedBody({ ...jopItem, parent_id: parentFolderId, share_id: shareId }) :
makeNoteSerializedBody({ ...jopItem, parent_id: parentFolderId, share_id: shareId });
const newItem = await itemModel.saveFromRawContent(user, `${jopItem.id}.md`, Buffer.from(serializedBody));
const result = await itemModel.saveFromRawContent(user, [{ name: `${jopItem.id}.md`, body: Buffer.from(serializedBody) }]);
const newItem = result[`${jopItem.id}.md`].item;
if (isFolder && jopItem.children.length) await createItemTree3(userId, newItem.jop_id, shareId, jopItem.children);
}
}