Select Git revision
-
Jiří Setnička authored
* Přidáno logo (skryje se při skrolování) * Menu roztaženo po celé šířce (sticky při skrolování) * Top level elementy se třídou .content se zobrazují jen ve střední části stránky Vršek stránky vizuálně a barevně sjednocen s tím, co je na matematickaolympiada.cz Solve #21
Jiří Setnička authored* Přidáno logo (skryje se při skrolování) * Menu roztaženo po celé šířce (sticky při skrolování) * Top level elementy se třídou .content se zobrazují jen ve střední části stránky Vršek stránky vizuálně a barevně sjednocen s tím, co je na matematickaolympiada.cz Solve #21
init-schools 19.94 KiB
#!/usr/bin/env python3
# Naplní databázi školami a obcemi, v nichž školy sídlí
# Používá extra/skoly/SkolyAObory.csv
#
# Pozor, zrada: rejstřík škol je sice rozdělený do okresů dle NUTS/LAU,
# ale školy tam řadí podle úřadu, u nějž je škole registrovaná, což vůbec
# nemusí odpovídat skutečnému sídlu školy. Proto si poněkud magicky pomáháme
# číselníkem obcí z RUIANu.
import argparse
from collections import defaultdict
import csv
from dataclasses import dataclass
from enum import auto
import re
from sqlalchemy.orm import joinedload
import sys
from typing import List, Dict, DefaultDict, Tuple, Optional
import mo.config as config
import mo.db as db
import mo.util
from mo.util import die
from mo.util_format import timeformat
# Must run before the database session is obtained
# (presumably sets up config/logging for use outside the web app -- TODO confirm).
mo.util.init_standalone()

# Database session shared by the functions below.
session = db.get_session()

# Run statistics; printed in the summary at the very end of the script.
new_town_cnt = 0            # towns created by lookup_town()
processed_school_cnt = 0    # register rows handled by process_schools()
new_school_cnt = 0          # schools created by execute_action()
updated_school_cnt = 0      # schools edited by execute_action()
disabled_school_cnt = 0     # schools hidden by execute_action()
@dataclass
class ProtoSchool:
    """A school in a uniform shape, so register data and database rows can be compared.

    Instances are built from register rows in process_schools() and from
    existing database rows in get_old_schools().
    """
    town: db.Place              # place object of the town the school belongs to
    town_id: int                # place_id of `town`
    town_name: str              # name of `town`
    unsure_region: bool         # set by lookup_town() when the register and RUIAN disagree on the district
    short_name: str             # display name; plan_single_change() sets it from simplify_name() or from the DB
    official_name: str          # full name as given in the register ('Plný název' column)
    red_izo: str                # RED_IZO identifier from the register
    ico: str                    # IČO, left-padded with zeros to 8 characters by process_schools()
    address: str                # address of the school building, built by make_address()
    is_zs: bool                 # set for register rows of kind B00 (presumably primary school -- TODO confirm)
    is_ss: bool                 # set for register rows of kind C00 (presumably secondary school -- TODO confirm)
    school: Optional[db.School] # database row, or None for schools known only from the register
# (red_izo, address) -> ProtoSchool; filled in by process_schools()
proto_schools: Dict[Tuple[str, str], ProtoSchool] = {}

# ProtoSchool attributes that are compared between the register and the database,
# printed in a plan by add_fields() and accepted back by read_actions().
fields = ('town_id', 'town_name', 'short_name', 'official_name', 'red_izo', 'ico', 'address', 'is_zs', 'is_ss')
class ActionType(db.MOEnum):
    """Kind of change planned for a school.

    The member names are printed in plans by create_action() and parsed back
    by read_actions() through ``ActionType.coerce()``.
    """
    ADD = auto()        # create a new school
    DISABLE = auto()    # hide a school that is no longer in the register
    EDIT = auto()       # update fields of an existing school
@dataclass
class Action:
    """One planned change of a school, as produced by the planner or read back from a plan."""
    type: ActionType            # what should be done
    school_id: Optional[int]    # place_id of the existing school; create_action() and the plan format use 0 for "no school yet"
    values: Dict[str, str]      # field name -> value; strings when parsed by read_actions(), raw ProtoSchool attribute values when filled by the planner
def process_schools() -> None:
    """Load the school register and collect its schools into ``proto_schools``.

    Reads ``extra/skoly/SkolyAObory.csv`` (comma-separated, first row is the
    header).  Every accepted row becomes a ProtoSchool keyed by
    ``(RED_IZO, address of the school building)``; rows sharing a key are
    merged into one ProtoSchool that accumulates the ``is_zs``/``is_ss`` flags.

    Side effects: fills ``proto_schools``, increments ``processed_school_cnt``
    once per accepted row and, through ``lookup_town()``, may create missing
    towns in the database (this happens before the school kind is checked).
    """
    global processed_school_cnt

    with open('extra/skoly/SkolyAObory.csv') as file:
        reader = csv.reader(file, delimiter=',')
        rows = list(reader)

    columns = parse_header(rows.pop(0))

    for f in rows:
        red_izo = f[columns['RED_IZO']]
        ico = f[columns['IČO']]
        nuts = f[columns['Území']]
        druh = f[columns['Druh/Typ']]

        # Skip broken records
        if red_izo == "691016551":
            continue

        # Normalize IČO to 8 characters by left-padding with zeros
        if len(ico) < 8:
            ico = "0" * (8 - len(ico)) + ico

        # Address of legal entity
        nazev = f[columns['Plný název']]
        # Replace the acute accent character (U+00B4) used in place of an apostrophe
        nazev = nazev.replace('\u00b4', "'")
        misto = f[columns['Místo']]
        ulice = f[columns['Ulice']]
        cp = f[columns['č.p.']]
        co = f[columns['č.or.']]

        # Address of school building
        misto2 = f[columns['Škola_místo']]
        ulice2 = f[columns['Škola_ulice']]
        cp2 = f[columns['Škola_č.p.']]
        co2 = f[columns['Škola_č.or.']]

        if misto2 == 'Praha':
            # Plain "Praha" is replaced by the town of the legal entity.
            # The warning now names the affected school, like the assert below does.
            print(f"WARNING: Škola s RED_IZO {red_izo} má jako obec uvedenu Prahu => volím náhradu {misto}", file=sys.stderr)
            assert misto != 'Praha', f'Škola s RED_IZO {red_izo} má jako obec uvedenu Prahu dvakrát'
            misto2 = misto

        # addr is only needed by the (currently disabled) check that both addresses agree
        addr = make_address(misto, ulice, cp, co)
        addr2 = make_address(misto2, ulice2, cp2, co2)
        # if addr != addr2:
        #     print(f"WARNING: Škola má dvě různé adresy: <{addr}> != <{addr2}>", file=sys.stderr)

        town, unsure_region = lookup_town(misto2, nuts)

        if druh == 'B00':
            is_zs = True
        elif druh == 'C00':
            is_zs = False
        elif druh in ['D00', 'E00', 'M40']:
            # Ignored kinds:
            #   D00 = conservatory
            #   E00 = higher vocational school (VOŠ)
            #   M40 = practical training centre
            continue
        else:
            assert False, f"Neznámý druh školy: {druh}"

        key = (red_izo, addr2)
        if key in proto_schools:
            # Another row of the same school building: it must agree on the
            # basic data and only contributes its kind flag.
            ps = proto_schools[key]
            assert ps.town == town
            assert ps.ico == ico
            assert ps.official_name == nazev
            if is_zs:
                ps.is_zs = True
            else:
                ps.is_ss = True
        else:
            ps = ProtoSchool(
                town=town,
                town_id=town.place_id,
                town_name=town.name,
                unsure_region=unsure_region,
                short_name=nazev,
                official_name=nazev,
                red_izo=red_izo,
                ico=ico,
                address=addr2,
                is_zs=is_zs,
                is_ss=not is_zs,
                school=None,
            )
            proto_schools[key] = ps

        processed_school_cnt += 1
def parse_header(header: List[str]) -> Dict[str, int]:
    """Map column titles of the header row to their positions.

    A trailing colon is stripped from each title.  A title that is already
    taken gets underscores prepended until it becomes unique, so repeated
    titles stay addressable ('X', '_X', '__X', ...).
    """
    positions: Dict[str, int] = {}
    for index, title in enumerate(header):
        name = title[:-1] if title.endswith(':') else title
        while name in positions:
            name = '_' + name
        positions[name] = index
    return positions
def make_address(misto: str, ulice: str, cp: str, co: str) -> str:
    """Format an address from town, street and the two house-number parts.

    ``cp`` and ``co`` are joined as "cp/co" when both are present, otherwise
    whichever one is non-empty is used.  With a street the result is
    "street number, town"; without a street the number follows the town
    ("town number"), so buildings in a town without street names keep
    distinct addresses instead of collapsing to the bare town name.
    """
    if cp and co:
        number = f"{cp}/{co}"
    else:
        number = cp or co

    if ulice:
        street = f"{ulice} {number}" if number else ulice
        return f"{street}, {misto}"

    # No street name: previously the house number was silently dropped here.
    if number:
        return f"{misto} {number}"
    return misto
def lookup_town(misto: str, region_nuts: str) -> Tuple[db.Place, bool]:
    """Find the town `misto` in the database, creating it when it is missing.

    The district is chosen by comparing the register's NUTS code
    (`region_nuts`) with the districts RUIAN lists for a town of that name.

    Returns the town and a flag telling whether the district choice is
    uncertain (the register and RUIAN disagree).
    """
    global new_town_cnt

    ruian_nuts = ruian_obec_to_okres_nuts[misto]
    region = None
    unsure_region = False

    if region_nuts in ruian_nuts:
        # The register agrees with one of the RUIAN candidates
        nuts = region_nuts
    elif len(ruian_nuts) == 1:
        # Exactly one RUIAN candidate, different from the register
        nuts = ruian_nuts[0]
        print(f"WARNING: Obec {misto} je podle rejstříku v okrese {region_nuts}, ale pod RUIAN v {nuts} => preferuji RUIAN", file=sys.stderr)
        unsure_region = True
    elif ruian_nuts:
        # Several RUIAN candidates and none matches the register
        nuts = region_nuts
        print(f"WARNING: Obec {misto} je podle rejstříku v okrese {region_nuts}, podle RUIAN je na výběr {ruian_nuts} => dořešit ručně!", file=sys.stderr)
        unsure_region = True
    elif misto.startswith('Praha '):
        # XXX: Prague districts are not in RUIAN, look the region up by name
        region = session.query(db.Place).filter_by(level=2, name=misto).first()
        assert region
    else:
        # Town unknown to RUIAN: trust the register
        nuts = region_nuts
        print(f"WARNING: Obec {misto} není v RUIAN", file=sys.stderr)

    if not region:
        region = session.query(db.Place).filter_by(level=2, nuts=nuts).first()
        assert region, f'Chybí region pro NUTS {nuts}'

    town = session.query(db.Place).filter_by(level=3, parent=region.place_id, name=misto).first()
    if town is not None:
        return town, unsure_region

    # The town does not exist yet: create it and record that in the log
    town = db.Place(level=3, parent=region.place_id, name=misto, type=db.PlaceType.region)
    session.add(town)
    session.flush()     # the flush assigns town.place_id, which the log entry needs
    mo.util.log(
        type=db.LogType.place,
        what=town.place_id,
        details={'action': 'new', 'reason': 'init-schools', 'place': db.row2dict(town)},
    )
    session.flush()
    new_town_cnt += 1
    print(f'Založena obec: {misto} (#{town.place_id}) v okrese {region.name}')

    return town, unsure_region
def load_ruian_csv(name):
    """Read a semicolon-separated CSV file.

    Returns a pair: a dict mapping header titles to column positions, and the
    list of data rows (everything after the header row).
    """
    with open(name) as file:
        rows = list(csv.reader(file, delimiter=';'))

    columns = {title: position for position, title in enumerate(rows[0])}
    return columns, rows[1:]
# Town name -> NUTS/LAU codes of all districts that contain a town of that name.
# Filled in by load_ruian(); being a defaultdict, an unknown name reads as an empty list.
ruian_obec_to_okres_nuts: DefaultDict[str, List[str]] = defaultdict(list)
def load_ruian() -> None:
    """Fill ``ruian_obec_to_okres_nuts`` from the RUIAN district and town lists.

    Reads ``extra/ruian/UI_OKRES.csv`` and ``extra/ruian/UI_OBEC.csv`` and
    records, for every town name, the NUTS/LAU code of each district in which
    a town of that name lies.
    """
    ocols, okresy = load_ruian_csv('extra/ruian/UI_OKRES.csv')

    okres_by_id: Dict[int, List[str]] = {}
    for okres_row in okresy:
        kod = int(okres_row[ocols['KOD']])
        assert kod not in okres_by_id
        okres_by_id[kod] = okres_row

    mcols, mesta = load_ruian_csv('extra/ruian/UI_OBEC.csv')
    for mesto_row in mesta:
        jmeno = mesto_row[mcols['NAZEV']]
        oid = int(mesto_row[mcols['OKRES_KOD']])
        # Exception: Praha (district code 9999) is not part of any district
        if not (jmeno == 'Praha' and oid == 9999):
            okres = okres_by_id[oid]
            ruian_obec_to_okres_nuts[jmeno].append(okres[ocols['NUTS_LAU']])
def get_old_schools() -> DefaultDict[str, List[ProtoSchool]]:
    """Load all schools from the database, grouped by RED_IZO.

    Schools without a RED_IZO cannot be matched against the register; they
    are reported on stdout as a TODO and left out of the result.
    """
    by_red_izo: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)

    query = session.query(db.School).options(joinedload(db.School.place))
    for s in query.all():
        if s.red_izo:
            parent_town = s.place.parent_place
            proto = ProtoSchool(
                town=parent_town,
                town_id=parent_town.place_id,
                town_name=parent_town.name,
                unsure_region=False,
                short_name=s.place.name,
                official_name=s.official_name,
                red_izo=s.red_izo,
                ico=s.ico,
                address=s.address,
                is_zs=s.is_zs,
                is_ss=s.is_ss,
                school=s,
            )
            by_red_izo[proto.red_izo].append(proto)
        else:
            print(f'# TODO: Škola #{s.place_id} ({s.place.name}) nemá RED IZO')

    return by_red_izo
def simplify_name(name: str, town: str) -> str:
    """Derive a short school name from the official one.

    Common school-type phrases are replaced by their abbreviations
    (case-insensitively), a trailing "s.r.o." is removed, and the town name
    is appended after a comma.
    """
    abbreviations = (
        ('základní škola', 'ZŠ'),
        ('mateřská škola', 'MŠ'),
        ('střední škola', 'SŠ'),
        ('střední průmyslová škola', 'SPŠ'),
        ('gymnázium', 'G'),
    )
    for phrase, abbreviation in abbreviations:
        name = re.sub(phrase, abbreviation, name, flags=re.IGNORECASE)

    without_company_form = re.sub(r',?\s*s\.r\.o\.', "", name)
    return ', '.join((without_company_form, town))
def school_url(school_id: int) -> str:
    """Build the web URL of the place page for `school_id`, rooted at ``config.WEB_ROOT``."""
    url = f'{config.WEB_ROOT}org/place/{school_id}'
    return url
def create_action(ps: ProtoSchool, type: ActionType, msg: str) -> Action:
    """Print the header of a planned action and return an empty Action for it.

    The header line (``>> TYPE (#id, RED_IZO ...): msg``) is what
    read_actions() later parses.  For an existing school it is followed by
    commented hints for the human reviewer: the school URL and the log
    entries that did not come from this script.  A warning about an
    uncertain region and the place note are appended when present.

    The returned Action has the school's place_id (0 when the school is not
    in the database yet) and no values; callers fill those in.
    """
    school_id = ps.school.place_id if ps.school else 0
    print(f'>> {type.name} (#{school_id}, RED_IZO {ps.red_izo}): {msg}')

    if school_id > 0:
        print(f'\t# URL: {school_url(school_id)}')
        # Eager-load the author of each log entry via the Log.user relationship
        # (the original passed db.LogType.user, which is the enum, not the relationship).
        logs = (session.query(db.Log)
                .filter_by(type=db.LogType.place, id=school_id)
                .options(joinedload(db.Log.user))
                .order_by(db.Log.changed_at)
                .all())
        for log in logs:
            action = log.details.get('action', '?')
            reason = log.details.get('reason', '?')
            # Creation records and earlier runs of this script are not interesting
            if action != 'new' and reason != 'init-schools':
                print(f'\t# Log ({timeformat(log.changed_at)} {log.user.full_name() if log.user else "system"}): {log.details}')

    if ps.unsure_region:
        print('\t# WARNING: Obec s nejistým regionem')
    if ps.school and ps.school.place.note:
        print(f'\t# NOTE: {ps.school.place.note}')

    return Action(type=type, school_id=school_id, values={})
def add_fields(action: Action, ps: ProtoSchool) -> None:
    """Copy every attribute listed in ``fields`` from `ps` into the action and print it as a plan line."""
    values = action.values
    for field in fields:
        val = values[field] = getattr(ps, field)
        print(f'\t{field}: {val}')
def plan_single_change(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> Optional[Action]:
    """Plan the change that turns the database state `old` into the register state `new`.

    * `new` is None: the school left the register -> DISABLE, unless the
      place is already hidden.
    * `old` is None: the school is new -> ADD with a freshly derived short name.
    * both given: EDIT carrying only the fields that differ.  The short name
      is re-derived only when the official name or the town changed.  When
      nothing differs, or only ``is_zs``/``is_ss`` differ, no action is planned.

    Returns the planned Action, or None when there is nothing to do.  The
    plan text is printed as a side effect.
    """
    if new is None:
        assert old is not None
        assert old.school is not None
        if old.school.place.hidden:
            return None
        act = create_action(old, ActionType.DISABLE, 'vypadla z rejstříku')
        add_fields(act, old)
        return act

    if old is None:
        new.short_name = simplify_name(new.official_name, new.town.name)
        act = create_action(new, ActionType.ADD, 'nová')
        add_fields(act, new)
        return act

    assert old.school
    if old.official_name != new.official_name or old.town_name != new.town_name:
        new.short_name = simplify_name(new.official_name, new.town.name)
    else:
        # Keep the existing short name
        new.short_name = old.short_name

    changes: List[Tuple[str, str, str]] = []
    for field in fields:
        if getattr(old, field) != getattr(new, field):
            changes.append((field, getattr(old, field), getattr(new, field)))

    changed_fields = {c[0] for c in changes}
    # Also covers the empty set, i.e. no changes at all
    if changed_fields <= {'is_zs', 'is_ss'}:
        return None

    act = create_action(old, ActionType.EDIT, 'změny')
    for field, old_val, new_val in changes:
        print(f'\t{field}: {new_val}')
        # The old value goes on a comment line aligned under the new one
        print(f'\t#{" " * len(field)} {old_val}')
        act.values[field] = new_val
    # The original fell off the end here, so planned edits were silently lost.
    return act
def plan_actions() -> List[Action]:
    """Compare the register with the database and plan all changes.

    Loads RUIAN and the register, groups register and database schools by
    RED_IZO and pairs them:

    * RED_IZO only in the register: every school is planned as new.
    * exactly one school on each side: they are paired directly.
    * otherwise schools are paired by equal address; if any school stays
      unpaired, the whole group is printed as a TODO for manual resolution
      and the unpaired schools get no action.
    * RED_IZO only in the database: every school is planned for disabling.

    The plan is printed on stdout as a side effect.  Returns the planned actions.
    """
    load_ruian()
    process_schools()

    new_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)
    for ps in proto_schools.values():
        new_schools[ps.red_izo].append(ps)

    old_schools = get_old_schools()
    actions: List[Action] = []

    def plan(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> None:
        act = plan_single_change(old, new)
        if act is not None:
            actions.append(act)

    for red_izo in sorted(new_schools.keys()):
        new_ps = new_schools[red_izo]
        old_ps = sorted(old_schools[red_izo], key=lambda ps: ps.address)
        new_ps = sorted(new_ps, key=lambda ps: ps.address)

        if len(old_ps) == 0:
            for n in new_ps:
                plan(None, n)
        elif len(old_ps) == 1 and len(new_ps) == 1:
            plan(old_ps[0], new_ps[0])
        else:
            # Merge the two address-sorted lists, pairing equal addresses
            oi = 0
            ni = 0
            merged_list: List[Tuple[Optional[ProtoSchool], Optional[ProtoSchool]]] = []
            while oi < len(old_ps) or ni < len(new_ps):
                old: Optional[ProtoSchool] = old_ps[oi] if oi < len(old_ps) else None
                new: Optional[ProtoSchool] = new_ps[ni] if ni < len(new_ps) else None
                if old and new and old.address == new.address:
                    merged_list.append((old, new))
                    plan(old, new)
                    oi += 1
                    ni += 1
                elif old and (not new or old.address < new.address):
                    assert old.school
                    # Hidden schools are not reported as unpaired
                    if not old.school.place.hidden:
                        merged_list.append((old, None))
                    oi += 1
                else:
                    merged_list.append((None, new))
                    ni += 1

            if any(not (x[0] and x[1]) for x in merged_list):
                # There are unpaired records, so warn about them
                print(f'TODO: Školy s RED IZO {red_izo} se nepodařilo spárovat')
                for old, new in merged_list:
                    if old and new:
                        print(f' Spárovaná (#{old.school.place_id}):')
                        s = new
                    elif old:
                        print(f' Jen v OSMO (#{old.school.place_id}):')
                        s = old
                    else:
                        print(' Jen v rejstříku:')
                        s = new
                    if old:
                        print(f'\t# URL: {school_url(old.school.place_id)}')
                        if old.school.place.note:
                            print(f'\t# NOTE: {old.school.place.note}')
                    for field in fields:
                        print(f'\t{field}: {getattr(s, field)}')

    for red_izo in sorted(old_schools.keys()):
        if red_izo not in new_schools:
            for os in old_schools[red_izo]:
                plan(os, None)

    # The original never returned the list, so --run passed None to execute_actions().
    return actions
def read_actions() -> List[Action]:
    """Parse a plan from standard input back into a list of actions.

    Recognised lines:

    * ``>> TYPE (#id, RED_IZO n): ...`` starts a new action,
    * tab + ``#`` is a comment and is skipped,
    * tab + ``field: value`` adds a value to the current action (and is
      ignored when no action is open),
    * any other line closes the current action.

    Parsed headers and values are echoed on stdout.  A tab-indented line that
    cannot be parsed terminates the script through die().
    """
    header_re = re.compile(r'>> ([A-Z]+) \(#(\d+), RED_IZO (\d+)\): .*')
    value_re = re.compile(r'\t(\w+): (.*)')

    parsed: List[Action] = []
    act = None
    for raw_line in sys.stdin:
        line = raw_line.rstrip()

        match = header_re.fullmatch(line)
        if match is not None:
            print(f'## {match[1]} {match[2]} {match[3]}')
            act_type = ActionType.coerce(match[1])
            act = Action(type=act_type, school_id=int(match[2]), values={})
            parsed.append(act)
            continue

        if line.startswith('\t#'):
            continue

        if not line.startswith('\t'):
            act = None
            continue

        match = value_re.fullmatch(line)
        if match is None:
            die(f'Cannot parse line: {line}')
        if act is not None:
            assert match[1] in fields
            assert match[1] not in act.values
            act.values[match[1]] = match[2]
            print(f'#\t{match[1]}: {match[2]}')

    return parsed
def _parse_bool(val) -> bool:
    """Convert a plan value to bool, accepting real booleans and their printed form.

    Values read back from a textual plan are the strings 'True'/'False';
    plain ``bool('False')`` would be True, so strings are compared explicitly.
    """
    if isinstance(val, str):
        if val == 'True':
            return True
        if val == 'False':
            return False
        raise ValueError(f'Cannot parse boolean value: {val!r}')
    return bool(val)


def execute_action(act: Action) -> None:
    """Apply one planned action to the database and log it.

    * ADD creates a place and a school under the town given by ``town_id``.
    * DISABLE hides the school's place.
    * EDIT updates exactly the fields present in ``act.values``.

    The matching statistics counter is incremented.  Nothing is committed here.
    """
    global new_school_cnt, disabled_school_cnt, updated_school_cnt

    vals = act.values

    # school_id 0 means the school does not exist in the database yet
    if act.school_id > 0:
        school = session.query(db.School).options(joinedload(db.School.place)).get(act.school_id)
        assert school is not None
    else:
        school = None

    if 'town_id' in vals:
        town = session.query(db.Place).get(int(vals['town_id']))
        assert town is not None
    else:
        town = None

    if act.type == ActionType.ADD:
        assert school is None
        assert town is not None
        place = db.Place(
            level=4,
            parent=town.place_id,
            name=vals['short_name'],
            type=db.PlaceType.school)
        school = db.School(
            place=place,
            red_izo=vals['red_izo'],
            ico=vals['ico'],
            official_name=vals['official_name'],
            address=vals['address'],
            is_zs=_parse_bool(vals['is_zs']),
            is_ss=_parse_bool(vals['is_ss']))
        session.add(school)
        session.flush()     # the flush assigns place.place_id, which is used below
        print(f'Zakládám školu #{place.place_id}')
        mo.util.log(
            type=db.LogType.place,
            what=place.place_id,
            details={'action': 'new',
                     'reason': 'init-schools',
                     'place': db.row2dict(place),
                     'school': db.row2dict(school)}
        )
        new_school_cnt += 1

    elif act.type == ActionType.DISABLE:
        assert school is not None
        print(f'Skrývám školu #{act.school_id}')
        school.place.hidden = True
        mo.util.log(
            type=db.LogType.place,
            what=act.school_id,
            details={'action': 'disable',
                     'reason': 'init-schools',
                     'changes': db.get_object_changes(school.place)},
        )
        disabled_school_cnt += 1

    elif act.type == ActionType.EDIT:
        assert school is not None
        print(f'Upravuji školu #{act.school_id}')
        if town is not None:
            school.place.parent = town.place_id
        if 'short_name' in vals:
            school.place.name = vals['short_name']
        if 'ico' in vals:
            school.ico = vals['ico']
        if 'official_name' in vals:
            school.official_name = vals['official_name']
        if 'address' in vals:
            school.address = vals['address']
        if 'is_zs' in vals:
            school.is_zs = _parse_bool(vals['is_zs'])
        if 'is_ss' in vals:
            school.is_ss = _parse_bool(vals['is_ss'])
        mo.util.log(
            type=db.LogType.place,
            what=act.school_id,
            details={'action': 'edit',
                     'reason': 'init-schools',
                     'changes': {**db.get_object_changes(school), **db.get_object_changes(school.place)}},
        )
        updated_school_cnt += 1

    else:
        assert False
def execute_actions(actions: List[Action]) -> None:
    """Apply the given actions one by one, in list order."""
    for planned in actions:
        execute_action(planned)
# Command line: exactly one of --plan / --execute / --run must be given,
# optionally combined with -n/--dry-run.
parser = argparse.ArgumentParser(description='Importuje školy z naparsovaného Rejstříku škol')
parser.add_argument('-n', '--dry-run', default=False, action='store_true', help='pouze ukáže, co by bylo provedeno')
mode = parser.add_argument_group('operace').add_mutually_exclusive_group(required=True)
mode.add_argument('--plan', default=False, action='store_true', help='pouze naplánuje změny')
mode.add_argument('--execute', default=False, action='store_true', help='načte plán ze stdinu a provede změny')
mode.add_argument('--run', default=False, action='store_true', help='změny rovnou provádí')
args = parser.parse_args()

if args.plan:
    # Only print the plan on stdout; the returned actions are not used
    plan_actions()
elif args.execute:
    # Apply a plan read from stdin
    actions = read_actions()
    execute_actions(actions)
elif args.run:
    # Plan and apply in one step
    actions = plan_actions()
    execute_actions(actions)

# The commit happens in every mode unless --dry-run was given
if not args.dry_run:
    session.commit()
else:
    print('*** Změny neprovedeny***')

# Summary of the run
print(f"Zpracováno {processed_school_cnt} škol z rejstříku.")
print(f"Založeno {new_school_cnt} nových škol a {new_town_cnt} nových obcí.")
print(f"Aktualizováno {updated_school_cnt} škol.")
print(f"Skryto {disabled_school_cnt} škol.")