diff --git a/server/apps/immich/src/api-v1/asset/asset.service.ts b/server/apps/immich/src/api-v1/asset/asset.service.ts index 6f2138ea59..f036264915 100644 --- a/server/apps/immich/src/api-v1/asset/asset.service.ts +++ b/server/apps/immich/src/api-v1/asset/asset.service.ts @@ -10,7 +10,7 @@ import { StreamableFile, } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -import { createHash, randomUUID } from 'node:crypto'; +import { createHash } from 'node:crypto'; import { QueryFailedError, Repository } from 'typeorm'; import { AuthUserDto } from '../../decorators/auth-user.decorator'; import { AssetEntity, AssetType, SharedLinkType } from '@app/infra'; @@ -43,13 +43,7 @@ import { CheckExistingAssetsResponseDto } from './response-dto/check-existing-as import { UpdateAssetDto } from './dto/update-asset.dto'; import { AssetFileUploadResponseDto } from './response-dto/asset-file-upload-response.dto'; import { BackgroundTaskService } from '../../modules/background-task/background-task.service'; -import { - assetUploadedProcessorName, - IAssetUploadedJob, - IVideoTranscodeJob, - mp4ConversionProcessorName, - QueueNameEnum, -} from '@app/job'; +import { IAssetUploadedJob, IVideoTranscodeJob, QueueName, JobName } from '@app/job'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; import { DownloadService } from '../../modules/download/download.service'; @@ -80,16 +74,16 @@ export class AssetService { private backgroundTaskService: BackgroundTaskService, - @InjectQueue(QueueNameEnum.ASSET_UPLOADED) + @InjectQueue(QueueName.ASSET_UPLOADED) private assetUploadedQueue: Queue, - @InjectQueue(QueueNameEnum.VIDEO_CONVERSION) + @InjectQueue(QueueName.VIDEO_CONVERSION) private videoConversionQueue: Queue, private downloadService: DownloadService, private storageService: StorageService, - @Inject(ISharedLinkRepository) private sharedLinkRepository: ISharedLinkRepository, + @Inject(ISharedLinkRepository) sharedLinkRepository: ISharedLinkRepository, ) { this.shareCore = new ShareCore(sharedLinkRepository); } @@ -128,11 +122,7 @@ export class AssetService { await this.storageService.moveAsset(livePhotoAssetEntity, originalAssetData.originalname); - await this.videoConversionQueue.add( - mp4ConversionProcessorName, - { asset: livePhotoAssetEntity }, - { jobId: randomUUID() }, - ); + await this.videoConversionQueue.add(JobName.MP4_CONVERSION, { asset: livePhotoAssetEntity }); } const assetEntity = await this.createUserAsset( @@ -157,7 +147,7 @@ export class AssetService { const movedAsset = await this.storageService.moveAsset(assetEntity, originalAssetData.originalname); await this.assetUploadedQueue.add( - assetUploadedProcessorName, + JobName.ASSET_UPLOADED, { asset: movedAsset, fileName: originalAssetData.originalname }, { jobId: movedAsset.id }, ); diff --git a/server/apps/immich/src/api-v1/job/job.service.ts b/server/apps/immich/src/api-v1/job/job.service.ts index 42d34262c2..b09db2a849 100644 --- a/server/apps/immich/src/api-v1/job/job.service.ts +++ b/server/apps/immich/src/api-v1/job/job.service.ts @@ -1,19 +1,8 @@ -import { - exifExtractionProcessorName, - generateJPEGThumbnailProcessorName, - IMetadataExtractionJob, - IThumbnailGenerationJob, - IVideoTranscodeJob, - MachineLearningJobNameEnum, - QueueNameEnum, - templateMigrationProcessorName, - videoMetadataExtractionProcessorName, -} from '@app/job'; +import { IMetadataExtractionJob, IThumbnailGenerationJob, IVideoTranscodeJob, QueueName, JobName } from '@app/job'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; import { BadRequestException, Inject, Injectable } from '@nestjs/common'; import { AllJobStatusResponseDto } from './response-dto/all-job-status-response.dto'; -import { randomUUID } from 'crypto'; import { IAssetRepository } from '../asset/asset-repository'; import { AssetType } from '@app/infra'; import { GetJobDto, JobId } from './dto/get-job.dto'; @@ -24,20 +13,20 @@ import { StorageService } from '@app/storage'; @Injectable() export class JobService { constructor( - @InjectQueue(QueueNameEnum.THUMBNAIL_GENERATION) + @InjectQueue(QueueName.THUMBNAIL_GENERATION) private thumbnailGeneratorQueue: Queue, - @InjectQueue(QueueNameEnum.METADATA_EXTRACTION) + @InjectQueue(QueueName.METADATA_EXTRACTION) private metadataExtractionQueue: Queue, - @InjectQueue(QueueNameEnum.VIDEO_CONVERSION) + @InjectQueue(QueueName.VIDEO_CONVERSION) private videoConversionQueue: Queue, - @InjectQueue(QueueNameEnum.MACHINE_LEARNING) + @InjectQueue(QueueName.MACHINE_LEARNING) private machineLearningQueue: Queue, - @InjectQueue(QueueNameEnum.STORAGE_MIGRATION) - private storageMigrationQueue: Queue, + @InjectQueue(QueueName.CONFIG) + private configQueue: Queue, @Inject(IAssetRepository) private _assetRepository: IAssetRepository, @@ -47,7 +36,7 @@ export class JobService { this.thumbnailGeneratorQueue.empty(); this.metadataExtractionQueue.empty(); this.videoConversionQueue.empty(); - this.storageMigrationQueue.empty(); + this.configQueue.empty(); } async startJob(jobDto: GetJobDto): Promise { @@ -72,7 +61,7 @@ export class JobService { const metadataExtractionJobCount = await this.metadataExtractionQueue.getJobCounts(); const videoConversionJobCount = await this.videoConversionQueue.getJobCounts(); const machineLearningJobCount = await this.machineLearningQueue.getJobCounts(); - const storageMigrationJobCount = await this.storageMigrationQueue.getJobCounts(); + const storageMigrationJobCount = await this.configQueue.getJobCounts(); const response = new AllJobStatusResponseDto(); response.isThumbnailGenerationActive = Boolean(thumbnailGeneratorJobCount.waiting); @@ -108,8 +97,8 @@ export class JobService { } if (query.jobId === JobId.STORAGE_TEMPLATE_MIGRATION) { - response.isActive = Boolean((await this.storageMigrationQueue.getJobCounts()).waiting); - response.queueCount = await this.storageMigrationQueue.getJobCounts(); + response.isActive = Boolean((await this.configQueue.getJobCounts()).waiting); + response.queueCount = await this.configQueue.getJobCounts(); } return response; @@ -130,7 +119,7 @@ export class JobService { this.machineLearningQueue.empty(); return 0; case JobId.STORAGE_TEMPLATE_MIGRATION: - this.storageMigrationQueue.empty(); + this.configQueue.empty(); return 0; default: throw new BadRequestException('Invalid job id'); @@ -147,7 +136,7 @@ export class JobService { const assetsWithNoThumbnail = await this._assetRepository.getAssetWithNoThumbnail(); for (const asset of assetsWithNoThumbnail) { - await this.thumbnailGeneratorQueue.add(generateJPEGThumbnailProcessorName, { asset }, { jobId: randomUUID() }); + await this.thumbnailGeneratorQueue.add(JobName.GENERATE_JPEG_THUMBNAIL, { asset }); } return assetsWithNoThumbnail.length; @@ -163,17 +152,9 @@ export class JobService { const assetsWithNoExif = await this._assetRepository.getAssetWithNoEXIF(); for (const asset of assetsWithNoExif) { if (asset.type === AssetType.VIDEO) { - await this.metadataExtractionQueue.add( - videoMetadataExtractionProcessorName, - { asset, fileName: asset.id }, - { jobId: randomUUID() }, - ); + await this.metadataExtractionQueue.add(JobName.EXTRACT_VIDEO_METADATA, { asset, fileName: asset.id }); } else { - await this.metadataExtractionQueue.add( - exifExtractionProcessorName, - { asset, fileName: asset.id }, - { jobId: randomUUID() }, - ); + await this.metadataExtractionQueue.add(JobName.EXIF_EXTRACTION, { asset, fileName: asset.id }); } } return assetsWithNoExif.length; @@ -189,25 +170,21 @@ export class JobService { const assetWithNoSmartInfo = await this._assetRepository.getAssetWithNoSmartInfo(); for (const asset of assetWithNoSmartInfo) { - await this.machineLearningQueue.add(MachineLearningJobNameEnum.IMAGE_TAGGING, { asset }, { jobId: randomUUID() }); - await this.machineLearningQueue.add( - MachineLearningJobNameEnum.OBJECT_DETECTION, - { asset }, - { jobId: randomUUID() }, - ); + await this.machineLearningQueue.add(JobName.IMAGE_TAGGING, { asset }); + await this.machineLearningQueue.add(JobName.OBJECT_DETECTION, { asset }); } return assetWithNoSmartInfo.length; } async runStorageMigration() { - const jobCount = await this.storageMigrationQueue.getJobCounts(); + const jobCount = await this.configQueue.getJobCounts(); if (jobCount.active > 0) { throw new BadRequestException('Storage migration job is already running'); } - await this.storageMigrationQueue.add(templateMigrationProcessorName, {}, { jobId: randomUUID() }); + await this.configQueue.add(JobName.TEMPLATE_MIGRATION, {}); return 1; } diff --git a/server/apps/immich/src/api-v1/system-config/system-config.service.ts b/server/apps/immich/src/api-v1/system-config/system-config.service.ts index 38fcf95938..d3b6fb10e9 100644 --- a/server/apps/immich/src/api-v1/system-config/system-config.service.ts +++ b/server/apps/immich/src/api-v1/system-config/system-config.service.ts @@ -1,4 +1,4 @@ -import { QueueNameEnum, updateTemplateProcessorName } from '@app/job'; +import { JobName, QueueName } from '@app/job'; import { supportedDayTokens, supportedHourTokens, @@ -11,7 +11,6 @@ import { import { InjectQueue } from '@nestjs/bull'; import { Injectable } from '@nestjs/common'; import { Queue } from 'bull'; -import { randomUUID } from 'crypto'; import { ImmichConfigService } from 'libs/immich-config/src'; import { mapConfig, SystemConfigDto } from './dto/system-config.dto'; import { SystemConfigTemplateStorageOptionDto } from './response-dto/system-config-template-storage-option.dto'; @@ -20,8 +19,7 @@ import { SystemConfigTemplateStorageOptionDto } from './response-dto/system-conf export class SystemConfigService { constructor( private immichConfigService: ImmichConfigService, - @InjectQueue(QueueNameEnum.STORAGE_MIGRATION) - private storageMigrationQueue: Queue, + @InjectQueue(QueueName.CONFIG) private configQueue: Queue, ) {} public async getConfig(): Promise { @@ -36,7 +34,7 @@ export class SystemConfigService { public async updateConfig(dto: SystemConfigDto): Promise { const config = await this.immichConfigService.updateConfig(dto); - this.storageMigrationQueue.add(updateTemplateProcessorName, {}, { jobId: randomUUID() }); + this.configQueue.add(JobName.CONFIG_CHANGE, {}); return mapConfig(config); } diff --git a/server/apps/immich/src/modules/background-task/background-task.module.ts b/server/apps/immich/src/modules/background-task/background-task.module.ts index b862e13601..2e557fff7e 100644 --- a/server/apps/immich/src/modules/background-task/background-task.module.ts +++ b/server/apps/immich/src/modules/background-task/background-task.module.ts @@ -1,17 +1,11 @@ import { BullModule } from '@nestjs/bull'; import { Module } from '@nestjs/common'; -import { TypeOrmModule } from '@nestjs/typeorm'; -import { AssetEntity, ExifEntity, SmartInfoEntity } from '@app/infra'; +import { QueueName } from '@app/job'; import { BackgroundTaskProcessor } from './background-task.processor'; import { BackgroundTaskService } from './background-task.service'; @Module({ - imports: [ - BullModule.registerQueue({ - name: 'background-task', - }), - TypeOrmModule.forFeature([AssetEntity, ExifEntity, SmartInfoEntity]), - ], + imports: [BullModule.registerQueue({ name: QueueName.BACKGROUND_TASK })], providers: [BackgroundTaskService, BackgroundTaskProcessor], exports: [BackgroundTaskService, BullModule], }) diff --git a/server/apps/immich/src/modules/background-task/background-task.processor.ts b/server/apps/immich/src/modules/background-task/background-task.processor.ts index 5d7eee1d9e..9607bbe5ba 100644 --- a/server/apps/immich/src/modules/background-task/background-task.processor.ts +++ b/server/apps/immich/src/modules/background-task/background-task.processor.ts @@ -1,23 +1,12 @@ -import { Process, Processor } from '@nestjs/bull'; -import { InjectRepository } from '@nestjs/typeorm'; -import { Repository } from 'typeorm'; -import { AssetEntity, SmartInfoEntity } from '@app/infra'; -import { Job } from 'bull'; -import { AssetResponseDto } from '../../api-v1/asset/response-dto/asset-response.dto'; import { assetUtils } from '@app/common/utils'; +import { Process, Processor } from '@nestjs/bull'; +import { Job } from 'bull'; +import { JobName, QueueName } from '@app/job'; +import { AssetResponseDto } from '../../api-v1/asset/response-dto/asset-response.dto'; -@Processor('background-task') +@Processor(QueueName.BACKGROUND_TASK) export class BackgroundTaskProcessor { - constructor( - @InjectRepository(AssetEntity) - private assetRepository: Repository, - - @InjectRepository(SmartInfoEntity) - private smartInfoRepository: Repository, - ) {} - - // TODO: Should probably use constants / Interfaces for Queue names / data - @Process('delete-file-on-disk') + @Process(JobName.DELETE_FILE_ON_DISK) async deleteFileOnDisk(job: Job<{ assets: AssetResponseDto[] }>) { const { assets } = job.data; diff --git a/server/apps/immich/src/modules/background-task/background-task.service.ts b/server/apps/immich/src/modules/background-task/background-task.service.ts index 0fb26b66a4..dd9a32a9be 100644 --- a/server/apps/immich/src/modules/background-task/background-task.service.ts +++ b/server/apps/immich/src/modules/background-task/background-task.service.ts @@ -1,23 +1,17 @@ import { InjectQueue } from '@nestjs/bull/dist/decorators'; import { Injectable } from '@nestjs/common'; import { Queue } from 'bull'; -import { randomUUID } from 'node:crypto'; +import { JobName, QueueName } from '@app/job'; import { AssetResponseDto } from '../../api-v1/asset/response-dto/asset-response.dto'; @Injectable() export class BackgroundTaskService { constructor( - @InjectQueue('background-task') + @InjectQueue(QueueName.BACKGROUND_TASK) private backgroundTaskQueue: Queue, ) {} async deleteFileOnDisk(assets: AssetResponseDto[]) { - await this.backgroundTaskQueue.add( - 'delete-file-on-disk', - { - assets, - }, - { jobId: randomUUID() }, - ); + await this.backgroundTaskQueue.add(JobName.DELETE_FILE_ON_DISK, { assets }); } } diff --git a/server/apps/immich/src/modules/schedule-tasks/schedule-tasks.service.ts b/server/apps/immich/src/modules/schedule-tasks/schedule-tasks.service.ts index 41c61cfcb7..764879bfc3 100644 --- a/server/apps/immich/src/modules/schedule-tasks/schedule-tasks.service.ts +++ b/server/apps/immich/src/modules/schedule-tasks/schedule-tasks.service.ts @@ -5,18 +5,7 @@ import { IsNull, Not, Repository } from 'typeorm'; import { AssetEntity, AssetType, ExifEntity, UserEntity } from '@app/infra'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; -import { randomUUID } from 'crypto'; -import { - userDeletionProcessorName, - exifExtractionProcessorName, - generateWEBPThumbnailProcessorName, - IMetadataExtractionJob, - IVideoTranscodeJob, - mp4ConversionProcessorName, - QueueNameEnum, - reverseGeocodingProcessorName, - videoMetadataExtractionProcessorName, -} from '@app/job'; +import { IMetadataExtractionJob, IVideoTranscodeJob, QueueName, JobName } from '@app/job'; import { ConfigService } from '@nestjs/config'; import { IUserDeletionJob } from '@app/job/interfaces/user-deletion.interface'; import { userUtils } from '@app/common'; @@ -33,16 +22,16 @@ export class ScheduleTasksService { @InjectRepository(ExifEntity) private exifRepository: Repository, - @InjectQueue(QueueNameEnum.THUMBNAIL_GENERATION) + @InjectQueue(QueueName.THUMBNAIL_GENERATION) private thumbnailGeneratorQueue: Queue, - @InjectQueue(QueueNameEnum.VIDEO_CONVERSION) + @InjectQueue(QueueName.VIDEO_CONVERSION) private videoConversionQueue: Queue, - @InjectQueue(QueueNameEnum.METADATA_EXTRACTION) + @InjectQueue(QueueName.METADATA_EXTRACTION) private metadataExtractionQueue: Queue, - @InjectQueue(QueueNameEnum.USER_DELETION) + @InjectQueue(QueueName.USER_DELETION) private userDeletionQueue: Queue, private configService: ConfigService, @@ -62,11 +51,7 @@ export class ScheduleTasksService { } for (const asset of assets) { - await this.thumbnailGeneratorQueue.add( - generateWEBPThumbnailProcessorName, - { asset: asset }, - { jobId: randomUUID() }, - ); + await this.thumbnailGeneratorQueue.add(JobName.GENERATE_WEBP_THUMBNAIL, { asset: asset }); } } @@ -84,7 +69,7 @@ export class ScheduleTasksService { }); for (const asset of assets) { - await this.videoConversionQueue.add(mp4ConversionProcessorName, { asset }, { jobId: randomUUID() }); + await this.videoConversionQueue.add(JobName.MP4_CONVERSION, { asset }); } } @@ -103,10 +88,9 @@ export class ScheduleTasksService { for (const exif of exifInfo) { await this.metadataExtractionQueue.add( - reverseGeocodingProcessorName, + JobName.REVERSE_GEOCODING, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion { exifId: exif.id, latitude: exif.latitude!, longitude: exif.longitude! }, - { jobId: randomUUID() }, ); } } @@ -122,17 +106,9 @@ export class ScheduleTasksService { for (const asset of exifAssets) { if (asset.type === AssetType.VIDEO) { - await this.metadataExtractionQueue.add( - videoMetadataExtractionProcessorName, - { asset, fileName: asset.id }, - { jobId: randomUUID() }, - ); + await this.metadataExtractionQueue.add(JobName.EXTRACT_VIDEO_METADATA, { asset, fileName: asset.id }); } else { - await this.metadataExtractionQueue.add( - exifExtractionProcessorName, - { asset, fileName: asset.id }, - { jobId: randomUUID() }, - ); + await this.metadataExtractionQueue.add(JobName.EXIF_EXTRACTION, { asset, fileName: asset.id }); } } } @@ -142,7 +118,7 @@ export class ScheduleTasksService { const usersToDelete = await this.userRepository.find({ withDeleted: true, where: { deletedAt: Not(IsNull()) } }); for (const user of usersToDelete) { if (userUtils.isReadyForDeletion(user)) { - await this.userDeletionQueue.add(userDeletionProcessorName, { user: user }, { jobId: randomUUID() }); + await this.userDeletionQueue.add(JobName.USER_DELETION, { user }); } } } diff --git a/server/apps/microservices/src/microservices.service.ts b/server/apps/microservices/src/microservices.service.ts index 5a03220f97..4e06e454cb 100644 --- a/server/apps/microservices/src/microservices.service.ts +++ b/server/apps/microservices/src/microservices.service.ts @@ -1,23 +1,17 @@ -import { QueueNameEnum } from '@app/job'; +import { QueueName } from '@app/job'; import { InjectQueue } from '@nestjs/bull'; import { Injectable, OnModuleInit } from '@nestjs/common'; import { Queue } from 'bull'; -import { randomUUID } from 'node:crypto'; @Injectable() export class MicroservicesService implements OnModuleInit { constructor( - @InjectQueue(QueueNameEnum.CHECKSUM_GENERATION) + @InjectQueue(QueueName.CHECKSUM_GENERATION) private generateChecksumQueue: Queue, ) {} async onModuleInit() { - await this.generateChecksumQueue.add( - {}, - { - jobId: randomUUID(), - delay: 10000, // wait for migration - }, - ); + // wait for migration + await this.generateChecksumQueue.add({}, { delay: 10000 }); } } diff --git a/server/apps/microservices/src/processors/asset-uploaded.processor.ts b/server/apps/microservices/src/processors/asset-uploaded.processor.ts index dc13e63ad6..55c4e8dcca 100644 --- a/server/apps/microservices/src/processors/asset-uploaded.processor.ts +++ b/server/apps/microservices/src/processors/asset-uploaded.processor.ts @@ -4,27 +4,22 @@ import { IMetadataExtractionJob, IThumbnailGenerationJob, IVideoTranscodeJob, - assetUploadedProcessorName, - exifExtractionProcessorName, - generateJPEGThumbnailProcessorName, - mp4ConversionProcessorName, - videoMetadataExtractionProcessorName, - QueueNameEnum, + QueueName, + JobName, } from '@app/job'; import { InjectQueue, Process, Processor } from '@nestjs/bull'; import { Job, Queue } from 'bull'; -import { randomUUID } from 'crypto'; -@Processor(QueueNameEnum.ASSET_UPLOADED) +@Processor(QueueName.ASSET_UPLOADED) export class AssetUploadedProcessor { constructor( - @InjectQueue(QueueNameEnum.THUMBNAIL_GENERATION) + @InjectQueue(QueueName.THUMBNAIL_GENERATION) private thumbnailGeneratorQueue: Queue, - @InjectQueue(QueueNameEnum.METADATA_EXTRACTION) + @InjectQueue(QueueName.METADATA_EXTRACTION) private metadataExtractionQueue: Queue, - @InjectQueue(QueueNameEnum.VIDEO_CONVERSION) + @InjectQueue(QueueName.VIDEO_CONVERSION) private videoConversionQueue: Queue, ) {} @@ -37,30 +32,19 @@ export class AssetUploadedProcessor { * * @param job asset-uploaded */ - @Process(assetUploadedProcessorName) + @Process(JobName.ASSET_UPLOADED) async processUploadedVideo(job: Job) { const { asset, fileName } = job.data; - await this.thumbnailGeneratorQueue.add(generateJPEGThumbnailProcessorName, { asset }, { jobId: randomUUID() }); + await this.thumbnailGeneratorQueue.add(JobName.GENERATE_JPEG_THUMBNAIL, { asset }); // Video Conversion if (asset.type == AssetType.VIDEO) { - await this.videoConversionQueue.add(mp4ConversionProcessorName, { asset }, { jobId: randomUUID() }); - await this.metadataExtractionQueue.add( - videoMetadataExtractionProcessorName, - { asset, fileName }, - { jobId: randomUUID() }, - ); + await this.videoConversionQueue.add(JobName.MP4_CONVERSION, { asset }); + await this.metadataExtractionQueue.add(JobName.EXTRACT_VIDEO_METADATA, { asset, fileName }); } else { // Extract Metadata/Exif for Images - Currently the EXIF library on the web cannot extract EXIF for video yet - await this.metadataExtractionQueue.add( - exifExtractionProcessorName, - { - asset, - fileName, - }, - { jobId: randomUUID() }, - ); + await this.metadataExtractionQueue.add(JobName.EXIF_EXTRACTION, { asset, fileName }); } } } diff --git a/server/apps/microservices/src/processors/generate-checksum.processor.ts b/server/apps/microservices/src/processors/generate-checksum.processor.ts index b5e2f60c82..5885477723 100644 --- a/server/apps/microservices/src/processors/generate-checksum.processor.ts +++ b/server/apps/microservices/src/processors/generate-checksum.processor.ts @@ -1,5 +1,5 @@ import { AssetEntity } from '@app/infra'; -import { QueueNameEnum } from '@app/job'; +import { QueueName } from '@app/job'; import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; @@ -8,7 +8,7 @@ import fs from 'node:fs'; import { FindOptionsWhere, IsNull, MoreThan, QueryFailedError, Repository } from 'typeorm'; // TODO: just temporary task to generate previous uploaded assets. -@Processor(QueueNameEnum.CHECKSUM_GENERATION) +@Processor(QueueName.CHECKSUM_GENERATION) export class GenerateChecksumProcessor { constructor( @InjectRepository(AssetEntity) diff --git a/server/apps/microservices/src/processors/machine-learning.processor.ts b/server/apps/microservices/src/processors/machine-learning.processor.ts index a0a61f8fed..6236c9a192 100644 --- a/server/apps/microservices/src/processors/machine-learning.processor.ts +++ b/server/apps/microservices/src/processors/machine-learning.processor.ts @@ -1,6 +1,6 @@ import { AssetEntity } from '@app/infra'; import { SmartInfoEntity } from '@app/infra'; -import { MachineLearningJobNameEnum, QueueNameEnum } from '@app/job'; +import { QueueName, JobName } from '@app/job'; import { IMachineLearningJob } from '@app/job/interfaces/machine-learning.interface'; import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; @@ -11,14 +11,14 @@ import { Repository } from 'typeorm'; const immich_machine_learning_url = process.env.IMMICH_MACHINE_LEARNING_URL || 'http://immich-machine-learning:3003'; -@Processor(QueueNameEnum.MACHINE_LEARNING) +@Processor(QueueName.MACHINE_LEARNING) export class MachineLearningProcessor { constructor( @InjectRepository(SmartInfoEntity) private smartInfoRepository: Repository, ) {} - @Process({ name: MachineLearningJobNameEnum.IMAGE_TAGGING, concurrency: 2 }) + @Process({ name: JobName.IMAGE_TAGGING, concurrency: 2 }) async tagImage(job: Job) { const { asset } = job.data; @@ -37,7 +37,7 @@ export class MachineLearningProcessor { } } - @Process({ name: MachineLearningJobNameEnum.OBJECT_DETECTION, concurrency: 2 }) + @Process({ name: JobName.OBJECT_DETECTION, concurrency: 2 }) async detectObject(job: Job) { try { const { asset }: { asset: AssetEntity } = job.data; diff --git a/server/apps/microservices/src/processors/metadata-extraction.processor.ts b/server/apps/microservices/src/processors/metadata-extraction.processor.ts index 251024c334..3e5078a5c5 100644 --- a/server/apps/microservices/src/processors/metadata-extraction.processor.ts +++ b/server/apps/microservices/src/processors/metadata-extraction.processor.ts @@ -2,11 +2,9 @@ import { AssetEntity, ExifEntity } from '@app/infra'; import { IExifExtractionProcessor, IVideoLengthExtractionProcessor, - exifExtractionProcessorName, - videoMetadataExtractionProcessorName, - reverseGeocodingProcessorName, IReverseGeocodingProcessor, - QueueNameEnum, + QueueName, + JobName, } from '@app/job'; import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; @@ -73,7 +71,7 @@ export type GeoData = { distance: number; }; -@Processor(QueueNameEnum.METADATA_EXTRACTION) +@Processor(QueueName.METADATA_EXTRACTION) export class MetadataExtractionProcessor { private logger = new Logger(MetadataExtractionProcessor.name); private isGeocodeInitialized = false; @@ -140,7 +138,7 @@ export class MetadataExtractionProcessor { return { country, state, city }; } - @Process(exifExtractionProcessorName) + @Process(JobName.EXIF_EXTRACTION) async extractExifInfo(job: Job) { try { const { asset, fileName }: { asset: AssetEntity; fileName: string } = job.data; @@ -262,7 +260,7 @@ export class MetadataExtractionProcessor { } } - @Process({ name: reverseGeocodingProcessorName }) + @Process({ name: JobName.REVERSE_GEOCODING }) async reverseGeocoding(job: Job) { if (this.isGeocodeInitialized) { const { latitude, longitude } = job.data; @@ -271,7 +269,7 @@ export class MetadataExtractionProcessor { } } - @Process({ name: videoMetadataExtractionProcessorName, concurrency: 2 }) + @Process({ name: JobName.EXTRACT_VIDEO_METADATA, concurrency: 2 }) async extractVideoMetadata(job: Job) { const { asset, fileName } = job.data; diff --git a/server/apps/microservices/src/processors/storage-migration.processor.ts b/server/apps/microservices/src/processors/storage-migration.processor.ts index 7dff2e1cb7..e39a507366 100644 --- a/server/apps/microservices/src/processors/storage-migration.processor.ts +++ b/server/apps/microservices/src/processors/storage-migration.processor.ts @@ -1,14 +1,14 @@ import { APP_UPLOAD_LOCATION } from '@app/common'; import { AssetEntity } from '@app/infra'; import { ImmichConfigService } from '@app/immich-config'; -import { QueueNameEnum, templateMigrationProcessorName, updateTemplateProcessorName } from '@app/job'; +import { QueueName, JobName } from '@app/job'; import { StorageService } from '@app/storage'; import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; -@Processor(QueueNameEnum.STORAGE_MIGRATION) +@Processor(QueueName.CONFIG) export class StorageMigrationProcessor { readonly logger: Logger = new Logger(StorageMigrationProcessor.name); @@ -24,7 +24,7 @@ export class StorageMigrationProcessor { * Migration process when a new user set a new storage template. * @param job */ - @Process({ name: templateMigrationProcessorName, concurrency: 100 }) + @Process({ name: JobName.TEMPLATE_MIGRATION, concurrency: 100 }) async templateMigration() { console.time('migrating-time'); const assets = await this.assetRepository.find({ @@ -54,7 +54,7 @@ export class StorageMigrationProcessor { * This is to ensure the synchronization between processes. * @param job */ - @Process({ name: updateTemplateProcessorName, concurrency: 1 }) + @Process({ name: JobName.CONFIG_CHANGE, concurrency: 1 }) async updateTemplate() { await this.immichConfigService.refreshConfig(); } diff --git a/server/apps/microservices/src/processors/thumbnail.processor.ts b/server/apps/microservices/src/processors/thumbnail.processor.ts index 352fcd691b..9ee225755c 100644 --- a/server/apps/microservices/src/processors/thumbnail.processor.ts +++ b/server/apps/microservices/src/processors/thumbnail.processor.ts @@ -1,20 +1,12 @@ import { APP_UPLOAD_LOCATION } from '@app/common'; import { AssetEntity, AssetType } from '@app/infra'; -import { - WebpGeneratorProcessor, - generateJPEGThumbnailProcessorName, - generateWEBPThumbnailProcessorName, - JpegGeneratorProcessor, - QueueNameEnum, - MachineLearningJobNameEnum, -} from '@app/job'; +import { WebpGeneratorProcessor, JpegGeneratorProcessor, QueueName, JobName } from '@app/job'; import { InjectQueue, Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { mapAsset } from 'apps/immich/src/api-v1/asset/response-dto/asset-response.dto'; import { Job, Queue } from 'bull'; import ffmpeg from 'fluent-ffmpeg'; -import { randomUUID } from 'node:crypto'; import { existsSync, mkdirSync } from 'node:fs'; import sanitize from 'sanitize-filename'; import sharp from 'sharp'; @@ -23,7 +15,7 @@ import { join } from 'path'; import { CommunicationGateway } from 'apps/immich/src/api-v1/communication/communication.gateway'; import { IMachineLearningJob } from '@app/job/interfaces/machine-learning.interface'; -@Processor(QueueNameEnum.THUMBNAIL_GENERATION) +@Processor(QueueName.THUMBNAIL_GENERATION) export class ThumbnailGeneratorProcessor { readonly logger: Logger = new Logger(ThumbnailGeneratorProcessor.name); @@ -31,16 +23,16 @@ export class ThumbnailGeneratorProcessor { @InjectRepository(AssetEntity) private assetRepository: Repository, - @InjectQueue(QueueNameEnum.THUMBNAIL_GENERATION) + @InjectQueue(QueueName.THUMBNAIL_GENERATION) private thumbnailGeneratorQueue: Queue, private wsCommunicationGateway: CommunicationGateway, - @InjectQueue(QueueNameEnum.MACHINE_LEARNING) + @InjectQueue(QueueName.MACHINE_LEARNING) private machineLearningQueue: Queue, ) {} - @Process({ name: generateJPEGThumbnailProcessorName, concurrency: 3 }) + @Process({ name: JobName.GENERATE_JPEG_THUMBNAIL, concurrency: 3 }) async generateJPEGThumbnail(job: Job) { const basePath = APP_UPLOAD_LOCATION; @@ -70,13 +62,10 @@ export class ThumbnailGeneratorProcessor { // Update resize path to send to generate webp queue asset.resizePath = jpegThumbnailPath; - await this.thumbnailGeneratorQueue.add(generateWEBPThumbnailProcessorName, { asset }, { jobId: randomUUID() }); - await this.machineLearningQueue.add(MachineLearningJobNameEnum.IMAGE_TAGGING, { asset }, { jobId: randomUUID() }); - await this.machineLearningQueue.add( - MachineLearningJobNameEnum.OBJECT_DETECTION, - { asset }, - { jobId: randomUUID() }, - ); + await this.thumbnailGeneratorQueue.add(JobName.GENERATE_WEBP_THUMBNAIL, { asset }); + await this.machineLearningQueue.add(JobName.IMAGE_TAGGING, { asset }); + await this.machineLearningQueue.add(JobName.OBJECT_DETECTION, { asset }); + this.wsCommunicationGateway.server.to(asset.userId).emit('on_upload_success', JSON.stringify(mapAsset(asset))); } @@ -104,19 +93,15 @@ export class ThumbnailGeneratorProcessor { // Update resize path to send to generate webp queue asset.resizePath = jpegThumbnailPath; - await this.thumbnailGeneratorQueue.add(generateWEBPThumbnailProcessorName, { asset }, { jobId: randomUUID() }); - await this.machineLearningQueue.add(MachineLearningJobNameEnum.IMAGE_TAGGING, { asset }, { jobId: randomUUID() }); - await this.machineLearningQueue.add( - MachineLearningJobNameEnum.OBJECT_DETECTION, - { asset }, - { jobId: randomUUID() }, - ); + await this.thumbnailGeneratorQueue.add(JobName.GENERATE_WEBP_THUMBNAIL, { asset }); + await this.machineLearningQueue.add(JobName.IMAGE_TAGGING, { asset }); + await this.machineLearningQueue.add(JobName.OBJECT_DETECTION, { asset }); this.wsCommunicationGateway.server.to(asset.userId).emit('on_upload_success', JSON.stringify(mapAsset(asset))); } } - @Process({ name: generateWEBPThumbnailProcessorName, concurrency: 3 }) + @Process({ name: JobName.GENERATE_WEBP_THUMBNAIL, concurrency: 3 }) async generateWepbThumbnail(job: Job) { const { asset } = job.data; diff --git a/server/apps/microservices/src/processors/user-deletion.processor.ts b/server/apps/microservices/src/processors/user-deletion.processor.ts index 4e7097a080..53f4451c69 100644 --- a/server/apps/microservices/src/processors/user-deletion.processor.ts +++ b/server/apps/microservices/src/processors/user-deletion.processor.ts @@ -1,6 +1,6 @@ import { APP_UPLOAD_LOCATION, userUtils } from '@app/common'; import { APIKeyEntity, AssetEntity, UserEntity } from '@app/infra'; -import { QueueNameEnum, userDeletionProcessorName } from '@app/job'; +import { QueueName, JobName } from '@app/job'; import { IUserDeletionJob } from '@app/job/interfaces/user-deletion.interface'; import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; @@ -10,7 +10,7 @@ import { join } from 'path'; import fs from 'fs'; import { Repository } from 'typeorm'; -@Processor(QueueNameEnum.USER_DELETION) +@Processor(QueueName.USER_DELETION) export class UserDeletionProcessor { private logger = new Logger(UserDeletionProcessor.name); @@ -25,7 +25,7 @@ export class UserDeletionProcessor { private apiKeyRepository: Repository, ) {} - @Process(userDeletionProcessorName) + @Process(JobName.USER_DELETION) async processUserDeletion(job: Job) { const { user } = job.data; diff --git a/server/apps/microservices/src/processors/video-transcode.processor.ts b/server/apps/microservices/src/processors/video-transcode.processor.ts index 891d9ecf54..c7e297e1d9 100644 --- a/server/apps/microservices/src/processors/video-transcode.processor.ts +++ b/server/apps/microservices/src/processors/video-transcode.processor.ts @@ -1,7 +1,6 @@ import { APP_UPLOAD_LOCATION } from '@app/common/constants'; import { AssetEntity } from '@app/infra'; -import { QueueNameEnum } from '@app/job'; -import { mp4ConversionProcessorName } from '@app/job/constants/job-name.constant'; +import { QueueName, JobName } from '@app/job'; import { IMp4ConversionProcessor } from '@app/job/interfaces/video-transcode.interface'; import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; @@ -12,7 +11,7 @@ import { existsSync, mkdirSync } from 'fs'; import { ImmichConfigService } from 'libs/immich-config/src'; import { Repository } from 'typeorm'; -@Processor(QueueNameEnum.VIDEO_CONVERSION) +@Processor(QueueName.VIDEO_CONVERSION) export class VideoTranscodeProcessor { constructor( @InjectRepository(AssetEntity) @@ -20,7 +19,7 @@ export class VideoTranscodeProcessor { private immichConfigService: ImmichConfigService, ) {} - @Process({ name: mp4ConversionProcessorName, concurrency: 2 }) + @Process({ name: JobName.MP4_CONVERSION, concurrency: 2 }) async mp4Conversion(job: Job) { const { asset } = job.data; diff --git a/server/libs/job/src/constants/bull-queue-registration.constant.ts b/server/libs/job/src/constants/bull-queue-registration.constant.ts index 240536cb11..8a05f06398 100644 --- a/server/libs/job/src/constants/bull-queue-registration.constant.ts +++ b/server/libs/job/src/constants/bull-queue-registration.constant.ts @@ -1,32 +1,16 @@ import { BullModuleOptions } from '@nestjs/bull'; -import { QueueNameEnum } from './queue-name.constant'; +import { QueueName } from './queue-name.constant'; /** * Shared queues between apps and microservices */ export const immichSharedQueues: BullModuleOptions[] = [ - { - name: QueueNameEnum.USER_DELETION, - }, - { - name: QueueNameEnum.THUMBNAIL_GENERATION, - }, - { - name: QueueNameEnum.ASSET_UPLOADED, - }, - { - name: QueueNameEnum.METADATA_EXTRACTION, - }, - { - name: QueueNameEnum.VIDEO_CONVERSION, - }, - { - name: QueueNameEnum.CHECKSUM_GENERATION, - }, - { - name: QueueNameEnum.MACHINE_LEARNING, - }, - { - name: QueueNameEnum.STORAGE_MIGRATION, - }, + { name: QueueName.USER_DELETION }, + { name: QueueName.THUMBNAIL_GENERATION }, + { name: QueueName.ASSET_UPLOADED }, + { name: QueueName.METADATA_EXTRACTION }, + { name: QueueName.VIDEO_CONVERSION }, + { name: QueueName.CHECKSUM_GENERATION }, + { name: QueueName.MACHINE_LEARNING }, + { name: QueueName.CONFIG }, ]; diff --git a/server/libs/job/src/constants/job-name.constant.ts b/server/libs/job/src/constants/job-name.constant.ts index 5979e36833..4da798dc1d 100644 --- a/server/libs/job/src/constants/job-name.constant.ts +++ b/server/libs/job/src/constants/job-name.constant.ts @@ -1,42 +1,15 @@ -/** - * Asset Uploaded Queue Jobs - */ -export const assetUploadedProcessorName = 'asset-uploaded'; - -/** - * Video Conversion Queue Jobs - **/ -export const mp4ConversionProcessorName = 'mp4-conversion'; - -/** - * Thumbnail Generator Queue Jobs - */ -export const generateJPEGThumbnailProcessorName = 'generate-jpeg-thumbnail'; -export const generateWEBPThumbnailProcessorName = 'generate-webp-thumbnail'; - -/** - * Metadata Extraction Queue Jobs - */ -export const exifExtractionProcessorName = 'exif-extraction'; -export const videoMetadataExtractionProcessorName = 'extract-video-metadata'; -export const reverseGeocodingProcessorName = 'reverse-geocoding'; - -/** - * Machine learning Queue Jobs - */ - -export enum MachineLearningJobNameEnum { +export enum JobName { + ASSET_UPLOADED = 'asset-uploaded', + MP4_CONVERSION = 'mp4-conversion', + GENERATE_JPEG_THUMBNAIL = 'generate-jpeg-thumbnail', + GENERATE_WEBP_THUMBNAIL = 'generate-webp-thumbnail', + EXIF_EXTRACTION = 'exif-extraction', + EXTRACT_VIDEO_METADATA = 'extract-video-metadata', + REVERSE_GEOCODING = 'reverse-geocoding', + USER_DELETION = 'user-deletion', + TEMPLATE_MIGRATION = 'template-migration', + CONFIG_CHANGE = 'config-change', OBJECT_DETECTION = 'detect-object', IMAGE_TAGGING = 'tag-image', + DELETE_FILE_ON_DISK = 'delete-file-on-disk', } - -/** - * User deletion Queue Jobs - */ -export const userDeletionProcessorName = 'user-deletion'; - -/** - * Storage Template Migration Queue Jobs - */ -export const templateMigrationProcessorName = 'template-migration'; -export const updateTemplateProcessorName = 'update-template'; diff --git a/server/libs/job/src/constants/queue-name.constant.ts b/server/libs/job/src/constants/queue-name.constant.ts index 2741b6507e..8a93bda243 100644 --- a/server/libs/job/src/constants/queue-name.constant.ts +++ b/server/libs/job/src/constants/queue-name.constant.ts @@ -1,4 +1,4 @@ -export enum QueueNameEnum { +export enum QueueName { THUMBNAIL_GENERATION = 'thumbnail-generation-queue', METADATA_EXTRACTION = 'metadata-extraction-queue', VIDEO_CONVERSION = 'video-conversion-queue', @@ -6,5 +6,6 @@ export enum QueueNameEnum { ASSET_UPLOADED = 'asset-uploaded-queue', MACHINE_LEARNING = 'machine-learning-queue', USER_DELETION = 'user-deletion-queue', - STORAGE_MIGRATION = 'storage-template-migration', + CONFIG = 'config-queue', + BACKGROUND_TASK = 'background-task', }