1
0
Fork 0
mirror of https://github.com/immich-app/immich.git synced 2025-01-16 16:56:46 +01:00

feat(server): faster geodata import (#14241)

* faster geodata import

* revert logging change

* unlogged tables

* leave spare connection

* use expression index instead of generated column

* do btree indexing with others
This commit is contained in:
Mert 2024-11-20 09:57:14 -05:00 committed by GitHub
parent a3712e40bd
commit ad510dd6fd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 213 additions and 137 deletions

View file

@ -14,13 +14,42 @@ export class GeodataPlacesEntity {
@Column({ type: 'float' }) @Column({ type: 'float' })
latitude!: number; latitude!: number;
// @Column({ @Column({ type: 'char', length: 2 })
// generatedType: 'STORED', countryCode!: string;
// asExpression: 'll_to_earth((latitude)::double precision, (longitude)::double precision)',
// type: 'earth', @Column({ type: 'varchar', length: 20, nullable: true })
// }) admin1Code!: string;
// earthCoord!: unknown;
@Column({ type: 'varchar', length: 80, nullable: true })
admin2Code!: string;
@Column({ type: 'varchar', nullable: true })
admin1Name!: string;
@Column({ type: 'varchar', nullable: true })
admin2Name!: string;
@Column({ type: 'varchar', nullable: true })
alternateNames!: string;
@Column({ type: 'date' })
modificationDate!: Date;
}
@Entity('geodata_places_tmp', { synchronize: false })
export class GeodataPlacesTempEntity {
@PrimaryColumn({ type: 'integer' })
id!: number;
@Column({ type: 'varchar', length: 200 })
name!: string;
@Column({ type: 'float' })
longitude!: number;
@Column({ type: 'float' })
latitude!: number;
@Column({ type: 'char', length: 2 }) @Column({ type: 'char', length: 2 })
countryCode!: string; countryCode!: string;

View file

@ -2,7 +2,25 @@ import { Column, Entity, PrimaryGeneratedColumn } from 'typeorm';
@Entity('naturalearth_countries', { synchronize: false }) @Entity('naturalearth_countries', { synchronize: false })
export class NaturalEarthCountriesEntity { export class NaturalEarthCountriesEntity {
@PrimaryGeneratedColumn() @PrimaryGeneratedColumn('identity', { generatedIdentity: 'ALWAYS' })
id!: number;
@Column({ type: 'varchar', length: 50 })
admin!: string;
@Column({ type: 'varchar', length: 3 })
admin_a3!: string;
@Column({ type: 'varchar', length: 50 })
type!: string;
@Column({ type: 'polygon' })
coordinates!: string;
}
@Entity('naturalearth_countries_tmp', { synchronize: false })
export class NaturalEarthCountriesTempEntity {
@PrimaryGeneratedColumn('identity', { generatedIdentity: 'ALWAYS' })
id!: number; id!: number;
@Column({ type: 'varchar', length: 50 }) @Column({ type: 'varchar', length: 50 })

View file

@ -0,0 +1,29 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
export class NaturalEarthCountriesIdentityColumn1732072134943 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE naturalearth_countries ALTER id DROP DEFAULT`);
await queryRunner.query(`DROP SEQUENCE naturalearth_countries_id_seq`);
await queryRunner.query(`ALTER TABLE naturalearth_countries ALTER id ADD GENERATED ALWAYS AS IDENTITY`);
// same as ll_to_earth, but with explicit schema to avoid weirdness and allow it to work in expression indices
await queryRunner.query(`
CREATE FUNCTION ll_to_earth_public(latitude double precision, longitude double precision) RETURNS public.earth PARALLEL SAFE IMMUTABLE STRICT LANGUAGE SQL AS $$
SELECT public.cube(public.cube(public.cube(public.earth()*cos(radians(latitude))*cos(radians(longitude))),public.earth()*cos(radians(latitude))*sin(radians(longitude))),public.earth()*sin(radians(latitude)))::public.earth
$$`);
await queryRunner.query(`ALTER TABLE geodata_places DROP COLUMN "earthCoord"`);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE naturalearth_countries ALTER id DROP GENERATED`);
await queryRunner.query(`CREATE SEQUENCE naturalearth_countries_id_seq`);
await queryRunner.query(
`ALTER TABLE naturalearth_countries ALTER id SET DEFAULT nextval('naturalearth_countries_id_seq'::regclass)`,
);
await queryRunner.query(`DROP FUNCTION ll_to_earth_public`);
await queryRunner.query(
`ALTER TABLE "geodata_places" ADD "earthCoord" earth GENERATED ALWAYS AS (ll_to_earth(latitude, longitude)) STORED`,
);
}
}

View file

@ -1,14 +1,18 @@
import { Inject, Injectable } from '@nestjs/common'; import { Inject, Injectable } from '@nestjs/common';
import { InjectDataSource, InjectRepository } from '@nestjs/typeorm'; import { InjectDataSource, InjectRepository } from '@nestjs/typeorm';
import { getName } from 'i18n-iso-countries'; import { getName } from 'i18n-iso-countries';
import { randomUUID } from 'node:crypto';
import { createReadStream, existsSync } from 'node:fs'; import { createReadStream, existsSync } from 'node:fs';
import { readFile } from 'node:fs/promises'; import { readFile } from 'node:fs/promises';
import readLine from 'node:readline'; import readLine from 'node:readline';
import { citiesFile } from 'src/constants'; import { citiesFile } from 'src/constants';
import { AssetEntity } from 'src/entities/asset.entity'; import { AssetEntity } from 'src/entities/asset.entity';
import { GeodataPlacesEntity } from 'src/entities/geodata-places.entity'; import { GeodataPlacesEntity, GeodataPlacesTempEntity } from 'src/entities/geodata-places.entity';
import { NaturalEarthCountriesEntity } from 'src/entities/natural-earth-countries.entity'; import {
import { SystemMetadataKey } from 'src/enum'; NaturalEarthCountriesEntity,
NaturalEarthCountriesTempEntity,
} from 'src/entities/natural-earth-countries.entity';
import { LogLevel, SystemMetadataKey } from 'src/enum';
import { IConfigRepository } from 'src/interfaces/config.interface'; import { IConfigRepository } from 'src/interfaces/config.interface';
import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface';
import { import {
@ -20,7 +24,7 @@ import {
} from 'src/interfaces/map.interface'; } from 'src/interfaces/map.interface';
import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface'; import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface';
import { OptionalBetween } from 'src/utils/database'; import { OptionalBetween } from 'src/utils/database';
import { DataSource, In, IsNull, Not, QueryRunner, Repository } from 'typeorm'; import { DataSource, In, IsNull, Not, Repository } from 'typeorm';
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity.js'; import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity.js';
@Injectable() @Injectable()
@ -49,8 +53,7 @@ export class MapRepository implements IMapRepository {
return; return;
} }
await this.importGeodata(); await Promise.all([this.importGeodata(), this.importNaturalEarthCountries()]);
await this.importNaturalEarthCountries();
await this.metadataRepository.set(SystemMetadataKey.REVERSE_GEOCODING_STATE, { await this.metadataRepository.set(SystemMetadataKey.REVERSE_GEOCODING_STATE, {
lastUpdate: geodataDate, lastUpdate: geodataDate,
@ -116,13 +119,18 @@ export class MapRepository implements IMapRepository {
const response = await this.geodataPlacesRepository const response = await this.geodataPlacesRepository
.createQueryBuilder('geoplaces') .createQueryBuilder('geoplaces')
.where('earth_box(ll_to_earth(:latitude, :longitude), 25000) @> "earthCoord"', point) .where(
.orderBy('earth_distance(ll_to_earth(:latitude, :longitude), "earthCoord")') 'earth_box(ll_to_earth_public(:latitude, :longitude), 25000) @> ll_to_earth_public(latitude, longitude)',
point,
)
.orderBy('earth_distance(ll_to_earth_public(:latitude, :longitude), ll_to_earth_public(latitude, longitude))')
.limit(1) .limit(1)
.getOne(); .getOne();
if (response) { if (response) {
this.logger.verbose(`Raw: ${JSON.stringify(response, null, 2)}`); if (this.logger.isLevelEnabled(LogLevel.VERBOSE)) {
this.logger.verbose(`Raw: ${JSON.stringify(response, null, 2)}`);
}
const { countryCode, name: city, admin1Name } = response; const { countryCode, name: city, admin1Name } = response;
const country = getName(countryCode, 'en') ?? null; const country = getName(countryCode, 'en') ?? null;
@ -149,8 +157,9 @@ export class MapRepository implements IMapRepository {
return { country: null, state: null, city: null }; return { country: null, state: null, city: null };
} }
this.logger.verbose(`Raw: ${JSON.stringify(ne_response, ['id', 'admin', 'admin_a3', 'type'], 2)}`); if (this.logger.isLevelEnabled(LogLevel.VERBOSE)) {
this.logger.verbose(`Raw: ${JSON.stringify(ne_response, ['id', 'admin', 'admin_a3', 'type'], 2)}`);
}
const { admin_a3 } = ne_response; const { admin_a3 } = ne_response;
const country = getName(admin_a3, 'en') ?? null; const country = getName(admin_a3, 'en') ?? null;
const state = null; const state = null;
@ -159,151 +168,119 @@ export class MapRepository implements IMapRepository {
return { country, state, city }; return { country, state, city };
} }
private transformCoordinatesToPolygon(coordinates: number[][][]): string {
const pointsString = coordinates.map((point) => `(${point[0]},${point[1]})`).join(', ');
return `(${pointsString})`;
}
private async importNaturalEarthCountries() { private async importNaturalEarthCountries() {
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
const { resourcePaths } = this.configRepository.getEnv(); const { resourcePaths } = this.configRepository.getEnv();
const geoJSONData = JSON.parse(await readFile(resourcePaths.geodata.naturalEarthCountriesPath, 'utf8'));
if (geoJSONData.type !== 'FeatureCollection' || !Array.isArray(geoJSONData.features)) {
this.logger.fatal('Invalid GeoJSON FeatureCollection');
return;
}
try { await this.dataSource.query('DROP TABLE IF EXISTS naturalearth_countries_tmp');
await queryRunner.startTransaction(); await this.dataSource.query(
await queryRunner.manager.clear(NaturalEarthCountriesEntity); 'CREATE UNLOGGED TABLE naturalearth_countries_tmp (LIKE naturalearth_countries INCLUDING ALL EXCLUDING INDEXES)',
);
const fileContent = await readFile(resourcePaths.geodata.naturalEarthCountriesPath, 'utf8'); const entities: Omit<NaturalEarthCountriesTempEntity, 'id'>[] = [];
const geoJSONData = JSON.parse(fileContent); for (const feature of geoJSONData.features) {
for (const entry of feature.geometry.coordinates) {
if (geoJSONData.type !== 'FeatureCollection' || !Array.isArray(geoJSONData.features)) { const coordinates: number[][][] = feature.geometry.type === 'MultiPolygon' ? entry[0] : entry;
this.logger.fatal('Invalid GeoJSON FeatureCollection'); const featureRecord: Omit<NaturalEarthCountriesTempEntity, 'id'> = {
return; admin: feature.properties.ADMIN,
} admin_a3: feature.properties.ADM0_A3,
type: feature.properties.TYPE,
for await (const feature of geoJSONData.features) { coordinates: `(${coordinates.map((point) => `(${point[0]},${point[1]})`).join(', ')})`,
for (const polygon of feature.geometry.coordinates) { };
const featureRecord = new NaturalEarthCountriesEntity(); entities.push(featureRecord);
featureRecord.admin = feature.properties.ADMIN; if (feature.geometry.type === 'Polygon') {
featureRecord.admin_a3 = feature.properties.ADM0_A3; break;
featureRecord.type = feature.properties.TYPE;
if (feature.geometry.type === 'MultiPolygon') {
featureRecord.coordinates = this.transformCoordinatesToPolygon(polygon[0]);
await queryRunner.manager.save(featureRecord);
} else if (feature.geometry.type === 'Polygon') {
featureRecord.coordinates = this.transformCoordinatesToPolygon(polygon);
await queryRunner.manager.save(featureRecord);
break;
}
} }
} }
await queryRunner.commitTransaction();
} catch (error) {
this.logger.fatal('Error importing natural earth country data', error);
await queryRunner.rollbackTransaction();
throw error;
} finally {
await queryRunner.release();
} }
await this.dataSource.manager.insert(NaturalEarthCountriesTempEntity, entities);
await this.dataSource.query(`ALTER TABLE naturalearth_countries_tmp ADD PRIMARY KEY (id) WITH (FILLFACTOR = 100)`);
await this.dataSource.transaction(async (manager) => {
await manager.query('ALTER TABLE naturalearth_countries RENAME TO naturalearth_countries_old');
await manager.query('ALTER TABLE naturalearth_countries_tmp RENAME TO naturalearth_countries');
await manager.query('DROP TABLE naturalearth_countries_old');
});
} }
private async importGeodata() { private async importGeodata() {
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
const { resourcePaths } = this.configRepository.getEnv(); const { resourcePaths } = this.configRepository.getEnv();
const admin1 = await this.loadAdmin(resourcePaths.geodata.admin1); const [admin1, admin2] = await Promise.all([
const admin2 = await this.loadAdmin(resourcePaths.geodata.admin2); this.loadAdmin(resourcePaths.geodata.admin1),
this.loadAdmin(resourcePaths.geodata.admin2),
]);
try { await this.dataSource.query('DROP TABLE IF EXISTS geodata_places_tmp');
await queryRunner.startTransaction(); await this.dataSource.query(
'CREATE UNLOGGED TABLE geodata_places_tmp (LIKE geodata_places INCLUDING ALL EXCLUDING INDEXES)',
);
await this.loadCities500(admin1, admin2);
await this.createGeodataIndices();
await queryRunner.manager.clear(GeodataPlacesEntity); await this.dataSource.transaction(async (manager) => {
await this.loadCities500(queryRunner, admin1, admin2); await manager.query('ALTER TABLE geodata_places RENAME TO geodata_places_old');
await manager.query('ALTER TABLE geodata_places_tmp RENAME TO geodata_places');
await queryRunner.commitTransaction(); await manager.query('DROP TABLE geodata_places_old');
} catch (error) { });
this.logger.fatal('Error importing geodata', error);
await queryRunner.rollbackTransaction();
throw error;
} finally {
await queryRunner.release();
}
} }
private async loadGeodataToTableFromFile( private async loadCities500(admin1Map: Map<string, string>, admin2Map: Map<string, string>) {
queryRunner: QueryRunner, const { resourcePaths } = this.configRepository.getEnv();
lineToEntityMapper: (lineSplit: string[]) => GeodataPlacesEntity, const cities500 = resourcePaths.geodata.cities500;
filePath: string, if (!existsSync(cities500)) {
options?: { entityFilter?: (linesplit: string[]) => boolean }, throw new Error(`Geodata file ${cities500} not found`);
) {
const _entityFilter = options?.entityFilter ?? (() => true);
if (!existsSync(filePath)) {
this.logger.error(`Geodata file ${filePath} not found`);
throw new Error(`Geodata file ${filePath} not found`);
} }
const input = createReadStream(filePath); const input = createReadStream(cities500, { highWaterMark: 512 * 1024 * 1024 });
let bufferGeodata: QueryDeepPartialEntity<GeodataPlacesEntity>[] = []; let bufferGeodata: QueryDeepPartialEntity<GeodataPlacesTempEntity>[] = [];
const lineReader = readLine.createInterface({ input }); const lineReader = readLine.createInterface({ input });
let count = 0; let count = 0;
let futures = [];
for await (const line of lineReader) { for await (const line of lineReader) {
const lineSplit = line.split('\t'); const lineSplit = line.split('\t');
if (!_entityFilter(lineSplit)) { if (lineSplit[7] === 'PPLX' && lineSplit[8] !== 'AU') {
continue; continue;
} }
const geoData = lineToEntityMapper(lineSplit);
const geoData = {
id: Number.parseInt(lineSplit[0]),
name: lineSplit[1],
alternateNames: lineSplit[3],
latitude: Number.parseFloat(lineSplit[4]),
longitude: Number.parseFloat(lineSplit[5]),
countryCode: lineSplit[8],
admin1Code: lineSplit[10],
admin2Code: lineSplit[11],
modificationDate: lineSplit[18],
admin1Name: admin1Map.get(`${lineSplit[8]}.${lineSplit[10]}`),
admin2Name: admin2Map.get(`${lineSplit[8]}.${lineSplit[10]}.${lineSplit[11]}`),
};
bufferGeodata.push(geoData); bufferGeodata.push(geoData);
if (bufferGeodata.length >= 1000) { if (bufferGeodata.length >= 5000) {
await queryRunner.manager.upsert(GeodataPlacesEntity, bufferGeodata, ['id']); const curLength = bufferGeodata.length;
count += bufferGeodata.length; futures.push(
if (count % 10_000 === 0) { this.dataSource.manager.insert(GeodataPlacesTempEntity, bufferGeodata).then(() => {
this.logger.log(`${count} geodata records imported`); count += curLength;
} if (count % 10_000 === 0) {
this.logger.log(`${count} geodata records imported`);
}
}),
);
bufferGeodata = []; bufferGeodata = [];
// leave spare connection for other queries
if (futures.length >= 9) {
await Promise.all(futures);
futures = [];
}
} }
} }
await queryRunner.manager.upsert(GeodataPlacesEntity, bufferGeodata, ['id']);
}
private async loadCities500( await this.dataSource.manager.insert(GeodataPlacesTempEntity, bufferGeodata);
queryRunner: QueryRunner,
admin1Map: Map<string, string>,
admin2Map: Map<string, string>,
) {
const { resourcePaths } = this.configRepository.getEnv();
await this.loadGeodataToTableFromFile(
queryRunner,
(lineSplit: string[]) =>
this.geodataPlacesRepository.create({
id: Number.parseInt(lineSplit[0]),
name: lineSplit[1],
alternateNames: lineSplit[3],
latitude: Number.parseFloat(lineSplit[4]),
longitude: Number.parseFloat(lineSplit[5]),
countryCode: lineSplit[8],
admin1Code: lineSplit[10],
admin2Code: lineSplit[11],
modificationDate: lineSplit[18],
admin1Name: admin1Map.get(`${lineSplit[8]}.${lineSplit[10]}`),
admin2Name: admin2Map.get(`${lineSplit[8]}.${lineSplit[10]}.${lineSplit[11]}`),
}),
resourcePaths.geodata.cities500,
{
entityFilter: (lineSplit) => {
if (lineSplit[7] === 'PPLX') {
// Exclude populated subsections of cities that are not in Australia.
// Australia has a lot of PPLX areas, so we include them.
return lineSplit[8] === 'AU';
}
return true;
},
},
);
} }
private async loadAdmin(filePath: string) { private async loadAdmin(filePath: string) {
@ -312,7 +289,7 @@ export class MapRepository implements IMapRepository {
throw new Error(`Geodata file ${filePath} not found`); throw new Error(`Geodata file ${filePath} not found`);
} }
const input = createReadStream(filePath); const input = createReadStream(filePath, { highWaterMark: 512 * 1024 * 1024 });
const lineReader = readLine.createInterface({ input }); const lineReader = readLine.createInterface({ input });
const adminMap = new Map<string, string>(); const adminMap = new Map<string, string>();
@ -323,4 +300,27 @@ export class MapRepository implements IMapRepository {
return adminMap; return adminMap;
} }
private createGeodataIndices() {
return Promise.all([
this.dataSource.query(`ALTER TABLE geodata_places_tmp ADD PRIMARY KEY (id) WITH (FILLFACTOR = 100)`),
this.dataSource.query(`
CREATE INDEX IDX_geodata_gist_earthcoord_${randomUUID().replaceAll('-', '_')}
ON geodata_places_tmp
USING gist (ll_to_earth_public(latitude, longitude))
WITH (fillfactor = 100)`),
this.dataSource.query(`
CREATE INDEX idx_geodata_places_name_${randomUUID().replaceAll('-', '_')}
ON geodata_places_tmp
USING gin (f_unaccent(name) gin_trgm_ops)`),
this.dataSource.query(`
CREATE INDEX idx_geodata_places_admin1_name_${randomUUID().replaceAll('-', '_')}
ON geodata_places_tmp
USING gin (f_unaccent("admin1Name") gin_trgm_ops)`),
this.dataSource.query(`
CREATE INDEX idx_geodata_places_admin2_name_${randomUUID().replaceAll('-', '_')}
ON geodata_places_tmp
USING gin (f_unaccent("admin2Name") gin_trgm_ops)`),
]);
}
} }