From ff5230062490a000d85dced1559a6434bb96327b Mon Sep 17 00:00:00 2001 From: Mert <101130780+mertalev@users.noreply.github.com> Date: Thu, 16 May 2024 19:39:33 -0400 Subject: [PATCH] refactor(server): duplicate controller and service (#9542) * duplicate controller and service * change endpoint name * fix search tests * remove unused import * add to index --- server/src/controllers/asset.controller.ts | 5 - .../src/controllers/duplicate.controller.ts | 18 ++ server/src/services/asset.service.ts | 5 - server/src/services/duplicate.service.spec.ts | 269 ++++++++++++++++++ server/src/services/duplicate.service.ts | 133 +++++++++ server/src/services/index.ts | 2 + server/src/services/microservices.service.ts | 8 +- server/src/services/search.service.spec.ts | 242 +--------------- server/src/services/search.service.ts | 109 +------ 9 files changed, 430 insertions(+), 361 deletions(-) create mode 100644 server/src/controllers/duplicate.controller.ts create mode 100644 server/src/services/duplicate.service.spec.ts create mode 100644 server/src/services/duplicate.service.ts diff --git a/server/src/controllers/asset.controller.ts b/server/src/controllers/asset.controller.ts index 7e51f17b59..f2d076e17b 100644 --- a/server/src/controllers/asset.controller.ts +++ b/server/src/controllers/asset.controller.ts @@ -57,11 +57,6 @@ export class AssetController { return this.service.getStatistics(auth, dto); } - @Get('duplicates') - getAssetDuplicates(@Auth() auth: AuthDto): Promise { - return this.service.getDuplicates(auth); - } - @Post('jobs') @HttpCode(HttpStatus.NO_CONTENT) @Authenticated() diff --git a/server/src/controllers/duplicate.controller.ts b/server/src/controllers/duplicate.controller.ts new file mode 100644 index 0000000000..ecabc0ee74 --- /dev/null +++ b/server/src/controllers/duplicate.controller.ts @@ -0,0 +1,18 @@ +import { Controller, Get } from '@nestjs/common'; +import { ApiTags } from '@nestjs/swagger'; +import { AssetResponseDto } from 'src/dtos/asset-response.dto'; +import { AuthDto } from 'src/dtos/auth.dto'; +import { Auth, Authenticated } from 'src/middleware/auth.guard'; +import { DuplicateService } from 'src/services/duplicate.service'; + +@ApiTags('Duplicate') +@Controller('duplicates') +export class DuplicateController { + constructor(private service: DuplicateService) {} + + @Get() + @Authenticated() + getAssetDuplicates(@Auth() auth: AuthDto): Promise { + return this.service.getDuplicates(auth); + } +} diff --git a/server/src/services/asset.service.ts b/server/src/services/asset.service.ts index a0cbf40278..d266b1ed2f 100644 --- a/server/src/services/asset.service.ts +++ b/server/src/services/asset.service.ts @@ -286,11 +286,6 @@ export class AssetService { return data; } - async getDuplicates(auth: AuthDto): Promise { - const res = await this.assetRepository.getDuplicates({ userIds: [auth.user.id] }); - return res.map((a) => mapAsset(a, { auth })); - } - async update(auth: AuthDto, id: string, dto: UpdateAssetDto): Promise { await this.access.requirePermission(auth, Permission.ASSET_UPDATE, id); diff --git a/server/src/services/duplicate.service.spec.ts b/server/src/services/duplicate.service.spec.ts new file mode 100644 index 0000000000..4560d9024c --- /dev/null +++ b/server/src/services/duplicate.service.spec.ts @@ -0,0 +1,269 @@ +import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface'; +import { ICryptoRepository } from 'src/interfaces/crypto.interface'; +import { IJobRepository, JobName, JobStatus } from 'src/interfaces/job.interface'; +import { ILoggerRepository } from 'src/interfaces/logger.interface'; +import { ISearchRepository } from 'src/interfaces/search.interface'; +import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface'; +import { DuplicateService } from 'src/services/duplicate.service'; +import { SearchService } from 'src/services/search.service'; +import { assetStub } from 'test/fixtures/asset.stub'; +import { newAssetRepositoryMock } from 'test/repositories/asset.repository.mock'; +import { newCryptoRepositoryMock } from 'test/repositories/crypto.repository.mock'; +import { newJobRepositoryMock } from 'test/repositories/job.repository.mock'; +import { newLoggerRepositoryMock } from 'test/repositories/logger.repository.mock'; +import { newSearchRepositoryMock } from 'test/repositories/search.repository.mock'; +import { newSystemMetadataRepositoryMock } from 'test/repositories/system-metadata.repository.mock'; +import { Mocked, beforeEach, vitest } from 'vitest'; + +vitest.useFakeTimers(); + +describe(SearchService.name, () => { + let sut: DuplicateService; + let assetMock: Mocked; + let systemMock: Mocked; + let searchMock: Mocked; + let loggerMock: Mocked; + let cryptoMock: Mocked; + let jobMock: Mocked; + + beforeEach(() => { + assetMock = newAssetRepositoryMock(); + systemMock = newSystemMetadataRepositoryMock(); + searchMock = newSearchRepositoryMock(); + loggerMock = newLoggerRepositoryMock(); + cryptoMock = newCryptoRepositoryMock(); + jobMock = newJobRepositoryMock(); + + sut = new DuplicateService(systemMock, searchMock, assetMock, loggerMock, cryptoMock, jobMock); + }); + + it('should work', () => { + expect(sut).toBeDefined(); + }); + + describe('handleQueueSearchDuplicates', () => { + beforeEach(() => { + systemMock.get.mockResolvedValue({ + machineLearning: { + enabled: true, + duplicateDetection: { + enabled: true, + }, + }, + }); + }); + + it('should skip if machine learning is disabled', async () => { + systemMock.get.mockResolvedValue({ + machineLearning: { + enabled: false, + duplicateDetection: { + enabled: true, + }, + }, + }); + + await expect(sut.handleQueueSearchDuplicates({})).resolves.toBe(JobStatus.SKIPPED); + expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); + expect(systemMock.get).toHaveBeenCalled(); + }); + + it('should skip if duplicate detection is disabled', async () => { + systemMock.get.mockResolvedValue({ + machineLearning: { + enabled: true, + duplicateDetection: { + enabled: false, + }, + }, + }); + + await expect(sut.handleQueueSearchDuplicates({})).resolves.toBe(JobStatus.SKIPPED); + expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); + expect(systemMock.get).toHaveBeenCalled(); + }); + + it('should queue missing assets', async () => { + assetMock.getWithout.mockResolvedValue({ + items: [assetStub.image], + hasNextPage: false, + }); + + await sut.handleQueueSearchDuplicates({}); + + expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.DUPLICATE); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.DUPLICATE_DETECTION, + data: { id: assetStub.image.id }, + }, + ]); + }); + + it('should queue all assets', async () => { + assetMock.getAll.mockResolvedValue({ + items: [assetStub.image], + hasNextPage: false, + }); + + await sut.handleQueueSearchDuplicates({ force: true }); + + expect(assetMock.getAll).toHaveBeenCalled(); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.DUPLICATE_DETECTION, + data: { id: assetStub.image.id }, + }, + ]); + }); + }); + + describe('handleSearchDuplicates', () => { + beforeEach(() => { + systemMock.get.mockResolvedValue({ + machineLearning: { + enabled: true, + duplicateDetection: { + enabled: true, + }, + }, + }); + }); + + it('should skip if machine learning is disabled', async () => { + systemMock.get.mockResolvedValue({ + machineLearning: { + enabled: false, + duplicateDetection: { + enabled: true, + }, + }, + }); + const id = assetStub.livePhotoMotionAsset.id; + assetMock.getById.mockResolvedValue(assetStub.livePhotoMotionAsset); + + const result = await sut.handleSearchDuplicates({ id }); + + expect(result).toBe(JobStatus.SKIPPED); + }); + + it('should skip if duplicate detection is disabled', async () => { + systemMock.get.mockResolvedValue({ + machineLearning: { + enabled: true, + duplicateDetection: { + enabled: false, + }, + }, + }); + const id = assetStub.livePhotoMotionAsset.id; + assetMock.getById.mockResolvedValue(assetStub.livePhotoMotionAsset); + + const result = await sut.handleSearchDuplicates({ id }); + + expect(result).toBe(JobStatus.SKIPPED); + }); + + it('should fail if asset is not found', async () => { + const result = await sut.handleSearchDuplicates({ id: assetStub.image.id }); + + expect(result).toBe(JobStatus.FAILED); + expect(loggerMock.error).toHaveBeenCalledWith(`Asset ${assetStub.image.id} not found`); + }); + + it('should skip if asset is not visible', async () => { + const id = assetStub.livePhotoMotionAsset.id; + assetMock.getById.mockResolvedValue(assetStub.livePhotoMotionAsset); + + const result = await sut.handleSearchDuplicates({ id }); + + expect(result).toBe(JobStatus.SKIPPED); + expect(loggerMock.debug).toHaveBeenCalledWith(`Asset ${id} is not visible, skipping`); + }); + + it('should fail if asset is missing preview image', async () => { + assetMock.getById.mockResolvedValue(assetStub.noResizePath); + + const result = await sut.handleSearchDuplicates({ id: assetStub.noResizePath.id }); + + expect(result).toBe(JobStatus.FAILED); + expect(loggerMock.warn).toHaveBeenCalledWith(`Asset ${assetStub.noResizePath.id} is missing preview image`); + }); + + it('should fail if asset is missing embedding', async () => { + assetMock.getById.mockResolvedValue(assetStub.image); + + const result = await sut.handleSearchDuplicates({ id: assetStub.image.id }); + + expect(result).toBe(JobStatus.FAILED); + expect(loggerMock.debug).toHaveBeenCalledWith(`Asset ${assetStub.image.id} is missing embedding`); + }); + + it('should search for duplicates and update asset with duplicateId', async () => { + assetMock.getById.mockResolvedValue(assetStub.hasEmbedding); + searchMock.searchDuplicates.mockResolvedValue([ + { assetId: assetStub.image.id, distance: 0.01, duplicateId: null }, + ]); + const expectedAssetIds = [assetStub.image.id, assetStub.hasEmbedding.id]; + + const result = await sut.handleSearchDuplicates({ id: assetStub.hasEmbedding.id }); + + expect(result).toBe(JobStatus.SUCCESS); + expect(searchMock.searchDuplicates).toHaveBeenCalledWith({ + assetId: assetStub.hasEmbedding.id, + embedding: assetStub.hasEmbedding.smartSearch!.embedding, + maxDistance: 0.03, + userIds: [assetStub.hasEmbedding.ownerId], + }); + expect(assetMock.updateDuplicates).toHaveBeenCalledWith({ + assetIds: expectedAssetIds, + targetDuplicateId: expect.any(String), + duplicateIds: [], + }); + expect(assetMock.upsertJobStatus).toHaveBeenCalledWith( + ...expectedAssetIds.map((assetId) => ({ assetId, duplicatesDetectedAt: expect.any(Date) })), + ); + }); + + it('should use existing duplicate ID among matched duplicates', async () => { + const duplicateId = assetStub.hasDupe.duplicateId; + assetMock.getById.mockResolvedValue(assetStub.hasEmbedding); + searchMock.searchDuplicates.mockResolvedValue([{ assetId: assetStub.hasDupe.id, distance: 0.01, duplicateId }]); + const expectedAssetIds = [assetStub.hasEmbedding.id]; + + const result = await sut.handleSearchDuplicates({ id: assetStub.hasEmbedding.id }); + + expect(result).toBe(JobStatus.SUCCESS); + expect(searchMock.searchDuplicates).toHaveBeenCalledWith({ + assetId: assetStub.hasEmbedding.id, + embedding: assetStub.hasEmbedding.smartSearch!.embedding, + maxDistance: 0.03, + userIds: [assetStub.hasEmbedding.ownerId], + }); + expect(assetMock.updateDuplicates).toHaveBeenCalledWith({ + assetIds: expectedAssetIds, + targetDuplicateId: assetStub.hasDupe.duplicateId, + duplicateIds: [], + }); + expect(assetMock.upsertJobStatus).toHaveBeenCalledWith( + ...expectedAssetIds.map((assetId) => ({ assetId, duplicatesDetectedAt: expect.any(Date) })), + ); + }); + + it('should remove duplicateId if no duplicates found and asset has duplicateId', async () => { + assetMock.getById.mockResolvedValue(assetStub.hasDupe); + searchMock.searchDuplicates.mockResolvedValue([]); + + const result = await sut.handleSearchDuplicates({ id: assetStub.hasDupe.id }); + + expect(result).toBe(JobStatus.SUCCESS); + expect(assetMock.update).toHaveBeenCalledWith({ id: assetStub.hasDupe.id, duplicateId: null }); + expect(assetMock.upsertJobStatus).toHaveBeenCalledWith({ + assetId: assetStub.hasDupe.id, + duplicatesDetectedAt: expect.any(Date), + }); + }); + }); +}); diff --git a/server/src/services/duplicate.service.ts b/server/src/services/duplicate.service.ts new file mode 100644 index 0000000000..a01e1b866a --- /dev/null +++ b/server/src/services/duplicate.service.ts @@ -0,0 +1,133 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { SystemConfigCore } from 'src/cores/system-config.core'; +import { AssetResponseDto, mapAsset } from 'src/dtos/asset-response.dto'; +import { AuthDto } from 'src/dtos/auth.dto'; +import { AssetEntity } from 'src/entities/asset.entity'; +import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface'; +import { ICryptoRepository } from 'src/interfaces/crypto.interface'; +import { + IBaseJob, + IEntityJob, + IJobRepository, + JOBS_ASSET_PAGINATION_SIZE, + JobName, + JobStatus, +} from 'src/interfaces/job.interface'; +import { ILoggerRepository } from 'src/interfaces/logger.interface'; +import { AssetDuplicateResult, ISearchRepository } from 'src/interfaces/search.interface'; +import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface'; +import { isDuplicateDetectionEnabled } from 'src/utils/misc'; +import { usePagination } from 'src/utils/pagination'; + +@Injectable() +export class DuplicateService { + private configCore: SystemConfigCore; + + constructor( + @Inject(ISystemMetadataRepository) systemMetadataRepository: ISystemMetadataRepository, + @Inject(ISearchRepository) private searchRepository: ISearchRepository, + @Inject(IAssetRepository) private assetRepository: IAssetRepository, + @Inject(ILoggerRepository) private logger: ILoggerRepository, + @Inject(ICryptoRepository) private cryptoRepository: ICryptoRepository, + @Inject(IJobRepository) private jobRepository: IJobRepository, + ) { + this.logger.setContext(DuplicateService.name); + this.configCore = SystemConfigCore.create(systemMetadataRepository, logger); + } + + async getDuplicates(auth: AuthDto): Promise { + const res = await this.assetRepository.getDuplicates({ userIds: [auth.user.id] }); + return res.map((a) => mapAsset(a, { auth })); + } + + async handleQueueSearchDuplicates({ force }: IBaseJob): Promise { + const { machineLearning } = await this.configCore.getConfig(); + if (!isDuplicateDetectionEnabled(machineLearning)) { + return JobStatus.SKIPPED; + } + + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getAll(pagination, { isVisible: true }) + : this.assetRepository.getWithout(pagination, WithoutProperty.DUPLICATE); + }); + + for await (const assets of assetPagination) { + await this.jobRepository.queueAll( + assets.map((asset) => ({ name: JobName.DUPLICATE_DETECTION, data: { id: asset.id } })), + ); + } + + return JobStatus.SUCCESS; + } + + async handleSearchDuplicates({ id }: IEntityJob): Promise { + const { machineLearning } = await this.configCore.getConfig(); + if (!isDuplicateDetectionEnabled(machineLearning)) { + return JobStatus.SKIPPED; + } + + const asset = await this.assetRepository.getById(id, { smartSearch: true }); + if (!asset) { + this.logger.error(`Asset ${id} not found`); + return JobStatus.FAILED; + } + + if (!asset.isVisible) { + this.logger.debug(`Asset ${id} is not visible, skipping`); + return JobStatus.SKIPPED; + } + + if (!asset.previewPath) { + this.logger.warn(`Asset ${id} is missing preview image`); + return JobStatus.FAILED; + } + + if (!asset.smartSearch?.embedding) { + this.logger.debug(`Asset ${id} is missing embedding`); + return JobStatus.FAILED; + } + + const duplicateAssets = await this.searchRepository.searchDuplicates({ + assetId: asset.id, + embedding: asset.smartSearch.embedding, + maxDistance: machineLearning.duplicateDetection.maxDistance, + userIds: [asset.ownerId], + }); + + let assetIds = [asset.id]; + if (duplicateAssets.length > 0) { + this.logger.debug( + `Found ${duplicateAssets.length} duplicate${duplicateAssets.length === 1 ? '' : 's'} for asset ${asset.id}`, + ); + assetIds = await this.updateDuplicates(asset, duplicateAssets); + } else if (asset.duplicateId) { + this.logger.debug(`No duplicates found for asset ${asset.id}, removing duplicateId`); + await this.assetRepository.update({ id: asset.id, duplicateId: null }); + } + + const duplicatesDetectedAt = new Date(); + await this.assetRepository.upsertJobStatus(...assetIds.map((assetId) => ({ assetId, duplicatesDetectedAt }))); + + return JobStatus.SUCCESS; + } + + private async updateDuplicates(asset: AssetEntity, duplicateAssets: AssetDuplicateResult[]): Promise { + const duplicateIds = [ + ...new Set( + duplicateAssets + .filter((asset): asset is AssetDuplicateResult & { duplicateId: string } => !!asset.duplicateId) + .map((duplicate) => duplicate.duplicateId), + ), + ]; + + const targetDuplicateId = asset.duplicateId ?? duplicateIds.shift() ?? this.cryptoRepository.randomUUID(); + const assetIdsToUpdate = duplicateAssets + .filter((asset) => asset.duplicateId !== targetDuplicateId) + .map((duplicate) => duplicate.assetId); + assetIdsToUpdate.push(asset.id); + + await this.assetRepository.updateDuplicates({ targetDuplicateId, assetIds: assetIdsToUpdate, duplicateIds }); + return assetIdsToUpdate; + } +} diff --git a/server/src/services/index.ts b/server/src/services/index.ts index 95f048ba3c..f130da2349 100644 --- a/server/src/services/index.ts +++ b/server/src/services/index.ts @@ -8,6 +8,7 @@ import { AuditService } from 'src/services/audit.service'; import { AuthService } from 'src/services/auth.service'; import { DatabaseService } from 'src/services/database.service'; import { DownloadService } from 'src/services/download.service'; +import { DuplicateService } from 'src/services/duplicate.service'; import { JobService } from 'src/services/job.service'; import { LibraryService } from 'src/services/library.service'; import { MediaService } from 'src/services/media.service'; @@ -44,6 +45,7 @@ export const services = [ AuthService, DatabaseService, DownloadService, + DuplicateService, JobService, LibraryService, MediaService, diff --git a/server/src/services/microservices.service.ts b/server/src/services/microservices.service.ts index 24acf6b978..2c8302fb1d 100644 --- a/server/src/services/microservices.service.ts +++ b/server/src/services/microservices.service.ts @@ -3,13 +3,13 @@ import { IDeleteFilesJob, JobName } from 'src/interfaces/job.interface'; import { AssetService } from 'src/services/asset.service'; import { AuditService } from 'src/services/audit.service'; import { DatabaseService } from 'src/services/database.service'; +import { DuplicateService } from 'src/services/duplicate.service'; import { JobService } from 'src/services/job.service'; import { LibraryService } from 'src/services/library.service'; import { MediaService } from 'src/services/media.service'; import { MetadataService } from 'src/services/metadata.service'; import { NotificationService } from 'src/services/notification.service'; import { PersonService } from 'src/services/person.service'; -import { SearchService } from 'src/services/search.service'; import { SessionService } from 'src/services/session.service'; import { SmartInfoService } from 'src/services/smart-info.service'; import { StorageTemplateService } from 'src/services/storage-template.service'; @@ -36,7 +36,7 @@ export class MicroservicesService { private storageTemplateService: StorageTemplateService, private storageService: StorageService, private userService: UserService, - private searchService: SearchService, + private duplicateService: DuplicateService, ) {} async init() { @@ -55,8 +55,8 @@ export class MicroservicesService { [JobName.USER_SYNC_USAGE]: () => this.userService.handleUserSyncUsage(), [JobName.QUEUE_SMART_SEARCH]: (data) => this.smartInfoService.handleQueueEncodeClip(data), [JobName.SMART_SEARCH]: (data) => this.smartInfoService.handleEncodeClip(data), - [JobName.QUEUE_DUPLICATE_DETECTION]: (data) => this.searchService.handleQueueSearchDuplicates(data), - [JobName.DUPLICATE_DETECTION]: (data) => this.searchService.handleSearchDuplicates(data), + [JobName.QUEUE_DUPLICATE_DETECTION]: (data) => this.duplicateService.handleQueueSearchDuplicates(data), + [JobName.DUPLICATE_DETECTION]: (data) => this.duplicateService.handleSearchDuplicates(data), [JobName.STORAGE_TEMPLATE_MIGRATION]: () => this.storageTemplateService.handleMigration(), [JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: (data) => this.storageTemplateService.handleMigrationSingle(data), [JobName.QUEUE_MIGRATION]: () => this.mediaService.handleQueueMigration(), diff --git a/server/src/services/search.service.spec.ts b/server/src/services/search.service.spec.ts index dac6af2cf8..afc98b69de 100644 --- a/server/src/services/search.service.spec.ts +++ b/server/src/services/search.service.spec.ts @@ -1,7 +1,5 @@ import { mapAsset } from 'src/dtos/asset-response.dto'; -import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface'; -import { ICryptoRepository } from 'src/interfaces/crypto.interface'; -import { IJobRepository, JobName, JobStatus } from 'src/interfaces/job.interface'; +import { IAssetRepository } from 'src/interfaces/asset.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { IMachineLearningRepository } from 'src/interfaces/machine-learning.interface'; import { IMetadataRepository } from 'src/interfaces/metadata.interface'; @@ -14,8 +12,6 @@ import { assetStub } from 'test/fixtures/asset.stub'; import { authStub } from 'test/fixtures/auth.stub'; import { personStub } from 'test/fixtures/person.stub'; import { newAssetRepositoryMock } from 'test/repositories/asset.repository.mock'; -import { newCryptoRepositoryMock } from 'test/repositories/crypto.repository.mock'; -import { newJobRepositoryMock } from 'test/repositories/job.repository.mock'; import { newLoggerRepositoryMock } from 'test/repositories/logger.repository.mock'; import { newMachineLearningRepositoryMock } from 'test/repositories/machine-learning.repository.mock'; import { newMetadataRepositoryMock } from 'test/repositories/metadata.repository.mock'; @@ -37,8 +33,6 @@ describe(SearchService.name, () => { let partnerMock: Mocked; let metadataMock: Mocked; let loggerMock: Mocked; - let cryptoMock: Mocked; - let jobMock: Mocked; beforeEach(() => { assetMock = newAssetRepositoryMock(); @@ -49,8 +43,6 @@ describe(SearchService.name, () => { partnerMock = newPartnerRepositoryMock(); metadataMock = newMetadataRepositoryMock(); loggerMock = newLoggerRepositoryMock(); - cryptoMock = newCryptoRepositoryMock(); - jobMock = newJobRepositoryMock(); sut = new SearchService( systemMock, @@ -61,8 +53,6 @@ describe(SearchService.name, () => { partnerMock, metadataMock, loggerMock, - cryptoMock, - jobMock, ); }); @@ -105,234 +95,4 @@ describe(SearchService.name, () => { expect(result).toEqual(expectedResponse); }); }); - - describe('handleQueueSearchDuplicates', () => { - beforeEach(() => { - systemMock.get.mockResolvedValue({ - machineLearning: { - enabled: true, - duplicateDetection: { - enabled: true, - }, - }, - }); - }); - - it('should skip if machine learning is disabled', async () => { - systemMock.get.mockResolvedValue({ - machineLearning: { - enabled: false, - duplicateDetection: { - enabled: true, - }, - }, - }); - - await expect(sut.handleQueueSearchDuplicates({})).resolves.toBe(JobStatus.SKIPPED); - expect(jobMock.queue).not.toHaveBeenCalled(); - expect(jobMock.queueAll).not.toHaveBeenCalled(); - expect(systemMock.get).toHaveBeenCalled(); - }); - - it('should skip if duplicate detection is disabled', async () => { - systemMock.get.mockResolvedValue({ - machineLearning: { - enabled: true, - duplicateDetection: { - enabled: false, - }, - }, - }); - - await expect(sut.handleQueueSearchDuplicates({})).resolves.toBe(JobStatus.SKIPPED); - expect(jobMock.queue).not.toHaveBeenCalled(); - expect(jobMock.queueAll).not.toHaveBeenCalled(); - expect(systemMock.get).toHaveBeenCalled(); - }); - - it('should queue missing assets', async () => { - assetMock.getWithout.mockResolvedValue({ - items: [assetStub.image], - hasNextPage: false, - }); - - await sut.handleQueueSearchDuplicates({}); - - expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.DUPLICATE); - expect(jobMock.queueAll).toHaveBeenCalledWith([ - { - name: JobName.DUPLICATE_DETECTION, - data: { id: assetStub.image.id }, - }, - ]); - }); - - it('should queue all assets', async () => { - assetMock.getAll.mockResolvedValue({ - items: [assetStub.image], - hasNextPage: false, - }); - personMock.getAll.mockResolvedValue({ - items: [personStub.withName], - hasNextPage: false, - }); - - await sut.handleQueueSearchDuplicates({ force: true }); - - expect(assetMock.getAll).toHaveBeenCalled(); - expect(jobMock.queueAll).toHaveBeenCalledWith([ - { - name: JobName.DUPLICATE_DETECTION, - data: { id: assetStub.image.id }, - }, - ]); - }); - }); - - describe('handleSearchDuplicates', () => { - beforeEach(() => { - systemMock.get.mockResolvedValue({ - machineLearning: { - enabled: true, - duplicateDetection: { - enabled: true, - }, - }, - }); - }); - - it('should skip if machine learning is disabled', async () => { - systemMock.get.mockResolvedValue({ - machineLearning: { - enabled: false, - duplicateDetection: { - enabled: true, - }, - }, - }); - const id = assetStub.livePhotoMotionAsset.id; - assetMock.getById.mockResolvedValue(assetStub.livePhotoMotionAsset); - - const result = await sut.handleSearchDuplicates({ id }); - - expect(result).toBe(JobStatus.SKIPPED); - }); - - it('should skip if duplicate detection is disabled', async () => { - systemMock.get.mockResolvedValue({ - machineLearning: { - enabled: true, - duplicateDetection: { - enabled: false, - }, - }, - }); - const id = assetStub.livePhotoMotionAsset.id; - assetMock.getById.mockResolvedValue(assetStub.livePhotoMotionAsset); - - const result = await sut.handleSearchDuplicates({ id }); - - expect(result).toBe(JobStatus.SKIPPED); - }); - - it('should fail if asset is not found', async () => { - const result = await sut.handleSearchDuplicates({ id: assetStub.image.id }); - - expect(result).toBe(JobStatus.FAILED); - expect(loggerMock.error).toHaveBeenCalledWith(`Asset ${assetStub.image.id} not found`); - }); - - it('should skip if asset is not visible', async () => { - const id = assetStub.livePhotoMotionAsset.id; - assetMock.getById.mockResolvedValue(assetStub.livePhotoMotionAsset); - - const result = await sut.handleSearchDuplicates({ id }); - - expect(result).toBe(JobStatus.SKIPPED); - expect(loggerMock.debug).toHaveBeenCalledWith(`Asset ${id} is not visible, skipping`); - }); - - it('should fail if asset is missing preview image', async () => { - assetMock.getById.mockResolvedValue(assetStub.noResizePath); - - const result = await sut.handleSearchDuplicates({ id: assetStub.noResizePath.id }); - - expect(result).toBe(JobStatus.FAILED); - expect(loggerMock.warn).toHaveBeenCalledWith(`Asset ${assetStub.noResizePath.id} is missing preview image`); - }); - - it('should fail if asset is missing embedding', async () => { - assetMock.getById.mockResolvedValue(assetStub.image); - - const result = await sut.handleSearchDuplicates({ id: assetStub.image.id }); - - expect(result).toBe(JobStatus.FAILED); - expect(loggerMock.debug).toHaveBeenCalledWith(`Asset ${assetStub.image.id} is missing embedding`); - }); - - it('should search for duplicates and update asset with duplicateId', async () => { - assetMock.getById.mockResolvedValue(assetStub.hasEmbedding); - searchMock.searchDuplicates.mockResolvedValue([ - { assetId: assetStub.image.id, distance: 0.01, duplicateId: null }, - ]); - const expectedAssetIds = [assetStub.image.id, assetStub.hasEmbedding.id]; - - const result = await sut.handleSearchDuplicates({ id: assetStub.hasEmbedding.id }); - - expect(result).toBe(JobStatus.SUCCESS); - expect(searchMock.searchDuplicates).toHaveBeenCalledWith({ - assetId: assetStub.hasEmbedding.id, - embedding: assetStub.hasEmbedding.smartSearch!.embedding, - maxDistance: 0.03, - userIds: [assetStub.hasEmbedding.ownerId], - }); - expect(assetMock.updateDuplicates).toHaveBeenCalledWith({ - assetIds: expectedAssetIds, - targetDuplicateId: expect.any(String), - duplicateIds: [], - }); - expect(assetMock.upsertJobStatus).toHaveBeenCalledWith( - ...expectedAssetIds.map((assetId) => ({ assetId, duplicatesDetectedAt: expect.any(Date) })), - ); - }); - - it('should use existing duplicate ID among matched duplicates', async () => { - const duplicateId = assetStub.hasDupe.duplicateId; - assetMock.getById.mockResolvedValue(assetStub.hasEmbedding); - searchMock.searchDuplicates.mockResolvedValue([{ assetId: assetStub.hasDupe.id, distance: 0.01, duplicateId }]); - const expectedAssetIds = [assetStub.hasEmbedding.id]; - - const result = await sut.handleSearchDuplicates({ id: assetStub.hasEmbedding.id }); - - expect(result).toBe(JobStatus.SUCCESS); - expect(searchMock.searchDuplicates).toHaveBeenCalledWith({ - assetId: assetStub.hasEmbedding.id, - embedding: assetStub.hasEmbedding.smartSearch!.embedding, - maxDistance: 0.03, - userIds: [assetStub.hasEmbedding.ownerId], - }); - expect(assetMock.updateDuplicates).toHaveBeenCalledWith({ - assetIds: expectedAssetIds, - targetDuplicateId: assetStub.hasDupe.duplicateId, - duplicateIds: [], - }); - expect(assetMock.upsertJobStatus).toHaveBeenCalledWith( - ...expectedAssetIds.map((assetId) => ({ assetId, duplicatesDetectedAt: expect.any(Date) })), - ); - }); - - it('should remove duplicateId if no duplicates found and asset has duplicateId', async () => { - assetMock.getById.mockResolvedValue(assetStub.hasDupe); - searchMock.searchDuplicates.mockResolvedValue([]); - - const result = await sut.handleSearchDuplicates({ id: assetStub.hasDupe.id }); - - expect(result).toBe(JobStatus.SUCCESS); - expect(assetMock.update).toHaveBeenCalledWith({ id: assetStub.hasDupe.id, duplicateId: null }); - expect(assetMock.upsertJobStatus).toHaveBeenCalledWith({ - assetId: assetStub.hasDupe.id, - duplicatesDetectedAt: expect.any(Date), - }); - }); - }); }); diff --git a/server/src/services/search.service.ts b/server/src/services/search.service.ts index 28f9b9713e..10a2ccda2a 100644 --- a/server/src/services/search.service.ts +++ b/server/src/services/search.service.ts @@ -16,25 +16,15 @@ import { } from 'src/dtos/search.dto'; import { AssetOrder } from 'src/entities/album.entity'; import { AssetEntity } from 'src/entities/asset.entity'; -import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface'; -import { ICryptoRepository } from 'src/interfaces/crypto.interface'; -import { - IBaseJob, - IEntityJob, - IJobRepository, - JOBS_ASSET_PAGINATION_SIZE, - JobName, - JobStatus, -} from 'src/interfaces/job.interface'; +import { IAssetRepository } from 'src/interfaces/asset.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { IMachineLearningRepository } from 'src/interfaces/machine-learning.interface'; import { IMetadataRepository } from 'src/interfaces/metadata.interface'; import { IPartnerRepository } from 'src/interfaces/partner.interface'; import { IPersonRepository } from 'src/interfaces/person.interface'; -import { AssetDuplicateResult, ISearchRepository, SearchExploreItem } from 'src/interfaces/search.interface'; +import { ISearchRepository, SearchExploreItem } from 'src/interfaces/search.interface'; import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface'; -import { isDuplicateDetectionEnabled, isSmartSearchEnabled } from 'src/utils/misc'; -import { usePagination } from 'src/utils/pagination'; +import { isSmartSearchEnabled } from 'src/utils/misc'; @Injectable() export class SearchService { @@ -49,8 +39,6 @@ export class SearchService { @Inject(IPartnerRepository) private partnerRepository: IPartnerRepository, @Inject(IMetadataRepository) private metadataRepository: IMetadataRepository, @Inject(ILoggerRepository) private logger: ILoggerRepository, - @Inject(ICryptoRepository) private cryptoRepository: ICryptoRepository, - @Inject(IJobRepository) private jobRepository: IJobRepository, ) { this.logger.setContext(SearchService.name); this.configCore = SystemConfigCore.create(systemMetadataRepository, logger); @@ -159,97 +147,6 @@ export class SearchService { } } - async handleQueueSearchDuplicates({ force }: IBaseJob): Promise { - const { machineLearning } = await this.configCore.getConfig(); - if (!isDuplicateDetectionEnabled(machineLearning)) { - return JobStatus.SKIPPED; - } - - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { - return force - ? this.assetRepository.getAll(pagination, { isVisible: true }) - : this.assetRepository.getWithout(pagination, WithoutProperty.DUPLICATE); - }); - - for await (const assets of assetPagination) { - await this.jobRepository.queueAll( - assets.map((asset) => ({ name: JobName.DUPLICATE_DETECTION, data: { id: asset.id } })), - ); - } - - return JobStatus.SUCCESS; - } - - async handleSearchDuplicates({ id }: IEntityJob): Promise { - const { machineLearning } = await this.configCore.getConfig(); - if (!isDuplicateDetectionEnabled(machineLearning)) { - return JobStatus.SKIPPED; - } - - const asset = await this.assetRepository.getById(id, { smartSearch: true }); - if (!asset) { - this.logger.error(`Asset ${id} not found`); - return JobStatus.FAILED; - } - - if (!asset.isVisible) { - this.logger.debug(`Asset ${id} is not visible, skipping`); - return JobStatus.SKIPPED; - } - - if (!asset.previewPath) { - this.logger.warn(`Asset ${id} is missing preview image`); - return JobStatus.FAILED; - } - - if (!asset.smartSearch?.embedding) { - this.logger.debug(`Asset ${id} is missing embedding`); - return JobStatus.FAILED; - } - - const duplicateAssets = await this.searchRepository.searchDuplicates({ - assetId: asset.id, - embedding: asset.smartSearch.embedding, - maxDistance: machineLearning.duplicateDetection.maxDistance, - userIds: [asset.ownerId], - }); - - let assetIds = [asset.id]; - if (duplicateAssets.length > 0) { - this.logger.debug( - `Found ${duplicateAssets.length} duplicate${duplicateAssets.length === 1 ? '' : 's'} for asset ${asset.id}`, - ); - assetIds = await this.updateDuplicates(asset, duplicateAssets); - } else if (asset.duplicateId) { - this.logger.debug(`No duplicates found for asset ${asset.id}, removing duplicateId`); - await this.assetRepository.update({ id: asset.id, duplicateId: null }); - } - - const duplicatesDetectedAt = new Date(); - await this.assetRepository.upsertJobStatus(...assetIds.map((assetId) => ({ assetId, duplicatesDetectedAt }))); - - return JobStatus.SUCCESS; - } - - private async updateDuplicates(asset: AssetEntity, duplicateAssets: AssetDuplicateResult[]): Promise { - const duplicateIds = [ - ...new Set( - duplicateAssets - .filter((asset): asset is AssetDuplicateResult & { duplicateId: string } => !!asset.duplicateId) - .map((duplicate) => duplicate.duplicateId), - ), - ]; - - const targetDuplicateId = asset.duplicateId ?? duplicateIds.shift() ?? this.cryptoRepository.randomUUID(); - const assetIdsToUpdate = duplicateAssets - .filter((asset) => asset.duplicateId !== targetDuplicateId) - .map((duplicate) => duplicate.assetId); - assetIdsToUpdate.push(asset.id); - - await this.assetRepository.updateDuplicates({ targetDuplicateId, assetIds: assetIdsToUpdate, duplicateIds }); - return assetIdsToUpdate; - } - private async getUserIdsToSearch(auth: AuthDto): Promise { const userIds: string[] = [auth.user.id]; const partners = await this.partnerRepository.getAll(auth.user.id);