Skip to content
Snippets Groups Projects
Commit 4dbc3bcd authored by Martin Mareš's avatar Martin Mareš
Browse files

Skript init-schools přepsán, aby uměl školy aktualizovat

Pozor, je potřeba zkontrolovat výstup a v některých případech ručně
zasáhnout. Speciálně se vůbec nestaráme o zkracování jmen.
parent a48fda43
No related branches found
No related tags found
No related merge requests found
...@@ -8,24 +8,46 @@ ...@@ -8,24 +8,46 @@
# číselníkem obcí z RUIANu. # číselníkem obcí z RUIANu.
import argparse import argparse
from typing import List, Dict, DefaultDict from collections import defaultdict
import sys import csv
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
import re import re
import csv from sqlalchemy.orm import joinedload
from collections import defaultdict import sys
from typing import List, Dict, DefaultDict, Tuple, Optional
import mo.db as db import mo.db as db
import mo.util import mo.util
mo.util.init_standalone() mo.util.init_standalone()
session = db.get_session() session = db.get_session()
new_town_cnt = 0 new_town_cnt = 0
processed_school_cnt = 0 processed_school_cnt = 0
new_school_cnt = 0 new_school_cnt = 0
updated_school_cnt = 0
@dataclass
class ProtoSchool:
    """Plain record describing one school, independent of where it came from.

    Instances are built both from rows parsed out of the school register
    (with ``school=None``) and from schools already stored in the database
    (with ``school`` set to the existing row), so that the two sources can be
    compared attribute by attribute.
    """

    # Town the school belongs to.
    town: db.Place
    # Printable town label, built by callers as f'{town.name} ({town.get_code()})'.
    town_name: str
    # Official name of the school.
    official_name: str
    # RED IZO identifier; old and new records are grouped and paired by it.
    red_izo: str
    # ICO identifier of the school.
    ico: str
    # Address; together with red_izo it forms the key of `proto_schools`.
    address: str
    # School-kind flags -- presumably "primary school" / "secondary school"; TODO confirm.
    is_zs: bool
    is_ss: bool
    # Existing database row, or None for a record that came from the register.
    school: Optional[db.School]
# Schools collected from the register files, keyed by (red_izo, address) -> ProtoSchool.
proto_schools: Dict[Tuple[str, str], ProtoSchool] = {}
# Names of the ProtoSchool attributes that are compared between the old and
# new record and printed in the change reports.
fields = ('town_name', 'official_name', 'red_izo', 'ico', 'address', 'is_zs', 'is_ss')
def import_schools(path: Path, nuts: str): def process_schools(path: Path, nuts: str) -> None:
# XXX: Rejstřík škol používá několik chybných/obsoletních NUTS kódů :( # XXX: Rejstřík škol používá několik chybných/obsoletních NUTS kódů :(
nuts = re.sub('^CZ011', 'CZ010', nuts) nuts = re.sub('^CZ011', 'CZ010', nuts)
nuts = re.sub('^CZ021', 'CZ020', nuts) nuts = re.sub('^CZ021', 'CZ020', nuts)
...@@ -68,36 +90,29 @@ def import_schools(path: Path, nuts: str): ...@@ -68,36 +90,29 @@ def import_schools(path: Path, nuts: str):
else: else:
assert False, f"Neznámý druh školy: {druh}" assert False, f"Neznámý druh školy: {druh}"
school = (session.query(db.School) key = (red_izo, addr2)
.join(db.Place) if key in proto_schools:
.filter(db.Place.level == 4) ps = proto_schools[key]
.filter(db.Place.parent == town.place_id) assert ps.town == town
.filter(db.School.red_izo == red_izo) assert ps.ico == ico
.filter(db.School.address == addr2) assert ps.official_name == nazev
.first())
if school:
assert school.official_name == nazev
if is_zs: if is_zs:
school.is_zs = True ps.is_zs = True
else: else:
school.is_ss = True ps.is_ss = True
else: else:
place = db.Place( ps = ProtoSchool(
level=4, town=town,
parent=town.place_id, town_name=f'{town.name} ({town.get_code()})',
name=nazev, official_name=nazev,
type=db.PlaceType.school)
school = db.School(
place=place,
red_izo=red_izo, red_izo=red_izo,
ico=ico, ico=ico,
official_name=nazev,
address=addr2, address=addr2,
is_zs=is_zs, is_zs=is_zs,
is_ss=not is_zs) is_ss=not is_zs,
session.add(school) school=None,
global new_school_cnt )
new_school_cnt += 1 proto_schools[key] = ps
global processed_school_cnt global processed_school_cnt
processed_school_cnt += 1 processed_school_cnt += 1
...@@ -181,7 +196,7 @@ def load_ruian_csv(name): ...@@ -181,7 +196,7 @@ def load_ruian_csv(name):
ruian_obec_to_okres_nuts: DefaultDict[str, List[str]] = defaultdict(list) ruian_obec_to_okres_nuts: DefaultDict[str, List[str]] = defaultdict(list)
def load_ruian(): def load_ruian() -> None:
ocols, okresy = load_ruian_csv('extra/ruian/UI_OKRES.csv') ocols, okresy = load_ruian_csv('extra/ruian/UI_OKRES.csv')
okres_by_id: Dict[int, List[str]] = {} okres_by_id: Dict[int, List[str]] = {}
for o in okresy: for o in okresy:
...@@ -197,8 +212,151 @@ def load_ruian(): ...@@ -197,8 +212,151 @@ def load_ruian():
# print(f"{jmeno} -> {okres}") # print(f"{jmeno} -> {okres}")
ruian_obec_to_okres_nuts[jmeno].append(okres[ocols['NUTS_LAU']]) ruian_obec_to_okres_nuts[jmeno].append(okres[ocols['NUTS_LAU']])
def get_old_schools() -> DefaultDict[str, List[ProtoSchool]]:
    """Return all schools currently in the database, grouped by RED IZO.

    Every ``db.School`` row is loaded (with its place joined in) and wrapped
    in a ``ProtoSchool`` whose ``school`` attribute points back at the row.
    The result maps each RED IZO to the list of records sharing it.
    """
    by_red_izo: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)

    query = session.query(db.School).options(joinedload(db.School.place))
    for row in query.all():
        # The town is the parent of the school's own place.
        parent_town = row.place.parent_place
        label = f'{parent_town.name} ({parent_town.get_code()})'
        record = ProtoSchool(
            town=parent_town,
            town_name=label,
            official_name=row.official_name,
            red_izo=row.red_izo,
            ico=row.ico,
            address=row.address,
            is_zs=row.is_zs,
            is_ss=row.is_ss,
            school=row,
        )
        by_red_izo[record.red_izo].append(record)

    return by_red_izo
def apply_single_change(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> None:
    """Reconcile one school between the database (`old`) and the register (`new`).

    Exactly one of three things happens:

    * ``new is None`` -- the school exists in the database but is missing
      from the register; a TODO report is printed and nothing is modified.
    * ``old is None`` -- the school is new; a level-4 ``db.Place`` and a
      ``db.School`` are created, flushed and logged, and ``new_school_cnt``
      is incremented.  The report is printed only with ``--update``.
    * both given -- the register values are copied onto the existing school.
      If any attribute listed in ``fields`` differs, the differences are
      printed and logged and ``updated_school_cnt`` is incremented.

    Raises:
        AssertionError: when an existing school differs from the register
            but the script runs without ``--update``; also when the
            ``old``/``new`` invariants checked by the asserts do not hold.
    """
    # Both counters are module-level and are rebound in the branches below.
    global new_school_cnt, updated_school_cnt

    if new is None:
        assert old is not None
        assert old.school is not None
        print(f'TODO: Škola #{old.school.place_id} (RED IZO {old.red_izo}) vypadla z rejstříku')
        for field in fields:
            print(f'\t{field}: {getattr(old, field)}')
    elif old is None:
        place = db.Place(
            level=4,
            parent=new.town.place_id,
            name=new.official_name,
            type=db.PlaceType.school)
        school = db.School(
            place=place,
            red_izo=new.red_izo,
            ico=new.ico,
            official_name=new.official_name,
            address=new.address,
            is_zs=new.is_zs,
            is_ss=new.is_ss)
        session.add(school)
        # Flush before reading school.place_id, which is used in the report and the log.
        session.flush()

        if args.update:
            print(f'Škola #{school.place_id} (RED IZO {new.red_izo}): NOVÁ')
            for field in fields:
                print(f'\t{field}: {getattr(new, field)}')

        mo.util.log(
            type=db.LogType.place,
            what=school.place_id,
            details={'action': 'import-school', 'school': db.row2dict(school)}
        )

        new_school_cnt += 1
    else:
        assert old.school
        school = old.school

        # Collect the differences before the row is overwritten, so that the
        # report can show the previous values.  Values are not only strings:
        # is_zs / is_ss are booleans.
        changes: List[Tuple[str, object, object]] = [
            (field, getattr(old, field), getattr(new, field))
            for field in fields
            if getattr(old, field) != getattr(new, field)
        ]

        school.place.parent = new.town.place_id
        school.ico = new.ico
        school.official_name = new.official_name
        school.address = new.address
        school.is_zs = new.is_zs
        school.is_ss = new.is_ss

        if changes:
            print(f'Škola #{school.place_id} (RED IZO {new.red_izo}): ZMĚNY')
            for field, old_val, new_val in changes:
                print(f'\t{field}: {old_val} -> {new_val}')

            if not args.update:
                # Raised explicitly rather than with an `assert` statement:
                # `python -O` strips asserts, which would let modifications
                # of existing schools through without --update.
                raise AssertionError("Změny provádíme pouze s přepínačem --update")

            mo.util.log(
                type=db.LogType.place,
                what=school.place_id,
                details={'action': 'import-school',
                         'changes': {**db.get_object_changes(school), **db.get_object_changes(school.place)}},
            )

            updated_school_cnt += 1
def apply_changes() -> None:
    """Pair register schools with database schools and apply every difference.

    Schools from ``proto_schools`` are grouped by RED IZO and matched against
    the database records with the same RED IZO:

    * no database record -- every register record is created as new;
    * exactly one record on each side -- they are treated as the same school;
    * otherwise -- both lists are sorted by address and walked in parallel;
      equal addresses are paired, anything else is reported as a TODO.

    Processing stops early once ``--stop-after`` changes have been made.
    Finally, database schools whose RED IZO is absent from the register are
    reported.
    """
    def address_of(record: ProtoSchool) -> str:
        return record.address

    def dump(record: Optional[ProtoSchool]) -> None:
        # Print every compared attribute of the record, one per line.
        for field in fields:
            print(f'\t{field}: {getattr(record, field)}')

    new_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)
    for proto in proto_schools.values():
        new_schools[proto.red_izo].append(proto)

    old_schools = get_old_schools()

    for red_izo in sorted(new_schools):
        old_ps = sorted(old_schools[red_izo], key=address_of)
        new_ps = sorted(new_schools[red_izo], key=address_of)

        if not old_ps:
            for fresh in new_ps:
                apply_single_change(None, fresh)
        elif len(old_ps) == 1 and len(new_ps) == 1:
            apply_single_change(old_ps[0], new_ps[0])
        else:
            # Merge-style walk over both address-sorted lists.
            oi = ni = 0
            while oi < len(old_ps) or ni < len(new_ps):
                old: Optional[ProtoSchool] = old_ps[oi] if oi < len(old_ps) else None
                new: Optional[ProtoSchool] = new_ps[ni] if ni < len(new_ps) else None

                if old is not None and new is not None and old.address == new.address:
                    apply_single_change(old, new)
                    oi += 1
                    ni += 1
                    continue

                old_goes_first = old is not None and (new is None or old.address < new.address)
                if old_goes_first:
                    assert old.school
                    print(f'TODO: Starou školu #{old.school.place_id} (RED IZO {red_izo}) nedokáži spárovat s novou')
                    dump(old)
                    oi += 1
                else:
                    print(f'TODO: Novou školu (RED IZO {red_izo}) nedokáži spárovat se starou')
                    dump(new)
                    ni += 1

        limit = args.stop_after
        if limit is not None and (new_school_cnt + updated_school_cnt) >= limit:
            return

    # Whatever is left in the database only has dropped out of the register.
    for red_izo, leftovers in old_schools.items():
        if red_izo in new_schools:
            continue
        for vanished in leftovers:
            apply_single_change(vanished, None)
parser = argparse.ArgumentParser(description='Importuje školy z naparsovaného Rejstříku škol') parser = argparse.ArgumentParser(description='Importuje školy z naparsovaného Rejstříku škol')
parser.add_argument('-n', '--dry-run', default=False, action='store_true', help='pouze ukáže, co by bylo provedeno') parser.add_argument('-n', '--dry-run', default=False, action='store_true', help='pouze ukáže, co by bylo provedeno')
parser.add_argument('-u', '--update', default=False, action='store_true', help='aktualizuje školy v DB')
parser.add_argument('--stop-after', type=int, help='zastaví se po daném počtu změn')
args = parser.parse_args() args = parser.parse_args()
...@@ -208,9 +366,13 @@ for path in Path('extra/skoly/parsed').glob('*.tsv'): ...@@ -208,9 +366,13 @@ for path in Path('extra/skoly/parsed').glob('*.tsv'):
m = re.fullmatch(r'^[A-Z]-(CZ\w+)\.tsv', path.name) m = re.fullmatch(r'^[A-Z]-(CZ\w+)\.tsv', path.name)
assert m is not None assert m is not None
nuts = m[1] nuts = m[1]
import_schools(path, nuts) process_schools(path, nuts)
apply_changes()
if not args.dry_run: if not args.dry_run:
session.commit() session.commit()
print(f"Importováno {processed_school_cnt} škol.") print(f"Importováno {processed_school_cnt} škol.")
print(f"Založeno {new_school_cnt} nových škol a {new_town_cnt} nových obcí.") print(f"Založeno {new_school_cnt} nových škol a {new_town_cnt} nových obcí.")
print(f"Aktualizováno {updated_school_cnt} škol.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment