From ad510dd6fd96ddd5a934f7264cb59b37b91ed66a Mon Sep 17 00:00:00 2001 From: Mert <101130780+mertalev@users.noreply.github.com> Date: Wed, 20 Nov 2024 09:57:14 -0500 Subject: [PATCH] feat(server): faster geodata import (#14241) * faster geodata import * revert logging change * unlogged tables * leave spare connection * use expression index instead of generated column * do btree indexing with others --- server/src/entities/geodata-places.entity.ts | 43 ++- .../natural-earth-countries.entity.ts | 20 +- ...943-NaturalEarthCountriesIdentityColumn.ts | 29 ++ server/src/repositories/map.repository.ts | 258 +++++++++--------- 4 files changed, 213 insertions(+), 137 deletions(-) create mode 100644 server/src/migrations/1732072134943-NaturalEarthCountriesIdentityColumn.ts diff --git a/server/src/entities/geodata-places.entity.ts b/server/src/entities/geodata-places.entity.ts index 966a50d5c9..eb32d1b99b 100644 --- a/server/src/entities/geodata-places.entity.ts +++ b/server/src/entities/geodata-places.entity.ts @@ -14,13 +14,42 @@ export class GeodataPlacesEntity { @Column({ type: 'float' }) latitude!: number; - // @Column({ - // generatedType: 'STORED', - // asExpression: 'll_to_earth((latitude)::double precision, (longitude)::double precision)', - // type: 'earth', - // }) - // earthCoord!: unknown; - + @Column({ type: 'char', length: 2 }) + countryCode!: string; + + @Column({ type: 'varchar', length: 20, nullable: true }) + admin1Code!: string; + + @Column({ type: 'varchar', length: 80, nullable: true }) + admin2Code!: string; + + @Column({ type: 'varchar', nullable: true }) + admin1Name!: string; + + @Column({ type: 'varchar', nullable: true }) + admin2Name!: string; + + @Column({ type: 'varchar', nullable: true }) + alternateNames!: string; + + @Column({ type: 'date' }) + modificationDate!: Date; +} + +@Entity('geodata_places_tmp', { synchronize: false }) +export class GeodataPlacesTempEntity { + @PrimaryColumn({ type: 'integer' }) + id!: number; + + @Column({ type: 'varchar', length: 200 }) + name!: string; + + @Column({ type: 'float' }) + longitude!: number; + + @Column({ type: 'float' }) + latitude!: number; + @Column({ type: 'char', length: 2 }) countryCode!: string; diff --git a/server/src/entities/natural-earth-countries.entity.ts b/server/src/entities/natural-earth-countries.entity.ts index 19a12fa07b..0f97132045 100644 --- a/server/src/entities/natural-earth-countries.entity.ts +++ b/server/src/entities/natural-earth-countries.entity.ts @@ -2,7 +2,25 @@ import { Column, Entity, PrimaryGeneratedColumn } from 'typeorm'; @Entity('naturalearth_countries', { synchronize: false }) export class NaturalEarthCountriesEntity { - @PrimaryGeneratedColumn() + @PrimaryGeneratedColumn('identity', { generatedIdentity: 'ALWAYS' }) + id!: number; + + @Column({ type: 'varchar', length: 50 }) + admin!: string; + + @Column({ type: 'varchar', length: 3 }) + admin_a3!: string; + + @Column({ type: 'varchar', length: 50 }) + type!: string; + + @Column({ type: 'polygon' }) + coordinates!: string; +} + +@Entity('naturalearth_countries_tmp', { synchronize: false }) +export class NaturalEarthCountriesTempEntity { + @PrimaryGeneratedColumn('identity', { generatedIdentity: 'ALWAYS' }) id!: number; @Column({ type: 'varchar', length: 50 }) diff --git a/server/src/migrations/1732072134943-NaturalEarthCountriesIdentityColumn.ts b/server/src/migrations/1732072134943-NaturalEarthCountriesIdentityColumn.ts new file mode 100644 index 0000000000..3ebe8108cb --- /dev/null +++ b/server/src/migrations/1732072134943-NaturalEarthCountriesIdentityColumn.ts @@ -0,0 +1,29 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class NaturalEarthCountriesIdentityColumn1732072134943 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE naturalearth_countries ALTER id DROP DEFAULT`); + await queryRunner.query(`DROP SEQUENCE naturalearth_countries_id_seq`); + await queryRunner.query(`ALTER TABLE naturalearth_countries ALTER id ADD GENERATED ALWAYS AS IDENTITY`); + + // same as ll_to_earth, but with explicit schema to avoid weirdness and allow it to work in expression indices + await queryRunner.query(` + CREATE FUNCTION ll_to_earth_public(latitude double precision, longitude double precision) RETURNS public.earth PARALLEL SAFE IMMUTABLE STRICT LANGUAGE SQL AS $$ + SELECT public.cube(public.cube(public.cube(public.earth()*cos(radians(latitude))*cos(radians(longitude))),public.earth()*cos(radians(latitude))*sin(radians(longitude))),public.earth()*sin(radians(latitude)))::public.earth + $$`); + + await queryRunner.query(`ALTER TABLE geodata_places DROP COLUMN "earthCoord"`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE naturalearth_countries ALTER id DROP GENERATED`); + await queryRunner.query(`CREATE SEQUENCE naturalearth_countries_id_seq`); + await queryRunner.query( + `ALTER TABLE naturalearth_countries ALTER id SET DEFAULT nextval('naturalearth_countries_id_seq'::regclass)`, + ); + await queryRunner.query(`DROP FUNCTION ll_to_earth_public`); + await queryRunner.query( + `ALTER TABLE "geodata_places" ADD "earthCoord" earth GENERATED ALWAYS AS (ll_to_earth(latitude, longitude)) STORED`, + ); + } +} diff --git a/server/src/repositories/map.repository.ts b/server/src/repositories/map.repository.ts index f87ba6d0ac..348736a33d 100644 --- a/server/src/repositories/map.repository.ts +++ b/server/src/repositories/map.repository.ts @@ -1,14 +1,18 @@ import { Inject, Injectable } from '@nestjs/common'; import { InjectDataSource, InjectRepository } from '@nestjs/typeorm'; import { getName } from 'i18n-iso-countries'; +import { randomUUID } from 'node:crypto'; import { createReadStream, existsSync } from 'node:fs'; import { readFile } from 'node:fs/promises'; import readLine from 'node:readline'; import { citiesFile } from 'src/constants'; import { AssetEntity } from 'src/entities/asset.entity'; -import { GeodataPlacesEntity } from 'src/entities/geodata-places.entity'; -import { NaturalEarthCountriesEntity } from 'src/entities/natural-earth-countries.entity'; -import { SystemMetadataKey } from 'src/enum'; +import { GeodataPlacesEntity, GeodataPlacesTempEntity } from 'src/entities/geodata-places.entity'; +import { + NaturalEarthCountriesEntity, + NaturalEarthCountriesTempEntity, +} from 'src/entities/natural-earth-countries.entity'; +import { LogLevel, SystemMetadataKey } from 'src/enum'; import { IConfigRepository } from 'src/interfaces/config.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { @@ -20,7 +24,7 @@ import { } from 'src/interfaces/map.interface'; import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface'; import { OptionalBetween } from 'src/utils/database'; -import { DataSource, In, IsNull, Not, QueryRunner, Repository } from 'typeorm'; +import { DataSource, In, IsNull, Not, Repository } from 'typeorm'; import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity.js'; @Injectable() @@ -49,8 +53,7 @@ export class MapRepository implements IMapRepository { return; } - await this.importGeodata(); - await this.importNaturalEarthCountries(); + await Promise.all([this.importGeodata(), this.importNaturalEarthCountries()]); await this.metadataRepository.set(SystemMetadataKey.REVERSE_GEOCODING_STATE, { lastUpdate: geodataDate, @@ -116,13 +119,18 @@ export class MapRepository implements IMapRepository { const response = await this.geodataPlacesRepository .createQueryBuilder('geoplaces') - .where('earth_box(ll_to_earth(:latitude, :longitude), 25000) @> "earthCoord"', point) - .orderBy('earth_distance(ll_to_earth(:latitude, :longitude), "earthCoord")') + .where( + 'earth_box(ll_to_earth_public(:latitude, :longitude), 25000) @> ll_to_earth_public(latitude, longitude)', + point, + ) + .orderBy('earth_distance(ll_to_earth_public(:latitude, :longitude), ll_to_earth_public(latitude, longitude))') .limit(1) .getOne(); if (response) { - this.logger.verbose(`Raw: ${JSON.stringify(response, null, 2)}`); + if (this.logger.isLevelEnabled(LogLevel.VERBOSE)) { + this.logger.verbose(`Raw: ${JSON.stringify(response, null, 2)}`); + } const { countryCode, name: city, admin1Name } = response; const country = getName(countryCode, 'en') ?? null; @@ -149,8 +157,9 @@ export class MapRepository implements IMapRepository { return { country: null, state: null, city: null }; } - this.logger.verbose(`Raw: ${JSON.stringify(ne_response, ['id', 'admin', 'admin_a3', 'type'], 2)}`); - + if (this.logger.isLevelEnabled(LogLevel.VERBOSE)) { + this.logger.verbose(`Raw: ${JSON.stringify(ne_response, ['id', 'admin', 'admin_a3', 'type'], 2)}`); + } const { admin_a3 } = ne_response; const country = getName(admin_a3, 'en') ?? null; const state = null; @@ -159,151 +168,119 @@ export class MapRepository implements IMapRepository { return { country, state, city }; } - private transformCoordinatesToPolygon(coordinates: number[][][]): string { - const pointsString = coordinates.map((point) => `(${point[0]},${point[1]})`).join(', '); - return `(${pointsString})`; - } - private async importNaturalEarthCountries() { - const queryRunner = this.dataSource.createQueryRunner(); - await queryRunner.connect(); - const { resourcePaths } = this.configRepository.getEnv(); + const geoJSONData = JSON.parse(await readFile(resourcePaths.geodata.naturalEarthCountriesPath, 'utf8')); + if (geoJSONData.type !== 'FeatureCollection' || !Array.isArray(geoJSONData.features)) { + this.logger.fatal('Invalid GeoJSON FeatureCollection'); + return; + } - try { - await queryRunner.startTransaction(); - await queryRunner.manager.clear(NaturalEarthCountriesEntity); - - const fileContent = await readFile(resourcePaths.geodata.naturalEarthCountriesPath, 'utf8'); - const geoJSONData = JSON.parse(fileContent); - - if (geoJSONData.type !== 'FeatureCollection' || !Array.isArray(geoJSONData.features)) { - this.logger.fatal('Invalid GeoJSON FeatureCollection'); - return; - } - - for await (const feature of geoJSONData.features) { - for (const polygon of feature.geometry.coordinates) { - const featureRecord = new NaturalEarthCountriesEntity(); - featureRecord.admin = feature.properties.ADMIN; - featureRecord.admin_a3 = feature.properties.ADM0_A3; - featureRecord.type = feature.properties.TYPE; - - if (feature.geometry.type === 'MultiPolygon') { - featureRecord.coordinates = this.transformCoordinatesToPolygon(polygon[0]); - await queryRunner.manager.save(featureRecord); - } else if (feature.geometry.type === 'Polygon') { - featureRecord.coordinates = this.transformCoordinatesToPolygon(polygon); - await queryRunner.manager.save(featureRecord); - break; - } + await this.dataSource.query('DROP TABLE IF EXISTS naturalearth_countries_tmp'); + await this.dataSource.query( + 'CREATE UNLOGGED TABLE naturalearth_countries_tmp (LIKE naturalearth_countries INCLUDING ALL EXCLUDING INDEXES)', + ); + const entities: Omit[] = []; + for (const feature of geoJSONData.features) { + for (const entry of feature.geometry.coordinates) { + const coordinates: number[][][] = feature.geometry.type === 'MultiPolygon' ? entry[0] : entry; + const featureRecord: Omit = { + admin: feature.properties.ADMIN, + admin_a3: feature.properties.ADM0_A3, + type: feature.properties.TYPE, + coordinates: `(${coordinates.map((point) => `(${point[0]},${point[1]})`).join(', ')})`, + }; + entities.push(featureRecord); + if (feature.geometry.type === 'Polygon') { + break; } } - - await queryRunner.commitTransaction(); - } catch (error) { - this.logger.fatal('Error importing natural earth country data', error); - await queryRunner.rollbackTransaction(); - throw error; - } finally { - await queryRunner.release(); } + await this.dataSource.manager.insert(NaturalEarthCountriesTempEntity, entities); + + await this.dataSource.query(`ALTER TABLE naturalearth_countries_tmp ADD PRIMARY KEY (id) WITH (FILLFACTOR = 100)`); + + await this.dataSource.transaction(async (manager) => { + await manager.query('ALTER TABLE naturalearth_countries RENAME TO naturalearth_countries_old'); + await manager.query('ALTER TABLE naturalearth_countries_tmp RENAME TO naturalearth_countries'); + await manager.query('DROP TABLE naturalearth_countries_old'); + }); } private async importGeodata() { - const queryRunner = this.dataSource.createQueryRunner(); - await queryRunner.connect(); - const { resourcePaths } = this.configRepository.getEnv(); - const admin1 = await this.loadAdmin(resourcePaths.geodata.admin1); - const admin2 = await this.loadAdmin(resourcePaths.geodata.admin2); + const [admin1, admin2] = await Promise.all([ + this.loadAdmin(resourcePaths.geodata.admin1), + this.loadAdmin(resourcePaths.geodata.admin2), + ]); - try { - await queryRunner.startTransaction(); + await this.dataSource.query('DROP TABLE IF EXISTS geodata_places_tmp'); + await this.dataSource.query( + 'CREATE UNLOGGED TABLE geodata_places_tmp (LIKE geodata_places INCLUDING ALL EXCLUDING INDEXES)', + ); + await this.loadCities500(admin1, admin2); + await this.createGeodataIndices(); - await queryRunner.manager.clear(GeodataPlacesEntity); - await this.loadCities500(queryRunner, admin1, admin2); - - await queryRunner.commitTransaction(); - } catch (error) { - this.logger.fatal('Error importing geodata', error); - await queryRunner.rollbackTransaction(); - throw error; - } finally { - await queryRunner.release(); - } + await this.dataSource.transaction(async (manager) => { + await manager.query('ALTER TABLE geodata_places RENAME TO geodata_places_old'); + await manager.query('ALTER TABLE geodata_places_tmp RENAME TO geodata_places'); + await manager.query('DROP TABLE geodata_places_old'); + }); } - private async loadGeodataToTableFromFile( - queryRunner: QueryRunner, - lineToEntityMapper: (lineSplit: string[]) => GeodataPlacesEntity, - filePath: string, - options?: { entityFilter?: (linesplit: string[]) => boolean }, - ) { - const _entityFilter = options?.entityFilter ?? (() => true); - if (!existsSync(filePath)) { - this.logger.error(`Geodata file ${filePath} not found`); - throw new Error(`Geodata file ${filePath} not found`); + private async loadCities500(admin1Map: Map, admin2Map: Map) { + const { resourcePaths } = this.configRepository.getEnv(); + const cities500 = resourcePaths.geodata.cities500; + if (!existsSync(cities500)) { + throw new Error(`Geodata file ${cities500} not found`); } - const input = createReadStream(filePath); - let bufferGeodata: QueryDeepPartialEntity[] = []; + const input = createReadStream(cities500, { highWaterMark: 512 * 1024 * 1024 }); + let bufferGeodata: QueryDeepPartialEntity[] = []; const lineReader = readLine.createInterface({ input }); let count = 0; + let futures = []; for await (const line of lineReader) { const lineSplit = line.split('\t'); - if (!_entityFilter(lineSplit)) { + if (lineSplit[7] === 'PPLX' && lineSplit[8] !== 'AU') { continue; } - const geoData = lineToEntityMapper(lineSplit); + + const geoData = { + id: Number.parseInt(lineSplit[0]), + name: lineSplit[1], + alternateNames: lineSplit[3], + latitude: Number.parseFloat(lineSplit[4]), + longitude: Number.parseFloat(lineSplit[5]), + countryCode: lineSplit[8], + admin1Code: lineSplit[10], + admin2Code: lineSplit[11], + modificationDate: lineSplit[18], + admin1Name: admin1Map.get(`${lineSplit[8]}.${lineSplit[10]}`), + admin2Name: admin2Map.get(`${lineSplit[8]}.${lineSplit[10]}.${lineSplit[11]}`), + }; bufferGeodata.push(geoData); - if (bufferGeodata.length >= 1000) { - await queryRunner.manager.upsert(GeodataPlacesEntity, bufferGeodata, ['id']); - count += bufferGeodata.length; - if (count % 10_000 === 0) { - this.logger.log(`${count} geodata records imported`); - } + if (bufferGeodata.length >= 5000) { + const curLength = bufferGeodata.length; + futures.push( + this.dataSource.manager.insert(GeodataPlacesTempEntity, bufferGeodata).then(() => { + count += curLength; + if (count % 10_000 === 0) { + this.logger.log(`${count} geodata records imported`); + } + }), + ); bufferGeodata = []; + // leave spare connection for other queries + if (futures.length >= 9) { + await Promise.all(futures); + futures = []; + } } } - await queryRunner.manager.upsert(GeodataPlacesEntity, bufferGeodata, ['id']); - } - private async loadCities500( - queryRunner: QueryRunner, - admin1Map: Map, - admin2Map: Map, - ) { - const { resourcePaths } = this.configRepository.getEnv(); - await this.loadGeodataToTableFromFile( - queryRunner, - (lineSplit: string[]) => - this.geodataPlacesRepository.create({ - id: Number.parseInt(lineSplit[0]), - name: lineSplit[1], - alternateNames: lineSplit[3], - latitude: Number.parseFloat(lineSplit[4]), - longitude: Number.parseFloat(lineSplit[5]), - countryCode: lineSplit[8], - admin1Code: lineSplit[10], - admin2Code: lineSplit[11], - modificationDate: lineSplit[18], - admin1Name: admin1Map.get(`${lineSplit[8]}.${lineSplit[10]}`), - admin2Name: admin2Map.get(`${lineSplit[8]}.${lineSplit[10]}.${lineSplit[11]}`), - }), - resourcePaths.geodata.cities500, - { - entityFilter: (lineSplit) => { - if (lineSplit[7] === 'PPLX') { - // Exclude populated subsections of cities that are not in Australia. - // Australia has a lot of PPLX areas, so we include them. - return lineSplit[8] === 'AU'; - } - return true; - }, - }, - ); + await this.dataSource.manager.insert(GeodataPlacesTempEntity, bufferGeodata); } private async loadAdmin(filePath: string) { @@ -312,7 +289,7 @@ export class MapRepository implements IMapRepository { throw new Error(`Geodata file ${filePath} not found`); } - const input = createReadStream(filePath); + const input = createReadStream(filePath, { highWaterMark: 512 * 1024 * 1024 }); const lineReader = readLine.createInterface({ input }); const adminMap = new Map(); @@ -323,4 +300,27 @@ export class MapRepository implements IMapRepository { return adminMap; } + + private createGeodataIndices() { + return Promise.all([ + this.dataSource.query(`ALTER TABLE geodata_places_tmp ADD PRIMARY KEY (id) WITH (FILLFACTOR = 100)`), + this.dataSource.query(` + CREATE INDEX IDX_geodata_gist_earthcoord_${randomUUID().replaceAll('-', '_')} + ON geodata_places_tmp + USING gist (ll_to_earth_public(latitude, longitude)) + WITH (fillfactor = 100)`), + this.dataSource.query(` + CREATE INDEX idx_geodata_places_name_${randomUUID().replaceAll('-', '_')} + ON geodata_places_tmp + USING gin (f_unaccent(name) gin_trgm_ops)`), + this.dataSource.query(` + CREATE INDEX idx_geodata_places_admin1_name_${randomUUID().replaceAll('-', '_')} + ON geodata_places_tmp + USING gin (f_unaccent("admin1Name") gin_trgm_ops)`), + this.dataSource.query(` + CREATE INDEX idx_geodata_places_admin2_name_${randomUUID().replaceAll('-', '_')} + ON geodata_places_tmp + USING gin (f_unaccent("admin2Name") gin_trgm_ops)`), + ]); + } }