1
0
Fork 0
mirror of https://github.com/immich-app/immich.git synced 2025-01-01 08:31:59 +00:00

refactor: server events (#8204)

* refactor: server events

* fix typo

---------

Co-authored-by: Daniel Dietzler <36593685+danieldietzler@users.noreply.github.com>
This commit is contained in:
Jason Rasmussen 2024-03-22 18:24:02 -04:00 committed by GitHub
parent b6e4be72f0
commit 6e93ddf2f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 166 additions and 181 deletions

View file

@ -43,9 +43,9 @@ import { IAssetStackRepository } from 'src/interfaces/asset-stack.interface';
import { IAssetRepositoryV1 } from 'src/interfaces/asset-v1.interface';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { IAuditRepository } from 'src/interfaces/audit.interface';
import { ICommunicationRepository } from 'src/interfaces/communication.interface';
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
import { IDatabaseRepository } from 'src/interfaces/database.interface';
import { IEventRepository } from 'src/interfaces/event.interface';
import { IJobRepository } from 'src/interfaces/job.interface';
import { ILibraryRepository } from 'src/interfaces/library.interface';
import { IMachineLearningRepository } from 'src/interfaces/machine-learning.interface';
@ -74,9 +74,9 @@ import { AssetStackRepository } from 'src/repositories/asset-stack.repository';
import { AssetRepositoryV1 } from 'src/repositories/asset-v1.repository';
import { AssetRepository } from 'src/repositories/asset.repository';
import { AuditRepository } from 'src/repositories/audit.repository';
import { CommunicationRepository } from 'src/repositories/communication.repository';
import { CryptoRepository } from 'src/repositories/crypto.repository';
import { DatabaseRepository } from 'src/repositories/database.repository';
import { EventRepository } from 'src/repositories/event.repository';
import { FilesystemProvider } from 'src/repositories/filesystem.provider';
import { JobRepository } from 'src/repositories/job.repository';
import { LibraryRepository } from 'src/repositories/library.repository';
@ -200,9 +200,9 @@ const repositories: Provider[] = [
{ provide: IAssetRepositoryV1, useClass: AssetRepositoryV1 },
{ provide: IAssetStackRepository, useClass: AssetStackRepository },
{ provide: IAuditRepository, useClass: AuditRepository },
{ provide: ICommunicationRepository, useClass: CommunicationRepository },
{ provide: ICryptoRepository, useClass: CryptoRepository },
{ provide: IDatabaseRepository, useClass: DatabaseRepository },
{ provide: IEventRepository, useClass: EventRepository },
{ provide: IJobRepository, useClass: JobRepository },
{ provide: ILibraryRepository, useClass: LibraryRepository },
{ provide: IKeyRepository, useClass: ApiKeyRepository },

View file

@ -1,7 +1,8 @@
import { SetMetadata } from '@nestjs/common';
import { OnEvent, OnEventType } from '@nestjs/event-emitter';
import { OnEvent } from '@nestjs/event-emitter';
import { OnEventOptions } from '@nestjs/event-emitter/dist/interfaces';
import _ from 'lodash';
import { ServerAsyncEvent, ServerEvent } from 'src/interfaces/event.interface';
import { setUnion } from 'src/utils/set';
// PostgreSQL uses a 16-bit integer to indicate the number of bound parameters. This means that the
@ -125,5 +126,5 @@ export interface GenerateSqlQueries {
/** Decorator to enable versioning/tracking of generated Sql */
export const GenerateSql = (...options: GenerateSqlQueries[]) => SetMetadata(GENERATE_SQL_KEY, options);
export const OnEventInternal = (event: OnEventType, options?: OnEventOptions) =>
export const OnServerEvent = (event: ServerEvent | ServerAsyncEvent, options?: OnEventOptions) =>
OnEvent(event, { suppressErrors: false, ...options });

View file

@ -2,7 +2,7 @@ import { AssetResponseDto } from 'src/dtos/asset-response.dto';
import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server-info.dto';
import { SystemConfig } from 'src/entities/system-config.entity';
export const ICommunicationRepository = 'ICommunicationRepository';
export const IEventRepository = 'IEventRepository';
export enum ClientEvent {
UPLOAD_SUCCESS = 'on_upload_success',
@ -19,18 +19,6 @@ export enum ClientEvent {
NEW_RELEASE = 'on_new_release',
}
export enum ServerEvent {
CONFIG_UPDATE = 'config:update',
}
export enum InternalEvent {
VALIDATE_CONFIG = 'validate_config',
}
export interface InternalEventMap {
[InternalEvent.VALIDATE_CONFIG]: { newConfig: SystemConfig; oldConfig: SystemConfig };
}
export interface ClientEventMap {
[ClientEvent.UPLOAD_SUCCESS]: AssetResponseDto;
[ClientEvent.USER_DELETE]: string;
@ -46,15 +34,39 @@ export interface ClientEventMap {
[ClientEvent.NEW_RELEASE]: ReleaseNotification;
}
export type OnConnectCallback = (userId: string) => void | Promise<void>;
export type OnServerEventCallback = () => Promise<void>;
export interface ICommunicationRepository {
send<E extends keyof ClientEventMap>(event: E, userId: string, data: ClientEventMap[E]): void;
broadcast<E extends keyof ClientEventMap>(event: E, data: ClientEventMap[E]): void;
on(event: 'connect', callback: OnConnectCallback): void;
on(event: ServerEvent, callback: OnServerEventCallback): void;
sendServerEvent(event: ServerEvent): void;
emit<E extends keyof InternalEventMap>(event: E, data: InternalEventMap[E]): boolean;
emitAsync<E extends keyof InternalEventMap>(event: E, data: InternalEventMap[E]): Promise<any>;
export enum ServerEvent {
CONFIG_UPDATE = 'config.update',
WEBSOCKET_CONNECT = 'websocket.connect',
}
export interface ServerEventMap {
[ServerEvent.CONFIG_UPDATE]: null;
[ServerEvent.WEBSOCKET_CONNECT]: { userId: string };
}
export enum ServerAsyncEvent {
CONFIG_VALIDATE = 'config.validate',
}
export interface ServerAsyncEventMap {
[ServerAsyncEvent.CONFIG_VALIDATE]: { newConfig: SystemConfig; oldConfig: SystemConfig };
}
export interface IEventRepository {
/**
* Send to connected clients for a specific user
*/
clientSend<E extends keyof ClientEventMap>(event: E, userId: string, data: ClientEventMap[E]): void;
/**
* Send to all connected clients
*/
clientBroadcast<E extends keyof ClientEventMap>(event: E, data: ClientEventMap[E]): void;
/**
* Notify listeners in this and connected processes. Subscribe to an event with `@OnServerEvent`
*/
serverSend<E extends keyof ServerEventMap>(event: E, data: ServerEventMap[E]): boolean;
/**
* Notify and wait for responses from listeners in this process. Subscribe to an event with `@OnServerEvent`
*/
serverSendAsync<E extends keyof ServerAsyncEventMap>(event: E, data: ServerAsyncEventMap[E]): Promise<any>;
}

View file

@ -8,13 +8,12 @@ import {
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import {
ClientEvent,
ICommunicationRepository,
InternalEventMap,
OnConnectCallback,
OnServerEventCallback,
ClientEventMap,
IEventRepository,
ServerAsyncEventMap,
ServerEvent,
} from 'src/interfaces/communication.interface';
ServerEventMap,
} from 'src/interfaces/event.interface';
import { AuthService } from 'src/services/auth.service';
import { Instrumentation } from 'src/utils/instrumentation';
import { ImmichLogger } from 'src/utils/logger';
@ -25,14 +24,8 @@ import { ImmichLogger } from 'src/utils/logger';
path: '/api/socket.io',
transports: ['websocket'],
})
export class CommunicationRepository
implements OnGatewayConnection, OnGatewayDisconnect, OnGatewayInit, ICommunicationRepository
{
private logger = new ImmichLogger(CommunicationRepository.name);
private onConnectCallbacks: OnConnectCallback[] = [];
private onServerEventCallbacks: Record<ServerEvent, OnServerEventCallback[]> = {
[ServerEvent.CONFIG_UPDATE]: [],
};
export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect, OnGatewayInit, IEventRepository {
private logger = new ImmichLogger(EventRepository.name);
@WebSocketServer()
private server?: Server;
@ -46,38 +39,23 @@ export class CommunicationRepository
this.logger.log('Initialized websocket server');
for (const event of Object.values(ServerEvent)) {
server.on(event, async () => {
if (event === ServerEvent.WEBSOCKET_CONNECT) {
continue;
}
server.on(event, (data: unknown) => {
this.logger.debug(`Server event: ${event} (receive)`);
const callbacks = this.onServerEventCallbacks[event];
for (const callback of callbacks) {
await callback();
}
this.eventEmitter.emit(event, data);
});
}
}
on(event: 'connect' | ServerEvent, callback: OnConnectCallback | OnServerEventCallback) {
switch (event) {
case 'connect': {
this.onConnectCallbacks.push(callback);
break;
}
default: {
this.onServerEventCallbacks[event].push(callback as OnServerEventCallback);
break;
}
}
}
async handleConnection(client: Socket) {
try {
this.logger.log(`Websocket Connect: ${client.id}`);
const auth = await this.authService.validate(client.request.headers, {});
await client.join(auth.user.id);
for (const callback of this.onConnectCallbacks) {
await callback(auth.user.id);
}
this.serverSend(ServerEvent.WEBSOCKET_CONNECT, { userId: auth.user.id });
} catch (error: Error | any) {
this.logger.error(`Websocket connection error: ${error}`, error?.stack);
client.emit('error', 'unauthorized');
@ -90,24 +68,21 @@ export class CommunicationRepository
await client.leave(client.nsp.name);
}
send(event: ClientEvent, userId: string, data: any) {
clientSend<E extends keyof ClientEventMap>(event: E, userId: string, data: ClientEventMap[E]) {
this.server?.to(userId).emit(event, data);
}
broadcast(event: ClientEvent, data: any) {
clientBroadcast<E extends keyof ClientEventMap>(event: E, data: ClientEventMap[E]) {
this.server?.emit(event, data);
}
sendServerEvent(event: ServerEvent) {
serverSend<E extends keyof ServerEventMap>(event: E, data: ServerEventMap[E]) {
this.logger.debug(`Server event: ${event} (send)`);
this.server?.serverSideEmit(event);
}
emit<E extends keyof InternalEventMap>(event: E, data: InternalEventMap[E]): boolean {
this.server?.serverSideEmit(event, data);
return this.eventEmitter.emit(event, data);
}
emitAsync<E extends keyof InternalEventMap, R = any[]>(event: E, data: InternalEventMap[E]): Promise<R> {
serverSendAsync<E extends keyof ServerAsyncEventMap, R = any[]>(event: E, data: ServerAsyncEventMap[E]): Promise<R> {
return this.eventEmitter.emitAsync(event, data) as Promise<R>;
}
}

View file

@ -5,7 +5,7 @@ import { AssetJobName, AssetStatsResponseDto, UploadFieldName } from 'src/dtos/a
import { AssetEntity, AssetType } from 'src/entities/asset.entity';
import { IAssetStackRepository } from 'src/interfaces/asset-stack.interface';
import { AssetStats, IAssetRepository, TimeBucketSize } from 'src/interfaces/asset.interface';
import { ClientEvent, ICommunicationRepository } from 'src/interfaces/communication.interface';
import { ClientEvent, IEventRepository } from 'src/interfaces/event.interface';
import { IJobRepository, JobItem, JobName } from 'src/interfaces/job.interface';
import { IPartnerRepository } from 'src/interfaces/partner.interface';
import { IStorageRepository } from 'src/interfaces/storage.interface';
@ -20,7 +20,7 @@ import { userStub } from 'test/fixtures/user.stub';
import { IAccessRepositoryMock, newAccessRepositoryMock } from 'test/repositories/access.repository.mock';
import { newAssetStackRepositoryMock } from 'test/repositories/asset-stack.repository.mock';
import { newAssetRepositoryMock } from 'test/repositories/asset.repository.mock';
import { newCommunicationRepositoryMock } from 'test/repositories/communication.repository.mock';
import { newEventRepositoryMock } from 'test/repositories/event.repository.mock';
import { newJobRepositoryMock } from 'test/repositories/job.repository.mock';
import { newPartnerRepositoryMock } from 'test/repositories/partner.repository.mock';
import { newStorageRepositoryMock } from 'test/repositories/storage.repository.mock';
@ -152,7 +152,7 @@ describe(AssetService.name, () => {
let jobMock: jest.Mocked<IJobRepository>;
let storageMock: jest.Mocked<IStorageRepository>;
let userMock: jest.Mocked<IUserRepository>;
let communicationMock: jest.Mocked<ICommunicationRepository>;
let eventMock: jest.Mocked<IEventRepository>;
let configMock: jest.Mocked<ISystemConfigRepository>;
let partnerMock: jest.Mocked<IPartnerRepository>;
let assetStackMock: jest.Mocked<IAssetStackRepository>;
@ -164,7 +164,7 @@ describe(AssetService.name, () => {
beforeEach(() => {
accessMock = newAccessRepositoryMock();
assetMock = newAssetRepositoryMock();
communicationMock = newCommunicationRepositoryMock();
eventMock = newEventRepositoryMock();
jobMock = newJobRepositoryMock();
storageMock = newStorageRepositoryMock();
userMock = newUserRepositoryMock();
@ -179,7 +179,7 @@ describe(AssetService.name, () => {
configMock,
storageMock,
userMock,
communicationMock,
eventMock,
partnerMock,
assetStackMock,
);
@ -704,7 +704,7 @@ describe(AssetService.name, () => {
stackParentId: 'parent',
});
expect(communicationMock.send).toHaveBeenCalledWith(ClientEvent.ASSET_STACK_UPDATE, authStub.user1.user.id, [
expect(eventMock.clientSend).toHaveBeenCalledWith(ClientEvent.ASSET_STACK_UPDATE, authStub.user1.user.id, [
'asset-1',
'parent',
]);

View file

@ -31,7 +31,7 @@ import { LibraryType } from 'src/entities/library.entity';
import { IAccessRepository } from 'src/interfaces/access.interface';
import { IAssetStackRepository } from 'src/interfaces/asset-stack.interface';
import { IAssetRepository, TimeBucketOptions } from 'src/interfaces/asset.interface';
import { ClientEvent, ICommunicationRepository } from 'src/interfaces/communication.interface';
import { ClientEvent, IEventRepository } from 'src/interfaces/event.interface';
import {
IAssetDeletionJob,
IJobRepository,
@ -75,7 +75,7 @@ export class AssetService {
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
@Inject(IStorageRepository) private storageRepository: IStorageRepository,
@Inject(IUserRepository) private userRepository: IUserRepository,
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
@Inject(IEventRepository) private eventRepository: IEventRepository,
@Inject(IPartnerRepository) private partnerRepository: IPartnerRepository,
@Inject(IAssetStackRepository) private assetStackRepository: IAssetStackRepository,
) {
@ -395,7 +395,7 @@ export class AssetService {
.flatMap((stack) => (stack ? [stack] : []))
.filter((stack) => stack.assets.length < 2);
await Promise.all(stacksToDelete.map((as) => this.assetStackRepository.delete(as.id)));
this.communicationRepository.send(ClientEvent.ASSET_STACK_UPDATE, auth.user.id, ids);
this.eventRepository.clientSend(ClientEvent.ASSET_STACK_UPDATE, auth.user.id, ids);
}
async handleAssetDeletionCheck(): Promise<JobStatus> {
@ -454,7 +454,7 @@ export class AssetService {
await this.assetRepository.remove(asset);
await this.userRepository.updateUsage(asset.ownerId, -(asset.exifInfo?.fileSizeInByte || 0));
this.communicationRepository.send(ClientEvent.ASSET_DELETE, asset.ownerId, id);
this.eventRepository.clientSend(ClientEvent.ASSET_DELETE, asset.ownerId, id);
// TODO refactor this to use cascades
if (asset.livePhotoVideoId) {
@ -482,7 +482,7 @@ export class AssetService {
await this.jobRepository.queueAll(ids.map((id) => ({ name: JobName.ASSET_DELETION, data: { id } })));
} else {
await this.assetRepository.softDeleteAll(ids);
this.communicationRepository.send(ClientEvent.ASSET_TRASH, auth.user.id, ids);
this.eventRepository.clientSend(ClientEvent.ASSET_TRASH, auth.user.id, ids);
}
}
@ -513,7 +513,7 @@ export class AssetService {
primaryAssetId: newParentId,
});
this.communicationRepository.send(ClientEvent.ASSET_STACK_UPDATE, auth.user.id, [
this.eventRepository.clientSend(ClientEvent.ASSET_STACK_UPDATE, auth.user.id, [
...childIds,
newParentId,
oldParentId,

View file

@ -2,7 +2,7 @@ import { BadRequestException } from '@nestjs/common';
import { FeatureFlag, SystemConfigCore } from 'src/cores/system-config.core';
import { SystemConfig, SystemConfigKey } from 'src/entities/system-config.entity';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { ICommunicationRepository } from 'src/interfaces/communication.interface';
import { IEventRepository } from 'src/interfaces/event.interface';
import {
IJobRepository,
JobCommand,
@ -17,7 +17,7 @@ import { ISystemConfigRepository } from 'src/interfaces/system-config.interface'
import { JobService } from 'src/services/job.service';
import { assetStub } from 'test/fixtures/asset.stub';
import { newAssetRepositoryMock } from 'test/repositories/asset.repository.mock';
import { newCommunicationRepositoryMock } from 'test/repositories/communication.repository.mock';
import { newEventRepositoryMock } from 'test/repositories/event.repository.mock';
import { newJobRepositoryMock } from 'test/repositories/job.repository.mock';
import { newPersonRepositoryMock } from 'test/repositories/person.repository.mock';
import { newSystemConfigRepositoryMock } from 'test/repositories/system-config.repository.mock';
@ -34,17 +34,17 @@ describe(JobService.name, () => {
let sut: JobService;
let assetMock: jest.Mocked<IAssetRepository>;
let configMock: jest.Mocked<ISystemConfigRepository>;
let communicationMock: jest.Mocked<ICommunicationRepository>;
let eventMock: jest.Mocked<IEventRepository>;
let jobMock: jest.Mocked<IJobRepository>;
let personMock: jest.Mocked<IPersonRepository>;
beforeEach(() => {
assetMock = newAssetRepositoryMock();
configMock = newSystemConfigRepositoryMock();
communicationMock = newCommunicationRepositoryMock();
eventMock = newEventRepositoryMock();
jobMock = newJobRepositoryMock();
personMock = newPersonRepositoryMock();
sut = new JobService(assetMock, communicationMock, jobMock, configMock, personMock);
sut = new JobService(assetMock, eventMock, jobMock, configMock, personMock);
});
it('should work', () => {

View file

@ -4,7 +4,7 @@ import { mapAsset } from 'src/dtos/asset-response.dto';
import { AllJobStatusResponseDto, JobCommandDto, JobStatusDto } from 'src/dtos/job.dto';
import { AssetType } from 'src/entities/asset.entity';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { ClientEvent, ICommunicationRepository } from 'src/interfaces/communication.interface';
import { ClientEvent, IEventRepository } from 'src/interfaces/event.interface';
import {
ConcurrentQueueName,
IJobRepository,
@ -27,7 +27,7 @@ export class JobService {
constructor(
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
@Inject(IEventRepository) private eventRepository: IEventRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
@Inject(IPersonRepository) private personRepository: IPersonRepository,
@ -219,7 +219,7 @@ export class JobService {
if (item.data.source === 'sidecar-write') {
const [asset] = await this.assetRepository.getByIdsWithAllRelations([item.data.id]);
if (asset) {
this.communicationRepository.send(ClientEvent.ASSET_UPDATE, asset.ownerId, mapAsset(asset));
this.eventRepository.clientSend(ClientEvent.ASSET_UPDATE, asset.ownerId, mapAsset(asset));
}
}
await this.jobRepository.queue({ name: JobName.LINK_LIVE_PHOTOS, data: item.data });
@ -242,7 +242,7 @@ export class JobService {
const { id } = item.data;
const person = await this.personRepository.getById(id);
if (person) {
this.communicationRepository.send(ClientEvent.PERSON_THUMBNAIL, person.ownerId, person.id);
this.eventRepository.clientSend(ClientEvent.PERSON_THUMBNAIL, person.ownerId, person.id);
}
break;
}
@ -279,13 +279,13 @@ export class JobService {
// Only live-photo motion part will be marked as not visible immediately on upload. Skip notifying clients
if (asset && asset.isVisible) {
this.communicationRepository.send(ClientEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset));
this.eventRepository.clientSend(ClientEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset));
}
break;
}
case JobName.USER_DELETION: {
this.communicationRepository.broadcast(ClientEvent.USER_DELETE, item.data.id);
this.eventRepository.clientBroadcast(ClientEvent.USER_DELETE, item.data.id);
break;
}
}

View file

@ -127,10 +127,10 @@ describe(LibraryService.name, () => {
});
});
describe('validateConfig', () => {
describe('onValidateConfig', () => {
it('should allow a valid cron expression', () => {
expect(() =>
sut.validateConfig({
sut.onValidateConfig({
newConfig: { library: { scan: { cronExpression: '0 0 * * *' } } } as SystemConfig,
oldConfig: {} as SystemConfig,
}),
@ -139,7 +139,7 @@ describe(LibraryService.name, () => {
it('should fail for an invalid cron expression', () => {
expect(() =>
sut.validateConfig({
sut.onValidateConfig({
newConfig: { library: { scan: { cronExpression: 'foo' } } } as SystemConfig,
oldConfig: {} as SystemConfig,
}),

View file

@ -7,7 +7,7 @@ import path, { basename, parse } from 'node:path';
import picomatch from 'picomatch';
import { StorageCore } from 'src/cores/storage.core';
import { SystemConfigCore } from 'src/cores/system-config.core';
import { OnEventInternal } from 'src/decorators';
import { OnServerEvent } from 'src/decorators';
import {
CreateLibraryDto,
LibraryResponseDto,
@ -23,9 +23,9 @@ import {
import { AssetType } from 'src/entities/asset.entity';
import { LibraryEntity, LibraryType } from 'src/entities/library.entity';
import { IAssetRepository, WithProperty } from 'src/interfaces/asset.interface';
import { InternalEvent, InternalEventMap } from 'src/interfaces/communication.interface';
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface';
import { ServerAsyncEvent, ServerAsyncEventMap } from 'src/interfaces/event.interface';
import {
IBaseJob,
IEntityJob,
@ -105,8 +105,8 @@ export class LibraryService extends EventEmitter {
});
}
@OnEventInternal(InternalEvent.VALIDATE_CONFIG)
validateConfig({ newConfig }: InternalEventMap[InternalEvent.VALIDATE_CONFIG]) {
@OnServerEvent(ServerAsyncEvent.CONFIG_VALIDATE)
onValidateConfig({ newConfig }: ServerAsyncEventMap[ServerAsyncEvent.CONFIG_VALIDATE]) {
const { scan } = newConfig.library;
if (!validateCronExpression(scan.cronExpression)) {
throw new Error(`Invalid cron expression ${scan.cronExpression}`);

View file

@ -8,9 +8,9 @@ import { ExifEntity } from 'src/entities/exif.entity';
import { SystemConfigKey } from 'src/entities/system-config.entity';
import { IAlbumRepository } from 'src/interfaces/album.interface';
import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface';
import { ClientEvent, ICommunicationRepository } from 'src/interfaces/communication.interface';
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
import { IDatabaseRepository } from 'src/interfaces/database.interface';
import { ClientEvent, IEventRepository } from 'src/interfaces/event.interface';
import { IJobRepository, JobName, JobStatus } from 'src/interfaces/job.interface';
import { IMediaRepository } from 'src/interfaces/media.interface';
import { IMetadataRepository, ImmichTags } from 'src/interfaces/metadata.interface';
@ -24,9 +24,9 @@ import { fileStub } from 'test/fixtures/file.stub';
import { probeStub } from 'test/fixtures/media.stub';
import { newAlbumRepositoryMock } from 'test/repositories/album.repository.mock';
import { newAssetRepositoryMock } from 'test/repositories/asset.repository.mock';
import { newCommunicationRepositoryMock } from 'test/repositories/communication.repository.mock';
import { newCryptoRepositoryMock } from 'test/repositories/crypto.repository.mock';
import { newDatabaseRepositoryMock } from 'test/repositories/database.repository.mock';
import { newEventRepositoryMock } from 'test/repositories/event.repository.mock';
import { newJobRepositoryMock } from 'test/repositories/job.repository.mock';
import { newMediaRepositoryMock } from 'test/repositories/media.repository.mock';
import { newMetadataRepositoryMock } from 'test/repositories/metadata.repository.mock';
@ -46,7 +46,7 @@ describe(MetadataService.name, () => {
let mediaMock: jest.Mocked<IMediaRepository>;
let personMock: jest.Mocked<IPersonRepository>;
let storageMock: jest.Mocked<IStorageRepository>;
let communicationMock: jest.Mocked<ICommunicationRepository>;
let eventMock: jest.Mocked<IEventRepository>;
let databaseMock: jest.Mocked<IDatabaseRepository>;
let sut: MetadataService;
@ -59,7 +59,7 @@ describe(MetadataService.name, () => {
metadataMock = newMetadataRepositoryMock();
moveMock = newMoveRepositoryMock();
personMock = newPersonRepositoryMock();
communicationMock = newCommunicationRepositoryMock();
eventMock = newEventRepositoryMock();
storageMock = newStorageRepositoryMock();
mediaMock = newMediaRepositoryMock();
databaseMock = newDatabaseRepositoryMock();
@ -67,7 +67,7 @@ describe(MetadataService.name, () => {
sut = new MetadataService(
albumMock,
assetMock,
communicationMock,
eventMock,
cryptoRepository,
databaseMock,
jobMock,
@ -195,7 +195,7 @@ describe(MetadataService.name, () => {
await expect(sut.handleLivePhotoLinking({ id: assetStub.livePhotoStillAsset.id })).resolves.toBe(
JobStatus.SUCCESS,
);
expect(communicationMock.send).toHaveBeenCalledWith(
expect(eventMock.clientSend).toHaveBeenCalledWith(
ClientEvent.ASSET_HIDDEN,
assetStub.livePhotoMotionAsset.ownerId,
assetStub.livePhotoMotionAsset.id,

View file

@ -12,9 +12,9 @@ import { AssetEntity, AssetType } from 'src/entities/asset.entity';
import { ExifEntity } from 'src/entities/exif.entity';
import { IAlbumRepository } from 'src/interfaces/album.interface';
import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface';
import { ClientEvent, ICommunicationRepository } from 'src/interfaces/communication.interface';
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface';
import { ClientEvent, IEventRepository } from 'src/interfaces/event.interface';
import {
IBaseJob,
IEntityJob,
@ -105,7 +105,7 @@ export class MetadataService {
constructor(
@Inject(IAlbumRepository) private albumRepository: IAlbumRepository,
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
@Inject(IEventRepository) private eventRepository: IEventRepository,
@Inject(ICryptoRepository) private cryptoRepository: ICryptoRepository,
@Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
@ -185,7 +185,7 @@ export class MetadataService {
await this.albumRepository.removeAsset(motionAsset.id);
// Notify clients to hide the linked live photo asset
this.communicationRepository.send(ClientEvent.ASSET_HIDDEN, motionAsset.ownerId, motionAsset.id);
this.eventRepository.clientSend(ClientEvent.ASSET_HIDDEN, motionAsset.ownerId, motionAsset.id);
return JobStatus.SUCCESS;
}

View file

@ -1,13 +1,13 @@
import { serverVersion } from 'src/constants';
import { SystemMetadataKey } from 'src/entities/system-metadata.entity';
import { ICommunicationRepository } from 'src/interfaces/communication.interface';
import { IEventRepository } from 'src/interfaces/event.interface';
import { IServerInfoRepository } from 'src/interfaces/server-info.interface';
import { IStorageRepository } from 'src/interfaces/storage.interface';
import { ISystemConfigRepository } from 'src/interfaces/system-config.interface';
import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface';
import { IUserRepository } from 'src/interfaces/user.interface';
import { ServerInfoService } from 'src/services/server-info.service';
import { newCommunicationRepositoryMock } from 'test/repositories/communication.repository.mock';
import { newEventRepositoryMock } from 'test/repositories/event.repository.mock';
import { newStorageRepositoryMock } from 'test/repositories/storage.repository.mock';
import { newSystemConfigRepositoryMock } from 'test/repositories/system-config.repository.mock';
import { newServerInfoRepositoryMock } from 'test/repositories/system-info.repository.mock';
@ -16,7 +16,7 @@ import { newUserRepositoryMock } from 'test/repositories/user.repository.mock';
describe(ServerInfoService.name, () => {
let sut: ServerInfoService;
let communicationMock: jest.Mocked<ICommunicationRepository>;
let eventMock: jest.Mocked<IEventRepository>;
let configMock: jest.Mocked<ISystemConfigRepository>;
let serverInfoMock: jest.Mocked<IServerInfoRepository>;
let storageMock: jest.Mocked<IStorageRepository>;
@ -25,20 +25,13 @@ describe(ServerInfoService.name, () => {
beforeEach(() => {
configMock = newSystemConfigRepositoryMock();
communicationMock = newCommunicationRepositoryMock();
eventMock = newEventRepositoryMock();
serverInfoMock = newServerInfoRepositoryMock();
storageMock = newStorageRepositoryMock();
userMock = newUserRepositoryMock();
systemMetadataMock = newSystemMetadataRepositoryMock();
sut = new ServerInfoService(
communicationMock,
configMock,
userMock,
serverInfoMock,
storageMock,
systemMetadataMock,
);
sut = new ServerInfoService(eventMock, configMock, userMock, serverInfoMock, storageMock, systemMetadataMock);
});
it('should work', () => {

View file

@ -3,6 +3,7 @@ import { DateTime } from 'luxon';
import { isDev, serverVersion } from 'src/constants';
import { StorageCore, StorageFolder } from 'src/cores/storage.core';
import { SystemConfigCore } from 'src/cores/system-config.core';
import { OnServerEvent } from 'src/decorators';
import {
ServerConfigDto,
ServerFeaturesDto,
@ -13,7 +14,7 @@ import {
UsageByUserDto,
} from 'src/dtos/server-info.dto';
import { SystemMetadataKey } from 'src/entities/system-metadata.entity';
import { ClientEvent, ICommunicationRepository } from 'src/interfaces/communication.interface';
import { ClientEvent, IEventRepository, ServerEvent, ServerEventMap } from 'src/interfaces/event.interface';
import { IServerInfoRepository } from 'src/interfaces/server-info.interface';
import { IStorageRepository } from 'src/interfaces/storage.interface';
import { ISystemConfigRepository } from 'src/interfaces/system-config.interface';
@ -32,7 +33,7 @@ export class ServerInfoService {
private releaseVersionCheckedAt: DateTime | null = null;
constructor(
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
@Inject(IEventRepository) private eventRepository: IEventRepository,
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
@Inject(IUserRepository) private userRepository: IUserRepository,
@Inject(IServerInfoRepository) private repository: IServerInfoRepository,
@ -40,9 +41,10 @@ export class ServerInfoService {
@Inject(ISystemMetadataRepository) private readonly systemMetadataRepository: ISystemMetadataRepository,
) {
this.configCore = SystemConfigCore.create(configRepository);
this.communicationRepository.on('connect', (userId) => this.handleConnect(userId));
}
onConnect() {}
async init(): Promise<void> {
await this.handleVersionCheck();
@ -169,8 +171,9 @@ export class ServerInfoService {
return true;
}
private handleConnect(userId: string) {
this.communicationRepository.send(ClientEvent.SERVER_VERSION, userId, serverVersion);
@OnServerEvent(ServerEvent.WEBSOCKET_CONNECT)
onWebsocketConnection({ userId }: ServerEventMap[ServerEvent.WEBSOCKET_CONNECT]) {
this.eventRepository.clientSend(ClientEvent.SERVER_VERSION, userId, serverVersion);
this.newReleaseNotification(userId);
}
@ -184,7 +187,7 @@ export class ServerInfoService {
};
userId
? this.communicationRepository.send(event, userId, payload)
: this.communicationRepository.broadcast(event, payload);
? this.eventRepository.clientSend(event, userId, payload)
: this.eventRepository.clientBroadcast(event, payload);
}
}

View file

@ -70,10 +70,10 @@ describe(StorageTemplateService.name, () => {
SystemConfigCore.create(configMock).config$.next(defaults);
});
describe('validate', () => {
describe('onValidateConfig', () => {
it('should allow valid templates', () => {
expect(() =>
sut.validate({
sut.onValidateConfig({
newConfig: {
storageTemplate: {
template:
@ -87,7 +87,7 @@ describe(StorageTemplateService.name, () => {
it('should fail for an invalid template', () => {
expect(() =>
sut.validate({
sut.onValidateConfig({
newConfig: {
storageTemplate: {
template: '{{foo}}',

View file

@ -14,15 +14,15 @@ import {
} from 'src/constants';
import { StorageCore, StorageFolder } from 'src/cores/storage.core';
import { SystemConfigCore } from 'src/cores/system-config.core';
import { OnEventInternal } from 'src/decorators';
import { OnServerEvent } from 'src/decorators';
import { AssetEntity, AssetType } from 'src/entities/asset.entity';
import { AssetPathType } from 'src/entities/move.entity';
import { SystemConfig } from 'src/entities/system-config.entity';
import { IAlbumRepository } from 'src/interfaces/album.interface';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { InternalEvent, InternalEventMap } from 'src/interfaces/communication.interface';
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface';
import { ServerAsyncEvent, ServerAsyncEventMap } from 'src/interfaces/event.interface';
import { IEntityJob, JOBS_ASSET_PAGINATION_SIZE, JobStatus } from 'src/interfaces/job.interface';
import { IMoveRepository } from 'src/interfaces/move.interface';
import { IPersonRepository } from 'src/interfaces/person.interface';
@ -86,8 +86,8 @@ export class StorageTemplateService {
);
}
@OnEventInternal(InternalEvent.VALIDATE_CONFIG)
validate({ newConfig }: InternalEventMap[InternalEvent.VALIDATE_CONFIG]) {
@OnServerEvent(ServerAsyncEvent.CONFIG_VALIDATE)
onValidateConfig({ newConfig }: ServerAsyncEventMap[ServerAsyncEvent.CONFIG_VALIDATE]) {
try {
const { compiled } = this.compile(newConfig.storageTemplate.template);
this.render(compiled, {

View file

@ -13,13 +13,13 @@ import {
TranscodePolicy,
VideoCodec,
} from 'src/entities/system-config.entity';
import { ICommunicationRepository, ServerEvent } from 'src/interfaces/communication.interface';
import { IEventRepository, ServerEvent } from 'src/interfaces/event.interface';
import { QueueName } from 'src/interfaces/job.interface';
import { ISearchRepository } from 'src/interfaces/search.interface';
import { ISystemConfigRepository } from 'src/interfaces/system-config.interface';
import { SystemConfigService } from 'src/services/system-config.service';
import { ImmichLogger } from 'src/utils/logger';
import { newCommunicationRepositoryMock } from 'test/repositories/communication.repository.mock';
import { newEventRepositoryMock } from 'test/repositories/event.repository.mock';
import { newSystemConfigRepositoryMock } from 'test/repositories/system-config.repository.mock';
const updates: SystemConfigEntity[] = [
@ -152,14 +152,14 @@ const updatedConfig = Object.freeze<SystemConfig>({
describe(SystemConfigService.name, () => {
let sut: SystemConfigService;
let configMock: jest.Mocked<ISystemConfigRepository>;
let communicationMock: jest.Mocked<ICommunicationRepository>;
let eventMock: jest.Mocked<IEventRepository>;
let smartInfoMock: jest.Mocked<ISearchRepository>;
beforeEach(() => {
delete process.env.IMMICH_CONFIG_FILE;
configMock = newSystemConfigRepositoryMock();
communicationMock = newCommunicationRepositoryMock();
sut = new SystemConfigService(configMock, communicationMock, smartInfoMock);
eventMock = newEventRepositoryMock();
sut = new SystemConfigService(configMock, eventMock, smartInfoMock);
});
it('should work', () => {
@ -330,8 +330,8 @@ describe(SystemConfigService.name, () => {
await expect(sut.updateConfig(updatedConfig)).resolves.toEqual(updatedConfig);
expect(communicationMock.broadcast).toHaveBeenCalled();
expect(communicationMock.sendServerEvent).toHaveBeenCalledWith(ServerEvent.CONFIG_UPDATE);
expect(eventMock.clientBroadcast).toHaveBeenCalled();
expect(eventMock.serverSend).toHaveBeenCalledWith(ServerEvent.CONFIG_UPDATE, null);
expect(configMock.saveAll).toHaveBeenCalledWith(updates);
});

View file

@ -12,16 +12,16 @@ import {
supportedYearTokens,
} from 'src/constants';
import { SystemConfigCore } from 'src/cores/system-config.core';
import { OnEventInternal } from 'src/decorators';
import { OnServerEvent } from 'src/decorators';
import { SystemConfigDto, SystemConfigTemplateStorageOptionDto, mapConfig } from 'src/dtos/system-config.dto';
import { LogLevel, SystemConfig } from 'src/entities/system-config.entity';
import {
ClientEvent,
ICommunicationRepository,
InternalEvent,
InternalEventMap,
IEventRepository,
ServerAsyncEvent,
ServerAsyncEventMap,
ServerEvent,
} from 'src/interfaces/communication.interface';
} from 'src/interfaces/event.interface';
import { ISearchRepository } from 'src/interfaces/search.interface';
import { ISystemConfigRepository } from 'src/interfaces/system-config.interface';
import { ImmichLogger } from 'src/utils/logger';
@ -33,11 +33,10 @@ export class SystemConfigService {
constructor(
@Inject(ISystemConfigRepository) private repository: ISystemConfigRepository,
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
@Inject(IEventRepository) private eventRepository: IEventRepository,
@Inject(ISearchRepository) private smartInfoRepository: ISearchRepository,
) {
this.core = SystemConfigCore.create(repository);
this.communicationRepository.on(ServerEvent.CONFIG_UPDATE, () => this.handleConfigUpdate());
this.core.config$.subscribe((config) => this.setLogLevel(config));
}
@ -60,8 +59,8 @@ export class SystemConfigService {
return mapConfig(config);
}
@OnEventInternal(InternalEvent.VALIDATE_CONFIG)
validateConfig({ newConfig, oldConfig }: InternalEventMap[InternalEvent.VALIDATE_CONFIG]) {
@OnServerEvent(ServerAsyncEvent.CONFIG_VALIDATE)
onValidateConfig({ newConfig, oldConfig }: ServerAsyncEventMap[ServerAsyncEvent.CONFIG_VALIDATE]) {
if (!_.isEqual(instanceToPlain(newConfig.logging), oldConfig.logging) && this.getEnvLogLevel()) {
throw new Error('Logging cannot be changed while the environment variable LOG_LEVEL is set.');
}
@ -71,7 +70,10 @@ export class SystemConfigService {
const oldConfig = await this.core.getConfig();
try {
await this.communicationRepository.emitAsync(InternalEvent.VALIDATE_CONFIG, { newConfig: dto, oldConfig });
await this.eventRepository.serverSendAsync(ServerAsyncEvent.CONFIG_VALIDATE, {
newConfig: dto,
oldConfig,
});
} catch (error) {
this.logger.warn(`Unable to save system config due to a validation error: ${error}`);
throw new BadRequestException(error instanceof Error ? error.message : error);
@ -79,8 +81,8 @@ export class SystemConfigService {
const newConfig = await this.core.updateConfig(dto);
this.communicationRepository.broadcast(ClientEvent.CONFIG_UPDATE, {});
this.communicationRepository.sendServerEvent(ServerEvent.CONFIG_UPDATE);
this.eventRepository.clientBroadcast(ClientEvent.CONFIG_UPDATE, {});
this.eventRepository.serverSend(ServerEvent.CONFIG_UPDATE, null);
if (oldConfig.machineLearning.clip.modelName !== newConfig.machineLearning.clip.modelName) {
await this.smartInfoRepository.init(newConfig.machineLearning.clip.modelName);
@ -90,7 +92,7 @@ export class SystemConfigService {
// this is only used by the cli on config change, and it's not actually needed anymore
async refreshConfig() {
this.communicationRepository.sendServerEvent(ServerEvent.CONFIG_UPDATE);
this.eventRepository.serverSend(ServerEvent.CONFIG_UPDATE, null);
await this.core.refreshConfig();
return true;
}
@ -126,7 +128,8 @@ export class SystemConfigService {
return theme.customCss;
}
private async handleConfigUpdate() {
@OnServerEvent(ServerEvent.CONFIG_UPDATE)
async onConfigUpdate() {
await this.core.refreshConfig();
}

View file

@ -1,13 +1,13 @@
import { BadRequestException } from '@nestjs/common';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { ClientEvent, ICommunicationRepository } from 'src/interfaces/communication.interface';
import { ClientEvent, IEventRepository } from 'src/interfaces/event.interface';
import { IJobRepository, JobName } from 'src/interfaces/job.interface';
import { TrashService } from 'src/services/trash.service';
import { assetStub } from 'test/fixtures/asset.stub';
import { authStub } from 'test/fixtures/auth.stub';
import { IAccessRepositoryMock, newAccessRepositoryMock } from 'test/repositories/access.repository.mock';
import { newAssetRepositoryMock } from 'test/repositories/asset.repository.mock';
import { newCommunicationRepositoryMock } from 'test/repositories/communication.repository.mock';
import { newEventRepositoryMock } from 'test/repositories/event.repository.mock';
import { newJobRepositoryMock } from 'test/repositories/job.repository.mock';
describe(TrashService.name, () => {
@ -15,7 +15,7 @@ describe(TrashService.name, () => {
let accessMock: IAccessRepositoryMock;
let assetMock: jest.Mocked<IAssetRepository>;
let jobMock: jest.Mocked<IJobRepository>;
let communicationMock: jest.Mocked<ICommunicationRepository>;
let eventMock: jest.Mocked<IEventRepository>;
it('should work', () => {
expect(sut).toBeDefined();
@ -24,10 +24,10 @@ describe(TrashService.name, () => {
beforeEach(() => {
accessMock = newAccessRepositoryMock();
assetMock = newAssetRepositoryMock();
communicationMock = newCommunicationRepositoryMock();
eventMock = newEventRepositoryMock();
jobMock = newJobRepositoryMock();
sut = new TrashService(accessMock, assetMock, jobMock, communicationMock);
sut = new TrashService(accessMock, assetMock, jobMock, eventMock);
});
describe('restoreAssets', () => {
@ -54,14 +54,14 @@ describe(TrashService.name, () => {
assetMock.getByUserId.mockResolvedValue({ items: [], hasNextPage: false });
await expect(sut.restore(authStub.user1)).resolves.toBeUndefined();
expect(assetMock.restoreAll).not.toHaveBeenCalled();
expect(communicationMock.send).not.toHaveBeenCalled();
expect(eventMock.clientSend).not.toHaveBeenCalled();
});
it('should restore and notify', async () => {
assetMock.getByUserId.mockResolvedValue({ items: [assetStub.image], hasNextPage: false });
await expect(sut.restore(authStub.user1)).resolves.toBeUndefined();
expect(assetMock.restoreAll).toHaveBeenCalledWith([assetStub.image.id]);
expect(communicationMock.send).toHaveBeenCalledWith(ClientEvent.ASSET_RESTORE, authStub.user1.user.id, [
expect(eventMock.clientSend).toHaveBeenCalledWith(ClientEvent.ASSET_RESTORE, authStub.user1.user.id, [
assetStub.image.id,
]);
});

View file

@ -5,7 +5,7 @@ import { BulkIdsDto } from 'src/dtos/asset-ids.response.dto';
import { AuthDto } from 'src/dtos/auth.dto';
import { IAccessRepository } from 'src/interfaces/access.interface';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { ClientEvent, ICommunicationRepository } from 'src/interfaces/communication.interface';
import { ClientEvent, IEventRepository } from 'src/interfaces/event.interface';
import { IJobRepository, JOBS_ASSET_PAGINATION_SIZE, JobName } from 'src/interfaces/job.interface';
import { usePagination } from 'src/utils/pagination';
@ -16,7 +16,7 @@ export class TrashService {
@Inject(IAccessRepository) accessRepository: IAccessRepository,
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
@Inject(IEventRepository) private eventRepository: IEventRepository,
) {
this.access = AccessCore.create(accessRepository);
}
@ -60,6 +60,6 @@ export class TrashService {
}
await this.assetRepository.restoreAll(ids);
this.communicationRepository.send(ClientEvent.ASSET_RESTORE, auth.user.id, ids);
this.eventRepository.clientSend(ClientEvent.ASSET_RESTORE, auth.user.id, ids);
}
}

View file

@ -1,12 +0,0 @@
import { ICommunicationRepository } from 'src/interfaces/communication.interface';
export const newCommunicationRepositoryMock = (): jest.Mocked<ICommunicationRepository> => {
return {
send: jest.fn(),
broadcast: jest.fn(),
on: jest.fn(),
sendServerEvent: jest.fn(),
emit: jest.fn(),
emitAsync: jest.fn(),
};
};

View file

@ -0,0 +1,10 @@
import { IEventRepository } from 'src/interfaces/event.interface';
export const newEventRepositoryMock = (): jest.Mocked<IEventRepository> => {
return {
clientSend: jest.fn(),
clientBroadcast: jest.fn(),
serverSend: jest.fn(),
serverSendAsync: jest.fn(),
};
};