From b34abf25f0e5748e014736846c9845e6a02f170a Mon Sep 17 00:00:00 2001 From: Jason Rasmussen Date: Wed, 13 Dec 2023 12:23:51 -0500 Subject: [PATCH] feat(server): server-side events (#5669) --- server/src/domain/asset/asset.service.spec.ts | 4 +- server/src/domain/asset/asset.service.ts | 14 ++-- server/src/domain/job/job.constants.ts | 2 - server/src/domain/job/job.service.ts | 8 +-- .../domain/metadata/metadata.service.spec.ts | 4 +- .../src/domain/metadata/metadata.service.ts | 6 +- .../repositories/communication.repository.ts | 17 +++-- .../src/domain/repositories/job.repository.ts | 1 - .../domain/server-info/server-info.service.ts | 8 +-- .../system-config.service.spec.ts | 20 ++---- .../system-config/system-config.service.ts | 23 +++++-- .../repositories/communication.repository.ts | 64 ++++++++++++++++--- .../src/infra/repositories/job.repository.ts | 2 - server/src/microservices/app.service.ts | 1 - .../communication.repository.mock.ts | 3 +- 15 files changed, 114 insertions(+), 63 deletions(-) diff --git a/server/src/domain/asset/asset.service.spec.ts b/server/src/domain/asset/asset.service.spec.ts index f3dab855c7..a86ffc9ffb 100644 --- a/server/src/domain/asset/asset.service.spec.ts +++ b/server/src/domain/asset/asset.service.spec.ts @@ -20,7 +20,7 @@ import { ImmichFileResponse } from '../domain.util'; import { JobName } from '../job'; import { AssetStats, - CommunicationEvent, + ClientEvent, IAssetRepository, ICommunicationRepository, ICryptoRepository, @@ -764,7 +764,7 @@ describe(AssetService.name, () => { stackParentId: 'parent', }); - expect(communicationMock.send).toHaveBeenCalledWith(CommunicationEvent.ASSET_UPDATE, authStub.user1.user.id, [ + expect(communicationMock.send).toHaveBeenCalledWith(ClientEvent.ASSET_UPDATE, authStub.user1.user.id, [ 'asset-1', ]); }); diff --git a/server/src/domain/asset/asset.service.ts b/server/src/domain/asset/asset.service.ts index aad36c05cf..340ddb1edb 100644 --- a/server/src/domain/asset/asset.service.ts +++ b/server/src/domain/asset/asset.service.ts @@ -10,7 +10,7 @@ import { mimeTypes } from '../domain.constant'; import { HumanReadableSize, ImmichFileResponse, usePagination } from '../domain.util'; import { IAssetDeletionJob, ISidecarWriteJob, JOBS_ASSET_PAGINATION_SIZE, JobName } from '../job'; import { - CommunicationEvent, + ClientEvent, IAccessRepository, IAssetRepository, ICommunicationRepository, @@ -434,7 +434,7 @@ export class AssetService { } await this.assetRepository.updateAll(ids, options); - this.communicationRepository.send(CommunicationEvent.ASSET_UPDATE, auth.user.id, ids); + this.communicationRepository.send(ClientEvent.ASSET_UPDATE, auth.user.id, ids); } async handleAssetDeletionCheck() { @@ -478,7 +478,7 @@ export class AssetService { } await this.assetRepository.remove(asset); - this.communicationRepository.send(CommunicationEvent.ASSET_DELETE, asset.ownerId, id); + this.communicationRepository.send(ClientEvent.ASSET_DELETE, asset.ownerId, id); // TODO refactor this to use cascades if (asset.livePhotoVideoId) { @@ -508,7 +508,7 @@ export class AssetService { } } else { await this.assetRepository.softDeleteAll(ids); - this.communicationRepository.send(CommunicationEvent.ASSET_TRASH, auth.user.id, ids); + this.communicationRepository.send(ClientEvent.ASSET_TRASH, auth.user.id, ids); } } @@ -521,7 +521,7 @@ export class AssetService { for await (const assets of assetPagination) { const ids = assets.map((a) => a.id); await this.assetRepository.restoreAll(ids); - this.communicationRepository.send(CommunicationEvent.ASSET_RESTORE, auth.user.id, ids); + this.communicationRepository.send(ClientEvent.ASSET_RESTORE, auth.user.id, ids); } return; } @@ -540,7 +540,7 @@ export class AssetService { const { ids } = dto; await this.access.requirePermission(auth, Permission.ASSET_RESTORE, ids); await this.assetRepository.restoreAll(ids); - this.communicationRepository.send(CommunicationEvent.ASSET_RESTORE, auth.user.id, ids); + this.communicationRepository.send(ClientEvent.ASSET_RESTORE, auth.user.id, ids); } async updateStackParent(auth: AuthDto, dto: UpdateStackParentDto): Promise { @@ -556,7 +556,7 @@ export class AssetService { childIds.push(...(oldParent.stack?.map((a) => a.id) ?? [])); } - this.communicationRepository.send(CommunicationEvent.ASSET_UPDATE, auth.user.id, [...childIds, newParentId]); + this.communicationRepository.send(ClientEvent.ASSET_UPDATE, auth.user.id, [...childIds, newParentId]); await this.assetRepository.updateAll(childIds, { stackParentId: newParentId }); // Remove ParentId of new parent if this was previously a child of some other asset return this.assetRepository.updateAll([newParentId], { stackParentId: null }); diff --git a/server/src/domain/job/job.constants.ts b/server/src/domain/job/job.constants.ts index 0dd15d260f..287d22db60 100644 --- a/server/src/domain/job/job.constants.ts +++ b/server/src/domain/job/job.constants.ts @@ -49,7 +49,6 @@ export enum JobName { // storage template STORAGE_TEMPLATE_MIGRATION = 'storage-template-migration', STORAGE_TEMPLATE_MIGRATION_SINGLE = 'storage-template-migration-single', - SYSTEM_CONFIG_CHANGE = 'system-config-change', // migration QUEUE_MIGRATION = 'queue-migration', @@ -101,7 +100,6 @@ export const JOBS_TO_QUEUE: Record = { [JobName.CLEAN_OLD_AUDIT_LOGS]: QueueName.BACKGROUND_TASK, [JobName.PERSON_CLEANUP]: QueueName.BACKGROUND_TASK, [JobName.PERSON_DELETE]: QueueName.BACKGROUND_TASK, - [JobName.SYSTEM_CONFIG_CHANGE]: QueueName.BACKGROUND_TASK, // conversion [JobName.QUEUE_VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION, diff --git a/server/src/domain/job/job.service.ts b/server/src/domain/job/job.service.ts index b57013ae00..c8976c02a6 100644 --- a/server/src/domain/job/job.service.ts +++ b/server/src/domain/job/job.service.ts @@ -2,7 +2,7 @@ import { AssetType } from '@app/infra/entities'; import { BadRequestException, Inject, Injectable, Logger } from '@nestjs/common'; import { mapAsset } from '../asset'; import { - CommunicationEvent, + ClientEvent, IAssetRepository, ICommunicationRepository, IJobRepository, @@ -181,7 +181,7 @@ export class JobService { if (item.data.source === 'sidecar-write') { const [asset] = await this.assetRepository.getByIds([item.data.id]); if (asset) { - this.communicationRepository.send(CommunicationEvent.ASSET_UPDATE, asset.ownerId, mapAsset(asset)); + this.communicationRepository.send(ClientEvent.ASSET_UPDATE, asset.ownerId, mapAsset(asset)); } } await this.jobRepository.queue({ name: JobName.LINK_LIVE_PHOTOS, data: item.data }); @@ -201,7 +201,7 @@ export class JobService { const { id } = item.data; const person = await this.personRepository.getById(id); if (person) { - this.communicationRepository.send(CommunicationEvent.PERSON_THUMBNAIL, person.ownerId, person.id); + this.communicationRepository.send(ClientEvent.PERSON_THUMBNAIL, person.ownerId, person.id); } break; @@ -232,7 +232,7 @@ export class JobService { // Only live-photo motion part will be marked as not visible immediately on upload. Skip notifying clients if (asset && asset.isVisible) { - this.communicationRepository.send(CommunicationEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset)); + this.communicationRepository.send(ClientEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset)); } } } diff --git a/server/src/domain/metadata/metadata.service.spec.ts b/server/src/domain/metadata/metadata.service.spec.ts index 2700f0080f..cfabc1149a 100644 --- a/server/src/domain/metadata/metadata.service.spec.ts +++ b/server/src/domain/metadata/metadata.service.spec.ts @@ -20,7 +20,7 @@ import { constants } from 'fs/promises'; import { when } from 'jest-when'; import { JobName } from '../job'; import { - CommunicationEvent, + ClientEvent, IAlbumRepository, IAssetRepository, ICommunicationRepository, @@ -190,7 +190,7 @@ describe(MetadataService.name, () => { await expect(sut.handleLivePhotoLinking({ id: assetStub.livePhotoStillAsset.id })).resolves.toBe(true); expect(communicationMock.send).toHaveBeenCalledWith( - CommunicationEvent.ASSET_HIDDEN, + ClientEvent.ASSET_HIDDEN, assetStub.livePhotoMotionAsset.ownerId, assetStub.livePhotoMotionAsset.id, ); diff --git a/server/src/domain/metadata/metadata.service.ts b/server/src/domain/metadata/metadata.service.ts index e40bfead66..e160eda636 100644 --- a/server/src/domain/metadata/metadata.service.ts +++ b/server/src/domain/metadata/metadata.service.ts @@ -9,7 +9,7 @@ import { Subscription } from 'rxjs'; import { usePagination } from '../domain.util'; import { IBaseJob, IEntityJob, ISidecarWriteJob, JOBS_ASSET_PAGINATION_SIZE, JobName, QueueName } from '../job'; import { - CommunicationEvent, + ClientEvent, ExifDuration, IAlbumRepository, IAssetRepository, @@ -171,7 +171,7 @@ export class MetadataService { await this.albumRepository.removeAsset(motionAsset.id); // Notify clients to hide the linked live photo asset - this.communicationRepository.send(CommunicationEvent.ASSET_HIDDEN, motionAsset.ownerId, motionAsset.id); + this.communicationRepository.send(ClientEvent.ASSET_HIDDEN, motionAsset.ownerId, motionAsset.id); return true; } @@ -460,7 +460,7 @@ export class MetadataService { }; if (exifData.latitude === 0 && exifData.longitude === 0) { - console.warn('Exif data has latitude and longitude of 0, setting to null'); + this.logger.warn('Exif data has latitude and longitude of 0, setting to null'); exifData.latitude = null; exifData.longitude = null; } diff --git a/server/src/domain/repositories/communication.repository.ts b/server/src/domain/repositories/communication.repository.ts index 86397d5cd8..29617f0310 100644 --- a/server/src/domain/repositories/communication.repository.ts +++ b/server/src/domain/repositories/communication.repository.ts @@ -1,6 +1,6 @@ export const ICommunicationRepository = 'ICommunicationRepository'; -export enum CommunicationEvent { +export enum ClientEvent { UPLOAD_SUCCESS = 'on_upload_success', ASSET_DELETE = 'on_asset_delete', ASSET_TRASH = 'on_asset_trash', @@ -13,10 +13,17 @@ export enum CommunicationEvent { NEW_RELEASE = 'on_new_release', } -export type Callback = (userId: string) => Promise; +export enum ServerEvent { + CONFIG_UPDATE = 'config:update', +} + +export type OnConnectCallback = (userId: string) => Promise; +export type OnServerEventCallback = () => Promise; export interface ICommunicationRepository { - send(event: CommunicationEvent, userId: string, data: any): void; - broadcast(event: CommunicationEvent, data: any): void; - addEventListener(event: 'connect', callback: Callback): void; + send(event: ClientEvent, userId: string, data: any): void; + broadcast(event: ClientEvent, data: any): void; + on(event: 'connect', callback: OnConnectCallback): void; + on(event: ServerEvent, callback: OnServerEventCallback): void; + sendServerEvent(event: ServerEvent): void; } diff --git a/server/src/domain/repositories/job.repository.ts b/server/src/domain/repositories/job.repository.ts index a1998b7922..f19af3d14d 100644 --- a/server/src/domain/repositories/job.repository.ts +++ b/server/src/domain/repositories/job.repository.ts @@ -46,7 +46,6 @@ export type JobItem = // Storage Template | { name: JobName.STORAGE_TEMPLATE_MIGRATION; data?: IBaseJob } | { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE; data: IEntityJob } - | { name: JobName.SYSTEM_CONFIG_CHANGE; data?: IBaseJob } // Migration | { name: JobName.QUEUE_MIGRATION; data?: IBaseJob } diff --git a/server/src/domain/server-info/server-info.service.ts b/server/src/domain/server-info/server-info.service.ts index c215709eb6..291bb32cf0 100644 --- a/server/src/domain/server-info/server-info.service.ts +++ b/server/src/domain/server-info/server-info.service.ts @@ -3,7 +3,7 @@ import { DateTime } from 'luxon'; import { ServerVersion, isDev, mimeTypes, serverVersion } from '../domain.constant'; import { asHumanReadable } from '../domain.util'; import { - CommunicationEvent, + ClientEvent, ICommunicationRepository, IServerInfoRepository, IStorageRepository, @@ -38,7 +38,7 @@ export class ServerInfoService { @Inject(IStorageRepository) private storageRepository: IStorageRepository, ) { this.configCore = SystemConfigCore.create(configRepository); - this.communicationRepository.addEventListener('connect', (userId) => this.handleConnect(userId)); + this.communicationRepository.on('connect', (userId) => this.handleConnect(userId)); } async getInfo(): Promise { @@ -154,12 +154,12 @@ export class ServerInfoService { } private async handleConnect(userId: string) { - this.communicationRepository.send(CommunicationEvent.SERVER_VERSION, userId, serverVersion); + this.communicationRepository.send(ClientEvent.SERVER_VERSION, userId, serverVersion); this.newReleaseNotification(userId); } private newReleaseNotification(userId?: string) { - const event = CommunicationEvent.NEW_RELEASE; + const event = ClientEvent.NEW_RELEASE; const payload = { isAvailable: this.releaseVersion.isNewerThan(serverVersion), checkedAt: this.releaseVersionCheckedAt, diff --git a/server/src/domain/system-config/system-config.service.spec.ts b/server/src/domain/system-config/system-config.service.spec.ts index 30e216a7d3..6d1aa503d3 100644 --- a/server/src/domain/system-config/system-config.service.spec.ts +++ b/server/src/domain/system-config/system-config.service.spec.ts @@ -11,14 +11,9 @@ import { VideoCodec, } from '@app/infra/entities'; import { BadRequestException } from '@nestjs/common'; -import { newCommunicationRepositoryMock, newJobRepositoryMock, newSystemConfigRepositoryMock } from '@test'; -import { JobName, QueueName } from '../job'; -import { - ICommunicationRepository, - IJobRepository, - ISmartInfoRepository, - ISystemConfigRepository, -} from '../repositories'; +import { newCommunicationRepositoryMock, newSystemConfigRepositoryMock } from '@test'; +import { QueueName } from '../job'; +import { ICommunicationRepository, ISmartInfoRepository, ISystemConfigRepository, ServerEvent } from '../repositories'; import { defaults, SystemConfigValidator } from './system-config.core'; import { SystemConfigService } from './system-config.service'; @@ -137,15 +132,13 @@ describe(SystemConfigService.name, () => { let sut: SystemConfigService; let configMock: jest.Mocked; let communicationMock: jest.Mocked; - let jobMock: jest.Mocked; let smartInfoMock: jest.Mocked; beforeEach(async () => { delete process.env.IMMICH_CONFIG_FILE; configMock = newSystemConfigRepositoryMock(); communicationMock = newCommunicationRepositoryMock(); - jobMock = newJobRepositoryMock(); - sut = new SystemConfigService(configMock, communicationMock, jobMock, smartInfoMock); + sut = new SystemConfigService(configMock, communicationMock, smartInfoMock); }); it('should work', () => { @@ -269,13 +262,14 @@ describe(SystemConfigService.name, () => { }); describe('updateConfig', () => { - it('should notify the microservices process', async () => { + it('should update the config and emit client and server events', async () => { configMock.load.mockResolvedValue(updates); await expect(sut.updateConfig(updatedConfig)).resolves.toEqual(updatedConfig); + expect(communicationMock.broadcast).toHaveBeenCalled(); + expect(communicationMock.sendServerEvent).toHaveBeenCalledWith(ServerEvent.CONFIG_UPDATE); expect(configMock.saveAll).toHaveBeenCalledWith(updates); - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.SYSTEM_CONFIG_CHANGE }); }); it('should throw an error if the config is not valid', async () => { diff --git a/server/src/domain/system-config/system-config.service.ts b/server/src/domain/system-config/system-config.service.ts index 6a53a3e465..12c78101ee 100644 --- a/server/src/domain/system-config/system-config.service.ts +++ b/server/src/domain/system-config/system-config.service.ts @@ -1,11 +1,10 @@ -import { Inject, Injectable } from '@nestjs/common'; -import { JobName } from '../job'; +import { Inject, Injectable, Logger } from '@nestjs/common'; import { - CommunicationEvent, + ClientEvent, ICommunicationRepository, - IJobRepository, ISmartInfoRepository, ISystemConfigRepository, + ServerEvent, } from '../repositories'; import { SystemConfigDto, mapConfig } from './dto/system-config.dto'; import { SystemConfigTemplateStorageOptionDto } from './response-dto/system-config-template-storage-option.dto'; @@ -23,14 +22,16 @@ import { SystemConfigCore, SystemConfigValidator } from './system-config.core'; @Injectable() export class SystemConfigService { + private logger = new Logger(SystemConfigService.name); private core: SystemConfigCore; + constructor( @Inject(ISystemConfigRepository) private repository: ISystemConfigRepository, @Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository, - @Inject(IJobRepository) private jobRepository: IJobRepository, @Inject(ISmartInfoRepository) private smartInfoRepository: ISmartInfoRepository, ) { this.core = SystemConfigCore.create(repository); + this.communicationRepository.on(ServerEvent.CONFIG_UPDATE, () => this.handleConfigUpdate()); } get config$() { @@ -50,15 +51,19 @@ export class SystemConfigService { async updateConfig(dto: SystemConfigDto): Promise { const oldConfig = await this.core.getConfig(); const newConfig = await this.core.updateConfig(dto); - await this.jobRepository.queue({ name: JobName.SYSTEM_CONFIG_CHANGE }); - this.communicationRepository.broadcast(CommunicationEvent.CONFIG_UPDATE, {}); + + this.communicationRepository.broadcast(ClientEvent.CONFIG_UPDATE, {}); + this.communicationRepository.sendServerEvent(ServerEvent.CONFIG_UPDATE); + if (oldConfig.machineLearning.clip.modelName !== newConfig.machineLearning.clip.modelName) { await this.smartInfoRepository.init(newConfig.machineLearning.clip.modelName); } return mapConfig(newConfig); } + // this is only used by the cli on config change, and it's not actually needed anymore async refreshConfig() { + this.communicationRepository.sendServerEvent(ServerEvent.CONFIG_UPDATE); await this.core.refreshConfig(); return true; } @@ -97,4 +102,8 @@ export class SystemConfigService { const { theme } = await this.core.getConfig(); return theme.customCss; } + + private async handleConfigUpdate() { + await this.core.refreshConfig(); + } } diff --git a/server/src/infra/repositories/communication.repository.ts b/server/src/infra/repositories/communication.repository.ts index 3ce0ccc496..558c911c2b 100644 --- a/server/src/infra/repositories/communication.repository.ts +++ b/server/src/infra/repositories/communication.repository.ts @@ -1,19 +1,60 @@ -import { AuthService, Callback, CommunicationEvent, ICommunicationRepository } from '@app/domain'; +import { + AuthService, + ClientEvent, + ICommunicationRepository, + OnConnectCallback, + OnServerEventCallback, + ServerEvent, +} from '@app/domain'; import { Logger } from '@nestjs/common'; -import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets'; +import { + OnGatewayConnection, + OnGatewayDisconnect, + OnGatewayInit, + WebSocketGateway, + WebSocketServer, +} from '@nestjs/websockets'; import { Server, Socket } from 'socket.io'; @WebSocketGateway({ cors: true, path: '/api/socket.io' }) -export class CommunicationRepository implements OnGatewayConnection, OnGatewayDisconnect, ICommunicationRepository { +export class CommunicationRepository + implements OnGatewayConnection, OnGatewayDisconnect, OnGatewayInit, ICommunicationRepository +{ private logger = new Logger(CommunicationRepository.name); - private onConnectCallbacks: Callback[] = []; + private onConnectCallbacks: OnConnectCallback[] = []; + private onServerEventCallbacks: Record = { + [ServerEvent.CONFIG_UPDATE]: [], + }; + + @WebSocketServer() + private server?: Server; constructor(private authService: AuthService) {} - @WebSocketServer() server?: Server; + afterInit(server: Server) { + this.logger.log('Initialized websocket server'); - addEventListener(event: 'connect', callback: Callback) { - this.onConnectCallbacks.push(callback); + for (const event of Object.values(ServerEvent)) { + server.on(event, async () => { + this.logger.debug(`Server event: ${event} (receive)`); + const callbacks = this.onServerEventCallbacks[event]; + for (const callback of callbacks) { + await callback(); + } + }); + } + } + + on(event: 'connect' | ServerEvent, callback: OnConnectCallback | OnServerEventCallback) { + switch (event) { + case 'connect': + this.onConnectCallbacks.push(callback); + break; + + default: + this.onServerEventCallbacks[event].push(callback as OnServerEventCallback); + break; + } } async handleConnection(client: Socket) { @@ -36,11 +77,16 @@ export class CommunicationRepository implements OnGatewayConnection, OnGatewayDi await client.leave(client.nsp.name); } - send(event: CommunicationEvent, userId: string, data: any) { + send(event: ClientEvent, userId: string, data: any) { this.server?.to(userId).emit(event, data); } - broadcast(event: CommunicationEvent, data: any) { + broadcast(event: ClientEvent, data: any) { this.server?.emit(event, data); } + + sendServerEvent(event: ServerEvent) { + this.logger.debug(`Server event: ${event} (send)`); + this.server?.serverSideEmit(event); + } } diff --git a/server/src/infra/repositories/job.repository.ts b/server/src/infra/repositories/job.repository.ts index 4d802cd4b9..a359845fcf 100644 --- a/server/src/infra/repositories/job.repository.ts +++ b/server/src/infra/repositories/job.repository.ts @@ -129,8 +129,6 @@ export class JobRepository implements IJobRepository { return { jobId: item.data.id }; case JobName.GENERATE_PERSON_THUMBNAIL: return { priority: 1 }; - case JobName.SYSTEM_CONFIG_CHANGE: - return { priority: 1 }; default: return null; diff --git a/server/src/microservices/app.service.ts b/server/src/microservices/app.service.ts index 48a167d88a..abbd8a6bde 100644 --- a/server/src/microservices/app.service.ts +++ b/server/src/microservices/app.service.ts @@ -55,7 +55,6 @@ export class AppService { [JobName.QUEUE_MIGRATION]: () => this.mediaService.handleQueueMigration(), [JobName.MIGRATE_ASSET]: (data) => this.mediaService.handleAssetMigration(data), [JobName.MIGRATE_PERSON]: (data) => this.personService.handlePersonMigration(data), - [JobName.SYSTEM_CONFIG_CHANGE]: () => this.systemConfigService.refreshConfig(), [JobName.QUEUE_GENERATE_THUMBNAILS]: (data) => this.mediaService.handleQueueGenerateThumbnails(data), [JobName.GENERATE_JPEG_THUMBNAIL]: (data) => this.mediaService.handleGenerateJpegThumbnail(data), [JobName.GENERATE_WEBP_THUMBNAIL]: (data) => this.mediaService.handleGenerateWebpThumbnail(data), diff --git a/server/test/repositories/communication.repository.mock.ts b/server/test/repositories/communication.repository.mock.ts index 2db02e5277..6fb95bffd9 100644 --- a/server/test/repositories/communication.repository.mock.ts +++ b/server/test/repositories/communication.repository.mock.ts @@ -4,6 +4,7 @@ export const newCommunicationRepositoryMock = (): jest.Mocked