1
0
mirror of https://github.com/laurent22/joplin.git synced 2025-02-01 19:15:01 +02:00

Server: Support including items with delta call to optimise sync

This commit is contained in:
Laurent Cozic 2023-12-17 11:12:26 +00:00
parent d13a3fc5c8
commit 67a68709e3
9 changed files with 183 additions and 84 deletions

View File

@ -401,7 +401,7 @@ export default class Synchronizer {
this.dispatch({ type: 'SYNC_STARTED' });
eventManager.emit(EventName.SyncStart);
this.logSyncOperation('starting', null, null, `Starting synchronisation to target ${syncTargetId}... supportsAccurateTimestamp = ${this.api().supportsAccurateTimestamp}; supportsMultiPut = ${this.api().supportsMultiPut} [${synchronizationId}]`);
this.logSyncOperation('starting', null, null, `Starting synchronisation to target ${syncTargetId}... supportsAccurateTimestamp = ${this.api().supportsAccurateTimestamp}; supportsMultiPut = ${this.api().supportsMultiPut}; supportsDeltaWithItems = ${this.api().supportsDeltaWithItems} [${synchronizationId}]`);
const handleCannotSyncItem = async (ItemClass: any, syncTargetId: any, item: any, cannotSyncReason: string, itemLocation: any = null) => {
await ItemClass.saveSyncDisabled(syncTargetId, item, cannotSyncReason, itemLocation);
@ -843,6 +843,10 @@ export default class Synchronizer {
if (local && local.updated_time === remote.jop_updated_time) needsToDownload = false;
}
if (this.api().supportsDeltaWithItems) {
needsToDownload = false;
}
if (needsToDownload) {
this.downloadQueue_.push(remote.path, async () => {
return this.apiCall('get', remote.path);
@ -862,6 +866,10 @@ export default class Synchronizer {
if (!BaseItem.isSystemPath(remote.path)) continue; // The delta API might return things like the .sync, .resource or the root folder
const loadContent = async () => {
if (this.api().supportsDeltaWithItems) {
return remote.jopItem;
}
const task = await this.downloadQueue_.waitForResult(path);
if (task.error) throw task.error;
if (!task.result) return null;

View File

@ -45,6 +45,10 @@ export default class FileApiDriverJoplinServer {
return true;
}
public get supportsDeltaWithItems() {
return true;
}
public requestRepeatCount() {
return 3;
}
@ -56,6 +60,7 @@ export default class FileApiDriverJoplinServer {
jop_updated_time: md.jop_updated_time,
isDir: false,
isDeleted: isDeleted,
jopItem: md.jopItem,
};
return output;

View File

@ -33,6 +33,8 @@ export interface RemoteItem {
// value will always be ahead. However for synchronising we need to know the
// exact Joplin item updated_time value.
jop_updated_time?: number;
jopItem?: any;
}
export interface PaginatedList {
@ -137,6 +139,13 @@ class FileApi {
return !!this.driver().supportsLocks;
}
// Tells whether the delta call is going to include the items themselves or
// just the metadata (which is the default). If the items are included it
// means less http request and faster processing.
public get supportsDeltaWithItems(): boolean {
return !!this.driver().supportsDeltaWithItems;
}
private async fetchRemoteDateOffset_() {
const tempFile = `${this.tempDirName()}/timeCheck${Math.round(Math.random() * 1000000)}.txt`;
const startTime = Date.now();

View File

@ -39,6 +39,8 @@ const defaultEnvValues: EnvVariables = {
MAX_TIME_DRIFT: 2000,
NTP_SERVER: 'pool.ntp.org:123',
DELTA_INCLUDES_ITEMS: true,
// ==================================================
// URL config
// ==================================================
@ -141,6 +143,7 @@ export interface EnvVariables {
RUNNING_IN_DOCKER: boolean;
MAX_TIME_DRIFT: number;
NTP_SERVER: string;
DELTA_INCLUDES_ITEMS: boolean;
IS_ADMIN_INSTANCE: boolean;
INSTANCE_NAME: string;

View File

@ -1,4 +1,4 @@
import { createUserAndSession, beforeAllDb, afterAllTests, beforeEachDb, models, expectThrow, createFolder, createItemTree3, expectNotThrow, createNote, updateNote } from '../utils/testing/testUtils';
import { createUserAndSession, beforeAllDb, afterAllTests, beforeEachDb, models, expectThrow, createFolder, createItemTree3, expectNotThrow, createNote, updateNote, deleteNote } from '../utils/testing/testUtils';
import { ChangeType } from '../services/database/types';
import { Day, msleep } from '../utils/time';
import { ChangePagination } from './ChangeModel';
@ -271,4 +271,58 @@ describe('ChangeModel', () => {
jest.useRealTimers();
});
test('should return whole item when doing a delta call', async () => {
const { user, session } = await createUserAndSession(1, true);
await createItemTree3(user.id, '', '', [
{
id: '000000000000000000000000000000F1',
title: 'Folder 1',
children: [
{
id: '00000000000000000000000000000001',
title: 'Note 1',
},
{
id: '00000000000000000000000000000002',
title: 'Note 2',
},
],
},
]);
let cursor = '';
{
const result = await models().change().delta(user.id);
cursor = result.cursor;
const titles = result.items.map(it => it.jopItem.title).sort();
expect(titles).toEqual(['Folder 1', 'Note 1', 'Note 2']);
}
await msleep(1);
await updateNote(session.id, {
id: '00000000000000000000000000000001',
title: 'new title',
});
{
const result = await models().change().delta(user.id, { cursor });
cursor = result.cursor;
expect(result.items.length).toBe(1);
expect(result.items[0].jopItem.title).toBe('new title');
}
await msleep(1);
await deleteNote(user.id, '00000000000000000000000000000002');
{
const result = await models().change().delta(user.id, { cursor });
expect(result.items.length).toBe(1);
expect(result.items[0].jopItem).toBe(null);
}
});
});

View File

@ -1,12 +1,14 @@
import { Knex } from 'knex';
import Logger from '@joplin/utils/Logger';
import { SqliteMaxVariableNum, isPostgres } from '../db';
import { DbConnection, SqliteMaxVariableNum, isPostgres } from '../db';
import { Change, ChangeType, Item, Uuid } from '../services/database/types';
import { md5 } from '../utils/crypto';
import { ErrorResyncRequired } from '../utils/errors';
import { Day, formatDateTime } from '../utils/time';
import BaseModel, { SaveOptions } from './BaseModel';
import { PaginatedResults } from './utils/pagination';
import { NewModelFactoryHandler } from './factory';
import { Config } from '../utils/types';
const logger = Logger.create('ChangeModel');
@ -14,6 +16,7 @@ export const defaultChangeTtl = 180 * Day;
export interface DeltaChange extends Change {
jop_updated_time?: number;
jopItem: any;
}
export type PaginatedDeltaChanges = PaginatedResults<DeltaChange>;
@ -50,6 +53,13 @@ export function requestDeltaPagination(query: any): ChangePagination {
export default class ChangeModel extends BaseModel<Change> {
private deltaIncludesItems_: boolean;
public constructor(db: DbConnection, modelFactory: NewModelFactoryHandler, config: Config) {
super(db, modelFactory, config);
this.deltaIncludesItems_ = config.DELTA_INCLUDES_ITEMS;
}
public get tableName(): string {
return 'changes';
}
@ -88,43 +98,6 @@ export default class ChangeModel extends BaseModel<Change> {
};
}
// private changesForUserQuery(userId: Uuid, count: boolean): Knex.QueryBuilder {
// // When need to get:
// //
// // - All the CREATE and DELETE changes associated with the user
// // - All the UPDATE changes that applies to items associated with the
// // user.
// //
// // UPDATE changes do not have the user_id set because they are specific
// // to the item, not to a particular user.
// const query = this
// .db('changes')
// .where(function() {
// void this.whereRaw('((type = ? OR type = ?) AND user_id = ?)', [ChangeType.Create, ChangeType.Delete, userId])
// // Need to use a RAW query here because Knex has a "not a
// // bug" bug that makes it go into infinite loop in some
// // contexts, possibly only when running inside Jest (didn't
// // test outside).
// // https://github.com/knex/knex/issues/1851
// .orWhereRaw('type = ? AND item_id IN (SELECT item_id FROM user_items WHERE user_id = ?)', [ChangeType.Update, userId]);
// });
// if (count) {
// void query.countDistinct('id', { as: 'total' });
// } else {
// void query.select([
// 'id',
// 'item_id',
// 'item_name',
// 'type',
// 'updated_time',
// ]);
// }
// return query;
// }
public async changesForUserQuery(userId: Uuid, fromCounter: number, limit: number, doCountQuery: boolean): Promise<Change[]> {
// When need to get:
//
@ -251,35 +224,6 @@ export default class ChangeModel extends BaseModel<Change> {
return output;
}
// public async allByUser(userId: Uuid, pagination: Pagination = null): Promise<PaginatedDeltaChanges> {
// pagination = {
// page: 1,
// limit: 100,
// order: [{ by: 'counter', dir: PaginationOrderDir.ASC }],
// ...pagination,
// };
// const query = this.changesForUserQuery(userId, false);
// const countQuery = this.changesForUserQuery(userId, true);
// const itemCount = (await countQuery.first()).total;
// void query
// .orderBy(pagination.order[0].by, pagination.order[0].dir)
// .offset((pagination.page - 1) * pagination.limit)
// .limit(pagination.limit) as any[];
// const changes = await query;
// return {
// items: changes,
// // If we have changes, we return the ID of the latest changes from which delta sync can resume.
// // If there's no change, we return the previous cursor.
// cursor: changes.length ? changes[changes.length - 1].id : pagination.cursor,
// has_more: changes.length >= pagination.limit,
// page_count: itemCount !== null ? Math.ceil(itemCount / pagination.limit) : undefined,
// };
// }
public async delta(userId: Uuid, pagination: ChangePagination = null): Promise<PaginatedDeltaChanges> {
pagination = {
...defaultDeltaPagination(),
@ -300,18 +244,35 @@ export default class ChangeModel extends BaseModel<Change> {
false,
);
const items: Item[] = await this.db('items').select('id', 'jop_updated_time').whereIn('items.id', changes.map(c => c.item_id));
let items: Item[] = await this.db('items').select('id', 'jop_updated_time').whereIn('items.id', changes.map(c => c.item_id));
let processedChanges = this.compressChanges(changes);
processedChanges = await this.removeDeletedItems(processedChanges, items);
const finalChanges: DeltaChange[] = processedChanges.map(c => {
const item = items.find(item => item.id === c.item_id);
if (!item) return c;
return {
...c,
if (this.deltaIncludesItems_) {
items = await this.models().item().loadWithContentMulti(processedChanges.map(c => c.item_id), {
fields: [
'content',
'id',
'jop_encryption_applied',
'jop_id',
'jop_parent_id',
'jop_share_id',
'jop_type',
'jop_updated_time',
],
});
}
const finalChanges = processedChanges.map(change => {
const item = items.find(item => item.id === change.item_id);
if (!item) return { ...change, jopItem: null };
const deltaChange: DeltaChange = {
...change,
jop_updated_time: item.jop_updated_time,
jopItem: this.deltaIncludesItems_ && item.jop_type ? this.models().item().itemToJoplinItem(item) : null,
};
return deltaChange;
});
return {

View File

@ -558,4 +558,31 @@ describe('ItemModel', () => {
expect((await models().userItem().byUserId(user2.id)).length).toBe(1);
});
test('should return multiple item contents', async () => {
const { user: user1 } = await createUserAndSession(1);
await createItemTree3(user1.id, '', '', [
{
id: '000000000000000000000000000000F1',
title: 'Folder 1',
children: [
{
id: '00000000000000000000000000000001',
title: 'Note 1',
},
{
id: '00000000000000000000000000000002',
title: 'Note 2',
},
],
},
]);
const itemIds = (await models().item().all()).map(i => i.id);
const items = await models().item().loadWithContentMulti(itemIds);
const jopItems = items.map(it => models().item().itemToJoplinItem(it));
expect(jopItems.map(it => it.title).sort()).toEqual(['Folder 1', 'Note 1', 'Note 2']);
});
});

View File

@ -268,19 +268,46 @@ export default class ItemModel extends BaseModel<Item> {
}
public async loadWithContent(id: Uuid, options: ItemLoadOptions = {}): Promise<Item> {
const item: Item = await this
const output = await this.loadWithContentMulti([id], options);
return output.length ? output[0] : null;
}
public async loadWithContentMulti(ids: Uuid[], options: ItemLoadOptions = {}): Promise<Item[]> {
const fields = this.selectFields(options, ['*'], 'items', ['items.content_size']);
const contentIndex = fields.findIndex(f => f === 'items.content');
if (contentIndex >= 0) {
fields.splice(contentIndex, 1);
}
const items: Item[] = await this
.db('user_items')
.leftJoin('items', 'items.id', 'user_items.item_id')
.select(this.selectFields(options, ['*'], 'items', ['items.content_size']))
.where('items.id', '=', id)
.first();
.select(fields)
.whereIn('items.id', ids);
const content = await this.storageDriverRead(id, item.content_size, { models: this.models() });
const promises: Promise<Buffer>[] = [];
const itemIndexToId: Record<number, string> = {};
return {
...item,
content,
};
for (let i = 0; i < items.length; i++) {
const item = items[i];
itemIndexToId[i] = item.id;
promises.push(this.storageDriverRead(item.id, item.content_size, { models: this.models() }));
}
await Promise.all(promises);
const output: Item[] = [];
for (let i = 0; i < promises.length; i++) {
const promise = promises[i];
const item = items.find(it => it.id === itemIndexToId[i]);
output.push({
...item,
content: await promise,
});
}
return output;
}
public async loadAsSerializedJoplinItem(id: Uuid): Promise<string> {

View File

@ -376,6 +376,11 @@ export async function updateNote(sessionId: string, note: NoteEntity): Promise<I
return updateItem(sessionId, `root:/${note.id}.md:`, makeNoteSerializedBody(note));
}
export async function deleteNote(userId: Uuid, noteJopId: string): Promise<void> {
const item = await models().item().loadByJopId(userId, noteJopId, { fields: ['id'] });
await models().item().delete(item.id);
}
export async function updateFolder(sessionId: string, folder: FolderEntity): Promise<Item> {
return updateItem(sessionId, `root:/${folder.id}.md:`, makeFolderSerializedBody(folder));
}