diff --git a/server/src/app.module.ts b/server/src/app.module.ts index 8ed9d5f6ed..436b0f1252 100644 --- a/server/src/app.module.ts +++ b/server/src/app.module.ts @@ -10,6 +10,7 @@ import { controllers } from 'src/controllers'; import { entities } from 'src/entities'; import { ImmichWorker } from 'src/enum'; import { IEventRepository } from 'src/interfaces/event.interface'; +import { IJobRepository } from 'src/interfaces/job.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { ITelemetryRepository } from 'src/interfaces/telemetry.interface'; import { AuthGuard } from 'src/middleware/auth.guard'; @@ -64,6 +65,7 @@ abstract class BaseModule implements OnModuleInit, OnModuleDestroy { constructor( @Inject(ILoggerRepository) logger: ILoggerRepository, @Inject(IEventRepository) private eventRepository: IEventRepository, + @Inject(IJobRepository) private jobRepository: IJobRepository, @Inject(ITelemetryRepository) private telemetryRepository: ITelemetryRepository, ) { logger.setAppName(this.worker); @@ -73,6 +75,12 @@ abstract class BaseModule implements OnModuleInit, OnModuleDestroy { async onModuleInit() { this.telemetryRepository.setup({ repositories: repositories.map(({ useClass }) => useClass) }); + + this.jobRepository.setup({ services }); + if (this.worker === ImmichWorker.MICROSERVICES) { + this.jobRepository.startWorkers(); + } + this.eventRepository.setup({ services }); await this.eventRepository.emit('app.bootstrap', this.worker); } diff --git a/server/src/decorators.ts b/server/src/decorators.ts index db755c5ff9..be379bf64e 100644 --- a/server/src/decorators.ts +++ b/server/src/decorators.ts @@ -4,6 +4,7 @@ import _ from 'lodash'; import { ADDED_IN_PREFIX, DEPRECATED_IN_PREFIX, LIFECYCLE_EXTENSION } from 'src/constants'; import { MetadataKey } from 'src/enum'; import { EmitEvent } from 'src/interfaces/event.interface'; +import { JobName, QueueName } from 'src/interfaces/job.interface'; import { setUnion } from 'src/utils/set'; // PostgreSQL uses a 16-bit integer to indicate the number of bound parameters. This means that the @@ -122,6 +123,12 @@ export type EventConfig = { }; export const OnEvent = (config: EventConfig) => SetMetadata(MetadataKey.EVENT_CONFIG, config); +export type JobConfig = { + name: JobName; + queue: QueueName; +}; +export const OnJob = (config: JobConfig) => SetMetadata(MetadataKey.JOB_CONFIG, config); + type LifecycleRelease = 'NEXT_RELEASE' | string; type LifecycleMetadata = { addedAt?: LifecycleRelease; diff --git a/server/src/enum.ts b/server/src/enum.ts index c3b3d8341b..0b82185285 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -335,6 +335,7 @@ export enum MetadataKey { SHARED_ROUTE = 'shared_route', API_KEY_SECURITY = 'api_key', EVENT_CONFIG = 'event_config', + JOB_CONFIG = 'job_config', TELEMETRY_ENABLED = 'telemetry_enabled', } diff --git a/server/src/interfaces/event.interface.ts b/server/src/interfaces/event.interface.ts index 8b59457914..0ed3d63f2a 100644 --- a/server/src/interfaces/event.interface.ts +++ b/server/src/interfaces/event.interface.ts @@ -3,6 +3,7 @@ import { SystemConfig } from 'src/config'; import { AssetResponseDto } from 'src/dtos/asset-response.dto'; import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server.dto'; import { ImmichWorker } from 'src/enum'; +import { JobItem, QueueName } from 'src/interfaces/job.interface'; export const IEventRepository = 'IEventRepository'; @@ -38,6 +39,8 @@ type EventMap = { 'assets.delete': [{ assetIds: string[]; userId: string }]; 'assets.restore': [{ assetIds: string[]; userId: string }]; + 'job.start': [QueueName, JobItem]; + // session events 'session.delete': [{ sessionId: string }]; diff --git a/server/src/interfaces/job.interface.ts b/server/src/interfaces/job.interface.ts index 31945f97ec..64a9e5cfe3 100644 --- a/server/src/interfaces/job.interface.ts +++ b/server/src/interfaces/job.interface.ts @@ -1,3 +1,4 @@ +import { ClassConstructor } from 'class-transformer'; import { EmailImageAttachment } from 'src/interfaces/notification.interface'; export enum QueueName { @@ -238,8 +239,8 @@ export type JobItem = // Migration | { name: JobName.QUEUE_MIGRATION; data?: IBaseJob } - | { name: JobName.MIGRATE_ASSET; data?: IEntityJob } - | { name: JobName.MIGRATE_PERSON; data?: IEntityJob } + | { name: JobName.MIGRATE_ASSET; data: IEntityJob } + | { name: JobName.MIGRATE_PERSON; data: IEntityJob } // Metadata Extraction | { name: JobName.QUEUE_METADATA_EXTRACTION; data: IBaseJob } @@ -286,7 +287,7 @@ export type JobItem = | { name: JobName.LIBRARY_SYNC_FILE; data: ILibraryFileJob } | { name: JobName.LIBRARY_QUEUE_SYNC_FILES; data: IEntityJob } | { name: JobName.LIBRARY_QUEUE_SYNC_ASSETS; data: IEntityJob } - | { name: JobName.LIBRARY_SYNC_ASSET; data: IEntityJob } + | { name: JobName.LIBRARY_SYNC_ASSET; data: ILibraryAssetJob } | { name: JobName.LIBRARY_DELETE; data: IEntityJob } | { name: JobName.LIBRARY_QUEUE_SYNC_ALL; data?: IBaseJob } | { name: JobName.LIBRARY_QUEUE_CLEANUP; data: IBaseJob } @@ -305,14 +306,15 @@ export enum JobStatus { FAILED = 'failed', SKIPPED = 'skipped', } - -export type JobHandler = (data: T) => Promise; -export type JobItemHandler = (item: JobItem) => Promise; +export type Jobs = { [K in JobItem['name']]: (JobItem & { name: K })['data'] }; +export type JobOf = Jobs[T]; export const IJobRepository = 'IJobRepository'; export interface IJobRepository { - addHandler(queueName: QueueName, concurrency: number, handler: JobItemHandler): void; + setup(options: { services: ClassConstructor[] }): void; + startWorkers(): void; + run(job: JobItem): Promise; addCronJob(name: string, expression: string, onTick: () => void, start?: boolean): void; updateCronJob(name: string, expression?: string, start?: boolean): void; setConcurrency(queueName: QueueName, concurrency: number): void; diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 131dd770aa..253599fbf3 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -1,124 +1,122 @@ import { getQueueToken } from '@nestjs/bullmq'; import { Inject, Injectable } from '@nestjs/common'; -import { ModuleRef } from '@nestjs/core'; +import { ModuleRef, Reflector } from '@nestjs/core'; import { SchedulerRegistry } from '@nestjs/schedule'; -import { Job, JobsOptions, Processor, Queue, Worker, WorkerOptions } from 'bullmq'; +import { JobsOptions, Queue, Worker } from 'bullmq'; +import { ClassConstructor } from 'class-transformer'; import { CronJob, CronTime } from 'cron'; import { setTimeout } from 'node:timers/promises'; +import { JobConfig } from 'src/decorators'; +import { MetadataKey } from 'src/enum'; import { IConfigRepository } from 'src/interfaces/config.interface'; +import { IEventRepository } from 'src/interfaces/event.interface'; import { IEntityJob, IJobRepository, JobCounts, JobItem, JobName, + JobOf, + JobStatus, QueueCleanType, QueueName, QueueStatus, } from 'src/interfaces/job.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; +import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc'; -export const JOBS_TO_QUEUE: Record = { - // misc - [JobName.ASSET_DELETION]: QueueName.BACKGROUND_TASK, - [JobName.ASSET_DELETION_CHECK]: QueueName.BACKGROUND_TASK, - [JobName.USER_DELETE_CHECK]: QueueName.BACKGROUND_TASK, - [JobName.USER_DELETION]: QueueName.BACKGROUND_TASK, - [JobName.DELETE_FILES]: QueueName.BACKGROUND_TASK, - [JobName.CLEAN_OLD_AUDIT_LOGS]: QueueName.BACKGROUND_TASK, - [JobName.CLEAN_OLD_SESSION_TOKENS]: QueueName.BACKGROUND_TASK, - [JobName.PERSON_CLEANUP]: QueueName.BACKGROUND_TASK, - [JobName.USER_SYNC_USAGE]: QueueName.BACKGROUND_TASK, - - // backups - [JobName.BACKUP_DATABASE]: QueueName.BACKUP_DATABASE, - - // conversion - [JobName.QUEUE_VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION, - [JobName.VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION, - - // thumbnails - [JobName.QUEUE_GENERATE_THUMBNAILS]: QueueName.THUMBNAIL_GENERATION, - [JobName.GENERATE_THUMBNAILS]: QueueName.THUMBNAIL_GENERATION, - [JobName.GENERATE_PERSON_THUMBNAIL]: QueueName.THUMBNAIL_GENERATION, - - // tags - [JobName.TAG_CLEANUP]: QueueName.BACKGROUND_TASK, - - // metadata - [JobName.QUEUE_METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION, - [JobName.METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION, - [JobName.LINK_LIVE_PHOTOS]: QueueName.METADATA_EXTRACTION, - - // storage template - [JobName.STORAGE_TEMPLATE_MIGRATION]: QueueName.STORAGE_TEMPLATE_MIGRATION, - [JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: QueueName.STORAGE_TEMPLATE_MIGRATION, - - // migration - [JobName.QUEUE_MIGRATION]: QueueName.MIGRATION, - [JobName.MIGRATE_ASSET]: QueueName.MIGRATION, - [JobName.MIGRATE_PERSON]: QueueName.MIGRATION, - - // facial recognition - [JobName.QUEUE_FACE_DETECTION]: QueueName.FACE_DETECTION, - [JobName.FACE_DETECTION]: QueueName.FACE_DETECTION, - [JobName.QUEUE_FACIAL_RECOGNITION]: QueueName.FACIAL_RECOGNITION, - [JobName.FACIAL_RECOGNITION]: QueueName.FACIAL_RECOGNITION, - - // smart search - [JobName.QUEUE_SMART_SEARCH]: QueueName.SMART_SEARCH, - [JobName.SMART_SEARCH]: QueueName.SMART_SEARCH, - - // duplicate detection - [JobName.QUEUE_DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION, - [JobName.DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION, - - // XMP sidecars - [JobName.QUEUE_SIDECAR]: QueueName.SIDECAR, - [JobName.SIDECAR_DISCOVERY]: QueueName.SIDECAR, - [JobName.SIDECAR_SYNC]: QueueName.SIDECAR, - [JobName.SIDECAR_WRITE]: QueueName.SIDECAR, - - // Library management - [JobName.LIBRARY_SYNC_FILE]: QueueName.LIBRARY, - [JobName.LIBRARY_QUEUE_SYNC_FILES]: QueueName.LIBRARY, - [JobName.LIBRARY_QUEUE_SYNC_ASSETS]: QueueName.LIBRARY, - [JobName.LIBRARY_DELETE]: QueueName.LIBRARY, - [JobName.LIBRARY_SYNC_ASSET]: QueueName.LIBRARY, - [JobName.LIBRARY_QUEUE_SYNC_ALL]: QueueName.LIBRARY, - [JobName.LIBRARY_QUEUE_CLEANUP]: QueueName.LIBRARY, - - // Notification - [JobName.SEND_EMAIL]: QueueName.NOTIFICATION, - [JobName.NOTIFY_ALBUM_INVITE]: QueueName.NOTIFICATION, - [JobName.NOTIFY_ALBUM_UPDATE]: QueueName.NOTIFICATION, - [JobName.NOTIFY_SIGNUP]: QueueName.NOTIFICATION, - - // Version check - [JobName.VERSION_CHECK]: QueueName.BACKGROUND_TASK, - - // Trash - [JobName.QUEUE_TRASH_EMPTY]: QueueName.BACKGROUND_TASK, +type JobMapItem = { + jobName: JobName; + queueName: QueueName; + handler: (job: JobOf) => Promise; + label: string; }; @Injectable() export class JobRepository implements IJobRepository { private workers: Partial> = {}; + private handlers: Partial> = {}; constructor( - private moduleReference: ModuleRef, - private schedulerReqistry: SchedulerRegistry, + private moduleRef: ModuleRef, + private schedulerRegistry: SchedulerRegistry, @Inject(IConfigRepository) private configRepository: IConfigRepository, + @Inject(IEventRepository) private eventRepository: IEventRepository, @Inject(ILoggerRepository) private logger: ILoggerRepository, ) { this.logger.setContext(JobRepository.name); } - addHandler(queueName: QueueName, concurrency: number, handler: (item: JobItem) => Promise) { + setup({ services }: { services: ClassConstructor[] }) { + const reflector = this.moduleRef.get(Reflector, { strict: false }); + + // discovery + for (const Service of services) { + const instance = this.moduleRef.get(Service); + for (const methodName of getMethodNames(instance)) { + const handler = instance[methodName]; + const config = reflector.get(MetadataKey.JOB_CONFIG, handler); + if (!config) { + continue; + } + + const { name: jobName, queue: queueName } = config; + const label = `${Service.name}.${handler.name}`; + + // one handler per job + if (this.handlers[jobName]) { + const jobKey = getKeyByValue(JobName, jobName); + const errorMessage = `Failed to add job handler for ${label}`; + this.logger.error( + `${errorMessage}. JobName.${jobKey} is already handled by ${this.handlers[jobName].label}.`, + ); + throw new ImmichStartupError(errorMessage); + } + + this.handlers[jobName] = { + label, + jobName, + queueName, + handler: handler.bind(instance), + }; + + this.logger.verbose(`Added job handler: ${jobName} => ${label}`); + } + } + + // no missing handlers + for (const [jobKey, jobName] of Object.entries(JobName)) { + const item = this.handlers[jobName]; + if (!item) { + const errorMessage = `Failed to find job handler for Job.${jobKey} ("${jobName}")`; + this.logger.error( + `${errorMessage}. Make sure to add the @OnJob({ name: JobName.${jobKey}, queue: QueueName.XYZ }) decorator for the new job.`, + ); + throw new ImmichStartupError(errorMessage); + } + } + } + + startWorkers() { const { bull } = this.configRepository.getEnv(); - const workerHandler: Processor = async (job: Job) => handler(job as JobItem); - const workerOptions: WorkerOptions = { ...bull.config, concurrency }; - this.workers[queueName] = new Worker(queueName, workerHandler, workerOptions); + for (const queueName of Object.values(QueueName)) { + this.logger.debug(`Starting worker for queue: ${queueName}`); + this.workers[queueName] = new Worker( + queueName, + (job) => this.eventRepository.emit('job.start', queueName, job as JobItem), + { ...bull.config, concurrency: 1 }, + ); + } + } + + async run({ name, data }: JobItem) { + const item = this.handlers[name as JobName]; + if (!item) { + this.logger.warn(`Skipping unknown job: "${name}"`); + return JobStatus.SKIPPED; + } + + return item.handler(data); } addCronJob(name: string, expression: string, onTick: () => void, start = true): void { @@ -141,11 +139,11 @@ export class JobRepository implements IJobRepository { true, ); - this.schedulerReqistry.addCronJob(name, job); + this.schedulerRegistry.addCronJob(name, job); } updateCronJob(name: string, expression?: string, start?: boolean): void { - const job = this.schedulerReqistry.getCronJob(name); + const job = this.schedulerRegistry.getCronJob(name); if (expression) { job.setTime(new CronTime(expression)); } @@ -204,6 +202,10 @@ export class JobRepository implements IJobRepository { ) as unknown as Promise; } + private getQueueName(name: JobName) { + return (this.handlers[name] as JobMapItem).queueName; + } + async queueAll(items: JobItem[]): Promise { if (items.length === 0) { return; @@ -212,7 +214,7 @@ export class JobRepository implements IJobRepository { const promises = []; const itemsByQueue = {} as Record; for (const item of items) { - const queueName = JOBS_TO_QUEUE[item.name]; + const queueName = this.getQueueName(item.name); const job = { name: item.name, data: item.data || {}, @@ -273,11 +275,11 @@ export class JobRepository implements IJobRepository { } private getQueue(queue: QueueName): Queue { - return this.moduleReference.get(getQueueToken(queue), { strict: false }); + return this.moduleRef.get(getQueueToken(queue), { strict: false }); } public async removeJob(jobId: string, name: JobName): Promise { - const existingJob = await this.getQueue(JOBS_TO_QUEUE[name]).getJob(jobId); + const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobId); if (!existingJob) { return; } diff --git a/server/src/services/asset.service.ts b/server/src/services/asset.service.ts index 2f31806e81..e9f128194b 100644 --- a/server/src/services/asset.service.ts +++ b/server/src/services/asset.service.ts @@ -1,6 +1,7 @@ import { BadRequestException } from '@nestjs/common'; import _ from 'lodash'; import { DateTime, Duration } from 'luxon'; +import { OnJob } from 'src/decorators'; import { AssetResponseDto, MemoryLaneResponseDto, @@ -21,12 +22,13 @@ import { MemoryLaneDto } from 'src/dtos/search.dto'; import { AssetEntity } from 'src/entities/asset.entity'; import { AssetStatus, Permission } from 'src/enum'; import { - IAssetDeleteJob, ISidecarWriteJob, JOBS_ASSET_PAGINATION_SIZE, JobItem, JobName, + JobOf, JobStatus, + QueueName, } from 'src/interfaces/job.interface'; import { BaseService } from 'src/services/base.service'; import { getAssetFiles, getMyPartnerIds, onAfterUnlink, onBeforeLink, onBeforeUnlink } from 'src/utils/asset.util'; @@ -186,6 +188,7 @@ export class AssetService extends BaseService { await this.assetRepository.updateAll(ids, options); } + @OnJob({ name: JobName.ASSET_DELETION_CHECK, queue: QueueName.BACKGROUND_TASK }) async handleAssetDeletionCheck(): Promise { const config = await this.getConfig({ withCache: false }); const trashedDays = config.trash.enabled ? config.trash.days : 0; @@ -211,7 +214,8 @@ export class AssetService extends BaseService { return JobStatus.SUCCESS; } - async handleAssetDeletion(job: IAssetDeleteJob): Promise { + @OnJob({ name: JobName.ASSET_DELETION, queue: QueueName.BACKGROUND_TASK }) + async handleAssetDeletion(job: JobOf): Promise { const { id, deleteOnDisk } = job; const asset = await this.assetRepository.getById(id, { diff --git a/server/src/services/audit.service.ts b/server/src/services/audit.service.ts index d891c88b39..3fc838e5e9 100644 --- a/server/src/services/audit.service.ts +++ b/server/src/services/audit.service.ts @@ -3,6 +3,7 @@ import { DateTime } from 'luxon'; import { resolve } from 'node:path'; import { AUDIT_LOG_MAX_DURATION } from 'src/constants'; import { StorageCore } from 'src/cores/storage.core'; +import { OnJob } from 'src/decorators'; import { AuditDeletesDto, AuditDeletesResponseDto, @@ -21,13 +22,14 @@ import { StorageFolder, UserPathType, } from 'src/enum'; -import { JOBS_ASSET_PAGINATION_SIZE, JobStatus } from 'src/interfaces/job.interface'; +import { JobName, JOBS_ASSET_PAGINATION_SIZE, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { BaseService } from 'src/services/base.service'; import { getAssetFiles } from 'src/utils/asset.util'; import { usePagination } from 'src/utils/pagination'; @Injectable() export class AuditService extends BaseService { + @OnJob({ name: JobName.CLEAN_OLD_AUDIT_LOGS, queue: QueueName.BACKGROUND_TASK }) async handleCleanup(): Promise { await this.auditRepository.removeBefore(DateTime.now().minus(AUDIT_LOG_MAX_DURATION).toJSDate()); return JobStatus.SUCCESS; diff --git a/server/src/services/backup.service.ts b/server/src/services/backup.service.ts index ba2ab816cd..9856a8cfa2 100644 --- a/server/src/services/backup.service.ts +++ b/server/src/services/backup.service.ts @@ -1,11 +1,11 @@ import { Injectable } from '@nestjs/common'; import { default as path } from 'node:path'; import { StorageCore } from 'src/cores/storage.core'; -import { OnEvent } from 'src/decorators'; +import { OnEvent, OnJob } from 'src/decorators'; import { ImmichWorker, StorageFolder } from 'src/enum'; import { DatabaseLock } from 'src/interfaces/database.interface'; import { ArgOf } from 'src/interfaces/event.interface'; -import { JobName, JobStatus } from 'src/interfaces/job.interface'; +import { JobName, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { BaseService } from 'src/services/base.service'; import { handlePromiseError } from 'src/utils/misc'; import { validateCronExpression } from 'src/validation'; @@ -75,6 +75,7 @@ export class BackupService extends BaseService { this.logger.debug(`Database Backup Cleanup Finished, deleted ${toDelete.length} backups`); } + @OnJob({ name: JobName.BACKUP_DATABASE, queue: QueueName.BACKUP_DATABASE }) async handleBackupDatabase(): Promise { this.logger.debug(`Database Backup Started`); diff --git a/server/src/services/duplicate.service.ts b/server/src/services/duplicate.service.ts index e76b80b043..2fac7fcd3e 100644 --- a/server/src/services/duplicate.service.ts +++ b/server/src/services/duplicate.service.ts @@ -1,10 +1,11 @@ import { Injectable } from '@nestjs/common'; +import { OnJob } from 'src/decorators'; import { mapAsset } from 'src/dtos/asset-response.dto'; import { AuthDto } from 'src/dtos/auth.dto'; import { DuplicateResponseDto, mapDuplicateResponse } from 'src/dtos/duplicate.dto'; import { AssetEntity } from 'src/entities/asset.entity'; import { WithoutProperty } from 'src/interfaces/asset.interface'; -import { IBaseJob, IEntityJob, JOBS_ASSET_PAGINATION_SIZE, JobName, JobStatus } from 'src/interfaces/job.interface'; +import { JOBS_ASSET_PAGINATION_SIZE, JobName, JobOf, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { AssetDuplicateResult } from 'src/interfaces/search.interface'; import { BaseService } from 'src/services/base.service'; import { getAssetFiles } from 'src/utils/asset.util'; @@ -19,7 +20,8 @@ export class DuplicateService extends BaseService { return mapDuplicateResponse(res.map((a) => mapAsset(a, { auth, withStack: true }))); } - async handleQueueSearchDuplicates({ force }: IBaseJob): Promise { + @OnJob({ name: JobName.QUEUE_DUPLICATE_DETECTION, queue: QueueName.DUPLICATE_DETECTION }) + async handleQueueSearchDuplicates({ force }: JobOf): Promise { const { machineLearning } = await this.getConfig({ withCache: false }); if (!isDuplicateDetectionEnabled(machineLearning)) { return JobStatus.SKIPPED; @@ -40,7 +42,8 @@ export class DuplicateService extends BaseService { return JobStatus.SUCCESS; } - async handleSearchDuplicates({ id }: IEntityJob): Promise { + @OnJob({ name: JobName.DUPLICATE_DETECTION, queue: QueueName.DUPLICATE_DETECTION }) + async handleSearchDuplicates({ id }: JobOf): Promise { const { machineLearning } = await this.getConfig({ withCache: true }); if (!isDuplicateDetectionEnabled(machineLearning)) { return JobStatus.SKIPPED; diff --git a/server/src/services/index.ts b/server/src/services/index.ts index 89c6afd7f4..0dd8bdae66 100644 --- a/server/src/services/index.ts +++ b/server/src/services/index.ts @@ -17,7 +17,6 @@ import { MapService } from 'src/services/map.service'; import { MediaService } from 'src/services/media.service'; import { MemoryService } from 'src/services/memory.service'; import { MetadataService } from 'src/services/metadata.service'; -import { MicroservicesService } from 'src/services/microservices.service'; import { NotificationService } from 'src/services/notification.service'; import { PartnerService } from 'src/services/partner.service'; import { PersonService } from 'src/services/person.service'; @@ -60,7 +59,6 @@ export const services = [ MediaService, MemoryService, MetadataService, - MicroservicesService, NotificationService, PartnerService, PersonService, diff --git a/server/src/services/job.service.spec.ts b/server/src/services/job.service.spec.ts index 8e42693dc0..03e89b07ec 100644 --- a/server/src/services/job.service.spec.ts +++ b/server/src/services/job.service.spec.ts @@ -2,37 +2,23 @@ import { BadRequestException } from '@nestjs/common'; import { defaults } from 'src/config'; import { ImmichWorker } from 'src/enum'; import { IAssetRepository } from 'src/interfaces/asset.interface'; -import { - IJobRepository, - JobCommand, - JobHandler, - JobItem, - JobName, - JobStatus, - QueueName, -} from 'src/interfaces/job.interface'; -import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface'; +import { IJobRepository, JobCommand, JobItem, JobName, JobStatus, QueueName } from 'src/interfaces/job.interface'; +import { ILoggerRepository } from 'src/interfaces/logger.interface'; +import { ITelemetryRepository } from 'src/interfaces/telemetry.interface'; import { JobService } from 'src/services/job.service'; import { assetStub } from 'test/fixtures/asset.stub'; import { newTestService } from 'test/utils'; -import { Mocked, vitest } from 'vitest'; - -const makeMockHandlers = (status: JobStatus) => { - const mock = vitest.fn().mockResolvedValue(status); - return Object.fromEntries(Object.values(JobName).map((jobName) => [jobName, mock])) as unknown as Record< - JobName, - JobHandler - >; -}; +import { Mocked } from 'vitest'; describe(JobService.name, () => { let sut: JobService; let assetMock: Mocked; let jobMock: Mocked; - let systemMock: Mocked; + let loggerMock: Mocked; + let telemetryMock: Mocked; beforeEach(() => { - ({ sut, assetMock, jobMock, systemMock } = newTestService(JobService)); + ({ sut, assetMock, jobMock, loggerMock, telemetryMock } = newTestService(JobService)); }); it('should work', () => { @@ -225,11 +211,19 @@ describe(JobService.name, () => { }); }); - describe('init', () => { - it('should register a handler for each queue', async () => { - await sut.init(makeMockHandlers(JobStatus.SUCCESS)); - expect(systemMock.get).toHaveBeenCalled(); - expect(jobMock.addHandler).toHaveBeenCalledTimes(Object.keys(QueueName).length); + describe('onJobStart', () => { + it('should process a successful job', async () => { + jobMock.run.mockResolvedValue(JobStatus.SUCCESS); + + await sut.onJobStart(QueueName.BACKGROUND_TASK, { + name: JobName.DELETE_FILES, + data: { files: ['path/to/file'] }, + }); + + expect(telemetryMock.jobs.addToGauge).toHaveBeenCalledWith('immich.queues.background_task.active', 1); + expect(telemetryMock.jobs.addToGauge).toHaveBeenCalledWith('immich.queues.background_task.active', -1); + expect(telemetryMock.jobs.addToCounter).toHaveBeenCalledWith('immich.jobs.delete_files.success', 1); + expect(loggerMock.error).not.toHaveBeenCalled(); }); const tests: Array<{ item: JobItem; jobs: JobName[] }> = [ @@ -297,8 +291,9 @@ describe(JobService.name, () => { } } - await sut.init(makeMockHandlers(JobStatus.SUCCESS)); - await jobMock.addHandler.mock.calls[0][2](item); + jobMock.run.mockResolvedValue(JobStatus.SUCCESS); + + await sut.onJobStart(QueueName.BACKGROUND_TASK, item); if (jobs.length > 1) { expect(jobMock.queueAll).toHaveBeenCalledWith( @@ -313,8 +308,9 @@ describe(JobService.name, () => { }); it(`should not queue any jobs when ${item.name} fails`, async () => { - await sut.init(makeMockHandlers(JobStatus.FAILED)); - await jobMock.addHandler.mock.calls[0][2](item); + jobMock.run.mockResolvedValue(JobStatus.FAILED); + + await sut.onJobStart(QueueName.BACKGROUND_TASK, item); expect(jobMock.queueAll).not.toHaveBeenCalled(); }); diff --git a/server/src/services/job.service.ts b/server/src/services/job.service.ts index 15046a0ef5..ff5ddbf006 100644 --- a/server/src/services/job.service.ts +++ b/server/src/services/job.service.ts @@ -4,11 +4,10 @@ import { OnEvent } from 'src/decorators'; import { mapAsset } from 'src/dtos/asset-response.dto'; import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobStatusDto } from 'src/dtos/job.dto'; import { AssetType, ImmichWorker, ManualJobName } from 'src/enum'; -import { ArgOf } from 'src/interfaces/event.interface'; +import { ArgOf, ArgsOf } from 'src/interfaces/event.interface'; import { ConcurrentQueueName, JobCommand, - JobHandler, JobItem, JobName, JobStatus, @@ -47,8 +46,8 @@ export class JobService extends BaseService { } @OnEvent({ name: 'config.update', server: true }) - onConfigUpdate({ newConfig: config, oldConfig }: ArgOf<'config.update'>) { - if (!oldConfig || !this.isMicroservices) { + onConfigUpdate({ newConfig: config }: ArgOf<'config.update'>) { + if (!this.isMicroservices) { return; } @@ -177,41 +176,21 @@ export class JobService extends BaseService { } } - async init(jobHandlers: Record) { - const config = await this.getConfig({ withCache: false }); - for (const queueName of Object.values(QueueName)) { - let concurrency = 1; - - if (this.isConcurrentQueue(queueName)) { - concurrency = config.job[queueName].concurrency; + @OnEvent({ name: 'job.start' }) + async onJobStart(...[queueName, job]: ArgsOf<'job.start'>) { + const queueMetric = `immich.queues.${snakeCase(queueName)}.active`; + this.telemetryRepository.jobs.addToGauge(queueMetric, 1); + try { + const status = await this.jobRepository.run(job); + const jobMetric = `immich.jobs.${job.name.replaceAll('-', '_')}.${status}`; + this.telemetryRepository.jobs.addToCounter(jobMetric, 1); + if (status === JobStatus.SUCCESS || status == JobStatus.SKIPPED) { + await this.onDone(job); } - - this.logger.debug(`Registering ${queueName} with a concurrency of ${concurrency}`); - this.jobRepository.addHandler(queueName, concurrency, async (item: JobItem): Promise => { - const { name, data } = item; - - const handler = jobHandlers[name]; - if (!handler) { - this.logger.warn(`Skipping unknown job: "${name}"`); - return; - } - - const queueMetric = `immich.queues.${snakeCase(queueName)}.active`; - this.telemetryRepository.jobs.addToGauge(queueMetric, 1); - - try { - const status = await handler(data); - const jobMetric = `immich.jobs.${name.replaceAll('-', '_')}.${status}`; - this.telemetryRepository.jobs.addToCounter(jobMetric, 1); - if (status === JobStatus.SUCCESS || status == JobStatus.SKIPPED) { - await this.onDone(item); - } - } catch (error: Error | any) { - this.logger.error(`Unable to run job handler (${queueName}/${name}): ${error}`, error?.stack, data); - } finally { - this.telemetryRepository.jobs.addToGauge(queueMetric, -1); - } - }); + } catch (error: Error | any) { + this.logger.error(`Unable to run job handler (${queueName}/${job.name}): ${error}`, error?.stack, job.data); + } finally { + this.telemetryRepository.jobs.addToGauge(queueMetric, -1); } } diff --git a/server/src/services/library.service.ts b/server/src/services/library.service.ts index 6c329e80ec..ffb0803100 100644 --- a/server/src/services/library.service.ts +++ b/server/src/services/library.service.ts @@ -3,7 +3,7 @@ import { R_OK } from 'node:constants'; import path, { basename, isAbsolute, parse } from 'node:path'; import picomatch from 'picomatch'; import { StorageCore } from 'src/cores/storage.core'; -import { OnEvent } from 'src/decorators'; +import { OnEvent, OnJob } from 'src/decorators'; import { CreateLibraryDto, LibraryResponseDto, @@ -19,14 +19,7 @@ import { LibraryEntity } from 'src/entities/library.entity'; import { AssetType, ImmichWorker } from 'src/enum'; import { DatabaseLock } from 'src/interfaces/database.interface'; import { ArgOf } from 'src/interfaces/event.interface'; -import { - IEntityJob, - ILibraryAssetJob, - ILibraryFileJob, - JobName, - JOBS_LIBRARY_PAGINATION_SIZE, - JobStatus, -} from 'src/interfaces/job.interface'; +import { JobName, JobOf, JOBS_LIBRARY_PAGINATION_SIZE, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { BaseService } from 'src/services/base.service'; import { mimeTypes } from 'src/utils/mime-types'; import { handlePromiseError } from 'src/utils/misc'; @@ -223,6 +216,7 @@ export class LibraryService extends BaseService { return libraries.map((library) => mapLibrary(library)); } + @OnJob({ name: JobName.LIBRARY_QUEUE_CLEANUP, queue: QueueName.LIBRARY }) async handleQueueCleanup(): Promise { this.logger.debug('Cleaning up any pending library deletions'); const pendingDeletion = await this.libraryRepository.getAllDeleted(); @@ -340,7 +334,8 @@ export class LibraryService extends BaseService { await this.jobRepository.queue({ name: JobName.LIBRARY_DELETE, data: { id } }); } - async handleDeleteLibrary(job: IEntityJob): Promise { + @OnJob({ name: JobName.LIBRARY_DELETE, queue: QueueName.LIBRARY }) + async handleDeleteLibrary(job: JobOf): Promise { const libraryId = job.id; const assetPagination = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) => @@ -374,7 +369,8 @@ export class LibraryService extends BaseService { return JobStatus.SUCCESS; } - async handleSyncFile(job: ILibraryFileJob): Promise { + @OnJob({ name: JobName.LIBRARY_SYNC_FILE, queue: QueueName.LIBRARY }) + async handleSyncFile(job: JobOf): Promise { // Only needs to handle new assets const assetPath = path.normalize(job.assetPath); @@ -458,6 +454,7 @@ export class LibraryService extends BaseService { await this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_ASSETS, data: { id } }); } + @OnJob({ name: JobName.LIBRARY_QUEUE_SYNC_ALL, queue: QueueName.LIBRARY }) async handleQueueSyncAll(): Promise { this.logger.debug(`Refreshing all external libraries`); @@ -483,7 +480,8 @@ export class LibraryService extends BaseService { return JobStatus.SUCCESS; } - async handleSyncAsset(job: ILibraryAssetJob): Promise { + @OnJob({ name: JobName.LIBRARY_SYNC_ASSET, queue: QueueName.LIBRARY }) + async handleSyncAsset(job: JobOf): Promise { const asset = await this.assetRepository.getById(job.id); if (!asset) { return JobStatus.SKIPPED; @@ -538,7 +536,8 @@ export class LibraryService extends BaseService { return JobStatus.SUCCESS; } - async handleQueueSyncFiles(job: IEntityJob): Promise { + @OnJob({ name: JobName.LIBRARY_QUEUE_SYNC_FILES, queue: QueueName.LIBRARY }) + async handleQueueSyncFiles(job: JobOf): Promise { const library = await this.libraryRepository.get(job.id); if (!library) { this.logger.debug(`Library ${job.id} not found, skipping refresh`); @@ -589,7 +588,8 @@ export class LibraryService extends BaseService { return JobStatus.SUCCESS; } - async handleQueueSyncAssets(job: IEntityJob): Promise { + @OnJob({ name: JobName.LIBRARY_QUEUE_SYNC_ASSETS, queue: QueueName.LIBRARY }) + async handleQueueSyncAssets(job: JobOf): Promise { const library = await this.libraryRepository.get(job.id); if (!library) { return JobStatus.SKIPPED; diff --git a/server/src/services/media.service.ts b/server/src/services/media.service.ts index 8393f5dc76..cce1324da9 100644 --- a/server/src/services/media.service.ts +++ b/server/src/services/media.service.ts @@ -1,6 +1,7 @@ import { Injectable } from '@nestjs/common'; import { dirname } from 'node:path'; import { StorageCore } from 'src/cores/storage.core'; +import { OnJob } from 'src/decorators'; import { SystemConfigFFmpegDto } from 'src/dtos/system-config.dto'; import { AssetEntity } from 'src/entities/asset.entity'; import { @@ -19,11 +20,10 @@ import { } from 'src/enum'; import { UpsertFileOptions, WithoutProperty } from 'src/interfaces/asset.interface'; import { - IBaseJob, - IEntityJob, JOBS_ASSET_PAGINATION_SIZE, JobItem, JobName, + JobOf, JobStatus, QueueName, } from 'src/interfaces/job.interface'; @@ -39,7 +39,8 @@ export class MediaService extends BaseService { private maliOpenCL?: boolean; private devices?: string[]; - async handleQueueGenerateThumbnails({ force }: IBaseJob): Promise { + @OnJob({ name: JobName.QUEUE_GENERATE_THUMBNAILS, queue: QueueName.THUMBNAIL_GENERATION }) + async handleQueueGenerateThumbnails({ force }: JobOf): Promise { const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { return force ? this.assetRepository.getAll(pagination, { @@ -90,6 +91,7 @@ export class MediaService extends BaseService { return JobStatus.SUCCESS; } + @OnJob({ name: JobName.QUEUE_MIGRATION, queue: QueueName.MIGRATION }) async handleQueueMigration(): Promise { const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => this.assetRepository.getAll(pagination), @@ -120,7 +122,8 @@ export class MediaService extends BaseService { return JobStatus.SUCCESS; } - async handleAssetMigration({ id }: IEntityJob): Promise { + @OnJob({ name: JobName.MIGRATE_ASSET, queue: QueueName.MIGRATION }) + async handleAssetMigration({ id }: JobOf): Promise { const { image } = await this.getConfig({ withCache: true }); const [asset] = await this.assetRepository.getByIds([id], { files: true }); if (!asset) { @@ -134,7 +137,8 @@ export class MediaService extends BaseService { return JobStatus.SUCCESS; } - async handleGenerateThumbnails({ id }: IEntityJob): Promise { + @OnJob({ name: JobName.GENERATE_THUMBNAILS, queue: QueueName.THUMBNAIL_GENERATION }) + async handleGenerateThumbnails({ id }: JobOf): Promise { const asset = await this.assetRepository.getById(id, { exifInfo: true, files: true }); if (!asset) { this.logger.warn(`Thumbnail generation failed for asset ${id}: not found`); @@ -257,7 +261,8 @@ export class MediaService extends BaseService { return { previewPath, thumbnailPath, thumbhash }; } - async handleQueueVideoConversion(job: IBaseJob): Promise { + @OnJob({ name: JobName.QUEUE_VIDEO_CONVERSION, queue: QueueName.VIDEO_CONVERSION }) + async handleQueueVideoConversion(job: JobOf): Promise { const { force } = job; const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { @@ -275,7 +280,8 @@ export class MediaService extends BaseService { return JobStatus.SUCCESS; } - async handleVideoConversion({ id }: IEntityJob): Promise { + @OnJob({ name: JobName.VIDEO_CONVERSION, queue: QueueName.VIDEO_CONVERSION }) + async handleVideoConversion({ id }: JobOf): Promise { const [asset] = await this.assetRepository.getByIds([id]); if (!asset || asset.type !== AssetType.VIDEO) { return JobStatus.FAILED; diff --git a/server/src/services/metadata.service.ts b/server/src/services/metadata.service.ts index a45bcd4252..3e958ad4df 100644 --- a/server/src/services/metadata.service.ts +++ b/server/src/services/metadata.service.ts @@ -7,7 +7,7 @@ import { constants } from 'node:fs/promises'; import path from 'node:path'; import { SystemConfig } from 'src/config'; import { StorageCore } from 'src/cores/storage.core'; -import { OnEvent } from 'src/decorators'; +import { OnEvent, OnJob } from 'src/decorators'; import { AssetFaceEntity } from 'src/entities/asset-face.entity'; import { AssetEntity } from 'src/entities/asset.entity'; import { ExifEntity } from 'src/entities/exif.entity'; @@ -16,15 +16,7 @@ import { AssetType, ImmichWorker, SourceType } from 'src/enum'; import { WithoutProperty } from 'src/interfaces/asset.interface'; import { DatabaseLock } from 'src/interfaces/database.interface'; import { ArgOf } from 'src/interfaces/event.interface'; -import { - IBaseJob, - IEntityJob, - ISidecarWriteJob, - JobName, - JOBS_ASSET_PAGINATION_SIZE, - JobStatus, - QueueName, -} from 'src/interfaces/job.interface'; +import { JobName, JobOf, JOBS_ASSET_PAGINATION_SIZE, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { ReverseGeocodeResult } from 'src/interfaces/map.interface'; import { ImmichTags } from 'src/interfaces/metadata.interface'; import { BaseService } from 'src/services/base.service'; @@ -124,7 +116,8 @@ export class MetadataService extends BaseService { } } - async handleLivePhotoLinking(job: IEntityJob): Promise { + @OnJob({ name: JobName.LINK_LIVE_PHOTOS, queue: QueueName.METADATA_EXTRACTION }) + async handleLivePhotoLinking(job: JobOf): Promise { const { id } = job; const [asset] = await this.assetRepository.getByIds([id], { exifInfo: true }); if (!asset?.exifInfo) { @@ -159,7 +152,8 @@ export class MetadataService extends BaseService { return JobStatus.SUCCESS; } - async handleQueueMetadataExtraction(job: IBaseJob): Promise { + @OnJob({ name: JobName.QUEUE_METADATA_EXTRACTION, queue: QueueName.METADATA_EXTRACTION }) + async handleQueueMetadataExtraction(job: JobOf): Promise { const { force } = job; const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { return force @@ -176,7 +170,8 @@ export class MetadataService extends BaseService { return JobStatus.SUCCESS; } - async handleMetadataExtraction({ id }: IEntityJob): Promise { + @OnJob({ name: JobName.METADATA_EXTRACTION, queue: QueueName.METADATA_EXTRACTION }) + async handleMetadataExtraction({ id }: JobOf): Promise { const { metadata, reverseGeocoding } = await this.getConfig({ withCache: true }); const [asset] = await this.assetRepository.getByIds([id], { faces: { person: false } }); if (!asset) { @@ -260,7 +255,8 @@ export class MetadataService extends BaseService { return JobStatus.SUCCESS; } - async handleQueueSidecar(job: IBaseJob): Promise { + @OnJob({ name: JobName.QUEUE_SIDECAR, queue: QueueName.SIDECAR }) + async handleQueueSidecar(job: JobOf): Promise { const { force } = job; const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { return force @@ -280,11 +276,13 @@ export class MetadataService extends BaseService { return JobStatus.SUCCESS; } - handleSidecarSync({ id }: IEntityJob): Promise { + @OnJob({ name: JobName.SIDECAR_SYNC, queue: QueueName.SIDECAR }) + handleSidecarSync({ id }: JobOf): Promise { return this.processSidecar(id, true); } - handleSidecarDiscovery({ id }: IEntityJob): Promise { + @OnJob({ name: JobName.SIDECAR_DISCOVERY, queue: QueueName.SIDECAR }) + handleSidecarDiscovery({ id }: JobOf): Promise { return this.processSidecar(id, false); } @@ -298,7 +296,8 @@ export class MetadataService extends BaseService { await this.jobRepository.queue({ name: JobName.SIDECAR_WRITE, data: { id: assetId, tags: true } }); } - async handleSidecarWrite(job: ISidecarWriteJob): Promise { + @OnJob({ name: JobName.SIDECAR_WRITE, queue: QueueName.SIDECAR }) + async handleSidecarWrite(job: JobOf): Promise { const { id, description, dateTimeOriginal, latitude, longitude, rating, tags } = job; const [asset] = await this.assetRepository.getByIds([id], { tags: true }); if (!asset) { diff --git a/server/src/services/microservices.service.ts b/server/src/services/microservices.service.ts deleted file mode 100644 index c600077809..0000000000 --- a/server/src/services/microservices.service.ts +++ /dev/null @@ -1,106 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { OnEvent } from 'src/decorators'; -import { ImmichWorker } from 'src/enum'; -import { ArgOf } from 'src/interfaces/event.interface'; -import { IDeleteFilesJob, JobName } from 'src/interfaces/job.interface'; -import { AssetService } from 'src/services/asset.service'; -import { AuditService } from 'src/services/audit.service'; -import { BackupService } from 'src/services/backup.service'; -import { DuplicateService } from 'src/services/duplicate.service'; -import { JobService } from 'src/services/job.service'; -import { LibraryService } from 'src/services/library.service'; -import { MediaService } from 'src/services/media.service'; -import { MetadataService } from 'src/services/metadata.service'; -import { NotificationService } from 'src/services/notification.service'; -import { PersonService } from 'src/services/person.service'; -import { SessionService } from 'src/services/session.service'; -import { SmartInfoService } from 'src/services/smart-info.service'; -import { StorageTemplateService } from 'src/services/storage-template.service'; -import { StorageService } from 'src/services/storage.service'; -import { TagService } from 'src/services/tag.service'; -import { TrashService } from 'src/services/trash.service'; -import { UserService } from 'src/services/user.service'; -import { VersionService } from 'src/services/version.service'; - -@Injectable() -export class MicroservicesService { - constructor( - private auditService: AuditService, - private assetService: AssetService, - private backupService: BackupService, - private jobService: JobService, - private libraryService: LibraryService, - private mediaService: MediaService, - private metadataService: MetadataService, - private notificationService: NotificationService, - private personService: PersonService, - private smartInfoService: SmartInfoService, - private sessionService: SessionService, - private storageTemplateService: StorageTemplateService, - private storageService: StorageService, - private tagService: TagService, - private trashService: TrashService, - private userService: UserService, - private duplicateService: DuplicateService, - private versionService: VersionService, - ) {} - - @OnEvent({ name: 'app.bootstrap' }) - async onBootstrap(app: ArgOf<'app.bootstrap'>) { - if (app !== ImmichWorker.MICROSERVICES) { - return; - } - - await this.jobService.init({ - [JobName.ASSET_DELETION]: (data) => this.assetService.handleAssetDeletion(data), - [JobName.ASSET_DELETION_CHECK]: () => this.assetService.handleAssetDeletionCheck(), - [JobName.BACKUP_DATABASE]: () => this.backupService.handleBackupDatabase(), - [JobName.DELETE_FILES]: (data: IDeleteFilesJob) => this.storageService.handleDeleteFiles(data), - [JobName.CLEAN_OLD_AUDIT_LOGS]: () => this.auditService.handleCleanup(), - [JobName.CLEAN_OLD_SESSION_TOKENS]: () => this.sessionService.handleCleanup(), - [JobName.USER_DELETE_CHECK]: () => this.userService.handleUserDeleteCheck(), - [JobName.USER_DELETION]: (data) => this.userService.handleUserDelete(data), - [JobName.USER_SYNC_USAGE]: () => this.userService.handleUserSyncUsage(), - [JobName.QUEUE_SMART_SEARCH]: (data) => this.smartInfoService.handleQueueEncodeClip(data), - [JobName.SMART_SEARCH]: (data) => this.smartInfoService.handleEncodeClip(data), - [JobName.QUEUE_DUPLICATE_DETECTION]: (data) => this.duplicateService.handleQueueSearchDuplicates(data), - [JobName.DUPLICATE_DETECTION]: (data) => this.duplicateService.handleSearchDuplicates(data), - [JobName.STORAGE_TEMPLATE_MIGRATION]: () => this.storageTemplateService.handleMigration(), - [JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: (data) => this.storageTemplateService.handleMigrationSingle(data), - [JobName.QUEUE_MIGRATION]: () => this.mediaService.handleQueueMigration(), - [JobName.MIGRATE_ASSET]: (data) => this.mediaService.handleAssetMigration(data), - [JobName.MIGRATE_PERSON]: (data) => this.personService.handlePersonMigration(data), - [JobName.QUEUE_GENERATE_THUMBNAILS]: (data) => this.mediaService.handleQueueGenerateThumbnails(data), - [JobName.GENERATE_THUMBNAILS]: (data) => this.mediaService.handleGenerateThumbnails(data), - [JobName.QUEUE_VIDEO_CONVERSION]: (data) => this.mediaService.handleQueueVideoConversion(data), - [JobName.VIDEO_CONVERSION]: (data) => this.mediaService.handleVideoConversion(data), - [JobName.QUEUE_METADATA_EXTRACTION]: (data) => this.metadataService.handleQueueMetadataExtraction(data), - [JobName.METADATA_EXTRACTION]: (data) => this.metadataService.handleMetadataExtraction(data), - [JobName.LINK_LIVE_PHOTOS]: (data) => this.metadataService.handleLivePhotoLinking(data), - [JobName.QUEUE_FACE_DETECTION]: (data) => this.personService.handleQueueDetectFaces(data), - [JobName.FACE_DETECTION]: (data) => this.personService.handleDetectFaces(data), - [JobName.QUEUE_FACIAL_RECOGNITION]: (data) => this.personService.handleQueueRecognizeFaces(data), - [JobName.FACIAL_RECOGNITION]: (data) => this.personService.handleRecognizeFaces(data), - [JobName.GENERATE_PERSON_THUMBNAIL]: (data) => this.personService.handleGeneratePersonThumbnail(data), - [JobName.PERSON_CLEANUP]: () => this.personService.handlePersonCleanup(), - [JobName.QUEUE_SIDECAR]: (data) => this.metadataService.handleQueueSidecar(data), - [JobName.SIDECAR_DISCOVERY]: (data) => this.metadataService.handleSidecarDiscovery(data), - [JobName.SIDECAR_SYNC]: (data) => this.metadataService.handleSidecarSync(data), - [JobName.SIDECAR_WRITE]: (data) => this.metadataService.handleSidecarWrite(data), - [JobName.LIBRARY_QUEUE_SYNC_ALL]: () => this.libraryService.handleQueueSyncAll(), - [JobName.LIBRARY_QUEUE_SYNC_FILES]: (data) => this.libraryService.handleQueueSyncFiles(data), //Queues all files paths on disk - [JobName.LIBRARY_SYNC_FILE]: (data) => this.libraryService.handleSyncFile(data), //Handles a single path on disk //Watcher calls for new files - [JobName.LIBRARY_QUEUE_SYNC_ASSETS]: (data) => this.libraryService.handleQueueSyncAssets(data), //Queues all library assets - [JobName.LIBRARY_SYNC_ASSET]: (data) => this.libraryService.handleSyncAsset(data), //Handles all library assets // Watcher calls for unlink and changed - [JobName.LIBRARY_DELETE]: (data) => this.libraryService.handleDeleteLibrary(data), - [JobName.LIBRARY_QUEUE_CLEANUP]: () => this.libraryService.handleQueueCleanup(), - [JobName.SEND_EMAIL]: (data) => this.notificationService.handleSendEmail(data), - [JobName.NOTIFY_ALBUM_INVITE]: (data) => this.notificationService.handleAlbumInvite(data), - [JobName.NOTIFY_ALBUM_UPDATE]: (data) => this.notificationService.handleAlbumUpdate(data), - [JobName.NOTIFY_SIGNUP]: (data) => this.notificationService.handleUserSignup(data), - [JobName.TAG_CLEANUP]: () => this.tagService.handleTagCleanup(), - [JobName.VERSION_CHECK]: () => this.versionService.handleVersionCheck(), - [JobName.QUEUE_TRASH_EMPTY]: () => this.trashService.handleQueueEmptyTrash(), - }); - } -} diff --git a/server/src/services/notification.service.ts b/server/src/services/notification.service.ts index c3c7727468..e7c0201963 100644 --- a/server/src/services/notification.service.ts +++ b/server/src/services/notification.service.ts @@ -1,17 +1,16 @@ import { BadRequestException, Injectable } from '@nestjs/common'; -import { OnEvent } from 'src/decorators'; +import { OnEvent, OnJob } from 'src/decorators'; import { SystemConfigSmtpDto } from 'src/dtos/system-config.dto'; import { AlbumEntity } from 'src/entities/album.entity'; import { ArgOf } from 'src/interfaces/event.interface'; import { - IEmailJob, IEntityJob, - INotifyAlbumInviteJob, INotifyAlbumUpdateJob, - INotifySignupJob, JobItem, JobName, + JobOf, JobStatus, + QueueName, } from 'src/interfaces/job.interface'; import { EmailImageAttachment, EmailTemplate } from 'src/interfaces/notification.interface'; import { BaseService } from 'src/services/base.service'; @@ -176,7 +175,8 @@ export class NotificationService extends BaseService { return { messageId }; } - async handleUserSignup({ id, tempPassword }: INotifySignupJob) { + @OnJob({ name: JobName.NOTIFY_SIGNUP, queue: QueueName.NOTIFICATION }) + async handleUserSignup({ id, tempPassword }: JobOf) { const user = await this.userRepository.get(id, { withDeleted: false }); if (!user) { return JobStatus.SKIPPED; @@ -207,7 +207,8 @@ export class NotificationService extends BaseService { return JobStatus.SUCCESS; } - async handleAlbumInvite({ id, recipientId }: INotifyAlbumInviteJob) { + @OnJob({ name: JobName.NOTIFY_ALBUM_INVITE, queue: QueueName.NOTIFICATION }) + async handleAlbumInvite({ id, recipientId }: JobOf) { const album = await this.albumRepository.getById(id, { withAssets: false }); if (!album) { return JobStatus.SKIPPED; @@ -254,7 +255,8 @@ export class NotificationService extends BaseService { return JobStatus.SUCCESS; } - async handleAlbumUpdate({ id, recipientIds }: INotifyAlbumUpdateJob) { + @OnJob({ name: JobName.NOTIFY_ALBUM_UPDATE, queue: QueueName.NOTIFICATION }) + async handleAlbumUpdate({ id, recipientIds }: JobOf) { const album = await this.albumRepository.getById(id, { withAssets: false }); if (!album) { @@ -312,7 +314,8 @@ export class NotificationService extends BaseService { return JobStatus.SUCCESS; } - async handleSendEmail(data: IEmailJob): Promise { + @OnJob({ name: JobName.SEND_EMAIL, queue: QueueName.NOTIFICATION }) + async handleSendEmail(data: JobOf): Promise { const { notifications } = await this.getConfig({ withCache: false }); if (!notifications.smtp.enabled) { return JobStatus.SKIPPED; diff --git a/server/src/services/person.service.ts b/server/src/services/person.service.ts index e5f016d8ef..5b6e721eab 100644 --- a/server/src/services/person.service.ts +++ b/server/src/services/person.service.ts @@ -1,6 +1,7 @@ import { BadRequestException, Injectable, NotFoundException } from '@nestjs/common'; import { FACE_THUMBNAIL_SIZE } from 'src/constants'; import { StorageCore } from 'src/cores/storage.core'; +import { OnJob } from 'src/decorators'; import { BulkIdErrorReason, BulkIdResponseDto } from 'src/dtos/asset-ids.response.dto'; import { AuthDto } from 'src/dtos/auth.dto'; import { @@ -33,13 +34,10 @@ import { } from 'src/enum'; import { WithoutProperty } from 'src/interfaces/asset.interface'; import { - IBaseJob, - IDeferrableJob, - IEntityJob, - INightlyJob, JOBS_ASSET_PAGINATION_SIZE, JobItem, JobName, + JobOf, JobStatus, QueueName, } from 'src/interfaces/job.interface'; @@ -231,13 +229,15 @@ export class PersonService extends BaseService { this.logger.debug(`Deleted ${people.length} people`); } + @OnJob({ name: JobName.PERSON_CLEANUP, queue: QueueName.BACKGROUND_TASK }) async handlePersonCleanup(): Promise { const people = await this.personRepository.getAllWithoutFaces(); await this.delete(people); return JobStatus.SUCCESS; } - async handleQueueDetectFaces({ force }: IBaseJob): Promise { + @OnJob({ name: JobName.QUEUE_FACE_DETECTION, queue: QueueName.FACE_DETECTION }) + async handleQueueDetectFaces({ force }: JobOf): Promise { const { machineLearning } = await this.getConfig({ withCache: false }); if (!isFacialRecognitionEnabled(machineLearning)) { return JobStatus.SKIPPED; @@ -272,7 +272,8 @@ export class PersonService extends BaseService { return JobStatus.SUCCESS; } - async handleDetectFaces({ id }: IEntityJob): Promise { + @OnJob({ name: JobName.FACE_DETECTION, queue: QueueName.FACE_DETECTION }) + async handleDetectFaces({ id }: JobOf): Promise { const { machineLearning } = await this.getConfig({ withCache: true }); if (!isFacialRecognitionEnabled(machineLearning)) { return JobStatus.SKIPPED; @@ -376,7 +377,8 @@ export class PersonService extends BaseService { return intersection / union; } - async handleQueueRecognizeFaces({ force, nightly }: INightlyJob): Promise { + @OnJob({ name: JobName.QUEUE_FACIAL_RECOGNITION, queue: QueueName.FACIAL_RECOGNITION }) + async handleQueueRecognizeFaces({ force, nightly }: JobOf): Promise { const { machineLearning } = await this.getConfig({ withCache: false }); if (!isFacialRecognitionEnabled(machineLearning)) { return JobStatus.SKIPPED; @@ -426,7 +428,8 @@ export class PersonService extends BaseService { return JobStatus.SUCCESS; } - async handleRecognizeFaces({ id, deferred }: IDeferrableJob): Promise { + @OnJob({ name: JobName.FACIAL_RECOGNITION, queue: QueueName.FACIAL_RECOGNITION }) + async handleRecognizeFaces({ id, deferred }: JobOf): Promise { const { machineLearning } = await this.getConfig({ withCache: true }); if (!isFacialRecognitionEnabled(machineLearning)) { return JobStatus.SKIPPED; @@ -509,7 +512,8 @@ export class PersonService extends BaseService { return JobStatus.SUCCESS; } - async handlePersonMigration({ id }: IEntityJob): Promise { + @OnJob({ name: JobName.MIGRATE_PERSON, queue: QueueName.MIGRATION }) + async handlePersonMigration({ id }: JobOf): Promise { const person = await this.personRepository.getById(id); if (!person) { return JobStatus.FAILED; @@ -520,7 +524,8 @@ export class PersonService extends BaseService { return JobStatus.SUCCESS; } - async handleGeneratePersonThumbnail(data: IEntityJob): Promise { + @OnJob({ name: JobName.GENERATE_PERSON_THUMBNAIL, queue: QueueName.THUMBNAIL_GENERATION }) + async handleGeneratePersonThumbnail(data: JobOf): Promise { const { machineLearning, metadata, image } = await this.getConfig({ withCache: true }); if (!isFacialRecognitionEnabled(machineLearning) && !isFaceImportEnabled(metadata)) { return JobStatus.SKIPPED; diff --git a/server/src/services/session.service.ts b/server/src/services/session.service.ts index 2e27942c66..68df7828ad 100644 --- a/server/src/services/session.service.ts +++ b/server/src/services/session.service.ts @@ -1,14 +1,16 @@ import { Injectable } from '@nestjs/common'; import { DateTime } from 'luxon'; +import { OnJob } from 'src/decorators'; import { AuthDto } from 'src/dtos/auth.dto'; import { SessionResponseDto, mapSession } from 'src/dtos/session.dto'; import { Permission } from 'src/enum'; -import { JobStatus } from 'src/interfaces/job.interface'; +import { JobName, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { BaseService } from 'src/services/base.service'; @Injectable() export class SessionService extends BaseService { - async handleCleanup() { + @OnJob({ name: JobName.CLEAN_OLD_SESSION_TOKENS, queue: QueueName.BACKGROUND_TASK }) + async handleCleanup(): Promise { const sessions = await this.sessionRepository.search({ updatedBefore: DateTime.now().minus({ days: 90 }).toJSDate(), }); diff --git a/server/src/services/smart-info.service.ts b/server/src/services/smart-info.service.ts index 778f40c931..e74c0a7fe5 100644 --- a/server/src/services/smart-info.service.ts +++ b/server/src/services/smart-info.service.ts @@ -1,18 +1,11 @@ import { Injectable } from '@nestjs/common'; import { SystemConfig } from 'src/config'; -import { OnEvent } from 'src/decorators'; +import { OnEvent, OnJob } from 'src/decorators'; import { ImmichWorker } from 'src/enum'; import { WithoutProperty } from 'src/interfaces/asset.interface'; import { DatabaseLock } from 'src/interfaces/database.interface'; import { ArgOf } from 'src/interfaces/event.interface'; -import { - IBaseJob, - IEntityJob, - JOBS_ASSET_PAGINATION_SIZE, - JobName, - JobStatus, - QueueName, -} from 'src/interfaces/job.interface'; +import { JOBS_ASSET_PAGINATION_SIZE, JobName, JobOf, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { BaseService } from 'src/services/base.service'; import { getAssetFiles } from 'src/utils/asset.util'; import { getCLIPModelInfo, isSmartSearchEnabled } from 'src/utils/misc'; @@ -86,7 +79,8 @@ export class SmartInfoService extends BaseService { }); } - async handleQueueEncodeClip({ force }: IBaseJob): Promise { + @OnJob({ name: JobName.QUEUE_SMART_SEARCH, queue: QueueName.SMART_SEARCH }) + async handleQueueEncodeClip({ force }: JobOf): Promise { const { machineLearning } = await this.getConfig({ withCache: false }); if (!isSmartSearchEnabled(machineLearning)) { return JobStatus.SKIPPED; @@ -111,7 +105,8 @@ export class SmartInfoService extends BaseService { return JobStatus.SUCCESS; } - async handleEncodeClip({ id }: IEntityJob): Promise { + @OnJob({ name: JobName.SMART_SEARCH, queue: QueueName.SMART_SEARCH }) + async handleEncodeClip({ id }: JobOf): Promise { const { machineLearning } = await this.getConfig({ withCache: true }); if (!isSmartSearchEnabled(machineLearning)) { return JobStatus.SKIPPED; diff --git a/server/src/services/storage-template.service.ts b/server/src/services/storage-template.service.ts index d239435660..f54f493e21 100644 --- a/server/src/services/storage-template.service.ts +++ b/server/src/services/storage-template.service.ts @@ -4,13 +4,13 @@ import { DateTime } from 'luxon'; import path from 'node:path'; import sanitize from 'sanitize-filename'; import { StorageCore } from 'src/cores/storage.core'; -import { OnEvent } from 'src/decorators'; +import { OnEvent, OnJob } from 'src/decorators'; import { SystemConfigTemplateStorageOptionDto } from 'src/dtos/system-config.dto'; import { AssetEntity } from 'src/entities/asset.entity'; import { AssetPathType, AssetType, StorageFolder } from 'src/enum'; import { DatabaseLock } from 'src/interfaces/database.interface'; import { ArgOf } from 'src/interfaces/event.interface'; -import { IEntityJob, JOBS_ASSET_PAGINATION_SIZE, JobStatus } from 'src/interfaces/job.interface'; +import { JobName, JobOf, JOBS_ASSET_PAGINATION_SIZE, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { BaseService } from 'src/services/base.service'; import { getLivePhotoMotionFilename } from 'src/utils/file'; import { usePagination } from 'src/utils/pagination'; @@ -108,7 +108,8 @@ export class StorageTemplateService extends BaseService { return { ...storageTokens, presetOptions: storagePresets }; } - async handleMigrationSingle({ id }: IEntityJob): Promise { + @OnJob({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, queue: QueueName.STORAGE_TEMPLATE_MIGRATION }) + async handleMigrationSingle({ id }: JobOf): Promise { const config = await this.getConfig({ withCache: true }); const storageTemplateEnabled = config.storageTemplate.enabled; if (!storageTemplateEnabled) { @@ -137,6 +138,7 @@ export class StorageTemplateService extends BaseService { return JobStatus.SUCCESS; } + @OnJob({ name: JobName.STORAGE_TEMPLATE_MIGRATION, queue: QueueName.STORAGE_TEMPLATE_MIGRATION }) async handleMigration(): Promise { this.logger.log('Starting storage template migration'); const { storageTemplate } = await this.getConfig({ withCache: true }); diff --git a/server/src/services/storage.service.spec.ts b/server/src/services/storage.service.spec.ts index dd9bb9969d..dd97a063ae 100644 --- a/server/src/services/storage.service.spec.ts +++ b/server/src/services/storage.service.spec.ts @@ -3,7 +3,8 @@ import { IConfigRepository } from 'src/interfaces/config.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { IStorageRepository } from 'src/interfaces/storage.interface'; import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface'; -import { ImmichStartupError, StorageService } from 'src/services/storage.service'; +import { StorageService } from 'src/services/storage.service'; +import { ImmichStartupError } from 'src/utils/misc'; import { mockEnvData } from 'test/repositories/config.repository.mock'; import { newTestService } from 'test/utils'; import { Mocked } from 'vitest'; diff --git a/server/src/services/storage.service.ts b/server/src/services/storage.service.ts index 3b6a16fb41..ce26df4869 100644 --- a/server/src/services/storage.service.ts +++ b/server/src/services/storage.service.ts @@ -1,15 +1,13 @@ import { Injectable } from '@nestjs/common'; import { join } from 'node:path'; import { StorageCore } from 'src/cores/storage.core'; -import { OnEvent } from 'src/decorators'; +import { OnEvent, OnJob } from 'src/decorators'; import { SystemFlags } from 'src/entities/system-metadata.entity'; import { StorageFolder, SystemMetadataKey } from 'src/enum'; import { DatabaseLock } from 'src/interfaces/database.interface'; -import { IDeleteFilesJob, JobStatus } from 'src/interfaces/job.interface'; +import { JobName, JobOf, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { BaseService } from 'src/services/base.service'; - -export class ImmichStartupError extends Error {} -export const isStartUpError = (error: unknown): error is ImmichStartupError => error instanceof ImmichStartupError; +import { ImmichStartupError } from 'src/utils/misc'; const docsMessage = `Please see https://immich.app/docs/administration/system-integrity#folder-checks for more information.`; @@ -66,7 +64,8 @@ export class StorageService extends BaseService { }); } - async handleDeleteFiles(job: IDeleteFilesJob) { + @OnJob({ name: JobName.DELETE_FILES, queue: QueueName.BACKGROUND_TASK }) + async handleDeleteFiles(job: JobOf): Promise { const { files } = job; // TODO: one job per file diff --git a/server/src/services/tag.service.ts b/server/src/services/tag.service.ts index 5534d74efa..2aca400cc7 100644 --- a/server/src/services/tag.service.ts +++ b/server/src/services/tag.service.ts @@ -1,4 +1,5 @@ import { BadRequestException, Injectable } from '@nestjs/common'; +import { OnJob } from 'src/decorators'; import { BulkIdResponseDto, BulkIdsDto } from 'src/dtos/asset-ids.response.dto'; import { AuthDto } from 'src/dtos/auth.dto'; import { @@ -12,7 +13,7 @@ import { } from 'src/dtos/tag.dto'; import { TagEntity } from 'src/entities/tag.entity'; import { Permission } from 'src/enum'; -import { JobStatus } from 'src/interfaces/job.interface'; +import { JobName, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { AssetTagItem } from 'src/interfaces/tag.interface'; import { BaseService } from 'src/services/base.service'; import { addAssets, removeAssets } from 'src/utils/asset.util'; @@ -131,6 +132,7 @@ export class TagService extends BaseService { return results; } + @OnJob({ name: JobName.TAG_CLEANUP, queue: QueueName.BACKGROUND_TASK }) async handleTagCleanup() { await this.tagRepository.deleteEmptyTags(); return JobStatus.SUCCESS; diff --git a/server/src/services/trash.service.ts b/server/src/services/trash.service.ts index 91c359392e..621dee0f81 100644 --- a/server/src/services/trash.service.ts +++ b/server/src/services/trash.service.ts @@ -1,9 +1,9 @@ -import { OnEvent } from 'src/decorators'; +import { OnEvent, OnJob } from 'src/decorators'; import { BulkIdsDto } from 'src/dtos/asset-ids.response.dto'; import { AuthDto } from 'src/dtos/auth.dto'; import { TrashResponseDto } from 'src/dtos/trash.dto'; import { Permission } from 'src/enum'; -import { JOBS_ASSET_PAGINATION_SIZE, JobName, JobStatus } from 'src/interfaces/job.interface'; +import { JOBS_ASSET_PAGINATION_SIZE, JobName, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { BaseService } from 'src/services/base.service'; import { usePagination } from 'src/utils/pagination'; @@ -44,6 +44,7 @@ export class TrashService extends BaseService { await this.jobRepository.queue({ name: JobName.QUEUE_TRASH_EMPTY, data: {} }); } + @OnJob({ name: JobName.QUEUE_TRASH_EMPTY, queue: QueueName.BACKGROUND_TASK }) async handleQueueEmptyTrash() { let count = 0; const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => diff --git a/server/src/services/user.service.ts b/server/src/services/user.service.ts index f67d04cbd3..926482fb9c 100644 --- a/server/src/services/user.service.ts +++ b/server/src/services/user.service.ts @@ -2,6 +2,7 @@ import { BadRequestException, Injectable, NotFoundException } from '@nestjs/comm import { DateTime } from 'luxon'; import { SALT_ROUNDS } from 'src/constants'; import { StorageCore } from 'src/cores/storage.core'; +import { OnJob } from 'src/decorators'; import { AuthDto } from 'src/dtos/auth.dto'; import { LicenseKeyDto, LicenseResponseDto } from 'src/dtos/license.dto'; import { UserPreferencesResponseDto, UserPreferencesUpdateDto, mapPreferences } from 'src/dtos/user-preferences.dto'; @@ -10,7 +11,7 @@ import { UserAdminResponseDto, UserResponseDto, UserUpdateMeDto, mapUser, mapUse import { UserMetadataEntity } from 'src/entities/user-metadata.entity'; import { UserEntity } from 'src/entities/user.entity'; import { CacheControl, StorageFolder, UserMetadataKey } from 'src/enum'; -import { IEntityJob, JobName, JobStatus } from 'src/interfaces/job.interface'; +import { JobName, JobOf, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { UserFindOptions } from 'src/interfaces/user.interface'; import { BaseService } from 'src/services/base.service'; import { ImmichFileResponse } from 'src/utils/file'; @@ -163,11 +164,13 @@ export class UserService extends BaseService { return licenseData; } + @OnJob({ name: JobName.USER_SYNC_USAGE, queue: QueueName.BACKGROUND_TASK }) async handleUserSyncUsage(): Promise { await this.userRepository.syncUsage(); return JobStatus.SUCCESS; } + @OnJob({ name: JobName.USER_DELETE_CHECK, queue: QueueName.BACKGROUND_TASK }) async handleUserDeleteCheck(): Promise { const users = await this.userRepository.getDeletedUsers(); const config = await this.getConfig({ withCache: false }); @@ -181,7 +184,8 @@ export class UserService extends BaseService { return JobStatus.SUCCESS; } - async handleUserDelete({ id, force }: IEntityJob): Promise { + @OnJob({ name: JobName.USER_DELETION, queue: QueueName.BACKGROUND_TASK }) + async handleUserDelete({ id, force }: JobOf): Promise { const config = await this.getConfig({ withCache: false }); const user = await this.userRepository.get(id, { withDeleted: true }); if (!user) { diff --git a/server/src/services/version.service.ts b/server/src/services/version.service.ts index 231ced1a95..ff4fa3c6bf 100644 --- a/server/src/services/version.service.ts +++ b/server/src/services/version.service.ts @@ -2,13 +2,13 @@ import { Injectable } from '@nestjs/common'; import { DateTime } from 'luxon'; import semver, { SemVer } from 'semver'; import { serverVersion } from 'src/constants'; -import { OnEvent } from 'src/decorators'; +import { OnEvent, OnJob } from 'src/decorators'; import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server.dto'; import { VersionCheckMetadata } from 'src/entities/system-metadata.entity'; import { ImmichEnvironment, SystemMetadataKey } from 'src/enum'; import { DatabaseLock } from 'src/interfaces/database.interface'; import { ArgOf } from 'src/interfaces/event.interface'; -import { JobName, JobStatus } from 'src/interfaces/job.interface'; +import { JobName, JobStatus, QueueName } from 'src/interfaces/job.interface'; import { BaseService } from 'src/services/base.service'; const asNotification = ({ checkedAt, releaseVersion }: VersionCheckMetadata): ReleaseNotification => { @@ -48,6 +48,7 @@ export class VersionService extends BaseService { await this.jobRepository.queue({ name: JobName.VERSION_CHECK, data: {} }); } + @OnJob({ name: JobName.VERSION_CHECK, queue: QueueName.BACKGROUND_TASK }) async handleVersionCheck(): Promise { try { this.logger.debug('Running version check'); diff --git a/server/src/utils/logger.ts b/server/src/utils/logger.ts index 2e33a7bcb5..cf66404d69 100644 --- a/server/src/utils/logger.ts +++ b/server/src/utils/logger.ts @@ -16,7 +16,7 @@ export const logGlobalError = (logger: ILoggerRepository, error: Error) => { } if (error instanceof Error) { - logger.error(`Unknown error: ${error}`); + logger.error(`Unknown error: ${error}`, error?.stack); return; } }; diff --git a/server/src/utils/misc.ts b/server/src/utils/misc.ts index 6e435e68a8..6a64923a3b 100644 --- a/server/src/utils/misc.ts +++ b/server/src/utils/misc.ts @@ -15,6 +15,32 @@ import { CLIP_MODEL_INFO, serverVersion } from 'src/constants'; import { ImmichCookie, ImmichHeader, MetadataKey } from 'src/enum'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; +export class ImmichStartupError extends Error {} +export const isStartUpError = (error: unknown): error is ImmichStartupError => error instanceof ImmichStartupError; + +export const getKeyByValue = (object: Record, value: unknown) => + Object.keys(object).find((key) => object[key] === value); + +export const getMethodNames = (instance: any) => { + const ctx = Object.getPrototypeOf(instance); + const methods: string[] = []; + for (const property of Object.getOwnPropertyNames(ctx)) { + const descriptor = Object.getOwnPropertyDescriptor(ctx, property); + if (!descriptor || descriptor.get || descriptor.set) { + continue; + } + + const handler = instance[property]; + if (typeof handler !== 'function') { + continue; + } + + methods.push(property); + } + + return methods; +}; + export const getExternalDomain = (server: SystemConfig['server'], port: number) => server.externalDomain || `http://localhost:${port}`; diff --git a/server/src/workers/api.ts b/server/src/workers/api.ts index bc8eb22b20..5196e7595c 100644 --- a/server/src/workers/api.ts +++ b/server/src/workers/api.ts @@ -13,8 +13,7 @@ import { WebSocketAdapter } from 'src/middleware/websocket.adapter'; import { ConfigRepository } from 'src/repositories/config.repository'; import { bootstrapTelemetry } from 'src/repositories/telemetry.repository'; import { ApiService } from 'src/services/api.service'; -import { isStartUpError } from 'src/services/storage.service'; -import { useSwagger } from 'src/utils/misc'; +import { isStartUpError, useSwagger } from 'src/utils/misc'; async function bootstrap() { process.title = 'immich-api'; diff --git a/server/src/workers/microservices.ts b/server/src/workers/microservices.ts index bd1e65d6cc..0fa056d5d4 100644 --- a/server/src/workers/microservices.ts +++ b/server/src/workers/microservices.ts @@ -7,7 +7,7 @@ import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { WebSocketAdapter } from 'src/middleware/websocket.adapter'; import { ConfigRepository } from 'src/repositories/config.repository'; import { bootstrapTelemetry } from 'src/repositories/telemetry.repository'; -import { isStartUpError } from 'src/services/storage.service'; +import { isStartUpError } from 'src/utils/misc'; export async function bootstrap() { const { telemetry } = new ConfigRepository().getEnv(); diff --git a/server/test/repositories/job.repository.mock.ts b/server/test/repositories/job.repository.mock.ts index cfa1826dd8..2875c4405b 100644 --- a/server/test/repositories/job.repository.mock.ts +++ b/server/test/repositories/job.repository.mock.ts @@ -3,7 +3,9 @@ import { Mocked, vitest } from 'vitest'; export const newJobRepositoryMock = (): Mocked => { return { - addHandler: vitest.fn(), + setup: vitest.fn(), + startWorkers: vitest.fn(), + run: vitest.fn(), addCronJob: vitest.fn(), updateCronJob: vitest.fn(), setConcurrency: vitest.fn(),