1
0
Fork 0
mirror of https://github.com/immich-app/immich.git synced 2025-01-19 18:26:46 +01:00

refactor: job names (#1343)

* refactor: job names

* refactor: remove jobId
This commit is contained in:
Jason Rasmussen 2023-01-17 09:43:45 -05:00 committed by GitHub
parent adacfb1110
commit 693adf8488
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 125 additions and 289 deletions

View file

@ -10,7 +10,7 @@ import {
StreamableFile,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { createHash, randomUUID } from 'node:crypto';
import { createHash } from 'node:crypto';
import { QueryFailedError, Repository } from 'typeorm';
import { AuthUserDto } from '../../decorators/auth-user.decorator';
import { AssetEntity, AssetType, SharedLinkType } from '@app/infra';
@ -43,13 +43,7 @@ import { CheckExistingAssetsResponseDto } from './response-dto/check-existing-as
import { UpdateAssetDto } from './dto/update-asset.dto';
import { AssetFileUploadResponseDto } from './response-dto/asset-file-upload-response.dto';
import { BackgroundTaskService } from '../../modules/background-task/background-task.service';
import {
assetUploadedProcessorName,
IAssetUploadedJob,
IVideoTranscodeJob,
mp4ConversionProcessorName,
QueueNameEnum,
} from '@app/job';
import { IAssetUploadedJob, IVideoTranscodeJob, QueueName, JobName } from '@app/job';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { DownloadService } from '../../modules/download/download.service';
@ -80,16 +74,16 @@ export class AssetService {
private backgroundTaskService: BackgroundTaskService,
@InjectQueue(QueueNameEnum.ASSET_UPLOADED)
@InjectQueue(QueueName.ASSET_UPLOADED)
private assetUploadedQueue: Queue<IAssetUploadedJob>,
@InjectQueue(QueueNameEnum.VIDEO_CONVERSION)
@InjectQueue(QueueName.VIDEO_CONVERSION)
private videoConversionQueue: Queue<IVideoTranscodeJob>,
private downloadService: DownloadService,
private storageService: StorageService,
@Inject(ISharedLinkRepository) private sharedLinkRepository: ISharedLinkRepository,
@Inject(ISharedLinkRepository) sharedLinkRepository: ISharedLinkRepository,
) {
this.shareCore = new ShareCore(sharedLinkRepository);
}
@ -128,11 +122,7 @@ export class AssetService {
await this.storageService.moveAsset(livePhotoAssetEntity, originalAssetData.originalname);
await this.videoConversionQueue.add(
mp4ConversionProcessorName,
{ asset: livePhotoAssetEntity },
{ jobId: randomUUID() },
);
await this.videoConversionQueue.add(JobName.MP4_CONVERSION, { asset: livePhotoAssetEntity });
}
const assetEntity = await this.createUserAsset(
@ -157,7 +147,7 @@ export class AssetService {
const movedAsset = await this.storageService.moveAsset(assetEntity, originalAssetData.originalname);
await this.assetUploadedQueue.add(
assetUploadedProcessorName,
JobName.ASSET_UPLOADED,
{ asset: movedAsset, fileName: originalAssetData.originalname },
{ jobId: movedAsset.id },
);

View file

@ -1,19 +1,8 @@
import {
exifExtractionProcessorName,
generateJPEGThumbnailProcessorName,
IMetadataExtractionJob,
IThumbnailGenerationJob,
IVideoTranscodeJob,
MachineLearningJobNameEnum,
QueueNameEnum,
templateMigrationProcessorName,
videoMetadataExtractionProcessorName,
} from '@app/job';
import { IMetadataExtractionJob, IThumbnailGenerationJob, IVideoTranscodeJob, QueueName, JobName } from '@app/job';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
import { AllJobStatusResponseDto } from './response-dto/all-job-status-response.dto';
import { randomUUID } from 'crypto';
import { IAssetRepository } from '../asset/asset-repository';
import { AssetType } from '@app/infra';
import { GetJobDto, JobId } from './dto/get-job.dto';
@ -24,20 +13,20 @@ import { StorageService } from '@app/storage';
@Injectable()
export class JobService {
constructor(
@InjectQueue(QueueNameEnum.THUMBNAIL_GENERATION)
@InjectQueue(QueueName.THUMBNAIL_GENERATION)
private thumbnailGeneratorQueue: Queue<IThumbnailGenerationJob>,
@InjectQueue(QueueNameEnum.METADATA_EXTRACTION)
@InjectQueue(QueueName.METADATA_EXTRACTION)
private metadataExtractionQueue: Queue<IMetadataExtractionJob>,
@InjectQueue(QueueNameEnum.VIDEO_CONVERSION)
@InjectQueue(QueueName.VIDEO_CONVERSION)
private videoConversionQueue: Queue<IVideoTranscodeJob>,
@InjectQueue(QueueNameEnum.MACHINE_LEARNING)
@InjectQueue(QueueName.MACHINE_LEARNING)
private machineLearningQueue: Queue<IMachineLearningJob>,
@InjectQueue(QueueNameEnum.STORAGE_MIGRATION)
private storageMigrationQueue: Queue,
@InjectQueue(QueueName.CONFIG)
private configQueue: Queue,
@Inject(IAssetRepository)
private _assetRepository: IAssetRepository,
@ -47,7 +36,7 @@ export class JobService {
this.thumbnailGeneratorQueue.empty();
this.metadataExtractionQueue.empty();
this.videoConversionQueue.empty();
this.storageMigrationQueue.empty();
this.configQueue.empty();
}
async startJob(jobDto: GetJobDto): Promise<number> {
@ -72,7 +61,7 @@ export class JobService {
const metadataExtractionJobCount = await this.metadataExtractionQueue.getJobCounts();
const videoConversionJobCount = await this.videoConversionQueue.getJobCounts();
const machineLearningJobCount = await this.machineLearningQueue.getJobCounts();
const storageMigrationJobCount = await this.storageMigrationQueue.getJobCounts();
const storageMigrationJobCount = await this.configQueue.getJobCounts();
const response = new AllJobStatusResponseDto();
response.isThumbnailGenerationActive = Boolean(thumbnailGeneratorJobCount.waiting);
@ -108,8 +97,8 @@ export class JobService {
}
if (query.jobId === JobId.STORAGE_TEMPLATE_MIGRATION) {
response.isActive = Boolean((await this.storageMigrationQueue.getJobCounts()).waiting);
response.queueCount = await this.storageMigrationQueue.getJobCounts();
response.isActive = Boolean((await this.configQueue.getJobCounts()).waiting);
response.queueCount = await this.configQueue.getJobCounts();
}
return response;
@ -130,7 +119,7 @@ export class JobService {
this.machineLearningQueue.empty();
return 0;
case JobId.STORAGE_TEMPLATE_MIGRATION:
this.storageMigrationQueue.empty();
this.configQueue.empty();
return 0;
default:
throw new BadRequestException('Invalid job id');
@ -147,7 +136,7 @@ export class JobService {
const assetsWithNoThumbnail = await this._assetRepository.getAssetWithNoThumbnail();
for (const asset of assetsWithNoThumbnail) {
await this.thumbnailGeneratorQueue.add(generateJPEGThumbnailProcessorName, { asset }, { jobId: randomUUID() });
await this.thumbnailGeneratorQueue.add(JobName.GENERATE_JPEG_THUMBNAIL, { asset });
}
return assetsWithNoThumbnail.length;
@ -163,17 +152,9 @@ export class JobService {
const assetsWithNoExif = await this._assetRepository.getAssetWithNoEXIF();
for (const asset of assetsWithNoExif) {
if (asset.type === AssetType.VIDEO) {
await this.metadataExtractionQueue.add(
videoMetadataExtractionProcessorName,
{ asset, fileName: asset.id },
{ jobId: randomUUID() },
);
await this.metadataExtractionQueue.add(JobName.EXTRACT_VIDEO_METADATA, { asset, fileName: asset.id });
} else {
await this.metadataExtractionQueue.add(
exifExtractionProcessorName,
{ asset, fileName: asset.id },
{ jobId: randomUUID() },
);
await this.metadataExtractionQueue.add(JobName.EXIF_EXTRACTION, { asset, fileName: asset.id });
}
}
return assetsWithNoExif.length;
@ -189,25 +170,21 @@ export class JobService {
const assetWithNoSmartInfo = await this._assetRepository.getAssetWithNoSmartInfo();
for (const asset of assetWithNoSmartInfo) {
await this.machineLearningQueue.add(MachineLearningJobNameEnum.IMAGE_TAGGING, { asset }, { jobId: randomUUID() });
await this.machineLearningQueue.add(
MachineLearningJobNameEnum.OBJECT_DETECTION,
{ asset },
{ jobId: randomUUID() },
);
await this.machineLearningQueue.add(JobName.IMAGE_TAGGING, { asset });
await this.machineLearningQueue.add(JobName.OBJECT_DETECTION, { asset });
}
return assetWithNoSmartInfo.length;
}
async runStorageMigration() {
const jobCount = await this.storageMigrationQueue.getJobCounts();
const jobCount = await this.configQueue.getJobCounts();
if (jobCount.active > 0) {
throw new BadRequestException('Storage migration job is already running');
}
await this.storageMigrationQueue.add(templateMigrationProcessorName, {}, { jobId: randomUUID() });
await this.configQueue.add(JobName.TEMPLATE_MIGRATION, {});
return 1;
}

View file

@ -1,4 +1,4 @@
import { QueueNameEnum, updateTemplateProcessorName } from '@app/job';
import { JobName, QueueName } from '@app/job';
import {
supportedDayTokens,
supportedHourTokens,
@ -11,7 +11,6 @@ import {
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { randomUUID } from 'crypto';
import { ImmichConfigService } from 'libs/immich-config/src';
import { mapConfig, SystemConfigDto } from './dto/system-config.dto';
import { SystemConfigTemplateStorageOptionDto } from './response-dto/system-config-template-storage-option.dto';
@ -20,8 +19,7 @@ import { SystemConfigTemplateStorageOptionDto } from './response-dto/system-conf
export class SystemConfigService {
constructor(
private immichConfigService: ImmichConfigService,
@InjectQueue(QueueNameEnum.STORAGE_MIGRATION)
private storageMigrationQueue: Queue,
@InjectQueue(QueueName.CONFIG) private configQueue: Queue,
) {}
public async getConfig(): Promise<SystemConfigDto> {
@ -36,7 +34,7 @@ export class SystemConfigService {
public async updateConfig(dto: SystemConfigDto): Promise<SystemConfigDto> {
const config = await this.immichConfigService.updateConfig(dto);
this.storageMigrationQueue.add(updateTemplateProcessorName, {}, { jobId: randomUUID() });
this.configQueue.add(JobName.CONFIG_CHANGE, {});
return mapConfig(config);
}

View file

@ -1,17 +1,11 @@
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AssetEntity, ExifEntity, SmartInfoEntity } from '@app/infra';
import { QueueName } from '@app/job';
import { BackgroundTaskProcessor } from './background-task.processor';
import { BackgroundTaskService } from './background-task.service';
@Module({
imports: [
BullModule.registerQueue({
name: 'background-task',
}),
TypeOrmModule.forFeature([AssetEntity, ExifEntity, SmartInfoEntity]),
],
imports: [BullModule.registerQueue({ name: QueueName.BACKGROUND_TASK })],
providers: [BackgroundTaskService, BackgroundTaskProcessor],
exports: [BackgroundTaskService, BullModule],
})

View file

@ -1,23 +1,12 @@
import { Process, Processor } from '@nestjs/bull';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { AssetEntity, SmartInfoEntity } from '@app/infra';
import { Job } from 'bull';
import { AssetResponseDto } from '../../api-v1/asset/response-dto/asset-response.dto';
import { assetUtils } from '@app/common/utils';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { JobName, QueueName } from '@app/job';
import { AssetResponseDto } from '../../api-v1/asset/response-dto/asset-response.dto';
@Processor('background-task')
@Processor(QueueName.BACKGROUND_TASK)
export class BackgroundTaskProcessor {
constructor(
@InjectRepository(AssetEntity)
private assetRepository: Repository<AssetEntity>,
@InjectRepository(SmartInfoEntity)
private smartInfoRepository: Repository<SmartInfoEntity>,
) {}
// TODO: Should probably use constants / Interfaces for Queue names / data
@Process('delete-file-on-disk')
@Process(JobName.DELETE_FILE_ON_DISK)
async deleteFileOnDisk(job: Job<{ assets: AssetResponseDto[] }>) {
const { assets } = job.data;

View file

@ -1,23 +1,17 @@
import { InjectQueue } from '@nestjs/bull/dist/decorators';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { randomUUID } from 'node:crypto';
import { JobName, QueueName } from '@app/job';
import { AssetResponseDto } from '../../api-v1/asset/response-dto/asset-response.dto';
@Injectable()
export class BackgroundTaskService {
constructor(
@InjectQueue('background-task')
@InjectQueue(QueueName.BACKGROUND_TASK)
private backgroundTaskQueue: Queue,
) {}
async deleteFileOnDisk(assets: AssetResponseDto[]) {
await this.backgroundTaskQueue.add(
'delete-file-on-disk',
{
assets,
},
{ jobId: randomUUID() },
);
await this.backgroundTaskQueue.add(JobName.DELETE_FILE_ON_DISK, { assets });
}
}

View file

@ -5,18 +5,7 @@ import { IsNull, Not, Repository } from 'typeorm';
import { AssetEntity, AssetType, ExifEntity, UserEntity } from '@app/infra';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { randomUUID } from 'crypto';
import {
userDeletionProcessorName,
exifExtractionProcessorName,
generateWEBPThumbnailProcessorName,
IMetadataExtractionJob,
IVideoTranscodeJob,
mp4ConversionProcessorName,
QueueNameEnum,
reverseGeocodingProcessorName,
videoMetadataExtractionProcessorName,
} from '@app/job';
import { IMetadataExtractionJob, IVideoTranscodeJob, QueueName, JobName } from '@app/job';
import { ConfigService } from '@nestjs/config';
import { IUserDeletionJob } from '@app/job/interfaces/user-deletion.interface';
import { userUtils } from '@app/common';
@ -33,16 +22,16 @@ export class ScheduleTasksService {
@InjectRepository(ExifEntity)
private exifRepository: Repository<ExifEntity>,
@InjectQueue(QueueNameEnum.THUMBNAIL_GENERATION)
@InjectQueue(QueueName.THUMBNAIL_GENERATION)
private thumbnailGeneratorQueue: Queue,
@InjectQueue(QueueNameEnum.VIDEO_CONVERSION)
@InjectQueue(QueueName.VIDEO_CONVERSION)
private videoConversionQueue: Queue<IVideoTranscodeJob>,
@InjectQueue(QueueNameEnum.METADATA_EXTRACTION)
@InjectQueue(QueueName.METADATA_EXTRACTION)
private metadataExtractionQueue: Queue<IMetadataExtractionJob>,
@InjectQueue(QueueNameEnum.USER_DELETION)
@InjectQueue(QueueName.USER_DELETION)
private userDeletionQueue: Queue<IUserDeletionJob>,
private configService: ConfigService,
@ -62,11 +51,7 @@ export class ScheduleTasksService {
}
for (const asset of assets) {
await this.thumbnailGeneratorQueue.add(
generateWEBPThumbnailProcessorName,
{ asset: asset },
{ jobId: randomUUID() },
);
await this.thumbnailGeneratorQueue.add(JobName.GENERATE_WEBP_THUMBNAIL, { asset: asset });
}
}
@ -84,7 +69,7 @@ export class ScheduleTasksService {
});
for (const asset of assets) {
await this.videoConversionQueue.add(mp4ConversionProcessorName, { asset }, { jobId: randomUUID() });
await this.videoConversionQueue.add(JobName.MP4_CONVERSION, { asset });
}
}
@ -103,10 +88,9 @@ export class ScheduleTasksService {
for (const exif of exifInfo) {
await this.metadataExtractionQueue.add(
reverseGeocodingProcessorName,
JobName.REVERSE_GEOCODING,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
{ exifId: exif.id, latitude: exif.latitude!, longitude: exif.longitude! },
{ jobId: randomUUID() },
);
}
}
@ -122,17 +106,9 @@ export class ScheduleTasksService {
for (const asset of exifAssets) {
if (asset.type === AssetType.VIDEO) {
await this.metadataExtractionQueue.add(
videoMetadataExtractionProcessorName,
{ asset, fileName: asset.id },
{ jobId: randomUUID() },
);
await this.metadataExtractionQueue.add(JobName.EXTRACT_VIDEO_METADATA, { asset, fileName: asset.id });
} else {
await this.metadataExtractionQueue.add(
exifExtractionProcessorName,
{ asset, fileName: asset.id },
{ jobId: randomUUID() },
);
await this.metadataExtractionQueue.add(JobName.EXIF_EXTRACTION, { asset, fileName: asset.id });
}
}
}
@ -142,7 +118,7 @@ export class ScheduleTasksService {
const usersToDelete = await this.userRepository.find({ withDeleted: true, where: { deletedAt: Not(IsNull()) } });
for (const user of usersToDelete) {
if (userUtils.isReadyForDeletion(user)) {
await this.userDeletionQueue.add(userDeletionProcessorName, { user: user }, { jobId: randomUUID() });
await this.userDeletionQueue.add(JobName.USER_DELETION, { user });
}
}
}

View file

@ -1,23 +1,17 @@
import { QueueNameEnum } from '@app/job';
import { QueueName } from '@app/job';
import { InjectQueue } from '@nestjs/bull';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Queue } from 'bull';
import { randomUUID } from 'node:crypto';
@Injectable()
export class MicroservicesService implements OnModuleInit {
constructor(
@InjectQueue(QueueNameEnum.CHECKSUM_GENERATION)
@InjectQueue(QueueName.CHECKSUM_GENERATION)
private generateChecksumQueue: Queue,
) {}
async onModuleInit() {
await this.generateChecksumQueue.add(
{},
{
jobId: randomUUID(),
delay: 10000, // wait for migration
},
);
// wait for migration
await this.generateChecksumQueue.add({}, { delay: 10000 });
}
}

View file

@ -4,27 +4,22 @@ import {
IMetadataExtractionJob,
IThumbnailGenerationJob,
IVideoTranscodeJob,
assetUploadedProcessorName,
exifExtractionProcessorName,
generateJPEGThumbnailProcessorName,
mp4ConversionProcessorName,
videoMetadataExtractionProcessorName,
QueueNameEnum,
QueueName,
JobName,
} from '@app/job';
import { InjectQueue, Process, Processor } from '@nestjs/bull';
import { Job, Queue } from 'bull';
import { randomUUID } from 'crypto';
@Processor(QueueNameEnum.ASSET_UPLOADED)
@Processor(QueueName.ASSET_UPLOADED)
export class AssetUploadedProcessor {
constructor(
@InjectQueue(QueueNameEnum.THUMBNAIL_GENERATION)
@InjectQueue(QueueName.THUMBNAIL_GENERATION)
private thumbnailGeneratorQueue: Queue<IThumbnailGenerationJob>,
@InjectQueue(QueueNameEnum.METADATA_EXTRACTION)
@InjectQueue(QueueName.METADATA_EXTRACTION)
private metadataExtractionQueue: Queue<IMetadataExtractionJob>,
@InjectQueue(QueueNameEnum.VIDEO_CONVERSION)
@InjectQueue(QueueName.VIDEO_CONVERSION)
private videoConversionQueue: Queue<IVideoTranscodeJob>,
) {}
@ -37,30 +32,19 @@ export class AssetUploadedProcessor {
*
* @param job asset-uploaded
*/
@Process(assetUploadedProcessorName)
@Process(JobName.ASSET_UPLOADED)
async processUploadedVideo(job: Job<IAssetUploadedJob>) {
const { asset, fileName } = job.data;
await this.thumbnailGeneratorQueue.add(generateJPEGThumbnailProcessorName, { asset }, { jobId: randomUUID() });
await this.thumbnailGeneratorQueue.add(JobName.GENERATE_JPEG_THUMBNAIL, { asset });
// Video Conversion
if (asset.type == AssetType.VIDEO) {
await this.videoConversionQueue.add(mp4ConversionProcessorName, { asset }, { jobId: randomUUID() });
await this.metadataExtractionQueue.add(
videoMetadataExtractionProcessorName,
{ asset, fileName },
{ jobId: randomUUID() },
);
await this.videoConversionQueue.add(JobName.MP4_CONVERSION, { asset });
await this.metadataExtractionQueue.add(JobName.EXTRACT_VIDEO_METADATA, { asset, fileName });
} else {
// Extract Metadata/Exif for Images - Currently the EXIF library on the web cannot extract EXIF for video yet
await this.metadataExtractionQueue.add(
exifExtractionProcessorName,
{
asset,
fileName,
},
{ jobId: randomUUID() },
);
await this.metadataExtractionQueue.add(JobName.EXIF_EXTRACTION, { asset, fileName });
}
}
}

View file

@ -1,5 +1,5 @@
import { AssetEntity } from '@app/infra';
import { QueueNameEnum } from '@app/job';
import { QueueName } from '@app/job';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
@ -8,7 +8,7 @@ import fs from 'node:fs';
import { FindOptionsWhere, IsNull, MoreThan, QueryFailedError, Repository } from 'typeorm';
// TODO: just temporary task to generate previous uploaded assets.
@Processor(QueueNameEnum.CHECKSUM_GENERATION)
@Processor(QueueName.CHECKSUM_GENERATION)
export class GenerateChecksumProcessor {
constructor(
@InjectRepository(AssetEntity)

View file

@ -1,6 +1,6 @@
import { AssetEntity } from '@app/infra';
import { SmartInfoEntity } from '@app/infra';
import { MachineLearningJobNameEnum, QueueNameEnum } from '@app/job';
import { QueueName, JobName } from '@app/job';
import { IMachineLearningJob } from '@app/job/interfaces/machine-learning.interface';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
@ -11,14 +11,14 @@ import { Repository } from 'typeorm';
const immich_machine_learning_url = process.env.IMMICH_MACHINE_LEARNING_URL || 'http://immich-machine-learning:3003';
@Processor(QueueNameEnum.MACHINE_LEARNING)
@Processor(QueueName.MACHINE_LEARNING)
export class MachineLearningProcessor {
constructor(
@InjectRepository(SmartInfoEntity)
private smartInfoRepository: Repository<SmartInfoEntity>,
) {}
@Process({ name: MachineLearningJobNameEnum.IMAGE_TAGGING, concurrency: 2 })
@Process({ name: JobName.IMAGE_TAGGING, concurrency: 2 })
async tagImage(job: Job<IMachineLearningJob>) {
const { asset } = job.data;
@ -37,7 +37,7 @@ export class MachineLearningProcessor {
}
}
@Process({ name: MachineLearningJobNameEnum.OBJECT_DETECTION, concurrency: 2 })
@Process({ name: JobName.OBJECT_DETECTION, concurrency: 2 })
async detectObject(job: Job<IMachineLearningJob>) {
try {
const { asset }: { asset: AssetEntity } = job.data;

View file

@ -2,11 +2,9 @@ import { AssetEntity, ExifEntity } from '@app/infra';
import {
IExifExtractionProcessor,
IVideoLengthExtractionProcessor,
exifExtractionProcessorName,
videoMetadataExtractionProcessorName,
reverseGeocodingProcessorName,
IReverseGeocodingProcessor,
QueueNameEnum,
QueueName,
JobName,
} from '@app/job';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
@ -73,7 +71,7 @@ export type GeoData = {
distance: number;
};
@Processor(QueueNameEnum.METADATA_EXTRACTION)
@Processor(QueueName.METADATA_EXTRACTION)
export class MetadataExtractionProcessor {
private logger = new Logger(MetadataExtractionProcessor.name);
private isGeocodeInitialized = false;
@ -140,7 +138,7 @@ export class MetadataExtractionProcessor {
return { country, state, city };
}
@Process(exifExtractionProcessorName)
@Process(JobName.EXIF_EXTRACTION)
async extractExifInfo(job: Job<IExifExtractionProcessor>) {
try {
const { asset, fileName }: { asset: AssetEntity; fileName: string } = job.data;
@ -262,7 +260,7 @@ export class MetadataExtractionProcessor {
}
}
@Process({ name: reverseGeocodingProcessorName })
@Process({ name: JobName.REVERSE_GEOCODING })
async reverseGeocoding(job: Job<IReverseGeocodingProcessor>) {
if (this.isGeocodeInitialized) {
const { latitude, longitude } = job.data;
@ -271,7 +269,7 @@ export class MetadataExtractionProcessor {
}
}
@Process({ name: videoMetadataExtractionProcessorName, concurrency: 2 })
@Process({ name: JobName.EXTRACT_VIDEO_METADATA, concurrency: 2 })
async extractVideoMetadata(job: Job<IVideoLengthExtractionProcessor>) {
const { asset, fileName } = job.data;

View file

@ -1,14 +1,14 @@
import { APP_UPLOAD_LOCATION } from '@app/common';
import { AssetEntity } from '@app/infra';
import { ImmichConfigService } from '@app/immich-config';
import { QueueNameEnum, templateMigrationProcessorName, updateTemplateProcessorName } from '@app/job';
import { QueueName, JobName } from '@app/job';
import { StorageService } from '@app/storage';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
@Processor(QueueNameEnum.STORAGE_MIGRATION)
@Processor(QueueName.CONFIG)
export class StorageMigrationProcessor {
readonly logger: Logger = new Logger(StorageMigrationProcessor.name);
@ -24,7 +24,7 @@ export class StorageMigrationProcessor {
* Migration process when a new user set a new storage template.
* @param job
*/
@Process({ name: templateMigrationProcessorName, concurrency: 100 })
@Process({ name: JobName.TEMPLATE_MIGRATION, concurrency: 100 })
async templateMigration() {
console.time('migrating-time');
const assets = await this.assetRepository.find({
@ -54,7 +54,7 @@ export class StorageMigrationProcessor {
* This is to ensure the synchronization between processes.
* @param job
*/
@Process({ name: updateTemplateProcessorName, concurrency: 1 })
@Process({ name: JobName.CONFIG_CHANGE, concurrency: 1 })
async updateTemplate() {
await this.immichConfigService.refreshConfig();
}

View file

@ -1,20 +1,12 @@
import { APP_UPLOAD_LOCATION } from '@app/common';
import { AssetEntity, AssetType } from '@app/infra';
import {
WebpGeneratorProcessor,
generateJPEGThumbnailProcessorName,
generateWEBPThumbnailProcessorName,
JpegGeneratorProcessor,
QueueNameEnum,
MachineLearningJobNameEnum,
} from '@app/job';
import { WebpGeneratorProcessor, JpegGeneratorProcessor, QueueName, JobName } from '@app/job';
import { InjectQueue, Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { mapAsset } from 'apps/immich/src/api-v1/asset/response-dto/asset-response.dto';
import { Job, Queue } from 'bull';
import ffmpeg from 'fluent-ffmpeg';
import { randomUUID } from 'node:crypto';
import { existsSync, mkdirSync } from 'node:fs';
import sanitize from 'sanitize-filename';
import sharp from 'sharp';
@ -23,7 +15,7 @@ import { join } from 'path';
import { CommunicationGateway } from 'apps/immich/src/api-v1/communication/communication.gateway';
import { IMachineLearningJob } from '@app/job/interfaces/machine-learning.interface';
@Processor(QueueNameEnum.THUMBNAIL_GENERATION)
@Processor(QueueName.THUMBNAIL_GENERATION)
export class ThumbnailGeneratorProcessor {
readonly logger: Logger = new Logger(ThumbnailGeneratorProcessor.name);
@ -31,16 +23,16 @@ export class ThumbnailGeneratorProcessor {
@InjectRepository(AssetEntity)
private assetRepository: Repository<AssetEntity>,
@InjectQueue(QueueNameEnum.THUMBNAIL_GENERATION)
@InjectQueue(QueueName.THUMBNAIL_GENERATION)
private thumbnailGeneratorQueue: Queue,
private wsCommunicationGateway: CommunicationGateway,
@InjectQueue(QueueNameEnum.MACHINE_LEARNING)
@InjectQueue(QueueName.MACHINE_LEARNING)
private machineLearningQueue: Queue<IMachineLearningJob>,
) {}
@Process({ name: generateJPEGThumbnailProcessorName, concurrency: 3 })
@Process({ name: JobName.GENERATE_JPEG_THUMBNAIL, concurrency: 3 })
async generateJPEGThumbnail(job: Job<JpegGeneratorProcessor>) {
const basePath = APP_UPLOAD_LOCATION;
@ -70,13 +62,10 @@ export class ThumbnailGeneratorProcessor {
// Update resize path to send to generate webp queue
asset.resizePath = jpegThumbnailPath;
await this.thumbnailGeneratorQueue.add(generateWEBPThumbnailProcessorName, { asset }, { jobId: randomUUID() });
await this.machineLearningQueue.add(MachineLearningJobNameEnum.IMAGE_TAGGING, { asset }, { jobId: randomUUID() });
await this.machineLearningQueue.add(
MachineLearningJobNameEnum.OBJECT_DETECTION,
{ asset },
{ jobId: randomUUID() },
);
await this.thumbnailGeneratorQueue.add(JobName.GENERATE_WEBP_THUMBNAIL, { asset });
await this.machineLearningQueue.add(JobName.IMAGE_TAGGING, { asset });
await this.machineLearningQueue.add(JobName.OBJECT_DETECTION, { asset });
this.wsCommunicationGateway.server.to(asset.userId).emit('on_upload_success', JSON.stringify(mapAsset(asset)));
}
@ -104,19 +93,15 @@ export class ThumbnailGeneratorProcessor {
// Update resize path to send to generate webp queue
asset.resizePath = jpegThumbnailPath;
await this.thumbnailGeneratorQueue.add(generateWEBPThumbnailProcessorName, { asset }, { jobId: randomUUID() });
await this.machineLearningQueue.add(MachineLearningJobNameEnum.IMAGE_TAGGING, { asset }, { jobId: randomUUID() });
await this.machineLearningQueue.add(
MachineLearningJobNameEnum.OBJECT_DETECTION,
{ asset },
{ jobId: randomUUID() },
);
await this.thumbnailGeneratorQueue.add(JobName.GENERATE_WEBP_THUMBNAIL, { asset });
await this.machineLearningQueue.add(JobName.IMAGE_TAGGING, { asset });
await this.machineLearningQueue.add(JobName.OBJECT_DETECTION, { asset });
this.wsCommunicationGateway.server.to(asset.userId).emit('on_upload_success', JSON.stringify(mapAsset(asset)));
}
}
@Process({ name: generateWEBPThumbnailProcessorName, concurrency: 3 })
@Process({ name: JobName.GENERATE_WEBP_THUMBNAIL, concurrency: 3 })
async generateWepbThumbnail(job: Job<WebpGeneratorProcessor>) {
const { asset } = job.data;

View file

@ -1,6 +1,6 @@
import { APP_UPLOAD_LOCATION, userUtils } from '@app/common';
import { APIKeyEntity, AssetEntity, UserEntity } from '@app/infra';
import { QueueNameEnum, userDeletionProcessorName } from '@app/job';
import { QueueName, JobName } from '@app/job';
import { IUserDeletionJob } from '@app/job/interfaces/user-deletion.interface';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
@ -10,7 +10,7 @@ import { join } from 'path';
import fs from 'fs';
import { Repository } from 'typeorm';
@Processor(QueueNameEnum.USER_DELETION)
@Processor(QueueName.USER_DELETION)
export class UserDeletionProcessor {
private logger = new Logger(UserDeletionProcessor.name);
@ -25,7 +25,7 @@ export class UserDeletionProcessor {
private apiKeyRepository: Repository<APIKeyEntity>,
) {}
@Process(userDeletionProcessorName)
@Process(JobName.USER_DELETION)
async processUserDeletion(job: Job<IUserDeletionJob>) {
const { user } = job.data;

View file

@ -1,7 +1,6 @@
import { APP_UPLOAD_LOCATION } from '@app/common/constants';
import { AssetEntity } from '@app/infra';
import { QueueNameEnum } from '@app/job';
import { mp4ConversionProcessorName } from '@app/job/constants/job-name.constant';
import { QueueName, JobName } from '@app/job';
import { IMp4ConversionProcessor } from '@app/job/interfaces/video-transcode.interface';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
@ -12,7 +11,7 @@ import { existsSync, mkdirSync } from 'fs';
import { ImmichConfigService } from 'libs/immich-config/src';
import { Repository } from 'typeorm';
@Processor(QueueNameEnum.VIDEO_CONVERSION)
@Processor(QueueName.VIDEO_CONVERSION)
export class VideoTranscodeProcessor {
constructor(
@InjectRepository(AssetEntity)
@ -20,7 +19,7 @@ export class VideoTranscodeProcessor {
private immichConfigService: ImmichConfigService,
) {}
@Process({ name: mp4ConversionProcessorName, concurrency: 2 })
@Process({ name: JobName.MP4_CONVERSION, concurrency: 2 })
async mp4Conversion(job: Job<IMp4ConversionProcessor>) {
const { asset } = job.data;

View file

@ -1,32 +1,16 @@
import { BullModuleOptions } from '@nestjs/bull';
import { QueueNameEnum } from './queue-name.constant';
import { QueueName } from './queue-name.constant';
/**
* Shared queues between apps and microservices
*/
export const immichSharedQueues: BullModuleOptions[] = [
{
name: QueueNameEnum.USER_DELETION,
},
{
name: QueueNameEnum.THUMBNAIL_GENERATION,
},
{
name: QueueNameEnum.ASSET_UPLOADED,
},
{
name: QueueNameEnum.METADATA_EXTRACTION,
},
{
name: QueueNameEnum.VIDEO_CONVERSION,
},
{
name: QueueNameEnum.CHECKSUM_GENERATION,
},
{
name: QueueNameEnum.MACHINE_LEARNING,
},
{
name: QueueNameEnum.STORAGE_MIGRATION,
},
{ name: QueueName.USER_DELETION },
{ name: QueueName.THUMBNAIL_GENERATION },
{ name: QueueName.ASSET_UPLOADED },
{ name: QueueName.METADATA_EXTRACTION },
{ name: QueueName.VIDEO_CONVERSION },
{ name: QueueName.CHECKSUM_GENERATION },
{ name: QueueName.MACHINE_LEARNING },
{ name: QueueName.CONFIG },
];

View file

@ -1,42 +1,15 @@
/**
* Asset Uploaded Queue Jobs
*/
export const assetUploadedProcessorName = 'asset-uploaded';
/**
* Video Conversion Queue Jobs
**/
export const mp4ConversionProcessorName = 'mp4-conversion';
/**
* Thumbnail Generator Queue Jobs
*/
export const generateJPEGThumbnailProcessorName = 'generate-jpeg-thumbnail';
export const generateWEBPThumbnailProcessorName = 'generate-webp-thumbnail';
/**
* Metadata Extraction Queue Jobs
*/
export const exifExtractionProcessorName = 'exif-extraction';
export const videoMetadataExtractionProcessorName = 'extract-video-metadata';
export const reverseGeocodingProcessorName = 'reverse-geocoding';
/**
* Machine learning Queue Jobs
*/
export enum MachineLearningJobNameEnum {
export enum JobName {
ASSET_UPLOADED = 'asset-uploaded',
MP4_CONVERSION = 'mp4-conversion',
GENERATE_JPEG_THUMBNAIL = 'generate-jpeg-thumbnail',
GENERATE_WEBP_THUMBNAIL = 'generate-webp-thumbnail',
EXIF_EXTRACTION = 'exif-extraction',
EXTRACT_VIDEO_METADATA = 'extract-video-metadata',
REVERSE_GEOCODING = 'reverse-geocoding',
USER_DELETION = 'user-deletion',
TEMPLATE_MIGRATION = 'template-migration',
CONFIG_CHANGE = 'config-change',
OBJECT_DETECTION = 'detect-object',
IMAGE_TAGGING = 'tag-image',
DELETE_FILE_ON_DISK = 'delete-file-on-disk',
}
/**
* User deletion Queue Jobs
*/
export const userDeletionProcessorName = 'user-deletion';
/**
* Storage Template Migration Queue Jobs
*/
export const templateMigrationProcessorName = 'template-migration';
export const updateTemplateProcessorName = 'update-template';

View file

@ -1,4 +1,4 @@
export enum QueueNameEnum {
export enum QueueName {
THUMBNAIL_GENERATION = 'thumbnail-generation-queue',
METADATA_EXTRACTION = 'metadata-extraction-queue',
VIDEO_CONVERSION = 'video-conversion-queue',
@ -6,5 +6,6 @@ export enum QueueNameEnum {
ASSET_UPLOADED = 'asset-uploaded-queue',
MACHINE_LEARNING = 'machine-learning-queue',
USER_DELETION = 'user-deletion-queue',
STORAGE_MIGRATION = 'storage-template-migration',
CONFIG = 'config-queue',
BACKGROUND_TASK = 'background-task',
}