1
0
mirror of https://github.com/vrtmrz/obsidian-livesync.git synced 2025-02-10 19:41:57 +02:00

Modified:

- Implement automatic temporary reduction of batch sizes.
- Disable remote checkpointing.
This commit is contained in:
vorotamoroz 2022-01-05 17:20:33 +09:00
parent c3464a4e9c
commit 5bb8b2567b
5 changed files with 143 additions and 50 deletions

View File

@ -1,7 +1,7 @@
{
"id": "obsidian-livesync",
"name": "Self-hosted LiveSync",
"version": "0.4.0",
"version": "0.4.1",
"minAppVersion": "0.9.12",
"description": "Community implementation of self-hosted livesync. Reflect your vault changes to some other devices immediately. Please make sure to disable other synchronize solutions to avoid content corruption or duplication.",
"author": "vorotamoroz",

4
package-lock.json generated
View File

@ -1,12 +1,12 @@
{
"name": "obsidian-livesync",
"version": "0.4.0",
"version": "0.4.1",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "obsidian-livesync",
"version": "0.4.0",
"version": "0.4.1",
"license": "MIT",
"dependencies": {
"diff-match-patch": "^1.0.5",

View File

@ -1,6 +1,6 @@
{
"name": "obsidian-livesync",
"version": "0.4.0",
"version": "0.4.1",
"description": "Reflect your vault changes to some other devices immediately. Please make sure to disable other synchronize solutions to avoid content corruption or duplication.",
"main": "main.js",
"scripts": {

View File

@ -25,7 +25,7 @@ import {
} from "./types";
import { resolveWithIgnoreKnownError, delay, path2id, runWithLock } from "./utils";
import { Logger } from "./logger";
import { checkRemoteVersion, connectRemoteCouchDB } from "./utils_couchdb";
import { checkRemoteVersion, connectRemoteCouchDB, getLastPostFailedBySize } from "./utils_couchdb";
import { decrypt, encrypt } from "./e2ee";
export class LocalPouchDB {
@ -763,6 +763,12 @@ export class LocalPouchDB {
}
const syncOptionBase: PouchDB.Replication.SyncOptions = {
pull: {
checkpoint: "target",
},
push: {
checkpoint: "source",
},
batches_limit: setting.batches_limit,
batch_size: setting.batch_size,
};
@ -770,7 +776,7 @@ export class LocalPouchDB {
const db = dbret.db;
const totalCount = (await this.localDatabase.info()).doc_count;
//replicate once
const replicate = this.localDatabase.replicate.to(db, syncOptionBase);
const replicate = this.localDatabase.replicate.to(db, { checkpoint: "source", ...syncOptionBase });
replicate
.on("active", () => {
this.syncStatus = "CONNECTED";
@ -806,7 +812,7 @@ export class LocalPouchDB {
});
}
async checkReplicationConnectivity(setting: ObsidianLiveSyncSettings, keepAlive: boolean) {
async checkReplicationConnectivity(setting: ObsidianLiveSyncSettings, keepAlive: boolean, skipCheck: boolean) {
if (!this.isReady) {
Logger("Database is not ready.");
return false;
@ -832,40 +838,41 @@ export class LocalPouchDB {
return false;
}
if (!(await checkRemoteVersion(dbret.db, this.migrate.bind(this), VER))) {
Logger("Remote database is newer or corrupted, make sure to latest version of self-hosted-livesync installed", LOG_LEVEL.NOTICE);
return false;
if (!skipCheck) {
if (!(await checkRemoteVersion(dbret.db, this.migrate.bind(this), VER))) {
Logger("Remote database is newer or corrupted, make sure to latest version of self-hosted-livesync installed", LOG_LEVEL.NOTICE);
return false;
}
const defMilestonePoint: EntryMilestoneInfo = {
_id: MILSTONE_DOCID,
type: "milestoneinfo",
created: (new Date() as any) / 1,
locked: false,
accepted_nodes: [this.nodeid],
};
// const remoteInfo = dbret.info;
// const localInfo = await this.localDatabase.info();
// const remoteDocsCount = remoteInfo.doc_count;
// const localDocsCount = localInfo.doc_count;
// const remoteUpdSeq = typeof remoteInfo.update_seq == "string" ? Number(remoteInfo.update_seq.split("-")[0]) : remoteInfo.update_seq;
// const localUpdSeq = typeof localInfo.update_seq == "string" ? Number(localInfo.update_seq.split("-")[0]) : localInfo.update_seq;
// Logger(`Database diffences: remote:${remoteDocsCount} docs / last update ${remoteUpdSeq}`);
// Logger(`Database diffences: local :${localDocsCount} docs / last update ${localUpdSeq}`);
const remoteMilestone: EntryMilestoneInfo = await resolveWithIgnoreKnownError(dbret.db.get(MILSTONE_DOCID), defMilestonePoint);
this.remoteLocked = remoteMilestone.locked;
this.remoteLockedAndDeviceNotAccepted = remoteMilestone.locked && remoteMilestone.accepted_nodes.indexOf(this.nodeid) == -1;
if (remoteMilestone.locked && remoteMilestone.accepted_nodes.indexOf(this.nodeid) == -1) {
Logger("Remote database marked as 'Auto Sync Locked'. And this devide does not marked as resolved device. see settings dialog.", LOG_LEVEL.NOTICE);
return false;
}
if (typeof remoteMilestone._rev == "undefined") {
await dbret.db.put(remoteMilestone);
}
}
const defMilestonePoint: EntryMilestoneInfo = {
_id: MILSTONE_DOCID,
type: "milestoneinfo",
created: (new Date() as any) / 1,
locked: false,
accepted_nodes: [this.nodeid],
};
// const remoteInfo = dbret.info;
// const localInfo = await this.localDatabase.info();
// const remoteDocsCount = remoteInfo.doc_count;
// const localDocsCount = localInfo.doc_count;
// const remoteUpdSeq = typeof remoteInfo.update_seq == "string" ? Number(remoteInfo.update_seq.split("-")[0]) : remoteInfo.update_seq;
// const localUpdSeq = typeof localInfo.update_seq == "string" ? Number(localInfo.update_seq.split("-")[0]) : localInfo.update_seq;
// Logger(`Database diffences: remote:${remoteDocsCount} docs / last update ${remoteUpdSeq}`);
// Logger(`Database diffences: local :${localDocsCount} docs / last update ${localUpdSeq}`);
const remoteMilestone: EntryMilestoneInfo = await resolveWithIgnoreKnownError(dbret.db.get(MILSTONE_DOCID), defMilestonePoint);
this.remoteLocked = remoteMilestone.locked;
this.remoteLockedAndDeviceNotAccepted = remoteMilestone.locked && remoteMilestone.accepted_nodes.indexOf(this.nodeid) == -1;
if (remoteMilestone.locked && remoteMilestone.accepted_nodes.indexOf(this.nodeid) == -1) {
Logger("Remote database marked as 'Auto Sync Locked'. And this devide does not marked as resolved device. see settings dialog.", LOG_LEVEL.NOTICE);
return false;
}
if (typeof remoteMilestone._rev == "undefined") {
await dbret.db.put(remoteMilestone);
}
const syncOptionBase: PouchDB.Replication.SyncOptions = {
batches_limit: setting.batches_limit,
batch_size: setting.batch_size,
@ -877,33 +884,49 @@ export class LocalPouchDB {
async openReplication(setting: ObsidianLiveSyncSettings, keepAlive: boolean, showResult: boolean, callback: (e: PouchDB.Core.ExistingDocument<EntryDoc>[]) => Promise<void>): Promise<boolean> {
return await runWithLock("replicate", false, () => {
return this._openReplication(setting, keepAlive, showResult, callback);
return this._openReplication(setting, keepAlive, showResult, callback, false);
});
}
async _openReplication(setting: ObsidianLiveSyncSettings, keepAlive: boolean, showResult: boolean, callback: (e: PouchDB.Core.ExistingDocument<EntryDoc>[]) => Promise<void>): Promise<boolean> {
const ret = await this.checkReplicationConnectivity(setting, keepAlive);
originalSetting: ObsidianLiveSyncSettings = null;
// last_seq: number = 200;
async _openReplication(setting: ObsidianLiveSyncSettings, keepAlive: boolean, showResult: boolean, callback: (e: PouchDB.Core.ExistingDocument<EntryDoc>[]) => Promise<void>, retrying: boolean): Promise<boolean> {
const ret = await this.checkReplicationConnectivity(setting, keepAlive, retrying);
if (ret === false) return false;
let notice: Notice = null;
if (showResult) {
notice = new Notice("Replicating", 0);
notice = new Notice("Looking for the point last synchronized point.", 0);
}
const { db, syncOptionBase, syncOption } = ret;
//replicate once
this.syncStatus = "STARTED";
this.updateInfo();
let resolved = false;
const docArrivedOnStart = this.docArrived;
const docSentOnStart = this.docSent;
const _openReplicationSync = () => {
Logger("Sync Main Started");
if (!retrying) {
this.originalSetting = setting;
}
this.syncHandler = this.cancelHandler(this.syncHandler);
this.syncHandler = this.localDatabase.sync<EntryDoc>(db, syncOption);
this.syncHandler = this.localDatabase.sync<EntryDoc>(db, {
...syncOption,
pull: {
checkpoint: "target",
},
push: {
checkpoint: "source",
},
});
this.syncHandler
.on("active", () => {
this.syncStatus = "CONNECTED";
this.updateInfo();
Logger("Replication activated");
if (notice != null) notice.setMessage(`Activated..`);
})
.on("change", async (e) => {
try {
@ -924,6 +947,16 @@ export class LocalPouchDB {
Logger("Replication callback error");
Logger(ex);
}
// re-connect to retry with original setting
if (retrying) {
if (this.docSent - docSentOnStart + (this.docArrived - docArrivedOnStart) > this.originalSetting.batch_size * 2) {
// restore sync values
Logger("Back into original settings once.");
if (notice != null) notice.hide();
this.syncHandler = this.cancelHandler(this.syncHandler);
this._openReplication(this.originalSetting, keepAlive, showResult, callback, false);
}
}
})
.on("complete", (e) => {
this.syncStatus = "COMPLETED";
@ -948,8 +981,25 @@ export class LocalPouchDB {
this.syncHandler = this.cancelHandler(this.syncHandler);
this.updateInfo();
if (notice != null) notice.hide();
Logger("Replication error", LOG_LEVEL.NOTICE);
Logger(e);
if (getLastPostFailedBySize()) {
if (keepAlive) {
Logger("Replication stopped.", LOG_LEVEL.NOTICE);
} else {
// Duplicate settings for smaller batch.
const xsetting: ObsidianLiveSyncSettings = JSON.parse(JSON.stringify(setting));
xsetting.batch_size = Math.ceil(xsetting.batch_size / 2);
xsetting.batches_limit = Math.ceil(xsetting.batches_limit / 2);
if (xsetting.batch_size <= 3 || xsetting.batches_limit <= 3) {
Logger("We can't replicate more lower value.", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO);
} else {
Logger(`Retry with lower batch size:${xsetting.batch_size}/${xsetting.batches_limit}`, showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO);
this._openReplication(xsetting, keepAlive, showResult, callback, true);
}
}
} else {
Logger("Replication error", LOG_LEVEL.NOTICE);
Logger(e);
}
})
.on("paused", (e) => {
this.syncStatus = "PAUSED";
@ -974,7 +1024,7 @@ export class LocalPouchDB {
Logger(await db.info(), LOG_LEVEL.VERBOSE);
let replicate: PouchDB.Replication.Replication<EntryDoc>;
try {
replicate = this.localDatabase.replicate.from(db, syncOptionBase);
replicate = this.localDatabase.replicate.from(db, { checkpoint: "target", ...syncOptionBase });
replicate
.on("active", () => {
this.syncStatus = "CONNECTED";

View File

@ -8,11 +8,54 @@ export const isValidRemoteCouchDBURI = (uri: string): boolean => {
if (uri.startsWith("http://")) return true;
return false;
};
let last_post_successed = false;
export const getLastPostFailedBySize = () => {
return !last_post_successed;
};
export const connectRemoteCouchDB = async (uri: string, auth: { username: string; password: string }): Promise<string | { db: PouchDB.Database<EntryDoc>; info: PouchDB.Core.DatabaseInfo }> => {
if (!isValidRemoteCouchDBURI(uri)) return "Remote URI is not valid";
const db: PouchDB.Database<EntryDoc> = new PouchDB<EntryDoc>(uri, {
const conf: PouchDB.HttpAdapter.HttpAdapterConfiguration = {
adapter: "http",
auth,
});
fetch: async function (url: string | Request, opts: RequestInit) {
let size_ok = true;
let size = "";
const localURL = url.toString().substring(uri.length);
const method = opts.method ?? "GET";
if (opts.body) {
const opts_length = opts.body.toString().length;
if (opts_length > 1024 * 1024 * 10) {
// over 10MB
size_ok = false;
if (uri.contains(".cloudantnosqldb.")) {
last_post_successed = false;
Logger("This request should fail on IBM Cloudant.", LOG_LEVEL.VERBOSE);
throw new Error("This request should fail on IBM Cloudant.");
}
}
size = ` (${opts_length})`;
}
try {
const responce: Response = await fetch(url, opts);
if (method == "POST" || method == "PUT") {
last_post_successed = responce.ok;
} else {
last_post_successed = true;
}
Logger(`HTTP:${method}${size} to:${localURL} -> ${responce.status}`, LOG_LEVEL.VERBOSE);
return responce;
} catch (ex) {
Logger(`HTTP:${method}${size} to:${localURL} -> failed`, LOG_LEVEL.VERBOSE);
if (!size_ok && (method == "POST" || method == "PUT")) {
last_post_successed = false;
}
Logger(ex);
throw ex;
}
// return await fetch(url, opts);
},
};
const db: PouchDB.Database<EntryDoc> = new PouchDB<EntryDoc>(uri, conf);
try {
const info = await db.info();
return { db: db, info: info };