From 1c2d83e2c7edab099cb9e6be9f6423240bc6d2eb Mon Sep 17 00:00:00 2001 From: Jason Rasmussen Date: Fri, 26 May 2023 15:43:24 -0400 Subject: [PATCH] refactor(server): job handlers (#2572) * refactor(server): job handlers * chore: remove comment * chore: add comments for --- .../immich/src/api-v1/asset/asset.core.ts | 7 +- .../src/api-v1/asset/asset.service.spec.ts | 5 +- .../microservices/src/processor.service.ts | 29 +- .../metadata-extraction.processor.ts | 468 +++++++++--------- server/libs/domain/src/asset/asset.core.ts | 20 - .../domain/src/asset/asset.service.spec.ts | 49 +- server/libs/domain/src/asset/asset.service.ts | 27 +- server/libs/domain/src/asset/index.ts | 1 - .../facial-recognition.service.spec.ts | 37 +- .../facial-recognition.services.ts | 190 ++++--- server/libs/domain/src/job/job.constants.ts | 10 +- server/libs/domain/src/job/job.interface.ts | 17 +- server/libs/domain/src/job/job.repository.ts | 31 +- .../libs/domain/src/job/job.service.spec.ts | 10 +- server/libs/domain/src/job/job.service.ts | 63 ++- .../domain/src/media/media.service.spec.ts | 124 ++--- server/libs/domain/src/media/media.service.ts | 211 ++++---- .../src/metadata/metadata.service.spec.ts | 64 +-- .../domain/src/metadata/metadata.service.ts | 85 ++-- .../domain/src/person/person.service.spec.ts | 9 - .../libs/domain/src/person/person.service.ts | 4 +- .../domain/src/search/search.service.spec.ts | 30 -- .../libs/domain/src/search/search.service.ts | 98 ++-- .../src/smart-info/smart-info.service.spec.ts | 63 +-- .../src/smart-info/smart-info.service.ts | 107 ++-- .../storage-template.service.spec.ts | 7 - .../storage-template.service.ts | 34 +- .../domain/src/storage/storage.service.ts | 3 + .../system-config/system-config.service.ts | 1 + .../libs/domain/src/user/user.service.spec.ts | 25 +- server/libs/domain/src/user/user.service.ts | 55 +- .../domain/test/storage.repository.mock.ts | 2 +- .../infra/src/repositories/job.repository.ts | 3 - 33 files changed, 807 insertions(+), 1082 deletions(-) delete mode 100644 server/libs/domain/src/asset/asset.core.ts diff --git a/server/apps/immich/src/api-v1/asset/asset.core.ts b/server/apps/immich/src/api-v1/asset/asset.core.ts index 5d1a1b50b1..9732d7e2ac 100644 --- a/server/apps/immich/src/api-v1/asset/asset.core.ts +++ b/server/apps/immich/src/api-v1/asset/asset.core.ts @@ -1,5 +1,5 @@ import { AuthUserDto, IJobRepository, JobName } from '@app/domain'; -import { AssetEntity, UserEntity } from '@app/infra/entities'; +import { AssetEntity, AssetType, UserEntity } from '@app/infra/entities'; import { IAssetRepository } from './asset-repository'; import { CreateAssetDto, UploadFile } from './dto/create-asset.dto'; import { parse } from 'node:path'; @@ -43,7 +43,10 @@ export class AssetCore { sidecarPath: sidecarFile?.originalPath || null, }); - await this.jobRepository.queue({ name: JobName.ASSET_UPLOADED, data: { asset } }); + await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: asset.id } }); + if (asset.type === AssetType.VIDEO) { + await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id: asset.id } }); + } return asset; } diff --git a/server/apps/immich/src/api-v1/asset/asset.service.spec.ts b/server/apps/immich/src/api-v1/asset/asset.service.spec.ts index 0a0fa9052f..39d107dcc3 100644 --- a/server/apps/immich/src/api-v1/asset/asset.service.spec.ts +++ b/server/apps/immich/src/api-v1/asset/asset.service.spec.ts @@ -328,8 +328,9 @@ describe('AssetService', () => { }); 
expect(jobMock.queue.mock.calls).toEqual([ - [{ name: JobName.ASSET_UPLOADED, data: { asset: assetEntityStub.livePhotoMotionAsset } }], - [{ name: JobName.ASSET_UPLOADED, data: { asset: assetEntityStub.livePhotoStillAsset } }], + [{ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: assetEntityStub.livePhotoMotionAsset.id } }], + [{ name: JobName.VIDEO_CONVERSION, data: { id: assetEntityStub.livePhotoMotionAsset.id } }], + [{ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: assetEntityStub.livePhotoStillAsset.id } }], ]); }); });
diff --git a/server/apps/microservices/src/processor.service.ts b/server/apps/microservices/src/processor.service.ts index bec0d0c38d..27da2bed0d 100644 --- a/server/apps/microservices/src/processor.service.ts +++ b/server/apps/microservices/src/processor.service.ts
@@ -1,7 +1,9 @@ import { - AssetService, FacialRecognitionService, + IDeleteFilesJob, + JobItem, JobName, + JobService, JOBS_TO_QUEUE, MediaService, MetadataService,
@@ -16,12 +18,12 @@ import { UserService, } from '@app/domain'; import { getQueueToken } from '@nestjs/bull'; -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { ModuleRef } from '@nestjs/core'; import { Queue } from 'bull'; import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor'; -type JobHandler<T = any> = (data: T) => void | Promise<void>; +type JobHandler<T = any> = (data: T) => boolean | Promise<boolean>; @Injectable() export class ProcessorService {
@@ -30,8 +32,8 @@ export class ProcessorService { // TODO refactor to domain private metadataProcessor: MetadataExtractionProcessor, - private assetService: AssetService, private facialRecognitionService: FacialRecognitionService, + private jobService: JobService, private mediaService: MediaService, private metadataService: MetadataService, private personService: PersonService,
@@ -43,9 +45,10 @@ export class ProcessorService { private userService: UserService, ) {} + private logger = new Logger(ProcessorService.name); + private handlers: Record<JobName, JobHandler> = { - [JobName.ASSET_UPLOADED]: (data) => this.assetService.handleAssetUpload(data), - [JobName.DELETE_FILES]: (data) => this.storageService.handleDeleteFiles(data), + [JobName.DELETE_FILES]: (data: IDeleteFilesJob) => this.storageService.handleDeleteFiles(data), [JobName.USER_DELETE_CHECK]: () => this.userService.handleUserDeleteCheck(), [JobName.USER_DELETION]: (data) => this.userService.handleUserDelete(data), [JobName.QUEUE_OBJECT_TAGGING]: (data) => this.smartInfoService.handleQueueObjectTagging(data),
@@ -71,15 +74,14 @@ export class ProcessorService { [JobName.QUEUE_VIDEO_CONVERSION]: (data) => this.mediaService.handleQueueVideoConversion(data), [JobName.VIDEO_CONVERSION]: (data) => this.mediaService.handleVideoConversion(data), [JobName.QUEUE_METADATA_EXTRACTION]: (data) => this.metadataProcessor.handleQueueMetadataExtraction(data), - [JobName.EXIF_EXTRACTION]: (data) => this.metadataProcessor.extractExifInfo(data), - [JobName.EXTRACT_VIDEO_METADATA]: (data) => this.metadataProcessor.extractVideoMetadata(data), + [JobName.METADATA_EXTRACTION]: (data) => this.metadataProcessor.handleMetadataExtraction(data), [JobName.QUEUE_RECOGNIZE_FACES]: (data) => this.facialRecognitionService.handleQueueRecognizeFaces(data), [JobName.RECOGNIZE_FACES]: (data) => this.facialRecognitionService.handleRecognizeFaces(data), [JobName.GENERATE_FACE_THUMBNAIL]: (data) => this.facialRecognitionService.handleGenerateFaceThumbnail(data), [JobName.PERSON_CLEANUP]: () =>
this.personService.handlePersonCleanup(), [JobName.QUEUE_SIDECAR]: (data) => this.metadataService.handleQueueSidecar(data), [JobName.SIDECAR_DISCOVERY]: (data) => this.metadataService.handleSidecarDiscovery(data), - [JobName.SIDECAR_SYNC]: (data) => this.metadataService.handleSidecarSync(data), + [JobName.SIDECAR_SYNC]: () => this.metadataService.handleSidecarSync(), }; async init() { @@ -98,7 +100,14 @@ export class ProcessorService { await queue.isReady(); queue.process(jobName, concurrency, async (job): Promise => { - await handler(job.data); + try { + const success = await handler(job.data); + if (success) { + await this.jobService.onDone({ name: jobName, data: job.data } as JobItem); + } + } catch (error: Error | any) { + this.logger.error(`Unable to run job handler: ${error}`, error?.stack, job.data); + } }); } } diff --git a/server/apps/microservices/src/processors/metadata-extraction.processor.ts b/server/apps/microservices/src/processors/metadata-extraction.processor.ts index 491aa92772..40f8a03dce 100644 --- a/server/apps/microservices/src/processors/metadata-extraction.processor.ts +++ b/server/apps/microservices/src/processors/metadata-extraction.processor.ts @@ -1,8 +1,7 @@ import { - AssetCore, - IAssetJob, IAssetRepository, IBaseJob, + IEntityJob, IGeocodingRepository, IJobRepository, JobName, @@ -32,7 +31,6 @@ interface ImmichTags extends Tags { export class MetadataExtractionProcessor { private logger = new Logger(MetadataExtractionProcessor.name); - private assetCore: AssetCore; private reverseGeocodingEnabled: boolean; constructor( @@ -43,7 +41,6 @@ export class MetadataExtractionProcessor { configService: ConfigService, ) { - this.assetCore = new AssetCore(assetRepository, jobRepository); this.reverseGeocodingEnabled = !configService.get('DISABLE_REVERSE_GEOCODING'); } @@ -70,271 +67,262 @@ export class MetadataExtractionProcessor { } async handleQueueMetadataExtraction(job: IBaseJob) { - try { - const { force } = job; - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { - return force - ? this.assetRepository.getAll(pagination) - : this.assetRepository.getWithout(pagination, WithoutProperty.EXIF); - }); + const { force } = job; + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getAll(pagination) + : this.assetRepository.getWithout(pagination, WithoutProperty.EXIF); + }); - for await (const assets of assetPagination) { - for (const asset of assets) { - const name = asset.type === AssetType.VIDEO ? 
JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION; - await this.jobRepository.queue({ name, data: { asset } }); - } + for await (const assets of assetPagination) { + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id } }); } - } catch (error: any) { - this.logger.error(`Unable to queue metadata extraction`, error?.stack); + } + + return true; + } + + async handleMetadataExtraction({ id }: IEntityJob) { + const [asset] = await this.assetRepository.getByIds([id]); + if (!asset || !asset.isVisible) { + return false; + } + + if (asset.type === AssetType.VIDEO) { + return this.handleVideoMetadataExtraction(asset); + } else { + return this.handlePhotoMetadataExtraction(asset); } } - async extractExifInfo(job: IAssetJob) { - let asset = job.asset; + private async handlePhotoMetadataExtraction(asset: AssetEntity) { + const mediaExifData = await exiftool.read(asset.originalPath).catch((error: any) => { + this.logger.warn( + `The exifData parsing failed due to ${error} for asset ${asset.id} at ${asset.originalPath}`, + error?.stack, + ); + return null; + }); - try { - const mediaExifData = await exiftool.read(asset.originalPath).catch((error: any) => { - this.logger.warn( - `The exifData parsing failed due to ${error} for asset ${asset.id} at ${asset.originalPath}`, - error?.stack, - ); - return null; - }); - const sidecarExifData = asset.sidecarPath - ? await exiftool.read(asset.sidecarPath).catch((error: any) => { - this.logger.warn( - `The exifData parsing failed due to ${error} for asset ${asset.id} at ${asset.originalPath}`, - error?.stack, - ); - return null; - }) - : {}; - - const exifToDate = (exifDate: string | ExifDateTime | undefined) => { - if (!exifDate) return null; - - if (typeof exifDate === 'string') { - return new Date(exifDate); - } - - return exifDate.toDate(); - }; - - const exifTimeZone = (exifDate: string | ExifDateTime | undefined) => { - if (!exifDate) return null; - - if (typeof exifDate === 'string') { + const sidecarExifData = asset.sidecarPath + ? await exiftool.read(asset.sidecarPath).catch((error: any) => { + this.logger.warn( + `The exifData parsing failed due to ${error} for asset ${asset.id} at ${asset.originalPath}`, + error?.stack, + ); return null; - } + }) + : {}; - return exifDate.zone ?? null; - }; + const exifToDate = (exifDate: string | ExifDateTime | undefined) => { + if (!exifDate) return null; - const getExifProperty = (...properties: T[]): any | null => { - for (const property of properties) { - const value = sidecarExifData?.[property] ?? mediaExifData?.[property]; - if (value !== null && value !== undefined) { - return value; - } - } + if (typeof exifDate === 'string') { + return new Date(exifDate); + } + return exifDate.toDate(); + }; + + const exifTimeZone = (exifDate: string | ExifDateTime | undefined) => { + if (!exifDate) return null; + + if (typeof exifDate === 'string') { return null; - }; + } - const timeZone = exifTimeZone(getExifProperty('DateTimeOriginal', 'CreateDate') ?? asset.fileCreatedAt); - const fileCreatedAt = exifToDate(getExifProperty('DateTimeOriginal', 'CreateDate') ?? asset.fileCreatedAt); - const fileModifiedAt = exifToDate(getExifProperty('ModifyDate') ?? asset.fileModifiedAt); - const fileStats = fs.statSync(asset.originalPath); - const fileSizeInBytes = fileStats.size; + return exifDate.zone ?? 
null; + }; - const newExif = new ExifEntity(); - newExif.assetId = asset.id; - newExif.fileSizeInByte = fileSizeInBytes; - newExif.make = getExifProperty('Make'); - newExif.model = getExifProperty('Model'); - newExif.exifImageHeight = getExifProperty('ExifImageHeight', 'ImageHeight'); - newExif.exifImageWidth = getExifProperty('ExifImageWidth', 'ImageWidth'); - newExif.exposureTime = getExifProperty('ExposureTime'); - newExif.orientation = getExifProperty('Orientation')?.toString(); - newExif.dateTimeOriginal = fileCreatedAt; - newExif.modifyDate = fileModifiedAt; - newExif.timeZone = timeZone; - newExif.lensModel = getExifProperty('LensModel'); - newExif.fNumber = getExifProperty('FNumber'); - const focalLength = getExifProperty('FocalLength'); - newExif.focalLength = focalLength ? parseFloat(focalLength) : null; - // This is unusual - exifData.ISO should return a number, but experienced that sidecar XMP - // files MAY return an array of numbers instead. - const iso = getExifProperty('ISO'); - newExif.iso = Array.isArray(iso) ? iso[0] : iso || null; - newExif.latitude = getExifProperty('GPSLatitude'); - newExif.longitude = getExifProperty('GPSLongitude'); - newExif.livePhotoCID = getExifProperty('MediaGroupUUID'); - - if (newExif.livePhotoCID && !asset.livePhotoVideoId) { - const motionAsset = await this.assetCore.findLivePhotoMatch({ - livePhotoCID: newExif.livePhotoCID, - otherAssetId: asset.id, - ownerId: asset.ownerId, - type: AssetType.VIDEO, - }); - if (motionAsset) { - await this.assetCore.save({ id: asset.id, livePhotoVideoId: motionAsset.id }); - await this.assetCore.save({ id: motionAsset.id, isVisible: false }); + const getExifProperty = (...properties: T[]): any | null => { + for (const property of properties) { + const value = sidecarExifData?.[property] ?? mediaExifData?.[property]; + if (value !== null && value !== undefined) { + return value; } } - await this.applyReverseGeocoding(asset, newExif); + return null; + }; - /** - * IF the EXIF doesn't contain the width and height of the image, - * We will use Sharpjs to get the information. - */ - if (!newExif.exifImageHeight || !newExif.exifImageWidth || !newExif.orientation) { - const metadata = await sharp(asset.originalPath).metadata(); + const timeZone = exifTimeZone(getExifProperty('DateTimeOriginal', 'CreateDate') ?? asset.fileCreatedAt); + const fileCreatedAt = exifToDate(getExifProperty('DateTimeOriginal', 'CreateDate') ?? asset.fileCreatedAt); + const fileModifiedAt = exifToDate(getExifProperty('ModifyDate') ?? asset.fileModifiedAt); + const fileStats = fs.statSync(asset.originalPath); + const fileSizeInBytes = fileStats.size; - if (newExif.exifImageHeight === null) { - newExif.exifImageHeight = metadata.height || null; - } + const newExif = new ExifEntity(); + newExif.assetId = asset.id; + newExif.fileSizeInByte = fileSizeInBytes; + newExif.make = getExifProperty('Make'); + newExif.model = getExifProperty('Model'); + newExif.exifImageHeight = getExifProperty('ExifImageHeight', 'ImageHeight'); + newExif.exifImageWidth = getExifProperty('ExifImageWidth', 'ImageWidth'); + newExif.exposureTime = getExifProperty('ExposureTime'); + newExif.orientation = getExifProperty('Orientation')?.toString(); + newExif.dateTimeOriginal = fileCreatedAt; + newExif.modifyDate = fileModifiedAt; + newExif.timeZone = timeZone; + newExif.lensModel = getExifProperty('LensModel'); + newExif.fNumber = getExifProperty('FNumber'); + const focalLength = getExifProperty('FocalLength'); + newExif.focalLength = focalLength ? 
parseFloat(focalLength) : null; + // This is unusual - exifData.ISO should return a number, but experienced that sidecar XMP + // files MAY return an array of numbers instead. + const iso = getExifProperty('ISO'); + newExif.iso = Array.isArray(iso) ? iso[0] : iso || null; + newExif.latitude = getExifProperty('GPSLatitude'); + newExif.longitude = getExifProperty('GPSLongitude'); + newExif.livePhotoCID = getExifProperty('MediaGroupUUID'); - if (newExif.exifImageWidth === null) { - newExif.exifImageWidth = metadata.width || null; - } - - if (newExif.orientation === null) { - newExif.orientation = metadata.orientation !== undefined ? `${metadata.orientation}` : null; - } + if (newExif.livePhotoCID && !asset.livePhotoVideoId) { + const motionAsset = await this.assetRepository.findLivePhotoMatch({ + livePhotoCID: newExif.livePhotoCID, + otherAssetId: asset.id, + ownerId: asset.ownerId, + type: AssetType.VIDEO, + }); + if (motionAsset) { + await this.assetRepository.save({ id: asset.id, livePhotoVideoId: motionAsset.id }); + await this.assetRepository.save({ id: motionAsset.id, isVisible: false }); } - - await this.exifRepository.upsert(newExif, { conflictPaths: ['assetId'] }); - asset = await this.assetCore.save({ id: asset.id, fileCreatedAt: fileCreatedAt?.toISOString() }); - await this.jobRepository.queue({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { asset } }); - } catch (error: any) { - this.logger.error( - `Error extracting EXIF ${error} for assetId ${asset.id} at ${asset.originalPath}`, - error?.stack, - ); } + + await this.applyReverseGeocoding(asset, newExif); + + /** + * IF the EXIF doesn't contain the width and height of the image, + * We will use Sharpjs to get the information. + */ + if (!newExif.exifImageHeight || !newExif.exifImageWidth || !newExif.orientation) { + const metadata = await sharp(asset.originalPath).metadata(); + + if (newExif.exifImageHeight === null) { + newExif.exifImageHeight = metadata.height || null; + } + + if (newExif.exifImageWidth === null) { + newExif.exifImageWidth = metadata.width || null; + } + + if (newExif.orientation === null) { + newExif.orientation = metadata.orientation !== undefined ? 
`${metadata.orientation}` : null; + } + } + + await this.exifRepository.upsert(newExif, { conflictPaths: ['assetId'] }); + await this.assetRepository.save({ id: asset.id, fileCreatedAt: fileCreatedAt?.toISOString() }); + + return true; } - async extractVideoMetadata(job: IAssetJob) { - let asset = job.asset; + private async handleVideoMetadataExtraction(asset: AssetEntity) { + const data = await ffprobe(asset.originalPath); + const durationString = this.extractDuration(data.format.duration || asset.duration); + let fileCreatedAt = asset.fileCreatedAt; - if (!asset.isVisible) { - return; + const videoTags = data.format.tags; + if (videoTags) { + if (videoTags['com.apple.quicktime.creationdate']) { + fileCreatedAt = String(videoTags['com.apple.quicktime.creationdate']); + } else if (videoTags['creation_time']) { + fileCreatedAt = String(videoTags['creation_time']); + } } - try { - const data = await ffprobe(asset.originalPath); - const durationString = this.extractDuration(data.format.duration || asset.duration); - let fileCreatedAt = asset.fileCreatedAt; - - const videoTags = data.format.tags; - if (videoTags) { - if (videoTags['com.apple.quicktime.creationdate']) { - fileCreatedAt = String(videoTags['com.apple.quicktime.creationdate']); - } else if (videoTags['creation_time']) { - fileCreatedAt = String(videoTags['creation_time']); - } - } - - const exifData = await exiftool.read(asset.sidecarPath || asset.originalPath).catch((error: any) => { - this.logger.warn( - `The exifData parsing failed due to ${error} for asset ${asset.id} at ${asset.originalPath}`, - error?.stack, - ); - return null; - }); - - const newExif = new ExifEntity(); - newExif.assetId = asset.id; - newExif.fileSizeInByte = data.format.size || null; - newExif.dateTimeOriginal = fileCreatedAt ? 
new Date(fileCreatedAt) : null; - newExif.modifyDate = null; - newExif.timeZone = null; - newExif.latitude = null; - newExif.longitude = null; - newExif.city = null; - newExif.state = null; - newExif.country = null; - newExif.fps = null; - newExif.livePhotoCID = exifData?.ContentIdentifier || null; - - if (newExif.livePhotoCID) { - const photoAsset = await this.assetCore.findLivePhotoMatch({ - livePhotoCID: newExif.livePhotoCID, - ownerId: asset.ownerId, - otherAssetId: asset.id, - type: AssetType.IMAGE, - }); - if (photoAsset) { - await this.assetCore.save({ id: photoAsset.id, livePhotoVideoId: asset.id }); - await this.assetCore.save({ id: asset.id, isVisible: false }); - } - } - - if (videoTags && videoTags['location']) { - const location = videoTags['location'] as string; - const locationRegex = /([+-][0-9]+\.[0-9]+)([+-][0-9]+\.[0-9]+)\/$/; - const match = location.match(locationRegex); - - if (match?.length === 3) { - newExif.latitude = parseFloat(match[1]); - newExif.longitude = parseFloat(match[2]); - } - } else if (videoTags && videoTags['com.apple.quicktime.location.ISO6709']) { - const location = videoTags['com.apple.quicktime.location.ISO6709'] as string; - const locationRegex = /([+-][0-9]+\.[0-9]+)([+-][0-9]+\.[0-9]+)([+-][0-9]+\.[0-9]+)\/$/; - const match = location.match(locationRegex); - - if (match?.length === 4) { - newExif.latitude = parseFloat(match[1]); - newExif.longitude = parseFloat(match[2]); - } - } - - if (newExif.longitude && newExif.latitude) { - try { - newExif.timeZone = tz_lookup(newExif.latitude, newExif.longitude); - } catch (error: any) { - this.logger.warn(`Error while calculating timezone from gps coordinates: ${error}`, error?.stack); - } - } - - await this.applyReverseGeocoding(asset, newExif); - - for (const stream of data.streams) { - if (stream.codec_type === 'video') { - newExif.exifImageWidth = stream.width || null; - newExif.exifImageHeight = stream.height || null; - - if (typeof stream.rotation === 'string') { - newExif.orientation = stream.rotation; - } else if (typeof stream.rotation === 'number') { - newExif.orientation = `${stream.rotation}`; - } else { - newExif.orientation = null; - } - - if (stream.r_frame_rate) { - const fpsParts = stream.r_frame_rate.split('/'); - - if (fpsParts.length === 2) { - newExif.fps = Math.round(parseInt(fpsParts[0]) / parseInt(fpsParts[1])); - } - } - } - } - - await this.exifRepository.upsert(newExif, { conflictPaths: ['assetId'] }); - asset = await this.assetCore.save({ id: asset.id, duration: durationString, fileCreatedAt }); - await this.jobRepository.queue({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { asset } }); - } catch (error: any) { - this.logger.error( - `Error in video metadata extraction due to ${error} for asset ${asset.id} at ${asset.originalPath}`, + const exifData = await exiftool.read(asset.sidecarPath || asset.originalPath).catch((error: any) => { + this.logger.warn( + `The exifData parsing failed due to ${error} for asset ${asset.id} at ${asset.originalPath}`, error?.stack, ); + return null; + }); + + const newExif = new ExifEntity(); + newExif.assetId = asset.id; + newExif.fileSizeInByte = data.format.size || null; + newExif.dateTimeOriginal = fileCreatedAt ? 
new Date(fileCreatedAt) : null; + newExif.modifyDate = null; + newExif.timeZone = null; + newExif.latitude = null; + newExif.longitude = null; + newExif.city = null; + newExif.state = null; + newExif.country = null; + newExif.fps = null; + newExif.livePhotoCID = exifData?.ContentIdentifier || null; + + if (newExif.livePhotoCID) { + const photoAsset = await this.assetRepository.findLivePhotoMatch({ + livePhotoCID: newExif.livePhotoCID, + ownerId: asset.ownerId, + otherAssetId: asset.id, + type: AssetType.IMAGE, + }); + if (photoAsset) { + await this.assetRepository.save({ id: photoAsset.id, livePhotoVideoId: asset.id }); + await this.assetRepository.save({ id: asset.id, isVisible: false }); + } } + + if (videoTags && videoTags['location']) { + const location = videoTags['location'] as string; + const locationRegex = /([+-][0-9]+\.[0-9]+)([+-][0-9]+\.[0-9]+)\/$/; + const match = location.match(locationRegex); + + if (match?.length === 3) { + newExif.latitude = parseFloat(match[1]); + newExif.longitude = parseFloat(match[2]); + } + } else if (videoTags && videoTags['com.apple.quicktime.location.ISO6709']) { + const location = videoTags['com.apple.quicktime.location.ISO6709'] as string; + const locationRegex = /([+-][0-9]+\.[0-9]+)([+-][0-9]+\.[0-9]+)([+-][0-9]+\.[0-9]+)\/$/; + const match = location.match(locationRegex); + + if (match?.length === 4) { + newExif.latitude = parseFloat(match[1]); + newExif.longitude = parseFloat(match[2]); + } + } + + if (newExif.longitude && newExif.latitude) { + try { + newExif.timeZone = tz_lookup(newExif.latitude, newExif.longitude); + } catch (error: any) { + this.logger.warn(`Error while calculating timezone from gps coordinates: ${error}`, error?.stack); + } + } + + await this.applyReverseGeocoding(asset, newExif); + + for (const stream of data.streams) { + if (stream.codec_type === 'video') { + newExif.exifImageWidth = stream.width || null; + newExif.exifImageHeight = stream.height || null; + + if (typeof stream.rotation === 'string') { + newExif.orientation = stream.rotation; + } else if (typeof stream.rotation === 'number') { + newExif.orientation = `${stream.rotation}`; + } else { + newExif.orientation = null; + } + + if (stream.r_frame_rate) { + const fpsParts = stream.r_frame_rate.split('/'); + + if (fpsParts.length === 2) { + newExif.fps = Math.round(parseInt(fpsParts[0]) / parseInt(fpsParts[1])); + } + } + } + } + + await this.exifRepository.upsert(newExif, { conflictPaths: ['assetId'] }); + await this.assetRepository.save({ id: asset.id, duration: durationString, fileCreatedAt }); + + return true; } private async applyReverseGeocoding(asset: AssetEntity, newExif: ExifEntity) { diff --git a/server/libs/domain/src/asset/asset.core.ts b/server/libs/domain/src/asset/asset.core.ts deleted file mode 100644 index 910371eb2e..0000000000 --- a/server/libs/domain/src/asset/asset.core.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { AssetEntity } from '@app/infra/entities'; -import { IJobRepository, JobName } from '../job'; -import { IAssetRepository, LivePhotoSearchOptions } from './asset.repository'; - -export class AssetCore { - constructor(private assetRepository: IAssetRepository, private jobRepository: IJobRepository) {} - - async save(asset: Partial) { - const _asset = await this.assetRepository.save(asset); - await this.jobRepository.queue({ - name: JobName.SEARCH_INDEX_ASSET, - data: { ids: [_asset.id] }, - }); - return _asset; - } - - findLivePhotoMatch(options: LivePhotoSearchOptions): Promise { - return 
this.assetRepository.findLivePhotoMatch(options); - } -}
diff --git a/server/libs/domain/src/asset/asset.service.spec.ts b/server/libs/domain/src/asset/asset.service.spec.ts index b63eda69ee..832b8d0531 100644 --- a/server/libs/domain/src/asset/asset.service.spec.ts +++ b/server/libs/domain/src/asset/asset.service.spec.ts
@@ -1,12 +1,9 @@ -import { AssetEntity, AssetType } from '@app/infra/entities'; -import { assetEntityStub, authStub, newAssetRepositoryMock, newJobRepositoryMock } from '../../test'; +import { assetEntityStub, authStub, newAssetRepositoryMock } from '../../test'; import { AssetService, IAssetRepository } from '../asset'; -import { IJobRepository, JobName } from '../job'; describe(AssetService.name, () => { let sut: AssetService; let assetMock: jest.Mocked<IAssetRepository>; - let jobMock: jest.Mocked<IJobRepository>; it('should work', () => { expect(sut).toBeDefined();
@@ -14,49 +11,7 @@ describe(AssetService.name, () => { beforeEach(async () => { assetMock = newAssetRepositoryMock(); - jobMock = newJobRepositoryMock(); - sut = new AssetService(assetMock, jobMock); - }); - - describe(`handle asset upload`, () => { - it('should process an uploaded video', async () => { - const data = { asset: { type: AssetType.VIDEO } as AssetEntity }; - - await expect(sut.handleAssetUpload(data)).resolves.toBeUndefined(); - - expect(jobMock.queue).toHaveBeenCalledTimes(3); - expect(jobMock.queue.mock.calls).toEqual([ - [{ name: JobName.GENERATE_JPEG_THUMBNAIL, data }], - [{ name: JobName.VIDEO_CONVERSION, data }], - [{ name: JobName.EXTRACT_VIDEO_METADATA, data }], - ]); - }); - - it('should process an uploaded image', async () => { - const data = { asset: { type: AssetType.IMAGE } as AssetEntity }; - - await sut.handleAssetUpload(data); - - expect(jobMock.queue).toHaveBeenCalledTimes(2); - expect(jobMock.queue.mock.calls).toEqual([ - [{ name: JobName.GENERATE_JPEG_THUMBNAIL, data }], - [{ name: JobName.EXIF_EXTRACTION, data }], - ]); - }); - }); - - describe('save', () => { - it('should save an asset', async () => { - assetMock.save.mockResolvedValue(assetEntityStub.image); - - await sut.save(assetEntityStub.image); - - expect(assetMock.save).toHaveBeenCalledWith(assetEntityStub.image); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.SEARCH_INDEX_ASSET, - data: { ids: [assetEntityStub.image.id] }, - }); - }); + sut = new AssetService(assetMock); }); describe('get map markers', () => {
diff --git a/server/libs/domain/src/asset/asset.service.ts b/server/libs/domain/src/asset/asset.service.ts index 81ae2b95d3..5ba365e784 100644 --- a/server/libs/domain/src/asset/asset.service.ts +++ b/server/libs/domain/src/asset/asset.service.ts
@@ -1,36 +1,11 @@ -import { AssetEntity, AssetType } from '@app/infra/entities'; import { Inject } from '@nestjs/common'; import { AuthUserDto } from '../auth'; -import { IAssetJob, IJobRepository, JobName } from '../job'; -import { AssetCore } from './asset.core'; import { IAssetRepository } from './asset.repository'; import { MapMarkerDto } from './dto/map-marker.dto'; import { MapMarkerResponseDto } from './response-dto'; export class AssetService { - private assetCore: AssetCore; - - constructor( - @Inject(IAssetRepository) private assetRepository: IAssetRepository, - @Inject(IJobRepository) private jobRepository: IJobRepository, - ) { - this.assetCore = new AssetCore(assetRepository, jobRepository); - } - - async handleAssetUpload(data: IAssetJob) { - await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data }); - - if (data.asset.type == AssetType.VIDEO) {
await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data }); - await this.jobRepository.queue({ name: JobName.EXTRACT_VIDEO_METADATA, data }); - } else { - await this.jobRepository.queue({ name: JobName.EXIF_EXTRACTION, data }); - } - } - - save(asset: Partial<AssetEntity>) { - return this.assetCore.save(asset); - } + constructor(@Inject(IAssetRepository) private assetRepository: IAssetRepository) {} getMapMarkers(authUser: AuthUserDto, options: MapMarkerDto): Promise<MapMarkerResponseDto[]> { return this.assetRepository.getMapMarkers(authUser.id, options);
diff --git a/server/libs/domain/src/asset/index.ts b/server/libs/domain/src/asset/index.ts index 3aff64c2ce..aa429787d7 100644 --- a/server/libs/domain/src/asset/index.ts +++ b/server/libs/domain/src/asset/index.ts
@@ -1,4 +1,3 @@ -export * from './asset.core'; export * from './asset.repository'; export * from './asset.service'; export * from './response-dto';
diff --git a/server/libs/domain/src/facial-recognition/facial-recognition.service.spec.ts b/server/libs/domain/src/facial-recognition/facial-recognition.service.spec.ts index fdfef4468e..4966a7fc2f 100644 --- a/server/libs/domain/src/facial-recognition/facial-recognition.service.spec.ts +++ b/server/libs/domain/src/facial-recognition/facial-recognition.service.spec.ts
@@ -141,7 +141,7 @@ describe(FacialRecognitionService.name, () => { expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.FACES); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.RECOGNIZE_FACES, - data: { asset: assetEntityStub.image }, + data: { id: assetEntityStub.image.id }, }); });
@@ -158,25 +158,22 @@ describe(FacialRecognitionService.name, () => { expect(assetMock.getAll).toHaveBeenCalled(); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.RECOGNIZE_FACES, - data: { asset: assetEntityStub.image }, + data: { id: assetEntityStub.image.id }, }); }); - - it('should log an error', async () => { - assetMock.getWithout.mockRejectedValue(new Error('Database unavailable')); - await sut.handleQueueRecognizeFaces({}); - }); }); describe('handleRecognizeFaces', () => { it('should skip when no resize path', async () => { - await sut.handleRecognizeFaces({ asset: assetEntityStub.noResizePath }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.noResizePath]); + await sut.handleRecognizeFaces({ id: assetEntityStub.noResizePath.id }); expect(machineLearningMock.detectFaces).not.toHaveBeenCalled(); }); it('should handle no results', async () => { machineLearningMock.detectFaces.mockResolvedValue([]); - await sut.handleRecognizeFaces({ asset: assetEntityStub.image }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.image]); + await sut.handleRecognizeFaces({ id: assetEntityStub.image.id }); expect(machineLearningMock.detectFaces).toHaveBeenCalledWith({ thumbnailPath: assetEntityStub.image.resizePath, });
@@ -187,26 +184,23 @@ describe(FacialRecognitionService.name, () => { it('should match existing people', async () => { machineLearningMock.detectFaces.mockResolvedValue([face.middle]); searchMock.searchFaces.mockResolvedValue(faceSearch.oneMatch); - - await sut.handleRecognizeFaces({ asset: assetEntityStub.image }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.image]); + await sut.handleRecognizeFaces({ id: assetEntityStub.image.id }); expect(faceMock.create).toHaveBeenCalledWith({ personId: 'person-1', assetId: 'asset-id', embedding: [1, 2, 3, 4], }); - expect(jobMock.queue.mock.calls).toEqual([ - [{ name: JobName.SEARCH_INDEX_FACE, data: { personId: 'person-1',
assetId: 'asset-id' } }], - [{ name: JobName.SEARCH_INDEX_ASSET, data: { ids: ['asset-id'] } }], - ]); }); it('should create a new person', async () => { machineLearningMock.detectFaces.mockResolvedValue([face.middle]); searchMock.searchFaces.mockResolvedValue(faceSearch.oneRemoteMatch); personMock.create.mockResolvedValue(personStub.noName); + assetMock.getByIds.mockResolvedValue([assetEntityStub.image]); - await sut.handleRecognizeFaces({ asset: assetEntityStub.image }); + await sut.handleRecognizeFaces({ id: assetEntityStub.image.id }); expect(personMock.create).toHaveBeenCalledWith({ ownerId: assetEntityStub.image.ownerId }); expect(faceMock.create).toHaveBeenCalledWith({ @@ -234,14 +228,8 @@ describe(FacialRecognitionService.name, () => { }, ], [{ name: JobName.SEARCH_INDEX_FACE, data: { personId: 'person-1', assetId: 'asset-id' } }], - [{ name: JobName.SEARCH_INDEX_ASSET, data: { ids: ['asset-id'] } }], ]); }); - - it('should log an error', async () => { - machineLearningMock.detectFaces.mockRejectedValue(new Error('machine learning unavailable')); - await sut.handleRecognizeFaces({ asset: assetEntityStub.image }); - }); }); describe('handleGenerateFaceThumbnail', () => { @@ -317,10 +305,5 @@ describe(FacialRecognitionService.name, () => { size: 250, }); }); - - it('should log an error', async () => { - assetMock.getByIds.mockRejectedValue(new Error('Database unavailable')); - await sut.handleGenerateFaceThumbnail(face.middle); - }); }); }); diff --git a/server/libs/domain/src/facial-recognition/facial-recognition.services.ts b/server/libs/domain/src/facial-recognition/facial-recognition.services.ts index 36b60f7d34..622f1072b7 100644 --- a/server/libs/domain/src/facial-recognition/facial-recognition.services.ts +++ b/server/libs/domain/src/facial-recognition/facial-recognition.services.ts @@ -3,7 +3,7 @@ import { join } from 'path'; import { IAssetRepository, WithoutProperty } from '../asset'; import { MACHINE_LEARNING_ENABLED } from '../domain.constant'; import { usePagination } from '../domain.util'; -import { IAssetJob, IBaseJob, IFaceThumbnailJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; +import { IBaseJob, IEntityJob, IFaceThumbnailJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; import { CropOptions, FACE_THUMBNAIL_SIZE, IMediaRepository } from '../media'; import { IPersonRepository } from '../person/person.repository'; import { ISearchRepository } from '../search/search.repository'; @@ -27,123 +27,113 @@ export class FacialRecognitionService { ) {} async handleQueueRecognizeFaces({ force }: IBaseJob) { - try { - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { - return force - ? this.assetRepository.getAll(pagination) - : this.assetRepository.getWithout(pagination, WithoutProperty.FACES); - }); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? 
this.assetRepository.getAll(pagination) + : this.assetRepository.getWithout(pagination, WithoutProperty.FACES); + }); - if (force) { - const people = await this.personRepository.deleteAll(); - const faces = await this.searchRepository.deleteAllFaces(); - this.logger.debug(`Deleted ${people} people and ${faces} faces`); - } - for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: { asset } }); - } - } - } catch (error: any) { - this.logger.error(`Unable to queue recognize faces`, error?.stack); + if (force) { + const people = await this.personRepository.deleteAll(); + const faces = await this.searchRepository.deleteAllFaces(); + this.logger.debug(`Deleted ${people} people and ${faces} faces`); } + + for await (const assets of assetPagination) { + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: { id: asset.id } }); + } + } + + return true; } - async handleRecognizeFaces(data: IAssetJob) { - const { asset } = data; - - if (!MACHINE_LEARNING_ENABLED || !asset.resizePath) { - return; + async handleRecognizeFaces({ id }: IEntityJob) { + const [asset] = await this.assetRepository.getByIds([id]); + if (!asset || !MACHINE_LEARNING_ENABLED || !asset.resizePath) { + return false; } - try { - const faces = await this.machineLearning.detectFaces({ thumbnailPath: asset.resizePath }); + const faces = await this.machineLearning.detectFaces({ thumbnailPath: asset.resizePath }); - this.logger.debug(`${faces.length} faces detected in ${asset.resizePath}`); - this.logger.verbose(faces.map((face) => ({ ...face, embedding: `float[${face.embedding.length}]` }))); + this.logger.debug(`${faces.length} faces detected in ${asset.resizePath}`); + this.logger.verbose(faces.map((face) => ({ ...face, embedding: `float[${face.embedding.length}]` }))); - for (const { embedding, ...rest } of faces) { - const faceSearchResult = await this.searchRepository.searchFaces(embedding, { ownerId: asset.ownerId }); + for (const { embedding, ...rest } of faces) { + const faceSearchResult = await this.searchRepository.searchFaces(embedding, { ownerId: asset.ownerId }); - let personId: string | null = null; + let personId: string | null = null; - // try to find a matching face and link to the associated person - // The closer to 0, the better the match. Range is from 0 to 2 - if (faceSearchResult.total && faceSearchResult.distances[0] < 0.6) { - this.logger.verbose(`Match face with distance ${faceSearchResult.distances[0]}`); - personId = faceSearchResult.items[0].personId; - } - - if (!personId) { - this.logger.debug('No matches, creating a new person.'); - const person = await this.personRepository.create({ ownerId: asset.ownerId }); - personId = person.id; - await this.jobRepository.queue({ - name: JobName.GENERATE_FACE_THUMBNAIL, - data: { assetId: asset.id, personId, ...rest }, - }); - } - - const faceId: AssetFaceId = { assetId: asset.id, personId }; - - await this.faceRepository.create({ ...faceId, embedding }); - await this.jobRepository.queue({ name: JobName.SEARCH_INDEX_FACE, data: faceId }); - await this.jobRepository.queue({ name: JobName.SEARCH_INDEX_ASSET, data: { ids: [asset.id] } }); + // try to find a matching face and link to the associated person + // The closer to 0, the better the match. 
Range is from 0 to 2 + if (faceSearchResult.total && faceSearchResult.distances[0] < 0.6) { + this.logger.verbose(`Match face with distance ${faceSearchResult.distances[0]}`); + personId = faceSearchResult.items[0].personId; } - // queue all faces for asset - } catch (error: any) { - this.logger.error(`Unable run facial recognition pipeline: ${asset.id}`, error?.stack); + if (!personId) { + this.logger.debug('No matches, creating a new person.'); + const person = await this.personRepository.create({ ownerId: asset.ownerId }); + personId = person.id; + await this.jobRepository.queue({ + name: JobName.GENERATE_FACE_THUMBNAIL, + data: { assetId: asset.id, personId, ...rest }, + }); + } + + const faceId: AssetFaceId = { assetId: asset.id, personId }; + + await this.faceRepository.create({ ...faceId, embedding }); + await this.jobRepository.queue({ name: JobName.SEARCH_INDEX_FACE, data: faceId }); } + + return true; } async handleGenerateFaceThumbnail(data: IFaceThumbnailJob) { const { assetId, personId, boundingBox, imageWidth, imageHeight } = data; - try { - const [asset] = await this.assetRepository.getByIds([assetId]); - if (!asset || !asset.resizePath) { - this.logger.warn(`Asset not found for facial cropping: ${assetId}`); - return; - } - - this.logger.verbose(`Cropping face for person: ${personId}`); - - const outputFolder = this.storageCore.getFolderLocation(StorageFolder.THUMBNAILS, asset.ownerId); - const output = join(outputFolder, `${personId}.jpeg`); - this.storageRepository.mkdirSync(outputFolder); - - const { x1, y1, x2, y2 } = boundingBox; - - const halfWidth = (x2 - x1) / 2; - const halfHeight = (y2 - y1) / 2; - - const middleX = Math.round(x1 + halfWidth); - const middleY = Math.round(y1 + halfHeight); - - // zoom out 10% - const targetHalfSize = Math.floor(Math.max(halfWidth, halfHeight) * 1.1); - - // get the longest distance from the center of the image without overflowing - const newHalfSize = Math.min( - middleX - Math.max(0, middleX - targetHalfSize), - middleY - Math.max(0, middleY - targetHalfSize), - Math.min(imageWidth - 1, middleX + targetHalfSize) - middleX, - Math.min(imageHeight - 1, middleY + targetHalfSize) - middleY, - ); - - const cropOptions: CropOptions = { - left: middleX - newHalfSize, - top: middleY - newHalfSize, - width: newHalfSize * 2, - height: newHalfSize * 2, - }; - - const croppedOutput = await this.mediaRepository.crop(asset.resizePath, cropOptions); - await this.mediaRepository.resize(croppedOutput, output, { size: FACE_THUMBNAIL_SIZE, format: 'jpeg' }); - await this.personRepository.update({ id: personId, thumbnailPath: output }); - } catch (error: Error | any) { - this.logger.error(`Failed to crop face for asset: ${assetId}, person: ${personId} - ${error}`, error.stack); + const [asset] = await this.assetRepository.getByIds([assetId]); + if (!asset || !asset.resizePath) { + return false; } + + this.logger.verbose(`Cropping face for person: ${personId}`); + + const outputFolder = this.storageCore.getFolderLocation(StorageFolder.THUMBNAILS, asset.ownerId); + const output = join(outputFolder, `${personId}.jpeg`); + this.storageRepository.mkdirSync(outputFolder); + + const { x1, y1, x2, y2 } = boundingBox; + + const halfWidth = (x2 - x1) / 2; + const halfHeight = (y2 - y1) / 2; + + const middleX = Math.round(x1 + halfWidth); + const middleY = Math.round(y1 + halfHeight); + + // zoom out 10% + const targetHalfSize = Math.floor(Math.max(halfWidth, halfHeight) * 1.1); + + // get the longest distance from the center of the image without overflowing + 
const newHalfSize = Math.min( + middleX - Math.max(0, middleX - targetHalfSize), + middleY - Math.max(0, middleY - targetHalfSize), + Math.min(imageWidth - 1, middleX + targetHalfSize) - middleX, + Math.min(imageHeight - 1, middleY + targetHalfSize) - middleY, + ); + + const cropOptions: CropOptions = { + left: middleX - newHalfSize, + top: middleY - newHalfSize, + width: newHalfSize * 2, + height: newHalfSize * 2, + }; + + const croppedOutput = await this.mediaRepository.crop(asset.resizePath, cropOptions); + await this.mediaRepository.resize(croppedOutput, output, { size: FACE_THUMBNAIL_SIZE, format: 'jpeg' }); + await this.personRepository.update({ id: personId, thumbnailPath: output }); + + return true; } } diff --git a/server/libs/domain/src/job/job.constants.ts b/server/libs/domain/src/job/job.constants.ts index df7a7e7aa4..169d3cbf6f 100644 --- a/server/libs/domain/src/job/job.constants.ts +++ b/server/libs/domain/src/job/job.constants.ts @@ -19,9 +19,6 @@ export enum JobCommand { } export enum JobName { - // upload - ASSET_UPLOADED = 'asset-uploaded', - // conversion QUEUE_VIDEO_CONVERSION = 'queue-video-conversion', VIDEO_CONVERSION = 'video-conversion', @@ -33,8 +30,7 @@ export enum JobName { // metadata QUEUE_METADATA_EXTRACTION = 'queue-metadata-extraction', - EXIF_EXTRACTION = 'exif-extraction', - EXTRACT_VIDEO_METADATA = 'extract-video-metadata', + METADATA_EXTRACTION = 'metadata-extraction', // user deletion USER_DELETION = 'user-deletion', @@ -84,7 +80,6 @@ export const JOBS_ASSET_PAGINATION_SIZE = 1000; export const JOBS_TO_QUEUE: Record = { // misc - [JobName.ASSET_UPLOADED]: QueueName.BACKGROUND_TASK, [JobName.USER_DELETE_CHECK]: QueueName.BACKGROUND_TASK, [JobName.USER_DELETION]: QueueName.BACKGROUND_TASK, [JobName.DELETE_FILES]: QueueName.BACKGROUND_TASK, @@ -101,8 +96,7 @@ export const JOBS_TO_QUEUE: Record = { // metadata [JobName.QUEUE_METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION, - [JobName.EXIF_EXTRACTION]: QueueName.METADATA_EXTRACTION, - [JobName.EXTRACT_VIDEO_METADATA]: QueueName.METADATA_EXTRACTION, + [JobName.METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION, // storage template [JobName.STORAGE_TEMPLATE_MIGRATION]: QueueName.STORAGE_TEMPLATE_MIGRATION, diff --git a/server/libs/domain/src/job/job.interface.ts b/server/libs/domain/src/job/job.interface.ts index 7ce6dc1e74..3a0d2ecc82 100644 --- a/server/libs/domain/src/job/job.interface.ts +++ b/server/libs/domain/src/job/job.interface.ts @@ -1,18 +1,9 @@ -import { AlbumEntity, AssetEntity, UserEntity } from '@app/infra/entities'; import { BoundingBox } from '../smart-info'; export interface IBaseJob { force?: boolean; } -export interface IAlbumJob extends IBaseJob { - album: AlbumEntity; -} - -export interface IAssetJob extends IBaseJob { - asset: AssetEntity; -} - export interface IAssetFaceJob extends IBaseJob { assetId: string; personId: string; @@ -26,6 +17,10 @@ export interface IFaceThumbnailJob extends IAssetFaceJob { personId: string; } +export interface IEntityJob extends IBaseJob { + id: string; +} + export interface IBulkEntityJob extends IBaseJob { ids: string[]; } @@ -33,7 +28,3 @@ export interface IBulkEntityJob extends IBaseJob { export interface IDeleteFilesJob extends IBaseJob { files: Array; } - -export interface IUserDeletionJob extends IBaseJob { - user: UserEntity; -} diff --git a/server/libs/domain/src/job/job.repository.ts b/server/libs/domain/src/job/job.repository.ts index 65cd9b77d1..1433162f3e 100644 --- a/server/libs/domain/src/job/job.repository.ts +++ 
b/server/libs/domain/src/job/job.repository.ts @@ -1,12 +1,11 @@ import { JobName, QueueName } from './job.constants'; import { IAssetFaceJob, - IAssetJob, IBaseJob, IBulkEntityJob, IDeleteFilesJob, + IEntityJob, IFaceThumbnailJob, - IUserDeletionJob, } from './job.interface'; export interface JobCounts { @@ -24,50 +23,46 @@ export interface QueueStatus { } export type JobItem = - // Asset Upload - | { name: JobName.ASSET_UPLOADED; data: IAssetJob } - // Transcoding | { name: JobName.QUEUE_VIDEO_CONVERSION; data: IBaseJob } - | { name: JobName.VIDEO_CONVERSION; data: IAssetJob } + | { name: JobName.VIDEO_CONVERSION; data: IEntityJob } // Thumbnails | { name: JobName.QUEUE_GENERATE_THUMBNAILS; data: IBaseJob } - | { name: JobName.GENERATE_JPEG_THUMBNAIL; data: IAssetJob } - | { name: JobName.GENERATE_WEBP_THUMBNAIL; data: IAssetJob } + | { name: JobName.GENERATE_JPEG_THUMBNAIL; data: IEntityJob } + | { name: JobName.GENERATE_WEBP_THUMBNAIL; data: IEntityJob } // User Deletion | { name: JobName.USER_DELETE_CHECK } - | { name: JobName.USER_DELETION; data: IUserDeletionJob } + | { name: JobName.USER_DELETION; data: IEntityJob } // Storage Template | { name: JobName.STORAGE_TEMPLATE_MIGRATION } - | { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE; data: IAssetJob } + | { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE; data: IEntityJob } | { name: JobName.SYSTEM_CONFIG_CHANGE } // Metadata Extraction | { name: JobName.QUEUE_METADATA_EXTRACTION; data: IBaseJob } - | { name: JobName.EXIF_EXTRACTION; data: IAssetJob } - | { name: JobName.EXTRACT_VIDEO_METADATA; data: IAssetJob } + | { name: JobName.METADATA_EXTRACTION; data: IEntityJob } // Sidecar Scanning | { name: JobName.QUEUE_SIDECAR; data: IBaseJob } - | { name: JobName.SIDECAR_DISCOVERY; data: IAssetJob } - | { name: JobName.SIDECAR_SYNC; data: IAssetJob } + | { name: JobName.SIDECAR_DISCOVERY; data: IEntityJob } + | { name: JobName.SIDECAR_SYNC; data: IEntityJob } // Object Tagging | { name: JobName.QUEUE_OBJECT_TAGGING; data: IBaseJob } - | { name: JobName.DETECT_OBJECTS; data: IAssetJob } - | { name: JobName.CLASSIFY_IMAGE; data: IAssetJob } + | { name: JobName.DETECT_OBJECTS; data: IEntityJob } + | { name: JobName.CLASSIFY_IMAGE; data: IEntityJob } // Recognize Faces | { name: JobName.QUEUE_RECOGNIZE_FACES; data: IBaseJob } - | { name: JobName.RECOGNIZE_FACES; data: IAssetJob } + | { name: JobName.RECOGNIZE_FACES; data: IEntityJob } | { name: JobName.GENERATE_FACE_THUMBNAIL; data: IFaceThumbnailJob } // Clip Embedding | { name: JobName.QUEUE_ENCODE_CLIP; data: IBaseJob } - | { name: JobName.ENCODE_CLIP; data: IAssetJob } + | { name: JobName.ENCODE_CLIP; data: IEntityJob } // Filesystem | { name: JobName.DELETE_FILES; data: IDeleteFilesJob } diff --git a/server/libs/domain/src/job/job.service.spec.ts b/server/libs/domain/src/job/job.service.spec.ts index 1c329f34e7..a0b5c174b0 100644 --- a/server/libs/domain/src/job/job.service.spec.ts +++ b/server/libs/domain/src/job/job.service.spec.ts @@ -1,14 +1,20 @@ import { BadRequestException } from '@nestjs/common'; -import { newJobRepositoryMock } from '../../test'; +import { newAssetRepositoryMock, newCommunicationRepositoryMock, newJobRepositoryMock } from '../../test'; +import { IAssetRepository } from '../asset'; +import { ICommunicationRepository } from '../communication'; import { IJobRepository, JobCommand, JobName, JobService, QueueName } from '../job'; describe(JobService.name, () => { let sut: JobService; + let assetMock: jest.Mocked; + let communicationMock: jest.Mocked; let 
jobMock: jest.Mocked; beforeEach(async () => { + assetMock = newAssetRepositoryMock(); + communicationMock = newCommunicationRepositoryMock(); jobMock = newJobRepositoryMock(); - sut = new JobService(jobMock); + sut = new JobService(assetMock, communicationMock, jobMock); }); it('should work', () => { diff --git a/server/libs/domain/src/job/job.service.ts b/server/libs/domain/src/job/job.service.ts index 5244e2e62a..379ce64592 100644 --- a/server/libs/domain/src/job/job.service.ts +++ b/server/libs/domain/src/job/job.service.ts @@ -1,21 +1,21 @@ import { BadRequestException, Inject, Injectable, Logger } from '@nestjs/common'; +import { IAssetRepository, mapAsset } from '../asset'; +import { CommunicationEvent, ICommunicationRepository } from '../communication'; import { assertMachineLearningEnabled } from '../domain.constant'; import { JobCommandDto } from './dto'; import { JobCommand, JobName, QueueName } from './job.constants'; -import { IJobRepository } from './job.repository'; +import { IJobRepository, JobItem } from './job.repository'; import { AllJobStatusResponseDto, JobStatusDto } from './response-dto'; @Injectable() export class JobService { private logger = new Logger(JobService.name); - constructor(@Inject(IJobRepository) private jobRepository: IJobRepository) {} - - async handleNightlyJobs() { - await this.jobRepository.queue({ name: JobName.USER_DELETE_CHECK }); - await this.jobRepository.queue({ name: JobName.PERSON_CLEANUP }); - await this.jobRepository.queue({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } }); - } + constructor( + @Inject(IAssetRepository) private assetRepository: IAssetRepository, + @Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository, + @Inject(IJobRepository) private jobRepository: IJobRepository, + ) {} handleCommand(queueName: QueueName, dto: JobCommandDto): Promise { this.logger.debug(`Handling command: queue=${queueName},force=${dto.force}`); @@ -89,4 +89,51 @@ export class JobService { throw new BadRequestException(`Invalid job name: ${name}`); } } + + async handleNightlyJobs() { + await this.jobRepository.queue({ name: JobName.USER_DELETE_CHECK }); + await this.jobRepository.queue({ name: JobName.PERSON_CLEANUP }); + await this.jobRepository.queue({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } }); + } + + /** + * Queue follow up jobs + */ + async onDone(item: JobItem) { + switch (item.name) { + case JobName.SIDECAR_SYNC: + case JobName.SIDECAR_DISCOVERY: + await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id: item.data.id } }); + break; + + case JobName.METADATA_EXTRACTION: + await this.jobRepository.queue({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: item.data }); + break; + + case JobName.GENERATE_JPEG_THUMBNAIL: { + await this.jobRepository.queue({ name: JobName.GENERATE_WEBP_THUMBNAIL, data: item.data }); + await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: item.data }); + await this.jobRepository.queue({ name: JobName.DETECT_OBJECTS, data: item.data }); + await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: item.data }); + await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: item.data }); + + const [asset] = await this.assetRepository.getByIds([item.data.id]); + if (asset) { + this.communicationRepository.send(CommunicationEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset)); + } + break; + } + } + + // In addition to the above jobs, all of these should queue `SEARCH_INDEX_ASSET` + 
switch (item.name) { + case JobName.CLASSIFY_IMAGE: + case JobName.DETECT_OBJECTS: + case JobName.ENCODE_CLIP: + case JobName.RECOGNIZE_FACES: + case JobName.METADATA_EXTRACTION: + await this.jobRepository.queue({ name: JobName.SEARCH_INDEX_ASSET, data: { ids: [item.data.id] } }); + break; + } + } } diff --git a/server/libs/domain/src/media/media.service.spec.ts b/server/libs/domain/src/media/media.service.spec.ts index a29187fd13..17b683d469 100644 --- a/server/libs/domain/src/media/media.service.spec.ts +++ b/server/libs/domain/src/media/media.service.spec.ts @@ -1,9 +1,7 @@ import { AssetType, SystemConfigKey } from '@app/infra/entities'; -import _ from 'lodash'; import { assetEntityStub, newAssetRepositoryMock, - newCommunicationRepositoryMock, newJobRepositoryMock, newMediaRepositoryMock, newStorageRepositoryMock, @@ -11,7 +9,6 @@ import { probeStub, } from '../../test'; import { IAssetRepository, WithoutProperty } from '../asset'; -import { ICommunicationRepository } from '../communication'; import { IJobRepository, JobName } from '../job'; import { IStorageRepository } from '../storage'; import { ISystemConfigRepository } from '../system-config'; @@ -22,7 +19,6 @@ describe(MediaService.name, () => { let sut: MediaService; let assetMock: jest.Mocked; let configMock: jest.Mocked; - let communicationMock: jest.Mocked; let jobMock: jest.Mocked; let mediaMock: jest.Mocked; let storageMock: jest.Mocked; @@ -30,12 +26,11 @@ describe(MediaService.name, () => { beforeEach(async () => { assetMock = newAssetRepositoryMock(); configMock = newSystemConfigRepositoryMock(); - communicationMock = newCommunicationRepositoryMock(); jobMock = newJobRepositoryMock(); mediaMock = newMediaRepositoryMock(); storageMock = newStorageRepositoryMock(); - sut = new MediaService(assetMock, communicationMock, jobMock, mediaMock, storageMock, configMock); + sut = new MediaService(assetMock, jobMock, mediaMock, storageMock, configMock); }); it('should be defined', () => { @@ -55,7 +50,7 @@ describe(MediaService.name, () => { expect(assetMock.getWithout).not.toHaveBeenCalled(); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.GENERATE_JPEG_THUMBNAIL, - data: { asset: assetEntityStub.image }, + data: { id: assetEntityStub.image.id }, }); }); @@ -71,23 +66,15 @@ describe(MediaService.name, () => { expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.GENERATE_JPEG_THUMBNAIL, - data: { asset: assetEntityStub.image }, + data: { id: assetEntityStub.image.id }, }); }); - - it('should log an error', async () => { - assetMock.getAll.mockRejectedValue(new Error('database unavailable')); - - await sut.handleQueueGenerateThumbnails({ force: true }); - - expect(assetMock.getAll).toHaveBeenCalled(); - }); }); describe('handleGenerateJpegThumbnail', () => { it('should generate a thumbnail for an image', async () => { assetMock.getByIds.mockResolvedValue([assetEntityStub.image]); - await sut.handleGenerateJpegThumbnail({ asset: _.cloneDeep(assetEntityStub.image) }); + await sut.handleGenerateJpegThumbnail({ id: assetEntityStub.image.id }); expect(storageMock.mkdirSync).toHaveBeenCalledWith('upload/thumbs/user-id'); expect(mediaMock.resize).toHaveBeenCalledWith('/original/path.ext', 'upload/thumbs/user-id/asset-id.jpeg', { @@ -105,7 +92,7 @@ describe(MediaService.name, () => { assetMock.getByIds.mockResolvedValue([assetEntityStub.image]); mediaMock.resize.mockRejectedValue(new Error('unsupported format')); - await 
sut.handleGenerateJpegThumbnail({ asset: _.cloneDeep(assetEntityStub.image) }); + await sut.handleGenerateJpegThumbnail({ id: assetEntityStub.image.id }); expect(storageMock.mkdirSync).toHaveBeenCalledWith('upload/thumbs/user-id'); expect(mediaMock.resize).toHaveBeenCalledWith('/original/path.ext', 'upload/thumbs/user-id/asset-id.jpeg', { @@ -124,7 +111,7 @@ describe(MediaService.name, () => { it('should generate a thumbnail for a video', async () => { assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); - await sut.handleGenerateJpegThumbnail({ asset: _.cloneDeep(assetEntityStub.video) }); + await sut.handleGenerateJpegThumbnail({ id: assetEntityStub.video.id }); expect(storageMock.mkdirSync).toHaveBeenCalledWith('upload/thumbs/user-id'); expect(mediaMock.extractVideoThumbnail).toHaveBeenCalledWith( @@ -138,37 +125,22 @@ describe(MediaService.name, () => { }); }); - it('should queue some jobs', async () => { - const asset = _.cloneDeep(assetEntityStub.image); - assetMock.getByIds.mockResolvedValue([asset]); - await sut.handleGenerateJpegThumbnail({ asset }); - - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.GENERATE_WEBP_THUMBNAIL, data: { asset } }); - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.CLASSIFY_IMAGE, data: { asset } }); - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.DETECT_OBJECTS, data: { asset } }); - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.ENCODE_CLIP, data: { asset } }); - }); - - it('should log an error', async () => { + it('should run successfully', async () => { assetMock.getByIds.mockResolvedValue([assetEntityStub.image]); - mediaMock.resize.mockRejectedValue(new Error('unsupported format')); - mediaMock.extractThumbnailFromExif.mockRejectedValue(new Error('unsupported format')); - - await sut.handleGenerateJpegThumbnail({ asset: assetEntityStub.image }); - - expect(assetMock.save).not.toHaveBeenCalled(); + await sut.handleGenerateJpegThumbnail({ id: assetEntityStub.image.id }); }); }); describe('handleGenerateWebpThumbnail', () => { it('should skip thumbnail generate if resize path is missing', async () => { - await sut.handleGenerateWepbThumbnail({ asset: assetEntityStub.noResizePath }); - + assetMock.getByIds.mockResolvedValue([assetEntityStub.noResizePath]); + await sut.handleGenerateWepbThumbnail({ id: assetEntityStub.noResizePath.id }); expect(mediaMock.resize).not.toHaveBeenCalled(); }); it('should generate a thumbnail', async () => { - await sut.handleGenerateWepbThumbnail({ asset: assetEntityStub.image }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.image]); + await sut.handleGenerateWepbThumbnail({ id: assetEntityStub.image.id }); expect(mediaMock.resize).toHaveBeenCalledWith( '/uploads/user-id/thumbs/path.ext', @@ -177,14 +149,6 @@ describe(MediaService.name, () => { ); expect(assetMock.save).toHaveBeenCalledWith({ id: 'asset-id', webpPath: '/uploads/user-id/thumbs/path.ext' }); }); - - it('should log an error', async () => { - mediaMock.resize.mockRejectedValue(new Error('service unavailable')); - - await sut.handleGenerateWepbThumbnail({ asset: assetEntityStub.image }); - - expect(mediaMock.resize).toHaveBeenCalled(); - }); }); describe('handleQueueVideoConversion', () => { @@ -200,7 +164,7 @@ describe(MediaService.name, () => { expect(assetMock.getWithout).not.toHaveBeenCalled(); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.VIDEO_CONVERSION, - data: { asset: assetEntityStub.video }, + data: { id: assetEntityStub.video.id }, }); }); @@ -216,17 +180,9 @@ 
describe(MediaService.name, () => { expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.ENCODED_VIDEO); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.VIDEO_CONVERSION, - data: { asset: assetEntityStub.video }, + data: { id: assetEntityStub.video.id }, }); }); - - it('should log an error', async () => { - assetMock.getAll.mockRejectedValue(new Error('database unavailable')); - - await sut.handleQueueVideoConversion({ force: true }); - - expect(assetMock.getAll).toHaveBeenCalled(); - }); }); describe('handleVideoConversion', () => { @@ -234,18 +190,11 @@ describe(MediaService.name, () => { assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); }); - it('should log an error', async () => { - mediaMock.transcode.mockRejectedValue(new Error('unable to transcode')); - - await sut.handleVideoConversion({ asset: assetEntityStub.video }); - - expect(storageMock.mkdirSync).toHaveBeenCalled(); - }); - it('should transcode the longest stream', async () => { + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); mediaMock.probe.mockResolvedValue(probeStub.multipleVideoStreams); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.probe).toHaveBeenCalledWith('/original/path.ext'); expect(configMock.load).toHaveBeenCalled(); @@ -262,20 +211,23 @@ describe(MediaService.name, () => { it('should skip a video without any streams', async () => { mediaMock.probe.mockResolvedValue(probeStub.noVideoStreams); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).not.toHaveBeenCalled(); }); it('should skip a video without any height', async () => { mediaMock.probe.mockResolvedValue(probeStub.noHeight); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).not.toHaveBeenCalled(); }); it('should transcode when set to all', async () => { mediaMock.probe.mockResolvedValue(probeStub.multipleVideoStreams); configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TRANSCODE, value: 'all' }]); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).toHaveBeenCalledWith( '/original/path.ext', 'upload/encoded-video/user-id/asset-id.mp4', @@ -289,7 +241,7 @@ describe(MediaService.name, () => { it('should transcode when optimal and too big', async () => { mediaMock.probe.mockResolvedValue(probeStub.videoStream2160p); configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TRANSCODE, value: 'optimal' }]); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).toHaveBeenCalledWith( '/original/path.ext', 'upload/encoded-video/user-id/asset-id.mp4', @@ -310,7 +262,8 @@ describe(MediaService.name, () => { it('should transcode with alternate scaling video is vertical', async () => { mediaMock.probe.mockResolvedValue(probeStub.videoStreamVertical2160p); configMock.load.mockResolvedValue([{ key: 
SystemConfigKey.FFMPEG_TRANSCODE, value: 'optimal' }]); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).toHaveBeenCalledWith( '/original/path.ext', 'upload/encoded-video/user-id/asset-id.mp4', @@ -331,7 +284,8 @@ describe(MediaService.name, () => { it('should transcode when audio doesnt match target', async () => { mediaMock.probe.mockResolvedValue(probeStub.audioStreamMp3); configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TRANSCODE, value: 'optimal' }]); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).toHaveBeenCalledWith( '/original/path.ext', 'upload/encoded-video/user-id/asset-id.mp4', @@ -352,7 +306,8 @@ describe(MediaService.name, () => { it('should transcode when container doesnt match target', async () => { mediaMock.probe.mockResolvedValue(probeStub.matroskaContainer); configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TRANSCODE, value: 'optimal' }]); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).toHaveBeenCalledWith( '/original/path.ext', 'upload/encoded-video/user-id/asset-id.mp4', @@ -373,14 +328,16 @@ describe(MediaService.name, () => { it('should not transcode an invalid transcode value', async () => { mediaMock.probe.mockResolvedValue(probeStub.videoStream2160p); configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TRANSCODE, value: 'invalid' }]); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).not.toHaveBeenCalled(); }); it('should set max bitrate if above 0', async () => { mediaMock.probe.mockResolvedValue(probeStub.matroskaContainer); configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_MAX_BITRATE, value: '4500k' }]); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).toHaveBeenCalledWith( '/original/path.ext', 'upload/encoded-video/user-id/asset-id.mp4', @@ -405,7 +362,8 @@ describe(MediaService.name, () => { { key: SystemConfigKey.FFMPEG_MAX_BITRATE, value: '4500k' }, { key: SystemConfigKey.FFMPEG_TWO_PASS, value: true }, ]); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).toHaveBeenCalledWith( '/original/path.ext', 'upload/encoded-video/user-id/asset-id.mp4', @@ -428,7 +386,8 @@ describe(MediaService.name, () => { it('should fallback to one pass for h264/h265 if two-pass is enabled but no max bitrate is set', async () => { mediaMock.probe.mockResolvedValue(probeStub.matroskaContainer); configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TWO_PASS, value: true }]); - await sut.handleVideoConversion({ asset: assetEntityStub.video 
}); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).toHaveBeenCalledWith( '/original/path.ext', 'upload/encoded-video/user-id/asset-id.mp4', @@ -452,7 +411,8 @@ describe(MediaService.name, () => { { key: SystemConfigKey.FFMPEG_TARGET_VIDEO_CODEC, value: 'vp9' }, { key: SystemConfigKey.FFMPEG_THREADS, value: 2 }, ]); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).toHaveBeenCalledWith( '/original/path.ext', 'upload/encoded-video/user-id/asset-id.mp4', @@ -479,7 +439,8 @@ describe(MediaService.name, () => { { key: SystemConfigKey.FFMPEG_TARGET_VIDEO_CODEC, value: 'vp9' }, { key: SystemConfigKey.FFMPEG_THREADS, value: 2 }, ]); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).toHaveBeenCalledWith( '/original/path.ext', 'upload/encoded-video/user-id/asset-id.mp4', @@ -503,7 +464,8 @@ describe(MediaService.name, () => { it('should disable thread pooling for x264/x265 if thread limit is above 0', async () => { mediaMock.probe.mockResolvedValue(probeStub.matroskaContainer); configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_THREADS, value: 2 }]); - await sut.handleVideoConversion({ asset: assetEntityStub.video }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); + await sut.handleVideoConversion({ id: assetEntityStub.video.id }); expect(mediaMock.transcode).toHaveBeenCalledWith( '/original/path.ext', 'upload/encoded-video/user-id/asset-id.mp4', diff --git a/server/libs/domain/src/media/media.service.ts b/server/libs/domain/src/media/media.service.ts index 9f215c2f0b..7bab14c46a 100644 --- a/server/libs/domain/src/media/media.service.ts +++ b/server/libs/domain/src/media/media.service.ts @@ -1,10 +1,9 @@ import { AssetEntity, AssetType, TranscodePreset } from '@app/infra/entities'; import { Inject, Injectable, Logger } from '@nestjs/common'; import { join } from 'path'; -import { IAssetRepository, mapAsset, WithoutProperty } from '../asset'; -import { CommunicationEvent, ICommunicationRepository } from '../communication'; +import { IAssetRepository, WithoutProperty } from '../asset'; import { usePagination } from '../domain.util'; -import { IAssetJob, IBaseJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; +import { IBaseJob, IEntityJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; import { IStorageRepository, StorageCore, StorageFolder } from '../storage'; import { ISystemConfigRepository, SystemConfigFFmpegDto } from '../system-config'; import { SystemConfigCore } from '../system-config/system-config.core'; @@ -19,7 +18,6 @@ export class MediaService { constructor( @Inject(IAssetRepository) private assetRepository: IAssetRepository, - @Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository, @Inject(IJobRepository) private jobRepository: IJobRepository, @Inject(IMediaRepository) private mediaRepository: IMediaRepository, @Inject(IStorageRepository) private storageRepository: IStorageRepository, @@ -28,155 +26,128 @@ export class MediaService { this.configCore = new SystemConfigCore(systemConfig); } - async 
handleQueueGenerateThumbnails(job: IBaseJob): Promise { - try { - const { force } = job; + async handleQueueGenerateThumbnails(job: IBaseJob) { + const { force } = job; - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { - return force - ? this.assetRepository.getAll(pagination) - : this.assetRepository.getWithout(pagination, WithoutProperty.THUMBNAIL); - }); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getAll(pagination) + : this.assetRepository.getWithout(pagination, WithoutProperty.THUMBNAIL); + }); - for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { asset } }); - } + for await (const assets of assetPagination) { + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: asset.id } }); } - } catch (error: any) { - this.logger.error('Failed to queue generate thumbnail jobs', error.stack); } + + return true; } - async handleGenerateJpegThumbnail(data: IAssetJob): Promise { - const [asset] = await this.assetRepository.getByIds([data.asset.id]); - + async handleGenerateJpegThumbnail({ id }: IEntityJob) { + const [asset] = await this.assetRepository.getByIds([id]); if (!asset) { - this.logger.warn( - `Asset not found: ${data.asset.id} - Original Path: ${data.asset.originalPath} - Resize Path: ${data.asset.resizePath}`, - ); - return; + return false; } - try { - const resizePath = this.storageCore.getFolderLocation(StorageFolder.THUMBNAILS, asset.ownerId); - this.storageRepository.mkdirSync(resizePath); - const jpegThumbnailPath = join(resizePath, `${asset.id}.jpeg`); + const resizePath = this.storageCore.getFolderLocation(StorageFolder.THUMBNAILS, asset.ownerId); + this.storageRepository.mkdirSync(resizePath); + const jpegThumbnailPath = join(resizePath, `${asset.id}.jpeg`); - if (asset.type == AssetType.IMAGE) { - try { - await this.mediaRepository.resize(asset.originalPath, jpegThumbnailPath, { - size: JPEG_THUMBNAIL_SIZE, - format: 'jpeg', - }); - } catch (error) { - this.logger.warn( - `Failed to generate jpeg thumbnail using sharp, trying with exiftool-vendored (asset=${asset.id})`, - ); - await this.mediaRepository.extractThumbnailFromExif(asset.originalPath, jpegThumbnailPath); - } + if (asset.type == AssetType.IMAGE) { + try { + await this.mediaRepository.resize(asset.originalPath, jpegThumbnailPath, { + size: JPEG_THUMBNAIL_SIZE, + format: 'jpeg', + }); + } catch (error) { + this.logger.warn( + `Failed to generate jpeg thumbnail using sharp, trying with exiftool-vendored (asset=${asset.id})`, + ); + await this.mediaRepository.extractThumbnailFromExif(asset.originalPath, jpegThumbnailPath); } - - if (asset.type == AssetType.VIDEO) { - this.logger.log('Start Generating Video Thumbnail'); - await this.mediaRepository.extractVideoThumbnail(asset.originalPath, jpegThumbnailPath, JPEG_THUMBNAIL_SIZE); - this.logger.log(`Generating Video Thumbnail Success ${asset.id}`); - } - - await this.assetRepository.save({ id: asset.id, resizePath: jpegThumbnailPath }); - - asset.resizePath = jpegThumbnailPath; - - await this.jobRepository.queue({ name: JobName.GENERATE_WEBP_THUMBNAIL, data: { asset } }); - await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: { asset } }); - await this.jobRepository.queue({ name: JobName.DETECT_OBJECTS, data: { asset } }); - await this.jobRepository.queue({ name: 
JobName.ENCODE_CLIP, data: { asset } }); - await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: { asset } }); - - this.communicationRepository.send(CommunicationEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset)); - } catch (error: any) { - this.logger.error(`Failed to generate thumbnail for asset: ${asset.id}/${asset.type}`, error.stack); } + + if (asset.type == AssetType.VIDEO) { + this.logger.log('Start Generating Video Thumbnail'); + await this.mediaRepository.extractVideoThumbnail(asset.originalPath, jpegThumbnailPath, JPEG_THUMBNAIL_SIZE); + this.logger.log(`Generating Video Thumbnail Success ${asset.id}`); + } + + await this.assetRepository.save({ id: asset.id, resizePath: jpegThumbnailPath }); + + return true; } - async handleGenerateWepbThumbnail(data: IAssetJob): Promise { - const { asset } = data; - - if (!asset.resizePath) { - return; + async handleGenerateWepbThumbnail({ id }: IEntityJob) { + const [asset] = await this.assetRepository.getByIds([id]); + if (!asset || !asset.resizePath) { + return false; } const webpPath = asset.resizePath.replace('jpeg', 'webp'); - try { - await this.mediaRepository.resize(asset.resizePath, webpPath, { size: WEBP_THUMBNAIL_SIZE, format: 'webp' }); - await this.assetRepository.save({ id: asset.id, webpPath: webpPath }); - } catch (error: any) { - this.logger.error(`Failed to generate webp thumbnail for asset: ${asset.id}`, error.stack); - } + await this.mediaRepository.resize(asset.resizePath, webpPath, { size: WEBP_THUMBNAIL_SIZE, format: 'webp' }); + await this.assetRepository.save({ id: asset.id, webpPath: webpPath }); + + return true; } async handleQueueVideoConversion(job: IBaseJob) { const { force } = job; - try { - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { - return force - ? this.assetRepository.getAll(pagination, { type: AssetType.VIDEO }) - : this.assetRepository.getWithout(pagination, WithoutProperty.ENCODED_VIDEO); - }); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? 
this.assetRepository.getAll(pagination, { type: AssetType.VIDEO }) + : this.assetRepository.getWithout(pagination, WithoutProperty.ENCODED_VIDEO); + }); - for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { asset } }); - } + for await (const assets of assetPagination) { + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id: asset.id } }); } - } catch (error: any) { - this.logger.error('Failed to queue video conversions', error.stack); } + + return true; } - async handleVideoConversion(job: IAssetJob) { - const [asset] = await this.assetRepository.getByIds([job.asset.id]); - + async handleVideoConversion({ id }: IEntityJob) { + const [asset] = await this.assetRepository.getByIds([id]); if (!asset) { - this.logger.warn(`Asset not found: ${job.asset.id} - Original Path: ${job.asset.originalPath}`); - return; + return false; } - try { - const input = asset.originalPath; - const outputFolder = this.storageCore.getFolderLocation(StorageFolder.ENCODED_VIDEO, asset.ownerId); - const output = join(outputFolder, `${asset.id}.mp4`); - this.storageRepository.mkdirSync(outputFolder); + const input = asset.originalPath; + const outputFolder = this.storageCore.getFolderLocation(StorageFolder.ENCODED_VIDEO, asset.ownerId); + const output = join(outputFolder, `${asset.id}.mp4`); + this.storageRepository.mkdirSync(outputFolder); - const { videoStreams, audioStreams, format } = await this.mediaRepository.probe(input); - const mainVideoStream = this.getMainVideoStream(videoStreams); - const mainAudioStream = this.getMainAudioStream(audioStreams); - const containerExtension = format.formatName; - if (!mainVideoStream || !mainAudioStream || !containerExtension) { - return; - } - - const { ffmpeg: config } = await this.configCore.getConfig(); - - const required = this.isTranscodeRequired(asset, mainVideoStream, mainAudioStream, containerExtension, config); - if (!required) { - return; - } - - const outputOptions = this.getFfmpegOptions(mainVideoStream, config); - const twoPass = this.eligibleForTwoPass(config); - - this.logger.log(`Start encoding video ${asset.id} ${outputOptions}`); - await this.mediaRepository.transcode(input, output, { outputOptions, twoPass }); - - this.logger.log(`Encoding success ${asset.id}`); - - await this.assetRepository.save({ id: asset.id, encodedVideoPath: output }); - } catch (error: any) { - this.logger.error(`Failed to handle video conversion for asset: ${asset.id}`, error.stack); + const { videoStreams, audioStreams, format } = await this.mediaRepository.probe(input); + const mainVideoStream = this.getMainVideoStream(videoStreams); + const mainAudioStream = this.getMainAudioStream(audioStreams); + const containerExtension = format.formatName; + if (!mainVideoStream || !mainAudioStream || !containerExtension) { + return false; } + + const { ffmpeg: config } = await this.configCore.getConfig(); + + const required = this.isTranscodeRequired(asset, mainVideoStream, mainAudioStream, containerExtension, config); + if (!required) { + return false; + } + + const outputOptions = this.getFfmpegOptions(mainVideoStream, config); + const twoPass = this.eligibleForTwoPass(config); + + this.logger.log(`Start encoding video ${asset.id} ${outputOptions}`); + await this.mediaRepository.transcode(input, output, { outputOptions, twoPass }); + + this.logger.log(`Encoding success ${asset.id}`); + + await this.assetRepository.save({ id: asset.id, 
encodedVideoPath: output }); + + return true; } private getMainVideoStream(streams: VideoStreamInfo[]): VideoStreamInfo | null { diff --git a/server/libs/domain/src/metadata/metadata.service.spec.ts b/server/libs/domain/src/metadata/metadata.service.spec.ts index 6c1c3af7b1..d889a9ce81 100644 --- a/server/libs/domain/src/metadata/metadata.service.spec.ts +++ b/server/libs/domain/src/metadata/metadata.service.spec.ts @@ -33,7 +33,7 @@ describe(MetadataService.name, () => { expect(assetMock.getWithout).not.toHaveBeenCalled(); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.SIDECAR_SYNC, - data: { asset: assetEntityStub.sidecar }, + data: { id: assetEntityStub.sidecar.id }, }); }); @@ -46,95 +46,59 @@ describe(MetadataService.name, () => { expect(assetMock.getWith).not.toHaveBeenCalled(); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.SIDECAR_DISCOVERY, - data: { asset: assetEntityStub.image }, + data: { id: assetEntityStub.image.id }, }); }); - - it('should log an error', async () => { - assetMock.getWith.mockRejectedValue(new Error('database unavailable')); - await sut.handleQueueSidecar({ force: true }); - expect(jobMock.queue).not.toHaveBeenCalled(); - }); }); describe('handleSidecarSync', () => { - it('should skip hidden assets', async () => { - await sut.handleSidecarSync({ asset: assetEntityStub.livePhotoMotionAsset }); - expect(jobMock.queue).not.toHaveBeenCalled(); - }); - - it('should handle video assets', async () => { - await sut.handleSidecarSync({ asset: assetEntityStub.video }); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.EXTRACT_VIDEO_METADATA, - data: { asset: assetEntityStub.video }, - }); - }); - - it('should handle image assets', async () => { - await sut.handleSidecarSync({ asset: assetEntityStub.image }); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.EXIF_EXTRACTION, - data: { asset: assetEntityStub.image }, - }); - }); - - it('should log an error', async () => { - jobMock.queue.mockRejectedValue(new Error('queue job failed')); - await sut.handleSidecarSync({ asset: assetEntityStub.image }); + it('should not error', async () => { + await sut.handleSidecarSync(); }); }); describe('handleSidecarDiscovery', () => { it('should skip hidden assets', async () => { - await sut.handleSidecarDiscovery({ asset: assetEntityStub.livePhotoMotionAsset }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.livePhotoMotionAsset]); + await sut.handleSidecarDiscovery({ id: assetEntityStub.livePhotoMotionAsset.id }); expect(storageMock.checkFileExists).not.toHaveBeenCalled(); }); it('should skip assets with a sidecar path', async () => { - await sut.handleSidecarDiscovery({ asset: assetEntityStub.sidecar }); + assetMock.getByIds.mockResolvedValue([assetEntityStub.sidecar]); + await sut.handleSidecarDiscovery({ id: assetEntityStub.sidecar.id }); expect(storageMock.checkFileExists).not.toHaveBeenCalled(); }); it('should do nothing when a sidecar is not found ', async () => { + assetMock.getByIds.mockResolvedValue([assetEntityStub.image]); storageMock.checkFileExists.mockResolvedValue(false); - await sut.handleSidecarDiscovery({ asset: assetEntityStub.image }); + await sut.handleSidecarDiscovery({ id: assetEntityStub.image.id }); expect(assetMock.save).not.toHaveBeenCalled(); }); it('should update a image asset when a sidecar is found', async () => { + assetMock.getByIds.mockResolvedValue([assetEntityStub.image]); assetMock.save.mockResolvedValue(assetEntityStub.image); storageMock.checkFileExists.mockResolvedValue(true); - await 
sut.handleSidecarDiscovery({ asset: assetEntityStub.image }); + await sut.handleSidecarDiscovery({ id: assetEntityStub.image.id }); expect(storageMock.checkFileExists).toHaveBeenCalledWith('/original/path.ext.xmp', constants.W_OK); expect(assetMock.save).toHaveBeenCalledWith({ id: assetEntityStub.image.id, sidecarPath: '/original/path.ext.xmp', }); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.EXIF_EXTRACTION, - data: { asset: assetEntityStub.image }, - }); }); it('should update a video asset when a sidecar is found', async () => { + assetMock.getByIds.mockResolvedValue([assetEntityStub.video]); assetMock.save.mockResolvedValue(assetEntityStub.video); storageMock.checkFileExists.mockResolvedValue(true); - await sut.handleSidecarDiscovery({ asset: assetEntityStub.video }); + await sut.handleSidecarDiscovery({ id: assetEntityStub.video.id }); expect(storageMock.checkFileExists).toHaveBeenCalledWith('/original/path.ext.xmp', constants.W_OK); expect(assetMock.save).toHaveBeenCalledWith({ id: assetEntityStub.image.id, sidecarPath: '/original/path.ext.xmp', }); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.EXTRACT_VIDEO_METADATA, - data: { asset: assetEntityStub.video }, - }); - }); - - it('should log an error', async () => { - storageMock.checkFileExists.mockRejectedValue(new Error('bad permission')); - await sut.handleSidecarDiscovery({ asset: assetEntityStub.image }); }); }); }); diff --git a/server/libs/domain/src/metadata/metadata.service.ts b/server/libs/domain/src/metadata/metadata.service.ts index 9d4363ce51..e244ca6a1f 100644 --- a/server/libs/domain/src/metadata/metadata.service.ts +++ b/server/libs/domain/src/metadata/metadata.service.ts @@ -1,77 +1,54 @@ -import { AssetType } from '@app/infra/entities'; -import { Inject, Logger } from '@nestjs/common'; +import { Inject } from '@nestjs/common'; import { constants } from 'fs/promises'; -import { AssetCore, IAssetRepository, WithoutProperty, WithProperty } from '../asset'; +import { IAssetRepository, WithoutProperty, WithProperty } from '../asset'; import { usePagination } from '../domain.util'; -import { IAssetJob, IBaseJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; +import { IBaseJob, IEntityJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; import { IStorageRepository } from '../storage'; export class MetadataService { - private logger = new Logger(MetadataService.name); - private assetCore: AssetCore; - constructor( @Inject(IAssetRepository) private assetRepository: IAssetRepository, @Inject(IJobRepository) private jobRepository: IJobRepository, @Inject(IStorageRepository) private storageRepository: IStorageRepository, - ) { - this.assetCore = new AssetCore(assetRepository, jobRepository); - } + ) {} async handleQueueSidecar(job: IBaseJob) { - try { - const { force } = job; - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { - return force - ? this.assetRepository.getWith(pagination, WithProperty.SIDECAR) - : this.assetRepository.getWithout(pagination, WithoutProperty.SIDECAR); - }); + const { force } = job; + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getWith(pagination, WithProperty.SIDECAR) + : this.assetRepository.getWithout(pagination, WithoutProperty.SIDECAR); + }); - for await (const assets of assetPagination) { - for (const asset of assets) { - const name = force ? 
JobName.SIDECAR_SYNC : JobName.SIDECAR_DISCOVERY; - await this.jobRepository.queue({ name, data: { asset } }); - } + for await (const assets of assetPagination) { + for (const asset of assets) { + const name = force ? JobName.SIDECAR_SYNC : JobName.SIDECAR_DISCOVERY; + await this.jobRepository.queue({ name, data: { id: asset.id } }); } - } catch (error: any) { - this.logger.error(`Unable to queue sidecar scanning`, error?.stack); } + + return true; } - async handleSidecarSync(job: IAssetJob) { - const { asset } = job; - if (!asset.isVisible) { - return; - } - - try { - const name = asset.type === AssetType.VIDEO ? JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION; - await this.jobRepository.queue({ name, data: { asset } }); - } catch (error: any) { - this.logger.error(`Unable to queue metadata extraction`, error?.stack); - } + async handleSidecarSync() { + // TODO: optimize to only queue assets with recent xmp changes + return true; } - async handleSidecarDiscovery(job: IAssetJob) { - let { asset } = job; - if (!asset.isVisible || asset.sidecarPath) { - return; + async handleSidecarDiscovery({ id }: IEntityJob) { + const [asset] = await this.assetRepository.getByIds([id]); + if (!asset || !asset.isVisible || asset.sidecarPath) { + return false; } - try { - const sidecarPath = `${asset.originalPath}.xmp`; - const exists = await this.storageRepository.checkFileExists(sidecarPath, constants.W_OK); - if (!exists) { - return; - } - - asset = await this.assetCore.save({ id: asset.id, sidecarPath }); - // TODO: optimize to only queue assets with recent xmp changes - const name = asset.type === AssetType.VIDEO ? JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION; - await this.jobRepository.queue({ name, data: { asset } }); - } catch (error: any) { - this.logger.error(`Unable to queue metadata extraction: ${error}`, error?.stack); - return; + const sidecarPath = `${asset.originalPath}.xmp`; + const exists = await this.storageRepository.checkFileExists(sidecarPath, constants.W_OK); + if (!exists) { + return false; } + + await this.assetRepository.save({ id: asset.id, sidecarPath }); + + return true; } } diff --git a/server/libs/domain/src/person/person.service.spec.ts b/server/libs/domain/src/person/person.service.spec.ts index d0011283cd..78ff32f1a0 100644 --- a/server/libs/domain/src/person/person.service.spec.ts +++ b/server/libs/domain/src/person/person.service.spec.ts @@ -122,14 +122,5 @@ describe(PersonService.name, () => { data: { files: ['/path/to/thumbnail'] }, }); }); - - it('should log an error', async () => { - personMock.getAllWithoutFaces.mockResolvedValue([personStub.noName]); - personMock.delete.mockRejectedValue(new Error('database unavailable')); - - await sut.handlePersonCleanup(); - - expect(jobMock.queue).not.toHaveBeenCalled(); - }); }); }); diff --git a/server/libs/domain/src/person/person.service.ts b/server/libs/domain/src/person/person.service.ts index 80618e4524..85a558355b 100644 --- a/server/libs/domain/src/person/person.service.ts +++ b/server/libs/domain/src/person/person.service.ts @@ -67,7 +67,7 @@ export class PersonService { return mapPerson(person); } - async handlePersonCleanup(): Promise { + async handlePersonCleanup() { const people = await this.repository.getAllWithoutFaces(); for (const person of people) { this.logger.debug(`Person ${person.name || person.id} no longer has any faces, deleting.`); @@ -78,5 +78,7 @@ export class PersonService { this.logger.error(`Unable to delete person: ${error}`, error?.stack); } } + + return true; } } diff --git 
a/server/libs/domain/src/search/search.service.spec.ts b/server/libs/domain/src/search/search.service.spec.ts index f203e2cd1d..ddb842ea48 100644 --- a/server/libs/domain/src/search/search.service.spec.ts +++ b/server/libs/domain/src/search/search.service.spec.ts @@ -204,18 +204,6 @@ describe(SearchService.name, () => { ]); }); - it('should log an error', async () => { - assetMock.getAll.mockResolvedValue({ - items: [assetEntityStub.image], - hasNextPage: false, - }); - searchMock.importAssets.mockRejectedValue(new Error('import failed')); - - await sut.handleIndexAssets(); - - expect(searchMock.importAssets).toHaveBeenCalled(); - }); - it('should skip if search is disabled', async () => { const sut = makeSut('false'); @@ -250,15 +238,6 @@ describe(SearchService.name, () => { expect(searchMock.importAlbums).toHaveBeenCalledWith([albumStub.empty], true); }); - - it('should log an error', async () => { - albumMock.getAll.mockResolvedValue([albumStub.empty]); - searchMock.importAlbums.mockRejectedValue(new Error('import failed')); - - await sut.handleIndexAlbums(); - - expect(searchMock.importAlbums).toHaveBeenCalled(); - }); }); describe('handleIndexAlbum', () => { @@ -325,15 +304,6 @@ describe(SearchService.name, () => { ]); }); - it('should log an error', async () => { - faceMock.getAll.mockResolvedValue([faceStub.face1]); - searchMock.importFaces.mockRejectedValue(new Error('import failed')); - - await sut.handleIndexFaces(); - - expect(searchMock.importFaces).toHaveBeenCalled(); - }); - it('should skip if search is disabled', async () => { const sut = makeSut('false'); diff --git a/server/libs/domain/src/search/search.service.ts b/server/libs/domain/src/search/search.service.ts index 2ac1d9a289..871012575b 100644 --- a/server/libs/domain/src/search/search.service.ts +++ b/server/libs/domain/src/search/search.service.ts @@ -137,122 +137,128 @@ export class SearchService { async handleIndexAlbums() { if (!this.enabled) { - return; + return false; } - try { - const albums = this.patchAlbums(await this.albumRepository.getAll()); - this.logger.log(`Indexing ${albums.length} albums`); - await this.searchRepository.importAlbums(albums, true); - } catch (error: any) { - this.logger.error(`Unable to index all albums`, error?.stack); - } + const albums = this.patchAlbums(await this.albumRepository.getAll()); + this.logger.log(`Indexing ${albums.length} albums`); + await this.searchRepository.importAlbums(albums, true); + + return true; } async handleIndexAssets() { if (!this.enabled) { - return; + return false; } - try { - // TODO: do this in batches based on searchIndexVersion - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => - this.assetRepository.getAll(pagination, { isVisible: true }), - ); + // TODO: do this in batches based on searchIndexVersion + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => + this.assetRepository.getAll(pagination, { isVisible: true }), + ); - for await (const assets of assetPagination) { - this.logger.debug(`Indexing ${assets.length} assets`); + for await (const assets of assetPagination) { + this.logger.debug(`Indexing ${assets.length} assets`); - const patchedAssets = this.patchAssets(assets); - await this.searchRepository.importAssets(patchedAssets, false); - } - - await this.searchRepository.importAssets([], true); - - this.logger.debug('Finished re-indexing all assets'); - } catch (error: any) { - this.logger.error(`Unable to index all assets`, error?.stack); + const patchedAssets = 
this.patchAssets(assets);
+      await this.searchRepository.importAssets(patchedAssets, false);
     }
+
+    await this.searchRepository.importAssets([], true);
+
+    this.logger.debug('Finished re-indexing all assets');
+
+    return true;
   }
 
   async handleIndexFaces() {
     if (!this.enabled) {
-      return;
+      return false;
     }
 
-    try {
-      // TODO: do this in batches based on searchIndexVersion
-      const faces = this.patchFaces(await this.faceRepository.getAll());
-      this.logger.log(`Indexing ${faces.length} faces`);
+    // TODO: do this in batches based on searchIndexVersion
+    const faces = this.patchFaces(await this.faceRepository.getAll());
+    this.logger.log(`Indexing ${faces.length} faces`);
 
-      const chunkSize = 1000;
-      for (let i = 0; i < faces.length; i += chunkSize) {
-        await this.searchRepository.importFaces(faces.slice(i, i + chunkSize), false);
-      }
-
-      await this.searchRepository.importFaces([], true);
-
-      this.logger.debug('Finished re-indexing all faces');
-    } catch (error: any) {
-      this.logger.error(`Unable to index all faces`, error?.stack);
+    const chunkSize = 1000;
+    for (let i = 0; i < faces.length; i += chunkSize) {
+      await this.searchRepository.importFaces(faces.slice(i, i + chunkSize), false);
     }
+
+    await this.searchRepository.importFaces([], true);
+
+    this.logger.debug('Finished re-indexing all faces');
+
+    return true;
   }
 
   handleIndexAlbum({ ids }: IBulkEntityJob) {
     if (!this.enabled) {
-      return;
+      return false;
     }
 
     for (const id of ids) {
       this.albumQueue.upsert.add(id);
     }
+
+    return true;
   }
 
   handleIndexAsset({ ids }: IBulkEntityJob) {
     if (!this.enabled) {
-      return;
+      return false;
     }
 
     for (const id of ids) {
       this.assetQueue.upsert.add(id);
     }
+
+    return true;
   }
 
   async handleIndexFace({ assetId, personId }: IAssetFaceJob) {
     if (!this.enabled) {
-      return;
+      return false;
     }
 
     // immediately push to typesense
     await this.searchRepository.importFaces(await this.idsToFaces([{ assetId, personId }]), false);
+
+    return true;
   }
 
   handleRemoveAlbum({ ids }: IBulkEntityJob) {
     if (!this.enabled) {
-      return;
+      return false;
     }
 
     for (const id of ids) {
       this.albumQueue.delete.add(id);
     }
+
+    return true;
   }
 
   handleRemoveAsset({ ids }: IBulkEntityJob) {
     if (!this.enabled) {
-      return;
+      return false;
     }
 
     for (const id of ids) {
       this.assetQueue.delete.add(id);
     }
+
+    return true;
   }
 
   handleRemoveFace({ assetId, personId }: IAssetFaceJob) {
     if (!this.enabled) {
-      return;
+      return false;
     }
 
     this.faceQueue.delete.add(this.asKey({ assetId, personId }));
+
+    return true;
   }
 
   private async flush() {
diff --git a/server/libs/domain/src/smart-info/smart-info.service.spec.ts b/server/libs/domain/src/smart-info/smart-info.service.spec.ts
index 642d9bc105..0ba89769b7 100644
--- a/server/libs/domain/src/smart-info/smart-info.service.spec.ts
+++ b/server/libs/domain/src/smart-info/smart-info.service.spec.ts
@@ -30,6 +30,8 @@ describe(SmartInfoService.name, () => {
     jobMock = newJobRepositoryMock();
     machineMock = newMachineLearningRepositoryMock();
     sut = new SmartInfoService(assetMock, jobMock, smartMock, machineMock);
+
+    assetMock.getByIds.mockResolvedValue([asset]);
   });
 
   it('should work', () => {
@@ -46,8 +48,8 @@ describe(SmartInfoService.name, () => {
     await sut.handleQueueObjectTagging({ force: false });
 
     expect(jobMock.queue.mock.calls).toEqual([
-      [{ name: JobName.CLASSIFY_IMAGE, data: { asset: assetEntityStub.image } }],
-      [{ name: JobName.DETECT_OBJECTS, data: { asset: assetEntityStub.image } }],
+      [{ name: JobName.CLASSIFY_IMAGE, data: { id: assetEntityStub.image.id } }],
+      [{ name: JobName.DETECT_OBJECTS, data: { id: assetEntityStub.image.id } }],
]); expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.OBJECT_TAGS); }); @@ -61,8 +63,8 @@ describe(SmartInfoService.name, () => { await sut.handleQueueObjectTagging({ force: true }); expect(jobMock.queue.mock.calls).toEqual([ - [{ name: JobName.CLASSIFY_IMAGE, data: { asset: assetEntityStub.image } }], - [{ name: JobName.DETECT_OBJECTS, data: { asset: assetEntityStub.image } }], + [{ name: JobName.CLASSIFY_IMAGE, data: { id: assetEntityStub.image.id } }], + [{ name: JobName.DETECT_OBJECTS, data: { id: assetEntityStub.image.id } }], ]); expect(assetMock.getAll).toHaveBeenCalled(); }); @@ -70,7 +72,10 @@ describe(SmartInfoService.name, () => { describe('handleTagImage', () => { it('should skip assets without a resize path', async () => { - await sut.handleClassifyImage({ asset: { resizePath: '' } as AssetEntity }); + const asset = { resizePath: '' } as AssetEntity; + assetMock.getByIds.mockResolvedValue([asset]); + + await sut.handleClassifyImage({ id: asset.id }); expect(smartMock.upsert).not.toHaveBeenCalled(); expect(machineMock.classifyImage).not.toHaveBeenCalled(); @@ -79,7 +84,7 @@ describe(SmartInfoService.name, () => { it('should save the returned tags', async () => { machineMock.classifyImage.mockResolvedValue(['tag1', 'tag2', 'tag3']); - await sut.handleClassifyImage({ asset }); + await sut.handleClassifyImage({ id: asset.id }); expect(machineMock.classifyImage).toHaveBeenCalledWith({ thumbnailPath: 'path/to/resize.ext' }); expect(smartMock.upsert).toHaveBeenCalledWith({ @@ -88,18 +93,10 @@ describe(SmartInfoService.name, () => { }); }); - it('should handle an error with the machine learning pipeline', async () => { - machineMock.classifyImage.mockRejectedValue(new Error('Unable to read thumbnail')); - - await sut.handleClassifyImage({ asset }); - - expect(smartMock.upsert).not.toHaveBeenCalled(); - }); - it('should no update the smart info if no tags were returned', async () => { machineMock.classifyImage.mockResolvedValue([]); - await sut.handleClassifyImage({ asset }); + await sut.handleClassifyImage({ id: asset.id }); expect(machineMock.classifyImage).toHaveBeenCalled(); expect(smartMock.upsert).not.toHaveBeenCalled(); @@ -108,7 +105,10 @@ describe(SmartInfoService.name, () => { describe('handleDetectObjects', () => { it('should skip assets without a resize path', async () => { - await sut.handleDetectObjects({ asset: { resizePath: '' } as AssetEntity }); + const asset = { resizePath: '' } as AssetEntity; + assetMock.getByIds.mockResolvedValue([asset]); + + await sut.handleDetectObjects({ id: asset.id }); expect(smartMock.upsert).not.toHaveBeenCalled(); expect(machineMock.detectObjects).not.toHaveBeenCalled(); @@ -117,7 +117,7 @@ describe(SmartInfoService.name, () => { it('should save the returned objects', async () => { machineMock.detectObjects.mockResolvedValue(['obj1', 'obj2', 'obj3']); - await sut.handleDetectObjects({ asset }); + await sut.handleDetectObjects({ id: asset.id }); expect(machineMock.detectObjects).toHaveBeenCalledWith({ thumbnailPath: 'path/to/resize.ext' }); expect(smartMock.upsert).toHaveBeenCalledWith({ @@ -126,18 +126,10 @@ describe(SmartInfoService.name, () => { }); }); - it('should handle an error with the machine learning pipeline', async () => { - machineMock.detectObjects.mockRejectedValue(new Error('Unable to read thumbnail')); - - await sut.handleDetectObjects({ asset }); - - expect(smartMock.upsert).not.toHaveBeenCalled(); - }); - it('should no update the smart info if no objects were returned', async () 
=> { machineMock.detectObjects.mockResolvedValue([]); - await sut.handleDetectObjects({ asset }); + await sut.handleDetectObjects({ id: asset.id }); expect(machineMock.detectObjects).toHaveBeenCalled(); expect(smartMock.upsert).not.toHaveBeenCalled(); @@ -153,7 +145,7 @@ describe(SmartInfoService.name, () => { await sut.handleQueueEncodeClip({ force: false }); - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.ENCODE_CLIP, data: { asset: assetEntityStub.image } }); + expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.ENCODE_CLIP, data: { id: assetEntityStub.image.id } }); expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.CLIP_ENCODING); }); @@ -165,14 +157,17 @@ describe(SmartInfoService.name, () => { await sut.handleQueueEncodeClip({ force: true }); - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.ENCODE_CLIP, data: { asset: assetEntityStub.image } }); + expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.ENCODE_CLIP, data: { id: assetEntityStub.image.id } }); expect(assetMock.getAll).toHaveBeenCalled(); }); }); describe('handleEncodeClip', () => { it('should skip assets without a resize path', async () => { - await sut.handleEncodeClip({ asset: { resizePath: '' } as AssetEntity }); + const asset = { resizePath: '' } as AssetEntity; + assetMock.getByIds.mockResolvedValue([asset]); + + await sut.handleEncodeClip({ id: asset.id }); expect(smartMock.upsert).not.toHaveBeenCalled(); expect(machineMock.encodeImage).not.toHaveBeenCalled(); @@ -181,7 +176,7 @@ describe(SmartInfoService.name, () => { it('should save the returned objects', async () => { machineMock.encodeImage.mockResolvedValue([0.01, 0.02, 0.03]); - await sut.handleEncodeClip({ asset }); + await sut.handleEncodeClip({ id: asset.id }); expect(machineMock.encodeImage).toHaveBeenCalledWith({ thumbnailPath: 'path/to/resize.ext' }); expect(smartMock.upsert).toHaveBeenCalledWith({ @@ -189,13 +184,5 @@ describe(SmartInfoService.name, () => { clipEmbedding: [0.01, 0.02, 0.03], }); }); - - it('should handle an error with the machine learning pipeline', async () => { - machineMock.encodeImage.mockRejectedValue(new Error('Unable to read thumbnail')); - - await sut.handleEncodeClip({ asset }); - - expect(smartMock.upsert).not.toHaveBeenCalled(); - }); }); }); diff --git a/server/libs/domain/src/smart-info/smart-info.service.ts b/server/libs/domain/src/smart-info/smart-info.service.ts index d52830bc5f..cd84b97f97 100644 --- a/server/libs/domain/src/smart-info/smart-info.service.ts +++ b/server/libs/domain/src/smart-info/smart-info.service.ts @@ -2,7 +2,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { IAssetRepository, WithoutProperty } from '../asset'; import { MACHINE_LEARNING_ENABLED } from '../domain.constant'; import { usePagination } from '../domain.util'; -import { IAssetJob, IBaseJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; +import { IBaseJob, IEntityJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; import { IMachineLearningRepository } from './machine-learning.interface'; import { ISmartInfoRepository } from './smart-info.repository'; @@ -18,91 +18,82 @@ export class SmartInfoService { ) {} async handleQueueObjectTagging({ force }: IBaseJob) { - try { - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { - return force - ? 
this.assetRepository.getAll(pagination) - : this.assetRepository.getWithout(pagination, WithoutProperty.OBJECT_TAGS); - }); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getAll(pagination) + : this.assetRepository.getWithout(pagination, WithoutProperty.OBJECT_TAGS); + }); - for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: { asset } }); - await this.jobRepository.queue({ name: JobName.DETECT_OBJECTS, data: { asset } }); - } + for await (const assets of assetPagination) { + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: { id: asset.id } }); + await this.jobRepository.queue({ name: JobName.DETECT_OBJECTS, data: { id: asset.id } }); } - } catch (error: any) { - this.logger.error(`Unable to queue object tagging`, error?.stack); } + + return true; } - async handleDetectObjects(data: IAssetJob) { - const { asset } = data; + async handleDetectObjects({ id }: IEntityJob) { + const [asset] = await this.assetRepository.getByIds([id]); if (!MACHINE_LEARNING_ENABLED || !asset.resizePath) { - return; + return false; } - try { - const objects = await this.machineLearning.detectObjects({ thumbnailPath: asset.resizePath }); - if (objects.length > 0) { - await this.repository.upsert({ assetId: asset.id, objects }); - await this.jobRepository.queue({ name: JobName.SEARCH_INDEX_ASSET, data: { ids: [asset.id] } }); - } - } catch (error: any) { - this.logger.error(`Unable run object detection pipeline: ${asset.id}`, error?.stack); + const objects = await this.machineLearning.detectObjects({ thumbnailPath: asset.resizePath }); + if (objects.length === 0) { + return false; } + + await this.repository.upsert({ assetId: asset.id, objects }); + + return true; } - async handleClassifyImage(data: IAssetJob) { - const { asset } = data; + async handleClassifyImage({ id }: IEntityJob) { + const [asset] = await this.assetRepository.getByIds([id]); if (!MACHINE_LEARNING_ENABLED || !asset.resizePath) { - return; + return false; } - try { - const tags = await this.machineLearning.classifyImage({ thumbnailPath: asset.resizePath }); - if (tags.length > 0) { - await this.repository.upsert({ assetId: asset.id, tags }); - await this.jobRepository.queue({ name: JobName.SEARCH_INDEX_ASSET, data: { ids: [asset.id] } }); - } - } catch (error: any) { - this.logger.error(`Unable to run image tagging pipeline: ${asset.id}`, error?.stack); + const tags = await this.machineLearning.classifyImage({ thumbnailPath: asset.resizePath }); + if (tags.length === 0) { + return false; } + + await this.repository.upsert({ assetId: asset.id, tags }); + + return true; } async handleQueueEncodeClip({ force }: IBaseJob) { - try { - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { - return force - ? this.assetRepository.getAll(pagination) - : this.assetRepository.getWithout(pagination, WithoutProperty.CLIP_ENCODING); - }); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? 
this.assetRepository.getAll(pagination) + : this.assetRepository.getWithout(pagination, WithoutProperty.CLIP_ENCODING); + }); - for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: { asset } }); - } + for await (const assets of assetPagination) { + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: { id: asset.id } }); } - } catch (error: any) { - this.logger.error(`Unable to queue clip encoding`, error?.stack); } + + return true; } - async handleEncodeClip(data: IAssetJob) { - const { asset } = data; + async handleEncodeClip({ id }: IEntityJob) { + const [asset] = await this.assetRepository.getByIds([id]); if (!MACHINE_LEARNING_ENABLED || !asset.resizePath) { - return; + return false; } - try { - const clipEmbedding = await this.machineLearning.encodeImage({ thumbnailPath: asset.resizePath }); - await this.repository.upsert({ assetId: asset.id, clipEmbedding: clipEmbedding }); - await this.jobRepository.queue({ name: JobName.SEARCH_INDEX_ASSET, data: { ids: [asset.id] } }); - } catch (error: any) { - this.logger.error(`Unable run clip encoding pipeline: ${asset.id}`, error?.stack); - } + const clipEmbedding = await this.machineLearning.encodeImage({ thumbnailPath: asset.resizePath }); + await this.repository.upsert({ assetId: asset.id, clipEmbedding: clipEmbedding }); + + return true; } } diff --git a/server/libs/domain/src/storage-template/storage-template.service.spec.ts b/server/libs/domain/src/storage-template/storage-template.service.spec.ts index 8eea783184..f39f0ec2c7 100644 --- a/server/libs/domain/src/storage-template/storage-template.service.spec.ts +++ b/server/libs/domain/src/storage-template/storage-template.service.spec.ts @@ -195,11 +195,4 @@ describe(StorageTemplateService.name, () => { ]); }); }); - - it('should handle an error', async () => { - storageMock.removeEmptyDirs.mockRejectedValue(new Error('Read only filesystem')); - userMock.getList.mockResolvedValue([]); - - await sut.handleMigration(); - }); }); diff --git a/server/libs/domain/src/storage-template/storage-template.service.ts b/server/libs/domain/src/storage-template/storage-template.service.ts index f8e2ca7a59..ae0ba9b0e3 100644 --- a/server/libs/domain/src/storage-template/storage-template.service.ts +++ b/server/libs/domain/src/storage-template/storage-template.service.ts @@ -3,7 +3,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { IAssetRepository } from '../asset/asset.repository'; import { APP_MEDIA_LOCATION } from '../domain.constant'; import { getLivePhotoMotionFilename, usePagination } from '../domain.util'; -import { IAssetJob, JOBS_ASSET_PAGINATION_SIZE } from '../job'; +import { IEntityJob, JOBS_ASSET_PAGINATION_SIZE } from '../job'; import { IStorageRepository } from '../storage/storage.repository'; import { INITIAL_SYSTEM_CONFIG, ISystemConfigRepository } from '../system-config'; import { IUserRepository } from '../user/user.repository'; @@ -29,24 +29,22 @@ export class StorageTemplateService { this.core = new StorageTemplateCore(configRepository, config, storageRepository); } - async handleMigrationSingle(data: IAssetJob) { - const { asset } = data; + async handleMigrationSingle({ id }: IEntityJob) { + const [asset] = await this.assetRepository.getByIds([id]); - try { - const user = await this.userRepository.get(asset.ownerId); - const storageLabel = user?.storageLabel || null; - const filename = asset.originalFileName || asset.id; 
- await this.moveAsset(asset, { storageLabel, filename }); + const user = await this.userRepository.get(asset.ownerId); + const storageLabel = user?.storageLabel || null; + const filename = asset.originalFileName || asset.id; + await this.moveAsset(asset, { storageLabel, filename }); - // move motion part of live photo - if (asset.livePhotoVideoId) { - const [livePhotoVideo] = await this.assetRepository.getByIds([asset.livePhotoVideoId]); - const motionFilename = getLivePhotoMotionFilename(filename, livePhotoVideo.originalPath); - await this.moveAsset(livePhotoVideo, { storageLabel, filename: motionFilename }); - } - } catch (error: any) { - this.logger.error('Error running single template migration', error); + // move motion part of live photo + if (asset.livePhotoVideoId) { + const [livePhotoVideo] = await this.assetRepository.getByIds([asset.livePhotoVideoId]); + const motionFilename = getLivePhotoMotionFilename(filename, livePhotoVideo.originalPath); + await this.moveAsset(livePhotoVideo, { storageLabel, filename: motionFilename }); } + + return true; } async handleMigration() { @@ -69,11 +67,11 @@ export class StorageTemplateService { this.logger.debug('Cleaning up empty directories...'); await this.storageRepository.removeEmptyDirs(APP_MEDIA_LOCATION); - } catch (error: any) { - this.logger.error('Error running template migration', error); } finally { console.timeEnd('migrating-time'); } + + return true; } // TODO: use asset core (once in domain) diff --git a/server/libs/domain/src/storage/storage.service.ts b/server/libs/domain/src/storage/storage.service.ts index 149f0b8e96..fea0c12081 100644 --- a/server/libs/domain/src/storage/storage.service.ts +++ b/server/libs/domain/src/storage/storage.service.ts @@ -11,6 +11,7 @@ export class StorageService { async handleDeleteFiles(job: IDeleteFilesJob) { const { files } = job; + // TODO: one job per file for (const file of files) { if (!file) { continue; @@ -22,5 +23,7 @@ export class StorageService { this.logger.warn('Unable to remove file from disk', error); } } + + return true; } } diff --git a/server/libs/domain/src/system-config/system-config.service.ts b/server/libs/domain/src/system-config/system-config.service.ts index 2134a6f628..4dc5cd6eba 100644 --- a/server/libs/domain/src/system-config/system-config.service.ts +++ b/server/libs/domain/src/system-config/system-config.service.ts @@ -46,6 +46,7 @@ export class SystemConfigService { async refreshConfig() { await this.core.refreshConfig(); + return true; } addValidator(validator: SystemConfigValidator) { diff --git a/server/libs/domain/src/user/user.service.spec.ts b/server/libs/domain/src/user/user.service.spec.ts index a1429b1d2f..895d86bfca 100644 --- a/server/libs/domain/src/user/user.service.spec.ts +++ b/server/libs/domain/src/user/user.service.spec.ts @@ -455,21 +455,22 @@ describe(UserService.name, () => { }); it('should queue user ready for deletion', async () => { - const user = { deletedAt: makeDeletedAt(10) }; + const user = { id: 'deleted-user', deletedAt: makeDeletedAt(10) }; userMock.getDeletedUsers.mockResolvedValue([user] as UserEntity[]); await sut.handleUserDeleteCheck(); expect(userMock.getDeletedUsers).toHaveBeenCalled(); - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.USER_DELETION, data: { user } }); + expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.USER_DELETION, data: { id: user.id } }); }); }); describe('handleUserDelete', () => { it('should skip users not ready for deletion', async () => { - const user = { deletedAt: 
-      const user = { deletedAt: makeDeletedAt(5) } as UserEntity;
+      const user = { id: 'user-1', deletedAt: makeDeletedAt(5) } as UserEntity;
+      userMock.get.mockResolvedValue(user);
 
-      await sut.handleUserDelete({ user });
+      await sut.handleUserDelete({ id: user.id });
 
       expect(storageMock.unlinkDir).not.toHaveBeenCalled();
       expect(userMock.delete).not.toHaveBeenCalled();
     });
@@ -477,8 +478,9 @@ describe(UserService.name, () => {
 
     it('should delete the user and associated assets', async () => {
       const user = { id: 'deleted-user', deletedAt: makeDeletedAt(10) } as UserEntity;
+      userMock.get.mockResolvedValue(user);
 
-      await sut.handleUserDelete({ user });
+      await sut.handleUserDelete({ id: user.id });
 
       const options = { force: true, recursive: true };
 
@@ -494,22 +496,13 @@ describe(UserService.name, () => {
 
     it('should delete the library path for a storage label', async () => {
       const user = { id: 'deleted-user', deletedAt: makeDeletedAt(10), storageLabel: 'admin' } as UserEntity;
+      userMock.get.mockResolvedValue(user);
 
-      await sut.handleUserDelete({ user });
+      await sut.handleUserDelete({ id: user.id });
 
       const options = { force: true, recursive: true };
 
       expect(storageMock.unlinkDir).toHaveBeenCalledWith('upload/library/admin', options);
     });
-
-    it('should handle an error', async () => {
-      const user = { id: 'deleted-user', deletedAt: makeDeletedAt(10) } as UserEntity;
-
-      storageMock.unlinkDir.mockRejectedValue(new Error('Read only filesystem'));
-
-      await sut.handleUserDelete({ user });
-
-      expect(userMock.delete).not.toHaveBeenCalled();
-    });
   });
 });
diff --git a/server/libs/domain/src/user/user.service.ts b/server/libs/domain/src/user/user.service.ts
index 74374085a4..f8c852de6f 100644
--- a/server/libs/domain/src/user/user.service.ts
+++ b/server/libs/domain/src/user/user.service.ts
@@ -6,7 +6,7 @@ import { IAlbumRepository } from '../album/album.repository';
 import { IAssetRepository } from '../asset/asset.repository';
 import { AuthUserDto } from '../auth';
 import { ICryptoRepository } from '../crypto/crypto.repository';
-import { IJobRepository, IUserDeletionJob, JobName } from '../job';
+import { IEntityJob, IJobRepository, JobName } from '../job';
 import { StorageCore, StorageFolder } from '../storage';
 import { IStorageRepository } from '../storage/storage.repository';
 import { IUserRepository } from '../user/user.repository';
@@ -138,44 +138,47 @@ export class UserService {
     const users = await this.userRepository.getDeletedUsers();
     for (const user of users) {
       if (this.isReadyForDeletion(user)) {
-        await this.jobRepository.queue({ name: JobName.USER_DELETION, data: { user } });
+        await this.jobRepository.queue({ name: JobName.USER_DELETION, data: { id: user.id } });
       }
     }
+
+    return true;
   }
 
-  async handleUserDelete(data: IUserDeletionJob) {
-    const { user } = data;
+  async handleUserDelete({ id }: IEntityJob) {
+    const user = await this.userRepository.get(id, true);
+    if (!user) {
+      return false;
+    }
 
     // just for extra protection here
     if (!this.isReadyForDeletion(user)) {
-      this.logger.warn(`Skipped user that was not ready for deletion: id=${user.id}`);
-      return;
+      this.logger.warn(`Skipped user that was not ready for deletion: id=${id}`);
+      return false;
     }
 
     this.logger.log(`Deleting user: ${user.id}`);
 
-    try {
-      const folders = [
-        this.storageCore.getLibraryFolder(user),
-        this.storageCore.getFolderLocation(StorageFolder.UPLOAD, user.id),
-        this.storageCore.getFolderLocation(StorageFolder.PROFILE, user.id),
-        this.storageCore.getFolderLocation(StorageFolder.THUMBNAILS, user.id),
-        this.storageCore.getFolderLocation(StorageFolder.ENCODED_VIDEO, user.id),
-      ];
+    const folders = [
+      this.storageCore.getLibraryFolder(user),
+      this.storageCore.getFolderLocation(StorageFolder.UPLOAD, user.id),
+      this.storageCore.getFolderLocation(StorageFolder.PROFILE, user.id),
+      this.storageCore.getFolderLocation(StorageFolder.THUMBNAILS, user.id),
+      this.storageCore.getFolderLocation(StorageFolder.ENCODED_VIDEO, user.id),
+    ];
 
-      for (const folder of folders) {
-        this.logger.warn(`Removing user from filesystem: ${folder}`);
-        await this.storageRepository.unlinkDir(folder, { recursive: true, force: true });
-      }
-
-      this.logger.warn(`Removing user from database: ${user.id}`);
-
-      await this.albumRepository.deleteAll(user.id);
-      await this.assetRepository.deleteAll(user.id);
-      await this.userRepository.delete(user, true);
-    } catch (error: any) {
-      this.logger.error(`Failed to remove user`, error, { id: user.id });
+    for (const folder of folders) {
+      this.logger.warn(`Removing user from filesystem: ${folder}`);
+      await this.storageRepository.unlinkDir(folder, { recursive: true, force: true });
     }
+
+    this.logger.warn(`Removing user from database: ${user.id}`);
+
+    await this.albumRepository.deleteAll(user.id);
+    await this.assetRepository.deleteAll(user.id);
+    await this.userRepository.delete(user, true);
+
+    return true;
   }
 
   private isReadyForDeletion(user: UserEntity): boolean {
diff --git a/server/libs/domain/test/storage.repository.mock.ts b/server/libs/domain/test/storage.repository.mock.ts
index d58681aecc..758babd75c 100644
--- a/server/libs/domain/test/storage.repository.mock.ts
+++ b/server/libs/domain/test/storage.repository.mock.ts
@@ -4,7 +4,7 @@ export const newStorageRepositoryMock = (): jest.Mocked<IStorageRepository> => {
   return {
     createReadStream: jest.fn(),
     unlink: jest.fn(),
-    unlinkDir: jest.fn(),
+    unlinkDir: jest.fn().mockResolvedValue(true),
     removeEmptyDirs: jest.fn(),
     moveFile: jest.fn(),
     checkFileExists: jest.fn(),
diff --git a/server/libs/infra/src/repositories/job.repository.ts b/server/libs/infra/src/repositories/job.repository.ts
index 2962222822..69a39444e0 100644
--- a/server/libs/infra/src/repositories/job.repository.ts
+++ b/server/libs/infra/src/repositories/job.repository.ts
@@ -45,9 +45,6 @@ export class JobRepository implements IJobRepository {
   private getJobOptions(item: JobItem): JobOptions | null {
     switch (item.name) {
-      case JobName.ASSET_UPLOADED:
-        return { jobId: item.data.asset.id };
-
       case JobName.GENERATE_FACE_THUMBNAIL:
         return { priority: 1 };