From d67cc00e4e5dd55edfb8ad82e51deb5214ced0ac Mon Sep 17 00:00:00 2001
From: Mert <101130780+mertalev@users.noreply.github.com>
Date: Thu, 14 Mar 2024 01:52:30 -0400
Subject: [PATCH] feat(server): lower library scan memory usage (#7939)

* use trie

* update tests

* formatting

* pr feedback

* linting
---
 server/package-lock.json                          |  27 ++++
 server/package.json                               |   1 +
 .../domain/library/library.service.spec.ts        |  19 ++-
 server/src/domain/library/library.service.ts      | 123 ++++++++++--------
 .../domain/repositories/storage.repository.ts     |   1 +
 .../infra/repositories/filesystem.provider.ts     |  33 ++++-
 .../repositories/storage.repository.mock.ts       |   1 +
 7 files changed, 143 insertions(+), 62 deletions(-)

diff --git a/server/package-lock.json b/server/package-lock.json
index deab0b7919..1808d316a1 100644
--- a/server/package-lock.json
+++ b/server/package-lock.json
@@ -47,6 +47,7 @@
         "js-yaml": "^4.1.0",
         "lodash": "^4.17.21",
         "luxon": "^3.4.2",
+        "mnemonist": "^0.39.8",
         "nest-commander": "^3.11.1",
         "nestjs-otel": "^5.1.5",
         "node-addon-api": "^7.0.0",
@@ -10426,6 +10427,14 @@
       "integrity": "sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A==",
       "dev": true
     },
+    "node_modules/mnemonist": {
+      "version": "0.39.8",
+      "resolved": "https://registry.npmjs.org/mnemonist/-/mnemonist-0.39.8.tgz",
+      "integrity": "sha512-vyWo2K3fjrUw8YeeZ1zF0fy6Mu59RHokURlld8ymdUPjMlD9EC9ov1/YPqTgqRvUN9nTr3Gqfz29LYAmu0PHPQ==",
+      "dependencies": {
+        "obliterator": "^2.0.1"
+      }
+    },
     "node_modules/mock-fs": {
       "version": "5.2.0",
       "resolved": "https://registry.npmjs.org/mock-fs/-/mock-fs-5.2.0.tgz",
@@ -10819,6 +10828,11 @@
         "url": "https://github.com/sponsors/ljharb"
       }
     },
+    "node_modules/obliterator": {
+      "version": "2.0.4",
+      "resolved": "https://registry.npmjs.org/obliterator/-/obliterator-2.0.4.tgz",
+      "integrity": "sha512-lgHwxlxV1qIg1Eap7LgIeoBWIMFibOjbrYPIPJZcI1mmGAI2m3lNYpK12Y+GBdPQ0U1hRwSord7GIaawz962qQ=="
+    },
     "node_modules/obuf": {
       "version": "1.1.2",
       "resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",
@@ -22037,6 +22051,14 @@
      "integrity": "sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A==",
      "dev": true
    },
+    "mnemonist": {
+      "version": "0.39.8",
+      "resolved": "https://registry.npmjs.org/mnemonist/-/mnemonist-0.39.8.tgz",
+      "integrity": "sha512-vyWo2K3fjrUw8YeeZ1zF0fy6Mu59RHokURlld8ymdUPjMlD9EC9ov1/YPqTgqRvUN9nTr3Gqfz29LYAmu0PHPQ==",
+      "requires": {
+        "obliterator": "^2.0.1"
+      }
+    },
    "mock-fs": {
      "version": "5.2.0",
      "resolved": "https://registry.npmjs.org/mock-fs/-/mock-fs-5.2.0.tgz",
@@ -22349,6 +22371,11 @@
      "resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.12.3.tgz",
      "integrity": "sha512-geUvdk7c+eizMNUDkRpW1wJwgfOiOeHbxBR/hLXK1aT6zmVSO0jsQcs7fj6MGw89jC/cjGfLcNOrtMYtGqm81g=="
    },
+    "obliterator": {
+      "version": "2.0.4",
+      "resolved": "https://registry.npmjs.org/obliterator/-/obliterator-2.0.4.tgz",
+      "integrity": "sha512-lgHwxlxV1qIg1Eap7LgIeoBWIMFibOjbrYPIPJZcI1mmGAI2m3lNYpK12Y+GBdPQ0U1hRwSord7GIaawz962qQ=="
+    },
    "obuf": {
      "version": "1.1.2",
      "resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",
diff --git a/server/package.json b/server/package.json
index 0acd97837d..9faa96608f 100644
--- a/server/package.json
+++ b/server/package.json
@@ -71,6 +71,7 @@
     "js-yaml": "^4.1.0",
     "lodash": "^4.17.21",
     "luxon": "^3.4.2",
+    "mnemonist": "^0.39.8",
     "nest-commander": "^3.11.1",
     "nestjs-otel": "^5.1.5",
     "node-addon-api": "^7.0.0",
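The new runtime dependency above is what carries the memory win: crawled paths go into a mnemonist Trie instead of a flat array or Set, so the long directory prefixes that library paths share are stored once rather than once per file. Below is a minimal sketch of the membership operations the scan relies on, restricted to the Trie methods this patch itself uses (add, has, delete, size, and iteration); the sample paths are made up.

import { Trie } from 'mnemonist';

// Long shared prefixes ('/data/user1/photos/2024/...') are stored once in the trie.
const crawledPaths = new Trie<string>();
crawledPaths.add('/data/user1/photos/2024/img_0001.jpg');
crawledPaths.add('/data/user1/photos/2024/img_0002.jpg');

console.log(crawledPaths.size); // 2
console.log(crawledPaths.has('/data/user1/photos/2024/img_0001.jpg')); // true

// Dropping paths that are already known leaves only the new files to import.
crawledPaths.delete('/data/user1/photos/2024/img_0001.jpg');
for (const newPath of crawledPaths) {
  console.log(newPath); // '/data/user1/photos/2024/img_0002.jpg'
}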
diff --git a/server/src/domain/library/library.service.spec.ts b/server/src/domain/library/library.service.spec.ts
index 03042cf55a..6758b167ff 100644
--- a/server/src/domain/library/library.service.spec.ts
+++ b/server/src/domain/library/library.service.spec.ts
@@ -155,7 +155,10 @@ describe(LibraryService.name, () => {
       };

       libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
-      storageMock.crawl.mockResolvedValue(['/data/user1/photo.jpg']);
+      // eslint-disable-next-line @typescript-eslint/require-await
+      storageMock.walk.mockImplementation(async function* generator() {
+        yield '/data/user1/photo.jpg';
+      });
       assetMock.getLibraryAssetPaths.mockResolvedValue({ items: [], hasNextPage: false });

       await sut.handleQueueAssetRefresh(mockLibraryJob);
@@ -181,7 +184,10 @@
       };

       libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
-      storageMock.crawl.mockResolvedValue(['/data/user1/photo.jpg']);
+      // eslint-disable-next-line @typescript-eslint/require-await
+      storageMock.walk.mockImplementation(async function* generator() {
+        yield '/data/user1/photo.jpg';
+      });
       assetMock.getLibraryAssetPaths.mockResolvedValue({ items: [], hasNextPage: false });

       await sut.handleQueueAssetRefresh(mockLibraryJob);
@@ -231,12 +237,11 @@
       };

       libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
-      storageMock.crawl.mockResolvedValue([]);
       assetMock.getLibraryAssetPaths.mockResolvedValue({ items: [], hasNextPage: false });

       await sut.handleQueueAssetRefresh(mockLibraryJob);

-      expect(storageMock.crawl).toHaveBeenCalledWith({
+      expect(storageMock.walk).toHaveBeenCalledWith({
         pathsToCrawl: [libraryStub.externalLibraryWithImportPaths1.importPaths[1]],
         exclusionPatterns: [],
       });
@@ -250,7 +255,6 @@
       };

       libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
-      storageMock.crawl.mockResolvedValue([]);
       assetMock.getLibraryAssetPaths.mockResolvedValue({
         items: [assetStub.image],
         hasNextPage: false,
       });
@@ -271,7 +275,10 @@
       };

       libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
-      storageMock.crawl.mockResolvedValue([assetStub.offline.originalPath]);
+      // eslint-disable-next-line @typescript-eslint/require-await
+      storageMock.walk.mockImplementation(async function* generator() {
+        yield assetStub.offline.originalPath;
+      });
       assetMock.getLibraryAssetPaths.mockResolvedValue({
         items: [assetStub.offline],
         hasNextPage: false,
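The mocks above change shape because walk is an async generator rather than a promise of an array: there is no value to hand to mockResolvedValue, so each test supplies an async generator function instead. A small, self-contained illustration of how such a mock is driven with for await (the collect helper is hypothetical, only here to show consumption):

// Generator-returning methods are mocked with an async generator function, not a resolved value.
const walk = jest.fn();
walk.mockImplementation(async function* () {
  yield '/data/user1/photo.jpg';
  yield '/data/user1/video.mov';
});

// Hypothetical helper: consume the generator the same way the service does.
const collect = async (): Promise<string[]> => {
  const paths: string[] = [];
  for await (const filePath of walk({ pathsToCrawl: ['/data/user1'], exclusionPatterns: [] })) {
    paths.push(filePath);
  }
  return paths;
};

// await collect(); // ['/data/user1/photo.jpg', '/data/user1/video.mov']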
diff --git a/server/src/domain/library/library.service.ts b/server/src/domain/library/library.service.ts
index 25894c9b5a..b354c71cd7 100644
--- a/server/src/domain/library/library.service.ts
+++ b/server/src/domain/library/library.service.ts
@@ -1,6 +1,7 @@
-import { AssetType, LibraryType } from '@app/infra/entities';
+import { AssetType, LibraryEntity, LibraryType } from '@app/infra/entities';
 import { ImmichLogger } from '@app/infra/logger';
 import { BadRequestException, Inject, Injectable } from '@nestjs/common';
+import { Trie } from 'mnemonist';
 import { R_OK } from 'node:constants';
 import { EventEmitter } from 'node:events';
 import { Stats } from 'node:fs';
@@ -11,7 +12,6 @@ import { AuthDto } from '../auth';
 import { mimeTypes } from '../domain.constant';
 import { handlePromiseError, usePagination, validateCronExpression } from '../domain.util';
 import { IBaseJob, IEntityJob, ILibraryFileJob, ILibraryRefreshJob, JOBS_ASSET_PAGINATION_SIZE, JobName } from '../job';
-
 import {
   DatabaseLock,
   IAccessRepository,
@@ -39,6 +39,8 @@ import {
   mapLibrary,
 } from './library.dto';

+const LIBRARY_SCAN_BATCH_SIZE = 5000;
+
 @Injectable()
 export class LibraryService extends EventEmitter {
   readonly logger = new ImmichLogger(LibraryService.name);
@@ -626,6 +628,69 @@ export class LibraryService extends EventEmitter {
     this.logger.verbose(`Refreshing library: ${job.id}`);

+    const crawledAssetPaths = await this.getPathTrie(library);
+    this.logger.debug(`Found ${crawledAssetPaths.size} asset(s) when crawling import paths ${library.importPaths}`);
+
+    const assetIdsToMarkOffline = [];
+    const assetIdsToMarkOnline = [];
+    const pagination = usePagination(LIBRARY_SCAN_BATCH_SIZE, (pagination) =>
+      this.assetRepository.getLibraryAssetPaths(pagination, library.id),
+    );
+
+    const shouldScanAll = job.refreshAllFiles || job.refreshModifiedFiles;
+    for await (const page of pagination) {
+      for (const asset of page) {
+        const isOffline = !crawledAssetPaths.has(asset.originalPath);
+        if (isOffline && !asset.isOffline) {
+          assetIdsToMarkOffline.push(asset.id);
+        }
+
+        if (!isOffline && asset.isOffline) {
+          assetIdsToMarkOnline.push(asset.id);
+        }
+
+        if (!shouldScanAll) {
+          crawledAssetPaths.delete(asset.originalPath);
+        }
+      }
+    }
+
+    if (assetIdsToMarkOffline.length > 0) {
+      this.logger.debug(`Found ${assetIdsToMarkOffline.length} offline asset(s) previously marked as online`);
+      await this.assetRepository.updateAll(assetIdsToMarkOffline, { isOffline: true });
+    }
+
+    if (assetIdsToMarkOnline.length > 0) {
+      this.logger.debug(`Found ${assetIdsToMarkOnline.length} online asset(s) previously marked as offline`);
+      await this.assetRepository.updateAll(assetIdsToMarkOnline, { isOffline: false });
+    }
+
+    if (crawledAssetPaths.size > 0) {
+      if (!shouldScanAll) {
+        this.logger.debug(`Will import ${crawledAssetPaths.size} new asset(s)`);
+      }
+
+      const batch = [];
+      for (const assetPath of crawledAssetPaths) {
+        batch.push(assetPath);
+
+        if (batch.length >= LIBRARY_SCAN_BATCH_SIZE) {
+          await this.scanAssets(job.id, batch, library.ownerId, job.refreshAllFiles ?? false);
+          batch.length = 0;
+        }
+      }
+
+      if (batch.length > 0) {
+        await this.scanAssets(job.id, batch, library.ownerId, job.refreshAllFiles ?? false);
+      }
+    }
+
+    await this.repository.update({ id: job.id, refreshedAt: new Date() });
+
+    return true;
+  }
+
+  private async getPathTrie(library: LibraryEntity): Promise<Trie<string>> {
     const pathValidation = await Promise.all(
       library.importPaths.map(async (importPath) => await this.validateImportPath(importPath)),
     );
@@ -640,61 +705,17 @@
       .filter((validation) => validation.isValid)
       .map((validation) => validation.importPath);

-    let rawPaths = await this.storageRepository.crawl({
+    const generator = this.storageRepository.walk({
       pathsToCrawl: validImportPaths,
       exclusionPatterns: library.exclusionPatterns,
     });
-    const crawledAssetPaths = new Set(rawPaths);
-
-    const shouldScanAll = job.refreshAllFiles || job.refreshModifiedFiles;
-    let pathsToScan: string[] = shouldScanAll ? rawPaths : [];
-    rawPaths = [];
-
-    this.logger.debug(`Found ${crawledAssetPaths.size} asset(s) when crawling import paths ${library.importPaths}`);
-
-    const assetIdsToMarkOffline = [];
-    const assetIdsToMarkOnline = [];
-    const pagination = usePagination(5000, (pagination) =>
-      this.assetRepository.getLibraryAssetPaths(pagination, library.id),
-    );
-
-    for await (const page of pagination) {
-      for (const asset of page) {
-        const isOffline = !crawledAssetPaths.has(asset.originalPath);
-        if (isOffline && !asset.isOffline) {
-          assetIdsToMarkOffline.push(asset.id);
-        }
-
-        if (!isOffline && asset.isOffline) {
-          assetIdsToMarkOnline.push(asset.id);
-        }
-
-        crawledAssetPaths.delete(asset.originalPath);
-      }
+    const trie = new Trie<string>();
+    for await (const filePath of generator) {
+      trie.add(filePath);
     }

-    if (assetIdsToMarkOffline.length > 0) {
-      this.logger.debug(`Found ${assetIdsToMarkOffline.length} offline asset(s) previously marked as online`);
-      await this.assetRepository.updateAll(assetIdsToMarkOffline, { isOffline: true });
-    }
-
-    if (assetIdsToMarkOnline.length > 0) {
-      this.logger.debug(`Found ${assetIdsToMarkOnline.length} online asset(s) previously marked as offline`);
-      await this.assetRepository.updateAll(assetIdsToMarkOnline, { isOffline: false });
-    }
-
-    if (!shouldScanAll) {
-      pathsToScan = [...crawledAssetPaths];
-      this.logger.debug(`Will import ${pathsToScan.length} new asset(s)`);
-    }
-
-    if (pathsToScan.length > 0) {
-      await this.scanAssets(job.id, pathsToScan, library.ownerId, job.refreshAllFiles ?? false);
-    }
-
-    await this.repository.update({ id: job.id, refreshedAt: new Date() });
-
-    return true;
+    return trie;
   }

   private async findOrFail(id: string) {
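The rewritten refresh above keeps every working set bounded: existing assets are read in pages of LIBRARY_SCAN_BATCH_SIZE via usePagination, and new paths are queued for import in batches of the same size, so neither the asset table nor the crawl result ever has to exist as one large array. A rough, standalone sketch of that flush-every-N batching pattern (the enqueue callback is a hypothetical stand-in for queueing a scan job):

// Flush accumulated items every `batchSize` entries so memory use stays flat
// no matter how many paths the crawl produces.
async function processInBatches(
  items: Iterable<string>,
  batchSize: number,
  enqueue: (batch: string[]) => Promise<void>, // hypothetical sink, e.g. a job queue
): Promise<void> {
  const batch: string[] = [];
  for (const item of items) {
    batch.push(item);
    if (batch.length >= batchSize) {
      await enqueue([...batch]);
      batch.length = 0;
    }
  }
  if (batch.length > 0) {
    await enqueue(batch);
  }
}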
diff --git a/server/src/domain/repositories/storage.repository.ts b/server/src/domain/repositories/storage.repository.ts
index f4f8cab7b9..a052596c02 100644
--- a/server/src/domain/repositories/storage.repository.ts
+++ b/server/src/domain/repositories/storage.repository.ts
@@ -53,6 +53,7 @@
   readdir(folder: string): Promise<string[]>;
   stat(filepath: string): Promise<Stats>;
   crawl(crawlOptions: CrawlOptionsDto): Promise<string[]>;
+  walk(crawlOptions: CrawlOptionsDto): AsyncGenerator<string>;
   copyFile(source: string, target: string): Promise<void>;
   rename(source: string, target: string): Promise<void>;
   watch(paths: string[], options: WatchOptions, events: Partial<WatchEvents>): () => Promise<void>;
diff --git a/server/src/infra/repositories/filesystem.provider.ts b/server/src/infra/repositories/filesystem.provider.ts
index 386ee5d718..c4f577ed25 100644
--- a/server/src/infra/repositories/filesystem.provider.ts
+++ b/server/src/infra/repositories/filesystem.provider.ts
@@ -11,7 +11,7 @@
 import { ImmichLogger } from '@app/infra/logger';
 import archiver from 'archiver';
 import chokidar, { WatchOptions } from 'chokidar';
-import { glob } from 'fast-glob';
+import { glob, globStream } from 'fast-glob';
 import { constants, createReadStream, existsSync, mkdirSync } from 'node:fs';
 import fs from 'node:fs/promises';
 import path from 'node:path';
@@ -141,10 +141,7 @@
       return Promise.resolve([]);
     }

-    const base = pathsToCrawl.length === 1 ? pathsToCrawl[0] : `{${pathsToCrawl.join(',')}}`;
-    const extensions = `*{${mimeTypes.getSupportedFileExtensions().join(',')}}`;
-
-    return glob(`${base}/**/${extensions}`, {
+    return glob(this.asGlob(pathsToCrawl), {
       absolute: true,
       caseSensitiveMatch: false,
       onlyFiles: true,
       dot: includeHidden,
       ignore: exclusionPatterns,
     });
   }

+  async *walk(crawlOptions: CrawlOptionsDto): AsyncGenerator<string> {
+    const { pathsToCrawl, exclusionPatterns, includeHidden } = crawlOptions;
+    if (pathsToCrawl.length === 0) {
+      async function* emptyGenerator() {}
+      return emptyGenerator();
+    }
+
+    const stream = globStream(this.asGlob(pathsToCrawl), {
+      absolute: true,
+      caseSensitiveMatch: false,
+      onlyFiles: true,
+      dot: includeHidden,
+      ignore: exclusionPatterns,
+    });
+
+    for await (const value of stream) {
+      yield value as string;
+    }
+  }
+
   watch(paths: string[], options: WatchOptions, events: Partial<WatchEvents>) {
     const watcher = chokidar.watch(paths, options);
@@ -164,4 +181,10 @@

     return () => watcher.close();
   }
+
+  private asGlob(pathsToCrawl: string[]): string {
+    const base = pathsToCrawl.length === 1 ? pathsToCrawl[0] : `{${pathsToCrawl.join(',')}}`;
+    const extensions = `*{${mimeTypes.getSupportedFileExtensions().join(',')}}`;
+    return `${base}/**/${extensions}`;
+  }
 }
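crawl keeps returning a fully materialized array from glob, while the new walk feeds the same asGlob pattern to fast-glob's globStream and re-exposes it as an async generator, so callers pull one path at a time and the repository interface stays free of Node stream types. A standalone sketch of the same idea, assuming only fast-glob's documented stream API:

import { globStream } from 'fast-glob';

// Yield matching files one at a time instead of collecting them into an array first.
async function* walkFiles(pattern: string, ignore: string[] = []): AsyncGenerator<string> {
  const stream = globStream(pattern, { absolute: true, onlyFiles: true, ignore });
  for await (const entry of stream) {
    yield entry.toString(); // stream chunks may be Buffers depending on options
  }
}

// Usage: memory stays flat even when the pattern matches millions of files.
// for await (const file of walkFiles('/photos/**/*.{jpg,png}')) { ... }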
diff --git a/server/test/repositories/storage.repository.mock.ts b/server/test/repositories/storage.repository.mock.ts
index e0b244fc2d..a8ffbf4105 100644
--- a/server/test/repositories/storage.repository.mock.ts
+++ b/server/test/repositories/storage.repository.mock.ts
@@ -56,6 +56,7 @@ export const newStorageRepositoryMock = (reset = true): jest.Mocked<IStorageRepository>
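For the specs above to compile, the storage repository mock needs a walk entry alongside crawl. A minimal, self-contained sketch of that pattern with a stand-in interface (not Immich's actual IStorageRepository):

// Stand-in interface: only the two scan-related methods are modeled here.
interface StorageLike {
  crawl(options: { pathsToCrawl: string[] }): Promise<string[]>;
  walk(options: { pathsToCrawl: string[] }): AsyncGenerator<string>;
}

// Every method is a plain jest.fn(); individual tests override behavior per case.
const storageMock: Record<keyof StorageLike, jest.Mock> = {
  crawl: jest.fn(),
  walk: jest.fn(),
};

// Promise-returning methods keep using mockResolvedValue...
storageMock.crawl.mockResolvedValue(['/data/user1/photo.jpg']);

// ...while the generator-returning walk gets an async generator implementation.
storageMock.walk.mockImplementation(async function* () {
  yield '/data/user1/photo.jpg';
});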