From 4dbc3bcd348314a426f6802449bda8a69235205c Mon Sep 17 00:00:00 2001
From: Martin Mares <mj@ucw.cz>
Date: Sat, 15 Jan 2022 23:26:24 +0100
Subject: [PATCH] =?UTF-8?q?Skript=20init-schools=20p=C5=99eps=C3=A1n,=20ab?=
=?UTF-8?q?y=20um=C4=9Bl=20=C5=A1koly=20aktualizovat?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Pozor, je potřeba zkontrolovat výstup a v některých případech ručně
zasáhnout. Speciálně se vůbec nestaráme o zkracování jmen.
---
bin/init-schools | 222 ++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 192 insertions(+), 30 deletions(-)
diff --git a/bin/init-schools b/bin/init-schools
index 1abdbe86..e1114e23 100755
--- a/bin/init-schools
+++ b/bin/init-schools
@@ -8,24 +8,46 @@
# číselníkem obcí z RUIANu.
import argparse
-from typing import List, Dict, DefaultDict
-import sys
+from collections import defaultdict
+import csv
+from dataclasses import dataclass
from pathlib import Path
import re
-import csv
-from collections import defaultdict
+from sqlalchemy.orm import joinedload
+import sys
+from typing import List, Dict, DefaultDict, Tuple, Optional
import mo.db as db
import mo.util
mo.util.init_standalone()
session = db.get_session()
+
+# Run statistics; they are reported in the summary printed at the end of the script.
new_town_cnt = 0
processed_school_cnt = 0
new_school_cnt = 0
+updated_school_cnt = 0
-def import_schools(path: Path, nuts: str):
+@dataclass
+class ProtoSchool:
+ """Plain record of one school's attributes, independent of where they came from.
+
+ Instances are built both from parsed registry rows (process_schools) and
+ from existing db.School rows (get_old_schools), so that the two sides can
+ be paired and compared attribute by attribute.
+ """
+ town: db.Place  # place object of the town the school belongs to
+ town_name: str  # display string of the form "<town name> (<town code>)"
+ official_name: str
+ red_izo: str  # RED IZO identifier; records are grouped by it when pairing old and new
+ ico: str
+ address: str  # together with red_izo it forms the key of proto_schools
+ is_zs: bool  # presumably "is a primary school" -- TODO confirm
+ is_ss: bool  # presumably "is a secondary school" -- TODO confirm
+ school: Optional[db.School]  # existing DB row, or None for records read from the registry
+
+
+# Schools collected from the parsed registry files: (red_izo, address) -> ProtoSchool
+proto_schools: Dict[Tuple[str, str], ProtoSchool] = {}
+# ProtoSchool attributes that are compared between old and new records and listed in reports
+fields = ('town_name', 'official_name', 'red_izo', 'ico', 'address', 'is_zs', 'is_ss')
+
+
+def process_schools(path: Path, nuts: str) -> None:
# XXX: Rejstřík škol používá několik chybných/obsoletních NUTS kódů :(
nuts = re.sub('^CZ011', 'CZ010', nuts)
nuts = re.sub('^CZ021', 'CZ020', nuts)
@@ -68,36 +90,29 @@ def import_schools(path: Path, nuts: str):
else:
assert False, f"Neznámý druh školy: {druh}"
- school = (session.query(db.School)
- .join(db.Place)
- .filter(db.Place.level == 4)
- .filter(db.Place.parent == town.place_id)
- .filter(db.School.red_izo == red_izo)
- .filter(db.School.address == addr2)
- .first())
- if school:
- assert school.official_name == nazev
+ key = (red_izo, addr2)
+ if key in proto_schools:
+ ps = proto_schools[key]
+ assert ps.town == town
+ assert ps.ico == ico
+ assert ps.official_name == nazev
if is_zs:
- school.is_zs = True
+ ps.is_zs = True
else:
- school.is_ss = True
+ ps.is_ss = True
else:
- place = db.Place(
- level=4,
- parent=town.place_id,
- name=nazev,
- type=db.PlaceType.school)
- school = db.School(
- place=place,
+ ps = ProtoSchool(
+ town=town,
+ town_name=f'{town.name} ({town.get_code()})',
+ official_name=nazev,
red_izo=red_izo,
ico=ico,
- official_name=nazev,
address=addr2,
is_zs=is_zs,
- is_ss=not is_zs)
- session.add(school)
- global new_school_cnt
- new_school_cnt += 1
+ is_ss=not is_zs,
+ school=None,
+ )
+ proto_schools[key] = ps
global processed_school_cnt
processed_school_cnt += 1
@@ -181,7 +196,7 @@ def load_ruian_csv(name):
ruian_obec_to_okres_nuts: DefaultDict[str, List[str]] = defaultdict(list)
-def load_ruian():
+def load_ruian() -> None:
ocols, okresy = load_ruian_csv('extra/ruian/UI_OKRES.csv')
okres_by_id: Dict[int, List[str]] = {}
for o in okresy:
@@ -197,8 +212,151 @@ def load_ruian():
# print(f"{jmeno} -> {okres}")
ruian_obec_to_okres_nuts[jmeno].append(okres[ocols['NUTS_LAU']])
+
+def get_old_schools() -> DefaultDict[str, List[ProtoSchool]]:
+    """Load every school stored in the database as a ProtoSchool record.
+
+    Returns a defaultdict mapping RED IZO to the list of schools stored
+    under it; each record keeps a reference to its db.School row.
+    """
+    by_red_izo: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)
+
+    # joinedload fetches each school's place together with the school itself
+    query = session.query(db.School).options(joinedload(db.School.place))
+    for row in query.all():
+        parent_town = row.place.parent_place
+        by_red_izo[row.red_izo].append(ProtoSchool(
+            town=parent_town,
+            town_name=f'{parent_town.name} ({parent_town.get_code()})',
+            official_name=row.official_name,
+            red_izo=row.red_izo,
+            ico=row.ico,
+            address=row.address,
+            is_zs=row.is_zs,
+            is_ss=row.is_ss,
+            school=row,
+        ))
+
+    return by_red_izo
+
+
+def apply_single_change(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> None:
+ """Reconcile one school between the database (old) and the registry (new).
+
+ One of three cases applies:
+ - new is None: the school exists only in the database; it is reported as a TODO.
+ - old is None: a new db.Place and db.School are created and the creation is logged.
+ - both given: the registry values are written to the existing school and
+ the differing fields are printed and logged.
+
+ NOTE(review): leading whitespace was collapsed in this copy of the patch, so
+ the nesting of statements is not visible here. Comments below that say which
+ statements are conditional are a best reading -- confirm against the real file.
+ """
+ if new is None:
+ # Only report; nothing is modified or deleted for schools missing from the registry.
+ assert old is not None
+ assert old.school is not None
+ print(f'TODO: Škola #{old.school.place_id} (RED IZO {old.red_izo}) vypadla z rejstříku')
+ for field in fields:
+ print(f'\t{field}: {getattr(old, field)}')
+ elif old is None:
+ assert new is not None
+ # The school is a level-4 place whose parent is the town's place.
+ place = db.Place(
+ level=4,
+ parent=new.town.place_id,
+ name=new.official_name,
+ type=db.PlaceType.school)
+ school = db.School(
+ place=place,
+ red_izo=new.red_izo,
+ ico=new.ico,
+ official_name=new.official_name,
+ address=new.address,
+ is_zs=new.is_zs,
+ is_ss=new.is_ss)
+ session.add(school)
+ # Presumably flushed so that school.place_id is assigned before it is printed and logged.
+ session.flush()
+
+ # New schools are listed on output only when --update was given.
+ if args.update:
+ print(f'Škola #{school.place_id} (RED IZO {new.red_izo}): NOVÁ')
+ for field in fields:
+ print(f'\t{field}: {getattr(new, field)}')
+
+ mo.util.log(
+ type=db.LogType.place,
+ what=school.place_id,
+ details={'action': 'import-school', 'school': db.row2dict(school)}
+ )
+
+ global new_school_cnt
+ new_school_cnt += 1
+ else:
+ assert old.school
+ school = old.school
+
+ # Collect (field, old value, new value) for differing fields before the row is overwritten below.
+ changes: List[Tuple[str, str, str]] = []
+ for field in fields:
+ if getattr(old, field) != getattr(new, field):
+ changes.append((field, getattr(old, field), getattr(new, field)))
+
+ # Overwrite the stored values with the registry ones; the place's name is not touched here.
+ school.place.parent = new.town.place_id
+ school.ico = new.ico
+ school.official_name = new.official_name
+ school.address = new.address
+ school.is_zs = new.is_zs
+ school.is_ss = new.is_ss
+
+ # NOTE(review): the assert, the log entry and the counter below presumably all
+ # belong to this `if changes:` branch -- confirm against the real indentation.
+ if changes:
+ print(f'Škola #{school.place_id} (RED IZO {new.red_izo}): ZMĚNY')
+ for field, old_val, new_val in changes:
+ print(f'\t{field}: {old_val} -> {new_val}')
+ # Existing rows may be modified only when --update was given; otherwise the script aborts here.
+ assert args.update, "Změny provádíme pouze s přepínačem --update"
+
+ # The log entry merges the changes recorded on the school and on its place.
+ mo.util.log(
+ type=db.LogType.place,
+ what=school.place_id,
+ details={'action': 'import-school',
+ 'changes': {**db.get_object_changes(school), **db.get_object_changes(school.place)}},
+ )
+
+ global updated_school_cnt
+ updated_school_cnt += 1
+
+
+def apply_changes() -> None:
+ """Pair registry schools with database schools by RED IZO and apply the differences.
+
+ Schools collected in proto_schools are grouped by RED IZO and compared with
+ the groups returned by get_old_schools(). Paired records go through
+ apply_single_change(); records that cannot be paired are printed as TODO items.
+
+ NOTE(review): leading whitespace was collapsed in this copy of the patch, so
+ the nesting of statements is not visible here -- confirm against the real file.
+ """
+ new_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)
+ for ps in proto_schools.values():
+ new_schools[ps.red_izo].append(ps)
+
+ old_schools = get_old_schools()
+
+ for red_izo in sorted(new_schools.keys()):
+ new_ps = new_schools[red_izo]
+ # old_schools is a defaultdict, so an unknown RED IZO yields an empty list here.
+ old_ps = sorted(old_schools[red_izo], key=lambda ps: ps.address)
+ new_ps = sorted(new_ps, key=lambda ps: ps.address)
+ if len(old_ps) == 0:
+ # RED IZO not in the database yet: every registry record is a new school.
+ for n in new_ps:
+ apply_single_change(None, n)
+ elif len(old_ps) == 1 and len(new_ps) == 1:
+ # Exactly one record on each side: they are paired even if the address differs.
+ apply_single_change(old_ps[0], new_ps[0])
+ else:
+ # Several records on at least one side: walk both address-sorted lists in
+ # parallel and pair only records whose addresses are equal.
+ oi = 0
+ ni = 0
+ while oi < len(old_ps) or ni < len(new_ps):
+ old: Optional[ProtoSchool] = old_ps[oi] if oi < len(old_ps) else None
+ new: Optional[ProtoSchool] = new_ps[ni] if ni < len(new_ps) else None
+ if old and new and old.address == new.address:
+ apply_single_change(old, new)
+ oi += 1
+ ni += 1
+ elif old and (not new or old.address < new.address):
+ # The old record has no counterpart with the same address: report only.
+ assert old.school
+ print(f'TODO: Starou školu #{old.school.place_id} (RED IZO {red_izo}) nedokáži spárovat s novou')
+ for field in fields:
+ print(f'\t{field}: {getattr(old, field)}')
+ oi += 1
+ else:
+ # The new record has no counterpart with the same address: report only, nothing is created.
+ print(f'TODO: Novou školu (RED IZO {red_izo}) nedokáži spárovat se starou')
+ for field in fields:
+ print(f'\t{field}: {getattr(new, field)}')
+ ni += 1
+
+ # NOTE(review): presumably checked once per RED IZO inside the loop above -- confirm.
+ # Returning here also skips the report of schools missing from the registry below.
+ if args.stop_after is not None and (new_school_cnt + updated_school_cnt) >= args.stop_after:
+ return
+
+ # Database schools whose RED IZO does not occur in the registry at all.
+ for red_izo in old_schools.keys():
+ if red_izo not in new_schools:
+ for os in old_schools[red_izo]:
+ apply_single_change(os, None)
+
+
parser = argparse.ArgumentParser(description='Importuje školy z naparsovaného Rejstříku škol')
parser.add_argument('-n', '--dry-run', default=False, action='store_true', help='pouze ukáže, co by bylo provedeno')
+# Without --update, apply_single_change() asserts as soon as an existing school would change.
+parser.add_argument('-u', '--update', default=False, action='store_true', help='aktualizuje školy v DB')
+# Compared in apply_changes() with the number of created plus updated schools.
+parser.add_argument('--stop-after', type=int, help='zastaví se po daném počtu změn')
args = parser.parse_args()
@@ -208,9 +366,13 @@ for path in Path('extra/skoly/parsed').glob('*.tsv'):
m = re.fullmatch(r'^[A-Z]-(CZ\w+)\.tsv', path.name)
assert m is not None
nuts = m[1]
- import_schools(path, nuts)
+ process_schools(path, nuts)
+
+# All registry files have been read into proto_schools; now compare them with the database.
+apply_changes()
# The session is committed only when --dry-run was not given.
if not args.dry_run:
session.commit()
+
print(f"Importováno {processed_school_cnt} škol.")
print(f"Založeno {new_school_cnt} nových škol a {new_town_cnt} nových obcí.")
+print(f"Aktualizováno {updated_school_cnt} škol.")
--
GitLab