From 72bf9439b0d766fa48b2263eb2fe50adfc3cb78a Mon Sep 17 00:00:00 2001 From: Jason Rasmussen Date: Thu, 27 Jun 2024 15:54:20 -0400 Subject: [PATCH] refactor(server): event emits (#10648) * refactor(server): event emits * refactor: change default priority to 0 --- server/src/app.module.ts | 36 ++++++++++----- server/src/decorators.ts | 11 ++++- server/src/interfaces/event.interface.ts | 32 +++++++++----- server/src/middleware/auth.guard.ts | 1 + server/src/repositories/event.repository.ts | 21 ++++++--- server/src/services/api.service.ts | 17 ------- server/src/services/database.service.spec.ts | 44 +++++++++---------- server/src/services/database.service.ts | 7 ++- server/src/services/library.service.spec.ts | 30 ++++++------- server/src/services/library.service.ts | 12 +++-- server/src/services/metadata.service.spec.ts | 8 ++-- server/src/services/metadata.service.ts | 26 ++++++----- server/src/services/microservices.service.ts | 23 ++++------ server/src/services/notification.service.ts | 13 ++---- server/src/services/server-info.service.ts | 6 ++- server/src/services/smart-info.service.ts | 9 +++- .../services/storage-template.service.spec.ts | 6 +-- .../src/services/storage-template.service.ts | 8 ++-- server/src/services/storage.service.spec.ts | 4 +- server/src/services/storage.service.ts | 5 ++- .../services/system-config.service.spec.ts | 4 +- server/src/services/system-config.service.ts | 28 +++++------- server/src/services/version.service.ts | 6 +-- server/src/utils/events.ts | 33 ++++++++++++++ .../repositories/event.repository.mock.ts | 3 +- 25 files changed, 222 insertions(+), 171 deletions(-) create mode 100644 server/src/utils/events.ts diff --git a/server/src/app.module.ts b/server/src/app.module.ts index 2d7f038a67..541f7dc659 100644 --- a/server/src/app.module.ts +++ b/server/src/app.module.ts @@ -1,7 +1,7 @@ import { BullModule } from '@nestjs/bullmq'; -import { Module, OnModuleInit, ValidationPipe } from '@nestjs/common'; +import { Inject, Module, OnModuleDestroy, OnModuleInit, ValidationPipe } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; -import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE } from '@nestjs/core'; +import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE, ModuleRef } from '@nestjs/core'; import { EventEmitterModule } from '@nestjs/event-emitter'; import { ScheduleModule, SchedulerRegistry } from '@nestjs/schedule'; import { TypeOrmModule } from '@nestjs/typeorm'; @@ -12,6 +12,7 @@ import { bullConfig, bullQueues, clsConfig, immichAppConfig } from 'src/config'; import { controllers } from 'src/controllers'; import { databaseConfig } from 'src/database.config'; import { entities } from 'src/entities'; +import { IEventRepository } from 'src/interfaces/event.interface'; import { AuthGuard } from 'src/middleware/auth.guard'; import { ErrorInterceptor } from 'src/middleware/error.interceptor'; import { FileUploadInterceptor } from 'src/middleware/file-upload.interceptor'; @@ -19,8 +20,7 @@ import { HttpExceptionFilter } from 'src/middleware/http-exception.filter'; import { LoggingInterceptor } from 'src/middleware/logging.interceptor'; import { repositories } from 'src/repositories'; import { services } from 'src/services'; -import { ApiService } from 'src/services/api.service'; -import { MicroservicesService } from 'src/services/microservices.service'; +import { setupEventHandlers } from 'src/utils/events'; import { otelConfig } from 'src/utils/instrumentation'; const common = [...services, ...repositories]; @@ -50,11 +50,19 @@ const imports = [ controllers: [...controllers], providers: [...common, ...middleware], }) -export class ApiModule implements OnModuleInit { - constructor(private service: ApiService) {} +export class ApiModule implements OnModuleInit, OnModuleDestroy { + constructor( + private moduleRef: ModuleRef, + @Inject(IEventRepository) private eventRepository: IEventRepository, + ) {} async onModuleInit() { - await this.service.init(); + setupEventHandlers(this.moduleRef); + await this.eventRepository.emit('onBootstrapEvent', 'api'); + } + + async onModuleDestroy() { + await this.eventRepository.emit('onShutdownEvent'); } } @@ -62,11 +70,19 @@ export class ApiModule implements OnModuleInit { imports: [...imports], providers: [...common, SchedulerRegistry], }) -export class MicroservicesModule implements OnModuleInit { - constructor(private service: MicroservicesService) {} +export class MicroservicesModule implements OnModuleInit, OnModuleDestroy { + constructor( + private moduleRef: ModuleRef, + @Inject(IEventRepository) private eventRepository: IEventRepository, + ) {} async onModuleInit() { - await this.service.init(); + setupEventHandlers(this.moduleRef); + await this.eventRepository.emit('onBootstrapEvent', 'microservices'); + } + + async onModuleDestroy() { + await this.eventRepository.emit('onShutdownEvent'); } } diff --git a/server/src/decorators.ts b/server/src/decorators.ts index e1966013b1..f4fce96cdc 100644 --- a/server/src/decorators.ts +++ b/server/src/decorators.ts @@ -4,7 +4,8 @@ import { OnEventOptions } from '@nestjs/event-emitter/dist/interfaces'; import { ApiExtension, ApiOperation, ApiProperty, ApiTags } from '@nestjs/swagger'; import _ from 'lodash'; import { ADDED_IN_PREFIX, DEPRECATED_IN_PREFIX, LIFECYCLE_EXTENSION } from 'src/constants'; -import { ServerAsyncEvent, ServerEvent } from 'src/interfaces/event.interface'; +import { ServerEvent } from 'src/interfaces/event.interface'; +import { Metadata } from 'src/middleware/auth.guard'; import { setUnion } from 'src/utils/set'; // PostgreSQL uses a 16-bit integer to indicate the number of bound parameters. This means that the @@ -129,9 +130,15 @@ export interface GenerateSqlQueries { /** Decorator to enable versioning/tracking of generated Sql */ export const GenerateSql = (...options: GenerateSqlQueries[]) => SetMetadata(GENERATE_SQL_KEY, options); -export const OnServerEvent = (event: ServerEvent | ServerAsyncEvent, options?: OnEventOptions) => +export const OnServerEvent = (event: ServerEvent, options?: OnEventOptions) => OnEvent(event, { suppressErrors: false, ...options }); +export type HandlerOptions = { + /** lower value has higher priority, defaults to 0 */ + priority: number; +}; +export const EventHandlerOptions = (options: HandlerOptions) => SetMetadata(Metadata.EVENT_HANDLER_OPTIONS, options); + type LifecycleRelease = 'NEXT_RELEASE' | string; type LifecycleMetadata = { addedAt?: LifecycleRelease; diff --git a/server/src/interfaces/event.interface.ts b/server/src/interfaces/event.interface.ts index e3539a823d..6b52f21d8d 100644 --- a/server/src/interfaces/event.interface.ts +++ b/server/src/interfaces/event.interface.ts @@ -4,6 +4,23 @@ import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server-i export const IEventRepository = 'IEventRepository'; +type MaybePromise = Promise | T; + +const noop = () => {}; +const dummyHandlers = { + onBootstrapEvent: noop as (app: 'api' | 'microservices') => MaybePromise, + onShutdownEvent: noop as () => MaybePromise, + onConfigUpdateEvent: noop as (update: SystemConfigUpdate) => MaybePromise, + onConfigValidateEvent: noop as (update: SystemConfigUpdate) => MaybePromise, +}; + +export type SystemConfigUpdate = { newConfig: SystemConfig; oldConfig: SystemConfig }; +export type EventHandlers = typeof dummyHandlers; +export type EmitEvent = keyof EventHandlers; +export type EmitEventHandler = (...args: Parameters) => MaybePromise; +export const events = Object.keys(dummyHandlers) as EmitEvent[]; +export type OnEvents = Partial; + export enum ClientEvent { UPLOAD_SUCCESS = 'on_upload_success', USER_DELETE = 'on_user_delete', @@ -44,15 +61,10 @@ export interface ServerEventMap { [ServerEvent.WEBSOCKET_CONNECT]: { userId: string }; } -export enum ServerAsyncEvent { - CONFIG_VALIDATE = 'config.validate', -} - -export interface ServerAsyncEventMap { - [ServerAsyncEvent.CONFIG_VALIDATE]: { newConfig: SystemConfig; oldConfig: SystemConfig }; -} - export interface IEventRepository { + on(event: T, handler: EmitEventHandler): void; + emit(event: T, ...args: Parameters>): Promise; + /** * Send to connected clients for a specific user */ @@ -65,8 +77,4 @@ export interface IEventRepository { * Notify listeners in this and connected processes. Subscribe to an event with `@OnServerEvent` */ serverSend(event: E, data: ServerEventMap[E]): boolean; - /** - * Notify and wait for responses from listeners in this process. Subscribe to an event with `@OnServerEvent` - */ - serverSendAsync(event: E, data: ServerAsyncEventMap[E]): Promise; } diff --git a/server/src/middleware/auth.guard.ts b/server/src/middleware/auth.guard.ts index cbea439489..bac25d80ed 100644 --- a/server/src/middleware/auth.guard.ts +++ b/server/src/middleware/auth.guard.ts @@ -20,6 +20,7 @@ export enum Metadata { ADMIN_ROUTE = 'admin_route', SHARED_ROUTE = 'shared_route', API_KEY_SECURITY = 'api_key', + EVENT_HANDLER_OPTIONS = 'event_handler_options', } type AdminRoute = { admin?: true }; diff --git a/server/src/repositories/event.repository.ts b/server/src/repositories/event.repository.ts index 33dfbfadc5..aecc9d7239 100644 --- a/server/src/repositories/event.repository.ts +++ b/server/src/repositories/event.repository.ts @@ -10,8 +10,9 @@ import { import { Server, Socket } from 'socket.io'; import { ClientEventMap, + EmitEvent, + EmitEventHandler, IEventRepository, - ServerAsyncEventMap, ServerEvent, ServerEventMap, } from 'src/interfaces/event.interface'; @@ -27,6 +28,8 @@ import { Instrumentation } from 'src/utils/instrumentation'; }) @Injectable() export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect, OnGatewayInit, IEventRepository { + private emitHandlers: Partial[]>> = {}; + @WebSocketServer() private server?: Server; @@ -71,6 +74,18 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect await client.leave(client.nsp.name); } + on(event: T, handler: EmitEventHandler): void { + const handlers: EmitEventHandler[] = this.emitHandlers[event] || []; + this.emitHandlers[event] = [...handlers, handler]; + } + + async emit(event: T, ...args: Parameters>): Promise { + const handlers = this.emitHandlers[event] || []; + for (const handler of handlers) { + await handler(...args); + } + } + clientSend(event: E, userId: string, data: ClientEventMap[E]) { this.server?.to(userId).emit(event, data); } @@ -84,8 +99,4 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect this.server?.serverSideEmit(event, data); return this.eventEmitter.emit(event, data); } - - serverSendAsync(event: E, data: ServerAsyncEventMap[E]): Promise { - return this.eventEmitter.emitAsync(event, data) as Promise; - } } diff --git a/server/src/services/api.service.ts b/server/src/services/api.service.ts index 9c786a848f..06a6c41bc9 100644 --- a/server/src/services/api.service.ts +++ b/server/src/services/api.service.ts @@ -6,12 +6,8 @@ import { join } from 'node:path'; import { ONE_HOUR, WEB_ROOT } from 'src/constants'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { AuthService } from 'src/services/auth.service'; -import { DatabaseService } from 'src/services/database.service'; import { JobService } from 'src/services/job.service'; -import { ServerInfoService } from 'src/services/server-info.service'; import { SharedLinkService } from 'src/services/shared-link.service'; -import { StorageService } from 'src/services/storage.service'; -import { SystemConfigService } from 'src/services/system-config.service'; import { VersionService } from 'src/services/version.service'; import { OpenGraphTags } from 'src/utils/misc'; @@ -39,12 +35,8 @@ const render = (index: string, meta: OpenGraphTags) => { export class ApiService { constructor( private authService: AuthService, - private configService: SystemConfigService, private jobService: JobService, - private serverService: ServerInfoService, private sharedLinkService: SharedLinkService, - private storageService: StorageService, - private databaseService: DatabaseService, private versionService: VersionService, @Inject(ILoggerRepository) private logger: ILoggerRepository, ) { @@ -61,15 +53,6 @@ export class ApiService { await this.jobService.handleNightlyJobs(); } - async init() { - await this.databaseService.init(); - await this.configService.init(); - this.storageService.init(); - await this.serverService.init(); - await this.versionService.init(); - this.logger.log(`Feature Flags: ${JSON.stringify(await this.serverService.getFeatures(), null, 2)}`); - } - ssr(excludePaths: string[]) { let index = ''; try { diff --git a/server/src/services/database.service.spec.ts b/server/src/services/database.service.spec.ts index 814325ef45..fb1d8e1ed5 100644 --- a/server/src/services/database.service.spec.ts +++ b/server/src/services/database.service.spec.ts @@ -27,7 +27,7 @@ describe(DatabaseService.name, () => { it('should throw an error if PostgreSQL version is below minimum supported version', async () => { databaseMock.getPostgresVersion.mockResolvedValueOnce('13.10.0'); - await expect(sut.init()).rejects.toThrow('Invalid PostgreSQL version. Found 13.10.0'); + await expect(sut.onBootstrapEvent()).rejects.toThrow('Invalid PostgreSQL version. Found 13.10.0'); expect(databaseMock.getPostgresVersion).toHaveBeenCalledTimes(1); }); @@ -35,7 +35,7 @@ describe(DatabaseService.name, () => { it(`should start up successfully with pgvectors`, async () => { databaseMock.getPostgresVersion.mockResolvedValue('14.0.0'); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(databaseMock.getPostgresVersion).toHaveBeenCalled(); expect(databaseMock.createExtension).toHaveBeenCalledWith(DatabaseExtension.VECTORS); @@ -50,7 +50,7 @@ describe(DatabaseService.name, () => { databaseMock.getPostgresVersion.mockResolvedValue('14.0.0'); databaseMock.getExtensionVersion.mockResolvedValue('0.5.0'); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(databaseMock.createExtension).toHaveBeenCalledWith(DatabaseExtension.VECTOR); expect(databaseMock.createExtension).toHaveBeenCalledTimes(1); @@ -60,7 +60,7 @@ describe(DatabaseService.name, () => { it(`should throw an error if the pgvecto.rs extension is not installed`, async () => { databaseMock.getExtensionVersion.mockResolvedValue(''); - await expect(sut.init()).rejects.toThrow(`Unexpected: The pgvecto.rs extension is not installed.`); + await expect(sut.onBootstrapEvent()).rejects.toThrow(`Unexpected: The pgvecto.rs extension is not installed.`); expect(databaseMock.createExtension).toHaveBeenCalledTimes(1); expect(databaseMock.runMigrations).not.toHaveBeenCalled(); @@ -69,7 +69,7 @@ describe(DatabaseService.name, () => { it(`should throw an error if the pgvector extension is not installed`, async () => { process.env.DB_VECTOR_EXTENSION = 'pgvector'; databaseMock.getExtensionVersion.mockResolvedValue(''); - await expect(sut.init()).rejects.toThrow(`Unexpected: The pgvector extension is not installed.`); + await expect(sut.onBootstrapEvent()).rejects.toThrow(`Unexpected: The pgvector extension is not installed.`); expect(databaseMock.createExtension).toHaveBeenCalledTimes(1); expect(databaseMock.runMigrations).not.toHaveBeenCalled(); @@ -78,7 +78,7 @@ describe(DatabaseService.name, () => { it(`should throw an error if the pgvecto.rs extension version is below minimum supported version`, async () => { databaseMock.getExtensionVersion.mockResolvedValue('0.1.0'); - await expect(sut.init()).rejects.toThrow( + await expect(sut.onBootstrapEvent()).rejects.toThrow( 'The pgvecto.rs extension version is 0.1.0, but Immich only supports 0.2.x.', ); @@ -89,7 +89,7 @@ describe(DatabaseService.name, () => { process.env.DB_VECTOR_EXTENSION = 'pgvector'; databaseMock.getExtensionVersion.mockResolvedValue('0.1.0'); - await expect(sut.init()).rejects.toThrow( + await expect(sut.onBootstrapEvent()).rejects.toThrow( 'The pgvector extension version is 0.1.0, but Immich only supports >=0.5 <1', ); @@ -99,7 +99,7 @@ describe(DatabaseService.name, () => { it(`should throw an error if pgvecto.rs extension version is a nightly`, async () => { databaseMock.getExtensionVersion.mockResolvedValue('0.0.0'); - await expect(sut.init()).rejects.toThrow( + await expect(sut.onBootstrapEvent()).rejects.toThrow( 'The pgvecto.rs extension version is 0.0.0, which means it is a nightly release.', ); @@ -111,7 +111,7 @@ describe(DatabaseService.name, () => { process.env.DB_VECTOR_EXTENSION = 'pgvector'; databaseMock.getExtensionVersion.mockResolvedValue('0.0.0'); - await expect(sut.init()).rejects.toThrow( + await expect(sut.onBootstrapEvent()).rejects.toThrow( 'The pgvector extension version is 0.0.0, which means it is a nightly release.', ); @@ -122,7 +122,7 @@ describe(DatabaseService.name, () => { it(`should throw error if pgvecto.rs extension could not be created`, async () => { databaseMock.createExtension.mockRejectedValue(new Error('Failed to create extension')); - await expect(sut.init()).rejects.toThrow('Failed to create extension'); + await expect(sut.onBootstrapEvent()).rejects.toThrow('Failed to create extension'); expect(loggerMock.fatal).toHaveBeenCalledTimes(1); expect(loggerMock.fatal.mock.calls[0][0]).toContain( @@ -137,7 +137,7 @@ describe(DatabaseService.name, () => { databaseMock.getExtensionVersion.mockResolvedValue('0.0.0'); databaseMock.createExtension.mockRejectedValue(new Error('Failed to create extension')); - await expect(sut.init()).rejects.toThrow('Failed to create extension'); + await expect(sut.onBootstrapEvent()).rejects.toThrow('Failed to create extension'); expect(loggerMock.fatal).toHaveBeenCalledTimes(1); expect(loggerMock.fatal.mock.calls[0][0]).toContain( @@ -152,7 +152,7 @@ describe(DatabaseService.name, () => { databaseMock.getAvailableExtensionVersion.mockResolvedValue(version); databaseMock.getExtensionVersion.mockResolvedValue(version); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(databaseMock.updateVectorExtension).toHaveBeenCalledWith('vectors', version); expect(databaseMock.updateVectorExtension).toHaveBeenCalledTimes(1); @@ -168,7 +168,7 @@ describe(DatabaseService.name, () => { databaseMock.getAvailableExtensionVersion.mockResolvedValue(version); databaseMock.getExtensionVersion.mockResolvedValue(version); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(databaseMock.updateVectorExtension).toHaveBeenCalledWith('vector', version); expect(databaseMock.updateVectorExtension).toHaveBeenCalledTimes(1); @@ -182,7 +182,7 @@ describe(DatabaseService.name, () => { it(`should not upgrade pgvecto.rs to ${version}`, async () => { databaseMock.getAvailableExtensionVersion.mockResolvedValue(version); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(databaseMock.updateVectorExtension).not.toHaveBeenCalled(); expect(databaseMock.runMigrations).toHaveBeenCalledTimes(1); @@ -196,7 +196,7 @@ describe(DatabaseService.name, () => { databaseMock.getExtensionVersion.mockResolvedValue('0.5.0'); databaseMock.getAvailableExtensionVersion.mockResolvedValue(version); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(databaseMock.updateVectorExtension).not.toHaveBeenCalled(); expect(databaseMock.runMigrations).toHaveBeenCalledTimes(1); @@ -210,7 +210,7 @@ describe(DatabaseService.name, () => { databaseMock.getAvailableExtensionVersion.mockResolvedValue('0.5.2'); databaseMock.updateVectorExtension.mockRejectedValue(new Error('Failed to update extension')); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(loggerMock.warn.mock.calls[0][0]).toContain('The pgvector extension can be updated to 0.5.2.'); expect(loggerMock.error).toHaveBeenCalledTimes(1); @@ -223,7 +223,7 @@ describe(DatabaseService.name, () => { databaseMock.getAvailableExtensionVersion.mockResolvedValue('0.2.1'); databaseMock.updateVectorExtension.mockRejectedValue(new Error('Failed to update extension')); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(loggerMock.warn.mock.calls[0][0]).toContain('The pgvecto.rs extension can be updated to 0.2.1.'); expect(loggerMock.error).toHaveBeenCalledTimes(1); @@ -236,7 +236,7 @@ describe(DatabaseService.name, () => { databaseMock.getAvailableExtensionVersion.mockResolvedValue('0.2.1'); databaseMock.updateVectorExtension.mockResolvedValue({ restartRequired: true }); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(loggerMock.warn).toHaveBeenCalledTimes(1); expect(loggerMock.warn.mock.calls[0][0]).toContain('pgvecto.rs'); @@ -251,7 +251,7 @@ describe(DatabaseService.name, () => { databaseMock.getAvailableExtensionVersion.mockResolvedValue('0.5.1'); databaseMock.updateVectorExtension.mockResolvedValue({ restartRequired: true }); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(loggerMock.warn).toHaveBeenCalledTimes(1); expect(loggerMock.warn.mock.calls[0][0]).toContain('pgvector'); @@ -263,7 +263,7 @@ describe(DatabaseService.name, () => { it('should reindex if needed', async () => { databaseMock.shouldReindex.mockResolvedValue(true); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(databaseMock.shouldReindex).toHaveBeenCalledTimes(2); expect(databaseMock.reindex).toHaveBeenCalledTimes(2); @@ -274,7 +274,7 @@ describe(DatabaseService.name, () => { it('should not reindex if not needed', async () => { databaseMock.shouldReindex.mockResolvedValue(false); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(databaseMock.shouldReindex).toHaveBeenCalledTimes(2); expect(databaseMock.reindex).toHaveBeenCalledTimes(0); @@ -286,7 +286,7 @@ describe(DatabaseService.name, () => { process.env.DB_SKIP_MIGRATIONS = 'true'; databaseMock.getExtensionVersion.mockResolvedValue('0.2.0'); - await expect(sut.init()).resolves.toBeUndefined(); + await expect(sut.onBootstrapEvent()).resolves.toBeUndefined(); expect(databaseMock.runMigrations).not.toHaveBeenCalled(); }); diff --git a/server/src/services/database.service.ts b/server/src/services/database.service.ts index 4b809faac5..dd3183bb56 100644 --- a/server/src/services/database.service.ts +++ b/server/src/services/database.service.ts @@ -2,6 +2,7 @@ import { Inject, Injectable } from '@nestjs/common'; import semver from 'semver'; import { POSTGRES_VERSION_RANGE, VECTORS_VERSION_RANGE, VECTOR_VERSION_RANGE } from 'src/constants'; import { getVectorExtension } from 'src/database.config'; +import { EventHandlerOptions } from 'src/decorators'; import { DatabaseExtension, DatabaseLock, @@ -9,6 +10,7 @@ import { IDatabaseRepository, VectorIndex, } from 'src/interfaces/database.interface'; +import { OnEvents } from 'src/interfaces/event.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; type CreateFailedArgs = { name: string; extension: string; otherName: string }; @@ -63,7 +65,7 @@ const messages = { }; @Injectable() -export class DatabaseService { +export class DatabaseService implements OnEvents { constructor( @Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository, @Inject(ILoggerRepository) private logger: ILoggerRepository, @@ -71,7 +73,8 @@ export class DatabaseService { this.logger.setContext(DatabaseService.name); } - async init() { + @EventHandlerOptions({ priority: -200 }) + async onBootstrapEvent() { const version = await this.databaseRepository.getPostgresVersion(); const current = semver.coerce(version); if (!current || !semver.satisfies(current, POSTGRES_VERSION_RANGE)) { diff --git a/server/src/services/library.service.spec.ts b/server/src/services/library.service.spec.ts index 5b5da61770..4aad2d3d58 100644 --- a/server/src/services/library.service.spec.ts +++ b/server/src/services/library.service.spec.ts @@ -69,11 +69,11 @@ describe(LibraryService.name, () => { expect(sut).toBeDefined(); }); - describe('init', () => { + describe('onBootstrapEvent', () => { it('should init cron job and subscribe to config changes', async () => { systemMock.get.mockResolvedValue(systemConfigStub.libraryScan); - await sut.init(); + await sut.onBootstrapEvent(); expect(systemMock.get).toHaveBeenCalled(); expect(jobMock.addCronJob).toHaveBeenCalled(); @@ -105,7 +105,7 @@ describe(LibraryService.name, () => { ), ); - await sut.init(); + await sut.onBootstrapEvent(); expect(storageMock.watch.mock.calls).toEqual( expect.arrayContaining([ @@ -118,7 +118,7 @@ describe(LibraryService.name, () => { it('should not initialize watcher when watching is disabled', async () => { systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchDisabled); - await sut.init(); + await sut.onBootstrapEvent(); expect(storageMock.watch).not.toHaveBeenCalled(); }); @@ -127,16 +127,16 @@ describe(LibraryService.name, () => { systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled); databaseMock.tryLock.mockResolvedValue(false); - await sut.init(); + await sut.onBootstrapEvent(); expect(storageMock.watch).not.toHaveBeenCalled(); }); }); - describe('onValidateConfig', () => { + describe('onConfigValidateEvent', () => { it('should allow a valid cron expression', () => { expect(() => - sut.onValidateConfig({ + sut.onConfigValidateEvent({ newConfig: { library: { scan: { cronExpression: '0 0 * * *' } } } as SystemConfig, oldConfig: {} as SystemConfig, }), @@ -145,7 +145,7 @@ describe(LibraryService.name, () => { it('should fail for an invalid cron expression', () => { expect(() => - sut.onValidateConfig({ + sut.onConfigValidateEvent({ newConfig: { library: { scan: { cronExpression: 'foo' } } } as SystemConfig, oldConfig: {} as SystemConfig, }), @@ -730,7 +730,7 @@ describe(LibraryService.name, () => { const mockClose = vitest.fn(); storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose })); - await sut.init(); + await sut.onBootstrapEvent(); await sut.delete(libraryStub.externalLibraryWithImportPaths1.id); expect(mockClose).toHaveBeenCalled(); @@ -861,7 +861,7 @@ describe(LibraryService.name, () => { libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1); libraryMock.getAll.mockResolvedValue([]); - await sut.init(); + await sut.onBootstrapEvent(); await sut.create({ ownerId: authStub.admin.user.id, importPaths: libraryStub.externalLibraryWithImportPaths1.importPaths, @@ -917,7 +917,7 @@ describe(LibraryService.name, () => { systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled); libraryMock.getAll.mockResolvedValue([]); - await sut.init(); + await sut.onBootstrapEvent(); }); it('should update library', async () => { @@ -933,7 +933,7 @@ describe(LibraryService.name, () => { beforeEach(async () => { systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchDisabled); - await sut.init(); + await sut.onBootstrapEvent(); }); it('should not watch library', async () => { @@ -949,7 +949,7 @@ describe(LibraryService.name, () => { beforeEach(async () => { systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled); libraryMock.getAll.mockResolvedValue([]); - await sut.init(); + await sut.onBootstrapEvent(); }); it('should watch library', async () => { @@ -1107,8 +1107,8 @@ describe(LibraryService.name, () => { const mockClose = vitest.fn(); storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose })); - await sut.init(); - await sut.teardown(); + await sut.onBootstrapEvent(); + await sut.onShutdownEvent(); expect(mockClose).toHaveBeenCalledTimes(2); }); diff --git a/server/src/services/library.service.ts b/server/src/services/library.service.ts index 78bacb98c9..af054dc46e 100644 --- a/server/src/services/library.service.ts +++ b/server/src/services/library.service.ts @@ -6,7 +6,6 @@ import path, { basename, parse } from 'node:path'; import picomatch from 'picomatch'; import { StorageCore } from 'src/cores/storage.core'; import { SystemConfigCore } from 'src/cores/system-config.core'; -import { OnServerEvent } from 'src/decorators'; import { CreateLibraryDto, LibraryResponseDto, @@ -23,7 +22,7 @@ import { LibraryEntity } from 'src/entities/library.entity'; import { IAssetRepository, WithProperty } from 'src/interfaces/asset.interface'; import { ICryptoRepository } from 'src/interfaces/crypto.interface'; import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface'; -import { ServerAsyncEvent, ServerAsyncEventMap } from 'src/interfaces/event.interface'; +import { OnEvents, SystemConfigUpdate } from 'src/interfaces/event.interface'; import { IBaseJob, IEntityJob, @@ -46,7 +45,7 @@ import { validateCronExpression } from 'src/validation'; const LIBRARY_SCAN_BATCH_SIZE = 5000; @Injectable() -export class LibraryService { +export class LibraryService implements OnEvents { private configCore: SystemConfigCore; private watchLibraries = false; private watchLock = false; @@ -66,7 +65,7 @@ export class LibraryService { this.configCore = SystemConfigCore.create(systemMetadataRepository, this.logger); } - async init() { + async onBootstrapEvent() { const config = await this.configCore.getConfig({ withCache: false }); const { watch, scan } = config.library; @@ -103,8 +102,7 @@ export class LibraryService { }); } - @OnServerEvent(ServerAsyncEvent.CONFIG_VALIDATE) - onValidateConfig({ newConfig }: ServerAsyncEventMap[ServerAsyncEvent.CONFIG_VALIDATE]) { + onConfigValidateEvent({ newConfig }: SystemConfigUpdate) { const { scan } = newConfig.library; if (!validateCronExpression(scan.cronExpression)) { throw new Error(`Invalid cron expression ${scan.cronExpression}`); @@ -189,7 +187,7 @@ export class LibraryService { } } - async teardown() { + async onShutdownEvent() { await this.unwatchAll(); } diff --git a/server/src/services/metadata.service.spec.ts b/server/src/services/metadata.service.spec.ts index b36a3c0be9..8731fcf9e6 100644 --- a/server/src/services/metadata.service.spec.ts +++ b/server/src/services/metadata.service.spec.ts @@ -95,16 +95,16 @@ describe(MetadataService.name, () => { }); afterEach(async () => { - await sut.teardown(); + await sut.onShutdownEvent(); }); it('should be defined', () => { expect(sut).toBeDefined(); }); - describe('init', () => { + describe('onBootstrapEvent', () => { it('should pause and resume queue during init', async () => { - await sut.init(); + await sut.onBootstrapEvent('microservices'); expect(jobMock.pause).toHaveBeenCalledTimes(1); expect(mapMock.init).toHaveBeenCalledTimes(1); @@ -114,7 +114,7 @@ describe(MetadataService.name, () => { it('should return if reverse geocoding is disabled', async () => { systemMock.get.mockResolvedValue({ reverseGeocoding: { enabled: false } }); - await sut.init(); + await sut.onBootstrapEvent('microservices'); expect(jobMock.pause).not.toHaveBeenCalled(); expect(mapMock.init).not.toHaveBeenCalled(); diff --git a/server/src/services/metadata.service.ts b/server/src/services/metadata.service.ts index 3c7845010f..de0ab4bb3e 100644 --- a/server/src/services/metadata.service.ts +++ b/server/src/services/metadata.service.ts @@ -5,7 +5,7 @@ import _ from 'lodash'; import { Duration } from 'luxon'; import { constants } from 'node:fs/promises'; import path from 'node:path'; -import { Subscription } from 'rxjs'; +import { SystemConfig } from 'src/config'; import { StorageCore } from 'src/cores/storage.core'; import { SystemConfigCore } from 'src/cores/system-config.core'; import { AssetEntity, AssetType } from 'src/entities/asset.entity'; @@ -14,7 +14,7 @@ import { IAlbumRepository } from 'src/interfaces/album.interface'; import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface'; import { ICryptoRepository } from 'src/interfaces/crypto.interface'; import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface'; -import { ClientEvent, IEventRepository } from 'src/interfaces/event.interface'; +import { ClientEvent, IEventRepository, OnEvents } from 'src/interfaces/event.interface'; import { IBaseJob, IEntityJob, @@ -34,7 +34,6 @@ import { IPersonRepository } from 'src/interfaces/person.interface'; import { IStorageRepository } from 'src/interfaces/storage.interface'; import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface'; import { IUserRepository } from 'src/interfaces/user.interface'; -import { handlePromiseError } from 'src/utils/misc'; import { usePagination } from 'src/utils/pagination'; /** look for a date from these tags (in order) */ @@ -97,10 +96,9 @@ const validate = (value: T): NonNullable | null => { }; @Injectable() -export class MetadataService { +export class MetadataService implements OnEvents { private storageCore: StorageCore; private configCore: SystemConfigCore; - private subscription: Subscription | null = null; constructor( @Inject(IAlbumRepository) private albumRepository: IAlbumRepository, @@ -132,12 +130,19 @@ export class MetadataService { ); } - async init() { - if (!this.subscription) { - this.subscription = this.configCore.config$.subscribe(() => handlePromiseError(this.init(), this.logger)); + async onBootstrapEvent(app: 'api' | 'microservices') { + if (app !== 'microservices') { + return; } + const config = await this.configCore.getConfig({ withCache: false }); + await this.init(config); + } - const { reverseGeocoding } = await this.configCore.getConfig({ withCache: false }); + async onConfigUpdateEvent({ newConfig }: { newConfig: SystemConfig }) { + await this.init(newConfig); + } + + private async init({ reverseGeocoding }: SystemConfig) { const { enabled } = reverseGeocoding; if (!enabled) { @@ -155,8 +160,7 @@ export class MetadataService { } } - async teardown() { - this.subscription?.unsubscribe(); + async onShutdownEvent() { await this.repository.teardown(); } diff --git a/server/src/services/microservices.service.ts b/server/src/services/microservices.service.ts index 8e9f6ca082..fe1f4edc07 100644 --- a/server/src/services/microservices.service.ts +++ b/server/src/services/microservices.service.ts @@ -1,8 +1,8 @@ import { Injectable } from '@nestjs/common'; +import { OnEvents } from 'src/interfaces/event.interface'; import { IDeleteFilesJob, JobName } from 'src/interfaces/job.interface'; import { AssetService } from 'src/services/asset.service'; import { AuditService } from 'src/services/audit.service'; -import { DatabaseService } from 'src/services/database.service'; import { DuplicateService } from 'src/services/duplicate.service'; import { JobService } from 'src/services/job.service'; import { LibraryService } from 'src/services/library.service'; @@ -14,18 +14,15 @@ import { SessionService } from 'src/services/session.service'; import { SmartInfoService } from 'src/services/smart-info.service'; import { StorageTemplateService } from 'src/services/storage-template.service'; import { StorageService } from 'src/services/storage.service'; -import { SystemConfigService } from 'src/services/system-config.service'; import { UserService } from 'src/services/user.service'; import { VersionService } from 'src/services/version.service'; import { otelShutdown } from 'src/utils/instrumentation'; @Injectable() -export class MicroservicesService { +export class MicroservicesService implements OnEvents { constructor( private auditService: AuditService, private assetService: AssetService, - private configService: SystemConfigService, - private databaseService: DatabaseService, private jobService: JobService, private libraryService: LibraryService, private mediaService: MediaService, @@ -41,11 +38,11 @@ export class MicroservicesService { private versionService: VersionService, ) {} - async init() { - await this.databaseService.init(); - await this.configService.init(); - await this.libraryService.init(); - await this.notificationService.init(); + async onBootstrapEvent(app: 'api' | 'microservices') { + if (app !== 'microservices') { + return; + } + await this.jobService.init({ [JobName.ASSET_DELETION]: (data) => this.assetService.handleAssetDeletion(data), [JobName.ASSET_DELETION_CHECK]: () => this.assetService.handleAssetDeletionCheck(), @@ -95,13 +92,9 @@ export class MicroservicesService { [JobName.NOTIFY_SIGNUP]: (data) => this.notificationService.handleUserSignup(data), [JobName.VERSION_CHECK]: () => this.versionService.handleVersionCheck(), }); - - await this.metadataService.init(); } - async teardown() { - await this.libraryService.teardown(); - await this.metadataService.teardown(); + async onShutdown() { await otelShutdown(); } } diff --git a/server/src/services/notification.service.ts b/server/src/services/notification.service.ts index acf80ce1fa..56bb264f3b 100644 --- a/server/src/services/notification.service.ts +++ b/server/src/services/notification.service.ts @@ -1,12 +1,11 @@ import { HttpException, HttpStatus, Inject, Injectable } from '@nestjs/common'; import { DEFAULT_EXTERNAL_DOMAIN } from 'src/constants'; import { SystemConfigCore } from 'src/cores/system-config.core'; -import { OnServerEvent } from 'src/decorators'; import { SystemConfigSmtpDto } from 'src/dtos/system-config.dto'; import { AlbumEntity } from 'src/entities/album.entity'; import { IAlbumRepository } from 'src/interfaces/album.interface'; import { IAssetRepository } from 'src/interfaces/asset.interface'; -import { ServerAsyncEvent, ServerAsyncEventMap } from 'src/interfaces/event.interface'; +import { OnEvents, SystemConfigUpdate } from 'src/interfaces/event.interface'; import { IEmailJob, IJobRepository, @@ -23,7 +22,7 @@ import { IUserRepository } from 'src/interfaces/user.interface'; import { getPreferences } from 'src/utils/preferences'; @Injectable() -export class NotificationService { +export class NotificationService implements OnEvents { private configCore: SystemConfigCore; constructor( @@ -39,13 +38,7 @@ export class NotificationService { this.configCore = SystemConfigCore.create(systemMetadataRepository, logger); } - init() { - // TODO - return Promise.resolve(); - } - - @OnServerEvent(ServerAsyncEvent.CONFIG_VALIDATE) - async onValidateConfig({ newConfig }: ServerAsyncEventMap[ServerAsyncEvent.CONFIG_VALIDATE]) { + async onConfigValidateEvent({ newConfig }: SystemConfigUpdate) { try { if (newConfig.notifications.smtp.enabled) { await this.notificationRepository.verifySmtp(newConfig.notifications.smtp.transport); diff --git a/server/src/services/server-info.service.ts b/server/src/services/server-info.service.ts index fd7652df8d..3f495aa635 100644 --- a/server/src/services/server-info.service.ts +++ b/server/src/services/server-info.service.ts @@ -14,6 +14,7 @@ import { UsageByUserDto, } from 'src/dtos/server-info.dto'; import { SystemMetadataKey } from 'src/entities/system-metadata.entity'; +import { OnEvents } from 'src/interfaces/event.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { IServerInfoRepository } from 'src/interfaces/server-info.interface'; import { IStorageRepository } from 'src/interfaces/storage.interface'; @@ -24,7 +25,7 @@ import { mimeTypes } from 'src/utils/mime-types'; import { isDuplicateDetectionEnabled, isFacialRecognitionEnabled, isSmartSearchEnabled } from 'src/utils/misc'; @Injectable() -export class ServerInfoService { +export class ServerInfoService implements OnEvents { private configCore: SystemConfigCore; constructor( @@ -38,13 +39,14 @@ export class ServerInfoService { this.configCore = SystemConfigCore.create(systemMetadataRepository, this.logger); } - async init(): Promise { + async onBootstrapEvent(): Promise { const featureFlags = await this.getFeatures(); if (featureFlags.configFile) { await this.systemMetadataRepository.set(SystemMetadataKey.ADMIN_ONBOARDING, { isOnboarded: true, }); } + this.logger.log(`Feature Flags: ${JSON.stringify(await this.getFeatures(), null, 2)}`); } async getAboutInfo(): Promise { diff --git a/server/src/services/smart-info.service.ts b/server/src/services/smart-info.service.ts index 960c90f69b..19a4319045 100644 --- a/server/src/services/smart-info.service.ts +++ b/server/src/services/smart-info.service.ts @@ -2,6 +2,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { SystemConfigCore } from 'src/cores/system-config.core'; import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface'; import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface'; +import { OnEvents, SystemConfigUpdate } from 'src/interfaces/event.interface'; import { IBaseJob, IEntityJob, @@ -19,7 +20,7 @@ import { isSmartSearchEnabled } from 'src/utils/misc'; import { usePagination } from 'src/utils/pagination'; @Injectable() -export class SmartInfoService { +export class SmartInfoService implements OnEvents { private configCore: SystemConfigCore; constructor( @@ -49,6 +50,12 @@ export class SmartInfoService { await this.jobRepository.resume(QueueName.SMART_SEARCH); } + async onConfigUpdateEvent({ oldConfig, newConfig }: SystemConfigUpdate) { + if (oldConfig.machineLearning.clip.modelName !== newConfig.machineLearning.clip.modelName) { + await this.repository.init(newConfig.machineLearning.clip.modelName); + } + } + async handleQueueEncodeClip({ force }: IBaseJob): Promise { const { machineLearning } = await this.configCore.getConfig({ withCache: false }); if (!isSmartSearchEnabled(machineLearning)) { diff --git a/server/src/services/storage-template.service.spec.ts b/server/src/services/storage-template.service.spec.ts index c1a47cdcf0..7a9b9952e0 100644 --- a/server/src/services/storage-template.service.spec.ts +++ b/server/src/services/storage-template.service.spec.ts @@ -76,10 +76,10 @@ describe(StorageTemplateService.name, () => { SystemConfigCore.create(systemMock, loggerMock).config$.next(defaults); }); - describe('onValidateConfig', () => { + describe('onConfigValidateEvent', () => { it('should allow valid templates', () => { expect(() => - sut.onValidateConfig({ + sut.onConfigValidateEvent({ newConfig: { storageTemplate: { template: @@ -93,7 +93,7 @@ describe(StorageTemplateService.name, () => { it('should fail for an invalid template', () => { expect(() => - sut.onValidateConfig({ + sut.onConfigValidateEvent({ newConfig: { storageTemplate: { template: '{{foo}}', diff --git a/server/src/services/storage-template.service.ts b/server/src/services/storage-template.service.ts index 7c4cf738db..b0d1b1f85b 100644 --- a/server/src/services/storage-template.service.ts +++ b/server/src/services/storage-template.service.ts @@ -15,14 +15,13 @@ import { } from 'src/constants'; import { StorageCore, StorageFolder } from 'src/cores/storage.core'; import { SystemConfigCore } from 'src/cores/system-config.core'; -import { OnServerEvent } from 'src/decorators'; import { AssetEntity, AssetType } from 'src/entities/asset.entity'; import { AssetPathType } from 'src/entities/move.entity'; import { IAlbumRepository } from 'src/interfaces/album.interface'; import { IAssetRepository } from 'src/interfaces/asset.interface'; import { ICryptoRepository } from 'src/interfaces/crypto.interface'; import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface'; -import { ServerAsyncEvent, ServerAsyncEventMap } from 'src/interfaces/event.interface'; +import { OnEvents, SystemConfigUpdate } from 'src/interfaces/event.interface'; import { IEntityJob, JOBS_ASSET_PAGINATION_SIZE, JobStatus } from 'src/interfaces/job.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { IMoveRepository } from 'src/interfaces/move.interface'; @@ -46,7 +45,7 @@ interface RenderMetadata { } @Injectable() -export class StorageTemplateService { +export class StorageTemplateService implements OnEvents { private configCore: SystemConfigCore; private storageCore: StorageCore; private _template: { @@ -88,8 +87,7 @@ export class StorageTemplateService { ); } - @OnServerEvent(ServerAsyncEvent.CONFIG_VALIDATE) - onValidateConfig({ newConfig }: ServerAsyncEventMap[ServerAsyncEvent.CONFIG_VALIDATE]) { + onConfigValidateEvent({ newConfig }: SystemConfigUpdate) { try { const { compiled } = this.compile(newConfig.storageTemplate.template); this.render(compiled, { diff --git a/server/src/services/storage.service.spec.ts b/server/src/services/storage.service.spec.ts index ee574e0ba6..5ce6d92d26 100644 --- a/server/src/services/storage.service.spec.ts +++ b/server/src/services/storage.service.spec.ts @@ -20,9 +20,9 @@ describe(StorageService.name, () => { expect(sut).toBeDefined(); }); - describe('init', () => { + describe('onBootstrapEvent', () => { it('should create the library folder on initialization', () => { - sut.init(); + sut.onBootstrapEvent(); expect(storageMock.mkdirSync).toHaveBeenCalledWith('upload/library'); }); }); diff --git a/server/src/services/storage.service.ts b/server/src/services/storage.service.ts index 829888e244..8222d7c46d 100644 --- a/server/src/services/storage.service.ts +++ b/server/src/services/storage.service.ts @@ -1,11 +1,12 @@ import { Inject, Injectable } from '@nestjs/common'; import { StorageCore, StorageFolder } from 'src/cores/storage.core'; +import { OnEvents } from 'src/interfaces/event.interface'; import { IDeleteFilesJob, JobStatus } from 'src/interfaces/job.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { IStorageRepository } from 'src/interfaces/storage.interface'; @Injectable() -export class StorageService { +export class StorageService implements OnEvents { constructor( @Inject(IStorageRepository) private storageRepository: IStorageRepository, @Inject(ILoggerRepository) private logger: ILoggerRepository, @@ -13,7 +14,7 @@ export class StorageService { this.logger.setContext(StorageService.name); } - init() { + onBootstrapEvent() { const libraryBase = StorageCore.getBaseFolder(StorageFolder.LIBRARY); this.storageRepository.mkdirSync(libraryBase); } diff --git a/server/src/services/system-config.service.spec.ts b/server/src/services/system-config.service.spec.ts index ea287321b1..3bafd60a73 100644 --- a/server/src/services/system-config.service.spec.ts +++ b/server/src/services/system-config.service.spec.ts @@ -16,7 +16,6 @@ import { SystemMetadataKey } from 'src/entities/system-metadata.entity'; import { IEventRepository, ServerEvent } from 'src/interfaces/event.interface'; import { QueueName } from 'src/interfaces/job.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; -import { ISearchRepository } from 'src/interfaces/search.interface'; import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface'; import { SystemConfigService } from 'src/services/system-config.service'; import { newEventRepositoryMock } from 'test/repositories/event.repository.mock'; @@ -180,14 +179,13 @@ describe(SystemConfigService.name, () => { let systemMock: Mocked; let eventMock: Mocked; let loggerMock: Mocked; - let smartInfoMock: Mocked; beforeEach(() => { delete process.env.IMMICH_CONFIG_FILE; systemMock = newSystemMetadataRepositoryMock(); eventMock = newEventRepositoryMock(); loggerMock = newLoggerRepositoryMock(); - sut = new SystemConfigService(systemMock, eventMock, loggerMock, smartInfoMock); + sut = new SystemConfigService(systemMock, eventMock, loggerMock); }); it('should work', () => { diff --git a/server/src/services/system-config.service.ts b/server/src/services/system-config.service.ts index d1e535daba..2992b9f86b 100644 --- a/server/src/services/system-config.service.ts +++ b/server/src/services/system-config.service.ts @@ -13,35 +13,34 @@ import { supportedYearTokens, } from 'src/constants'; import { SystemConfigCore } from 'src/cores/system-config.core'; -import { OnServerEvent } from 'src/decorators'; +import { EventHandlerOptions, OnServerEvent } from 'src/decorators'; import { SystemConfigDto, SystemConfigTemplateStorageOptionDto, mapConfig } from 'src/dtos/system-config.dto'; import { ClientEvent, IEventRepository, - ServerAsyncEvent, - ServerAsyncEventMap, + OnEvents, ServerEvent, + SystemConfigUpdate, } from 'src/interfaces/event.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; -import { ISearchRepository } from 'src/interfaces/search.interface'; import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface'; @Injectable() -export class SystemConfigService { +export class SystemConfigService implements OnEvents { private core: SystemConfigCore; constructor( @Inject(ISystemMetadataRepository) repository: ISystemMetadataRepository, @Inject(IEventRepository) private eventRepository: IEventRepository, @Inject(ILoggerRepository) private logger: ILoggerRepository, - @Inject(ISearchRepository) private smartInfoRepository: ISearchRepository, ) { this.logger.setContext(SystemConfigService.name); this.core = SystemConfigCore.create(repository, this.logger); this.core.config$.subscribe((config) => this.setLogLevel(config)); } - async init() { + @EventHandlerOptions({ priority: -100 }) + async onBootstrapEvent() { const config = await this.core.getConfig({ withCache: false }); this.config$.next(config); } @@ -59,8 +58,7 @@ export class SystemConfigService { return mapConfig(defaults); } - @OnServerEvent(ServerAsyncEvent.CONFIG_VALIDATE) - onValidateConfig({ newConfig, oldConfig }: ServerAsyncEventMap[ServerAsyncEvent.CONFIG_VALIDATE]) { + onConfigValidateEvent({ newConfig, oldConfig }: SystemConfigUpdate) { if (!_.isEqual(instanceToPlain(newConfig.logging), oldConfig.logging) && this.getEnvLogLevel()) { throw new Error('Logging cannot be changed while the environment variable IMMICH_LOG_LEVEL is set.'); } @@ -74,10 +72,7 @@ export class SystemConfigService { const oldConfig = await this.core.getConfig({ withCache: false }); try { - await this.eventRepository.serverSendAsync(ServerAsyncEvent.CONFIG_VALIDATE, { - newConfig: dto, - oldConfig, - }); + await this.eventRepository.emit('onConfigValidateEvent', { newConfig: dto, oldConfig }); } catch (error) { this.logger.warn(`Unable to save system config due to a validation error: ${error}`); throw new BadRequestException(error instanceof Error ? error.message : error); @@ -85,12 +80,11 @@ export class SystemConfigService { const newConfig = await this.core.updateConfig(dto); + // TODO probably move web socket emits to a separate service this.eventRepository.clientBroadcast(ClientEvent.CONFIG_UPDATE, {}); this.eventRepository.serverSend(ServerEvent.CONFIG_UPDATE, null); + await this.eventRepository.emit('onConfigUpdateEvent', { newConfig, oldConfig }); - if (oldConfig.machineLearning.clip.modelName !== newConfig.machineLearning.clip.modelName) { - await this.smartInfoRepository.init(newConfig.machineLearning.clip.modelName); - } return mapConfig(newConfig); } @@ -115,7 +109,7 @@ export class SystemConfigService { } @OnServerEvent(ServerEvent.CONFIG_UPDATE) - async onConfigUpdate() { + async onConfigUpdateEvent() { await this.core.refreshConfig(); } diff --git a/server/src/services/version.service.ts b/server/src/services/version.service.ts index ecaa2f6f49..a42e550be6 100644 --- a/server/src/services/version.service.ts +++ b/server/src/services/version.service.ts @@ -6,7 +6,7 @@ import { SystemConfigCore } from 'src/cores/system-config.core'; import { OnServerEvent } from 'src/decorators'; import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server-info.dto'; import { SystemMetadataKey, VersionCheckMetadata } from 'src/entities/system-metadata.entity'; -import { ClientEvent, IEventRepository, ServerEvent, ServerEventMap } from 'src/interfaces/event.interface'; +import { ClientEvent, IEventRepository, OnEvents, ServerEvent, ServerEventMap } from 'src/interfaces/event.interface'; import { IJobRepository, JobName, JobStatus } from 'src/interfaces/job.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { IServerInfoRepository } from 'src/interfaces/server-info.interface'; @@ -22,7 +22,7 @@ const asNotification = ({ checkedAt, releaseVersion }: VersionCheckMetadata): Re }; @Injectable() -export class VersionService { +export class VersionService implements OnEvents { private configCore: SystemConfigCore; constructor( @@ -36,7 +36,7 @@ export class VersionService { this.configCore = SystemConfigCore.create(systemMetadataRepository, this.logger); } - async init(): Promise { + async onBootstrapEvent(): Promise { await this.handleVersionCheck(); } diff --git a/server/src/utils/events.ts b/server/src/utils/events.ts new file mode 100644 index 0000000000..1bee4c6558 --- /dev/null +++ b/server/src/utils/events.ts @@ -0,0 +1,33 @@ +import { ModuleRef, Reflector } from '@nestjs/core'; +import _ from 'lodash'; +import { HandlerOptions } from 'src/decorators'; +import { EmitEvent, EmitEventHandler, IEventRepository, OnEvents, events } from 'src/interfaces/event.interface'; +import { Metadata } from 'src/middleware/auth.guard'; +import { services } from 'src/services'; + +export const setupEventHandlers = (moduleRef: ModuleRef) => { + const reflector = moduleRef.get(Reflector, { strict: false }); + const repository = moduleRef.get(IEventRepository); + const handlers: Array<{ event: EmitEvent; handler: EmitEventHandler; priority: number }> = []; + + // discovery + for (const Service of services) { + const instance = moduleRef.get(Service); + for (const event of events) { + const handler = instance[event] as EmitEventHandler; + if (typeof handler !== 'function') { + continue; + } + + const options = reflector.get(Metadata.EVENT_HANDLER_OPTIONS, handler); + const priority = options?.priority || 0; + + handlers.push({ event, handler: handler.bind(instance), priority }); + } + } + + // register by priority + for (const { event, handler } of _.orderBy(handlers, ['priority'], ['asc'])) { + repository.on(event, handler); + } +}; diff --git a/server/test/repositories/event.repository.mock.ts b/server/test/repositories/event.repository.mock.ts index 2277fec83b..a9af627599 100644 --- a/server/test/repositories/event.repository.mock.ts +++ b/server/test/repositories/event.repository.mock.ts @@ -3,9 +3,10 @@ import { Mocked, vitest } from 'vitest'; export const newEventRepositoryMock = (): Mocked => { return { + on: vitest.fn(), + emit: vitest.fn() as any, clientSend: vitest.fn(), clientBroadcast: vitest.fn(), serverSend: vitest.fn(), - serverSendAsync: vitest.fn(), }; };