diff --git a/src/lib b/src/lib
index 3ca6237..5c01ce3 160000
--- a/src/lib
+++ b/src/lib
@@ -1 +1 @@
-Subproject commit 3ca623780c1c26b153fd663e997b53e32c4de021
+Subproject commit 5c01ce3262b285c79473b61cc524c51183d13af6
diff --git a/src/main.ts b/src/main.ts
index eb2d0c2..4b400d2 100644
--- a/src/main.ts
+++ b/src/main.ts
@@ -17,8 +17,8 @@ import {
     setNoticeClass,
     NewNotice,
     getLocks,
-    Parallels,
     WrappedNotice,
+    Semaphore,
 } from "./lib/src/utils";
 import { Logger, setLogger } from "./lib/src/logger";
 import { LocalPouchDB } from "./LocalPouchDB";
@@ -784,27 +784,25 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
         return await runWithLock("batchSave", false, async () => {
             const batchItems = JSON.parse(JSON.stringify(this.batchFileChange)) as string[];
             this.batchFileChange = [];
-            const limit = 3;
-            const p = Parallels();
+            const semaphore = Semaphore(3);
 
-            for (const e of batchItems) {
-                const w = (async () => {
-                    try {
-                        const f = this.app.vault.getAbstractFileByPath(normalizePath(e));
-                        if (f && f instanceof TFile) {
-                            await this.updateIntoDB(f);
-                            Logger(`Batch save:${e}`);
-                        }
-                    } catch (ex) {
-                        Logger(`Batch save error:${e}`, LOG_LEVEL.NOTICE);
-                        Logger(ex, LOG_LEVEL.VERBOSE);
+            const batchProcesses = batchItems.map(e => (async (e) => {
+                const releaser = await semaphore.acquire(1, "batch");
+                try {
+                    const f = this.app.vault.getAbstractFileByPath(normalizePath(e));
+                    if (f && f instanceof TFile) {
+                        await this.updateIntoDB(f);
+                        Logger(`Batch save:${e}`);
                     }
-                })();
-                p.add(w);
-                await p.wait(limit)
-            }
-            this.refreshStatusText();
-            await p.all();
+                } catch (ex) {
+                    Logger(`Batch save error:${e}`, LOG_LEVEL.NOTICE);
+                    Logger(ex, LOG_LEVEL.VERBOSE);
+                } finally {
+                    releaser();
+                }
+            })(e))
+            await Promise.all(batchProcesses);
+            this.refreshStatusText();
 
             return;
         });
@@ -1618,20 +1616,17 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
         const count = objects.length;
         Logger(procedurename);
         let i = 0;
-        const p = Parallels();
-        const limit = 10;
+        const semaphore = Semaphore(10);
         Logger(`${procedurename} exec.`);
-        for (const v of objects) {
-            if (!this.localDatabase.isReady) throw Error("Database is not ready!");
-            const addProc = (p: () => Promise<void>): Promise<void> => {
-                return p();
-            }
-            p.add(addProc(async () => {
-                try {
-                    await callback(v);
+        if (!this.localDatabase.isReady) throw Error("Database is not ready!");
+        const procs = objects.map(e => (async (v) => {
+            const releaser = await semaphore.acquire(1, procedurename);
+
+            try {
+                await callback(v);
                     i++;
-                    if (i % 50 == 0) {
+                if (i % 50 == 0) {
                         const notify = `${procedurename} : ${i}/${count}`;
                         if (showingNotice) {
                             Logger(notify, LOG_LEVEL.NOTICE, "syncAll");
                         } else {
@@ -1640,14 +1635,16 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
                         }
                         this.setStatusBarText(notify);
                     }
-                } catch (ex) {
+            } catch (ex) {
                     Logger(`Error while ${procedurename}`, LOG_LEVEL.NOTICE);
                     Logger(ex);
-                }
-            }));
-            await p.wait(limit);
+            } finally {
+                releaser();
+            }
         }
-        await p.all();
+        )(e));
+        await Promise.all(procs);
+
         Logger(`${procedurename} done.`);
     };
 
@@ -2545,8 +2542,6 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
         const fileCount = allFileNames.length;
         let processed = 0;
         let filesChanged = 0;
-        const p = Parallels();
-        const limit = 10;
         // count updated files up as like this below:
         // .obsidian: 2
         // .obsidian/workspace: 1
@@ -2569,6 +2564,8 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
                 c = pieces.shift();
             }
         }
+        const p = [] as Promise<void>[];
+        const semaphore = Semaphore(15);
         // Cache update time information for files which have already been processed (mainly for files that were skipped due to the same content)
         let caches: { [key: string]: { storageMtime: number; docMtime: number } } = {};
         caches = await this.localDatabase.kvDB.get<{ [key: string]: { storageMtime: number; docMtime: number } }>("diff-caches-internal") || {};
@@ -2579,12 +2576,20 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
 
             const fileOnStorage = files.find(e => e.path == filename);
             const fileOnDatabase = filesOnDB.find(e => e._id == filename2idInternalChunk(id2path(filename)));
-            const addProc = (p: () => Promise<void>): Promise<void> => {
-                return p();
+            const addProc = async (p: () => Promise<void>): Promise<void> => {
+                const releaser = await semaphore.acquire(1);
+                try {
+                    return p();
+                } catch (ex) {
+                    Logger("Some process failed", logLevel)
+                    Logger(ex);
+                } finally {
+                    releaser();
+                }
             }
 
             const cache = filename in caches ? caches[filename] : { storageMtime: 0, docMtime: 0 };
-            p.add(addProc(async () => {
+            p.push(addProc(async () => {
                 if (fileOnStorage && fileOnDatabase) {
                     // Both => Synchronize
                     if (fileOnDatabase.mtime == cache.docMtime && fileOnStorage.mtime == cache.storageMtime) {
@@ -2624,9 +2629,8 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
                     // Something corrupted?
                 }
             }));
-            await p.wait(limit);
         }
-        await p.all();
+        await Promise.all(p);
         await this.localDatabase.kvDB.set("diff-caches-internal", caches);
 
         // When files has been retreived from the database. they must be reloaded.
diff --git a/tsconfig.json b/tsconfig.json
index 8f90892..ec8886b 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -9,8 +9,19 @@
         // "importsNotUsedAsValues": "error",
         "importHelpers": false,
         "alwaysStrict": true,
-        "lib": ["es2018", "DOM", "ES5", "ES6", "ES7"]
+        "lib": [
+            "es2018",
+            "DOM",
+            "ES5",
+            "ES6",
+            "ES7",
+            "es2019.array"
+        ]
     },
-    "include": ["**/*.ts"],
-    "exclude": ["pouchdb-browser-webpack"]
-}
+    "include": [
+        "**/*.ts"
+    ],
+    "exclude": [
+        "pouchdb-browser-webpack"
+    ]
+}
\ No newline at end of file
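
Note on the pattern above: each `Parallels()` / `p.wait(limit)` loop is replaced by a counting semaphore. `Semaphore(n)` comes from the updated `src/lib` submodule; judging by the call sites, `acquire()` resolves to a releaser function once one of the `n` slots is free, so every task awaits `acquire()`, does its work, and calls the releaser in `finally`, while `Promise.all` awaits the whole batch. The sketch below is only a minimal illustration of that shape, not the submodule's actual implementation: `createSemaphore`, `Releaser`, `runBatch`, and `save` are illustrative names, and the real `acquire(1, "batch")` also takes a weight and a label that the sketch omits.

```ts
// Minimal sketch of a counting semaphore with the acquire()-returns-releaser
// shape used by the diff. Illustrative only; the real Semaphore lives in the
// src/lib submodule and has a richer signature.

type Releaser = () => void;

function createSemaphore(capacity: number) {
    let occupied = 0;
    const waiting: Array<(release: Releaser) => void> = [];

    // Hand out slots to waiters whenever capacity allows.
    const dispatch = () => {
        while (occupied < capacity && waiting.length > 0) {
            occupied++;
            let released = false;
            const release: Releaser = () => {
                if (released) return; // tolerate a double release
                released = true;
                occupied--;
                dispatch();
            };
            waiting.shift()!(release);
        }
    };

    return {
        acquire(): Promise<Releaser> {
            return new Promise<Releaser>((resolve) => {
                waiting.push(resolve);
                dispatch();
            });
        },
    };
}

// Usage mirroring the batchSave hunk: process every item, at most three at a time.
async function runBatch(items: string[], save: (item: string) => Promise<void>) {
    const semaphore = createSemaphore(3);
    const tasks = items.map(async (item) => {
        const release = await semaphore.acquire();
        try {
            // Await the work (rather than returning the bare promise) so the
            // slot stays held until the task actually finishes.
            await save(item);
        } finally {
            release();
        }
    });
    await Promise.all(tasks);
}
```

One detail worth noting about this shape: releasing in `finally` only bounds concurrency if the work is awaited before the `try` block exits. In the `addProc` of the internal-file hunk, `return p();` hands back the pending promise and runs `finally` immediately, so the slot is freed before `p()` settles; the sketch therefore awaits before releasing to keep the slot held for the full duration of the task.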