1
0
mirror of https://github.com/vrtmrz/obsidian-livesync.git synced 2025-01-17 18:26:11 +02:00
- Now, we can synchronise hidden files that conflicted on each devices.
Enhanced:
- We can search for conflicting docs.
- Pending processes can now be run at any time.
- Performance improved on synchronising large numbers of files at once.
This commit is contained in:
vorotamoroz 2022-08-09 17:10:08 +09:00
parent 640c2a91f7
commit 8f583e3680
4 changed files with 220 additions and 103 deletions

View File

@ -1,7 +1,7 @@
{
"id": "obsidian-livesync",
"name": "Self-hosted LiveSync",
"version": "0.13.3",
"version": "0.13.4",
"minAppVersion": "0.9.12",
"description": "Community implementation of self-hosted livesync. Reflect your vault changes to some other devices immediately. Please make sure to disable other synchronize solutions to avoid content corruption or duplication.",
"author": "vorotamoroz",

View File

@ -1,6 +1,6 @@
{
"name": "obsidian-livesync",
"version": "0.13.3",
"version": "0.13.4",
"description": "Reflect your vault changes to some other devices immediately. Please make sure to disable other synchronize solutions to avoid content corruption or duplication.",
"main": "main.js",
"type": "module",

View File

@ -29,7 +29,7 @@ import { DocumentHistoryModal } from "./DocumentHistoryModal";
import { clearAllPeriodic, clearAllTriggers, disposeMemoObject, id2path, memoIfNotExist, memoObject, path2id, retriveMemoObject, setTrigger } from "./utils";
import { clearAllPeriodic, clearAllTriggers, clearTrigger, disposeMemoObject, id2path, memoIfNotExist, memoObject, path2id, retriveMemoObject, setTrigger } from "./utils";
import { decrypt, encrypt } from "./lib/src/e2ee_v2";
const isDebug = false;
@ -174,6 +174,45 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
this.showHistory(target);
}
}
async pickFileForResolve() {
const pageLimit = 1000;
let nextKey = "";
const notes: { path: string, mtime: number }[] = [];
do {
const docs = await this.localDatabase.localDatabase.allDocs({ limit: pageLimit, startkey: nextKey, conflicts: true, include_docs: true });
nextKey = "";
for (const row of docs.rows) {
const doc = row.doc;
nextKey = `${row.id}\u{10ffff}`;
if (!("_conflicts" in doc)) continue;
if (isInteralChunk(row.id)) continue;
if (doc._deleted) continue;
if ("deleted" in doc && doc.deleted) continue;
if (doc.type == "newnote" || doc.type == "plain") {
// const docId = doc._id.startsWith("i:") ? doc._id.substring("i:".length) : doc._id;
notes.push({ path: id2path(doc._id), mtime: doc.mtime });
}
if (isChunk(nextKey)) {
// skip the chunk zone.
nextKey = CHeaderEnd;
}
}
} while (nextKey != "");
notes.sort((a, b) => b.mtime - a.mtime);
const notesList = notes.map(e => e.path);
if (notesList.length == 0) {
Logger("There are no conflicted documents", LOG_LEVEL.NOTICE);
return;
}
const target = await askSelectString(this.app, "File to view History", notesList);
if (target) {
if (isInteralChunk(target)) {
//NOP
} else {
await this.showIfConflicted(this.app.vault.getAbstractFileByPath(target) as TFile);
}
}
}
async onload() {
setLogger(this.addLog.bind(this)); // Logger moved to global.
@ -492,6 +531,20 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
callback: () => {
this.fileHistory();
},
});
this.addCommand({
id: "livesync-conflictcheck",
name: "Pick a file to resolive conflict",
callback: () => {
this.pickFileForResolve();
},
})
this.addCommand({
id: "livesync-runbatch",
name: "Run pending batch processes",
callback: async () => {
await this.applyBatchChange();
},
})
}
@ -997,8 +1050,8 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
return;
}
if (docEntry._deleted || docEntry.deleted) {
//basically pass.
//but if there are no docs left, delete file.
// This occurs not only when files are deleted, but also when conflicts are resolved.
// We have to check no other revisions are left.
const lastDocs = await this.localDatabase.getDBEntry(pathSrc);
if (lastDocs === false) {
await this.deleteVaultItem(file);
@ -1006,7 +1059,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
// it perhaps delete some revisions.
// may be we have to reload this
await this.pullFile(pathSrc, null, true);
Logger(`delete skipped:${lastDocs._id}`);
Logger(`delete skipped:${lastDocs._id}`, LOG_LEVEL.VERBOSE);
}
return;
}
@ -1064,7 +1117,37 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
}
}
async handleDBChanged(change: EntryBody) {
queuedEntries: EntryBody[] = [];
handleDBChanged(change: EntryBody) {
// If queued same file, cancel previous one.
this.queuedEntries.remove(this.queuedEntries.find(e => e._id == change._id));
// If the file is opened, we have to apply immediately
const af = app.workspace.getActiveFile();
if (af && af.path == id2path(change._id)) {
return this.handleDBChangedAsync(change);
}
this.queuedEntries.push(change);
if (this.queuedEntries.length > 50) {
clearTrigger("dbchanged");
this.execDBchanged();
}
setTrigger("dbchanged", 500, () => this.execDBchanged());
}
async execDBchanged() {
await runWithLock("dbchanged", false, async () => {
const w = [...this.queuedEntries];
this.queuedEntries = [];
Logger(`Applyng ${w.length} files`);
for (const entry of w) {
Logger(`Applying ${entry._id} (${entry._rev}) change...`, LOG_LEVEL.VERBOSE);
await this.handleDBChangedAsync(entry);
Logger(`Applied ${entry._id} (${entry._rev}) change...`);
}
}
);
}
async handleDBChangedAsync(change: EntryBody) {
const targetFile = this.app.vault.getAbstractFileByPath(id2path(change._id));
if (targetFile == null) {
if (change._deleted || change.deleted) {
@ -1113,36 +1196,48 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
}
}
}
async procQueuedFiles() {
await runWithLock("procQueue", false, async () => {
this.saveQueuedFiles();
for (const queue of this.queuedFiles) {
if (queue.done) continue;
const now = new Date().getTime();
if (queue.missingChildren.length == 0) {
queue.done = true;
if (isInteralChunk(queue.entry._id)) {
//system file
const filename = id2path(id2filenameInternalChunk(queue.entry._id));
Logger(`Applying hidden file, ${queue.entry._id} (${queue.entry._rev}) change...`);
await this.syncInternalFilesAndDatabase("pull", false, false, [filename])
Logger(`Applied hidden file, ${queue.entry._id} (${queue.entry._rev}) change...`);
}
if (isValidPath(id2path(queue.entry._id))) {
Logger(`Applying ${queue.entry._id} (${queue.entry._rev}) change...`);
await this.handleDBChanged(queue.entry);
Logger(`Applied ${queue.entry._id} (${queue.entry._rev})`);
}
} else if (now > queue.timeout) {
if (!queue.warned) Logger(`Timed out: ${queue.entry._id} could not collect ${queue.missingChildren.length} chunks. plugin keeps watching, but you have to check the file after the replication.`, LOG_LEVEL.NOTICE);
queue.warned = true;
continue;
}
}
this.queuedFiles = this.queuedFiles.filter((e) => !e.done);
this.saveQueuedFiles();
procInternalFiles: string[] = [];
async execInternalFile() {
await runWithLock("execinternal", false, async () => {
const w = [...this.procInternalFiles];
this.procInternalFiles = [];
Logger(`Applying hidden ${w.length} files change...`);
await this.syncInternalFilesAndDatabase("pull", false, false, w);
Logger(`Applying hidden ${w.length} files changed`);
});
}
procInternalFile(filename: string) {
this.procInternalFiles.push(filename);
setTrigger("procInternal", 500, async () => {
await this.execInternalFile();
});
}
procQueuedFiles() {
this.saveQueuedFiles();
for (const queue of this.queuedFiles) {
if (queue.done) continue;
const now = new Date().getTime();
if (queue.missingChildren.length == 0) {
queue.done = true;
if (isInteralChunk(queue.entry._id)) {
//system file
const filename = id2path(id2filenameInternalChunk(queue.entry._id));
// await this.syncInternalFilesAndDatabase("pull", false, false, [filename])
this.procInternalFile(filename);
}
if (isValidPath(id2path(queue.entry._id))) {
this.handleDBChanged(queue.entry);
}
} else if (now > queue.timeout) {
if (!queue.warned) Logger(`Timed out: ${queue.entry._id} could not collect ${queue.missingChildren.length} chunks. plugin keeps watching, but you have to check the file after the replication.`, LOG_LEVEL.NOTICE);
queue.warned = true;
continue;
}
}
this.queuedFiles = this.queuedFiles.filter((e) => !e.done);
this.saveQueuedFiles();
}
parseIncomingChunk(chunk: PouchDB.Core.ExistingDocument<EntryDoc>) {
const now = new Date().getTime();
let isNewFileCompleted = false;
@ -2328,7 +2423,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
const content = await arrayBufferToBase64(contentBin);
if (content == fileOnDB.data && !force) {
// Logger(`STORAGE <-- DB:${filename}: skipped (hidden) Not changed`, LOG_LEVEL.VERBOSE);
return false;
return true;
}
await this.app.vault.adapter.writeBinary(filename, base64ToArrayBuffer(fileOnDB.data), { mtime: fileOnDB.mtime, ctime: fileOnDB.ctime });
Logger(`STORAGE <-- DB:${filename}: written (hidden, overwrite${force ? ", force" : ""})`);
@ -2353,8 +2448,46 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
}
confirmPopup: WrappedNotice = null;
async resolveConflictOnInternalFiles() {
// Scan all conflicted internal files
const docs = await this.localDatabase.localDatabase.allDocs({ startkey: ICHeader, endkey: ICHeaderEnd, conflicts: true, include_docs: true });
for (const row of docs.rows) {
const doc = row.doc;
if (!("_conflicts" in doc)) continue;
if (isInteralChunk(row.id)) {
await this.resolveConflictOnInternalFile(row.id);
}
}
}
async resolveConflictOnInternalFile(id: string): Promise<boolean> {
// Retrieve data
const doc = await this.localDatabase.localDatabase.get(id, { conflicts: true });
// If there is no conflict, return with false.
if (!("_conflicts" in doc)) return false;
if (doc._conflicts.length == 0) return false;
Logger(`Hidden file conflicetd:${id2filenameInternalChunk(id)}`);
const revA = doc._rev;
const revB = doc._conflicts[0];
const revBdoc = await this.localDatabase.localDatabase.get(id, { rev: revB });
// determine which revision sould been deleted.
// simply check modified time
const mtimeA = ("mtime" in doc && doc.mtime) || 0;
const mtimeB = ("mtime" in revBdoc && revBdoc.mtime) || 0;
// Logger(`Revisions:${new Date(mtimeA).toLocaleString} and ${new Date(mtimeB).toLocaleString}`);
// console.log(`mtime:${mtimeA} - ${mtimeB}`);
const delRev = mtimeA < mtimeB ? revA : revB;
// delete older one.
await this.localDatabase.localDatabase.remove(id, delRev);
Logger(`Older one has been deleted:${id2filenameInternalChunk(id)}`);
// check the file again
return this.resolveConflictOnInternalFile(id);
}
//TODO: Tidy up. Even though it is experimental feature, So dirty...
async syncInternalFilesAndDatabase(direction: "push" | "pull" | "safe", showMessage: boolean, files: InternalFileInfo[] | false = false, targetFiles: string[] | false = false) {
await this.resolveConflictOnInternalFiles();
const logLevel = showMessage ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO;
Logger("Scanning hidden files.", logLevel, "sync_internal");
const ignorePatterns = this.settings.syncInternalFilesIgnorePatterns.toLocaleLowerCase()
@ -2409,73 +2542,52 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
const fileOnStorage = files.find(e => e.path == filename);
const fileOnDatabase = filesOnDB.find(e => e._id == filename2idInternalChunk(id2path(filename)));
// TODO: Fix this somehow smart.
let proc: Promise<void> | null;
if (fileOnStorage && fileOnDatabase) {
// Both => Synchronize
const cache = filename in caches ? caches[filename] : { storageMtime: 0, docMtime: 0 };
if (fileOnDatabase.mtime == cache.docMtime && fileOnStorage.mtime == cache.storageMtime) {
continue;
}
const nw = compareMTime(fileOnStorage.mtime, fileOnDatabase.mtime);
if (nw == 0) continue;
if (nw > 0) {
proc = (async (fileOnStorage) => {
await this.storeInternaFileToDatabase(fileOnStorage);
cache.docMtime = fileOnDatabase.mtime;
cache.storageMtime = fileOnStorage.mtime;
caches[filename] = cache;
})(fileOnStorage);
}
if (nw < 0) {
proc = (async (filename) => {
if (await this.extractInternaFileFromDatabase(filename)) {
cache.docMtime = fileOnDatabase.mtime;
cache.storageMtime = fileOnStorage.mtime;
caches[filename] = cache;
countUpdatedFolder(filename);
}
})(filename);
}
} else if (!fileOnStorage && fileOnDatabase) {
if (direction == "push") {
if (fileOnDatabase.deleted) {
// await this.storeInternaFileToDatabase(fileOnStorage);
} else {
proc = (async () => {
await this.deleteInternaFileOnDatabase(filename);
})();
}
} else if (direction == "pull") {
proc = (async () => {
if (await this.extractInternaFileFromDatabase(filename)) {
countUpdatedFolder(filename);
}
})();
} else if (direction == "safe") {
if (fileOnDatabase.deleted) {
// await this.storeInternaFileToDatabase(fileOnStorage);
} else {
proc = (async () => {
if (await this.extractInternaFileFromDatabase(filename)) {
countUpdatedFolder(filename);
}
})();
}
}
} else if (fileOnStorage && !fileOnDatabase) {
proc = (async () => {
await this.storeInternaFileToDatabase(fileOnStorage);
})();
} else {
throw new Error("Invalid state on hidden file sync");
// Something corrupted?
const addProc = (p: () => Promise<void>): Promise<unknown> => {
return p();
}
if (proc) p.add(proc);
const cache = filename in caches ? caches[filename] : { storageMtime: 0, docMtime: 0 };
p.add(addProc(async () => {
if (fileOnStorage && fileOnDatabase) {
// Both => Synchronize
if (fileOnDatabase.mtime == cache.docMtime && fileOnStorage.mtime == cache.storageMtime) {
return;
}
const nw = compareMTime(fileOnStorage.mtime, fileOnDatabase.mtime);
if (nw > 0) {
await this.storeInternaFileToDatabase(fileOnStorage);
}
if (nw < 0) {
// skip if not extraction performed.
if (!await this.extractInternaFileFromDatabase(filename)) return;
}
// If process successfly updated or file contents are same, update cache.
cache.docMtime = fileOnDatabase.mtime;
cache.storageMtime = fileOnStorage.mtime;
caches[filename] = cache;
countUpdatedFolder(filename);
} else if (!fileOnStorage && fileOnDatabase) {
console.log("pushpull")
if (direction == "push") {
if (fileOnDatabase.deleted) return;
await this.deleteInternaFileOnDatabase(filename);
} else if (direction == "pull") {
if (await this.extractInternaFileFromDatabase(filename)) {
countUpdatedFolder(filename);
}
} else if (direction == "safe") {
if (fileOnDatabase.deleted) return
if (await this.extractInternaFileFromDatabase(filename)) {
countUpdatedFolder(filename);
}
}
} else if (fileOnStorage && !fileOnDatabase) {
await this.storeInternaFileToDatabase(fileOnStorage);
} else {
throw new Error("Invalid state on hidden file sync");
// Something corrupted?
}
}));
await p.wait(limit);
}
await p.all();

View File

@ -9,4 +9,9 @@
#### Minors
- 0.13.1 Fixed on conflict resolution.
- 0.13.2 Fixed file deletion failures.
- 0.13.2 Fixed file deletion failures.
- 0.13.4
- Now, we can synchronise hidden files that conflicted on each devices.
- We can search for conflicting docs.
- Pending processes can now be run at any time.
- Performance improved on synchronising large numbers of files at once.