diff --git a/server/apps/microservices/src/main.ts b/server/apps/microservices/src/main.ts index 9f18c3108d..e4d54859ef 100644 --- a/server/apps/microservices/src/main.ts +++ b/server/apps/microservices/src/main.ts @@ -1,9 +1,9 @@ +import { getLogLevels, SERVER_VERSION } from '@app/domain'; +import { RedisIoAdapter } from '@app/infra'; import { Logger } from '@nestjs/common'; import { NestFactory } from '@nestjs/core'; -import { SERVER_VERSION } from '@app/domain'; -import { getLogLevels } from '@app/domain'; -import { RedisIoAdapter } from '@app/infra'; import { MicroservicesModule } from './microservices.module'; +import { ProcessorService } from './processor.service'; import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor'; const logger = new Logger('ImmichMicroservice'); @@ -15,6 +15,8 @@ async function bootstrap() { const listeningPort = Number(process.env.MICROSERVICES_PORT) || 3002; + await app.get(ProcessorService).init(); + app.useWebSocketAdapter(new RedisIoAdapter(app)); const metadataService = app.get(MetadataExtractionProcessor); diff --git a/server/apps/microservices/src/microservices.module.ts b/server/apps/microservices/src/microservices.module.ts index dc421d5f17..6df9496190 100644 --- a/server/apps/microservices/src/microservices.module.ts +++ b/server/apps/microservices/src/microservices.module.ts @@ -3,17 +3,8 @@ import { InfraModule } from '@app/infra'; import { ExifEntity } from '@app/infra/entities'; import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; -import { - BackgroundTaskProcessor, - ClipEncodingProcessor, - FacialRecognitionProcessor, - ObjectTaggingProcessor, - SearchIndexProcessor, - StorageTemplateMigrationProcessor, - ThumbnailGeneratorProcessor, - VideoTranscodeProcessor, -} from './processors'; -import { MetadataExtractionProcessor, SidecarProcessor } from './processors/metadata-extraction.processor'; +import { ProcessorService } from './processor.service'; +import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor'; @Module({ imports: [ @@ -21,17 +12,6 @@ import { MetadataExtractionProcessor, SidecarProcessor } from './processors/meta DomainModule.register({ imports: [InfraModule] }), TypeOrmModule.forFeature([ExifEntity]), ], - providers: [ - ThumbnailGeneratorProcessor, - MetadataExtractionProcessor, - VideoTranscodeProcessor, - ObjectTaggingProcessor, - ClipEncodingProcessor, - StorageTemplateMigrationProcessor, - BackgroundTaskProcessor, - SearchIndexProcessor, - FacialRecognitionProcessor, - SidecarProcessor, - ], + providers: [MetadataExtractionProcessor, ProcessorService], }) export class MicroservicesModule {} diff --git a/server/apps/microservices/src/processor.service.ts b/server/apps/microservices/src/processor.service.ts new file mode 100644 index 0000000000..bec0d0c38d --- /dev/null +++ b/server/apps/microservices/src/processor.service.ts @@ -0,0 +1,105 @@ +import { + AssetService, + FacialRecognitionService, + JobName, + JOBS_TO_QUEUE, + MediaService, + MetadataService, + PersonService, + QueueName, + QUEUE_TO_CONCURRENCY, + SearchService, + SmartInfoService, + StorageService, + StorageTemplateService, + SystemConfigService, + UserService, +} from '@app/domain'; +import { getQueueToken } from '@nestjs/bull'; +import { Injectable } from '@nestjs/common'; +import { ModuleRef } from '@nestjs/core'; +import { Queue } from 'bull'; +import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor'; + +type JobHandler = (data: T) => void | Promise; + +@Injectable() +export class ProcessorService { + constructor( + private moduleRef: ModuleRef, + // TODO refactor to domain + private metadataProcessor: MetadataExtractionProcessor, + + private assetService: AssetService, + private facialRecognitionService: FacialRecognitionService, + private mediaService: MediaService, + private metadataService: MetadataService, + private personService: PersonService, + private searchService: SearchService, + private smartInfoService: SmartInfoService, + private storageTemplateService: StorageTemplateService, + private storageService: StorageService, + private systemConfigService: SystemConfigService, + private userService: UserService, + ) {} + + private handlers: Record = { + [JobName.ASSET_UPLOADED]: (data) => this.assetService.handleAssetUpload(data), + [JobName.DELETE_FILES]: (data) => this.storageService.handleDeleteFiles(data), + [JobName.USER_DELETE_CHECK]: () => this.userService.handleUserDeleteCheck(), + [JobName.USER_DELETION]: (data) => this.userService.handleUserDelete(data), + [JobName.QUEUE_OBJECT_TAGGING]: (data) => this.smartInfoService.handleQueueObjectTagging(data), + [JobName.DETECT_OBJECTS]: (data) => this.smartInfoService.handleDetectObjects(data), + [JobName.CLASSIFY_IMAGE]: (data) => this.smartInfoService.handleClassifyImage(data), + [JobName.QUEUE_ENCODE_CLIP]: (data) => this.smartInfoService.handleQueueEncodeClip(data), + [JobName.ENCODE_CLIP]: (data) => this.smartInfoService.handleEncodeClip(data), + [JobName.SEARCH_INDEX_ALBUMS]: () => this.searchService.handleIndexAlbums(), + [JobName.SEARCH_INDEX_ASSETS]: () => this.searchService.handleIndexAssets(), + [JobName.SEARCH_INDEX_FACES]: () => this.searchService.handleIndexFaces(), + [JobName.SEARCH_INDEX_ALBUM]: (data) => this.searchService.handleIndexAlbum(data), + [JobName.SEARCH_INDEX_ASSET]: (data) => this.searchService.handleIndexAsset(data), + [JobName.SEARCH_INDEX_FACE]: (data) => this.searchService.handleIndexFace(data), + [JobName.SEARCH_REMOVE_ALBUM]: (data) => this.searchService.handleRemoveAlbum(data), + [JobName.SEARCH_REMOVE_ASSET]: (data) => this.searchService.handleRemoveAsset(data), + [JobName.SEARCH_REMOVE_FACE]: (data) => this.searchService.handleRemoveFace(data), + [JobName.STORAGE_TEMPLATE_MIGRATION]: () => this.storageTemplateService.handleMigration(), + [JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: (data) => this.storageTemplateService.handleMigrationSingle(data), + [JobName.SYSTEM_CONFIG_CHANGE]: () => this.systemConfigService.refreshConfig(), + [JobName.QUEUE_GENERATE_THUMBNAILS]: (data) => this.mediaService.handleQueueGenerateThumbnails(data), + [JobName.GENERATE_JPEG_THUMBNAIL]: (data) => this.mediaService.handleGenerateJpegThumbnail(data), + [JobName.GENERATE_WEBP_THUMBNAIL]: (data) => this.mediaService.handleGenerateWepbThumbnail(data), + [JobName.QUEUE_VIDEO_CONVERSION]: (data) => this.mediaService.handleQueueVideoConversion(data), + [JobName.VIDEO_CONVERSION]: (data) => this.mediaService.handleVideoConversion(data), + [JobName.QUEUE_METADATA_EXTRACTION]: (data) => this.metadataProcessor.handleQueueMetadataExtraction(data), + [JobName.EXIF_EXTRACTION]: (data) => this.metadataProcessor.extractExifInfo(data), + [JobName.EXTRACT_VIDEO_METADATA]: (data) => this.metadataProcessor.extractVideoMetadata(data), + [JobName.QUEUE_RECOGNIZE_FACES]: (data) => this.facialRecognitionService.handleQueueRecognizeFaces(data), + [JobName.RECOGNIZE_FACES]: (data) => this.facialRecognitionService.handleRecognizeFaces(data), + [JobName.GENERATE_FACE_THUMBNAIL]: (data) => this.facialRecognitionService.handleGenerateFaceThumbnail(data), + [JobName.PERSON_CLEANUP]: () => this.personService.handlePersonCleanup(), + [JobName.QUEUE_SIDECAR]: (data) => this.metadataService.handleQueueSidecar(data), + [JobName.SIDECAR_DISCOVERY]: (data) => this.metadataService.handleSidecarDiscovery(data), + [JobName.SIDECAR_SYNC]: (data) => this.metadataService.handleSidecarSync(data), + }; + + async init() { + const queueSeen: Partial> = {}; + + for (const jobName of Object.values(JobName)) { + const handler = this.handlers[jobName]; + const queueName = JOBS_TO_QUEUE[jobName]; + const queue = this.moduleRef.get(getQueueToken(queueName), { strict: false }); + + // only set concurrency on the first job for a queue, since concurrency stacks + const seen = queueSeen[queueName]; + const concurrency = seen ? 0 : QUEUE_TO_CONCURRENCY[queueName]; + queueSeen[queueName] = true; + + await queue.isReady(); + + queue.process(jobName, concurrency, async (job): Promise => { + await handler(job.data); + }); + } + } +} diff --git a/server/apps/microservices/src/processors.ts b/server/apps/microservices/src/processors.ts deleted file mode 100644 index f621ad333f..0000000000 --- a/server/apps/microservices/src/processors.ts +++ /dev/null @@ -1,219 +0,0 @@ -import { - AssetService, - FacialRecognitionService, - IAssetFaceJob, - IAssetJob, - IBaseJob, - IBulkEntityJob, - IDeleteFilesJob, - IFaceThumbnailJob, - IUserDeletionJob, - JobName, - MediaService, - PersonService, - QueueName, - SearchService, - SmartInfoService, - StorageService, - StorageTemplateService, - SystemConfigService, - UserService, -} from '@app/domain'; -import { Process, Processor } from '@nestjs/bull'; -import { Job } from 'bull'; - -@Processor(QueueName.BACKGROUND_TASK) -export class BackgroundTaskProcessor { - constructor( - private assetService: AssetService, - private personService: PersonService, - private storageService: StorageService, - private systemConfigService: SystemConfigService, - private userService: UserService, - ) {} - - @Process(JobName.ASSET_UPLOADED) - async onAssetUpload(job: Job) { - await this.assetService.handleAssetUpload(job.data); - } - - @Process(JobName.DELETE_FILES) - async onDeleteFile(job: Job) { - await this.storageService.handleDeleteFiles(job.data); - } - - @Process(JobName.SYSTEM_CONFIG_CHANGE) - async onSystemConfigChange() { - await this.systemConfigService.refreshConfig(); - } - - @Process(JobName.USER_DELETE_CHECK) - async onUserDeleteCheck() { - await this.userService.handleUserDeleteCheck(); - } - - @Process(JobName.USER_DELETION) - async onUserDelete(job: Job) { - await this.userService.handleUserDelete(job.data); - } - - @Process(JobName.PERSON_CLEANUP) - async onPersonCleanup() { - await this.personService.handlePersonCleanup(); - } -} - -@Processor(QueueName.OBJECT_TAGGING) -export class ObjectTaggingProcessor { - constructor(private smartInfoService: SmartInfoService) {} - - @Process({ name: JobName.QUEUE_OBJECT_TAGGING, concurrency: 0 }) - async onQueueObjectTagging(job: Job) { - await this.smartInfoService.handleQueueObjectTagging(job.data); - } - - @Process({ name: JobName.DETECT_OBJECTS, concurrency: 1 }) - async onDetectObjects(job: Job) { - await this.smartInfoService.handleDetectObjects(job.data); - } - - @Process({ name: JobName.CLASSIFY_IMAGE, concurrency: 1 }) - async onClassifyImage(job: Job) { - await this.smartInfoService.handleClassifyImage(job.data); - } -} - -@Processor(QueueName.RECOGNIZE_FACES) -export class FacialRecognitionProcessor { - constructor(private facialRecognitionService: FacialRecognitionService) {} - - @Process({ name: JobName.QUEUE_RECOGNIZE_FACES, concurrency: 0 }) - async onQueueRecognizeFaces(job: Job) { - await this.facialRecognitionService.handleQueueRecognizeFaces(job.data); - } - - @Process({ name: JobName.RECOGNIZE_FACES, concurrency: 1 }) - async onRecognizeFaces(job: Job) { - await this.facialRecognitionService.handleRecognizeFaces(job.data); - } - - @Process({ name: JobName.GENERATE_FACE_THUMBNAIL, concurrency: 1 }) - async onGenerateFaceThumbnail(job: Job) { - await this.facialRecognitionService.handleGenerateFaceThumbnail(job.data); - } -} - -@Processor(QueueName.CLIP_ENCODING) -export class ClipEncodingProcessor { - constructor(private smartInfoService: SmartInfoService) {} - - @Process({ name: JobName.QUEUE_ENCODE_CLIP, concurrency: 0 }) - async onQueueClipEncoding(job: Job) { - await this.smartInfoService.handleQueueEncodeClip(job.data); - } - - @Process({ name: JobName.ENCODE_CLIP, concurrency: 1 }) - async onEncodeClip(job: Job) { - await this.smartInfoService.handleEncodeClip(job.data); - } -} - -@Processor(QueueName.SEARCH) -export class SearchIndexProcessor { - constructor(private searchService: SearchService) {} - - @Process(JobName.SEARCH_INDEX_ALBUMS) - async onIndexAlbums() { - await this.searchService.handleIndexAlbums(); - } - - @Process(JobName.SEARCH_INDEX_ASSETS) - async onIndexAssets() { - await this.searchService.handleIndexAssets(); - } - - @Process(JobName.SEARCH_INDEX_FACES) - async onIndexFaces() { - await this.searchService.handleIndexFaces(); - } - - @Process(JobName.SEARCH_INDEX_ALBUM) - onIndexAlbum(job: Job) { - this.searchService.handleIndexAlbum(job.data); - } - - @Process(JobName.SEARCH_INDEX_ASSET) - onIndexAsset(job: Job) { - this.searchService.handleIndexAsset(job.data); - } - - @Process(JobName.SEARCH_INDEX_FACE) - async onIndexFace(job: Job) { - await this.searchService.handleIndexFace(job.data); - } - - @Process(JobName.SEARCH_REMOVE_ALBUM) - onRemoveAlbum(job: Job) { - this.searchService.handleRemoveAlbum(job.data); - } - - @Process(JobName.SEARCH_REMOVE_ASSET) - onRemoveAsset(job: Job) { - this.searchService.handleRemoveAsset(job.data); - } - - @Process(JobName.SEARCH_REMOVE_FACE) - onRemoveFace(job: Job) { - this.searchService.handleRemoveFace(job.data); - } -} - -@Processor(QueueName.STORAGE_TEMPLATE_MIGRATION) -export class StorageTemplateMigrationProcessor { - constructor(private storageTemplateService: StorageTemplateService) {} - - @Process({ name: JobName.STORAGE_TEMPLATE_MIGRATION }) - async onTemplateMigration() { - await this.storageTemplateService.handleTemplateMigration(); - } - - @Process({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE }) - async onTemplateMigrationSingle(job: Job) { - await this.storageTemplateService.handleTemplateMigrationSingle(job.data); - } -} - -@Processor(QueueName.THUMBNAIL_GENERATION) -export class ThumbnailGeneratorProcessor { - constructor(private mediaService: MediaService) {} - - @Process({ name: JobName.QUEUE_GENERATE_THUMBNAILS, concurrency: 0 }) - async onQueueGenerateThumbnails(job: Job) { - await this.mediaService.handleQueueGenerateThumbnails(job.data); - } - - @Process({ name: JobName.GENERATE_JPEG_THUMBNAIL, concurrency: 3 }) - async onGenerateJpegThumbnail(job: Job) { - await this.mediaService.handleGenerateJpegThumbnail(job.data); - } - - @Process({ name: JobName.GENERATE_WEBP_THUMBNAIL, concurrency: 3 }) - async onGenerateWepbThumbnail(job: Job) { - await this.mediaService.handleGenerateWepbThumbnail(job.data); - } -} - -@Processor(QueueName.VIDEO_CONVERSION) -export class VideoTranscodeProcessor { - constructor(private mediaService: MediaService) {} - - @Process({ name: JobName.QUEUE_VIDEO_CONVERSION, concurrency: 0 }) - async onQueueVideoConversion(job: Job): Promise { - await this.mediaService.handleQueueVideoConversion(job.data); - } - - @Process({ name: JobName.VIDEO_CONVERSION, concurrency: 1 }) - async onVideoConversion(job: Job) { - await this.mediaService.handleVideoConversion(job.data); - } -} diff --git a/server/apps/microservices/src/processors/metadata-extraction.processor.ts b/server/apps/microservices/src/processors/metadata-extraction.processor.ts index 5cb1ca76e8..491aa92772 100644 --- a/server/apps/microservices/src/processors/metadata-extraction.processor.ts +++ b/server/apps/microservices/src/processors/metadata-extraction.processor.ts @@ -10,15 +10,12 @@ import { QueueName, usePagination, WithoutProperty, - WithProperty, } from '@app/domain'; import { AssetEntity, AssetType, ExifEntity } from '@app/infra/entities'; -import { Process, Processor } from '@nestjs/bull'; import { Inject, Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { InjectRepository } from '@nestjs/typeorm'; import tz_lookup from '@photostructure/tz-lookup'; -import { Job } from 'bull'; import { ExifDateTime, exiftool, Tags } from 'exiftool-vendored'; import ffmpeg, { FfprobeData } from 'fluent-ffmpeg'; import { Duration } from 'luxon'; @@ -33,7 +30,6 @@ interface ImmichTags extends Tags { ContentIdentifier?: string; } -@Processor(QueueName.METADATA_EXTRACTION) export class MetadataExtractionProcessor { private logger = new Logger(MetadataExtractionProcessor.name); private assetCore: AssetCore; @@ -73,10 +69,9 @@ export class MetadataExtractionProcessor { } } - @Process(JobName.QUEUE_METADATA_EXTRACTION) - async handleQueueMetadataExtraction(job: Job) { + async handleQueueMetadataExtraction(job: IBaseJob) { try { - const { force } = job.data; + const { force } = job; const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { return force ? this.assetRepository.getAll(pagination) @@ -94,9 +89,8 @@ export class MetadataExtractionProcessor { } } - @Process(JobName.EXIF_EXTRACTION) - async extractExifInfo(job: Job) { - let asset = job.data.asset; + async extractExifInfo(job: IAssetJob) { + let asset = job.asset; try { const mediaExifData = await exiftool.read(asset.originalPath).catch((error: any) => { @@ -223,9 +217,8 @@ export class MetadataExtractionProcessor { } } - @Process({ name: JobName.EXTRACT_VIDEO_METADATA, concurrency: 2 }) - async extractVideoMetadata(job: Job) { - let asset = job.data.asset; + async extractVideoMetadata(job: IAssetJob) { + let asset = job.asset; if (!asset.isVisible) { return; @@ -370,83 +363,3 @@ export class MetadataExtractionProcessor { return Duration.fromObject({ seconds: videoDurationInSecond }).toFormat('hh:mm:ss.SSS'); } } - -@Processor(QueueName.SIDECAR) -export class SidecarProcessor { - private logger = new Logger(SidecarProcessor.name); - private assetCore: AssetCore; - - constructor( - @Inject(IAssetRepository) private assetRepository: IAssetRepository, - @Inject(IJobRepository) private jobRepository: IJobRepository, - ) { - this.assetCore = new AssetCore(assetRepository, jobRepository); - } - - @Process(JobName.QUEUE_SIDECAR) - async handleQueueSidecar(job: Job) { - try { - const { force } = job.data; - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { - return force - ? this.assetRepository.getWith(pagination, WithProperty.SIDECAR) - : this.assetRepository.getWithout(pagination, WithoutProperty.SIDECAR); - }); - - for await (const assets of assetPagination) { - for (const asset of assets) { - const name = force ? JobName.SIDECAR_SYNC : JobName.SIDECAR_DISCOVERY; - await this.jobRepository.queue({ name, data: { asset } }); - } - } - } catch (error: any) { - this.logger.error(`Unable to queue sidecar scanning`, error?.stack); - } - } - - @Process(JobName.SIDECAR_SYNC) - async handleSidecarSync(job: Job) { - const { asset } = job.data; - if (!asset.isVisible) { - return; - } - - try { - const name = asset.type === AssetType.VIDEO ? JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION; - await this.jobRepository.queue({ name, data: { asset } }); - } catch (error: any) { - this.logger.error(`Unable to queue metadata extraction`, error?.stack); - } - } - - @Process(JobName.SIDECAR_DISCOVERY) - async handleSidecarDiscovery(job: Job) { - let { asset } = job.data; - if (!asset.isVisible) { - return; - } - - if (asset.sidecarPath) { - return; - } - - try { - await fs.promises.access(`${asset.originalPath}.xmp`, fs.constants.W_OK); - - try { - asset = await this.assetCore.save({ id: asset.id, sidecarPath: `${asset.originalPath}.xmp` }); - // TODO: optimize to only queue assets with recent xmp changes - const name = asset.type === AssetType.VIDEO ? JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION; - await this.jobRepository.queue({ name, data: { asset } }); - } catch (error: any) { - this.logger.error(`Unable to sync sidecar`, error?.stack); - } - } catch (error: any) { - if (error.code == 'EACCES') { - this.logger.error(`Unable to queue metadata extraction, file is not writable`, error?.stack); - } - - return; - } - } -} diff --git a/server/libs/domain/src/domain.module.ts b/server/libs/domain/src/domain.module.ts index d06103b0ee..9aea15b447 100644 --- a/server/libs/domain/src/domain.module.ts +++ b/server/libs/domain/src/domain.module.ts @@ -6,6 +6,7 @@ import { AuthService } from './auth'; import { FacialRecognitionService } from './facial-recognition'; import { JobService } from './job'; import { MediaService } from './media'; +import { MetadataService } from './metadata'; import { OAuthService } from './oauth'; import { PartnerService } from './partner'; import { PersonService } from './person'; @@ -26,6 +27,7 @@ const providers: Provider[] = [ FacialRecognitionService, JobService, MediaService, + MetadataService, OAuthService, PersonService, PartnerService, diff --git a/server/libs/domain/src/facial-recognition/facial-recognition.services.ts b/server/libs/domain/src/facial-recognition/facial-recognition.services.ts index 8f5d603087..36b60f7d34 100644 --- a/server/libs/domain/src/facial-recognition/facial-recognition.services.ts +++ b/server/libs/domain/src/facial-recognition/facial-recognition.services.ts @@ -104,7 +104,7 @@ export class FacialRecognitionService { const [asset] = await this.assetRepository.getByIds([assetId]); if (!asset || !asset.resizePath) { this.logger.warn(`Asset not found for facial cropping: ${assetId}`); - return null; + return; } this.logger.verbose(`Cropping face for person: ${personId}`); diff --git a/server/libs/domain/src/job/job.constants.ts b/server/libs/domain/src/job/job.constants.ts index 9edd1df96c..df7a7e7aa4 100644 --- a/server/libs/domain/src/job/job.constants.ts +++ b/server/libs/domain/src/job/job.constants.ts @@ -81,3 +81,79 @@ export enum JobName { } export const JOBS_ASSET_PAGINATION_SIZE = 1000; + +export const JOBS_TO_QUEUE: Record = { + // misc + [JobName.ASSET_UPLOADED]: QueueName.BACKGROUND_TASK, + [JobName.USER_DELETE_CHECK]: QueueName.BACKGROUND_TASK, + [JobName.USER_DELETION]: QueueName.BACKGROUND_TASK, + [JobName.DELETE_FILES]: QueueName.BACKGROUND_TASK, + [JobName.PERSON_CLEANUP]: QueueName.BACKGROUND_TASK, + + // conversion + [JobName.QUEUE_VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION, + [JobName.VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION, + + // thumbnails + [JobName.QUEUE_GENERATE_THUMBNAILS]: QueueName.THUMBNAIL_GENERATION, + [JobName.GENERATE_JPEG_THUMBNAIL]: QueueName.THUMBNAIL_GENERATION, + [JobName.GENERATE_WEBP_THUMBNAIL]: QueueName.THUMBNAIL_GENERATION, + + // metadata + [JobName.QUEUE_METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION, + [JobName.EXIF_EXTRACTION]: QueueName.METADATA_EXTRACTION, + [JobName.EXTRACT_VIDEO_METADATA]: QueueName.METADATA_EXTRACTION, + + // storage template + [JobName.STORAGE_TEMPLATE_MIGRATION]: QueueName.STORAGE_TEMPLATE_MIGRATION, + [JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: QueueName.STORAGE_TEMPLATE_MIGRATION, + [JobName.SYSTEM_CONFIG_CHANGE]: QueueName.STORAGE_TEMPLATE_MIGRATION, + + // object tagging + [JobName.QUEUE_OBJECT_TAGGING]: QueueName.OBJECT_TAGGING, + [JobName.DETECT_OBJECTS]: QueueName.OBJECT_TAGGING, + [JobName.CLASSIFY_IMAGE]: QueueName.OBJECT_TAGGING, + + // facial recognition + [JobName.QUEUE_RECOGNIZE_FACES]: QueueName.RECOGNIZE_FACES, + [JobName.RECOGNIZE_FACES]: QueueName.RECOGNIZE_FACES, + [JobName.GENERATE_FACE_THUMBNAIL]: QueueName.RECOGNIZE_FACES, + + // clip + [JobName.QUEUE_ENCODE_CLIP]: QueueName.CLIP_ENCODING, + [JobName.ENCODE_CLIP]: QueueName.CLIP_ENCODING, + + // search - albums + [JobName.SEARCH_INDEX_ALBUMS]: QueueName.SEARCH, + [JobName.SEARCH_INDEX_ALBUM]: QueueName.SEARCH, + [JobName.SEARCH_REMOVE_ALBUM]: QueueName.SEARCH, + + // search - assets + [JobName.SEARCH_INDEX_ASSETS]: QueueName.SEARCH, + [JobName.SEARCH_INDEX_ASSET]: QueueName.SEARCH, + [JobName.SEARCH_REMOVE_ASSET]: QueueName.SEARCH, + + // search - faces + [JobName.SEARCH_INDEX_FACES]: QueueName.SEARCH, + [JobName.SEARCH_INDEX_FACE]: QueueName.SEARCH, + [JobName.SEARCH_REMOVE_FACE]: QueueName.SEARCH, + + // XMP sidecars + [JobName.QUEUE_SIDECAR]: QueueName.SIDECAR, + [JobName.SIDECAR_DISCOVERY]: QueueName.SIDECAR, + [JobName.SIDECAR_SYNC]: QueueName.SIDECAR, +}; + +// max concurrency for each queue (total concurrency across all jobs) +export const QUEUE_TO_CONCURRENCY: Record = { + [QueueName.BACKGROUND_TASK]: 5, + [QueueName.CLIP_ENCODING]: 2, + [QueueName.METADATA_EXTRACTION]: 5, + [QueueName.OBJECT_TAGGING]: 2, + [QueueName.RECOGNIZE_FACES]: 2, + [QueueName.SEARCH]: 5, + [QueueName.SIDECAR]: 5, + [QueueName.STORAGE_TEMPLATE_MIGRATION]: 5, + [QueueName.THUMBNAIL_GENERATION]: 5, + [QueueName.VIDEO_CONVERSION]: 1, +}; diff --git a/server/libs/domain/src/metadata/index.ts b/server/libs/domain/src/metadata/index.ts index 14ac8b01e9..aa56749dac 100644 --- a/server/libs/domain/src/metadata/index.ts +++ b/server/libs/domain/src/metadata/index.ts @@ -1 +1,2 @@ export * from './geocoding.repository'; +export * from './metadata.service'; diff --git a/server/libs/domain/src/metadata/metadata.service.spec.ts b/server/libs/domain/src/metadata/metadata.service.spec.ts new file mode 100644 index 0000000000..6c1c3af7b1 --- /dev/null +++ b/server/libs/domain/src/metadata/metadata.service.spec.ts @@ -0,0 +1,140 @@ +import { constants } from 'fs/promises'; +import { assetEntityStub, newAssetRepositoryMock, newJobRepositoryMock, newStorageRepositoryMock } from '../../test'; +import { IAssetRepository, WithoutProperty, WithProperty } from '../asset'; +import { IJobRepository, JobName } from '../job'; +import { IStorageRepository } from '../storage'; +import { MetadataService } from './metadata.service'; + +describe(MetadataService.name, () => { + let sut: MetadataService; + let assetMock: jest.Mocked; + let jobMock: jest.Mocked; + let storageMock: jest.Mocked; + + beforeEach(async () => { + assetMock = newAssetRepositoryMock(); + jobMock = newJobRepositoryMock(); + storageMock = newStorageRepositoryMock(); + + sut = new MetadataService(assetMock, jobMock, storageMock); + }); + + it('should be defined', () => { + expect(sut).toBeDefined(); + }); + + describe('handleQueueSidecar', () => { + it('should queue assets with sidecar files', async () => { + assetMock.getWith.mockResolvedValue({ items: [assetEntityStub.sidecar], hasNextPage: false }); + + await sut.handleQueueSidecar({ force: true }); + + expect(assetMock.getWith).toHaveBeenCalledWith({ take: 1000, skip: 0 }, WithProperty.SIDECAR); + expect(assetMock.getWithout).not.toHaveBeenCalled(); + expect(jobMock.queue).toHaveBeenCalledWith({ + name: JobName.SIDECAR_SYNC, + data: { asset: assetEntityStub.sidecar }, + }); + }); + + it('should queue assets without sidecar files', async () => { + assetMock.getWithout.mockResolvedValue({ items: [assetEntityStub.image], hasNextPage: false }); + + await sut.handleQueueSidecar({ force: false }); + + expect(assetMock.getWithout).toHaveBeenCalledWith({ take: 1000, skip: 0 }, WithoutProperty.SIDECAR); + expect(assetMock.getWith).not.toHaveBeenCalled(); + expect(jobMock.queue).toHaveBeenCalledWith({ + name: JobName.SIDECAR_DISCOVERY, + data: { asset: assetEntityStub.image }, + }); + }); + + it('should log an error', async () => { + assetMock.getWith.mockRejectedValue(new Error('database unavailable')); + await sut.handleQueueSidecar({ force: true }); + expect(jobMock.queue).not.toHaveBeenCalled(); + }); + }); + + describe('handleSidecarSync', () => { + it('should skip hidden assets', async () => { + await sut.handleSidecarSync({ asset: assetEntityStub.livePhotoMotionAsset }); + expect(jobMock.queue).not.toHaveBeenCalled(); + }); + + it('should handle video assets', async () => { + await sut.handleSidecarSync({ asset: assetEntityStub.video }); + expect(jobMock.queue).toHaveBeenCalledWith({ + name: JobName.EXTRACT_VIDEO_METADATA, + data: { asset: assetEntityStub.video }, + }); + }); + + it('should handle image assets', async () => { + await sut.handleSidecarSync({ asset: assetEntityStub.image }); + expect(jobMock.queue).toHaveBeenCalledWith({ + name: JobName.EXIF_EXTRACTION, + data: { asset: assetEntityStub.image }, + }); + }); + + it('should log an error', async () => { + jobMock.queue.mockRejectedValue(new Error('queue job failed')); + await sut.handleSidecarSync({ asset: assetEntityStub.image }); + }); + }); + + describe('handleSidecarDiscovery', () => { + it('should skip hidden assets', async () => { + await sut.handleSidecarDiscovery({ asset: assetEntityStub.livePhotoMotionAsset }); + expect(storageMock.checkFileExists).not.toHaveBeenCalled(); + }); + + it('should skip assets with a sidecar path', async () => { + await sut.handleSidecarDiscovery({ asset: assetEntityStub.sidecar }); + expect(storageMock.checkFileExists).not.toHaveBeenCalled(); + }); + + it('should do nothing when a sidecar is not found ', async () => { + storageMock.checkFileExists.mockResolvedValue(false); + await sut.handleSidecarDiscovery({ asset: assetEntityStub.image }); + expect(assetMock.save).not.toHaveBeenCalled(); + }); + + it('should update a image asset when a sidecar is found', async () => { + assetMock.save.mockResolvedValue(assetEntityStub.image); + storageMock.checkFileExists.mockResolvedValue(true); + await sut.handleSidecarDiscovery({ asset: assetEntityStub.image }); + expect(storageMock.checkFileExists).toHaveBeenCalledWith('/original/path.ext.xmp', constants.W_OK); + expect(assetMock.save).toHaveBeenCalledWith({ + id: assetEntityStub.image.id, + sidecarPath: '/original/path.ext.xmp', + }); + expect(jobMock.queue).toHaveBeenCalledWith({ + name: JobName.EXIF_EXTRACTION, + data: { asset: assetEntityStub.image }, + }); + }); + + it('should update a video asset when a sidecar is found', async () => { + assetMock.save.mockResolvedValue(assetEntityStub.video); + storageMock.checkFileExists.mockResolvedValue(true); + await sut.handleSidecarDiscovery({ asset: assetEntityStub.video }); + expect(storageMock.checkFileExists).toHaveBeenCalledWith('/original/path.ext.xmp', constants.W_OK); + expect(assetMock.save).toHaveBeenCalledWith({ + id: assetEntityStub.image.id, + sidecarPath: '/original/path.ext.xmp', + }); + expect(jobMock.queue).toHaveBeenCalledWith({ + name: JobName.EXTRACT_VIDEO_METADATA, + data: { asset: assetEntityStub.video }, + }); + }); + + it('should log an error', async () => { + storageMock.checkFileExists.mockRejectedValue(new Error('bad permission')); + await sut.handleSidecarDiscovery({ asset: assetEntityStub.image }); + }); + }); +}); diff --git a/server/libs/domain/src/metadata/metadata.service.ts b/server/libs/domain/src/metadata/metadata.service.ts new file mode 100644 index 0000000000..9d4363ce51 --- /dev/null +++ b/server/libs/domain/src/metadata/metadata.service.ts @@ -0,0 +1,77 @@ +import { AssetType } from '@app/infra/entities'; +import { Inject, Logger } from '@nestjs/common'; +import { constants } from 'fs/promises'; +import { AssetCore, IAssetRepository, WithoutProperty, WithProperty } from '../asset'; +import { usePagination } from '../domain.util'; +import { IAssetJob, IBaseJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job'; +import { IStorageRepository } from '../storage'; + +export class MetadataService { + private logger = new Logger(MetadataService.name); + private assetCore: AssetCore; + + constructor( + @Inject(IAssetRepository) private assetRepository: IAssetRepository, + @Inject(IJobRepository) private jobRepository: IJobRepository, + @Inject(IStorageRepository) private storageRepository: IStorageRepository, + ) { + this.assetCore = new AssetCore(assetRepository, jobRepository); + } + + async handleQueueSidecar(job: IBaseJob) { + try { + const { force } = job; + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getWith(pagination, WithProperty.SIDECAR) + : this.assetRepository.getWithout(pagination, WithoutProperty.SIDECAR); + }); + + for await (const assets of assetPagination) { + for (const asset of assets) { + const name = force ? JobName.SIDECAR_SYNC : JobName.SIDECAR_DISCOVERY; + await this.jobRepository.queue({ name, data: { asset } }); + } + } + } catch (error: any) { + this.logger.error(`Unable to queue sidecar scanning`, error?.stack); + } + } + + async handleSidecarSync(job: IAssetJob) { + const { asset } = job; + if (!asset.isVisible) { + return; + } + + try { + const name = asset.type === AssetType.VIDEO ? JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION; + await this.jobRepository.queue({ name, data: { asset } }); + } catch (error: any) { + this.logger.error(`Unable to queue metadata extraction`, error?.stack); + } + } + + async handleSidecarDiscovery(job: IAssetJob) { + let { asset } = job; + if (!asset.isVisible || asset.sidecarPath) { + return; + } + + try { + const sidecarPath = `${asset.originalPath}.xmp`; + const exists = await this.storageRepository.checkFileExists(sidecarPath, constants.W_OK); + if (!exists) { + return; + } + + asset = await this.assetCore.save({ id: asset.id, sidecarPath }); + // TODO: optimize to only queue assets with recent xmp changes + const name = asset.type === AssetType.VIDEO ? JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION; + await this.jobRepository.queue({ name, data: { asset } }); + } catch (error: any) { + this.logger.error(`Unable to queue metadata extraction: ${error}`, error?.stack); + return; + } + } +} diff --git a/server/libs/domain/src/storage-template/storage-template.service.spec.ts b/server/libs/domain/src/storage-template/storage-template.service.spec.ts index 6200f3f97d..8eea783184 100644 --- a/server/libs/domain/src/storage-template/storage-template.service.spec.ts +++ b/server/libs/domain/src/storage-template/storage-template.service.spec.ts @@ -42,7 +42,7 @@ describe(StorageTemplateService.name, () => { }); userMock.getList.mockResolvedValue([]); - await sut.handleTemplateMigration(); + await sut.handleMigration(); expect(assetMock.getAll).toHaveBeenCalled(); }); @@ -63,7 +63,7 @@ describe(StorageTemplateService.name, () => { .calledWith('upload/library/user-id/2023/2023-02-23/asset-id+1.ext') .mockResolvedValue(false); - await sut.handleTemplateMigration(); + await sut.handleMigration(); expect(assetMock.getAll).toHaveBeenCalled(); expect(storageMock.checkFileExists).toHaveBeenCalledTimes(2); @@ -86,7 +86,7 @@ describe(StorageTemplateService.name, () => { }); userMock.getList.mockResolvedValue([userEntityStub.user1]); - await sut.handleTemplateMigration(); + await sut.handleMigration(); expect(assetMock.getAll).toHaveBeenCalled(); expect(storageMock.moveFile).not.toHaveBeenCalled(); @@ -106,7 +106,7 @@ describe(StorageTemplateService.name, () => { }); userMock.getList.mockResolvedValue([userEntityStub.user1]); - await sut.handleTemplateMigration(); + await sut.handleMigration(); expect(assetMock.getAll).toHaveBeenCalled(); expect(storageMock.moveFile).not.toHaveBeenCalled(); @@ -122,7 +122,7 @@ describe(StorageTemplateService.name, () => { assetMock.save.mockResolvedValue(assetEntityStub.image); userMock.getList.mockResolvedValue([userEntityStub.user1]); - await sut.handleTemplateMigration(); + await sut.handleMigration(); expect(assetMock.getAll).toHaveBeenCalled(); expect(storageMock.moveFile).toHaveBeenCalledWith( @@ -143,7 +143,7 @@ describe(StorageTemplateService.name, () => { assetMock.save.mockResolvedValue(assetEntityStub.image); userMock.getList.mockResolvedValue([userEntityStub.storageLabel]); - await sut.handleTemplateMigration(); + await sut.handleMigration(); expect(assetMock.getAll).toHaveBeenCalled(); expect(storageMock.moveFile).toHaveBeenCalledWith( @@ -164,7 +164,7 @@ describe(StorageTemplateService.name, () => { storageMock.moveFile.mockRejectedValue(new Error('Read only system')); userMock.getList.mockResolvedValue([userEntityStub.user1]); - await sut.handleTemplateMigration(); + await sut.handleMigration(); expect(assetMock.getAll).toHaveBeenCalled(); expect(storageMock.moveFile).toHaveBeenCalledWith( @@ -182,7 +182,7 @@ describe(StorageTemplateService.name, () => { assetMock.save.mockRejectedValue('Connection Error!'); userMock.getList.mockResolvedValue([userEntityStub.user1]); - await sut.handleTemplateMigration(); + await sut.handleMigration(); expect(assetMock.getAll).toHaveBeenCalled(); expect(assetMock.save).toHaveBeenCalledWith({ @@ -200,6 +200,6 @@ describe(StorageTemplateService.name, () => { storageMock.removeEmptyDirs.mockRejectedValue(new Error('Read only filesystem')); userMock.getList.mockResolvedValue([]); - await sut.handleTemplateMigration(); + await sut.handleMigration(); }); }); diff --git a/server/libs/domain/src/storage-template/storage-template.service.ts b/server/libs/domain/src/storage-template/storage-template.service.ts index e62ade1d3c..f8e2ca7a59 100644 --- a/server/libs/domain/src/storage-template/storage-template.service.ts +++ b/server/libs/domain/src/storage-template/storage-template.service.ts @@ -29,7 +29,7 @@ export class StorageTemplateService { this.core = new StorageTemplateCore(configRepository, config, storageRepository); } - async handleTemplateMigrationSingle(data: IAssetJob) { + async handleMigrationSingle(data: IAssetJob) { const { asset } = data; try { @@ -49,7 +49,7 @@ export class StorageTemplateService { } } - async handleTemplateMigration() { + async handleMigration() { try { console.time('migrating-time'); diff --git a/server/libs/domain/src/storage/storage.repository.ts b/server/libs/domain/src/storage/storage.repository.ts index 7765e10ca3..4ff1b5c018 100644 --- a/server/libs/domain/src/storage/storage.repository.ts +++ b/server/libs/domain/src/storage/storage.repository.ts @@ -20,7 +20,7 @@ export interface IStorageRepository { unlinkDir(folder: string, options?: { recursive?: boolean; force?: boolean }): Promise; removeEmptyDirs(folder: string): Promise; moveFile(source: string, target: string): Promise; - checkFileExists(filepath: string): Promise; + checkFileExists(filepath: string, mode?: number): Promise; mkdirSync(filepath: string): void; checkDiskUsage(folder: string): Promise; } diff --git a/server/libs/domain/test/fixtures.ts b/server/libs/domain/test/fixtures.ts index 2ab37507cb..491e433d5a 100644 --- a/server/libs/domain/test/fixtures.ts +++ b/server/libs/domain/test/fixtures.ts @@ -277,6 +277,35 @@ export const assetEntityStub = { longitude: 100, } as ExifEntity, }), + sidecar: Object.freeze({ + id: 'asset-id', + deviceAssetId: 'device-asset-id', + fileModifiedAt: '2023-02-23T05:06:29.716Z', + fileCreatedAt: '2023-02-23T05:06:29.716Z', + owner: userEntityStub.user1, + ownerId: 'user-id', + deviceId: 'device-id', + originalPath: '/original/path.ext', + resizePath: '/uploads/user-id/thumbs/path.ext', + checksum: Buffer.from('file hash', 'utf8'), + type: AssetType.IMAGE, + webpPath: null, + encodedVideoPath: null, + createdAt: '2023-02-23T05:06:29.716Z', + updatedAt: '2023-02-23T05:06:29.716Z', + mimeType: null, + isFavorite: true, + isArchived: false, + duration: null, + isVisible: true, + livePhotoVideo: null, + livePhotoVideoId: null, + tags: [], + sharedLinks: [], + originalFileName: 'asset-id.ext', + faces: [], + sidecarPath: '/original/path.ext.xmp', + }), }; export const albumStub = { diff --git a/server/libs/infra/src/repositories/filesystem.provider.ts b/server/libs/infra/src/repositories/filesystem.provider.ts index dc85075db0..e3c8c4d615 100644 --- a/server/libs/infra/src/repositories/filesystem.provider.ts +++ b/server/libs/infra/src/repositories/filesystem.provider.ts @@ -22,9 +22,9 @@ export class FilesystemProvider implements IStorageRepository { await moveFile(source, destination, { mkdirp: true, clobber: false }); } - async checkFileExists(filepath: string): Promise { + async checkFileExists(filepath: string, mode = constants.F_OK): Promise { try { - await fs.access(filepath, constants.F_OK); + await fs.access(filepath, mode); return true; } catch (_) { return false; diff --git a/server/libs/infra/src/repositories/job.repository.ts b/server/libs/infra/src/repositories/job.repository.ts index c0ea801c38..2962222822 100644 --- a/server/libs/infra/src/repositories/job.repository.ts +++ b/server/libs/infra/src/repositories/job.repository.ts @@ -1,38 +1,15 @@ -import { IAssetJob, IBaseJob, IJobRepository, JobCounts, JobItem, JobName, QueueName, QueueStatus } from '@app/domain'; -import { InjectQueue } from '@nestjs/bull'; -import { Logger } from '@nestjs/common'; -import { Queue, type JobCounts as BullJobCounts } from 'bull'; +import { IJobRepository, JobCounts, JobItem, JobName, JOBS_TO_QUEUE, QueueName, QueueStatus } from '@app/domain'; +import { getQueueToken } from '@nestjs/bull'; +import { Injectable } from '@nestjs/common'; +import { ModuleRef } from '@nestjs/core'; +import { JobOptions, Queue, type JobCounts as BullJobCounts } from 'bull'; +@Injectable() export class JobRepository implements IJobRepository { - private logger = new Logger(JobRepository.name); - private queueMap: Record = { - [QueueName.STORAGE_TEMPLATE_MIGRATION]: this.storageTemplateMigration, - [QueueName.THUMBNAIL_GENERATION]: this.generateThumbnail, - [QueueName.METADATA_EXTRACTION]: this.metadataExtraction, - [QueueName.OBJECT_TAGGING]: this.objectTagging, - [QueueName.RECOGNIZE_FACES]: this.recognizeFaces, - [QueueName.CLIP_ENCODING]: this.clipEmbedding, - [QueueName.VIDEO_CONVERSION]: this.videoTranscode, - [QueueName.BACKGROUND_TASK]: this.backgroundTask, - [QueueName.SEARCH]: this.searchIndex, - [QueueName.SIDECAR]: this.sidecar, - }; - - constructor( - @InjectQueue(QueueName.BACKGROUND_TASK) private backgroundTask: Queue, - @InjectQueue(QueueName.OBJECT_TAGGING) private objectTagging: Queue, - @InjectQueue(QueueName.CLIP_ENCODING) private clipEmbedding: Queue, - @InjectQueue(QueueName.METADATA_EXTRACTION) private metadataExtraction: Queue, - @InjectQueue(QueueName.RECOGNIZE_FACES) private recognizeFaces: Queue, - @InjectQueue(QueueName.STORAGE_TEMPLATE_MIGRATION) private storageTemplateMigration: Queue, - @InjectQueue(QueueName.THUMBNAIL_GENERATION) private generateThumbnail: Queue, - @InjectQueue(QueueName.VIDEO_CONVERSION) private videoTranscode: Queue, - @InjectQueue(QueueName.SEARCH) private searchIndex: Queue, - @InjectQueue(QueueName.SIDECAR) private sidecar: Queue, - ) {} + constructor(private moduleRef: ModuleRef) {} async getQueueStatus(name: QueueName): Promise { - const queue = this.queueMap[name]; + const queue = this.getQueue(name); return { isActive: !!(await queue.getActiveCount()), @@ -41,116 +18,45 @@ export class JobRepository implements IJobRepository { } pause(name: QueueName) { - return this.queueMap[name].pause(); + return this.getQueue(name).pause(); } resume(name: QueueName) { - return this.queueMap[name].resume(); + return this.getQueue(name).resume(); } empty(name: QueueName) { - return this.queueMap[name].empty(); + return this.getQueue(name).empty(); } getJobCounts(name: QueueName): Promise { // Typecast needed because the `paused` key is missing from Bull's // type definition. Can be removed once fixed upstream. - return this.queueMap[name].getJobCounts() as Promise; + return this.getQueue(name).getJobCounts() as Promise; } async queue(item: JobItem): Promise { + const jobName = item.name; + const jobData = (item as { data?: any })?.data || {}; + const jobOptions = this.getJobOptions(item) || undefined; + + await this.getQueue(JOBS_TO_QUEUE[jobName]).add(jobName, jobData, jobOptions); + } + + private getJobOptions(item: JobItem): JobOptions | null { switch (item.name) { case JobName.ASSET_UPLOADED: - await this.backgroundTask.add(item.name, item.data, { jobId: item.data.asset.id }); - break; - - case JobName.DELETE_FILES: - await this.backgroundTask.add(item.name, item.data); - break; - - case JobName.QUEUE_OBJECT_TAGGING: - case JobName.DETECT_OBJECTS: - case JobName.CLASSIFY_IMAGE: - await this.objectTagging.add(item.name, item.data); - break; - - case JobName.QUEUE_ENCODE_CLIP: - case JobName.ENCODE_CLIP: - await this.clipEmbedding.add(item.name, item.data); - break; - - case JobName.QUEUE_METADATA_EXTRACTION: - case JobName.EXIF_EXTRACTION: - case JobName.EXTRACT_VIDEO_METADATA: - await this.metadataExtraction.add(item.name, item.data); - break; - - case JobName.QUEUE_SIDECAR: - case JobName.SIDECAR_DISCOVERY: - case JobName.SIDECAR_SYNC: - await this.sidecar.add(item.name, item.data); - break; - - case JobName.QUEUE_RECOGNIZE_FACES: - case JobName.RECOGNIZE_FACES: - await this.recognizeFaces.add(item.name, item.data); - break; + return { jobId: item.data.asset.id }; case JobName.GENERATE_FACE_THUMBNAIL: - await this.recognizeFaces.add(item.name, item.data, { priority: 1 }); - break; - - case JobName.PERSON_CLEANUP: - await this.backgroundTask.add(item.name); - break; - - case JobName.QUEUE_GENERATE_THUMBNAILS: - case JobName.GENERATE_JPEG_THUMBNAIL: - case JobName.GENERATE_WEBP_THUMBNAIL: - await this.generateThumbnail.add(item.name, item.data); - break; - - case JobName.USER_DELETION: - await this.backgroundTask.add(item.name, item.data); - break; - - case JobName.STORAGE_TEMPLATE_MIGRATION: - await this.storageTemplateMigration.add(item.name); - break; - - case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: - await this.storageTemplateMigration.add(item.name, item.data); - break; - - case JobName.SYSTEM_CONFIG_CHANGE: - await this.backgroundTask.add(item.name, {}); - break; - - case JobName.QUEUE_VIDEO_CONVERSION: - case JobName.VIDEO_CONVERSION: - await this.videoTranscode.add(item.name, item.data); - break; - - case JobName.SEARCH_INDEX_ASSETS: - case JobName.SEARCH_INDEX_ALBUMS: - case JobName.SEARCH_INDEX_FACES: - await this.searchIndex.add(item.name, {}); - break; - - case JobName.SEARCH_INDEX_ASSET: - case JobName.SEARCH_INDEX_ALBUM: - case JobName.SEARCH_INDEX_FACE: - await this.searchIndex.add(item.name, item.data); - break; - - case JobName.SEARCH_REMOVE_ALBUM: - case JobName.SEARCH_REMOVE_ASSET: - case JobName.SEARCH_REMOVE_FACE: - await this.searchIndex.add(item.name, item.data); - break; + return { priority: 1 }; default: - this.logger.error('Invalid job', item); + return null; } } + + private getQueue(queue: QueueName) { + return this.moduleRef.get(getQueueToken(queue), { strict: false }); + } }