import { getQueueToken } from '@nestjs/bullmq'; import { Inject, Injectable } from '@nestjs/common'; import { ModuleRef, Reflector } from '@nestjs/core'; import { SchedulerRegistry } from '@nestjs/schedule'; import { JobsOptions, Queue, Worker } from 'bullmq'; import { ClassConstructor } from 'class-transformer'; import { setTimeout } from 'node:timers/promises'; import { JobConfig } from 'src/decorators'; import { MetadataKey } from 'src/enum'; import { IConfigRepository } from 'src/interfaces/config.interface'; import { IEventRepository } from 'src/interfaces/event.interface'; import { IEntityJob, IJobRepository, JobCounts, JobItem, JobName, JobOf, JobStatus, QueueCleanType, QueueName, QueueStatus, } from 'src/interfaces/job.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc'; type JobMapItem = { jobName: JobName; queueName: QueueName; handler: (job: JobOf<any>) => Promise<JobStatus>; label: string; }; @Injectable() export class JobRepository implements IJobRepository { private workers: Partial<Record<QueueName, Worker>> = {}; private handlers: Partial<Record<JobName, JobMapItem>> = {}; constructor( private moduleRef: ModuleRef, private schedulerRegistry: SchedulerRegistry, @Inject(IConfigRepository) private configRepository: IConfigRepository, @Inject(IEventRepository) private eventRepository: IEventRepository, @Inject(ILoggerRepository) private logger: ILoggerRepository, ) { this.logger.setContext(JobRepository.name); } setup({ services }: { services: ClassConstructor<unknown>[] }) { const reflector = this.moduleRef.get(Reflector, { strict: false }); // discovery for (const Service of services) { const instance = this.moduleRef.get<any>(Service); for (const methodName of getMethodNames(instance)) { const handler = instance[methodName]; const config = reflector.get<JobConfig>(MetadataKey.JOB_CONFIG, handler); if (!config) { continue; } const { name: jobName, queue: queueName } = config; const label = `${Service.name}.${handler.name}`; // one handler per job if (this.handlers[jobName]) { const jobKey = getKeyByValue(JobName, jobName); const errorMessage = `Failed to add job handler for ${label}`; this.logger.error( `${errorMessage}. JobName.${jobKey} is already handled by ${this.handlers[jobName].label}.`, ); throw new ImmichStartupError(errorMessage); } this.handlers[jobName] = { label, jobName, queueName, handler: handler.bind(instance), }; this.logger.verbose(`Added job handler: ${jobName} => ${label}`); } } // no missing handlers for (const [jobKey, jobName] of Object.entries(JobName)) { const item = this.handlers[jobName]; if (!item) { const errorMessage = `Failed to find job handler for Job.${jobKey} ("${jobName}")`; this.logger.error( `${errorMessage}. Make sure to add the @OnJob({ name: JobName.${jobKey}, queue: QueueName.XYZ }) decorator for the new job.`, ); throw new ImmichStartupError(errorMessage); } } } startWorkers() { const { bull } = this.configRepository.getEnv(); for (const queueName of Object.values(QueueName)) { this.logger.debug(`Starting worker for queue: ${queueName}`); this.workers[queueName] = new Worker( queueName, (job) => this.eventRepository.emit('job.start', queueName, job as JobItem), { ...bull.config, concurrency: 1 }, ); } } async run({ name, data }: JobItem) { const item = this.handlers[name as JobName]; if (!item) { this.logger.warn(`Skipping unknown job: "${name}"`); return JobStatus.SKIPPED; } return item.handler(data); } setConcurrency(queueName: QueueName, concurrency: number) { const worker = this.workers[queueName]; if (!worker) { this.logger.warn(`Unable to set queue concurrency, worker not found: '${queueName}'`); return; } worker.concurrency = concurrency; } async getQueueStatus(name: QueueName): Promise<QueueStatus> { const queue = this.getQueue(name); return { isActive: !!(await queue.getActiveCount()), isPaused: await queue.isPaused(), }; } pause(name: QueueName) { return this.getQueue(name).pause(); } resume(name: QueueName) { return this.getQueue(name).resume(); } empty(name: QueueName) { return this.getQueue(name).drain(); } clear(name: QueueName, type: QueueCleanType) { return this.getQueue(name).clean(0, 1000, type); } getJobCounts(name: QueueName): Promise<JobCounts> { return this.getQueue(name).getJobCounts( 'active', 'completed', 'failed', 'delayed', 'waiting', 'paused', ) as unknown as Promise<JobCounts>; } private getQueueName(name: JobName) { return (this.handlers[name] as JobMapItem).queueName; } async queueAll(items: JobItem[]): Promise<void> { if (items.length === 0) { return; } const promises = []; const itemsByQueue = {} as Record<string, (JobItem & { data: any; options: JobsOptions | undefined })[]>; for (const item of items) { const queueName = this.getQueueName(item.name); const job = { name: item.name, data: item.data || {}, options: this.getJobOptions(item) || undefined, } as JobItem & { data: any; options: JobsOptions | undefined }; if (job.options?.jobId) { // need to use add() instead of addBulk() for jobId deduplication promises.push(this.getQueue(queueName).add(item.name, item.data, job.options)); } else { itemsByQueue[queueName] = itemsByQueue[queueName] || []; itemsByQueue[queueName].push(job); } } for (const [queueName, jobs] of Object.entries(itemsByQueue)) { const queue = this.getQueue(queueName as QueueName); promises.push(queue.addBulk(jobs)); } await Promise.all(promises); } async queue(item: JobItem): Promise<void> { return this.queueAll([item]); } async waitForQueueCompletion(...queues: QueueName[]): Promise<void> { let activeQueue: QueueStatus | undefined; do { const statuses = await Promise.all(queues.map((name) => this.getQueueStatus(name))); activeQueue = statuses.find((status) => status.isActive); } while (activeQueue); { this.logger.verbose(`Waiting for ${activeQueue} queue to stop...`); await setTimeout(1000); } } private getJobOptions(item: JobItem): JobsOptions | null { switch (item.name) { case JobName.NOTIFY_ALBUM_UPDATE: { return { jobId: item.data.id, delay: item.data?.delay }; } case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: { return { jobId: item.data.id }; } case JobName.GENERATE_PERSON_THUMBNAIL: { return { priority: 1 }; } case JobName.QUEUE_FACIAL_RECOGNITION: { return { jobId: JobName.QUEUE_FACIAL_RECOGNITION }; } default: { return null; } } } private getQueue(queue: QueueName): Queue { return this.moduleRef.get<Queue>(getQueueToken(queue), { strict: false }); } public async removeJob(jobId: string, name: JobName): Promise<IEntityJob | undefined> { const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobId); if (!existingJob) { return; } try { await existingJob.remove(); } catch (error: any) { if (error.message?.includes('Missing key for job')) { return; } throw error; } return existingJob.data; } }