mirror of
https://github.com/immich-app/immich.git
synced 2025-01-10 13:56:47 +01:00
64636c0618
* duplicate detection job, entity, config * queueing * job panel, update api * use embedding in db instead of fetching * disable concurrency * only queue visible assets * handle multiple duplicateIds * update concurrent queue check * add provider * add web placeholder, server endpoint, migration, various fixes * update sql * select embedding by default * rename variable * simplify * remove separate entity, handle re-running with different threshold, set default back to 0.02 * fix tests * add tests * add index to entity * formatting * update asset mock * fix `upsertJobStatus` signature * update sql * formatting * default to 0.03 * optimize clustering * use asset's `duplicateId` if present * update sql * update tests * expose admin setting * refactor * formatting * skip if ml is disabled * debug trash e2e * remove from web * remove from sidebar * test if ml is disabled * update sql * separate duplicate detection from clip in config, disable by default for now * fix doc * lower minimum `maxDistance` * update api * Add and Use Duplicate Detection Feature Flag (#9364) * Add Duplicate Detection Flag * Use Duplicate Detection Flag * Attempt Fixes for Failing Checks * lower minimum `maxDistance` * fix tests --------- Co-authored-by: mertalev <101130780+mertalev@users.noreply.github.com> * chore: fixes and additions after rebase * chore: update api (remove new Role enum) * fix: left join smart search so getAll works without machine learning * test: trash e2e go back to checking length of assets is zero * chore: regen api after rebase * test: fix tests after rebase * redundant join --------- Co-authored-by: Nicholas Flamy <30300649+NicholasFlamy@users.noreply.github.com> Co-authored-by: Zack Pollard <zackpollard@ymail.com> Co-authored-by: Zack Pollard <zack@futo.org>
372 lines
15 KiB
TypeScript
372 lines
15 KiB
TypeScript
import { BadRequestException } from '@nestjs/common';
|
|
import { SystemConfig } from 'src/config';
|
|
import { SystemConfigCore } from 'src/cores/system-config.core';
|
|
import { IAssetRepository } from 'src/interfaces/asset.interface';
|
|
import { IEventRepository } from 'src/interfaces/event.interface';
|
|
import {
|
|
IJobRepository,
|
|
JobCommand,
|
|
JobHandler,
|
|
JobItem,
|
|
JobName,
|
|
JobStatus,
|
|
QueueName,
|
|
} from 'src/interfaces/job.interface';
|
|
import { ILoggerRepository } from 'src/interfaces/logger.interface';
|
|
import { IMetricRepository } from 'src/interfaces/metric.interface';
|
|
import { IPersonRepository } from 'src/interfaces/person.interface';
|
|
import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface';
|
|
import { JobService } from 'src/services/job.service';
|
|
import { assetStub } from 'test/fixtures/asset.stub';
|
|
import { newAssetRepositoryMock } from 'test/repositories/asset.repository.mock';
|
|
import { newEventRepositoryMock } from 'test/repositories/event.repository.mock';
|
|
import { newJobRepositoryMock } from 'test/repositories/job.repository.mock';
|
|
import { newLoggerRepositoryMock } from 'test/repositories/logger.repository.mock';
|
|
import { newMetricRepositoryMock } from 'test/repositories/metric.repository.mock';
|
|
import { newPersonRepositoryMock } from 'test/repositories/person.repository.mock';
|
|
import { newSystemMetadataRepositoryMock } from 'test/repositories/system-metadata.repository.mock';
|
|
import { Mocked, vitest } from 'vitest';
|
|
|
|
const makeMockHandlers = (status: JobStatus) => {
|
|
const mock = vitest.fn().mockResolvedValue(status);
|
|
return Object.fromEntries(Object.values(JobName).map((jobName) => [jobName, mock])) as unknown as Record<
|
|
JobName,
|
|
JobHandler
|
|
>;
|
|
};
|
|
|
|
describe(JobService.name, () => {
|
|
let sut: JobService;
|
|
let assetMock: Mocked<IAssetRepository>;
|
|
let eventMock: Mocked<IEventRepository>;
|
|
let jobMock: Mocked<IJobRepository>;
|
|
let personMock: Mocked<IPersonRepository>;
|
|
let metricMock: Mocked<IMetricRepository>;
|
|
let systemMock: Mocked<ISystemMetadataRepository>;
|
|
let loggerMock: Mocked<ILoggerRepository>;
|
|
|
|
beforeEach(() => {
|
|
assetMock = newAssetRepositoryMock();
|
|
systemMock = newSystemMetadataRepositoryMock();
|
|
eventMock = newEventRepositoryMock();
|
|
jobMock = newJobRepositoryMock();
|
|
personMock = newPersonRepositoryMock();
|
|
metricMock = newMetricRepositoryMock();
|
|
loggerMock = newLoggerRepositoryMock();
|
|
sut = new JobService(assetMock, eventMock, jobMock, systemMock, personMock, metricMock, loggerMock);
|
|
});
|
|
|
|
it('should work', () => {
|
|
expect(sut).toBeDefined();
|
|
});
|
|
|
|
describe('handleNightlyJobs', () => {
|
|
it('should run the scheduled jobs', async () => {
|
|
await sut.handleNightlyJobs();
|
|
|
|
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
|
{ name: JobName.ASSET_DELETION_CHECK },
|
|
{ name: JobName.USER_DELETE_CHECK },
|
|
{ name: JobName.PERSON_CLEANUP },
|
|
{ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } },
|
|
{ name: JobName.CLEAN_OLD_AUDIT_LOGS },
|
|
{ name: JobName.USER_SYNC_USAGE },
|
|
{ name: JobName.QUEUE_FACIAL_RECOGNITION, data: { force: false } },
|
|
{ name: JobName.CLEAN_OLD_SESSION_TOKENS },
|
|
]);
|
|
});
|
|
});
|
|
|
|
describe('getAllJobStatus', () => {
|
|
it('should get all job statuses', async () => {
|
|
jobMock.getJobCounts.mockResolvedValue({
|
|
active: 1,
|
|
completed: 1,
|
|
failed: 1,
|
|
delayed: 1,
|
|
waiting: 1,
|
|
paused: 1,
|
|
});
|
|
jobMock.getQueueStatus.mockResolvedValue({
|
|
isActive: true,
|
|
isPaused: true,
|
|
});
|
|
|
|
const expectedJobStatus = {
|
|
jobCounts: {
|
|
active: 1,
|
|
completed: 1,
|
|
delayed: 1,
|
|
failed: 1,
|
|
waiting: 1,
|
|
paused: 1,
|
|
},
|
|
queueStatus: {
|
|
isActive: true,
|
|
isPaused: true,
|
|
},
|
|
};
|
|
|
|
await expect(sut.getAllJobsStatus()).resolves.toEqual({
|
|
[QueueName.BACKGROUND_TASK]: expectedJobStatus,
|
|
[QueueName.DUPLICATE_DETECTION]: expectedJobStatus,
|
|
[QueueName.SMART_SEARCH]: expectedJobStatus,
|
|
[QueueName.METADATA_EXTRACTION]: expectedJobStatus,
|
|
[QueueName.SEARCH]: expectedJobStatus,
|
|
[QueueName.STORAGE_TEMPLATE_MIGRATION]: expectedJobStatus,
|
|
[QueueName.MIGRATION]: expectedJobStatus,
|
|
[QueueName.THUMBNAIL_GENERATION]: expectedJobStatus,
|
|
[QueueName.VIDEO_CONVERSION]: expectedJobStatus,
|
|
[QueueName.FACE_DETECTION]: expectedJobStatus,
|
|
[QueueName.FACIAL_RECOGNITION]: expectedJobStatus,
|
|
[QueueName.SIDECAR]: expectedJobStatus,
|
|
[QueueName.LIBRARY]: expectedJobStatus,
|
|
[QueueName.NOTIFICATION]: expectedJobStatus,
|
|
});
|
|
});
|
|
});
|
|
|
|
describe('handleCommand', () => {
|
|
it('should handle a pause command', async () => {
|
|
await sut.handleCommand(QueueName.METADATA_EXTRACTION, { command: JobCommand.PAUSE, force: false });
|
|
|
|
expect(jobMock.pause).toHaveBeenCalledWith(QueueName.METADATA_EXTRACTION);
|
|
});
|
|
|
|
it('should handle a resume command', async () => {
|
|
await sut.handleCommand(QueueName.METADATA_EXTRACTION, { command: JobCommand.RESUME, force: false });
|
|
|
|
expect(jobMock.resume).toHaveBeenCalledWith(QueueName.METADATA_EXTRACTION);
|
|
});
|
|
|
|
it('should handle an empty command', async () => {
|
|
await sut.handleCommand(QueueName.METADATA_EXTRACTION, { command: JobCommand.EMPTY, force: false });
|
|
|
|
expect(jobMock.empty).toHaveBeenCalledWith(QueueName.METADATA_EXTRACTION);
|
|
});
|
|
|
|
it('should not start a job that is already running', async () => {
|
|
jobMock.getQueueStatus.mockResolvedValue({ isActive: true, isPaused: false });
|
|
|
|
await expect(
|
|
sut.handleCommand(QueueName.VIDEO_CONVERSION, { command: JobCommand.START, force: false }),
|
|
).rejects.toBeInstanceOf(BadRequestException);
|
|
|
|
expect(jobMock.queue).not.toHaveBeenCalled();
|
|
expect(jobMock.queueAll).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it('should handle a start video conversion command', async () => {
|
|
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
|
|
|
await sut.handleCommand(QueueName.VIDEO_CONVERSION, { command: JobCommand.START, force: false });
|
|
|
|
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_VIDEO_CONVERSION, data: { force: false } });
|
|
});
|
|
|
|
it('should handle a start storage template migration command', async () => {
|
|
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
|
|
|
await sut.handleCommand(QueueName.STORAGE_TEMPLATE_MIGRATION, { command: JobCommand.START, force: false });
|
|
|
|
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.STORAGE_TEMPLATE_MIGRATION });
|
|
});
|
|
|
|
it('should handle a start smart search command', async () => {
|
|
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
|
|
|
await sut.handleCommand(QueueName.SMART_SEARCH, { command: JobCommand.START, force: false });
|
|
|
|
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_SMART_SEARCH, data: { force: false } });
|
|
});
|
|
|
|
it('should handle a start metadata extraction command', async () => {
|
|
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
|
|
|
await sut.handleCommand(QueueName.METADATA_EXTRACTION, { command: JobCommand.START, force: false });
|
|
|
|
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_METADATA_EXTRACTION, data: { force: false } });
|
|
});
|
|
|
|
it('should handle a start sidecar command', async () => {
|
|
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
|
|
|
await sut.handleCommand(QueueName.SIDECAR, { command: JobCommand.START, force: false });
|
|
|
|
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_SIDECAR, data: { force: false } });
|
|
});
|
|
|
|
it('should handle a start thumbnail generation command', async () => {
|
|
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
|
|
|
await sut.handleCommand(QueueName.THUMBNAIL_GENERATION, { command: JobCommand.START, force: false });
|
|
|
|
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } });
|
|
});
|
|
|
|
it('should handle a start face detection command', async () => {
|
|
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
|
|
|
await sut.handleCommand(QueueName.FACE_DETECTION, { command: JobCommand.START, force: false });
|
|
|
|
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_FACE_DETECTION, data: { force: false } });
|
|
});
|
|
|
|
it('should handle a start facial recognition command', async () => {
|
|
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
|
|
|
await sut.handleCommand(QueueName.FACIAL_RECOGNITION, { command: JobCommand.START, force: false });
|
|
|
|
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_FACIAL_RECOGNITION, data: { force: false } });
|
|
});
|
|
|
|
it('should throw a bad request when an invalid queue is used', async () => {
|
|
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
|
|
|
await expect(
|
|
sut.handleCommand(QueueName.BACKGROUND_TASK, { command: JobCommand.START, force: false }),
|
|
).rejects.toBeInstanceOf(BadRequestException);
|
|
|
|
expect(jobMock.queue).not.toHaveBeenCalled();
|
|
expect(jobMock.queueAll).not.toHaveBeenCalled();
|
|
});
|
|
});
|
|
|
|
describe('init', () => {
|
|
it('should register a handler for each queue', async () => {
|
|
await sut.init(makeMockHandlers(JobStatus.SUCCESS));
|
|
expect(systemMock.get).toHaveBeenCalled();
|
|
expect(jobMock.addHandler).toHaveBeenCalledTimes(Object.keys(QueueName).length);
|
|
});
|
|
|
|
it('should subscribe to config changes', async () => {
|
|
await sut.init(makeMockHandlers(JobStatus.FAILED));
|
|
|
|
SystemConfigCore.create(newSystemMetadataRepositoryMock(false), newLoggerRepositoryMock()).config$.next({
|
|
job: {
|
|
[QueueName.BACKGROUND_TASK]: { concurrency: 10 },
|
|
[QueueName.SMART_SEARCH]: { concurrency: 10 },
|
|
[QueueName.METADATA_EXTRACTION]: { concurrency: 10 },
|
|
[QueueName.FACE_DETECTION]: { concurrency: 10 },
|
|
[QueueName.SEARCH]: { concurrency: 10 },
|
|
[QueueName.SIDECAR]: { concurrency: 10 },
|
|
[QueueName.LIBRARY]: { concurrency: 10 },
|
|
[QueueName.MIGRATION]: { concurrency: 10 },
|
|
[QueueName.THUMBNAIL_GENERATION]: { concurrency: 10 },
|
|
[QueueName.VIDEO_CONVERSION]: { concurrency: 10 },
|
|
[QueueName.NOTIFICATION]: { concurrency: 5 },
|
|
},
|
|
} as SystemConfig);
|
|
|
|
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.BACKGROUND_TASK, 10);
|
|
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.SMART_SEARCH, 10);
|
|
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.METADATA_EXTRACTION, 10);
|
|
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.FACE_DETECTION, 10);
|
|
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.SIDECAR, 10);
|
|
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.LIBRARY, 10);
|
|
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.MIGRATION, 10);
|
|
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.THUMBNAIL_GENERATION, 10);
|
|
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.VIDEO_CONVERSION, 10);
|
|
});
|
|
|
|
const tests: Array<{ item: JobItem; jobs: JobName[] }> = [
|
|
{
|
|
item: { name: JobName.SIDECAR_SYNC, data: { id: 'asset-1' } },
|
|
jobs: [JobName.METADATA_EXTRACTION],
|
|
},
|
|
{
|
|
item: { name: JobName.SIDECAR_DISCOVERY, data: { id: 'asset-1' } },
|
|
jobs: [JobName.METADATA_EXTRACTION],
|
|
},
|
|
{
|
|
item: { name: JobName.METADATA_EXTRACTION, data: { id: 'asset-1' } },
|
|
jobs: [JobName.LINK_LIVE_PHOTOS],
|
|
},
|
|
{
|
|
item: { name: JobName.LINK_LIVE_PHOTOS, data: { id: 'asset-1' } },
|
|
jobs: [JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE],
|
|
},
|
|
{
|
|
item: { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { id: 'asset-1', source: 'upload' } },
|
|
jobs: [JobName.GENERATE_PREVIEW],
|
|
},
|
|
{
|
|
item: { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { id: 'asset-1' } },
|
|
jobs: [],
|
|
},
|
|
{
|
|
item: { name: JobName.GENERATE_PERSON_THUMBNAIL, data: { id: 'asset-1' } },
|
|
jobs: [],
|
|
},
|
|
{
|
|
item: { name: JobName.GENERATE_PREVIEW, data: { id: 'asset-1' } },
|
|
jobs: [JobName.GENERATE_THUMBNAIL, JobName.GENERATE_THUMBHASH],
|
|
},
|
|
{
|
|
item: { name: JobName.GENERATE_PREVIEW, data: { id: 'asset-1', source: 'upload' } },
|
|
jobs: [
|
|
JobName.GENERATE_THUMBNAIL,
|
|
JobName.GENERATE_THUMBHASH,
|
|
JobName.SMART_SEARCH,
|
|
JobName.FACE_DETECTION,
|
|
JobName.VIDEO_CONVERSION,
|
|
],
|
|
},
|
|
{
|
|
item: { name: JobName.GENERATE_PREVIEW, data: { id: 'asset-live-image', source: 'upload' } },
|
|
jobs: [
|
|
JobName.GENERATE_THUMBNAIL,
|
|
JobName.GENERATE_THUMBHASH,
|
|
JobName.SMART_SEARCH,
|
|
JobName.FACE_DETECTION,
|
|
JobName.VIDEO_CONVERSION,
|
|
],
|
|
},
|
|
{
|
|
item: { name: JobName.SMART_SEARCH, data: { id: 'asset-1' } },
|
|
jobs: [],
|
|
},
|
|
{
|
|
item: { name: JobName.FACE_DETECTION, data: { id: 'asset-1' } },
|
|
jobs: [],
|
|
},
|
|
{
|
|
item: { name: JobName.FACIAL_RECOGNITION, data: { id: 'asset-1' } },
|
|
jobs: [],
|
|
},
|
|
];
|
|
|
|
for (const { item, jobs } of tests) {
|
|
it(`should queue ${jobs.length} jobs when a ${item.name} job finishes successfully`, async () => {
|
|
if (item.name === JobName.GENERATE_PREVIEW && item.data.source === 'upload') {
|
|
if (item.data.id === 'asset-live-image') {
|
|
assetMock.getByIds.mockResolvedValue([assetStub.livePhotoStillAsset]);
|
|
} else {
|
|
assetMock.getByIds.mockResolvedValue([assetStub.livePhotoMotionAsset]);
|
|
}
|
|
}
|
|
|
|
await sut.init(makeMockHandlers(JobStatus.SUCCESS));
|
|
await jobMock.addHandler.mock.calls[0][2](item);
|
|
|
|
if (jobs.length > 1) {
|
|
expect(jobMock.queueAll).toHaveBeenCalledWith(
|
|
jobs.map((jobName) => ({ name: jobName, data: expect.anything() })),
|
|
);
|
|
} else {
|
|
expect(jobMock.queue).toHaveBeenCalledTimes(jobs.length);
|
|
for (const jobName of jobs) {
|
|
expect(jobMock.queue).toHaveBeenCalledWith({ name: jobName, data: expect.anything() });
|
|
}
|
|
}
|
|
});
|
|
|
|
it(`should not queue any jobs when ${item.name} finishes with 'false'`, async () => {
|
|
await sut.init(makeMockHandlers(JobStatus.FAILED));
|
|
await jobMock.addHandler.mock.calls[0][2](item);
|
|
|
|
expect(jobMock.queueAll).not.toHaveBeenCalled();
|
|
});
|
|
}
|
|
});
|
|
});
|