1
0
Fork 0
mirror of https://github.com/immich-app/immich.git synced 2025-01-16 16:56:46 +01:00

wip batch imports

This commit is contained in:
Jonathan Jogenfors 2024-12-13 01:47:34 +01:00
parent 96f2f6535e
commit 3d7b9248d6
5 changed files with 53 additions and 60 deletions

@ -1 +1 @@
Subproject commit 99544a200412d553103cc7b8f1a28f339c7cffd9
Subproject commit c4a0575c3e89a755b951ae6d91e7307cd34c606f

View file

@ -84,7 +84,7 @@ export enum JobName {
// library management
LIBRARY_QUEUE_SYNC_FILES = 'library-queue-sync-files',
LIBRARY_QUEUE_SYNC_ASSETS = 'library-queue-sync-assets',
LIBRARY_SYNC_FILE = 'library-sync-file',
LIBRARY_SYNC_FILES = 'library-sync-files',
LIBRARY_SYNC_ASSETS = 'library-sync-assets',
LIBRARY_DELETE = 'library-delete',
LIBRARY_QUEUE_SYNC_ALL = 'library-queue-sync-all',
@ -135,7 +135,7 @@ export interface IDelayedJob extends IBaseJob {
export interface IEntityJob extends IBaseJob {
id: string;
source?: 'upload' | 'sidecar-write' | 'copy';
source?: 'upload' | 'library-import' | 'sidecar-write' | 'copy';
notify?: boolean;
}
@ -146,7 +146,7 @@ export interface IAssetDeleteJob extends IEntityJob {
export interface ILibraryFileJob {
libraryId: string;
ownerId: string;
assetPath: string;
assetPaths: string[];
}
export interface ILibraryBulkIdsJob {
@ -290,7 +290,7 @@ export type JobItem =
| { name: JobName.ASSET_DELETION_CHECK; data?: IBaseJob }
// Library Management
| { name: JobName.LIBRARY_SYNC_FILE; data: ILibraryFileJob }
| { name: JobName.LIBRARY_SYNC_FILES; data: ILibraryFileJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_FILES; data: IEntityJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_ASSETS; data: IEntityJob }
| { name: JobName.LIBRARY_SYNC_ASSETS; data: ILibraryBulkIdsJob }

View file

@ -179,7 +179,7 @@ describe(LibraryService.name, () => {
expect(jobMock.queueAll).toHaveBeenCalledWith([
{
name: JobName.LIBRARY_SYNC_FILE,
name: JobName.LIBRARY_SYNC_FILES,
data: {
id: libraryStub.externalLibrary1.id,
ownerId: libraryStub.externalLibrary1.owner.id,
@ -960,7 +960,7 @@ describe(LibraryService.name, () => {
expect(jobMock.queueAll).toHaveBeenCalledWith([
{
name: JobName.LIBRARY_SYNC_FILE,
name: JobName.LIBRARY_SYNC_FILES,
data: {
id: libraryStub.externalLibraryWithImportPaths1.id,
assetPath: '/foo/photo.jpg',
@ -985,7 +985,7 @@ describe(LibraryService.name, () => {
expect(jobMock.queueAll).toHaveBeenCalledWith([
{
name: JobName.LIBRARY_SYNC_FILE,
name: JobName.LIBRARY_SYNC_FILES,
data: {
id: libraryStub.externalLibraryWithImportPaths1.id,
assetPath: '/foo/photo.jpg',

View file

@ -17,6 +17,7 @@ import {
import { AssetEntity } from 'src/entities/asset.entity';
import { LibraryEntity } from 'src/entities/library.entity';
import { AssetStatus, AssetType, ImmichWorker } from 'src/enum';
import { AssetCreate } from 'src/interfaces/asset.interface';
import { DatabaseLock } from 'src/interfaces/database.interface';
import { ArgOf } from 'src/interfaces/event.interface';
import { JobName, JobOf, JOBS_LIBRARY_PAGINATION_SIZE, JobStatus, QueueName } from 'src/interfaces/job.interface';
@ -102,7 +103,10 @@ export class LibraryService extends BaseService {
const handler = async (event: string, path: string) => {
if (matcher(path)) {
this.logger.debug(`File ${event} event received for ${path} in library ${library.id}}`);
await this.syncFiles(library, [path]);
await this.jobRepository.queue({
name: JobName.LIBRARY_SYNC_FILES,
data: { libraryId: library.id, ownerId: library.ownerId, assetPaths: [path] },
});
} else {
this.logger.verbose(`Ignoring file ${event} event for ${path} in library ${library.id}`);
}
@ -208,17 +212,23 @@ export class LibraryService extends BaseService {
return mapLibrary(library);
}
private async syncFiles({ id, ownerId }: LibraryEntity, assetPaths: string[]) {
await this.jobRepository.queueAll(
assetPaths.map((assetPath) => ({
name: JobName.LIBRARY_SYNC_FILE,
data: {
libraryId: id,
assetPath,
ownerId,
},
})),
);
@OnJob({ name: JobName.LIBRARY_SYNC_FILES, queue: QueueName.LIBRARY })
async handleSyncFiles(job: JobOf<JobName.LIBRARY_SYNC_FILES>): Promise<JobStatus> {
const assetImports = job.assetPaths.map((assetPath) => this.processEntity(assetPath, job.ownerId, job.libraryId));
const assetIds: string[] = [];
const batchSize = 1000; // Adjust the batch size as needed
for (let i = 0; i < assetImports.length; i += batchSize) {
const batch = assetImports.slice(i, i + batchSize);
const batchIds = await this.assetRepository.createAll(batch).then((assets) => assets.map((asset) => asset.id));
assetIds.push(...batchIds);
}
this.logger.log(`Imported ${assetIds.length} asset(s) for library ${job.libraryId}`);
await this.queuePostSyncJobs(assetIds);
return JobStatus.SUCCESS;
}
private async validateImportPath(importPath: string): Promise<ValidateLibraryImportPathResponseDto> {
@ -332,60 +342,34 @@ export class LibraryService extends BaseService {
return JobStatus.SUCCESS;
}
@OnJob({ name: JobName.LIBRARY_SYNC_FILE, queue: QueueName.LIBRARY })
async handleSyncFile(job: JobOf<JobName.LIBRARY_SYNC_FILE>): Promise<JobStatus> {
/* For performance reasons, we don't check if the asset is already imported.
This is instead handled by a previous step in the scan process.
In the edge case of an asset being imported between that check
and this function call, the database constraint will prevent duplicates.
*/
const assetPath = path.normalize(job.assetPath);
// TODO: we can replace this get call with an exists call
/* let asset = await this.assetRepository.getByLibraryIdAndOriginalPath(job.libraryId, assetPath);
if (asset) {
return await this.handleSyncAssets({ libraryId: job.libraryId, assetIds: [asset.id] });
} */
this.logger.log(`Importing new asset ${assetPath} into library ${job.libraryId}`);
// TODO: device asset id is deprecated, remove it
const deviceAssetId = `${basename(assetPath)}`.replaceAll(/\s+/g, '');
const pathHash = this.cryptoRepository.hashSha1(`path:${assetPath}`);
const assetType = mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE;
private processEntity(filePath: string, ownerId: string, libraryId: string): AssetCreate {
const assetPath = path.normalize(filePath);
const now = new Date();
const asset = await this.assetRepository.create({
ownerId: job.ownerId,
libraryId: job.libraryId,
checksum: pathHash,
return {
ownerId: ownerId,
libraryId: libraryId,
checksum: this.cryptoRepository.hashSha1(`path:${assetPath}`),
originalPath: assetPath,
deviceAssetId,
// TODO: device asset id is deprecated, remove it
deviceAssetId: `${basename(assetPath)}`.replaceAll(/\s+/g, ''),
deviceId: 'Library Import',
fileCreatedAt: now,
fileModifiedAt: now,
localDateTime: now,
type: assetType,
type: mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE,
originalFileName: parse(assetPath).base,
isExternal: true,
});
this.logger.debug(`Queueing metadata extraction for: ${asset.originalPath}`);
await this.queuePostSyncJobs([asset.id]);
return JobStatus.SUCCESS;
};
}
async queuePostSyncJobs(assetIds: string[]) {
await this.jobRepository.queueAll(
assetIds.map((assetId) => ({
name: JobName.METADATA_EXTRACTION,
data: { id: assetId, source: 'upload' },
data: { id: assetId, source: 'library-import' },
})),
);
}
@ -586,7 +570,12 @@ export class LibraryService extends BaseService {
const newPaths = await this.assetRepository.getNewPaths(library.id, pathBatch);
if (newPaths.length > 0) {
importCount += newPaths.length;
await this.syncFiles(library, newPaths);
await this.jobRepository.queue({
name: JobName.LIBRARY_SYNC_FILES,
data: { libraryId: library.id, ownerId: library.ownerId, assetPaths: newPaths },
});
if (newPaths.length < pathBatch.length) {
this.logger.debug(
`Current crawl batch: ${newPaths.length} of ${pathBatch.length} file(s) are new, queued import for library ${library.id}...`,

View file

@ -148,13 +148,17 @@ export class MetadataService extends BaseService {
}
@OnJob({ name: JobName.METADATA_EXTRACTION, queue: QueueName.METADATA_EXTRACTION })
async handleMetadataExtraction({ id }: JobOf<JobName.METADATA_EXTRACTION>): Promise<JobStatus> {
async handleMetadataExtraction({ id, source }: JobOf<JobName.METADATA_EXTRACTION>): Promise<JobStatus> {
const { metadata, reverseGeocoding } = await this.getConfig({ withCache: true });
const [asset] = await this.assetRepository.getByIds([id], { faces: { person: false } });
if (!asset) {
return JobStatus.FAILED;
}
if (source === 'library-import') {
await this.processSidecar(id, false);
}
const stats = await this.storageRepository.stat(asset.originalPath);
const exifTags = await this.getExifTags(asset);