1
0
Fork 0
mirror of https://github.com/immich-app/immich.git synced 2025-01-04 02:46:47 +01:00

fix: duplicated library scan jobs and api server library watch (#13734)

This commit is contained in:
Zack Pollard 2024-10-25 14:48:42 +01:00 committed by GitHub
parent 43d18ccc36
commit d95b474e58
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 49 additions and 29 deletions

View file

@ -19,7 +19,7 @@ export enum DatabaseLock {
StorageTemplateMigration = 420, StorageTemplateMigration = 420,
VersionHistory = 500, VersionHistory = 500,
CLIPDimSize = 512, CLIPDimSize = 512,
LibraryWatch = 1337, Library = 1337,
GetSystemConfig = 69, GetSystemConfig = 69,
} }

View file

@ -3,7 +3,7 @@ import { Stats } from 'node:fs';
import { defaults, SystemConfig } from 'src/config'; import { defaults, SystemConfig } from 'src/config';
import { mapLibrary } from 'src/dtos/library.dto'; import { mapLibrary } from 'src/dtos/library.dto';
import { UserEntity } from 'src/entities/user.entity'; import { UserEntity } from 'src/entities/user.entity';
import { AssetType } from 'src/enum'; import { AssetType, ImmichWorker } from 'src/enum';
import { IAssetRepository } from 'src/interfaces/asset.interface'; import { IAssetRepository } from 'src/interfaces/asset.interface';
import { IDatabaseRepository } from 'src/interfaces/database.interface'; import { IDatabaseRepository } from 'src/interfaces/database.interface';
import { import {
@ -55,7 +55,7 @@ describe(LibraryService.name, () => {
it('should init cron job and handle config changes', async () => { it('should init cron job and handle config changes', async () => {
systemMock.get.mockResolvedValue(systemConfigStub.libraryScan); systemMock.get.mockResolvedValue(systemConfigStub.libraryScan);
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
expect(jobMock.addCronJob).toHaveBeenCalled(); expect(jobMock.addCronJob).toHaveBeenCalled();
expect(systemMock.get).toHaveBeenCalled(); expect(systemMock.get).toHaveBeenCalled();
@ -91,7 +91,7 @@ describe(LibraryService.name, () => {
), ),
); );
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
expect(storageMock.watch.mock.calls).toEqual( expect(storageMock.watch.mock.calls).toEqual(
expect.arrayContaining([ expect.arrayContaining([
@ -104,7 +104,7 @@ describe(LibraryService.name, () => {
it('should not initialize watcher when watching is disabled', async () => { it('should not initialize watcher when watching is disabled', async () => {
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchDisabled); systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchDisabled);
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
expect(storageMock.watch).not.toHaveBeenCalled(); expect(storageMock.watch).not.toHaveBeenCalled();
}); });
@ -113,17 +113,32 @@ describe(LibraryService.name, () => {
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled); systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
databaseMock.tryLock.mockResolvedValue(false); databaseMock.tryLock.mockResolvedValue(false);
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
expect(storageMock.watch).not.toHaveBeenCalled(); expect(storageMock.watch).not.toHaveBeenCalled();
}); });
it('should not initialize library scan cron job when lock is taken', async () => {
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
databaseMock.tryLock.mockResolvedValue(false);
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
expect(jobMock.addCronJob).not.toHaveBeenCalled();
});
it('should not initialize watcher or library scan job when running on api', async () => {
await sut.onBootstrap(ImmichWorker.API);
expect(jobMock.addCronJob).not.toHaveBeenCalled();
});
}); });
describe('onConfigUpdateEvent', () => { describe('onConfigUpdateEvent', () => {
beforeEach(async () => { beforeEach(async () => {
systemMock.get.mockResolvedValue(defaults); systemMock.get.mockResolvedValue(defaults);
databaseMock.tryLock.mockResolvedValue(true); databaseMock.tryLock.mockResolvedValue(true);
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
}); });
it('should do nothing if oldConfig is not provided', async () => { it('should do nothing if oldConfig is not provided', async () => {
@ -133,7 +148,7 @@ describe(LibraryService.name, () => {
it('should do nothing if instance does not have the watch lock', async () => { it('should do nothing if instance does not have the watch lock', async () => {
databaseMock.tryLock.mockResolvedValue(false); databaseMock.tryLock.mockResolvedValue(false);
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
await sut.onConfigUpdate({ newConfig: systemConfigStub.libraryScan as SystemConfig, oldConfig: defaults }); await sut.onConfigUpdate({ newConfig: systemConfigStub.libraryScan as SystemConfig, oldConfig: defaults });
expect(jobMock.updateCronJob).not.toHaveBeenCalled(); expect(jobMock.updateCronJob).not.toHaveBeenCalled();
}); });
@ -693,7 +708,7 @@ describe(LibraryService.name, () => {
const mockClose = vitest.fn(); const mockClose = vitest.fn();
storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose })); storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose }));
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
await sut.delete(libraryStub.externalLibraryWithImportPaths1.id); await sut.delete(libraryStub.externalLibraryWithImportPaths1.id);
expect(mockClose).toHaveBeenCalled(); expect(mockClose).toHaveBeenCalled();
@ -827,7 +842,7 @@ describe(LibraryService.name, () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1); libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
libraryMock.getAll.mockResolvedValue([]); libraryMock.getAll.mockResolvedValue([]);
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
await sut.create({ await sut.create({
ownerId: authStub.admin.user.id, ownerId: authStub.admin.user.id,
importPaths: libraryStub.externalLibraryWithImportPaths1.importPaths, importPaths: libraryStub.externalLibraryWithImportPaths1.importPaths,
@ -890,7 +905,7 @@ describe(LibraryService.name, () => {
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled); systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
libraryMock.getAll.mockResolvedValue([]); libraryMock.getAll.mockResolvedValue([]);
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
}); });
it('should throw an error if an import path is invalid', async () => { it('should throw an error if an import path is invalid', async () => {
@ -931,7 +946,7 @@ describe(LibraryService.name, () => {
beforeEach(async () => { beforeEach(async () => {
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchDisabled); systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchDisabled);
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
}); });
it('should not watch library', async () => { it('should not watch library', async () => {
@ -947,7 +962,7 @@ describe(LibraryService.name, () => {
beforeEach(async () => { beforeEach(async () => {
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled); systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
libraryMock.getAll.mockResolvedValue([]); libraryMock.getAll.mockResolvedValue([]);
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
}); });
it('should watch library', async () => { it('should watch library', async () => {
@ -1113,7 +1128,7 @@ describe(LibraryService.name, () => {
const mockClose = vitest.fn(); const mockClose = vitest.fn();
storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose })); storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose }));
await sut.onBootstrap(); await sut.onBootstrap(ImmichWorker.MICROSERVICES);
await sut.onShutdown(); await sut.onShutdown();
expect(mockClose).toHaveBeenCalledTimes(2); expect(mockClose).toHaveBeenCalledTimes(2);

View file

@ -16,7 +16,7 @@ import {
} from 'src/dtos/library.dto'; } from 'src/dtos/library.dto';
import { AssetEntity } from 'src/entities/asset.entity'; import { AssetEntity } from 'src/entities/asset.entity';
import { LibraryEntity } from 'src/entities/library.entity'; import { LibraryEntity } from 'src/entities/library.entity';
import { AssetType } from 'src/enum'; import { AssetType, ImmichWorker } from 'src/enum';
import { DatabaseLock } from 'src/interfaces/database.interface'; import { DatabaseLock } from 'src/interfaces/database.interface';
import { ArgOf } from 'src/interfaces/event.interface'; import { ArgOf } from 'src/interfaces/event.interface';
import { import {
@ -36,27 +36,32 @@ import { validateCronExpression } from 'src/validation';
@Injectable() @Injectable()
export class LibraryService extends BaseService { export class LibraryService extends BaseService {
private watchLibraries = false; private watchLibraries = false;
private watchLock = false; private lock = false;
private watchers: Record<string, () => Promise<void>> = {}; private watchers: Record<string, () => Promise<void>> = {};
@OnEvent({ name: 'app.bootstrap' }) @OnEvent({ name: 'app.bootstrap' })
async onBootstrap() { async onBootstrap(workerType: ImmichWorker) {
if (workerType !== ImmichWorker.MICROSERVICES) {
return;
}
const config = await this.getConfig({ withCache: false }); const config = await this.getConfig({ withCache: false });
const { watch, scan } = config.library; const { watch, scan } = config.library;
// This ensures that library watching only occurs in one microservice // This ensures that library watching only occurs in one microservice
// TODO: we could make the lock be per-library instead of global this.lock = await this.databaseRepository.tryLock(DatabaseLock.Library);
this.watchLock = await this.databaseRepository.tryLock(DatabaseLock.LibraryWatch);
this.watchLibraries = this.watchLock && watch.enabled; this.watchLibraries = this.lock && watch.enabled;
if (this.lock) {
this.jobRepository.addCronJob( this.jobRepository.addCronJob(
'libraryScan', 'libraryScan',
scan.cronExpression, scan.cronExpression,
() => handlePromiseError(this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_ALL }), this.logger), () => handlePromiseError(this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_ALL }), this.logger),
scan.enabled, scan.enabled,
); );
}
if (this.watchLibraries) { if (this.watchLibraries) {
await this.watchAll(); await this.watchAll();
@ -65,7 +70,7 @@ export class LibraryService extends BaseService {
@OnEvent({ name: 'config.update', server: true }) @OnEvent({ name: 'config.update', server: true })
async onConfigUpdate({ newConfig: { library }, oldConfig }: ArgOf<'config.update'>) { async onConfigUpdate({ newConfig: { library }, oldConfig }: ArgOf<'config.update'>) {
if (!oldConfig || !this.watchLock) { if (!oldConfig || !this.lock) {
return; return;
} }
@ -180,7 +185,7 @@ export class LibraryService extends BaseService {
} }
private async unwatchAll() { private async unwatchAll() {
if (!this.watchLock) { if (!this.lock) {
return false; return false;
} }
@ -190,7 +195,7 @@ export class LibraryService extends BaseService {
} }
async watchAll() { async watchAll() {
if (!this.watchLock) { if (!this.lock) {
return false; return false;
} }