diff --git a/server/src/interfaces/trash.interface.ts b/server/src/interfaces/trash.interface.ts index 96c2322d8a..38e7c523ce 100644 --- a/server/src/interfaces/trash.interface.ts +++ b/server/src/interfaces/trash.interface.ts @@ -1,10 +1,8 @@ -import { Paginated, PaginationOptions } from 'src/utils/pagination'; - export const ITrashRepository = 'ITrashRepository'; export interface ITrashRepository { empty(userId: string): Promise; restore(userId: string): Promise; restoreAll(assetIds: string[]): Promise; - getDeletedIds(pagination: PaginationOptions): Paginated; + getDeletedIds(): AsyncIterableIterator<{ id: string }>; } diff --git a/server/src/queries/trash.repository.sql b/server/src/queries/trash.repository.sql new file mode 100644 index 0000000000..77c2ea51d0 --- /dev/null +++ b/server/src/queries/trash.repository.sql @@ -0,0 +1,27 @@ +-- NOTE: This file is auto generated by ./sql-generator + +-- TrashRepository.restore +update "assets" +set + "status" = $1, + "deletedAt" = $2 +where + "ownerId" = $3 + and "status" = $4 + +-- TrashRepository.empty +update "assets" +set + "status" = $1 +where + "ownerId" = $2 + and "status" = $3 + +-- TrashRepository.restoreAll +update "assets" +set + "status" = $1, + "deletedAt" = $2 +where + "status" = $3 + and "id" in ($4) diff --git a/server/src/repositories/trash.repository.ts b/server/src/repositories/trash.repository.ts index d24f4f709a..c1db31a3db 100644 --- a/server/src/repositories/trash.repository.ts +++ b/server/src/repositories/trash.repository.ts @@ -1,52 +1,50 @@ -import { InjectRepository } from '@nestjs/typeorm'; -import { AssetEntity } from 'src/entities/asset.entity'; +import { Kysely } from 'kysely'; +import { InjectKysely } from 'nestjs-kysely'; +import { DB } from 'src/db'; +import { DummyValue, GenerateSql } from 'src/decorators'; import { AssetStatus } from 'src/enum'; import { ITrashRepository } from 'src/interfaces/trash.interface'; -import { Paginated, paginatedBuilder, PaginationOptions } from 'src/utils/pagination'; -import { In, Repository } from 'typeorm'; export class TrashRepository implements ITrashRepository { - constructor(@InjectRepository(AssetEntity) private assetRepository: Repository) {} + constructor(@InjectKysely() private db: Kysely) {} - async getDeletedIds(pagination: PaginationOptions): Paginated { - const { hasNextPage, items } = await paginatedBuilder( - this.assetRepository - .createQueryBuilder('asset') - .select('asset.id') - .where({ status: AssetStatus.DELETED }) - .withDeleted(), - pagination, - ); - - return { - hasNextPage, - items: items.map((asset) => asset.id), - }; + getDeletedIds(): AsyncIterableIterator<{ id: string }> { + return this.db.selectFrom('assets').select(['id']).where('status', '=', AssetStatus.DELETED).stream(); } + @GenerateSql({ params: [DummyValue.UUID] }) async restore(userId: string): Promise { - const result = await this.assetRepository.update( - { ownerId: userId, status: AssetStatus.TRASHED }, - { status: AssetStatus.ACTIVE, deletedAt: null }, - ); + const { numUpdatedRows } = await this.db + .updateTable('assets') + .where('ownerId', '=', userId) + .where('status', '=', AssetStatus.TRASHED) + .set({ status: AssetStatus.ACTIVE, deletedAt: null }) + .executeTakeFirst(); - return result.affected || 0; + return Number(numUpdatedRows); } + @GenerateSql({ params: [DummyValue.UUID] }) async empty(userId: string): Promise { - const result = await this.assetRepository.update( - { ownerId: userId, status: AssetStatus.TRASHED }, - { status: AssetStatus.DELETED }, - ); + const { numUpdatedRows } = await this.db + .updateTable('assets') + .where('ownerId', '=', userId) + .where('status', '=', AssetStatus.TRASHED) + .set({ status: AssetStatus.DELETED }) + .executeTakeFirst(); - return result.affected || 0; + return Number(numUpdatedRows); } + @GenerateSql({ params: [[DummyValue.UUID]] }) async restoreAll(ids: string[]): Promise { - const result = await this.assetRepository.update( - { id: In(ids), status: AssetStatus.TRASHED }, - { status: AssetStatus.ACTIVE, deletedAt: null }, - ); - return result.affected ?? 0; + const { numUpdatedRows } = await this.db + .updateTable('assets') + .where('status', '=', AssetStatus.TRASHED) + .where('id', 'in', ids) + .set({ status: AssetStatus.ACTIVE, deletedAt: null }) + .executeTakeFirst(); + + return Number(numUpdatedRows); } } diff --git a/server/src/services/trash.service.spec.ts b/server/src/services/trash.service.spec.ts index 748faa14ab..4d877c9dfa 100644 --- a/server/src/services/trash.service.spec.ts +++ b/server/src/services/trash.service.spec.ts @@ -7,6 +7,13 @@ import { IAccessRepositoryMock } from 'test/repositories/access.repository.mock' import { newTestService } from 'test/utils'; import { Mocked } from 'vitest'; +async function* makeAssetIdStream(count: number): AsyncIterableIterator<{ id: string }> { + for (let i = 0; i < count; i++) { + await Promise.resolve(); + yield { id: `asset-${i + 1}` }; + } +} + describe(TrashService.name, () => { let sut: TrashService; @@ -48,14 +55,14 @@ describe(TrashService.name, () => { describe('restore', () => { it('should handle an empty trash', async () => { - trashMock.getDeletedIds.mockResolvedValue({ items: [], hasNextPage: false }); + trashMock.getDeletedIds.mockResolvedValue(makeAssetIdStream(0)); trashMock.restore.mockResolvedValue(0); await expect(sut.restore(authStub.user1)).resolves.toEqual({ count: 0 }); expect(trashMock.restore).toHaveBeenCalledWith('user-id'); }); it('should restore', async () => { - trashMock.getDeletedIds.mockResolvedValue({ items: ['asset-1'], hasNextPage: false }); + trashMock.getDeletedIds.mockResolvedValue(makeAssetIdStream(1)); trashMock.restore.mockResolvedValue(1); await expect(sut.restore(authStub.user1)).resolves.toEqual({ count: 1 }); expect(trashMock.restore).toHaveBeenCalledWith('user-id'); @@ -64,14 +71,14 @@ describe(TrashService.name, () => { describe('empty', () => { it('should handle an empty trash', async () => { - trashMock.getDeletedIds.mockResolvedValue({ items: [], hasNextPage: false }); + trashMock.getDeletedIds.mockResolvedValue(makeAssetIdStream(0)); trashMock.empty.mockResolvedValue(0); await expect(sut.empty(authStub.user1)).resolves.toEqual({ count: 0 }); expect(jobMock.queue).not.toHaveBeenCalled(); }); it('should empty the trash', async () => { - trashMock.getDeletedIds.mockResolvedValue({ items: ['asset-1'], hasNextPage: false }); + trashMock.getDeletedIds.mockResolvedValue(makeAssetIdStream(1)); trashMock.empty.mockResolvedValue(1); await expect(sut.empty(authStub.user1)).resolves.toEqual({ count: 1 }); expect(trashMock.empty).toHaveBeenCalledWith('user-id'); @@ -88,7 +95,7 @@ describe(TrashService.name, () => { describe('handleQueueEmptyTrash', () => { it('should queue asset delete jobs', async () => { - trashMock.getDeletedIds.mockResolvedValue({ items: ['asset-1'], hasNextPage: false }); + trashMock.getDeletedIds.mockReturnValue(makeAssetIdStream(1)); await expect(sut.handleQueueEmptyTrash()).resolves.toEqual(JobStatus.SUCCESS); expect(jobMock.queueAll).toHaveBeenCalledWith([ { diff --git a/server/src/services/trash.service.ts b/server/src/services/trash.service.ts index 8136ff4c7e..d66461ef94 100644 --- a/server/src/services/trash.service.ts +++ b/server/src/services/trash.service.ts @@ -5,7 +5,6 @@ import { TrashResponseDto } from 'src/dtos/trash.dto'; import { Permission } from 'src/enum'; import { JOBS_ASSET_PAGINATION_SIZE, JobName, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { BaseService } from 'src/services/base.service'; -import { usePagination } from 'src/utils/pagination'; export class TrashService extends BaseService { async restoreAssets(auth: AuthDto, dto: BulkIdsDto): Promise { @@ -46,27 +45,39 @@ export class TrashService extends BaseService { @OnJob({ name: JobName.QUEUE_TRASH_EMPTY, queue: QueueName.BACKGROUND_TASK }) async handleQueueEmptyTrash() { - let count = 0; - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => - this.trashRepository.getDeletedIds(pagination), - ); + const assets = this.trashRepository.getDeletedIds(); - for await (const assetIds of assetPagination) { - this.logger.debug(`Queueing ${assetIds.length} asset(s) for deletion from the trash`); - count += assetIds.length; - await this.jobRepository.queueAll( - assetIds.map((assetId) => ({ - name: JobName.ASSET_DELETION, - data: { - id: assetId, - deleteOnDisk: true, - }, - })), - ); + let count = 0; + const batch: string[] = []; + for await (const { id } of assets) { + batch.push(id); + + if (batch.length === JOBS_ASSET_PAGINATION_SIZE) { + await this.handleBatch(batch); + count += batch.length; + batch.length = 0; + } } + await this.handleBatch(batch); + count += batch.length; + batch.length = 0; + this.logger.log(`Queued ${count} asset(s) for deletion from the trash`); return JobStatus.SUCCESS; } + + private async handleBatch(ids: string[]) { + this.logger.debug(`Queueing ${ids.length} asset(s) for deletion from the trash`); + await this.jobRepository.queueAll( + ids.map((assetId) => ({ + name: JobName.ASSET_DELETION, + data: { + id: assetId, + deleteOnDisk: true, + }, + })), + ); + } }