1
0
mirror of https://github.com/immich-app/immich.git synced 2024-11-21 18:16:55 +02:00

refactor(server): cron repository (#13988)

This commit is contained in:
Jason Rasmussen 2024-11-07 12:15:54 -05:00 committed by GitHub
parent 2fe6607aea
commit dc2de47204
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 142 additions and 81 deletions

View File

@ -0,0 +1,20 @@
export const ICronRepository = 'ICronRepository';
type CronBase = {
name: string;
start?: boolean;
};
export type CronCreate = CronBase & {
expression: string;
onTick: () => void;
};
export type CronUpdate = CronBase & {
expression?: string;
};
export interface ICronRepository {
create(cron: CronCreate): void;
update(cron: CronUpdate): void;
}

View File

@ -315,8 +315,6 @@ export interface IJobRepository {
setup(options: { services: ClassConstructor<unknown>[] }): void;
startWorkers(): void;
run(job: JobItem): Promise<JobStatus>;
addCronJob(name: string, expression: string, onTick: () => void, start?: boolean): void;
updateCronJob(name: string, expression?: string, start?: boolean): void;
setConcurrency(queueName: QueueName, concurrency: number): void;
queue(item: JobItem): Promise<void>;
queueAll(items: JobItem[]): Promise<void>;

View File

@ -0,0 +1,52 @@
import { Inject, Injectable } from '@nestjs/common';
import { SchedulerRegistry } from '@nestjs/schedule';
import { CronJob, CronTime } from 'cron';
import { CronCreate, CronUpdate, ICronRepository } from 'src/interfaces/cron.interface';
import { ILoggerRepository } from 'src/interfaces/logger.interface';
@Injectable()
export class CronRepository implements ICronRepository {
constructor(
private schedulerRegistry: SchedulerRegistry,
@Inject(ILoggerRepository) private logger: ILoggerRepository,
) {
this.logger.setContext(CronRepository.name);
}
create({ name, expression, onTick, start = true }: CronCreate): void {
const job = new CronJob<null, null>(
expression,
onTick,
// function to run onComplete
undefined,
// whether it should start directly
start,
// timezone
undefined,
// context
undefined,
// runOnInit
undefined,
// utcOffset
undefined,
// prevents memory leaking by automatically stopping when the node process finishes
true,
);
this.schedulerRegistry.addCronJob(name, job);
}
update({ name, expression, start }: CronUpdate): void {
const job = this.schedulerRegistry.getCronJob(name);
if (expression) {
job.setTime(new CronTime(expression));
}
if (start !== undefined) {
if (start) {
job.start();
} else {
job.stop();
}
}
}
}

View File

@ -6,6 +6,7 @@ import { IKeyRepository } from 'src/interfaces/api-key.interface';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { IAuditRepository } from 'src/interfaces/audit.interface';
import { IConfigRepository } from 'src/interfaces/config.interface';
import { ICronRepository } from 'src/interfaces/cron.interface';
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
import { IDatabaseRepository } from 'src/interfaces/database.interface';
import { IEventRepository } from 'src/interfaces/event.interface';
@ -44,6 +45,7 @@ import { ApiKeyRepository } from 'src/repositories/api-key.repository';
import { AssetRepository } from 'src/repositories/asset.repository';
import { AuditRepository } from 'src/repositories/audit.repository';
import { ConfigRepository } from 'src/repositories/config.repository';
import { CronRepository } from 'src/repositories/cron.repository';
import { CryptoRepository } from 'src/repositories/crypto.repository';
import { DatabaseRepository } from 'src/repositories/database.repository';
import { EventRepository } from 'src/repositories/event.repository';
@ -83,6 +85,7 @@ export const repositories = [
{ provide: IAssetRepository, useClass: AssetRepository },
{ provide: IAuditRepository, useClass: AuditRepository },
{ provide: IConfigRepository, useClass: ConfigRepository },
{ provide: ICronRepository, useClass: CronRepository },
{ provide: ICryptoRepository, useClass: CryptoRepository },
{ provide: IDatabaseRepository, useClass: DatabaseRepository },
{ provide: IEventRepository, useClass: EventRepository },

View File

@ -4,7 +4,6 @@ import { ModuleRef, Reflector } from '@nestjs/core';
import { SchedulerRegistry } from '@nestjs/schedule';
import { JobsOptions, Queue, Worker } from 'bullmq';
import { ClassConstructor } from 'class-transformer';
import { CronJob, CronTime } from 'cron';
import { setTimeout } from 'node:timers/promises';
import { JobConfig } from 'src/decorators';
import { MetadataKey } from 'src/enum';
@ -119,43 +118,6 @@ export class JobRepository implements IJobRepository {
return item.handler(data);
}
addCronJob(name: string, expression: string, onTick: () => void, start = true): void {
const job = new CronJob<null, null>(
expression,
onTick,
// function to run onComplete
undefined,
// whether it should start directly
start,
// timezone
undefined,
// context
undefined,
// runOnInit
undefined,
// utcOffset
undefined,
// prevents memory leaking by automatically stopping when the node process finishes
true,
);
this.schedulerRegistry.addCronJob(name, job);
}
updateCronJob(name: string, expression?: string, start?: boolean): void {
const job = this.schedulerRegistry.getCronJob(name);
if (expression) {
job.setTime(new CronTime(expression));
}
if (start !== undefined) {
if (start) {
job.start();
} else {
job.stop();
}
}
}
setConcurrency(queueName: QueueName, concurrency: number) {
const worker = this.workers[queueName];
if (!worker) {

View File

@ -3,8 +3,9 @@ import { defaults, SystemConfig } from 'src/config';
import { StorageCore } from 'src/cores/storage.core';
import { ImmichWorker, StorageFolder } from 'src/enum';
import { IConfigRepository } from 'src/interfaces/config.interface';
import { ICronRepository } from 'src/interfaces/cron.interface';
import { IDatabaseRepository } from 'src/interfaces/database.interface';
import { IJobRepository, JobStatus } from 'src/interfaces/job.interface';
import { JobStatus } from 'src/interfaces/job.interface';
import { IProcessRepository } from 'src/interfaces/process.interface';
import { IStorageRepository } from 'src/interfaces/storage.interface';
import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface';
@ -18,13 +19,13 @@ describe(BackupService.name, () => {
let databaseMock: Mocked<IDatabaseRepository>;
let configMock: Mocked<IConfigRepository>;
let jobMock: Mocked<IJobRepository>;
let cronMock: Mocked<ICronRepository>;
let processMock: Mocked<IProcessRepository>;
let storageMock: Mocked<IStorageRepository>;
let systemMock: Mocked<ISystemMetadataRepository>;
beforeEach(() => {
({ sut, configMock, databaseMock, jobMock, processMock, storageMock, systemMock } = newTestService(BackupService));
({ sut, cronMock, configMock, databaseMock, processMock, storageMock, systemMock } = newTestService(BackupService));
});
it('should work', () => {
@ -37,7 +38,7 @@ describe(BackupService.name, () => {
await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig });
expect(jobMock.addCronJob).toHaveBeenCalled();
expect(cronMock.create).toHaveBeenCalled();
});
it('should not initialize backup database cron job when lock is taken', async () => {
@ -45,14 +46,14 @@ describe(BackupService.name, () => {
await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig });
expect(jobMock.addCronJob).not.toHaveBeenCalled();
expect(cronMock.create).not.toHaveBeenCalled();
});
it('should not initialise backup database job when running on microservices', async () => {
configMock.getWorker.mockReturnValue(ImmichWorker.MICROSERVICES);
await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig });
expect(jobMock.addCronJob).not.toHaveBeenCalled();
expect(cronMock.create).not.toHaveBeenCalled();
});
});
@ -75,15 +76,15 @@ describe(BackupService.name, () => {
} as SystemConfig,
});
expect(jobMock.updateCronJob).toHaveBeenCalledWith('backupDatabase', '0 1 * * *', true);
expect(jobMock.updateCronJob).toHaveBeenCalled();
expect(cronMock.update).toHaveBeenCalledWith({ name: 'backupDatabase', expression: '0 1 * * *', start: true });
expect(cronMock.update).toHaveBeenCalled();
});
it('should do nothing if instance does not have the backup database lock', async () => {
databaseMock.tryLock.mockResolvedValue(false);
await sut.onConfigInit({ newConfig: defaults });
sut.onConfigUpdate({ newConfig: systemConfigStub.backupEnabled as SystemConfig, oldConfig: defaults });
expect(jobMock.updateCronJob).not.toHaveBeenCalled();
expect(cronMock.update).not.toHaveBeenCalled();
});
});

View File

@ -27,12 +27,12 @@ export class BackupService extends BaseService {
this.backupLock = await this.databaseRepository.tryLock(DatabaseLock.BackupDatabase);
if (this.backupLock) {
this.jobRepository.addCronJob(
'backupDatabase',
database.cronExpression,
() => handlePromiseError(this.jobRepository.queue({ name: JobName.BACKUP_DATABASE }), this.logger),
database.enabled,
);
this.cronRepository.create({
name: 'backupDatabase',
expression: database.cronExpression,
onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.BACKUP_DATABASE }), this.logger),
start: database.enabled,
});
}
}
@ -42,7 +42,11 @@ export class BackupService extends BaseService {
return;
}
this.jobRepository.updateCronJob('backupDatabase', backup.database.cronExpression, backup.database.enabled);
this.cronRepository.update({
name: 'backupDatabase',
expression: backup.database.cronExpression,
start: backup.database.enabled,
});
}
@OnEvent({ name: 'config.validate' })

View File

@ -12,6 +12,7 @@ import { IKeyRepository } from 'src/interfaces/api-key.interface';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { IAuditRepository } from 'src/interfaces/audit.interface';
import { IConfigRepository } from 'src/interfaces/config.interface';
import { ICronRepository } from 'src/interfaces/cron.interface';
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
import { IDatabaseRepository } from 'src/interfaces/database.interface';
import { IEventRepository } from 'src/interfaces/event.interface';
@ -57,6 +58,7 @@ export class BaseService {
@Inject(IAlbumUserRepository) protected albumUserRepository: IAlbumUserRepository,
@Inject(IAssetRepository) protected assetRepository: IAssetRepository,
@Inject(IConfigRepository) protected configRepository: IConfigRepository,
@Inject(ICronRepository) protected cronRepository: ICronRepository,
@Inject(ICryptoRepository) protected cryptoRepository: ICryptoRepository,
@Inject(IDatabaseRepository) protected databaseRepository: IDatabaseRepository,
@Inject(IEventRepository) protected eventRepository: IEventRepository,

View File

@ -6,6 +6,7 @@ import { UserEntity } from 'src/entities/user.entity';
import { AssetType, ImmichWorker } from 'src/enum';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { IConfigRepository } from 'src/interfaces/config.interface';
import { ICronRepository } from 'src/interfaces/cron.interface';
import { IDatabaseRepository } from 'src/interfaces/database.interface';
import {
IJobRepository,
@ -36,13 +37,15 @@ describe(LibraryService.name, () => {
let assetMock: Mocked<IAssetRepository>;
let configMock: Mocked<IConfigRepository>;
let cronMock: Mocked<ICronRepository>;
let databaseMock: Mocked<IDatabaseRepository>;
let jobMock: Mocked<IJobRepository>;
let libraryMock: Mocked<ILibraryRepository>;
let storageMock: Mocked<IStorageRepository>;
beforeEach(() => {
({ sut, assetMock, configMock, databaseMock, jobMock, libraryMock, storageMock } = newTestService(LibraryService));
({ sut, assetMock, configMock, cronMock, databaseMock, jobMock, libraryMock, storageMock } =
newTestService(LibraryService));
databaseMock.tryLock.mockResolvedValue(true);
configMock.getWorker.mockReturnValue(ImmichWorker.MICROSERVICES);
@ -56,7 +59,7 @@ describe(LibraryService.name, () => {
it('should init cron job and handle config changes', async () => {
await sut.onConfigInit({ newConfig: defaults });
expect(jobMock.addCronJob).toHaveBeenCalled();
expect(cronMock.create).toHaveBeenCalled();
await sut.onConfigUpdate({
oldConfig: defaults,
@ -71,7 +74,7 @@ describe(LibraryService.name, () => {
} as SystemConfig,
});
expect(jobMock.updateCronJob).toHaveBeenCalledWith('libraryScan', '0 1 * * *', true);
expect(cronMock.update).toHaveBeenCalledWith({ name: 'libraryScan', expression: '0 1 * * *', start: true });
});
it('should initialize watcher for all external libraries', async () => {
@ -117,14 +120,14 @@ describe(LibraryService.name, () => {
await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchEnabled as SystemConfig });
expect(jobMock.addCronJob).not.toHaveBeenCalled();
expect(cronMock.create).not.toHaveBeenCalled();
});
it('should not initialize watcher or library scan job when running on api', async () => {
configMock.getWorker.mockReturnValue(ImmichWorker.API);
await sut.onConfigInit({ newConfig: systemConfigStub.libraryScan as SystemConfig });
expect(jobMock.addCronJob).not.toHaveBeenCalled();
expect(cronMock.create).not.toHaveBeenCalled();
});
});
@ -138,7 +141,7 @@ describe(LibraryService.name, () => {
databaseMock.tryLock.mockResolvedValue(false);
await sut.onConfigInit({ newConfig: defaults });
await sut.onConfigUpdate({ newConfig: systemConfigStub.libraryScan as SystemConfig, oldConfig: defaults });
expect(jobMock.updateCronJob).not.toHaveBeenCalled();
expect(cronMock.update).not.toHaveBeenCalled();
});
it('should update cron job and enable watching', async () => {
@ -148,11 +151,11 @@ describe(LibraryService.name, () => {
oldConfig: defaults,
});
expect(jobMock.updateCronJob).toHaveBeenCalledWith(
'libraryScan',
systemConfigStub.libraryScan.library.scan.cronExpression,
systemConfigStub.libraryScan.library.scan.enabled,
);
expect(cronMock.update).toHaveBeenCalledWith({
name: 'libraryScan',
expression: systemConfigStub.libraryScan.library.scan.cronExpression,
start: systemConfigStub.libraryScan.library.scan.enabled,
});
});
it('should update cron job and disable watching', async () => {
@ -166,11 +169,11 @@ describe(LibraryService.name, () => {
oldConfig: defaults,
});
expect(jobMock.updateCronJob).toHaveBeenCalledWith(
'libraryScan',
systemConfigStub.libraryScan.library.scan.cronExpression,
systemConfigStub.libraryScan.library.scan.enabled,
);
expect(cronMock.update).toHaveBeenCalledWith({
name: 'libraryScan',
expression: systemConfigStub.libraryScan.library.scan.cronExpression,
start: systemConfigStub.libraryScan.library.scan.enabled,
});
});
});

View File

@ -48,12 +48,13 @@ export class LibraryService extends BaseService {
this.watchLibraries = this.lock && watch.enabled;
if (this.lock) {
this.jobRepository.addCronJob(
'libraryScan',
scan.cronExpression,
() => handlePromiseError(this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_ALL }), this.logger),
scan.enabled,
);
this.cronRepository.create({
name: 'libraryScan',
expression: scan.cronExpression,
onTick: () =>
handlePromiseError(this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_ALL }), this.logger),
start: scan.enabled,
});
}
if (this.watchLibraries) {
@ -67,7 +68,11 @@ export class LibraryService extends BaseService {
return;
}
this.jobRepository.updateCronJob('libraryScan', library.scan.cronExpression, library.scan.enabled);
this.cronRepository.update({
name: 'libraryScan',
expression: library.scan.cronExpression,
start: library.scan.enabled,
});
if (library.watch.enabled !== this.watchLibraries) {
// Watch configuration changed, update accordingly

View File

@ -0,0 +1,9 @@
import { ICronRepository } from 'src/interfaces/cron.interface';
import { Mocked, vitest } from 'vitest';
export const newCronRepositoryMock = (): Mocked<ICronRepository> => {
return {
create: vitest.fn(),
update: vitest.fn(),
};
};

View File

@ -6,8 +6,6 @@ export const newJobRepositoryMock = (): Mocked<IJobRepository> => {
setup: vitest.fn(),
startWorkers: vitest.fn(),
run: vitest.fn(),
addCronJob: vitest.fn(),
updateCronJob: vitest.fn(),
setConcurrency: vitest.fn(),
empty: vitest.fn(),
pause: vitest.fn(),

View File

@ -12,6 +12,7 @@ import { newKeyRepositoryMock } from 'test/repositories/api-key.repository.mock'
import { newAssetRepositoryMock } from 'test/repositories/asset.repository.mock';
import { newAuditRepositoryMock } from 'test/repositories/audit.repository.mock';
import { newConfigRepositoryMock } from 'test/repositories/config.repository.mock';
import { newCronRepositoryMock } from 'test/repositories/cron.repository.mock';
import { newCryptoRepositoryMock } from 'test/repositories/crypto.repository.mock';
import { newDatabaseRepositoryMock } from 'test/repositories/database.repository.mock';
import { newEventRepositoryMock } from 'test/repositories/event.repository.mock';
@ -62,6 +63,7 @@ export const newTestService = <T extends BaseService>(
const accessMock = newAccessRepositoryMock();
const loggerMock = newLoggerRepositoryMock();
const cronMock = newCronRepositoryMock();
const cryptoMock = newCryptoRepositoryMock();
const activityMock = newActivityRepositoryMock();
const auditMock = newAuditRepositoryMock();
@ -108,6 +110,7 @@ export const newTestService = <T extends BaseService>(
albumUserMock,
assetMock,
configMock,
cronMock,
cryptoMock,
databaseMock,
eventMock,
@ -144,6 +147,7 @@ export const newTestService = <T extends BaseService>(
sut,
accessMock,
loggerMock,
cronMock,
cryptoMock,
activityMock,
auditMock,