#!/usr/bin/env python3
"""Download GeoNames cities5000 from https://download.geonames.org/export/dump/ and seed a SQLite database."""

from __future__ import annotations

import argparse
import contextlib
import io
import sqlite3
import sys
import urllib.request
import zipfile
from pathlib import Path

SOURCE_URL = "https://download.geonames.org/export/dump/cities5000.zip"
ARCHIVE_ENTRY = "cities5000.txt"

# Socket timeout for the download so a stalled server cannot hang the script.
DOWNLOAD_TIMEOUT_SECONDS = 60.0

# Number of tab-separated fields a line must have to be loaded; this matches
# the number of columns in the ``cities`` table below.
EXPECTED_COLUMN_COUNT = 19

CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS cities (
    geoname_id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    ascii_name TEXT NOT NULL,
    alternate_names TEXT,
    latitude REAL NOT NULL,
    longitude REAL NOT NULL,
    feature_class TEXT,
    feature_code TEXT,
    country_code TEXT,
    cc2 TEXT,
    admin1_code TEXT,
    admin2_code TEXT,
    admin3_code TEXT,
    admin4_code TEXT,
    population INTEGER,
    elevation INTEGER,
    dem INTEGER,
    timezone TEXT,
    modification_date TEXT
)
"""

CREATE_INDEXES_SQL = (
    "CREATE INDEX IF NOT EXISTS idx_cities_name ON cities(name)",
    "CREATE INDEX IF NOT EXISTS idx_cities_ascii_name ON cities(ascii_name)",
    "CREATE INDEX IF NOT EXISTS idx_cities_alternate_names ON cities(alternate_names)",
    "CREATE INDEX IF NOT EXISTS idx_cities_country_code ON cities(country_code)",
    "CREATE INDEX IF NOT EXISTS idx_cities_population ON cities(population DESC)",
)

INSERT_SQL = """
INSERT OR REPLACE INTO cities (
    geoname_id, name, ascii_name, alternate_names, latitude, longitude,
    feature_class, feature_code, country_code, cc2,
    admin1_code, admin2_code, admin3_code, admin4_code,
    population, elevation, dem, timezone, modification_date
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""


def parse_args() -> argparse.Namespace:
    """Parse the command line.

    Returns:
        A namespace with ``db`` (target database path, required) and
        ``batch_size`` (rows per ``executemany`` call, default 1000).
    """
    parser = argparse.ArgumentParser(description="Seed GeoNames cities5000 into SQLite")
    parser.add_argument("--db", required=True, help="Target SQLite database path")
    parser.add_argument(
        "--batch-size", type=int, default=1000, help="Insert batch size"
    )
    return parser.parse_args()


def to_int(value: str) -> int | None:
    """Convert a text field to ``int``, mapping a blank field to ``None``.

    Raises:
        ValueError: If the stripped value is non-empty and not an integer.
    """
    value = value.strip()
    if value == "":
        return None
    return int(value)


def to_float(value: str) -> float:
    """Convert a text field to ``float`` after stripping whitespace.

    Raises:
        ValueError: If the stripped value is empty or not a number.
    """
    return float(value.strip())


def row_from_columns(columns: list[str]) -> tuple:
    """Build the parameter tuple for ``INSERT_SQL`` from one split line.

    Args:
        columns: The 19 text fields of a line, in table-column order.

    Returns:
        A 19-tuple with the id, coordinates and the three integer columns
        (population, elevation, dem) converted to numbers; every other
        field is passed through unchanged.

    Raises:
        ValueError: If a numeric field cannot be converted.
        IndexError: If fewer than 19 fields are supplied.
    """
    return (
        int(columns[0]),
        columns[1],
        columns[2],
        columns[3],
        to_float(columns[4]),
        to_float(columns[5]),
        columns[6],
        columns[7],
        columns[8],
        columns[9],
        columns[10],
        columns[11],
        columns[12],
        columns[13],
        to_int(columns[14]),
        to_int(columns[15]),
        to_int(columns[16]),
        columns[17],
        columns[18],
    )


def download_archive(timeout: float = DOWNLOAD_TIMEOUT_SECONDS) -> bytes:
    """Fetch the zip archive at ``SOURCE_URL`` and return its raw bytes.

    Args:
        timeout: Socket timeout in seconds passed to ``urlopen``. Without
            one, a server that stops responding blocks the script forever.

    Raises:
        urllib.error.URLError: On connection failure or HTTP error status.
        TimeoutError: If a socket operation exceeds ``timeout``.
    """
    with urllib.request.urlopen(SOURCE_URL, timeout=timeout) as response:
        return response.read()


def stream_city_rows(archive_bytes: bytes):
    """Yield one insert-ready tuple per well-formed line of the archive.

    Opens ``ARCHIVE_ENTRY`` inside the in-memory zip and decodes it line by
    line as UTF-8. Blank lines and lines that do not split into exactly
    ``EXPECTED_COLUMN_COUNT`` tab-separated fields are skipped.

    Args:
        archive_bytes: Complete contents of the downloaded zip file.

    Yields:
        Tuples as produced by ``row_from_columns``.
    """
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zf:
        with zf.open(ARCHIVE_ENTRY) as raw_file:
            for raw_line in raw_file:
                # Strip "\r" as well as "\n" so a CRLF-terminated dump does
                # not leave a stray carriage return in the last column.
                line = raw_line.decode("utf-8").rstrip("\r\n")
                if not line:
                    continue
                columns = line.split("\t")
                if len(columns) != EXPECTED_COLUMN_COUNT:
                    continue
                yield row_from_columns(columns)


def seed_database(db_path: Path, batch_size: int) -> int:
    """Download the dataset and load it into the SQLite file at ``db_path``.

    The parent directory and the ``cities`` table are created when missing.
    Rows are written with ``INSERT OR REPLACE`` in batches of ``batch_size``
    and committed together at the end; the indexes are (re)created after the
    load.

    Args:
        db_path: Location of the SQLite database file.
        batch_size: Number of rows handed to each ``executemany`` call.

    Returns:
        The number of rows submitted for insertion.
    """
    archive = download_archive()
    inserted = 0
    batch: list[tuple] = []

    db_path.parent.mkdir(parents=True, exist_ok=True)

    # sqlite3's connection context manager only commits or rolls back; it
    # does not close the connection. ``closing`` releases the file handle.
    with contextlib.closing(sqlite3.connect(str(db_path))) as conn:
        with conn:
            cursor = conn.cursor()
            cursor.execute("PRAGMA journal_mode=WAL")
            cursor.execute(CREATE_TABLE_SQL)

            for row in stream_city_rows(archive):
                batch.append(row)
                if len(batch) >= batch_size:
                    cursor.executemany(INSERT_SQL, batch)
                    inserted += len(batch)
                    batch.clear()

            # Flush the final, partially filled batch.
            if batch:
                cursor.executemany(INSERT_SQL, batch)
                inserted += len(batch)

            # Build the indexes once the data is in place, so a fresh load
            # does not pay for index maintenance on every inserted row.
            for sql in CREATE_INDEXES_SQL:
                cursor.execute(sql)

            conn.commit()

    return inserted


def main() -> int:
    """Run the seeding job from the command line.

    Returns:
        ``0`` on success, ``1`` if seeding failed (the error is printed to
        stderr).
    """
    args = parse_args()
    try:
        inserted = seed_database(Path(args.db), args.batch_size)
    except Exception as exc:  # noqa: BLE001
        # Top-level boundary: report any failure and exit non-zero.
        print(f"failed to seed cities database: {exc}", file=sys.stderr)
        return 1

    print(f"seeded {inserted} city rows into {args.db}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())