mirror of
https://github.com/immich-app/immich.git
synced 2024-12-28 22:51:59 +00:00
fix(cli): Concurrency not fully using queue potential (#11828)
This commit is contained in:
parent
a60209db3e
commit
6729782c3f
3 changed files with 67 additions and 39 deletions
|
@ -1,5 +1,6 @@
|
||||||
import {
|
import {
|
||||||
Action,
|
Action,
|
||||||
|
AssetBulkUploadCheckItem,
|
||||||
AssetBulkUploadCheckResult,
|
AssetBulkUploadCheckResult,
|
||||||
AssetMediaResponseDto,
|
AssetMediaResponseDto,
|
||||||
AssetMediaStatus,
|
AssetMediaStatus,
|
||||||
|
@ -11,7 +12,7 @@ import {
|
||||||
getSupportedMediaTypes,
|
getSupportedMediaTypes,
|
||||||
} from '@immich/sdk';
|
} from '@immich/sdk';
|
||||||
import byteSize from 'byte-size';
|
import byteSize from 'byte-size';
|
||||||
import { Presets, SingleBar } from 'cli-progress';
|
import { MultiBar, Presets, SingleBar } from 'cli-progress';
|
||||||
import { chunk } from 'lodash-es';
|
import { chunk } from 'lodash-es';
|
||||||
import { Stats, createReadStream } from 'node:fs';
|
import { Stats, createReadStream } from 'node:fs';
|
||||||
import { stat, unlink } from 'node:fs/promises';
|
import { stat, unlink } from 'node:fs/promises';
|
||||||
|
@ -90,23 +91,23 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
|
||||||
return { newFiles: files, duplicates: [] };
|
return { newFiles: files, duplicates: [] };
|
||||||
}
|
}
|
||||||
|
|
||||||
const progressBar = new SingleBar(
|
const multiBar = new MultiBar(
|
||||||
{ format: 'Checking files | {bar} | {percentage}% | ETA: {eta}s | {value}/{total} assets' },
|
{ format: '{message} | {bar} | {percentage}% | ETA: {eta}s | {value}/{total} assets' },
|
||||||
Presets.shades_classic,
|
Presets.shades_classic,
|
||||||
);
|
);
|
||||||
|
|
||||||
progressBar.start(files.length, 0);
|
const hashProgressBar = multiBar.create(files.length, 0, { message: 'Hashing files ' });
|
||||||
|
const checkProgressBar = multiBar.create(files.length, 0, { message: 'Checking for duplicates' });
|
||||||
|
|
||||||
const newFiles: string[] = [];
|
const newFiles: string[] = [];
|
||||||
const duplicates: Asset[] = [];
|
const duplicates: Asset[] = [];
|
||||||
|
|
||||||
const queue = new Queue<string[], AssetBulkUploadCheckResults>(
|
const checkBulkUploadQueue = new Queue<AssetBulkUploadCheckItem[], void>(
|
||||||
async (filepaths: string[]) => {
|
async (assets: AssetBulkUploadCheckItem[]) => {
|
||||||
const dto = await Promise.all(
|
const response = await checkBulkUpload({ assetBulkUploadCheckDto: { assets } });
|
||||||
filepaths.map(async (filepath) => ({ id: filepath, checksum: await sha1(filepath) })),
|
|
||||||
);
|
|
||||||
const response = await checkBulkUpload({ assetBulkUploadCheckDto: { assets: dto } });
|
|
||||||
const results = response.results as AssetBulkUploadCheckResults;
|
const results = response.results as AssetBulkUploadCheckResults;
|
||||||
|
|
||||||
for (const { id: filepath, assetId, action } of results) {
|
for (const { id: filepath, assetId, action } of results) {
|
||||||
if (action === Action.Accept) {
|
if (action === Action.Accept) {
|
||||||
newFiles.push(filepath);
|
newFiles.push(filepath);
|
||||||
|
@ -115,19 +116,46 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
|
||||||
duplicates.push({ id: assetId as string, filepath });
|
duplicates.push({ id: assetId as string, filepath });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
progressBar.increment(filepaths.length);
|
|
||||||
|
checkProgressBar.increment(assets.length);
|
||||||
|
},
|
||||||
|
{ concurrency, retry: 3 },
|
||||||
|
);
|
||||||
|
|
||||||
|
const results: { id: string; checksum: string }[] = [];
|
||||||
|
let checkBulkUploadRequests: AssetBulkUploadCheckItem[] = [];
|
||||||
|
|
||||||
|
const queue = new Queue<string, AssetBulkUploadCheckItem[]>(
|
||||||
|
async (filepath: string): Promise<AssetBulkUploadCheckItem[]> => {
|
||||||
|
const dto = { id: filepath, checksum: await sha1(filepath) };
|
||||||
|
|
||||||
|
results.push(dto);
|
||||||
|
checkBulkUploadRequests.push(dto);
|
||||||
|
if (checkBulkUploadRequests.length === 5000) {
|
||||||
|
const batch = checkBulkUploadRequests;
|
||||||
|
checkBulkUploadRequests = [];
|
||||||
|
void checkBulkUploadQueue.push(batch);
|
||||||
|
}
|
||||||
|
|
||||||
|
hashProgressBar.increment();
|
||||||
return results;
|
return results;
|
||||||
},
|
},
|
||||||
{ concurrency, retry: 3 },
|
{ concurrency, retry: 3 },
|
||||||
);
|
);
|
||||||
|
|
||||||
for (const items of chunk(files, concurrency)) {
|
for (const item of files) {
|
||||||
await queue.push(items);
|
void queue.push(item);
|
||||||
}
|
}
|
||||||
|
|
||||||
await queue.drained();
|
await queue.drained();
|
||||||
|
|
||||||
progressBar.stop();
|
if (checkBulkUploadRequests.length > 0) {
|
||||||
|
void checkBulkUploadQueue.push(checkBulkUploadRequests);
|
||||||
|
}
|
||||||
|
|
||||||
|
await checkBulkUploadQueue.drained();
|
||||||
|
|
||||||
|
multiBar.stop();
|
||||||
|
|
||||||
console.log(`Found ${newFiles.length} new files and ${duplicates.length} duplicate${s(duplicates.length)}`);
|
console.log(`Found ${newFiles.length} new files and ${duplicates.length} duplicate${s(duplicates.length)}`);
|
||||||
|
|
||||||
|
@ -201,8 +229,8 @@ export const uploadFiles = async (files: string[], { dryRun, concurrency }: Uplo
|
||||||
{ concurrency, retry: 3 },
|
{ concurrency, retry: 3 },
|
||||||
);
|
);
|
||||||
|
|
||||||
for (const filepath of files) {
|
for (const item of files) {
|
||||||
await queue.push(filepath);
|
void queue.push(item);
|
||||||
}
|
}
|
||||||
|
|
||||||
await queue.drained();
|
await queue.drained();
|
||||||
|
|
|
@ -72,8 +72,8 @@ export class Queue<T, R> {
|
||||||
* @returns Promise<void> - The returned Promise will be resolved when all tasks in the queue have been processed by a worker.
|
* @returns Promise<void> - The returned Promise will be resolved when all tasks in the queue have been processed by a worker.
|
||||||
* This promise could be ignored as it will not lead to a `unhandledRejection`.
|
* This promise could be ignored as it will not lead to a `unhandledRejection`.
|
||||||
*/
|
*/
|
||||||
async drained(): Promise<void> {
|
drained(): Promise<void> {
|
||||||
await this.queue.drain();
|
return this.queue.drained();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -103,7 +103,7 @@ describe(`immich upload`, () => {
|
||||||
describe(`immich upload /path/to/file.jpg`, () => {
|
describe(`immich upload /path/to/file.jpg`, () => {
|
||||||
it('should upload a single file', async () => {
|
it('should upload a single file', async () => {
|
||||||
const { stderr, stdout, exitCode } = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
|
const { stderr, stdout, exitCode } = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(stdout.split('\n')).toEqual(
|
expect(stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([expect.stringContaining('Successfully uploaded 1 new asset')]),
|
expect.arrayContaining([expect.stringContaining('Successfully uploaded 1 new asset')]),
|
||||||
);
|
);
|
||||||
|
@ -126,7 +126,7 @@ describe(`immich upload`, () => {
|
||||||
const expectedCount = Object.entries(files).filter((entry) => entry[1]).length;
|
const expectedCount = Object.entries(files).filter((entry) => entry[1]).length;
|
||||||
|
|
||||||
const { stderr, stdout, exitCode } = await immichCli(['upload', ...commandLine]);
|
const { stderr, stdout, exitCode } = await immichCli(['upload', ...commandLine]);
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(stdout.split('\n')).toEqual(
|
expect(stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([expect.stringContaining(`Successfully uploaded ${expectedCount} new asset`)]),
|
expect.arrayContaining([expect.stringContaining(`Successfully uploaded ${expectedCount} new asset`)]),
|
||||||
);
|
);
|
||||||
|
@ -154,7 +154,7 @@ describe(`immich upload`, () => {
|
||||||
cpSync(`${testAssetDir}/albums/nature/silver_fir.jpg`, testPaths[1]);
|
cpSync(`${testAssetDir}/albums/nature/silver_fir.jpg`, testPaths[1]);
|
||||||
|
|
||||||
const { stderr, stdout, exitCode } = await immichCli(['upload', ...testPaths]);
|
const { stderr, stdout, exitCode } = await immichCli(['upload', ...testPaths]);
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(stdout.split('\n')).toEqual(
|
expect(stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([expect.stringContaining('Successfully uploaded 2 new assets')]),
|
expect.arrayContaining([expect.stringContaining('Successfully uploaded 2 new assets')]),
|
||||||
);
|
);
|
||||||
|
@ -169,7 +169,7 @@ describe(`immich upload`, () => {
|
||||||
|
|
||||||
it('should skip a duplicate file', async () => {
|
it('should skip a duplicate file', async () => {
|
||||||
const first = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
|
const first = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
|
||||||
expect(first.stderr).toBe('');
|
expect(first.stderr).toContain('{message}');
|
||||||
expect(first.stdout.split('\n')).toEqual(
|
expect(first.stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([expect.stringContaining('Successfully uploaded 1 new asset')]),
|
expect.arrayContaining([expect.stringContaining('Successfully uploaded 1 new asset')]),
|
||||||
);
|
);
|
||||||
|
@ -179,7 +179,7 @@ describe(`immich upload`, () => {
|
||||||
expect(assets.total).toBe(1);
|
expect(assets.total).toBe(1);
|
||||||
|
|
||||||
const second = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
|
const second = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
|
||||||
expect(second.stderr).toBe('');
|
expect(second.stderr).toContain('{message}');
|
||||||
expect(second.stdout.split('\n')).toEqual(
|
expect(second.stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([
|
expect.arrayContaining([
|
||||||
expect.stringContaining('Found 0 new files and 1 duplicate'),
|
expect.stringContaining('Found 0 new files and 1 duplicate'),
|
||||||
|
@ -205,7 +205,7 @@ describe(`immich upload`, () => {
|
||||||
`${testAssetDir}/albums/nature/silver_fir.jpg`,
|
`${testAssetDir}/albums/nature/silver_fir.jpg`,
|
||||||
'--dry-run',
|
'--dry-run',
|
||||||
]);
|
]);
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(stdout.split('\n')).toEqual(
|
expect(stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([expect.stringContaining('Would have uploaded 1 asset')]),
|
expect.arrayContaining([expect.stringContaining('Would have uploaded 1 asset')]),
|
||||||
);
|
);
|
||||||
|
@ -217,7 +217,7 @@ describe(`immich upload`, () => {
|
||||||
|
|
||||||
it('dry run should handle duplicates', async () => {
|
it('dry run should handle duplicates', async () => {
|
||||||
const first = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
|
const first = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
|
||||||
expect(first.stderr).toBe('');
|
expect(first.stderr).toContain('{message}');
|
||||||
expect(first.stdout.split('\n')).toEqual(
|
expect(first.stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([expect.stringContaining('Successfully uploaded 1 new asset')]),
|
expect.arrayContaining([expect.stringContaining('Successfully uploaded 1 new asset')]),
|
||||||
);
|
);
|
||||||
|
@ -227,7 +227,7 @@ describe(`immich upload`, () => {
|
||||||
expect(assets.total).toBe(1);
|
expect(assets.total).toBe(1);
|
||||||
|
|
||||||
const second = await immichCli(['upload', `${testAssetDir}/albums/nature/`, '--dry-run']);
|
const second = await immichCli(['upload', `${testAssetDir}/albums/nature/`, '--dry-run']);
|
||||||
expect(second.stderr).toBe('');
|
expect(second.stderr).toContain('{message}');
|
||||||
expect(second.stdout.split('\n')).toEqual(
|
expect(second.stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([
|
expect.arrayContaining([
|
||||||
expect.stringContaining('Found 8 new files and 1 duplicate'),
|
expect.stringContaining('Found 8 new files and 1 duplicate'),
|
||||||
|
@ -241,7 +241,7 @@ describe(`immich upload`, () => {
|
||||||
describe('immich upload --recursive', () => {
|
describe('immich upload --recursive', () => {
|
||||||
it('should upload a folder recursively', async () => {
|
it('should upload a folder recursively', async () => {
|
||||||
const { stderr, stdout, exitCode } = await immichCli(['upload', `${testAssetDir}/albums/nature/`, '--recursive']);
|
const { stderr, stdout, exitCode } = await immichCli(['upload', `${testAssetDir}/albums/nature/`, '--recursive']);
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(stdout.split('\n')).toEqual(
|
expect(stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([expect.stringContaining('Successfully uploaded 9 new assets')]),
|
expect.arrayContaining([expect.stringContaining('Successfully uploaded 9 new assets')]),
|
||||||
);
|
);
|
||||||
|
@ -267,7 +267,7 @@ describe(`immich upload`, () => {
|
||||||
expect.stringContaining('Successfully updated 9 assets'),
|
expect.stringContaining('Successfully updated 9 assets'),
|
||||||
]),
|
]),
|
||||||
);
|
);
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(exitCode).toBe(0);
|
expect(exitCode).toBe(0);
|
||||||
|
|
||||||
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
||||||
|
@ -283,7 +283,7 @@ describe(`immich upload`, () => {
|
||||||
expect(response1.stdout.split('\n')).toEqual(
|
expect(response1.stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([expect.stringContaining('Successfully uploaded 9 new assets')]),
|
expect.arrayContaining([expect.stringContaining('Successfully uploaded 9 new assets')]),
|
||||||
);
|
);
|
||||||
expect(response1.stderr).toBe('');
|
expect(response1.stderr).toContain('{message}');
|
||||||
expect(response1.exitCode).toBe(0);
|
expect(response1.exitCode).toBe(0);
|
||||||
|
|
||||||
const assets1 = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
const assets1 = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
||||||
|
@ -299,7 +299,7 @@ describe(`immich upload`, () => {
|
||||||
expect.stringContaining('Successfully updated 9 assets'),
|
expect.stringContaining('Successfully updated 9 assets'),
|
||||||
]),
|
]),
|
||||||
);
|
);
|
||||||
expect(response2.stderr).toBe('');
|
expect(response2.stderr).toContain('{message}');
|
||||||
expect(response2.exitCode).toBe(0);
|
expect(response2.exitCode).toBe(0);
|
||||||
|
|
||||||
const assets2 = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
const assets2 = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
||||||
|
@ -325,7 +325,7 @@ describe(`immich upload`, () => {
|
||||||
expect.stringContaining('Would have updated albums of 9 assets'),
|
expect.stringContaining('Would have updated albums of 9 assets'),
|
||||||
]),
|
]),
|
||||||
);
|
);
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(exitCode).toBe(0);
|
expect(exitCode).toBe(0);
|
||||||
|
|
||||||
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
||||||
|
@ -351,7 +351,7 @@ describe(`immich upload`, () => {
|
||||||
expect.stringContaining('Successfully updated 9 assets'),
|
expect.stringContaining('Successfully updated 9 assets'),
|
||||||
]),
|
]),
|
||||||
);
|
);
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(exitCode).toBe(0);
|
expect(exitCode).toBe(0);
|
||||||
|
|
||||||
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
||||||
|
@ -377,7 +377,7 @@ describe(`immich upload`, () => {
|
||||||
expect.stringContaining('Would have updated albums of 9 assets'),
|
expect.stringContaining('Would have updated albums of 9 assets'),
|
||||||
]),
|
]),
|
||||||
);
|
);
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(exitCode).toBe(0);
|
expect(exitCode).toBe(0);
|
||||||
|
|
||||||
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
||||||
|
@ -408,7 +408,7 @@ describe(`immich upload`, () => {
|
||||||
expect.stringContaining('Deleting assets that have been uploaded'),
|
expect.stringContaining('Deleting assets that have been uploaded'),
|
||||||
]),
|
]),
|
||||||
);
|
);
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(exitCode).toBe(0);
|
expect(exitCode).toBe(0);
|
||||||
|
|
||||||
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
||||||
|
@ -434,7 +434,7 @@ describe(`immich upload`, () => {
|
||||||
expect.stringContaining('Would have deleted 9 local assets'),
|
expect.stringContaining('Would have deleted 9 local assets'),
|
||||||
]),
|
]),
|
||||||
);
|
);
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(exitCode).toBe(0);
|
expect(exitCode).toBe(0);
|
||||||
|
|
||||||
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
|
||||||
|
@ -493,7 +493,7 @@ describe(`immich upload`, () => {
|
||||||
'2',
|
'2',
|
||||||
]);
|
]);
|
||||||
|
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(stdout.split('\n')).toEqual(
|
expect(stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([
|
expect.arrayContaining([
|
||||||
'Found 9 new files and 0 duplicates',
|
'Found 9 new files and 0 duplicates',
|
||||||
|
@ -534,7 +534,7 @@ describe(`immich upload`, () => {
|
||||||
'silver_fir.jpg',
|
'silver_fir.jpg',
|
||||||
]);
|
]);
|
||||||
|
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(stdout.split('\n')).toEqual(
|
expect(stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([
|
expect.arrayContaining([
|
||||||
'Found 8 new files and 0 duplicates',
|
'Found 8 new files and 0 duplicates',
|
||||||
|
@ -555,7 +555,7 @@ describe(`immich upload`, () => {
|
||||||
'!(*_*_*).jpg',
|
'!(*_*_*).jpg',
|
||||||
]);
|
]);
|
||||||
|
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(stdout.split('\n')).toEqual(
|
expect(stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([
|
expect.arrayContaining([
|
||||||
'Found 1 new files and 0 duplicates',
|
'Found 1 new files and 0 duplicates',
|
||||||
|
@ -577,7 +577,7 @@ describe(`immich upload`, () => {
|
||||||
'--dry-run',
|
'--dry-run',
|
||||||
]);
|
]);
|
||||||
|
|
||||||
expect(stderr).toBe('');
|
expect(stderr).toContain('{message}');
|
||||||
expect(stdout.split('\n')).toEqual(
|
expect(stdout.split('\n')).toEqual(
|
||||||
expect.arrayContaining([
|
expect.arrayContaining([
|
||||||
'Found 8 new files and 0 duplicates',
|
'Found 8 new files and 0 duplicates',
|
||||||
|
|
Loading…
Reference in a new issue