diff --git a/server/src/domain/asset/asset.service.spec.ts b/server/src/domain/asset/asset.service.spec.ts index 08fd2e4061..907892da6a 100644 --- a/server/src/domain/asset/asset.service.spec.ts +++ b/server/src/domain/asset/asset.service.spec.ts @@ -7,6 +7,7 @@ import { faceStub, newAccessRepositoryMock, newAssetRepositoryMock, + newCommunicationRepositoryMock, newCryptoRepositoryMock, newJobRepositoryMock, newStorageRepositoryMock, @@ -14,6 +15,7 @@ import { } from '@test'; import { when } from 'jest-when'; import { Readable } from 'stream'; +import { ICommunicationRepository } from '../communication'; import { ICryptoRepository } from '../crypto'; import { IJobRepository, JobItem, JobName } from '../job'; import { IStorageRepository } from '../storage'; @@ -153,6 +155,7 @@ describe(AssetService.name, () => { let cryptoMock: jest.Mocked; let jobMock: jest.Mocked; let storageMock: jest.Mocked; + let communicationMock: jest.Mocked; let configMock: jest.Mocked; it('should work', () => { @@ -162,11 +165,12 @@ describe(AssetService.name, () => { beforeEach(async () => { accessMock = newAccessRepositoryMock(); assetMock = newAssetRepositoryMock(); + communicationMock = newCommunicationRepositoryMock(); cryptoMock = newCryptoRepositoryMock(); jobMock = newJobRepositoryMock(); storageMock = newStorageRepositoryMock(); configMock = newSystemConfigRepositoryMock(); - sut = new AssetService(accessMock, assetMock, cryptoMock, jobMock, configMock, storageMock); + sut = new AssetService(accessMock, assetMock, cryptoMock, jobMock, configMock, storageMock, communicationMock); when(assetMock.getById) .calledWith(assetStub.livePhotoStillAsset.id) diff --git a/server/src/domain/asset/asset.service.ts b/server/src/domain/asset/asset.service.ts index 8a562354f8..b04ae646bb 100644 --- a/server/src/domain/asset/asset.service.ts +++ b/server/src/domain/asset/asset.service.ts @@ -6,6 +6,7 @@ import { extname } from 'path'; import sanitize from 'sanitize-filename'; import { AccessCore, IAccessRepository, Permission } from '../access'; import { AuthUserDto } from '../auth'; +import { CommunicationEvent, ICommunicationRepository } from '../communication'; import { ICryptoRepository } from '../crypto'; import { mimeTypes } from '../domain.constant'; import { HumanReadableSize, usePagination } from '../domain.util'; @@ -72,6 +73,7 @@ export class AssetService { @Inject(IJobRepository) private jobRepository: IJobRepository, @Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository, @Inject(IStorageRepository) private storageRepository: IStorageRepository, + @Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository, ) { this.access = new AccessCore(accessRepository); this.storageCore = new StorageCore(storageRepository); @@ -362,6 +364,7 @@ export class AssetService { await this.assetRepository.remove(asset); await this.jobRepository.queue({ name: JobName.SEARCH_REMOVE_ASSET, data: { ids: [asset.id] } }); + this.communicationRepository.send(CommunicationEvent.ASSET_DELETE, asset.ownerId, id); // TODO refactor this to use cascades if (asset.livePhotoVideoId) { @@ -392,6 +395,7 @@ export class AssetService { } else { await this.assetRepository.softDeleteAll(ids); await this.jobRepository.queue({ name: JobName.SEARCH_REMOVE_ASSET, data: { ids } }); + this.communicationRepository.send(CommunicationEvent.ASSET_TRASH, authUser.id, ids); } } diff --git a/server/src/domain/communication/communication.repository.ts b/server/src/domain/communication/communication.repository.ts index 3201d07337..b379c7a62b 100644 --- a/server/src/domain/communication/communication.repository.ts +++ b/server/src/domain/communication/communication.repository.ts @@ -2,6 +2,10 @@ export const ICommunicationRepository = 'ICommunicationRepository'; export enum CommunicationEvent { UPLOAD_SUCCESS = 'on_upload_success', + ASSET_DELETE = 'on_asset_delete', + ASSET_TRASH = 'on_asset_trash', + PERSON_THUMBNAIL = 'on_person_thumbnail', + SERVER_VERSION = 'on_server_version', CONFIG_UPDATE = 'on_config_update', } diff --git a/server/src/domain/job/job.service.spec.ts b/server/src/domain/job/job.service.spec.ts index f5924b9982..3a44b71d80 100644 --- a/server/src/domain/job/job.service.spec.ts +++ b/server/src/domain/job/job.service.spec.ts @@ -6,10 +6,12 @@ import { newAssetRepositoryMock, newCommunicationRepositoryMock, newJobRepositoryMock, + newPersonRepositoryMock, newSystemConfigRepositoryMock, } from '@test'; import { IAssetRepository } from '../asset'; import { ICommunicationRepository } from '../communication'; +import { IPersonRepository } from '../person'; import { ISystemConfigRepository } from '../system-config'; import { SystemConfigCore } from '../system-config/system-config.core'; import { JobCommand, JobName, QueueName } from './job.constants'; @@ -30,13 +32,15 @@ describe(JobService.name, () => { let configMock: jest.Mocked; let communicationMock: jest.Mocked; let jobMock: jest.Mocked; + let personMock: jest.Mocked; beforeEach(async () => { assetMock = newAssetRepositoryMock(); configMock = newSystemConfigRepositoryMock(); communicationMock = newCommunicationRepositoryMock(); jobMock = newJobRepositoryMock(); - sut = new JobService(assetMock, communicationMock, jobMock, configMock); + personMock = newPersonRepositoryMock(); + sut = new JobService(assetMock, communicationMock, jobMock, configMock, personMock); }); it('should work', () => { diff --git a/server/src/domain/job/job.service.ts b/server/src/domain/job/job.service.ts index c342286519..9eb2e10cd8 100644 --- a/server/src/domain/job/job.service.ts +++ b/server/src/domain/job/job.service.ts @@ -2,6 +2,7 @@ import { AssetType } from '@app/infra/entities'; import { BadRequestException, Inject, Injectable, Logger } from '@nestjs/common'; import { IAssetRepository, mapAsset } from '../asset'; import { CommunicationEvent, ICommunicationRepository } from '../communication'; +import { IPersonRepository } from '../person'; import { FeatureFlag, ISystemConfigRepository } from '../system-config'; import { SystemConfigCore } from '../system-config/system-config.core'; import { JobCommand, JobName, QueueName } from './job.constants'; @@ -18,6 +19,7 @@ export class JobService { @Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository, @Inject(IJobRepository) private jobRepository: IJobRepository, @Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository, + @Inject(IPersonRepository) private personRepository: IPersonRepository, ) { this.configCore = new SystemConfigCore(configRepository); } @@ -172,15 +174,20 @@ export class JobService { } break; + case JobName.GENERATE_PERSON_THUMBNAIL: + const { id } = item.data; + const person = await this.personRepository.getById(id); + if (person) { + this.communicationRepository.send(CommunicationEvent.PERSON_THUMBNAIL, person.ownerId, id); + } + break; + case JobName.GENERATE_JPEG_THUMBNAIL: { await this.jobRepository.queue({ name: JobName.GENERATE_WEBP_THUMBNAIL, data: item.data }); await this.jobRepository.queue({ name: JobName.GENERATE_THUMBHASH_THUMBNAIL, data: item.data }); await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: item.data }); await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: item.data }); await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: item.data }); - if (item.data.source !== 'upload') { - break; - } const [asset] = await this.assetRepository.getByIds([item.data.id]); if (asset) { @@ -189,10 +196,20 @@ export class JobService { } else if (asset.livePhotoVideoId) { await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id: asset.livePhotoVideoId } }); } - this.communicationRepository.send(CommunicationEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset)); } break; } + + case JobName.GENERATE_WEBP_THUMBNAIL: { + if (item.data.source !== 'upload') { + break; + } + + const [asset] = await this.assetRepository.getByIds([item.data.id]); + if (asset) { + this.communicationRepository.send(CommunicationEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset)); + } + } } // In addition to the above jobs, all of these should queue `SEARCH_INDEX_ASSET` diff --git a/server/src/infra/communication.gateway.ts b/server/src/infra/communication.gateway.ts deleted file mode 100644 index 483f9d9d7c..0000000000 --- a/server/src/infra/communication.gateway.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { AuthService } from '@app/domain'; -import { Logger } from '@nestjs/common'; -import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets'; -import { Server, Socket } from 'socket.io'; - -@WebSocketGateway({ cors: true }) -export class CommunicationGateway implements OnGatewayConnection, OnGatewayDisconnect { - private logger = new Logger(CommunicationGateway.name); - - constructor(private authService: AuthService) {} - - @WebSocketServer() server!: Server; - - handleDisconnect(client: Socket) { - client.leave(client.nsp.name); - this.logger.log(`Client ${client.id} disconnected from Websocket`); - } - - async handleConnection(client: Socket) { - try { - this.logger.log(`New websocket connection: ${client.id}`); - const user = await this.authService.validate(client.request.headers, {}); - if (user) { - client.join(user.id); - } else { - client.emit('error', 'unauthorized'); - client.disconnect(); - } - } catch (e) { - client.emit('error', 'unauthorized'); - client.disconnect(); - } - } -} diff --git a/server/src/infra/infra.module.ts b/server/src/infra/infra.module.ts index 9b75c7f2a3..48f9a89007 100644 --- a/server/src/infra/infra.module.ts +++ b/server/src/infra/infra.module.ts @@ -27,7 +27,6 @@ import { BullModule } from '@nestjs/bullmq'; import { Global, Module, Provider } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { TypeOrmModule } from '@nestjs/typeorm'; -import { CommunicationGateway } from './communication.gateway'; import { databaseConfig } from './database.config'; import { databaseEntities } from './entities'; import { bullConfig, bullQueues } from './infra.config'; @@ -90,7 +89,7 @@ const providers: Provider[] = [ BullModule.forRoot(bullConfig), BullModule.registerQueue(...bullQueues), ], - providers: [...providers, CommunicationGateway], + providers: [...providers], exports: [...providers, BullModule], }) export class InfraModule {} diff --git a/server/src/infra/repositories/communication.repository.ts b/server/src/infra/repositories/communication.repository.ts index 5fb11fd2ea..2486f5de0f 100644 --- a/server/src/infra/repositories/communication.repository.ts +++ b/server/src/infra/repositories/communication.repository.ts @@ -1,16 +1,43 @@ -import { CommunicationEvent } from '@app/domain'; -import { Injectable } from '@nestjs/common'; -import { CommunicationGateway } from '../communication.gateway'; +import { AuthService, CommunicationEvent, ICommunicationRepository, serverVersion } from '@app/domain'; +import { Logger } from '@nestjs/common'; +import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets'; +import { Server, Socket } from 'socket.io'; -@Injectable() -export class CommunicationRepository { - constructor(private ws: CommunicationGateway) {} +@WebSocketGateway({ cors: true }) +export class CommunicationRepository implements OnGatewayConnection, OnGatewayDisconnect, ICommunicationRepository { + private logger = new Logger(CommunicationRepository.name); + + constructor(private authService: AuthService) {} + + @WebSocketServer() server!: Server; + + async handleConnection(client: Socket) { + try { + this.logger.log(`New websocket connection: ${client.id}`); + const user = await this.authService.validate(client.request.headers, {}); + if (user) { + client.join(user.id); + this.send(CommunicationEvent.SERVER_VERSION, user.id, serverVersion); + } else { + client.emit('error', 'unauthorized'); + client.disconnect(); + } + } catch (e) { + client.emit('error', 'unauthorized'); + client.disconnect(); + } + } + + handleDisconnect(client: Socket) { + client.leave(client.nsp.name); + this.logger.log(`Client ${client.id} disconnected from Websocket`); + } send(event: CommunicationEvent, userId: string, data: any) { - this.ws.server.to(userId).emit(event, JSON.stringify(data)); + this.server.to(userId).emit(event, JSON.stringify(data)); } broadcast(event: CommunicationEvent, data: any) { - this.ws.server.emit(event, data); + this.server.emit(event, data); } } diff --git a/web/src/lib/components/photos-page/asset-date-group.svelte b/web/src/lib/components/photos-page/asset-date-group.svelte index 9207cd9248..48dd58f762 100644 --- a/web/src/lib/components/photos-page/asset-date-group.svelte +++ b/web/src/lib/components/photos-page/asset-date-group.svelte @@ -13,6 +13,7 @@ import type { AssetStore } from '$lib/stores/assets.store'; import type { AssetInteractionStore } from '$lib/stores/asset-interaction.store'; import type { Viewport } from '$lib/stores/assets.store'; + import { flip } from 'svelte/animate'; export let assets: AssetResponseDto[]; export let bucketDate: string; @@ -176,6 +177,7 @@
{ showSkeleton = false; document.addEventListener('keydown', onKeyboardPress); + assetStore.connect(); await assetStore.init(viewport); }); @@ -55,6 +56,8 @@ if ($showAssetViewer) { $showAssetViewer = false; } + + assetStore.disconnect(); }); const handleKeyboardPress = (event: KeyboardEvent) => { diff --git a/web/src/lib/components/shared-components/status-box.svelte b/web/src/lib/components/shared-components/status-box.svelte index 98e75e0550..21568d45f0 100644 --- a/web/src/lib/components/shared-components/status-box.svelte +++ b/web/src/lib/components/shared-components/status-box.svelte @@ -1,52 +1,40 @@
@@ -61,7 +49,7 @@

@@ -88,7 +76,7 @@

Status

- {#if isServerOk} + {#if $connected}

Online

{:else}

Offline

@@ -97,20 +85,18 @@

Version

- - {serverVersion} - + {#if $connected && version} + + {version} + + {:else} +

Unknown

+ {/if}
-
diff --git a/web/src/lib/components/shared-components/upload-panel.svelte b/web/src/lib/components/shared-components/upload-panel.svelte index 4ca4c91810..c8bfeb5d71 100644 --- a/web/src/lib/components/shared-components/upload-panel.svelte +++ b/web/src/lib/components/shared-components/upload-panel.svelte @@ -36,15 +36,12 @@ in:fade={{ duration: 250 }} out:fade={{ duration: 250 }} on:outroend={() => { - const errorInfo = - $errorCounter > 0 - ? `Upload completed with ${$errorCounter} error${$errorCounter > 1 ? 's' : ''}` - : 'Upload success'; - const type = $errorCounter > 0 ? NotificationType.Warning : NotificationType.Info; - notificationController.show({ - message: `${errorInfo}, refresh the page to see new upload assets`, - type, + message: + ($errorCounter > 0 + ? `Upload completed with ${$errorCounter} error${$errorCounter > 1 ? 's' : ''}` + : 'Upload success') + ', refresh the page to see new upload assets.', + type: $errorCounter > 0 ? NotificationType.Warning : NotificationType.Info, }); if ($duplicateCounter > 0) { diff --git a/web/src/lib/stores/assets.store.ts b/web/src/lib/stores/assets.store.ts index e3f4271dbe..3902c8b102 100644 --- a/web/src/lib/stores/assets.store.ts +++ b/web/src/lib/stores/assets.store.ts @@ -1,6 +1,9 @@ import { api, AssetApiGetTimeBucketsRequest, AssetResponseDto } from '@api'; -import { writable } from 'svelte/store'; +import { throttle } from 'lodash-es'; +import { DateTime } from 'luxon'; +import { Unsubscriber, writable } from 'svelte/store'; import { handleError } from '../utils/handle-error'; +import { websocketStore } from './websocket'; export enum BucketPosition { Above = 'above', @@ -34,11 +37,33 @@ export class AssetBucket { position!: BucketPosition; } +const isMismatched = (option: boolean | undefined, value: boolean): boolean => + option === undefined ? false : option !== value; + const THUMBNAIL_HEIGHT = 235; +interface AddAsset { + type: 'add'; + value: AssetResponseDto; +} + +interface DeleteAsset { + type: 'delete'; + value: string; +} + +interface TrashAsset { + type: 'trash'; + value: string; +} + +type PendingChange = AddAsset | DeleteAsset | TrashAsset; + export class AssetStore { private store$ = writable(this); private assetToBucket: Record = {}; + private pendingChanges: PendingChange[] = []; + private unsubscribers: Unsubscriber[] = []; initialized = false; timelineHeight = 0; @@ -52,6 +77,63 @@ export class AssetStore { subscribe = this.store$.subscribe; + connect() { + this.unsubscribers.push( + websocketStore.onUploadSuccess.subscribe((value) => { + if (value) { + this.pendingChanges.push({ type: 'add', value }); + this.processPendingChanges(); + } + }), + + websocketStore.onAssetTrash.subscribe((ids) => { + console.log('onAssetTrash', ids); + if (ids) { + for (const id of ids) { + this.pendingChanges.push({ type: 'trash', value: id }); + } + this.processPendingChanges(); + } + }), + + websocketStore.onAssetDelete.subscribe((value) => { + if (value) { + this.pendingChanges.push({ type: 'delete', value }); + this.processPendingChanges(); + } + }), + ); + } + + disconnect() { + for (const unsubscribe of this.unsubscribers) { + unsubscribe(); + } + } + + processPendingChanges = throttle(() => { + for (const { type, value } of this.pendingChanges) { + switch (type) { + case 'add': + this.addAsset(value); + break; + + case 'trash': + if (!this.options.isTrashed) { + this.removeAsset(value); + } + break; + + case 'delete': + this.removeAsset(value); + break; + } + } + + this.pendingChanges = []; + this.emit(true); + }, 10_000); + async init(viewport: Viewport) { this.initialized = false; this.timelineHeight = 0; @@ -168,6 +250,46 @@ export class AssetStore { return scrollTimeline ? delta : 0; } + private addAsset(asset: AssetResponseDto): void { + if ( + this.assetToBucket[asset.id] || + this.options.userId || + this.options.personId || + this.options.albumId || + isMismatched(this.options.isArchived, asset.isArchived) || + isMismatched(this.options.isFavorite, asset.isFavorite) + ) { + return; + } + + const timeBucket = DateTime.fromISO(asset.fileCreatedAt).toUTC().startOf('month').toString(); + let bucket = this.getBucketByDate(timeBucket); + + if (!bucket) { + bucket = { + bucketDate: timeBucket, + bucketHeight: THUMBNAIL_HEIGHT, + assets: [], + cancelToken: null, + position: BucketPosition.Unknown, + }; + + this.buckets.push(bucket); + this.buckets = this.buckets.sort((a, b) => { + const aDate = DateTime.fromISO(a.bucketDate).toUTC(); + const bDate = DateTime.fromISO(b.bucketDate).toUTC(); + return bDate.diff(aDate).milliseconds; + }); + } + + bucket.assets.push(asset); + bucket.assets.sort((a, b) => { + const aDate = DateTime.fromISO(a.fileCreatedAt).toUTC(); + const bDate = DateTime.fromISO(b.fileCreatedAt).toUTC(); + return bDate.diff(aDate).milliseconds; + }); + } + getBucketByDate(bucketDate: string): AssetBucket | null { return this.buckets.find((bucket) => bucket.bucketDate === bucketDate) || null; } diff --git a/web/src/lib/stores/websocket.ts b/web/src/lib/stores/websocket.ts index 09777891e5..407fb70c3e 100644 --- a/web/src/lib/stores/websocket.ts +++ b/web/src/lib/stores/websocket.ts @@ -1,10 +1,19 @@ -import { io, Socket } from 'socket.io-client'; +import type { AssetResponseDto, ServerVersionResponseDto } from '@api'; +import { io } from 'socket.io-client'; +import { writable } from 'svelte/store'; -let websocket: Socket; +export const websocketStore = { + onUploadSuccess: writable(), + onAssetDelete: writable(), + onAssetTrash: writable(), + onPersonThumbnail: writable(), + serverVersion: writable(), + connected: writable(false), +}; export const openWebsocketConnection = () => { try { - websocket = io('', { + const websocket = io('', { path: '/api/socket.io', transports: ['polling'], reconnection: true, @@ -12,21 +21,18 @@ export const openWebsocketConnection = () => { autoConnect: true, }); - listenToEvent(websocket); + websocket + .on('connect', () => websocketStore.connected.set(true)) + .on('disconnect', () => websocketStore.connected.set(false)) + // .on('on_upload_success', (data) => websocketStore.onUploadSuccess.set(JSON.parse(data) as AssetResponseDto)) + .on('on_asset_delete', (data) => websocketStore.onAssetDelete.set(JSON.parse(data) as string)) + .on('on_asset_trash', (data) => websocketStore.onAssetTrash.set(JSON.parse(data) as string[])) + .on('on_person_thumbnail', (data) => websocketStore.onPersonThumbnail.set(JSON.parse(data) as string)) + .on('on_server_version', (data) => websocketStore.serverVersion.set(JSON.parse(data) as ServerVersionResponseDto)) + .on('error', (e) => console.log('Websocket Error', e)); + + return () => websocket?.close(); } catch (e) { console.log('Cannot connect to websocket ', e); } }; - -const listenToEvent = (socket: Socket) => { - //TODO: if we are not using this, we should probably remove it? - socket.on('on_upload_success', () => undefined); - - socket.on('error', (e) => { - console.log('Websocket Error', e); - }); -}; - -export const closeWebsocketConnection = () => { - websocket?.close(); -}; diff --git a/web/src/routes/(user)/people/[personId]/+page.svelte b/web/src/routes/(user)/people/[personId]/+page.svelte index 8fc4473d3e..5dac37b506 100644 --- a/web/src/routes/(user)/people/[personId]/+page.svelte +++ b/web/src/routes/(user)/people/[personId]/+page.svelte @@ -25,6 +25,7 @@ import { AppRoute } from '$lib/constants'; import { createAssetInteractionStore } from '$lib/stores/asset-interaction.store'; import { AssetStore } from '$lib/stores/assets.store'; + import { websocketStore } from '$lib/stores/websocket'; import { handleError } from '$lib/utils/handle-error'; import { AssetResponseDto, PersonResponseDto, TimeBucketSize, api } from '@api'; import { onMount } from 'svelte'; @@ -54,6 +55,7 @@ }); const assetInteractionStore = createAssetInteractionStore(); const { selectedAssets, isMultiSelectState } = assetInteractionStore; + const { onPersonThumbnail } = websocketStore; let viewMode: ViewMode = ViewMode.VIEW_ASSETS; let isEditingName = false; @@ -65,12 +67,15 @@ let potentialMergePeople: PersonResponseDto[] = []; let personName = ''; + let thumbnailData = api.getPeopleThumbnailUrl(data.person.id); let name: string = data.person.name; let suggestedPeople: PersonResponseDto[] = []; $: isAllArchive = Array.from($selectedAssets).every((asset) => asset.isArchived); $: isAllFavorite = Array.from($selectedAssets).every((asset) => asset.isFavorite); + $: $onPersonThumbnail === data.person.id && + (thumbnailData = api.getPeopleThumbnailUrl(data.person.id) + `?now=${Date.now()}`); $: { suggestedPeople = !name @@ -141,14 +146,8 @@ await api.personApi.updatePerson({ id: data.person.id, personUpdateDto: { featureFaceAssetId: asset.id } }); - // TODO: Replace by Websocket in the future - notificationController.show({ - message: 'Feature photo updated, refresh page to see changes', - type: NotificationType.Info, - }); - + notificationController.show({ message: 'Feature photo updated', type: NotificationType.Info }); assetInteractionStore.clearMultiselect(); - // scroll to top viewMode = ViewMode.VIEW_ASSETS; }; @@ -376,7 +375,7 @@ { + openWebsocketConnection(); + try { await loadConfig(); } catch (error) {