1
0
mirror of https://github.com/immich-app/immich.git synced 2024-11-28 09:33:27 +02:00

fix(server): don't crash when refreshing large libraries (#7934)

* add job to check for offline files

* fix lint

* only check for offline when using checkForOffline

* improve tests

* remove old test

* wip

* remove trie

* refactor batches

* also check offline status

* fix spelling

* don't do offline scan

* rename scan to check

* fix job statuses

* fix lint

* cleanup

* add test

* open-api

* fix test

* fix spinner

* reset text

* don't double batch

* fix comments from mert

* remove tries

* fix tests

* fix e2e

* fix test

* fix test

* add tests

* fix lint

* fix e2e

* interweave scans

* fix errors

* fix messages

* fix test

* add mock

* fix sql

* fix e2e

* use library batch size

* save -> update

* add file extensions

* update specs

* test for import paths

* check import paths when testing offline

* fix lint

* normalize import path

* remove console logs

* decrease batch size to 1000

* add test for import path

* add test for already-online assets

* fix merge

* fix lint

* add library job back

* add offline job to correct queue

* library spec compiles now

* move one test to new e2e

* fix comments

* fix comments

* fix lint

* refactor path validation

* fix loop bug

* remove logging

* expect responses

* fix asset mock

* take the straightforward approach

* use generator correctly

* fix vitest on file edit

* bump vitest to 1.6.0

* test for offline check

* add e2e tests for offlining assets depending on import path

* cleanup e2e test after finish

* cleanup library service

* paginate the walk generator

* fix tests

* fix typo

* refactoring handleOfflineCheck

* better testing of handleOfflineCheck

* fix lint

* handle large library deletions

* dont check if library is deleted

* fix mock

* add a 100k page size to library

* fix loading animation

* better log messages

* Better logging for offline asset removal

* fix sql and tests

* fix number format

* Remove submodule

* fix format

* chore: cleanup

* chore: fix tests

---------

Co-authored-by: Alex <alex.tran1502@gmail.com>
Co-authored-by: Jason Rasmussen <jason@rasm.me>
This commit is contained in:
Jonathan Jogenfors 2024-08-28 19:05:48 +02:00 committed by GitHub
parent 5811025ebd
commit 363c558db7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 364 additions and 282 deletions

View File

@ -364,7 +364,7 @@ describe('/libraries', () => {
utils.removeImageFile(`${testAssetDir}/temp/directoryA/assetB.png`);
});
it('should offline missing files', async () => {
it('should offline a file missing from disk', async () => {
utils.createImageFile(`${testAssetDir}/temp/directoryA/assetB.png`);
const library = await utils.createLibrary(admin.accessToken, {
ownerId: admin.userId,
@ -391,6 +391,43 @@ describe('/libraries', () => {
);
});
it('should offline a file outside of import paths', async () => {
utils.createImageFile(`${testAssetDir}/temp/directoryA/assetB.png`);
utils.createImageFile(`${testAssetDir}/temp/directoryB/assetC.png`);
const library = await utils.createLibrary(admin.accessToken, {
ownerId: admin.userId,
importPaths: [`${testAssetDirInternal}/temp`],
});
await scan(admin.accessToken, library.id);
await utils.waitForQueueFinish(admin.accessToken, 'library');
await request(app)
.put(`/libraries/${library.id}`)
.set('Authorization', `Bearer ${admin.accessToken}`)
.send({ importPaths: [`${testAssetDirInternal}/temp/directoryA`] });
await scan(admin.accessToken, library.id);
await utils.waitForQueueFinish(admin.accessToken, 'library');
const { assets } = await utils.metadataSearch(admin.accessToken, { libraryId: library.id });
expect(assets.items).toEqual(
expect.arrayContaining([
expect.objectContaining({
isOffline: false,
originalFileName: 'assetB.png',
}),
expect.objectContaining({
isOffline: true,
originalFileName: 'assetC.png',
}),
]),
);
utils.removeImageFile(`${testAssetDir}/temp/directoryB/assetC.png`);
});
it('should not try to delete offline files', async () => {
utils.createImageFile(`${testAssetDir}/temp/offline1/assetA.png`);

View File

@ -45,7 +45,6 @@
"js-yaml": "^4.1.0",
"lodash": "^4.17.21",
"luxon": "^3.4.2",
"mnemonist": "^0.39.8",
"nest-commander": "^3.11.1",
"nestjs-cls": "^4.3.0",
"nestjs-otel": "^6.0.0",
@ -10434,14 +10433,6 @@
"integrity": "sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A==",
"dev": true
},
"node_modules/mnemonist": {
"version": "0.39.8",
"resolved": "https://registry.npmjs.org/mnemonist/-/mnemonist-0.39.8.tgz",
"integrity": "sha512-vyWo2K3fjrUw8YeeZ1zF0fy6Mu59RHokURlld8ymdUPjMlD9EC9ov1/YPqTgqRvUN9nTr3Gqfz29LYAmu0PHPQ==",
"dependencies": {
"obliterator": "^2.0.1"
}
},
"node_modules/mock-fs": {
"version": "5.2.0",
"resolved": "https://registry.npmjs.org/mock-fs/-/mock-fs-5.2.0.tgz",
@ -10955,11 +10946,6 @@
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/obliterator": {
"version": "2.0.4",
"resolved": "https://registry.npmjs.org/obliterator/-/obliterator-2.0.4.tgz",
"integrity": "sha512-lgHwxlxV1qIg1Eap7LgIeoBWIMFibOjbrYPIPJZcI1mmGAI2m3lNYpK12Y+GBdPQ0U1hRwSord7GIaawz962qQ=="
},
"node_modules/obuf": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",
@ -22483,14 +22469,6 @@
"integrity": "sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A==",
"dev": true
},
"mnemonist": {
"version": "0.39.8",
"resolved": "https://registry.npmjs.org/mnemonist/-/mnemonist-0.39.8.tgz",
"integrity": "sha512-vyWo2K3fjrUw8YeeZ1zF0fy6Mu59RHokURlld8ymdUPjMlD9EC9ov1/YPqTgqRvUN9nTr3Gqfz29LYAmu0PHPQ==",
"requires": {
"obliterator": "^2.0.1"
}
},
"mock-fs": {
"version": "5.2.0",
"resolved": "https://registry.npmjs.org/mock-fs/-/mock-fs-5.2.0.tgz",
@ -22855,11 +22833,6 @@
"resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.12.3.tgz",
"integrity": "sha512-geUvdk7c+eizMNUDkRpW1wJwgfOiOeHbxBR/hLXK1aT6zmVSO0jsQcs7fj6MGw89jC/cjGfLcNOrtMYtGqm81g=="
},
"obliterator": {
"version": "2.0.4",
"resolved": "https://registry.npmjs.org/obliterator/-/obliterator-2.0.4.tgz",
"integrity": "sha512-lgHwxlxV1qIg1Eap7LgIeoBWIMFibOjbrYPIPJZcI1mmGAI2m3lNYpK12Y+GBdPQ0U1hRwSord7GIaawz962qQ=="
},
"obuf": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",

View File

@ -71,7 +71,6 @@
"js-yaml": "^4.1.0",
"lodash": "^4.17.21",
"luxon": "^3.4.2",
"mnemonist": "^0.39.8",
"nest-commander": "^3.11.1",
"nestjs-cls": "^4.3.0",
"nestjs-otel": "^6.0.0",

View File

@ -48,12 +48,16 @@ export class UpdateLibraryDto {
exclusionPatterns?: string[];
}
export class CrawlOptionsDto {
pathsToCrawl!: string[];
includeHidden? = false;
export interface CrawlOptionsDto {
pathsToCrawl: string[];
includeHidden?: boolean;
exclusionPatterns?: string[];
}
export interface WalkOptionsDto extends CrawlOptionsDto {
take: number;
}
export class ValidateLibraryDto {
@Optional()
@IsString({ each: true })

View File

@ -36,6 +36,7 @@ export enum WithoutProperty {
export enum WithProperty {
SIDECAR = 'sidecar',
IS_ONLINE = 'isOnline',
IS_OFFLINE = 'isOffline',
}

View File

@ -76,6 +76,7 @@ export enum JobName {
LIBRARY_SCAN = 'library-refresh',
LIBRARY_SCAN_ASSET = 'library-refresh-asset',
LIBRARY_REMOVE_OFFLINE = 'library-remove-offline',
LIBRARY_CHECK_OFFLINE = 'library-check-offline',
LIBRARY_DELETE = 'library-delete',
LIBRARY_QUEUE_SCAN_ALL = 'library-queue-all-refresh',
LIBRARY_QUEUE_CLEANUP = 'library-queue-cleanup',
@ -110,6 +111,7 @@ export enum JobName {
}
export const JOBS_ASSET_PAGINATION_SIZE = 1000;
export const JOBS_LIBRARY_PAGINATION_SIZE = 100_000;
export interface IBaseJob {
force?: boolean;
@ -129,6 +131,10 @@ export interface ILibraryFileJob extends IEntityJob {
assetPath: string;
}
export interface ILibraryOfflineJob extends IEntityJob {
importPaths: string[];
}
export interface ILibraryRefreshJob extends IEntityJob {
refreshModifiedFiles: boolean;
refreshAllFiles: boolean;
@ -264,6 +270,7 @@ export type JobItem =
| { name: JobName.LIBRARY_REMOVE_OFFLINE; data: IEntityJob }
| { name: JobName.LIBRARY_DELETE; data: IEntityJob }
| { name: JobName.LIBRARY_QUEUE_SCAN_ALL; data: IBaseJob }
| { name: JobName.LIBRARY_CHECK_OFFLINE; data: IEntityJob }
| { name: JobName.LIBRARY_QUEUE_CLEANUP; data: IBaseJob }
// Notification

View File

@ -12,5 +12,4 @@ export interface ILibraryRepository {
softDelete(id: string): Promise<void>;
update(library: Partial<LibraryEntity>): Promise<LibraryEntity>;
getStatistics(id: string): Promise<LibraryStatsResponseDto | undefined>;
getAssetIds(id: string, withDeleted?: boolean): Promise<string[]>;
}

View File

@ -2,7 +2,7 @@ import { WatchOptions } from 'chokidar';
import { Stats } from 'node:fs';
import { FileReadOptions } from 'node:fs/promises';
import { Readable } from 'node:stream';
import { CrawlOptionsDto } from 'src/dtos/library.dto';
import { CrawlOptionsDto, WalkOptionsDto } from 'src/dtos/library.dto';
export interface ImmichReadStream {
stream: Readable;
@ -45,8 +45,8 @@ export interface IStorageRepository {
checkDiskUsage(folder: string): Promise<DiskUsage>;
readdir(folder: string): Promise<string[]>;
stat(filepath: string): Promise<Stats>;
crawl(crawlOptions: CrawlOptionsDto): Promise<string[]>;
walk(crawlOptions: CrawlOptionsDto): AsyncGenerator<string>;
crawl(options: CrawlOptionsDto): Promise<string[]>;
walk(options: WalkOptionsDto): AsyncGenerator<string[]>;
copyFile(source: string, target: string): Promise<void>;
rename(source: string, target: string): Promise<void>;
watch(paths: string[], options: WatchOptions, events: Partial<WatchEvents>): () => Promise<void>;

View File

@ -145,14 +145,3 @@ WHERE
AND ("libraries"."deletedAt" IS NULL)
GROUP BY
"libraries"."id"
-- LibraryRepository.getAssetIds
SELECT
"assets"."id" AS "assets_id"
FROM
"libraries" "library"
INNER JOIN "assets" "assets" ON "assets"."libraryId" = "library"."id"
AND ("assets"."deletedAt" IS NULL)
WHERE
("library"."id" = $1)
AND ("library"."deletedAt" IS NULL)

View File

@ -383,7 +383,7 @@ export class AssetRepository implements IAssetRepository {
@GenerateSql(
...Object.values(WithProperty)
.filter((property) => property !== WithProperty.IS_OFFLINE)
.filter((property) => property !== WithProperty.IS_OFFLINE && property !== WithProperty.IS_ONLINE)
.map((property) => ({
name: property,
params: [DummyValue.PAGINATION, property],
@ -539,7 +539,14 @@ export class AssetRepository implements IAssetRepository {
if (!libraryId) {
throw new Error('Library id is required when finding offline assets');
}
where = [{ isOffline: true, libraryId: libraryId }];
where = [{ isOffline: true, libraryId }];
break;
}
case WithProperty.IS_ONLINE: {
if (!libraryId) {
throw new Error('Library id is required when finding online assets');
}
where = [{ isOffline: false, libraryId }];
break;
}

View File

@ -79,6 +79,7 @@ export const JOBS_TO_QUEUE: Record<JobName, QueueName> = {
[JobName.LIBRARY_SCAN_ASSET]: QueueName.LIBRARY,
[JobName.LIBRARY_SCAN]: QueueName.LIBRARY,
[JobName.LIBRARY_DELETE]: QueueName.LIBRARY,
[JobName.LIBRARY_CHECK_OFFLINE]: QueueName.LIBRARY,
[JobName.LIBRARY_REMOVE_OFFLINE]: QueueName.LIBRARY,
[JobName.LIBRARY_QUEUE_SCAN_ALL]: QueueName.LIBRARY,
[JobName.LIBRARY_QUEUE_CLEANUP]: QueueName.LIBRARY,

View File

@ -94,30 +94,6 @@ export class LibraryRepository implements ILibraryRepository {
};
}
@GenerateSql({ params: [DummyValue.UUID] })
async getAssetIds(libraryId: string, withDeleted = false): Promise<string[]> {
const builder = this.repository
.createQueryBuilder('library')
.innerJoinAndSelect('library.assets', 'assets')
.where('library.id = :id', { id: libraryId })
.select('assets.id');
if (withDeleted) {
builder.withDeleted();
}
// Return all asset paths for a given library
const rawResults = await builder.getRawMany();
const results: string[] = [];
for (const rawPath of rawResults) {
results.push(rawPath.assets_id);
}
return results;
}
private async save(library: Partial<LibraryEntity>) {
const { id } = await this.repository.save(library);
return this.repository.findOneByOrFail({ id });

View File

@ -5,7 +5,7 @@ import { escapePath, glob, globStream } from 'fast-glob';
import { constants, createReadStream, existsSync, mkdirSync } from 'node:fs';
import fs from 'node:fs/promises';
import path from 'node:path';
import { CrawlOptionsDto } from 'src/dtos/library.dto';
import { CrawlOptionsDto, WalkOptionsDto } from 'src/dtos/library.dto';
import { ILoggerRepository } from 'src/interfaces/logger.interface';
import {
DiskUsage,
@ -157,8 +157,8 @@ export class StorageRepository implements IStorageRepository {
});
}
async *walk(crawlOptions: CrawlOptionsDto): AsyncGenerator<string> {
const { pathsToCrawl, exclusionPatterns, includeHidden } = crawlOptions;
async *walk(walkOptions: WalkOptionsDto): AsyncGenerator<string[]> {
const { pathsToCrawl, exclusionPatterns, includeHidden } = walkOptions;
if (pathsToCrawl.length === 0) {
async function* emptyGenerator() {}
return emptyGenerator();
@ -172,8 +172,17 @@ export class StorageRepository implements IStorageRepository {
ignore: exclusionPatterns,
});
let batch: string[] = [];
for await (const value of stream) {
yield value as string;
batch.push(value.toString());
if (batch.length === walkOptions.take) {
yield batch;
batch = [];
}
}
if (batch.length > 0) {
yield batch;
}
}

View File

@ -8,7 +8,15 @@ import { AssetType } from 'src/enum';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
import { IDatabaseRepository } from 'src/interfaces/database.interface';
import { IJobRepository, ILibraryFileJob, ILibraryRefreshJob, JobName, JobStatus } from 'src/interfaces/job.interface';
import {
IJobRepository,
ILibraryFileJob,
ILibraryOfflineJob,
ILibraryRefreshJob,
JobName,
JOBS_LIBRARY_PAGINATION_SIZE,
JobStatus,
} from 'src/interfaces/job.interface';
import { ILibraryRepository } from 'src/interfaces/library.interface';
import { ILoggerRepository } from 'src/interfaces/logger.interface';
import { IStorageRepository } from 'src/interfaces/storage.interface';
@ -154,17 +162,19 @@ describe(LibraryService.name, () => {
});
describe('handleQueueAssetRefresh', () => {
it('should queue new assets', async () => {
it('should queue refresh of a new asset', async () => {
const mockLibraryJob: ILibraryRefreshJob = {
id: libraryStub.externalLibrary1.id,
refreshModifiedFiles: false,
refreshAllFiles: false,
};
assetMock.getWith.mockResolvedValue({ items: [], hasNextPage: false });
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
// eslint-disable-next-line @typescript-eslint/require-await
storageMock.walk.mockImplementation(async function* generator() {
yield '/data/user1/photo.jpg';
yield ['/data/user1/photo.jpg'];
});
assetMock.getExternalLibraryAssetPaths.mockResolvedValue({ items: [], hasNextPage: false });
@ -183,6 +193,44 @@ describe(LibraryService.name, () => {
]);
});
it('should queue offline check of existing online assets', async () => {
const mockLibraryJob: ILibraryRefreshJob = {
id: libraryStub.externalLibrary1.id,
refreshModifiedFiles: false,
refreshAllFiles: false,
};
assetMock.getWith.mockResolvedValue({ items: [], hasNextPage: false });
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
storageMock.walk.mockImplementation(async function* generator() {});
assetMock.getWith.mockResolvedValue({ items: [assetStub.external], hasNextPage: false });
await sut.handleQueueAssetRefresh(mockLibraryJob);
expect(jobMock.queueAll).toHaveBeenCalledWith([
{
name: JobName.LIBRARY_CHECK_OFFLINE,
data: {
id: assetStub.external.id,
importPaths: libraryStub.externalLibrary1.importPaths,
exclusionPatterns: [],
},
},
]);
});
it("should fail when library can't be found", async () => {
const mockLibraryJob: ILibraryRefreshJob = {
id: libraryStub.externalLibrary1.id,
refreshModifiedFiles: false,
refreshAllFiles: false,
};
libraryMock.get.mockResolvedValue(null);
await expect(sut.handleQueueAssetRefresh(mockLibraryJob)).resolves.toBe(JobStatus.SKIPPED);
});
it('should force queue new assets', async () => {
const mockLibraryJob: ILibraryRefreshJob = {
id: libraryStub.externalLibrary1.id,
@ -190,10 +238,11 @@ describe(LibraryService.name, () => {
refreshAllFiles: true,
};
assetMock.getWith.mockResolvedValue({ items: [], hasNextPage: false });
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
// eslint-disable-next-line @typescript-eslint/require-await
storageMock.walk.mockImplementation(async function* generator() {
yield '/data/user1/photo.jpg';
yield ['/data/user1/photo.jpg'];
});
assetMock.getExternalLibraryAssetPaths.mockResolvedValue({ items: [], hasNextPage: false });
@ -225,6 +274,8 @@ describe(LibraryService.name, () => {
storageMock.checkFileExists.mockResolvedValue(true);
assetMock.getWith.mockResolvedValue({ items: [], hasNextPage: false });
const mockLibraryJob: ILibraryRefreshJob = {
id: libraryStub.externalLibraryWithImportPaths1.id,
refreshModifiedFiles: false,
@ -239,51 +290,78 @@ describe(LibraryService.name, () => {
expect(storageMock.walk).toHaveBeenCalledWith({
pathsToCrawl: [libraryStub.externalLibraryWithImportPaths1.importPaths[1]],
exclusionPatterns: [],
includeHidden: false,
take: JOBS_LIBRARY_PAGINATION_SIZE,
});
});
});
it('should set missing assets offline', async () => {
const mockLibraryJob: ILibraryRefreshJob = {
id: libraryStub.externalLibrary1.id,
refreshModifiedFiles: false,
refreshAllFiles: false,
describe('handleOfflineCheck', () => {
it('should skip missing assets', async () => {
const mockAssetJob: ILibraryOfflineJob = {
id: assetStub.external.id,
importPaths: ['/'],
};
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
assetMock.getExternalLibraryAssetPaths.mockResolvedValue({
items: [assetStub.external],
hasNextPage: false,
});
assetMock.getById.mockResolvedValue(null);
await sut.handleQueueAssetRefresh(mockLibraryJob);
await expect(sut.handleOfflineCheck(mockAssetJob)).resolves.toBe(JobStatus.SKIPPED);
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.image.id], { isOffline: true });
expect(assetMock.updateAll).not.toHaveBeenCalledWith(expect.anything(), { isOffline: false });
expect(jobMock.queueAll).not.toHaveBeenCalled();
expect(assetMock.update).not.toHaveBeenCalled();
});
it('should set crawled assets that were previously offline back online', async () => {
const mockLibraryJob: ILibraryRefreshJob = {
id: libraryStub.externalLibrary1.id,
refreshModifiedFiles: false,
refreshAllFiles: false,
it('should do nothing with already-offline assets', async () => {
const mockAssetJob: ILibraryOfflineJob = {
id: assetStub.external.id,
importPaths: ['/'],
};
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
// eslint-disable-next-line @typescript-eslint/require-await
storageMock.walk.mockImplementation(async function* generator() {
yield assetStub.externalOffline.originalPath;
});
assetMock.getExternalLibraryAssetPaths.mockResolvedValue({
items: [assetStub.externalOffline],
hasNextPage: false,
});
assetMock.getById.mockResolvedValue(assetStub.offline);
await sut.handleQueueAssetRefresh(mockLibraryJob);
await expect(sut.handleOfflineCheck(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.externalOffline.id], { isOffline: false });
expect(assetMock.updateAll).not.toHaveBeenCalledWith(expect.anything(), { isOffline: true });
expect(jobMock.queueAll).not.toHaveBeenCalled();
expect(assetMock.update).not.toHaveBeenCalled();
});
it('should offline assets no longer on disk or matching exclusion pattern', async () => {
const mockAssetJob: ILibraryOfflineJob = {
id: assetStub.external.id,
importPaths: ['/'],
};
assetMock.getById.mockResolvedValue(assetStub.external);
await expect(sut.handleOfflineCheck(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
expect(assetMock.update).toHaveBeenCalledWith({ id: assetStub.external.id, isOffline: true });
});
it('should set assets outside of import paths as offline', async () => {
const mockAssetJob: ILibraryOfflineJob = {
id: assetStub.external.id,
importPaths: ['/data/user2'],
};
assetMock.getById.mockResolvedValue(assetStub.external);
storageMock.checkFileExists.mockResolvedValue(true);
await expect(sut.handleOfflineCheck(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
expect(assetMock.update).toHaveBeenCalledWith({ id: assetStub.external.id, isOffline: true });
});
it('should do nothing with online assets', async () => {
const mockAssetJob: ILibraryOfflineJob = {
id: assetStub.external.id,
importPaths: ['/'],
};
assetMock.getById.mockResolvedValue(assetStub.external);
storageMock.checkFileExists.mockResolvedValue(true);
await expect(sut.handleOfflineCheck(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
expect(assetMock.update).not.toHaveBeenCalled();
});
});
@ -1115,18 +1193,9 @@ describe(LibraryService.name, () => {
});
describe('handleDeleteLibrary', () => {
it('should not delete a nonexistent library', async () => {
libraryMock.get.mockResolvedValue(null);
libraryMock.getAssetIds.mockResolvedValue([]);
libraryMock.delete.mockImplementation(async () => {});
await expect(sut.handleDeleteLibrary({ id: libraryStub.externalLibrary1.id })).resolves.toBe(JobStatus.FAILED);
});
it('should delete an empty library', async () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
libraryMock.getAssetIds.mockResolvedValue([]);
assetMock.getAll.mockResolvedValue({ items: [], hasNextPage: false });
libraryMock.delete.mockImplementation(async () => {});
await expect(sut.handleDeleteLibrary({ id: libraryStub.externalLibrary1.id })).resolves.toBe(JobStatus.SUCCESS);
@ -1134,7 +1203,7 @@ describe(LibraryService.name, () => {
it('should delete a library with assets', async () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
libraryMock.getAssetIds.mockResolvedValue([assetStub.image1.id]);
assetMock.getAll.mockResolvedValue({ items: [assetStub.image1], hasNextPage: false });
libraryMock.delete.mockImplementation(async () => {});
assetMock.getById.mockResolvedValue(assetStub.image1);
@ -1273,7 +1342,7 @@ describe(LibraryService.name, () => {
assetMock.getWith.mockResolvedValue({ items: [assetStub.image1], hasNextPage: false });
assetMock.getById.mockResolvedValue(assetStub.image1);
await expect(sut.handleOfflineRemoval({ id: libraryStub.externalLibrary1.id })).resolves.toBe(JobStatus.SUCCESS);
await expect(sut.handleRemoveOffline({ id: libraryStub.externalLibrary1.id })).resolves.toBe(JobStatus.SUCCESS);
expect(jobMock.queueAll).toHaveBeenCalledWith([
{ name: JobName.ASSET_DELETION, data: { id: assetStub.image1.id, deleteOnDisk: false } },

View File

@ -1,5 +1,4 @@
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
import { Trie } from 'mnemonist';
import { R_OK } from 'node:constants';
import { Stats } from 'node:fs';
import path, { basename, parse } from 'node:path';
@ -18,7 +17,6 @@ import {
ValidateLibraryResponseDto,
mapLibrary,
} from 'src/dtos/library.dto';
import { LibraryEntity } from 'src/entities/library.entity';
import { AssetType } from 'src/enum';
import { IAssetRepository, WithProperty } from 'src/interfaces/asset.interface';
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
@ -29,8 +27,9 @@ import {
IEntityJob,
IJobRepository,
ILibraryFileJob,
ILibraryOfflineJob,
ILibraryRefreshJob,
JOBS_ASSET_PAGINATION_SIZE,
JOBS_LIBRARY_PAGINATION_SIZE,
JobName,
JobStatus,
} from 'src/interfaces/job.interface';
@ -43,8 +42,6 @@ import { handlePromiseError } from 'src/utils/misc';
import { usePagination } from 'src/utils/pagination';
import { validateCronExpression } from 'src/validation';
const LIBRARY_SCAN_BATCH_SIZE = 5000;
@Injectable()
export class LibraryService {
private configCore: SystemConfigCore;
@ -254,26 +251,17 @@ export class LibraryService {
}
private async scanAssets(libraryId: string, assetPaths: string[], ownerId: string, force = false) {
this.logger.verbose(`Queuing refresh of ${assetPaths.length} asset(s)`);
// We perform this in batches to save on memory when performing large refreshes (greater than 1M assets)
const batchSize = 5000;
for (let i = 0; i < assetPaths.length; i += batchSize) {
const batch = assetPaths.slice(i, i + batchSize);
await this.jobRepository.queueAll(
batch.map((assetPath) => ({
name: JobName.LIBRARY_SCAN_ASSET,
data: {
id: libraryId,
assetPath: assetPath,
ownerId,
force,
},
})),
);
}
this.logger.debug('Asset refresh queue completed');
await this.jobRepository.queueAll(
assetPaths.map((assetPath) => ({
name: JobName.LIBRARY_SCAN_ASSET,
data: {
id: libraryId,
assetPath,
ownerId,
force,
},
})),
);
}
private async validateImportPath(importPath: string): Promise<ValidateLibraryImportPathResponseDto> {
@ -348,27 +336,32 @@ export class LibraryService {
}
async handleDeleteLibrary(job: IEntityJob): Promise<JobStatus> {
const library = await this.repository.get(job.id, true);
if (!library) {
return JobStatus.FAILED;
}
const libraryId = job.id;
// TODO use pagination
const assetIds = await this.repository.getAssetIds(job.id, true);
this.logger.debug(`Will delete ${assetIds.length} asset(s) in library ${job.id}`);
await this.jobRepository.queueAll(
assetIds.map((assetId) => ({
name: JobName.ASSET_DELETION,
data: {
id: assetId,
deleteOnDisk: false,
},
})),
const assetPagination = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) =>
this.assetRepository.getAll(pagination, { libraryId: libraryId, withDeleted: true }),
);
if (assetIds.length === 0) {
this.logger.log(`Deleting library ${job.id}`);
await this.repository.delete(job.id);
let assetsFound = false;
this.logger.debug(`Will delete all assets in library ${libraryId}`);
for await (const assets of assetPagination) {
assetsFound = true;
this.logger.debug(`Queueing deletion of ${assets.length} asset(s) in library ${libraryId}`);
await this.jobRepository.queueAll(
assets.map((asset) => ({
name: JobName.ASSET_DELETION,
data: {
id: asset.id,
deleteOnDisk: false,
},
})),
);
}
if (!assetsFound) {
this.logger.log(`Deleting library ${libraryId}`);
await this.repository.delete(libraryId);
}
return JobStatus.SUCCESS;
}
@ -453,6 +446,7 @@ export class LibraryService {
sidecarPath = `${assetPath}.xmp`;
}
// TODO: device asset id is deprecated, remove it
const deviceAssetId = `${basename(assetPath)}`.replaceAll(/\s+/g, '');
let assetId;
@ -494,7 +488,7 @@ export class LibraryService {
return JobStatus.SKIPPED;
}
this.logger.debug(`Queuing metadata extraction for: ${assetPath}`);
this.logger.debug(`Queueing metadata extraction for: ${assetPath}`);
await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id: assetId, source: 'upload' } });
@ -519,17 +513,15 @@ export class LibraryService {
}
async queueRemoveOffline(id: string) {
this.logger.verbose(`Removing offline files from library: ${id}`);
this.logger.verbose(`Queueing offline file removal from library ${id}`);
await this.jobRepository.queue({ name: JobName.LIBRARY_REMOVE_OFFLINE, data: { id } });
}
async handleQueueAllScan(job: IBaseJob): Promise<JobStatus> {
this.logger.debug(`Refreshing all external libraries: force=${job.force}`);
// Queue cleanup
await this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_CLEANUP, data: {} });
// Queue all library refresh
const libraries = await this.repository.getAll(true);
await this.jobRepository.queueAll(
libraries.map((library) => ({
@ -544,22 +536,71 @@ export class LibraryService {
return JobStatus.SUCCESS;
}
async handleOfflineRemoval(job: IEntityJob): Promise<JobStatus> {
const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) =>
async handleOfflineCheck(job: ILibraryOfflineJob): Promise<JobStatus> {
const asset = await this.assetRepository.getById(job.id);
if (!asset) {
// Asset is no longer in the database, skip
return JobStatus.SKIPPED;
}
if (asset.isOffline) {
this.logger.verbose(`Asset is already offline: ${asset.originalPath}`);
return JobStatus.SUCCESS;
}
const isInPath = job.importPaths.find((path) => asset.originalPath.startsWith(path));
if (!isInPath) {
this.logger.debug(`Asset is no longer in an import path, marking offline: ${asset.originalPath}`);
await this.assetRepository.update({ id: asset.id, isOffline: true });
return JobStatus.SUCCESS;
}
const fileExists = await this.storageRepository.checkFileExists(asset.originalPath, R_OK);
if (!fileExists) {
this.logger.debug(
`Asset is no longer found on disk or is covered by exclusion pattern, marking offline: ${asset.originalPath}`,
);
await this.assetRepository.update({ id: asset.id, isOffline: true });
return JobStatus.SUCCESS;
}
this.logger.verbose(
`Asset is found on disk, not covered by an exclusion pattern, and is in an import path, keeping online: ${asset.originalPath}`,
);
return JobStatus.SUCCESS;
}
async handleRemoveOffline(job: IEntityJob): Promise<JobStatus> {
this.logger.debug(`Removing offline assets for library ${job.id}`);
const assetPagination = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) =>
this.assetRepository.getWith(pagination, WithProperty.IS_OFFLINE, job.id),
);
let offlineAssets = 0;
for await (const assets of assetPagination) {
this.logger.debug(`Removing ${assets.length} offline assets`);
await this.jobRepository.queueAll(
assets.map((asset) => ({
name: JobName.ASSET_DELETION,
data: {
id: asset.id,
deleteOnDisk: false,
},
})),
);
offlineAssets += assets.length;
if (assets.length > 0) {
this.logger.debug(`Discovered ${offlineAssets} offline assets in library ${job.id}`);
await this.jobRepository.queueAll(
assets.map((asset) => ({
name: JobName.ASSET_DELETION,
data: {
id: asset.id,
deleteOnDisk: false,
},
})),
);
this.logger.verbose(`Queued deletion of ${assets.length} offline assets in library ${job.id}`);
}
}
if (offlineAssets) {
this.logger.debug(`Finished queueing deletion of ${offlineAssets} offline assets for library ${job.id}`);
} else {
this.logger.debug(`Found no offline assets to delete from library ${job.id}`);
}
return JobStatus.SUCCESS;
@ -568,73 +609,67 @@ export class LibraryService {
async handleQueueAssetRefresh(job: ILibraryRefreshJob): Promise<JobStatus> {
const library = await this.repository.get(job.id);
if (!library) {
this.logger.warn('Library not found');
return JobStatus.FAILED;
return JobStatus.SKIPPED;
}
this.logger.log(`Refreshing library: ${job.id}`);
this.logger.log(`Refreshing library ${library.id}`);
const crawledAssetPaths = await this.getPathTrie(library);
this.logger.debug(`Found ${crawledAssetPaths.size} asset(s) when crawling import paths ${library.importPaths}`);
const validImportPaths: string[] = [];
const assetIdsToMarkOffline = [];
const assetIdsToMarkOnline = [];
const pagination = usePagination(LIBRARY_SCAN_BATCH_SIZE, (pagination) =>
this.assetRepository.getExternalLibraryAssetPaths(pagination, library.id),
for (const importPath of library.importPaths) {
const validation = await this.validateImportPath(importPath);
if (validation.isValid) {
validImportPaths.push(path.normalize(importPath));
} else {
this.logger.warn(`Skipping invalid import path: ${importPath}. Reason: ${validation.message}`);
}
}
if (validImportPaths.length === 0) {
this.logger.warn(`No valid import paths found for library ${library.id}`);
}
const assetsOnDisk = this.storageRepository.walk({
pathsToCrawl: validImportPaths,
includeHidden: false,
exclusionPatterns: library.exclusionPatterns,
take: JOBS_LIBRARY_PAGINATION_SIZE,
});
let crawledAssets = 0;
for await (const assetBatch of assetsOnDisk) {
crawledAssets += assetBatch.length;
this.logger.debug(`Discovered ${crawledAssets} asset(s) on disk for library ${library.id}...`);
await this.scanAssets(job.id, assetBatch, library.ownerId, job.refreshAllFiles ?? false);
this.logger.verbose(`Queued scan of ${assetBatch.length} crawled asset(s) in library ${library.id}...`);
}
if (crawledAssets) {
this.logger.debug(`Finished queueing scan of ${crawledAssets} assets on disk for library ${library.id}`);
} else {
this.logger.debug(`No non-excluded assets found in any import path for library ${library.id}`);
}
const onlineAssets = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) =>
this.assetRepository.getWith(pagination, WithProperty.IS_ONLINE, job.id),
);
this.logger.verbose(`Crawled asset paths paginated`);
const shouldScanAll = job.refreshAllFiles || job.refreshModifiedFiles;
for await (const page of pagination) {
for (const asset of page) {
const isOffline = !crawledAssetPaths.has(asset.originalPath);
if (isOffline && !asset.isOffline) {
assetIdsToMarkOffline.push(asset.id);
this.logger.verbose(`Added to mark-offline list: ${asset.originalPath}`);
}
if (!isOffline && asset.isOffline) {
assetIdsToMarkOnline.push(asset.id);
this.logger.verbose(`Added to mark-online list: ${asset.originalPath}`);
}
if (!shouldScanAll) {
crawledAssetPaths.delete(asset.originalPath);
}
}
let onlineAssetCount = 0;
for await (const assets of onlineAssets) {
onlineAssetCount += assets.length;
this.logger.debug(`Discovered ${onlineAssetCount} asset(s) in library ${library.id}...`);
await this.jobRepository.queueAll(
assets.map((asset) => ({
name: JobName.LIBRARY_CHECK_OFFLINE,
data: { id: asset.id, importPaths: validImportPaths, exclusionPatterns: library.exclusionPatterns },
})),
);
this.logger.debug(`Queued online check of ${assets.length} asset(s) in library ${library.id}...`);
}
this.logger.verbose(`Crawled assets have been checked for online/offline status`);
if (assetIdsToMarkOffline.length > 0) {
this.logger.debug(`Found ${assetIdsToMarkOffline.length} offline asset(s) previously marked as online`);
await this.assetRepository.updateAll(assetIdsToMarkOffline, { isOffline: true });
}
if (assetIdsToMarkOnline.length > 0) {
this.logger.debug(`Found ${assetIdsToMarkOnline.length} online asset(s) previously marked as offline`);
await this.assetRepository.updateAll(assetIdsToMarkOnline, { isOffline: false });
}
if (crawledAssetPaths.size > 0) {
if (!shouldScanAll) {
this.logger.debug(`Will import ${crawledAssetPaths.size} new asset(s)`);
}
let batch = [];
for (const assetPath of crawledAssetPaths) {
batch.push(assetPath);
if (batch.length >= LIBRARY_SCAN_BATCH_SIZE) {
await this.scanAssets(job.id, batch, library.ownerId, job.refreshAllFiles ?? false);
batch = [];
}
}
if (batch.length > 0) {
await this.scanAssets(job.id, batch, library.ownerId, job.refreshAllFiles ?? false);
}
if (onlineAssetCount) {
this.logger.log(`Finished queueing online check of ${onlineAssetCount} assets for library ${library.id}`);
}
await this.repository.update({ id: job.id, refreshedAt: new Date() });
@ -642,34 +677,6 @@ export class LibraryService {
return JobStatus.SUCCESS;
}
private async getPathTrie(library: LibraryEntity): Promise<Trie<string>> {
const pathValidation = await Promise.all(
library.importPaths.map(async (importPath) => await this.validateImportPath(importPath)),
);
const validImportPaths = pathValidation
.map((validation) => {
if (!validation.isValid) {
this.logger.error(`Skipping invalid import path: ${validation.importPath}. Reason: ${validation.message}`);
}
return validation;
})
.filter((validation) => validation.isValid)
.map((validation) => validation.importPath);
const generator = this.storageRepository.walk({
pathsToCrawl: validImportPaths,
exclusionPatterns: library.exclusionPatterns,
});
const trie = new Trie<string>();
for await (const filePath of generator) {
trie.add(filePath);
}
return trie;
}
private async findOrFail(id: string) {
const library = await this.repository.get(id);
if (!library) {

View File

@ -85,7 +85,8 @@ export class MicroservicesService {
[JobName.LIBRARY_SCAN_ASSET]: (data) => this.libraryService.handleAssetRefresh(data),
[JobName.LIBRARY_SCAN]: (data) => this.libraryService.handleQueueAssetRefresh(data),
[JobName.LIBRARY_DELETE]: (data) => this.libraryService.handleDeleteLibrary(data),
[JobName.LIBRARY_REMOVE_OFFLINE]: (data) => this.libraryService.handleOfflineRemoval(data),
[JobName.LIBRARY_CHECK_OFFLINE]: (data) => this.libraryService.handleOfflineCheck(data),
[JobName.LIBRARY_REMOVE_OFFLINE]: (data) => this.libraryService.handleRemoveOffline(data),
[JobName.LIBRARY_QUEUE_SCAN_ALL]: (data) => this.libraryService.handleQueueAllScan(data),
[JobName.LIBRARY_QUEUE_CLEANUP]: () => this.libraryService.handleQueueCleanup(),
[JobName.SEND_EMAIL]: (data) => this.notificationService.handleSendEmail(data),

View File

@ -9,7 +9,6 @@ export const newLibraryRepositoryMock = (): Mocked<ILibraryRepository> => {
softDelete: vitest.fn(),
update: vitest.fn(),
getStatistics: vitest.fn(),
getAssetIds: vitest.fn(),
getAllDeleted: vitest.fn(),
getAll: vitest.fn(),
};

View File

@ -329,17 +329,21 @@
<LoadingSpinner size="40" />
{:else}{owner[index].name}{/if}
</td>
{#if totalCount[index] == undefined}
<td colspan="2" class="flex w-1/3 items-center justify-center text-ellipsis px-4 text-sm">
<td class=" text-ellipsis px-4 text-sm">
{#if totalCount[index] == undefined}
<LoadingSpinner size="40" />
</td>
{:else}
<td class=" text-ellipsis px-4 text-sm">
{:else}
{totalCount[index].toLocaleString($locale)}
</td>
<td class=" text-ellipsis px-4 text-sm">{diskUsage[index]} {diskUsageUnit[index]}</td>
{/if}
{/if}
</td>
<td class=" text-ellipsis px-4 text-sm">
{#if diskUsage[index] == undefined}
<LoadingSpinner size="40" />
{:else}
{diskUsage[index]}
{diskUsageUnit[index]}
{/if}
</td>
<td class=" text-ellipsis px-4 text-sm">
<ButtonContextMenu