mirror of
https://github.com/immich-app/immich.git
synced 2025-01-04 02:46:47 +01:00
fix(server): queue library asset refresh in batches (#7914)
* add debug logs * scan assets in batches * Cleanup * don't normalize * Removing extra log * remove unneeded code * change log levels
This commit is contained in:
parent
428b7b0c4e
commit
ba38713fbc
1 changed files with 27 additions and 20 deletions
|
@ -300,17 +300,26 @@ export class LibraryService extends EventEmitter {
|
|||
}
|
||||
|
||||
private async scanAssets(libraryId: string, assetPaths: string[], ownerId: string, force = false) {
|
||||
await this.jobRepository.queueAll(
|
||||
assetPaths.map((assetPath) => ({
|
||||
name: JobName.LIBRARY_SCAN_ASSET,
|
||||
data: {
|
||||
id: libraryId,
|
||||
assetPath: path.normalize(assetPath),
|
||||
ownerId,
|
||||
force,
|
||||
},
|
||||
})),
|
||||
);
|
||||
this.logger.verbose(`Queuing refresh of ${assetPaths.length} asset(s)`);
|
||||
|
||||
// We perform this in batches to save on memory when performing large refreshes (greater than 1M assets)
|
||||
const batchSize = 5000;
|
||||
for (let i = 0; i < assetPaths.length; i += batchSize) {
|
||||
const batch = assetPaths.slice(i, i + batchSize);
|
||||
await this.jobRepository.queueAll(
|
||||
batch.map((assetPath) => ({
|
||||
name: JobName.LIBRARY_SCAN_ASSET,
|
||||
data: {
|
||||
id: libraryId,
|
||||
assetPath: assetPath,
|
||||
ownerId,
|
||||
force,
|
||||
},
|
||||
})),
|
||||
);
|
||||
}
|
||||
|
||||
this.logger.debug('Asset refresh queue completed');
|
||||
}
|
||||
|
||||
private async validateImportPath(importPath: string): Promise<ValidateLibraryImportPathResponseDto> {
|
||||
|
@ -611,14 +620,6 @@ export class LibraryService extends EventEmitter {
|
|||
return true;
|
||||
}
|
||||
|
||||
// Check if a given path is in a user's external path. Both arguments are assumed to be normalized
|
||||
private isInExternalPath(filePath: string, externalPath: string | null): boolean {
|
||||
if (externalPath === null) {
|
||||
return false;
|
||||
}
|
||||
return filePath.startsWith(externalPath);
|
||||
}
|
||||
|
||||
async handleQueueAssetRefresh(job: ILibraryRefreshJob): Promise<boolean> {
|
||||
const library = await this.repository.get(job.id);
|
||||
if (!library || library.type !== LibraryType.EXTERNAL) {
|
||||
|
@ -626,7 +627,7 @@ export class LibraryService extends EventEmitter {
|
|||
return false;
|
||||
}
|
||||
|
||||
this.logger.verbose(`Refreshing library: ${job.id}`);
|
||||
this.logger.log(`Refreshing library: ${job.id}`);
|
||||
|
||||
const crawledAssetPaths = await this.getPathTrie(library);
|
||||
this.logger.debug(`Found ${crawledAssetPaths.size} asset(s) when crawling import paths ${library.importPaths}`);
|
||||
|
@ -637,16 +638,20 @@ export class LibraryService extends EventEmitter {
|
|||
this.assetRepository.getLibraryAssetPaths(pagination, library.id),
|
||||
);
|
||||
|
||||
this.logger.verbose(`Crawled asset paths paginated`);
|
||||
|
||||
const shouldScanAll = job.refreshAllFiles || job.refreshModifiedFiles;
|
||||
for await (const page of pagination) {
|
||||
for (const asset of page) {
|
||||
const isOffline = !crawledAssetPaths.has(asset.originalPath);
|
||||
if (isOffline && !asset.isOffline) {
|
||||
assetIdsToMarkOffline.push(asset.id);
|
||||
this.logger.verbose(`Added to mark-offline list: ${asset.originalPath}`);
|
||||
}
|
||||
|
||||
if (!isOffline && asset.isOffline) {
|
||||
assetIdsToMarkOnline.push(asset.id);
|
||||
this.logger.verbose(`Added to mark-online list: ${asset.originalPath}`);
|
||||
}
|
||||
|
||||
if (!shouldScanAll) {
|
||||
|
@ -655,6 +660,8 @@ export class LibraryService extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
this.logger.verbose(`Crawled assets have been checked for online/offline status`);
|
||||
|
||||
if (assetIdsToMarkOffline.length > 0) {
|
||||
this.logger.debug(`Found ${assetIdsToMarkOffline.length} offline asset(s) previously marked as online`);
|
||||
await this.assetRepository.updateAll(assetIdsToMarkOffline, { isOffline: true });
|
||||
|
|
Loading…
Reference in a new issue