diff --git a/bin/init-schools b/bin/init-schools index 1abdbe86415d8fead853978105b2d1eab2cc1b09..e1114e23045030ada73f81fffe02ed2b4f08c3f7 100755 --- a/bin/init-schools +++ b/bin/init-schools @@ -8,24 +8,46 @@ # číselníkem obcí z RUIANu. import argparse -from typing import List, Dict, DefaultDict -import sys +from collections import defaultdict +import csv +from dataclasses import dataclass from pathlib import Path import re -import csv -from collections import defaultdict +from sqlalchemy.orm import joinedload +import sys +from typing import List, Dict, DefaultDict, Tuple, Optional import mo.db as db import mo.util mo.util.init_standalone() session = db.get_session() + new_town_cnt = 0 processed_school_cnt = 0 new_school_cnt = 0 +updated_school_cnt = 0 -def import_schools(path: Path, nuts: str): +@dataclass +class ProtoSchool: + town: db.Place + town_name: str + official_name: str + red_izo: str + ico: str + address: str + is_zs: bool + is_ss: bool + school: Optional[db.School] + + +# (red_izo, address) -> ProtoSchool +proto_schools: Dict[Tuple[str, str], ProtoSchool] = {} +fields = ('town_name', 'official_name', 'red_izo', 'ico', 'address', 'is_zs', 'is_ss') + + +def process_schools(path: Path, nuts: str) -> None: # XXX: Rejstřík škol používá několik chybných/obsoletních NUTS kódů :( nuts = re.sub('^CZ011', 'CZ010', nuts) nuts = re.sub('^CZ021', 'CZ020', nuts) @@ -68,36 +90,29 @@ def import_schools(path: Path, nuts: str): else: assert False, f"Neznámý druh školy: {druh}" - school = (session.query(db.School) - .join(db.Place) - .filter(db.Place.level == 4) - .filter(db.Place.parent == town.place_id) - .filter(db.School.red_izo == red_izo) - .filter(db.School.address == addr2) - .first()) - if school: - assert school.official_name == nazev + key = (red_izo, addr2) + if key in proto_schools: + ps = proto_schools[key] + assert ps.town == town + assert ps.ico == ico + assert ps.official_name == nazev if is_zs: - school.is_zs = True + ps.is_zs = True else: - school.is_ss = True + ps.is_ss = True else: - place = db.Place( - level=4, - parent=town.place_id, - name=nazev, - type=db.PlaceType.school) - school = db.School( - place=place, + ps = ProtoSchool( + town=town, + town_name=f'{town.name} ({town.get_code()})', + official_name=nazev, red_izo=red_izo, ico=ico, - official_name=nazev, address=addr2, is_zs=is_zs, - is_ss=not is_zs) - session.add(school) - global new_school_cnt - new_school_cnt += 1 + is_ss=not is_zs, + school=None, + ) + proto_schools[key] = ps global processed_school_cnt processed_school_cnt += 1 @@ -181,7 +196,7 @@ def load_ruian_csv(name): ruian_obec_to_okres_nuts: DefaultDict[str, List[str]] = defaultdict(list) -def load_ruian(): +def load_ruian() -> None: ocols, okresy = load_ruian_csv('extra/ruian/UI_OKRES.csv') okres_by_id: Dict[int, List[str]] = {} for o in okresy: @@ -197,8 +212,151 @@ def load_ruian(): # print(f"{jmeno} -> {okres}") ruian_obec_to_okres_nuts[jmeno].append(okres[ocols['NUTS_LAU']]) + +def get_old_schools() -> DefaultDict[str, List[ProtoSchool]]: + schools = session.query(db.School).options(joinedload(db.School.place)).all() + + old_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list) + for s in schools: + town = s.place.parent_place + ps = ProtoSchool( + town=town, + town_name=f'{town.name} ({town.get_code()})', + official_name=s.official_name, + red_izo=s.red_izo, + ico=s.ico, + address=s.address, + is_zs=s.is_zs, + is_ss=s.is_ss, + school=s, + ) + old_schools[ps.red_izo].append(ps) + + return old_schools + + +def apply_single_change(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> None: + if new is None: + assert old is not None + assert old.school is not None + print(f'TODO: Škola #{old.school.place_id} (RED IZO {old.red_izo}) vypadla z rejstříku') + for field in fields: + print(f'\t{field}: {getattr(old, field)}') + elif old is None: + assert new is not None + place = db.Place( + level=4, + parent=new.town.place_id, + name=new.official_name, + type=db.PlaceType.school) + school = db.School( + place=place, + red_izo=new.red_izo, + ico=new.ico, + official_name=new.official_name, + address=new.address, + is_zs=new.is_zs, + is_ss=new.is_ss) + session.add(school) + session.flush() + + if args.update: + print(f'Škola #{school.place_id} (RED IZO {new.red_izo}): NOVÁ') + for field in fields: + print(f'\t{field}: {getattr(new, field)}') + + mo.util.log( + type=db.LogType.place, + what=school.place_id, + details={'action': 'import-school', 'school': db.row2dict(school)} + ) + + global new_school_cnt + new_school_cnt += 1 + else: + assert old.school + school = old.school + + changes: List[Tuple[str, str, str]] = [] + for field in fields: + if getattr(old, field) != getattr(new, field): + changes.append((field, getattr(old, field), getattr(new, field))) + + school.place.parent = new.town.place_id + school.ico = new.ico + school.official_name = new.official_name + school.address = new.address + school.is_zs = new.is_zs + school.is_ss = new.is_ss + + if changes: + print(f'Škola #{school.place_id} (RED IZO {new.red_izo}): ZMĚNY') + for field, old_val, new_val in changes: + print(f'\t{field}: {old_val} -> {new_val}') + assert args.update, "Změny provádíme pouze s přepínačem --update" + + mo.util.log( + type=db.LogType.place, + what=school.place_id, + details={'action': 'import-school', + 'changes': {**db.get_object_changes(school), **db.get_object_changes(school.place)}}, + ) + + global updated_school_cnt + updated_school_cnt += 1 + + +def apply_changes() -> None: + new_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list) + for ps in proto_schools.values(): + new_schools[ps.red_izo].append(ps) + + old_schools = get_old_schools() + + for red_izo in sorted(new_schools.keys()): + new_ps = new_schools[red_izo] + old_ps = sorted(old_schools[red_izo], key=lambda ps: ps.address) + new_ps = sorted(new_ps, key=lambda ps: ps.address) + if len(old_ps) == 0: + for n in new_ps: + apply_single_change(None, n) + elif len(old_ps) == 1 and len(new_ps) == 1: + apply_single_change(old_ps[0], new_ps[0]) + else: + oi = 0 + ni = 0 + while oi < len(old_ps) or ni < len(new_ps): + old: Optional[ProtoSchool] = old_ps[oi] if oi < len(old_ps) else None + new: Optional[ProtoSchool] = new_ps[ni] if ni < len(new_ps) else None + if old and new and old.address == new.address: + apply_single_change(old, new) + oi += 1 + ni += 1 + elif old and (not new or old.address < new.address): + assert old.school + print(f'TODO: Starou školu #{old.school.place_id} (RED IZO {red_izo}) nedokáži spárovat s novou') + for field in fields: + print(f'\t{field}: {getattr(old, field)}') + oi += 1 + else: + print(f'TODO: Novou školu (RED IZO {red_izo}) nedokáži spárovat se starou') + for field in fields: + print(f'\t{field}: {getattr(new, field)}') + ni += 1 + + if args.stop_after is not None and (new_school_cnt + updated_school_cnt) >= args.stop_after: + return + + for red_izo in old_schools.keys(): + if red_izo not in new_schools: + for os in old_schools[red_izo]: + apply_single_change(os, None) + + parser = argparse.ArgumentParser(description='Importuje školy z naparsovaného Rejstříku škol') parser.add_argument('-n', '--dry-run', default=False, action='store_true', help='pouze ukáže, co by bylo provedeno') +parser.add_argument('-u', '--update', default=False, action='store_true', help='aktualizuje školy v DB') +parser.add_argument('--stop-after', type=int, help='zastaví se po daném počtu změn') args = parser.parse_args() @@ -208,9 +366,13 @@ for path in Path('extra/skoly/parsed').glob('*.tsv'): m = re.fullmatch(r'^[A-Z]-(CZ\w+)\.tsv', path.name) assert m is not None nuts = m[1] - import_schools(path, nuts) + process_schools(path, nuts) + +apply_changes() if not args.dry_run: session.commit() + print(f"Importováno {processed_school_cnt} škol.") print(f"Založeno {new_school_cnt} nových škol a {new_town_cnt} nových obcí.") +print(f"Aktualizováno {updated_school_cnt} škol.")