diff --git a/server/apps/microservices/src/processors/metadata-extraction.processor.ts b/server/apps/microservices/src/processors/metadata-extraction.processor.ts index 7152927f11..98aef7beab 100644 --- a/server/apps/microservices/src/processors/metadata-extraction.processor.ts +++ b/server/apps/microservices/src/processors/metadata-extraction.processor.ts @@ -6,7 +6,9 @@ import { IGeocodingRepository, IJobRepository, JobName, + JOBS_ASSET_PAGINATION_SIZE, QueueName, + usePagination, WithoutProperty, } from '@app/domain'; import { AssetEntity, AssetType, ExifEntity } from '@app/infra/entities'; @@ -74,13 +76,17 @@ export class MetadataExtractionProcessor { async handleQueueMetadataExtraction(job: Job) { try { const { force } = job.data; - const assets = force - ? await this.assetRepository.getAll() - : await this.assetRepository.getWithout(WithoutProperty.EXIF); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getAll(pagination) + : this.assetRepository.getWithout(pagination, WithoutProperty.EXIF); + }); - for (const asset of assets) { - const name = asset.type === AssetType.VIDEO ? JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION; - await this.jobRepository.queue({ name, data: { asset } }); + for await (const assets of assetPagination) { + for (const asset of assets) { + const name = asset.type === AssetType.VIDEO ? JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION; + await this.jobRepository.queue({ name, data: { asset } }); + } } } catch (error: any) { this.logger.error(`Unable to queue metadata extraction`, error?.stack); diff --git a/server/libs/domain/src/asset/asset.core.ts b/server/libs/domain/src/asset/asset.core.ts index 13eff52e87..910371eb2e 100644 --- a/server/libs/domain/src/asset/asset.core.ts +++ b/server/libs/domain/src/asset/asset.core.ts @@ -1,14 +1,10 @@ import { AssetEntity } from '@app/infra/entities'; import { IJobRepository, JobName } from '../job'; -import { AssetSearchOptions, IAssetRepository, LivePhotoSearchOptions } from './asset.repository'; +import { IAssetRepository, LivePhotoSearchOptions } from './asset.repository'; export class AssetCore { constructor(private assetRepository: IAssetRepository, private jobRepository: IJobRepository) {} - getAll(options: AssetSearchOptions) { - return this.assetRepository.getAll(options); - } - async save(asset: Partial) { const _asset = await this.assetRepository.save(asset); await this.jobRepository.queue({ diff --git a/server/libs/domain/src/asset/asset.repository.ts b/server/libs/domain/src/asset/asset.repository.ts index 20edc50b7c..bc3f845635 100644 --- a/server/libs/domain/src/asset/asset.repository.ts +++ b/server/libs/domain/src/asset/asset.repository.ts @@ -1,4 +1,5 @@ import { AssetEntity, AssetType } from '@app/infra/entities'; +import { Paginated, PaginationOptions } from '../domain.util'; export interface AssetSearchOptions { isVisible?: boolean; @@ -35,10 +36,10 @@ export const IAssetRepository = 'IAssetRepository'; export interface IAssetRepository { getByIds(ids: string[]): Promise; - getWithout(property: WithoutProperty): Promise; + getWithout(pagination: PaginationOptions, property: WithoutProperty): Paginated; getFirstAssetForAlbumId(albumId: string): Promise; deleteAll(ownerId: string): Promise; - getAll(options?: AssetSearchOptions): Promise; + getAll(pagination: PaginationOptions, options?: AssetSearchOptions): Paginated; save(asset: Partial): Promise; findLivePhotoMatch(options: LivePhotoSearchOptions): Promise; getMapMarkers(ownerId: string, options?: MapMarkerSearchOptions): Promise; diff --git a/server/libs/domain/src/domain.util.ts b/server/libs/domain/src/domain.util.ts index c38dc7bb19..96fac353e9 100644 --- a/server/libs/domain/src/domain.util.ts +++ b/server/libs/domain/src/domain.util.ts @@ -32,3 +32,28 @@ export function asHumanReadable(bytes: number, precision = 1): string { return `${remainder.toFixed(magnitude == 0 ? 0 : precision)} ${units[magnitude]}`; } + +export interface PaginationOptions { + take: number; + skip?: number; +} + +export interface PaginationResult { + items: T[]; + hasNextPage: boolean; +} + +export type Paginated = Promise>; + +export async function* usePagination( + pageSize: number, + getNextPage: (pagination: PaginationOptions) => Paginated, +) { + let hasNextPage = true; + + for (let skip = 0; hasNextPage; skip += pageSize) { + const result = await getNextPage({ take: pageSize, skip }); + hasNextPage = result.hasNextPage; + yield result.items; + } +} diff --git a/server/libs/domain/src/facial-recognition/facial-recognition.service.spec.ts b/server/libs/domain/src/facial-recognition/facial-recognition.service.spec.ts index 0c1f62d811..fdfef4468e 100644 --- a/server/libs/domain/src/facial-recognition/facial-recognition.service.spec.ts +++ b/server/libs/domain/src/facial-recognition/facial-recognition.service.spec.ts @@ -132,10 +132,13 @@ describe(FacialRecognitionService.name, () => { describe('handleQueueRecognizeFaces', () => { it('should queue missing assets', async () => { - assetMock.getWithout.mockResolvedValue([assetEntityStub.image]); + assetMock.getWithout.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); await sut.handleQueueRecognizeFaces({}); - expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.FACES); + expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.FACES); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.RECOGNIZE_FACES, data: { asset: assetEntityStub.image }, @@ -143,7 +146,10 @@ describe(FacialRecognitionService.name, () => { }); it('should queue all assets', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.image]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); personMock.deleteAll.mockResolvedValue(5); searchMock.deleteAllFaces.mockResolvedValue(100); diff --git a/server/libs/domain/src/facial-recognition/facial-recognition.services.ts b/server/libs/domain/src/facial-recognition/facial-recognition.services.ts index 39d6b0ffbf..8f5d603087 100644 --- a/server/libs/domain/src/facial-recognition/facial-recognition.services.ts +++ b/server/libs/domain/src/facial-recognition/facial-recognition.services.ts @@ -2,7 +2,8 @@ import { Inject, Logger } from '@nestjs/common'; import { join } from 'path'; import { IAssetRepository, WithoutProperty } from '../asset'; import { MACHINE_LEARNING_ENABLED } from '../domain.constant'; -import { IAssetJob, IBaseJob, IFaceThumbnailJob, IJobRepository, JobName } from '../job'; +import { usePagination } from '../domain.util'; +import { IAssetJob, IBaseJob, IFaceThumbnailJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; import { CropOptions, FACE_THUMBNAIL_SIZE, IMediaRepository } from '../media'; import { IPersonRepository } from '../person/person.repository'; import { ISearchRepository } from '../search/search.repository'; @@ -27,17 +28,21 @@ export class FacialRecognitionService { async handleQueueRecognizeFaces({ force }: IBaseJob) { try { - const assets = force - ? await this.assetRepository.getAll() - : await this.assetRepository.getWithout(WithoutProperty.FACES); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getAll(pagination) + : this.assetRepository.getWithout(pagination, WithoutProperty.FACES); + }); if (force) { const people = await this.personRepository.deleteAll(); const faces = await this.searchRepository.deleteAllFaces(); this.logger.debug(`Deleted ${people} people and ${faces} faces`); } - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: { asset } }); + for await (const assets of assetPagination) { + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: { asset } }); + } } } catch (error: any) { this.logger.error(`Unable to queue recognize faces`, error?.stack); diff --git a/server/libs/domain/src/job/job.constants.ts b/server/libs/domain/src/job/job.constants.ts index 1370a4bbe1..acccb6cc09 100644 --- a/server/libs/domain/src/job/job.constants.ts +++ b/server/libs/domain/src/job/job.constants.ts @@ -73,3 +73,5 @@ export enum JobName { QUEUE_ENCODE_CLIP = 'queue-clip-encode', ENCODE_CLIP = 'clip-encode', } + +export const JOBS_ASSET_PAGINATION_SIZE = 1000; diff --git a/server/libs/domain/src/media/media.service.spec.ts b/server/libs/domain/src/media/media.service.spec.ts index 8ff0b3ce69..71b579c95a 100644 --- a/server/libs/domain/src/media/media.service.spec.ts +++ b/server/libs/domain/src/media/media.service.spec.ts @@ -44,7 +44,10 @@ describe(MediaService.name, () => { describe('handleQueueGenerateThumbnails', () => { it('should queue all assets', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.image]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); await sut.handleQueueGenerateThumbnails({ force: true }); @@ -57,12 +60,15 @@ describe(MediaService.name, () => { }); it('should queue all assets with missing thumbnails', async () => { - assetMock.getWithout.mockResolvedValue([assetEntityStub.image]); + assetMock.getWithout.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); await sut.handleQueueGenerateThumbnails({ force: false }); expect(assetMock.getAll).not.toHaveBeenCalled(); - expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.THUMBNAIL); + expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { asset: assetEntityStub.image }, @@ -183,11 +189,14 @@ describe(MediaService.name, () => { describe('handleQueueVideoConversion', () => { it('should queue all video assets', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.video]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.video], + hasNextPage: false, + }); await sut.handleQueueVideoConversion({ force: true }); - expect(assetMock.getAll).toHaveBeenCalledWith({ type: AssetType.VIDEO }); + expect(assetMock.getAll).toHaveBeenCalledWith({ skip: 0, take: 1000 }, { type: AssetType.VIDEO }); expect(assetMock.getWithout).not.toHaveBeenCalled(); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.VIDEO_CONVERSION, @@ -196,12 +205,15 @@ describe(MediaService.name, () => { }); it('should queue all video assets without encoded videos', async () => { - assetMock.getWithout.mockResolvedValue([assetEntityStub.video]); + assetMock.getWithout.mockResolvedValue({ + items: [assetEntityStub.video], + hasNextPage: false, + }); await sut.handleQueueVideoConversion({}); expect(assetMock.getAll).not.toHaveBeenCalled(); - expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.ENCODED_VIDEO); + expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.ENCODED_VIDEO); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.VIDEO_CONVERSION, data: { asset: assetEntityStub.video }, diff --git a/server/libs/domain/src/media/media.service.ts b/server/libs/domain/src/media/media.service.ts index dbf7fc6b0a..f6dba4007a 100644 --- a/server/libs/domain/src/media/media.service.ts +++ b/server/libs/domain/src/media/media.service.ts @@ -3,7 +3,8 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { join } from 'path'; import { IAssetRepository, mapAsset, WithoutProperty } from '../asset'; import { CommunicationEvent, ICommunicationRepository } from '../communication'; -import { IAssetJob, IBaseJob, IJobRepository, JobName } from '../job'; +import { usePagination } from '../domain.util'; +import { IAssetJob, IBaseJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; import { IStorageRepository, StorageCore, StorageFolder } from '../storage'; import { ISystemConfigRepository, SystemConfigFFmpegDto } from '../system-config'; import { SystemConfigCore } from '../system-config/system-config.core'; @@ -31,12 +32,16 @@ export class MediaService { try { const { force } = job; - const assets = force - ? await this.assetRepository.getAll() - : await this.assetRepository.getWithout(WithoutProperty.THUMBNAIL); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getAll(pagination) + : this.assetRepository.getWithout(pagination, WithoutProperty.THUMBNAIL); + }); - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { asset } }); + for await (const assets of assetPagination) { + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { asset } }); + } } } catch (error: any) { this.logger.error('Failed to queue generate thumbnail jobs', error.stack); @@ -115,11 +120,16 @@ export class MediaService { const { force } = job; try { - const assets = force - ? await this.assetRepository.getAll({ type: AssetType.VIDEO }) - : await this.assetRepository.getWithout(WithoutProperty.ENCODED_VIDEO); - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { asset } }); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getAll(pagination, { type: AssetType.VIDEO }) + : this.assetRepository.getWithout(pagination, WithoutProperty.ENCODED_VIDEO); + }); + + for await (const assets of assetPagination) { + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { asset } }); + } } } catch (error: any) { this.logger.error('Failed to queue video conversions', error.stack); diff --git a/server/libs/domain/src/search/search.service.spec.ts b/server/libs/domain/src/search/search.service.spec.ts index 306f33777d..f203e2cd1d 100644 --- a/server/libs/domain/src/search/search.service.spec.ts +++ b/server/libs/domain/src/search/search.service.spec.ts @@ -185,15 +185,16 @@ describe(SearchService.name, () => { describe('handleIndexAssets', () => { it('should call done, even when there are no assets', async () => { - assetMock.getAll.mockResolvedValue([]); - await sut.handleIndexAssets(); expect(searchMock.importAssets).toHaveBeenCalledWith([], true); }); it('should index all the assets', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.image]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); await sut.handleIndexAssets(); @@ -204,7 +205,10 @@ describe(SearchService.name, () => { }); it('should log an error', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.image]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); searchMock.importAssets.mockRejectedValue(new Error('import failed')); await sut.handleIndexAssets(); diff --git a/server/libs/domain/src/search/search.service.ts b/server/libs/domain/src/search/search.service.ts index be1a9c4674..2ac1d9a289 100644 --- a/server/libs/domain/src/search/search.service.ts +++ b/server/libs/domain/src/search/search.service.ts @@ -7,8 +7,9 @@ import { mapAsset } from '../asset'; import { IAssetRepository } from '../asset/asset.repository'; import { AuthUserDto } from '../auth'; import { MACHINE_LEARNING_ENABLED } from '../domain.constant'; +import { usePagination } from '../domain.util'; import { AssetFaceId, IFaceRepository } from '../facial-recognition'; -import { IAssetFaceJob, IBulkEntityJob, IJobRepository, JobName } from '../job'; +import { IAssetFaceJob, IBulkEntityJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; import { IMachineLearningRepository } from '../smart-info'; import { SearchDto } from './dto'; import { SearchConfigResponseDto, SearchResponseDto } from './response-dto'; @@ -155,12 +156,15 @@ export class SearchService { try { // TODO: do this in batches based on searchIndexVersion - const assets = this.patchAssets(await this.assetRepository.getAll({ isVisible: true })); - this.logger.log(`Indexing ${assets.length} assets`); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => + this.assetRepository.getAll(pagination, { isVisible: true }), + ); - const chunkSize = 1000; - for (let i = 0; i < assets.length; i += chunkSize) { - await this.searchRepository.importAssets(assets.slice(i, i + chunkSize), false); + for await (const assets of assetPagination) { + this.logger.debug(`Indexing ${assets.length} assets`); + + const patchedAssets = this.patchAssets(assets); + await this.searchRepository.importAssets(patchedAssets, false); } await this.searchRepository.importAssets([], true); diff --git a/server/libs/domain/src/smart-info/smart-info.service.spec.ts b/server/libs/domain/src/smart-info/smart-info.service.spec.ts index 3f22bc0449..642d9bc105 100644 --- a/server/libs/domain/src/smart-info/smart-info.service.spec.ts +++ b/server/libs/domain/src/smart-info/smart-info.service.spec.ts @@ -38,7 +38,10 @@ describe(SmartInfoService.name, () => { describe('handleQueueObjectTagging', () => { it('should queue the assets without tags', async () => { - assetMock.getWithout.mockResolvedValue([assetEntityStub.image]); + assetMock.getWithout.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); await sut.handleQueueObjectTagging({ force: false }); @@ -46,11 +49,14 @@ describe(SmartInfoService.name, () => { [{ name: JobName.CLASSIFY_IMAGE, data: { asset: assetEntityStub.image } }], [{ name: JobName.DETECT_OBJECTS, data: { asset: assetEntityStub.image } }], ]); - expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.OBJECT_TAGS); + expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.OBJECT_TAGS); }); it('should queue all the assets', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.image]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); await sut.handleQueueObjectTagging({ force: true }); @@ -140,16 +146,22 @@ describe(SmartInfoService.name, () => { describe('handleQueueEncodeClip', () => { it('should queue the assets without clip embeddings', async () => { - assetMock.getWithout.mockResolvedValue([assetEntityStub.image]); + assetMock.getWithout.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); await sut.handleQueueEncodeClip({ force: false }); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.ENCODE_CLIP, data: { asset: assetEntityStub.image } }); - expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.CLIP_ENCODING); + expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.CLIP_ENCODING); }); it('should queue all the assets', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.image]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); await sut.handleQueueEncodeClip({ force: true }); diff --git a/server/libs/domain/src/smart-info/smart-info.service.ts b/server/libs/domain/src/smart-info/smart-info.service.ts index dddfc26609..d52830bc5f 100644 --- a/server/libs/domain/src/smart-info/smart-info.service.ts +++ b/server/libs/domain/src/smart-info/smart-info.service.ts @@ -1,7 +1,8 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { IAssetRepository, WithoutProperty } from '../asset'; import { MACHINE_LEARNING_ENABLED } from '../domain.constant'; -import { IAssetJob, IBaseJob, IJobRepository, JobName } from '../job'; +import { usePagination } from '../domain.util'; +import { IAssetJob, IBaseJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; import { IMachineLearningRepository } from './machine-learning.interface'; import { ISmartInfoRepository } from './smart-info.repository'; @@ -18,13 +19,17 @@ export class SmartInfoService { async handleQueueObjectTagging({ force }: IBaseJob) { try { - const assets = force - ? await this.assetRepository.getAll() - : await this.assetRepository.getWithout(WithoutProperty.OBJECT_TAGS); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getAll(pagination) + : this.assetRepository.getWithout(pagination, WithoutProperty.OBJECT_TAGS); + }); - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: { asset } }); - await this.jobRepository.queue({ name: JobName.DETECT_OBJECTS, data: { asset } }); + for await (const assets of assetPagination) { + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: { asset } }); + await this.jobRepository.queue({ name: JobName.DETECT_OBJECTS, data: { asset } }); + } } } catch (error: any) { this.logger.error(`Unable to queue object tagging`, error?.stack); @@ -69,12 +74,16 @@ export class SmartInfoService { async handleQueueEncodeClip({ force }: IBaseJob) { try { - const assets = force - ? await this.assetRepository.getAll() - : await this.assetRepository.getWithout(WithoutProperty.CLIP_ENCODING); + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getAll(pagination) + : this.assetRepository.getWithout(pagination, WithoutProperty.CLIP_ENCODING); + }); - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: { asset } }); + for await (const assets of assetPagination) { + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: { asset } }); + } } } catch (error: any) { this.logger.error(`Unable to queue clip encoding`, error?.stack); diff --git a/server/libs/domain/src/storage-template/storage-template.service.spec.ts b/server/libs/domain/src/storage-template/storage-template.service.spec.ts index 06f000568e..6200f3f97d 100644 --- a/server/libs/domain/src/storage-template/storage-template.service.spec.ts +++ b/server/libs/domain/src/storage-template/storage-template.service.spec.ts @@ -36,7 +36,10 @@ describe(StorageTemplateService.name, () => { describe('handle template migration', () => { it('should handle no assets', async () => { - assetMock.getAll.mockResolvedValue([]); + assetMock.getAll.mockResolvedValue({ + items: [], + hasNextPage: false, + }); userMock.getList.mockResolvedValue([]); await sut.handleTemplateMigration(); @@ -45,7 +48,10 @@ describe(StorageTemplateService.name, () => { }); it('should handle an asset with a duplicate destination', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.image]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); assetMock.save.mockResolvedValue(assetEntityStub.image); userMock.getList.mockResolvedValue([userEntityStub.user1]); @@ -69,12 +75,15 @@ describe(StorageTemplateService.name, () => { }); it('should skip when an asset already matches the template', async () => { - assetMock.getAll.mockResolvedValue([ - { - ...assetEntityStub.image, - originalPath: 'upload/library/user-id/2023/2023-02-23/asset-id.ext', - }, - ]); + assetMock.getAll.mockResolvedValue({ + items: [ + { + ...assetEntityStub.image, + originalPath: 'upload/library/user-id/2023/2023-02-23/asset-id.ext', + }, + ], + hasNextPage: false, + }); userMock.getList.mockResolvedValue([userEntityStub.user1]); await sut.handleTemplateMigration(); @@ -86,12 +95,15 @@ describe(StorageTemplateService.name, () => { }); it('should skip when an asset is probably a duplicate', async () => { - assetMock.getAll.mockResolvedValue([ - { - ...assetEntityStub.image, - originalPath: 'upload/library/user-id/2023/2023-02-23/asset-id+1.ext', - }, - ]); + assetMock.getAll.mockResolvedValue({ + items: [ + { + ...assetEntityStub.image, + originalPath: 'upload/library/user-id/2023/2023-02-23/asset-id+1.ext', + }, + ], + hasNextPage: false, + }); userMock.getList.mockResolvedValue([userEntityStub.user1]); await sut.handleTemplateMigration(); @@ -103,7 +115,10 @@ describe(StorageTemplateService.name, () => { }); it('should move an asset', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.image]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); assetMock.save.mockResolvedValue(assetEntityStub.image); userMock.getList.mockResolvedValue([userEntityStub.user1]); @@ -121,7 +136,10 @@ describe(StorageTemplateService.name, () => { }); it('should use the user storage label', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.image]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); assetMock.save.mockResolvedValue(assetEntityStub.image); userMock.getList.mockResolvedValue([userEntityStub.storageLabel]); @@ -139,7 +157,10 @@ describe(StorageTemplateService.name, () => { }); it('should not update the database if the move fails', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.image]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); storageMock.moveFile.mockRejectedValue(new Error('Read only system')); userMock.getList.mockResolvedValue([userEntityStub.user1]); @@ -154,7 +175,10 @@ describe(StorageTemplateService.name, () => { }); it('should move the asset back if the database fails', async () => { - assetMock.getAll.mockResolvedValue([assetEntityStub.image]); + assetMock.getAll.mockResolvedValue({ + items: [assetEntityStub.image], + hasNextPage: false, + }); assetMock.save.mockRejectedValue('Connection Error!'); userMock.getList.mockResolvedValue([userEntityStub.user1]); @@ -173,7 +197,6 @@ describe(StorageTemplateService.name, () => { }); it('should handle an error', async () => { - assetMock.getAll.mockResolvedValue([]); storageMock.removeEmptyDirs.mockRejectedValue(new Error('Read only filesystem')); userMock.getList.mockResolvedValue([]); diff --git a/server/libs/domain/src/storage-template/storage-template.service.ts b/server/libs/domain/src/storage-template/storage-template.service.ts index 91ffbb886c..986f879e51 100644 --- a/server/libs/domain/src/storage-template/storage-template.service.ts +++ b/server/libs/domain/src/storage-template/storage-template.service.ts @@ -2,8 +2,8 @@ import { AssetEntity, SystemConfig } from '@app/infra/entities'; import { Inject, Injectable, Logger } from '@nestjs/common'; import { IAssetRepository } from '../asset/asset.repository'; import { APP_MEDIA_LOCATION } from '../domain.constant'; -import { getLivePhotoMotionFilename } from '../domain.util'; -import { IAssetJob } from '../job'; +import { getLivePhotoMotionFilename, usePagination } from '../domain.util'; +import { IAssetJob, JOBS_ASSET_PAGINATION_SIZE } from '../job'; import { IStorageRepository } from '../storage/storage.repository'; import { INITIAL_SYSTEM_CONFIG, ISystemConfigRepository } from '../system-config'; import { IUserRepository } from '../user/user.repository'; @@ -52,26 +52,21 @@ export class StorageTemplateService { async handleTemplateMigration() { try { console.time('migrating-time'); - const assets = await this.assetRepository.getAll(); + + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => + this.assetRepository.getAll(pagination), + ); const users = await this.userRepository.getList(); - const livePhotoMap: Record = {}; - - for (const asset of assets) { - if (asset.livePhotoVideoId) { - livePhotoMap[asset.livePhotoVideoId] = asset; + for await (const assets of assetPagination) { + for (const asset of assets) { + const user = users.find((user) => user.id === asset.ownerId); + const storageLabel = user?.storageLabel || null; + const filename = asset.originalFileName || asset.id; + await this.moveAsset(asset, { storageLabel, filename }); } } - for (const asset of assets) { - const livePhotoParentAsset = livePhotoMap[asset.id]; - // TODO: remove livePhoto specific stuff once upload is fixed - const user = users.find((user) => user.id === asset.ownerId); - const storageLabel = user?.storageLabel || null; - const filename = asset.originalFileName || livePhotoParentAsset?.originalFileName || asset.id; - await this.moveAsset(asset, { storageLabel, filename }); - } - this.logger.debug('Cleaning up empty directories...'); await this.storageRepository.removeEmptyDirs(APP_MEDIA_LOCATION); } catch (error: any) { diff --git a/server/libs/domain/test/asset.repository.mock.ts b/server/libs/domain/test/asset.repository.mock.ts index 44fd7bf7b2..62965245df 100644 --- a/server/libs/domain/test/asset.repository.mock.ts +++ b/server/libs/domain/test/asset.repository.mock.ts @@ -5,7 +5,10 @@ export const newAssetRepositoryMock = (): jest.Mocked => { getByIds: jest.fn(), getWithout: jest.fn(), getFirstAssetForAlbumId: jest.fn(), - getAll: jest.fn(), + getAll: jest.fn().mockResolvedValue({ + items: [], + hasNextPage: false, + }), deleteAll: jest.fn(), save: jest.fn(), findLivePhotoMatch: jest.fn(), diff --git a/server/libs/infra/src/repositories/asset.repository.ts b/server/libs/infra/src/repositories/asset.repository.ts index 24b7f4edb8..22697fd114 100644 --- a/server/libs/infra/src/repositories/asset.repository.ts +++ b/server/libs/infra/src/repositories/asset.repository.ts @@ -4,12 +4,15 @@ import { LivePhotoSearchOptions, MapMarker, MapMarkerSearchOptions, + Paginated, + PaginationOptions, WithoutProperty, } from '@app/domain'; import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { FindOptionsRelations, FindOptionsWhere, In, IsNull, Not, Repository } from 'typeorm'; import { AssetEntity, AssetType } from '../entities'; +import { paginate } from '../utils/pagination.util'; @Injectable() export class AssetRepository implements IAssetRepository { @@ -32,10 +35,8 @@ export class AssetRepository implements IAssetRepository { await this.repository.delete({ ownerId }); } - getAll(options?: AssetSearchOptions | undefined): Promise { - options = options || {}; - - return this.repository.find({ + getAll(pagination: PaginationOptions, options: AssetSearchOptions = {}): Paginated { + return paginate(this.repository, pagination, { where: { isVisible: options.isVisible, type: options.type, @@ -48,6 +49,10 @@ export class AssetRepository implements IAssetRepository { person: true, }, }, + order: { + // Ensures correct order when paginating + createdAt: 'ASC', + }, }); } @@ -83,7 +88,7 @@ export class AssetRepository implements IAssetRepository { }); } - getWithout(property: WithoutProperty): Promise { + getWithout(pagination: PaginationOptions, property: WithoutProperty): Paginated { let relations: FindOptionsRelations = {}; let where: FindOptionsWhere | FindOptionsWhere[] = {}; @@ -160,9 +165,13 @@ export class AssetRepository implements IAssetRepository { throw new Error(`Invalid getWithout property: ${property}`); } - return this.repository.find({ + return paginate(this.repository, pagination, { relations, where, + order: { + // Ensures correct order when paginating + createdAt: 'ASC', + }, }); } diff --git a/server/libs/infra/src/utils/pagination.util.ts b/server/libs/infra/src/utils/pagination.util.ts new file mode 100644 index 0000000000..2b37686f8a --- /dev/null +++ b/server/libs/infra/src/utils/pagination.util.ts @@ -0,0 +1,20 @@ +import { Paginated, PaginationOptions } from '@app/domain'; +import { FindOneOptions, ObjectLiteral, Repository } from 'typeorm'; + +export async function paginate( + repository: Repository, + paginationOptions: PaginationOptions, + searchOptions?: FindOneOptions, +): Paginated { + const items = await repository.find({ + ...searchOptions, + // Take one more item to check if there's a next page + take: paginationOptions.take + 1, + skip: paginationOptions.skip, + }); + + const hasNextPage = items.length > paginationOptions.take; + items.splice(paginationOptions.take); + + return { items, hasNextPage }; +}