mirror of
https://github.com/immich-app/immich.git
synced 2025-01-06 03:46:47 +01:00
feat: run all offline checks in a single job
This commit is contained in:
parent
ba71fd42da
commit
8ecde3b277
3 changed files with 61 additions and 53 deletions
|
@ -85,7 +85,7 @@ export enum JobName {
|
||||||
LIBRARY_QUEUE_SYNC_FILES = 'library-queue-sync-files',
|
LIBRARY_QUEUE_SYNC_FILES = 'library-queue-sync-files',
|
||||||
LIBRARY_QUEUE_SYNC_ASSETS = 'library-queue-sync-assets',
|
LIBRARY_QUEUE_SYNC_ASSETS = 'library-queue-sync-assets',
|
||||||
LIBRARY_SYNC_FILE = 'library-sync-file',
|
LIBRARY_SYNC_FILE = 'library-sync-file',
|
||||||
LIBRARY_SYNC_ASSET = 'library-sync-asset',
|
LIBRARY_SYNC_ASSETS = 'library-sync-assets',
|
||||||
LIBRARY_DELETE = 'library-delete',
|
LIBRARY_DELETE = 'library-delete',
|
||||||
LIBRARY_QUEUE_SYNC_ALL = 'library-queue-sync-all',
|
LIBRARY_QUEUE_SYNC_ALL = 'library-queue-sync-all',
|
||||||
LIBRARY_QUEUE_CLEANUP = 'library-queue-cleanup',
|
LIBRARY_QUEUE_CLEANUP = 'library-queue-cleanup',
|
||||||
|
@ -148,15 +148,15 @@ export interface ILibraryFileJob extends IEntityJob {
|
||||||
assetPath: string;
|
assetPath: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface ILibraryAssetJob extends IEntityJob {
|
|
||||||
importPaths: string[];
|
|
||||||
exclusionPatterns: string[];
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface IBulkEntityJob extends IBaseJob {
|
export interface IBulkEntityJob extends IBaseJob {
|
||||||
ids: string[];
|
ids: string[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface ILibraryAssetsJob extends IBulkEntityJob {
|
||||||
|
importPaths: string[];
|
||||||
|
exclusionPatterns: string[];
|
||||||
|
}
|
||||||
|
|
||||||
export interface IDeleteFilesJob extends IBaseJob {
|
export interface IDeleteFilesJob extends IBaseJob {
|
||||||
files: Array<string | null | undefined>;
|
files: Array<string | null | undefined>;
|
||||||
}
|
}
|
||||||
|
@ -287,7 +287,7 @@ export type JobItem =
|
||||||
| { name: JobName.LIBRARY_SYNC_FILE; data: ILibraryFileJob }
|
| { name: JobName.LIBRARY_SYNC_FILE; data: ILibraryFileJob }
|
||||||
| { name: JobName.LIBRARY_QUEUE_SYNC_FILES; data: IEntityJob }
|
| { name: JobName.LIBRARY_QUEUE_SYNC_FILES; data: IEntityJob }
|
||||||
| { name: JobName.LIBRARY_QUEUE_SYNC_ASSETS; data: IEntityJob }
|
| { name: JobName.LIBRARY_QUEUE_SYNC_ASSETS; data: IEntityJob }
|
||||||
| { name: JobName.LIBRARY_SYNC_ASSET; data: ILibraryAssetJob }
|
| { name: JobName.LIBRARY_SYNC_ASSETS; data: ILibraryAssetsJob }
|
||||||
| { name: JobName.LIBRARY_DELETE; data: IEntityJob }
|
| { name: JobName.LIBRARY_DELETE; data: IEntityJob }
|
||||||
| { name: JobName.LIBRARY_QUEUE_SYNC_ALL; data?: IBaseJob }
|
| { name: JobName.LIBRARY_QUEUE_SYNC_ALL; data?: IBaseJob }
|
||||||
| { name: JobName.LIBRARY_QUEUE_CLEANUP; data: IBaseJob }
|
| { name: JobName.LIBRARY_QUEUE_CLEANUP; data: IBaseJob }
|
||||||
|
|
|
@ -10,7 +10,7 @@ import { ICronRepository } from 'src/interfaces/cron.interface';
|
||||||
import { IDatabaseRepository } from 'src/interfaces/database.interface';
|
import { IDatabaseRepository } from 'src/interfaces/database.interface';
|
||||||
import {
|
import {
|
||||||
IJobRepository,
|
IJobRepository,
|
||||||
ILibraryAssetJob,
|
ILibraryAssetsJob,
|
||||||
ILibraryFileJob,
|
ILibraryFileJob,
|
||||||
JobName,
|
JobName,
|
||||||
JOBS_LIBRARY_PAGINATION_SIZE,
|
JOBS_LIBRARY_PAGINATION_SIZE,
|
||||||
|
@ -231,7 +231,7 @@ describe(LibraryService.name, () => {
|
||||||
|
|
||||||
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
||||||
{
|
{
|
||||||
name: JobName.LIBRARY_SYNC_ASSET,
|
name: JobName.LIBRARY_SYNC_ASSETS,
|
||||||
data: {
|
data: {
|
||||||
id: assetStub.external.id,
|
id: assetStub.external.id,
|
||||||
importPaths: libraryStub.externalLibrary1.importPaths,
|
importPaths: libraryStub.externalLibrary1.importPaths,
|
||||||
|
@ -250,22 +250,22 @@ describe(LibraryService.name, () => {
|
||||||
|
|
||||||
describe('handleSyncAsset', () => {
|
describe('handleSyncAsset', () => {
|
||||||
it('should skip missing assets', async () => {
|
it('should skip missing assets', async () => {
|
||||||
const mockAssetJob: ILibraryAssetJob = {
|
const mockAssetJob: ILibraryAssetsJob = {
|
||||||
id: assetStub.external.id,
|
ids: [assetStub.external.id],
|
||||||
importPaths: ['/'],
|
importPaths: ['/'],
|
||||||
exclusionPatterns: [],
|
exclusionPatterns: [],
|
||||||
};
|
};
|
||||||
|
|
||||||
assetMock.getById.mockResolvedValue(null);
|
assetMock.getById.mockResolvedValue(null);
|
||||||
|
|
||||||
await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SKIPPED);
|
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SKIPPED);
|
||||||
|
|
||||||
expect(assetMock.remove).not.toHaveBeenCalled();
|
expect(assetMock.remove).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should offline assets no longer on disk', async () => {
|
it('should offline assets no longer on disk', async () => {
|
||||||
const mockAssetJob: ILibraryAssetJob = {
|
const mockAssetJob: ILibraryAssetsJob = {
|
||||||
id: assetStub.external.id,
|
ids: [assetStub.external.id],
|
||||||
importPaths: ['/'],
|
importPaths: ['/'],
|
||||||
exclusionPatterns: [],
|
exclusionPatterns: [],
|
||||||
};
|
};
|
||||||
|
@ -273,7 +273,7 @@ describe(LibraryService.name, () => {
|
||||||
assetMock.getById.mockResolvedValue(assetStub.external);
|
assetMock.getById.mockResolvedValue(assetStub.external);
|
||||||
storageMock.stat.mockRejectedValue(new Error('ENOENT, no such file or directory'));
|
storageMock.stat.mockRejectedValue(new Error('ENOENT, no such file or directory'));
|
||||||
|
|
||||||
await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
||||||
|
|
||||||
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
|
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
|
||||||
isOffline: true,
|
isOffline: true,
|
||||||
|
@ -282,15 +282,15 @@ describe(LibraryService.name, () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should offline assets matching an exclusion pattern', async () => {
|
it('should offline assets matching an exclusion pattern', async () => {
|
||||||
const mockAssetJob: ILibraryAssetJob = {
|
const mockAssetJob: ILibraryAssetsJob = {
|
||||||
id: assetStub.external.id,
|
ids: [assetStub.external.id],
|
||||||
importPaths: ['/'],
|
importPaths: ['/'],
|
||||||
exclusionPatterns: ['**/user1/**'],
|
exclusionPatterns: ['**/user1/**'],
|
||||||
};
|
};
|
||||||
|
|
||||||
assetMock.getById.mockResolvedValue(assetStub.external);
|
assetMock.getById.mockResolvedValue(assetStub.external);
|
||||||
|
|
||||||
await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
||||||
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
|
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
|
||||||
isOffline: true,
|
isOffline: true,
|
||||||
deletedAt: expect.any(Date),
|
deletedAt: expect.any(Date),
|
||||||
|
@ -298,8 +298,8 @@ describe(LibraryService.name, () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should set assets outside of import paths as offline', async () => {
|
it('should set assets outside of import paths as offline', async () => {
|
||||||
const mockAssetJob: ILibraryAssetJob = {
|
const mockAssetJob: ILibraryAssetsJob = {
|
||||||
id: assetStub.external.id,
|
ids: [assetStub.external.id],
|
||||||
importPaths: ['/data/user2'],
|
importPaths: ['/data/user2'],
|
||||||
exclusionPatterns: [],
|
exclusionPatterns: [],
|
||||||
};
|
};
|
||||||
|
@ -307,7 +307,7 @@ describe(LibraryService.name, () => {
|
||||||
assetMock.getById.mockResolvedValue(assetStub.external);
|
assetMock.getById.mockResolvedValue(assetStub.external);
|
||||||
storageMock.checkFileExists.mockResolvedValue(true);
|
storageMock.checkFileExists.mockResolvedValue(true);
|
||||||
|
|
||||||
await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
||||||
|
|
||||||
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
|
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
|
||||||
isOffline: true,
|
isOffline: true,
|
||||||
|
@ -316,8 +316,8 @@ describe(LibraryService.name, () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should do nothing with online assets', async () => {
|
it('should do nothing with online assets', async () => {
|
||||||
const mockAssetJob: ILibraryAssetJob = {
|
const mockAssetJob: ILibraryAssetsJob = {
|
||||||
id: assetStub.external.id,
|
ids: [assetStub.external.id],
|
||||||
importPaths: ['/'],
|
importPaths: ['/'],
|
||||||
exclusionPatterns: [],
|
exclusionPatterns: [],
|
||||||
};
|
};
|
||||||
|
@ -325,14 +325,14 @@ describe(LibraryService.name, () => {
|
||||||
assetMock.getById.mockResolvedValue(assetStub.external);
|
assetMock.getById.mockResolvedValue(assetStub.external);
|
||||||
storageMock.stat.mockResolvedValue({ mtime: assetStub.external.fileModifiedAt } as Stats);
|
storageMock.stat.mockResolvedValue({ mtime: assetStub.external.fileModifiedAt } as Stats);
|
||||||
|
|
||||||
await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
||||||
|
|
||||||
expect(assetMock.updateAll).not.toHaveBeenCalled();
|
expect(assetMock.updateAll).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should un-trash an asset previously marked as offline', async () => {
|
it('should un-trash an asset previously marked as offline', async () => {
|
||||||
const mockAssetJob: ILibraryAssetJob = {
|
const mockAssetJob: ILibraryAssetsJob = {
|
||||||
id: assetStub.external.id,
|
ids: [assetStub.external.id],
|
||||||
importPaths: ['/'],
|
importPaths: ['/'],
|
||||||
exclusionPatterns: [],
|
exclusionPatterns: [],
|
||||||
};
|
};
|
||||||
|
@ -340,7 +340,7 @@ describe(LibraryService.name, () => {
|
||||||
assetMock.getById.mockResolvedValue(assetStub.trashedOffline);
|
assetMock.getById.mockResolvedValue(assetStub.trashedOffline);
|
||||||
storageMock.stat.mockResolvedValue({ mtime: assetStub.trashedOffline.fileModifiedAt } as Stats);
|
storageMock.stat.mockResolvedValue({ mtime: assetStub.trashedOffline.fileModifiedAt } as Stats);
|
||||||
|
|
||||||
await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
||||||
|
|
||||||
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.trashedOffline.id], {
|
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.trashedOffline.id], {
|
||||||
deletedAt: null,
|
deletedAt: null,
|
||||||
|
@ -353,8 +353,8 @@ describe(LibraryService.name, () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should update file when mtime has changed', async () => {
|
it('should update file when mtime has changed', async () => {
|
||||||
const mockAssetJob: ILibraryAssetJob = {
|
const mockAssetJob: ILibraryAssetsJob = {
|
||||||
id: assetStub.external.id,
|
ids: [assetStub.external.id],
|
||||||
importPaths: ['/'],
|
importPaths: ['/'],
|
||||||
exclusionPatterns: [],
|
exclusionPatterns: [],
|
||||||
};
|
};
|
||||||
|
@ -363,7 +363,7 @@ describe(LibraryService.name, () => {
|
||||||
assetMock.getById.mockResolvedValue(assetStub.external);
|
assetMock.getById.mockResolvedValue(assetStub.external);
|
||||||
storageMock.stat.mockResolvedValue({ mtime: newMTime } as Stats);
|
storageMock.stat.mockResolvedValue({ mtime: newMTime } as Stats);
|
||||||
|
|
||||||
await expect(sut.handleSyncAsset(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
await expect(sut.handleSyncAssets(mockAssetJob)).resolves.toBe(JobStatus.SUCCESS);
|
||||||
|
|
||||||
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
|
expect(assetMock.updateAll).toHaveBeenCalledWith([assetStub.external.id], {
|
||||||
fileModifiedAt: newMTime,
|
fileModifiedAt: newMTime,
|
||||||
|
@ -969,7 +969,7 @@ describe(LibraryService.name, () => {
|
||||||
},
|
},
|
||||||
]);
|
]);
|
||||||
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
||||||
{ name: JobName.LIBRARY_SYNC_ASSET, data: expect.objectContaining({ id: assetStub.image.id }) },
|
{ name: JobName.LIBRARY_SYNC_ASSETS, data: expect.objectContaining({ ids: [assetStub.image.id] }) },
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -994,7 +994,7 @@ describe(LibraryService.name, () => {
|
||||||
},
|
},
|
||||||
]);
|
]);
|
||||||
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
||||||
{ name: JobName.LIBRARY_SYNC_ASSET, data: expect.objectContaining({ id: assetStub.image.id }) },
|
{ name: JobName.LIBRARY_SYNC_ASSETS, data: expect.objectContaining({ id: [assetStub.image.id] }) },
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -1009,7 +1009,7 @@ describe(LibraryService.name, () => {
|
||||||
await sut.watchAll();
|
await sut.watchAll();
|
||||||
|
|
||||||
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
||||||
{ name: JobName.LIBRARY_SYNC_ASSET, data: expect.objectContaining({ id: assetStub.image.id }) },
|
{ name: JobName.LIBRARY_SYNC_ASSETS, data: expect.objectContaining({ ids: [assetStub.image.id] }) },
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -1166,9 +1166,9 @@ describe(LibraryService.name, () => {
|
||||||
|
|
||||||
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
||||||
{
|
{
|
||||||
name: JobName.LIBRARY_SYNC_ASSET,
|
name: JobName.LIBRARY_SYNC_ASSETS,
|
||||||
data: {
|
data: {
|
||||||
id: assetStub.image1.id,
|
ids: [assetStub.image1.id],
|
||||||
importPaths: libraryStub.externalLibrary1.importPaths,
|
importPaths: libraryStub.externalLibrary1.importPaths,
|
||||||
exclusionPatterns: libraryStub.externalLibrary1.exclusionPatterns,
|
exclusionPatterns: libraryStub.externalLibrary1.exclusionPatterns,
|
||||||
},
|
},
|
||||||
|
|
|
@ -242,12 +242,10 @@ export class LibraryService extends BaseService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private async syncAssets({ importPaths, exclusionPatterns }: LibraryEntity, assetIds: string[]) {
|
private async syncAssets({ importPaths, exclusionPatterns }: LibraryEntity, assetIds: string[]) {
|
||||||
await this.jobRepository.queueAll(
|
await this.jobRepository.queue({
|
||||||
assetIds.map((assetId) => ({
|
name: JobName.LIBRARY_SYNC_ASSETS,
|
||||||
name: JobName.LIBRARY_SYNC_ASSET,
|
data: { ids: assetIds, importPaths, exclusionPatterns },
|
||||||
data: { id: assetId, importPaths, exclusionPatterns },
|
});
|
||||||
})),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async validateImportPath(importPath: string): Promise<ValidateLibraryImportPathResponseDto> {
|
private async validateImportPath(importPath: string): Promise<ValidateLibraryImportPathResponseDto> {
|
||||||
|
@ -472,27 +470,35 @@ export class LibraryService extends BaseService {
|
||||||
return JobStatus.SUCCESS;
|
return JobStatus.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnJob({ name: JobName.LIBRARY_SYNC_ASSET, queue: QueueName.LIBRARY })
|
@OnJob({ name: JobName.LIBRARY_SYNC_ASSETS, queue: QueueName.LIBRARY })
|
||||||
async handleSyncAsset(job: JobOf<JobName.LIBRARY_SYNC_ASSET>): Promise<JobStatus> {
|
async handleSyncAssets(job: JobOf<JobName.LIBRARY_SYNC_ASSETS>): Promise<JobStatus> {
|
||||||
const asset = await this.assetRepository.getById(job.id);
|
for (const id of job.ids) {
|
||||||
|
await this.handleSyncAsset(id, job.importPaths, job.exclusionPatterns);
|
||||||
|
}
|
||||||
|
|
||||||
|
return JobStatus.SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async handleSyncAsset(id: string, importPaths: string[], exclusionPatterns: string[]): Promise<JobStatus> {
|
||||||
|
const asset = await this.assetRepository.getById(id);
|
||||||
if (!asset) {
|
if (!asset) {
|
||||||
return JobStatus.SKIPPED;
|
return JobStatus.SKIPPED;
|
||||||
}
|
}
|
||||||
|
|
||||||
const markOffline = async (explanation: string) => {
|
const markOffline = async (explanation: string) => {
|
||||||
if (!asset.isOffline) {
|
if (!asset.isOffline) {
|
||||||
this.logger.debug(`${explanation}, removing: ${asset.originalPath}`);
|
this.logger.debug(`${explanation}, moving to trash: ${asset.originalPath}`);
|
||||||
await this.assetRepository.updateAll([asset.id], { isOffline: true, deletedAt: new Date() });
|
await this.assetRepository.updateAll([asset.id], { isOffline: true, deletedAt: new Date() });
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
const isInPath = job.importPaths.find((path) => asset.originalPath.startsWith(path));
|
const isInPath = importPaths.find((path) => asset.originalPath.startsWith(path));
|
||||||
if (!isInPath) {
|
if (!isInPath) {
|
||||||
await markOffline('Asset is no longer in an import path');
|
await markOffline('Asset is no longer in an import path');
|
||||||
return JobStatus.SUCCESS;
|
return JobStatus.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
const isExcluded = job.exclusionPatterns.some((pattern) => picomatch.isMatch(asset.originalPath, pattern));
|
const isExcluded = exclusionPatterns.some((pattern) => picomatch.isMatch(asset.originalPath, pattern));
|
||||||
if (isExcluded) {
|
if (isExcluded) {
|
||||||
await markOffline('Asset is covered by an exclusion pattern');
|
await markOffline('Asset is covered by an exclusion pattern');
|
||||||
return JobStatus.SUCCESS;
|
return JobStatus.SUCCESS;
|
||||||
|
@ -597,12 +603,14 @@ export class LibraryService extends BaseService {
|
||||||
for await (const assets of onlineAssets) {
|
for await (const assets of onlineAssets) {
|
||||||
assetCount += assets.length;
|
assetCount += assets.length;
|
||||||
this.logger.debug(`Discovered ${assetCount} asset(s) in library ${library.id}...`);
|
this.logger.debug(`Discovered ${assetCount} asset(s) in library ${library.id}...`);
|
||||||
await this.jobRepository.queueAll(
|
await this.jobRepository.queue({
|
||||||
assets.map((asset) => ({
|
name: JobName.LIBRARY_SYNC_ASSETS,
|
||||||
name: JobName.LIBRARY_SYNC_ASSET,
|
data: {
|
||||||
data: { id: asset.id, importPaths: library.importPaths, exclusionPatterns: library.exclusionPatterns },
|
ids: assets.map((asset) => asset.id),
|
||||||
})),
|
importPaths: library.importPaths,
|
||||||
);
|
exclusionPatterns: library.exclusionPatterns,
|
||||||
|
},
|
||||||
|
});
|
||||||
this.logger.debug(`Queued check of ${assets.length} asset(s) in library ${library.id}...`);
|
this.logger.debug(`Queued check of ${assets.length} asset(s) in library ${library.id}...`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue