mirror of
https://github.com/immich-app/immich.git
synced 2025-02-04 18:35:31 +02:00
refactor: migrate trash to kysely (#15233)
This commit is contained in:
parent
cc6a8b0c74
commit
e51091b6e5
@ -1,10 +1,8 @@
|
|||||||
import { Paginated, PaginationOptions } from 'src/utils/pagination';
|
|
||||||
|
|
||||||
export const ITrashRepository = 'ITrashRepository';
|
export const ITrashRepository = 'ITrashRepository';
|
||||||
|
|
||||||
export interface ITrashRepository {
|
export interface ITrashRepository {
|
||||||
empty(userId: string): Promise<number>;
|
empty(userId: string): Promise<number>;
|
||||||
restore(userId: string): Promise<number>;
|
restore(userId: string): Promise<number>;
|
||||||
restoreAll(assetIds: string[]): Promise<number>;
|
restoreAll(assetIds: string[]): Promise<number>;
|
||||||
getDeletedIds(pagination: PaginationOptions): Paginated<string>;
|
getDeletedIds(): AsyncIterableIterator<{ id: string }>;
|
||||||
}
|
}
|
||||||
|
27
server/src/queries/trash.repository.sql
Normal file
27
server/src/queries/trash.repository.sql
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
-- NOTE: This file is auto generated by ./sql-generator
|
||||||
|
|
||||||
|
-- TrashRepository.restore
|
||||||
|
update "assets"
|
||||||
|
set
|
||||||
|
"status" = $1,
|
||||||
|
"deletedAt" = $2
|
||||||
|
where
|
||||||
|
"ownerId" = $3
|
||||||
|
and "status" = $4
|
||||||
|
|
||||||
|
-- TrashRepository.empty
|
||||||
|
update "assets"
|
||||||
|
set
|
||||||
|
"status" = $1
|
||||||
|
where
|
||||||
|
"ownerId" = $2
|
||||||
|
and "status" = $3
|
||||||
|
|
||||||
|
-- TrashRepository.restoreAll
|
||||||
|
update "assets"
|
||||||
|
set
|
||||||
|
"status" = $1,
|
||||||
|
"deletedAt" = $2
|
||||||
|
where
|
||||||
|
"status" = $3
|
||||||
|
and "id" in ($4)
|
@ -1,52 +1,50 @@
|
|||||||
import { InjectRepository } from '@nestjs/typeorm';
|
import { Kysely } from 'kysely';
|
||||||
import { AssetEntity } from 'src/entities/asset.entity';
|
import { InjectKysely } from 'nestjs-kysely';
|
||||||
|
import { DB } from 'src/db';
|
||||||
|
import { DummyValue, GenerateSql } from 'src/decorators';
|
||||||
import { AssetStatus } from 'src/enum';
|
import { AssetStatus } from 'src/enum';
|
||||||
import { ITrashRepository } from 'src/interfaces/trash.interface';
|
import { ITrashRepository } from 'src/interfaces/trash.interface';
|
||||||
import { Paginated, paginatedBuilder, PaginationOptions } from 'src/utils/pagination';
|
|
||||||
import { In, Repository } from 'typeorm';
|
|
||||||
|
|
||||||
export class TrashRepository implements ITrashRepository {
|
export class TrashRepository implements ITrashRepository {
|
||||||
constructor(@InjectRepository(AssetEntity) private assetRepository: Repository<AssetEntity>) {}
|
constructor(@InjectKysely() private db: Kysely<DB>) {}
|
||||||
|
|
||||||
async getDeletedIds(pagination: PaginationOptions): Paginated<string> {
|
getDeletedIds(): AsyncIterableIterator<{ id: string }> {
|
||||||
const { hasNextPage, items } = await paginatedBuilder(
|
return this.db.selectFrom('assets').select(['id']).where('status', '=', AssetStatus.DELETED).stream();
|
||||||
this.assetRepository
|
|
||||||
.createQueryBuilder('asset')
|
|
||||||
.select('asset.id')
|
|
||||||
.where({ status: AssetStatus.DELETED })
|
|
||||||
.withDeleted(),
|
|
||||||
pagination,
|
|
||||||
);
|
|
||||||
|
|
||||||
return {
|
|
||||||
hasNextPage,
|
|
||||||
items: items.map((asset) => asset.id),
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GenerateSql({ params: [DummyValue.UUID] })
|
||||||
async restore(userId: string): Promise<number> {
|
async restore(userId: string): Promise<number> {
|
||||||
const result = await this.assetRepository.update(
|
const { numUpdatedRows } = await this.db
|
||||||
{ ownerId: userId, status: AssetStatus.TRASHED },
|
.updateTable('assets')
|
||||||
{ status: AssetStatus.ACTIVE, deletedAt: null },
|
.where('ownerId', '=', userId)
|
||||||
);
|
.where('status', '=', AssetStatus.TRASHED)
|
||||||
|
.set({ status: AssetStatus.ACTIVE, deletedAt: null })
|
||||||
|
.executeTakeFirst();
|
||||||
|
|
||||||
return result.affected || 0;
|
return Number(numUpdatedRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GenerateSql({ params: [DummyValue.UUID] })
|
||||||
async empty(userId: string): Promise<number> {
|
async empty(userId: string): Promise<number> {
|
||||||
const result = await this.assetRepository.update(
|
const { numUpdatedRows } = await this.db
|
||||||
{ ownerId: userId, status: AssetStatus.TRASHED },
|
.updateTable('assets')
|
||||||
{ status: AssetStatus.DELETED },
|
.where('ownerId', '=', userId)
|
||||||
);
|
.where('status', '=', AssetStatus.TRASHED)
|
||||||
|
.set({ status: AssetStatus.DELETED })
|
||||||
|
.executeTakeFirst();
|
||||||
|
|
||||||
return result.affected || 0;
|
return Number(numUpdatedRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GenerateSql({ params: [[DummyValue.UUID]] })
|
||||||
async restoreAll(ids: string[]): Promise<number> {
|
async restoreAll(ids: string[]): Promise<number> {
|
||||||
const result = await this.assetRepository.update(
|
const { numUpdatedRows } = await this.db
|
||||||
{ id: In(ids), status: AssetStatus.TRASHED },
|
.updateTable('assets')
|
||||||
{ status: AssetStatus.ACTIVE, deletedAt: null },
|
.where('status', '=', AssetStatus.TRASHED)
|
||||||
);
|
.where('id', 'in', ids)
|
||||||
return result.affected ?? 0;
|
.set({ status: AssetStatus.ACTIVE, deletedAt: null })
|
||||||
|
.executeTakeFirst();
|
||||||
|
|
||||||
|
return Number(numUpdatedRows);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,13 @@ import { IAccessRepositoryMock } from 'test/repositories/access.repository.mock'
|
|||||||
import { newTestService } from 'test/utils';
|
import { newTestService } from 'test/utils';
|
||||||
import { Mocked } from 'vitest';
|
import { Mocked } from 'vitest';
|
||||||
|
|
||||||
|
async function* makeAssetIdStream(count: number): AsyncIterableIterator<{ id: string }> {
|
||||||
|
for (let i = 0; i < count; i++) {
|
||||||
|
await Promise.resolve();
|
||||||
|
yield { id: `asset-${i + 1}` };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
describe(TrashService.name, () => {
|
describe(TrashService.name, () => {
|
||||||
let sut: TrashService;
|
let sut: TrashService;
|
||||||
|
|
||||||
@ -48,14 +55,14 @@ describe(TrashService.name, () => {
|
|||||||
|
|
||||||
describe('restore', () => {
|
describe('restore', () => {
|
||||||
it('should handle an empty trash', async () => {
|
it('should handle an empty trash', async () => {
|
||||||
trashMock.getDeletedIds.mockResolvedValue({ items: [], hasNextPage: false });
|
trashMock.getDeletedIds.mockResolvedValue(makeAssetIdStream(0));
|
||||||
trashMock.restore.mockResolvedValue(0);
|
trashMock.restore.mockResolvedValue(0);
|
||||||
await expect(sut.restore(authStub.user1)).resolves.toEqual({ count: 0 });
|
await expect(sut.restore(authStub.user1)).resolves.toEqual({ count: 0 });
|
||||||
expect(trashMock.restore).toHaveBeenCalledWith('user-id');
|
expect(trashMock.restore).toHaveBeenCalledWith('user-id');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should restore', async () => {
|
it('should restore', async () => {
|
||||||
trashMock.getDeletedIds.mockResolvedValue({ items: ['asset-1'], hasNextPage: false });
|
trashMock.getDeletedIds.mockResolvedValue(makeAssetIdStream(1));
|
||||||
trashMock.restore.mockResolvedValue(1);
|
trashMock.restore.mockResolvedValue(1);
|
||||||
await expect(sut.restore(authStub.user1)).resolves.toEqual({ count: 1 });
|
await expect(sut.restore(authStub.user1)).resolves.toEqual({ count: 1 });
|
||||||
expect(trashMock.restore).toHaveBeenCalledWith('user-id');
|
expect(trashMock.restore).toHaveBeenCalledWith('user-id');
|
||||||
@ -64,14 +71,14 @@ describe(TrashService.name, () => {
|
|||||||
|
|
||||||
describe('empty', () => {
|
describe('empty', () => {
|
||||||
it('should handle an empty trash', async () => {
|
it('should handle an empty trash', async () => {
|
||||||
trashMock.getDeletedIds.mockResolvedValue({ items: [], hasNextPage: false });
|
trashMock.getDeletedIds.mockResolvedValue(makeAssetIdStream(0));
|
||||||
trashMock.empty.mockResolvedValue(0);
|
trashMock.empty.mockResolvedValue(0);
|
||||||
await expect(sut.empty(authStub.user1)).resolves.toEqual({ count: 0 });
|
await expect(sut.empty(authStub.user1)).resolves.toEqual({ count: 0 });
|
||||||
expect(jobMock.queue).not.toHaveBeenCalled();
|
expect(jobMock.queue).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should empty the trash', async () => {
|
it('should empty the trash', async () => {
|
||||||
trashMock.getDeletedIds.mockResolvedValue({ items: ['asset-1'], hasNextPage: false });
|
trashMock.getDeletedIds.mockResolvedValue(makeAssetIdStream(1));
|
||||||
trashMock.empty.mockResolvedValue(1);
|
trashMock.empty.mockResolvedValue(1);
|
||||||
await expect(sut.empty(authStub.user1)).resolves.toEqual({ count: 1 });
|
await expect(sut.empty(authStub.user1)).resolves.toEqual({ count: 1 });
|
||||||
expect(trashMock.empty).toHaveBeenCalledWith('user-id');
|
expect(trashMock.empty).toHaveBeenCalledWith('user-id');
|
||||||
@ -88,7 +95,7 @@ describe(TrashService.name, () => {
|
|||||||
|
|
||||||
describe('handleQueueEmptyTrash', () => {
|
describe('handleQueueEmptyTrash', () => {
|
||||||
it('should queue asset delete jobs', async () => {
|
it('should queue asset delete jobs', async () => {
|
||||||
trashMock.getDeletedIds.mockResolvedValue({ items: ['asset-1'], hasNextPage: false });
|
trashMock.getDeletedIds.mockReturnValue(makeAssetIdStream(1));
|
||||||
await expect(sut.handleQueueEmptyTrash()).resolves.toEqual(JobStatus.SUCCESS);
|
await expect(sut.handleQueueEmptyTrash()).resolves.toEqual(JobStatus.SUCCESS);
|
||||||
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
||||||
{
|
{
|
||||||
|
@ -5,7 +5,6 @@ import { TrashResponseDto } from 'src/dtos/trash.dto';
|
|||||||
import { Permission } from 'src/enum';
|
import { Permission } from 'src/enum';
|
||||||
import { JOBS_ASSET_PAGINATION_SIZE, JobName, JobStatus, QueueName } from 'src/interfaces/job.interface';
|
import { JOBS_ASSET_PAGINATION_SIZE, JobName, JobStatus, QueueName } from 'src/interfaces/job.interface';
|
||||||
import { BaseService } from 'src/services/base.service';
|
import { BaseService } from 'src/services/base.service';
|
||||||
import { usePagination } from 'src/utils/pagination';
|
|
||||||
|
|
||||||
export class TrashService extends BaseService {
|
export class TrashService extends BaseService {
|
||||||
async restoreAssets(auth: AuthDto, dto: BulkIdsDto): Promise<TrashResponseDto> {
|
async restoreAssets(auth: AuthDto, dto: BulkIdsDto): Promise<TrashResponseDto> {
|
||||||
@ -46,27 +45,39 @@ export class TrashService extends BaseService {
|
|||||||
|
|
||||||
@OnJob({ name: JobName.QUEUE_TRASH_EMPTY, queue: QueueName.BACKGROUND_TASK })
|
@OnJob({ name: JobName.QUEUE_TRASH_EMPTY, queue: QueueName.BACKGROUND_TASK })
|
||||||
async handleQueueEmptyTrash() {
|
async handleQueueEmptyTrash() {
|
||||||
let count = 0;
|
const assets = this.trashRepository.getDeletedIds();
|
||||||
const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) =>
|
|
||||||
this.trashRepository.getDeletedIds(pagination),
|
|
||||||
);
|
|
||||||
|
|
||||||
for await (const assetIds of assetPagination) {
|
let count = 0;
|
||||||
this.logger.debug(`Queueing ${assetIds.length} asset(s) for deletion from the trash`);
|
const batch: string[] = [];
|
||||||
count += assetIds.length;
|
for await (const { id } of assets) {
|
||||||
await this.jobRepository.queueAll(
|
batch.push(id);
|
||||||
assetIds.map((assetId) => ({
|
|
||||||
name: JobName.ASSET_DELETION,
|
if (batch.length === JOBS_ASSET_PAGINATION_SIZE) {
|
||||||
data: {
|
await this.handleBatch(batch);
|
||||||
id: assetId,
|
count += batch.length;
|
||||||
deleteOnDisk: true,
|
batch.length = 0;
|
||||||
},
|
}
|
||||||
})),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await this.handleBatch(batch);
|
||||||
|
count += batch.length;
|
||||||
|
batch.length = 0;
|
||||||
|
|
||||||
this.logger.log(`Queued ${count} asset(s) for deletion from the trash`);
|
this.logger.log(`Queued ${count} asset(s) for deletion from the trash`);
|
||||||
|
|
||||||
return JobStatus.SUCCESS;
|
return JobStatus.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async handleBatch(ids: string[]) {
|
||||||
|
this.logger.debug(`Queueing ${ids.length} asset(s) for deletion from the trash`);
|
||||||
|
await this.jobRepository.queueAll(
|
||||||
|
ids.map((assetId) => ({
|
||||||
|
name: JobName.ASSET_DELETION,
|
||||||
|
data: {
|
||||||
|
id: assetId,
|
||||||
|
deleteOnDisk: true,
|
||||||
|
},
|
||||||
|
})),
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user