diff --git a/server/src/interfaces/cron.interface.ts b/server/src/interfaces/cron.interface.ts new file mode 100644 index 0000000000..ceb554864a --- /dev/null +++ b/server/src/interfaces/cron.interface.ts @@ -0,0 +1,20 @@ +export const ICronRepository = 'ICronRepository'; + +type CronBase = { + name: string; + start?: boolean; +}; + +export type CronCreate = CronBase & { + expression: string; + onTick: () => void; +}; + +export type CronUpdate = CronBase & { + expression?: string; +}; + +export interface ICronRepository { + create(cron: CronCreate): void; + update(cron: CronUpdate): void; +} diff --git a/server/src/interfaces/job.interface.ts b/server/src/interfaces/job.interface.ts index 64a9e5cfe3..7976f81302 100644 --- a/server/src/interfaces/job.interface.ts +++ b/server/src/interfaces/job.interface.ts @@ -315,8 +315,6 @@ export interface IJobRepository { setup(options: { services: ClassConstructor[] }): void; startWorkers(): void; run(job: JobItem): Promise; - addCronJob(name: string, expression: string, onTick: () => void, start?: boolean): void; - updateCronJob(name: string, expression?: string, start?: boolean): void; setConcurrency(queueName: QueueName, concurrency: number): void; queue(item: JobItem): Promise; queueAll(items: JobItem[]): Promise; diff --git a/server/src/repositories/cron.repository.ts b/server/src/repositories/cron.repository.ts new file mode 100644 index 0000000000..fd7589a034 --- /dev/null +++ b/server/src/repositories/cron.repository.ts @@ -0,0 +1,52 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { SchedulerRegistry } from '@nestjs/schedule'; +import { CronJob, CronTime } from 'cron'; +import { CronCreate, CronUpdate, ICronRepository } from 'src/interfaces/cron.interface'; +import { ILoggerRepository } from 'src/interfaces/logger.interface'; + +@Injectable() +export class CronRepository implements ICronRepository { + constructor( + private schedulerRegistry: SchedulerRegistry, + @Inject(ILoggerRepository) private logger: ILoggerRepository, + ) { + this.logger.setContext(CronRepository.name); + } + + create({ name, expression, onTick, start = true }: CronCreate): void { + const job = new CronJob( + expression, + onTick, + // function to run onComplete + undefined, + // whether it should start directly + start, + // timezone + undefined, + // context + undefined, + // runOnInit + undefined, + // utcOffset + undefined, + // prevents memory leaking by automatically stopping when the node process finishes + true, + ); + + this.schedulerRegistry.addCronJob(name, job); + } + + update({ name, expression, start }: CronUpdate): void { + const job = this.schedulerRegistry.getCronJob(name); + if (expression) { + job.setTime(new CronTime(expression)); + } + if (start !== undefined) { + if (start) { + job.start(); + } else { + job.stop(); + } + } + } +} diff --git a/server/src/repositories/index.ts b/server/src/repositories/index.ts index e487df503c..eb6a5d6f71 100644 --- a/server/src/repositories/index.ts +++ b/server/src/repositories/index.ts @@ -6,6 +6,7 @@ import { IKeyRepository } from 'src/interfaces/api-key.interface'; import { IAssetRepository } from 'src/interfaces/asset.interface'; import { IAuditRepository } from 'src/interfaces/audit.interface'; import { IConfigRepository } from 'src/interfaces/config.interface'; +import { ICronRepository } from 'src/interfaces/cron.interface'; import { ICryptoRepository } from 'src/interfaces/crypto.interface'; import { IDatabaseRepository } from 'src/interfaces/database.interface'; import { IEventRepository } from 'src/interfaces/event.interface'; @@ -44,6 +45,7 @@ import { ApiKeyRepository } from 'src/repositories/api-key.repository'; import { AssetRepository } from 'src/repositories/asset.repository'; import { AuditRepository } from 'src/repositories/audit.repository'; import { ConfigRepository } from 'src/repositories/config.repository'; +import { CronRepository } from 'src/repositories/cron.repository'; import { CryptoRepository } from 'src/repositories/crypto.repository'; import { DatabaseRepository } from 'src/repositories/database.repository'; import { EventRepository } from 'src/repositories/event.repository'; @@ -83,6 +85,7 @@ export const repositories = [ { provide: IAssetRepository, useClass: AssetRepository }, { provide: IAuditRepository, useClass: AuditRepository }, { provide: IConfigRepository, useClass: ConfigRepository }, + { provide: ICronRepository, useClass: CronRepository }, { provide: ICryptoRepository, useClass: CryptoRepository }, { provide: IDatabaseRepository, useClass: DatabaseRepository }, { provide: IEventRepository, useClass: EventRepository }, diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 253599fbf3..c6c2947617 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -4,7 +4,6 @@ import { ModuleRef, Reflector } from '@nestjs/core'; import { SchedulerRegistry } from '@nestjs/schedule'; import { JobsOptions, Queue, Worker } from 'bullmq'; import { ClassConstructor } from 'class-transformer'; -import { CronJob, CronTime } from 'cron'; import { setTimeout } from 'node:timers/promises'; import { JobConfig } from 'src/decorators'; import { MetadataKey } from 'src/enum'; @@ -119,43 +118,6 @@ export class JobRepository implements IJobRepository { return item.handler(data); } - addCronJob(name: string, expression: string, onTick: () => void, start = true): void { - const job = new CronJob( - expression, - onTick, - // function to run onComplete - undefined, - // whether it should start directly - start, - // timezone - undefined, - // context - undefined, - // runOnInit - undefined, - // utcOffset - undefined, - // prevents memory leaking by automatically stopping when the node process finishes - true, - ); - - this.schedulerRegistry.addCronJob(name, job); - } - - updateCronJob(name: string, expression?: string, start?: boolean): void { - const job = this.schedulerRegistry.getCronJob(name); - if (expression) { - job.setTime(new CronTime(expression)); - } - if (start !== undefined) { - if (start) { - job.start(); - } else { - job.stop(); - } - } - } - setConcurrency(queueName: QueueName, concurrency: number) { const worker = this.workers[queueName]; if (!worker) { diff --git a/server/src/services/backup.service.spec.ts b/server/src/services/backup.service.spec.ts index df2f80b37c..82537cb9f6 100644 --- a/server/src/services/backup.service.spec.ts +++ b/server/src/services/backup.service.spec.ts @@ -3,8 +3,9 @@ import { defaults, SystemConfig } from 'src/config'; import { StorageCore } from 'src/cores/storage.core'; import { ImmichWorker, StorageFolder } from 'src/enum'; import { IConfigRepository } from 'src/interfaces/config.interface'; +import { ICronRepository } from 'src/interfaces/cron.interface'; import { IDatabaseRepository } from 'src/interfaces/database.interface'; -import { IJobRepository, JobStatus } from 'src/interfaces/job.interface'; +import { JobStatus } from 'src/interfaces/job.interface'; import { IProcessRepository } from 'src/interfaces/process.interface'; import { IStorageRepository } from 'src/interfaces/storage.interface'; import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface'; @@ -18,13 +19,13 @@ describe(BackupService.name, () => { let databaseMock: Mocked; let configMock: Mocked; - let jobMock: Mocked; + let cronMock: Mocked; let processMock: Mocked; let storageMock: Mocked; let systemMock: Mocked; beforeEach(() => { - ({ sut, configMock, databaseMock, jobMock, processMock, storageMock, systemMock } = newTestService(BackupService)); + ({ sut, cronMock, configMock, databaseMock, processMock, storageMock, systemMock } = newTestService(BackupService)); }); it('should work', () => { @@ -37,7 +38,7 @@ describe(BackupService.name, () => { await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig }); - expect(jobMock.addCronJob).toHaveBeenCalled(); + expect(cronMock.create).toHaveBeenCalled(); }); it('should not initialize backup database cron job when lock is taken', async () => { @@ -45,14 +46,14 @@ describe(BackupService.name, () => { await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig }); - expect(jobMock.addCronJob).not.toHaveBeenCalled(); + expect(cronMock.create).not.toHaveBeenCalled(); }); it('should not initialise backup database job when running on microservices', async () => { configMock.getWorker.mockReturnValue(ImmichWorker.MICROSERVICES); await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig }); - expect(jobMock.addCronJob).not.toHaveBeenCalled(); + expect(cronMock.create).not.toHaveBeenCalled(); }); }); @@ -75,15 +76,15 @@ describe(BackupService.name, () => { } as SystemConfig, }); - expect(jobMock.updateCronJob).toHaveBeenCalledWith('backupDatabase', '0 1 * * *', true); - expect(jobMock.updateCronJob).toHaveBeenCalled(); + expect(cronMock.update).toHaveBeenCalledWith({ name: 'backupDatabase', expression: '0 1 * * *', start: true }); + expect(cronMock.update).toHaveBeenCalled(); }); it('should do nothing if instance does not have the backup database lock', async () => { databaseMock.tryLock.mockResolvedValue(false); await sut.onConfigInit({ newConfig: defaults }); sut.onConfigUpdate({ newConfig: systemConfigStub.backupEnabled as SystemConfig, oldConfig: defaults }); - expect(jobMock.updateCronJob).not.toHaveBeenCalled(); + expect(cronMock.update).not.toHaveBeenCalled(); }); }); diff --git a/server/src/services/backup.service.ts b/server/src/services/backup.service.ts index 1febe79130..ef76d4fb27 100644 --- a/server/src/services/backup.service.ts +++ b/server/src/services/backup.service.ts @@ -27,12 +27,12 @@ export class BackupService extends BaseService { this.backupLock = await this.databaseRepository.tryLock(DatabaseLock.BackupDatabase); if (this.backupLock) { - this.jobRepository.addCronJob( - 'backupDatabase', - database.cronExpression, - () => handlePromiseError(this.jobRepository.queue({ name: JobName.BACKUP_DATABASE }), this.logger), - database.enabled, - ); + this.cronRepository.create({ + name: 'backupDatabase', + expression: database.cronExpression, + onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.BACKUP_DATABASE }), this.logger), + start: database.enabled, + }); } } @@ -42,7 +42,11 @@ export class BackupService extends BaseService { return; } - this.jobRepository.updateCronJob('backupDatabase', backup.database.cronExpression, backup.database.enabled); + this.cronRepository.update({ + name: 'backupDatabase', + expression: backup.database.cronExpression, + start: backup.database.enabled, + }); } @OnEvent({ name: 'config.validate' }) diff --git a/server/src/services/base.service.ts b/server/src/services/base.service.ts index dc7dab102c..3630d69c18 100644 --- a/server/src/services/base.service.ts +++ b/server/src/services/base.service.ts @@ -12,6 +12,7 @@ import { IKeyRepository } from 'src/interfaces/api-key.interface'; import { IAssetRepository } from 'src/interfaces/asset.interface'; import { IAuditRepository } from 'src/interfaces/audit.interface'; import { IConfigRepository } from 'src/interfaces/config.interface'; +import { ICronRepository } from 'src/interfaces/cron.interface'; import { ICryptoRepository } from 'src/interfaces/crypto.interface'; import { IDatabaseRepository } from 'src/interfaces/database.interface'; import { IEventRepository } from 'src/interfaces/event.interface'; @@ -57,6 +58,7 @@ export class BaseService { @Inject(IAlbumUserRepository) protected albumUserRepository: IAlbumUserRepository, @Inject(IAssetRepository) protected assetRepository: IAssetRepository, @Inject(IConfigRepository) protected configRepository: IConfigRepository, + @Inject(ICronRepository) protected cronRepository: ICronRepository, @Inject(ICryptoRepository) protected cryptoRepository: ICryptoRepository, @Inject(IDatabaseRepository) protected databaseRepository: IDatabaseRepository, @Inject(IEventRepository) protected eventRepository: IEventRepository, diff --git a/server/src/services/library.service.spec.ts b/server/src/services/library.service.spec.ts index 879965e82b..37bddab136 100644 --- a/server/src/services/library.service.spec.ts +++ b/server/src/services/library.service.spec.ts @@ -6,6 +6,7 @@ import { UserEntity } from 'src/entities/user.entity'; import { AssetType, ImmichWorker } from 'src/enum'; import { IAssetRepository } from 'src/interfaces/asset.interface'; import { IConfigRepository } from 'src/interfaces/config.interface'; +import { ICronRepository } from 'src/interfaces/cron.interface'; import { IDatabaseRepository } from 'src/interfaces/database.interface'; import { IJobRepository, @@ -36,13 +37,15 @@ describe(LibraryService.name, () => { let assetMock: Mocked; let configMock: Mocked; + let cronMock: Mocked; let databaseMock: Mocked; let jobMock: Mocked; let libraryMock: Mocked; let storageMock: Mocked; beforeEach(() => { - ({ sut, assetMock, configMock, databaseMock, jobMock, libraryMock, storageMock } = newTestService(LibraryService)); + ({ sut, assetMock, configMock, cronMock, databaseMock, jobMock, libraryMock, storageMock } = + newTestService(LibraryService)); databaseMock.tryLock.mockResolvedValue(true); configMock.getWorker.mockReturnValue(ImmichWorker.MICROSERVICES); @@ -56,7 +59,7 @@ describe(LibraryService.name, () => { it('should init cron job and handle config changes', async () => { await sut.onConfigInit({ newConfig: defaults }); - expect(jobMock.addCronJob).toHaveBeenCalled(); + expect(cronMock.create).toHaveBeenCalled(); await sut.onConfigUpdate({ oldConfig: defaults, @@ -71,7 +74,7 @@ describe(LibraryService.name, () => { } as SystemConfig, }); - expect(jobMock.updateCronJob).toHaveBeenCalledWith('libraryScan', '0 1 * * *', true); + expect(cronMock.update).toHaveBeenCalledWith({ name: 'libraryScan', expression: '0 1 * * *', start: true }); }); it('should initialize watcher for all external libraries', async () => { @@ -117,14 +120,14 @@ describe(LibraryService.name, () => { await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchEnabled as SystemConfig }); - expect(jobMock.addCronJob).not.toHaveBeenCalled(); + expect(cronMock.create).not.toHaveBeenCalled(); }); it('should not initialize watcher or library scan job when running on api', async () => { configMock.getWorker.mockReturnValue(ImmichWorker.API); await sut.onConfigInit({ newConfig: systemConfigStub.libraryScan as SystemConfig }); - expect(jobMock.addCronJob).not.toHaveBeenCalled(); + expect(cronMock.create).not.toHaveBeenCalled(); }); }); @@ -138,7 +141,7 @@ describe(LibraryService.name, () => { databaseMock.tryLock.mockResolvedValue(false); await sut.onConfigInit({ newConfig: defaults }); await sut.onConfigUpdate({ newConfig: systemConfigStub.libraryScan as SystemConfig, oldConfig: defaults }); - expect(jobMock.updateCronJob).not.toHaveBeenCalled(); + expect(cronMock.update).not.toHaveBeenCalled(); }); it('should update cron job and enable watching', async () => { @@ -148,11 +151,11 @@ describe(LibraryService.name, () => { oldConfig: defaults, }); - expect(jobMock.updateCronJob).toHaveBeenCalledWith( - 'libraryScan', - systemConfigStub.libraryScan.library.scan.cronExpression, - systemConfigStub.libraryScan.library.scan.enabled, - ); + expect(cronMock.update).toHaveBeenCalledWith({ + name: 'libraryScan', + expression: systemConfigStub.libraryScan.library.scan.cronExpression, + start: systemConfigStub.libraryScan.library.scan.enabled, + }); }); it('should update cron job and disable watching', async () => { @@ -166,11 +169,11 @@ describe(LibraryService.name, () => { oldConfig: defaults, }); - expect(jobMock.updateCronJob).toHaveBeenCalledWith( - 'libraryScan', - systemConfigStub.libraryScan.library.scan.cronExpression, - systemConfigStub.libraryScan.library.scan.enabled, - ); + expect(cronMock.update).toHaveBeenCalledWith({ + name: 'libraryScan', + expression: systemConfigStub.libraryScan.library.scan.cronExpression, + start: systemConfigStub.libraryScan.library.scan.enabled, + }); }); }); diff --git a/server/src/services/library.service.ts b/server/src/services/library.service.ts index 6e10863793..bd32be9a2d 100644 --- a/server/src/services/library.service.ts +++ b/server/src/services/library.service.ts @@ -48,12 +48,13 @@ export class LibraryService extends BaseService { this.watchLibraries = this.lock && watch.enabled; if (this.lock) { - this.jobRepository.addCronJob( - 'libraryScan', - scan.cronExpression, - () => handlePromiseError(this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_ALL }), this.logger), - scan.enabled, - ); + this.cronRepository.create({ + name: 'libraryScan', + expression: scan.cronExpression, + onTick: () => + handlePromiseError(this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_ALL }), this.logger), + start: scan.enabled, + }); } if (this.watchLibraries) { @@ -67,7 +68,11 @@ export class LibraryService extends BaseService { return; } - this.jobRepository.updateCronJob('libraryScan', library.scan.cronExpression, library.scan.enabled); + this.cronRepository.update({ + name: 'libraryScan', + expression: library.scan.cronExpression, + start: library.scan.enabled, + }); if (library.watch.enabled !== this.watchLibraries) { // Watch configuration changed, update accordingly diff --git a/server/test/repositories/cron.repository.mock.ts b/server/test/repositories/cron.repository.mock.ts new file mode 100644 index 0000000000..2b0784e8ac --- /dev/null +++ b/server/test/repositories/cron.repository.mock.ts @@ -0,0 +1,9 @@ +import { ICronRepository } from 'src/interfaces/cron.interface'; +import { Mocked, vitest } from 'vitest'; + +export const newCronRepositoryMock = (): Mocked => { + return { + create: vitest.fn(), + update: vitest.fn(), + }; +}; diff --git a/server/test/repositories/job.repository.mock.ts b/server/test/repositories/job.repository.mock.ts index 2875c4405b..e9557af59b 100644 --- a/server/test/repositories/job.repository.mock.ts +++ b/server/test/repositories/job.repository.mock.ts @@ -6,8 +6,6 @@ export const newJobRepositoryMock = (): Mocked => { setup: vitest.fn(), startWorkers: vitest.fn(), run: vitest.fn(), - addCronJob: vitest.fn(), - updateCronJob: vitest.fn(), setConcurrency: vitest.fn(), empty: vitest.fn(), pause: vitest.fn(), diff --git a/server/test/utils.ts b/server/test/utils.ts index d37af5118d..7f5b75020c 100644 --- a/server/test/utils.ts +++ b/server/test/utils.ts @@ -12,6 +12,7 @@ import { newKeyRepositoryMock } from 'test/repositories/api-key.repository.mock' import { newAssetRepositoryMock } from 'test/repositories/asset.repository.mock'; import { newAuditRepositoryMock } from 'test/repositories/audit.repository.mock'; import { newConfigRepositoryMock } from 'test/repositories/config.repository.mock'; +import { newCronRepositoryMock } from 'test/repositories/cron.repository.mock'; import { newCryptoRepositoryMock } from 'test/repositories/crypto.repository.mock'; import { newDatabaseRepositoryMock } from 'test/repositories/database.repository.mock'; import { newEventRepositoryMock } from 'test/repositories/event.repository.mock'; @@ -62,6 +63,7 @@ export const newTestService = ( const accessMock = newAccessRepositoryMock(); const loggerMock = newLoggerRepositoryMock(); + const cronMock = newCronRepositoryMock(); const cryptoMock = newCryptoRepositoryMock(); const activityMock = newActivityRepositoryMock(); const auditMock = newAuditRepositoryMock(); @@ -108,6 +110,7 @@ export const newTestService = ( albumUserMock, assetMock, configMock, + cronMock, cryptoMock, databaseMock, eventMock, @@ -144,6 +147,7 @@ export const newTestService = ( sut, accessMock, loggerMock, + cronMock, cryptoMock, activityMock, auditMock,