diff --git a/bin/init-schools b/bin/init-schools
index 1ead92bda94b045a31723ab0cf2cc803c43c053b..80d7d7340410797a8fef40ccf471e530efb2d071 100755
--- a/bin/init-schools
+++ b/bin/init-schools
@@ -11,13 +11,16 @@ import argparse
from collections import defaultdict
import csv
from dataclasses import dataclass
+from enum import auto
import re
from sqlalchemy.orm import joinedload
import sys
from typing import List, Dict, DefaultDict, Tuple, Optional
+import mo.config as config
import mo.db as db
import mo.util
+from mo.util import die
mo.util.init_standalone()
session = db.get_session()
@@ -26,12 +29,16 @@ new_town_cnt = 0
processed_school_cnt = 0
new_school_cnt = 0
updated_school_cnt = 0
+disabled_school_cnt = 0
@dataclass
class ProtoSchool:
town: db.Place
+ town_id: int
town_name: str
+ unsure_region: bool
+ short_name: str
official_name: str
red_izo: str
ico: str
@@ -43,7 +50,20 @@ class ProtoSchool:
# (red_izo, address) -> ProtoSchool
proto_schools: Dict[Tuple[str, str], ProtoSchool] = {}
-fields = ('town_name', 'official_name', 'red_izo', 'ico', 'address', 'is_zs', 'is_ss')
+fields = ('town_id', 'town_name', 'short_name', 'official_name', 'red_izo', 'ico', 'address', 'is_zs', 'is_ss')
+
+
+class ActionType(db.MOEnum):
+ ADD = auto()
+ DISABLE = auto()
+ EDIT = ()
+
+
+@dataclass
+class Action:
+ type: ActionType
+ school_id: Optional[int]
+ values: Dict[str, str]
def process_schools() -> None:
@@ -78,13 +98,17 @@ def process_schools() -> None:
ulice2 = f[columns['Škola_ulice']]
cp2 = f[columns['Škola_č.p.']]
co2 = f[columns['Škola_č.or.']]
+ if misto2 == 'Praha':
+ print(f"WARNING: Škola s RED_IZO má jako obec uvedenu Prahu => volím náhradu {misto}", file=sys.stderr)
+ assert misto != 'Praha', f'Škola s RED_IZO {red_izo} má jako obec uvedenu Prahu dvakrát'
+ misto2 = misto
addr = make_address(misto, ulice, cp, co)
addr2 = make_address(misto2, ulice2, cp2, co2)
# if addr != addr2:
# print(f"WARNING: Škola má dvě různé adresy: <{addr}> != <{addr2}>", file=sys.stderr)
- town = lookup_town(misto2, nuts)
+ town, unsure_region = lookup_town(misto2, nuts)
if druh == 'B00':
is_zs = True
@@ -112,7 +136,10 @@ def process_schools() -> None:
else:
ps = ProtoSchool(
town=town,
+ town_id=town.place_id,
town_name=f'{town.name} ({town.get_code()})',
+ unsure_region=unsure_region,
+ short_name=nazev,
official_name=nazev,
red_izo=red_izo,
ico=ico,
@@ -155,9 +182,10 @@ def make_address(misto: str, ulice: str, cp: str, co: str) -> str:
return misto
-def lookup_town(misto: str, region_nuts: str) -> db.Place:
+def lookup_town(misto: str, region_nuts: str) -> Tuple[db.Place, bool]:
ruian_nuts = ruian_obec_to_okres_nuts[misto]
region = None
+ unsure_region = False
if region_nuts in ruian_nuts:
nuts = region_nuts
@@ -172,9 +200,11 @@ def lookup_town(misto: str, region_nuts: str) -> db.Place:
elif len(ruian_nuts) == 1:
nuts = ruian_nuts[0]
print(f"WARNING: Obec {misto} je podle rejstříku v okrese {region_nuts}, ale pod RUIAN v {nuts} => preferuji RUIAN", file=sys.stderr)
+ unsure_region = True
else:
nuts = region_nuts
print(f"WARNING: Obec {misto} je podle rejstříku v okrese {region_nuts}, podle RUIAN je na výběr {ruian_nuts} => dořešit ručně!", file=sys.stderr)
+ unsure_region = True
if not region:
region = session.query(db.Place).filter_by(level=2, nuts=nuts).first()
@@ -185,10 +215,16 @@ def lookup_town(misto: str, region_nuts: str) -> db.Place:
town = db.Place(level=3, parent=region.place_id, name=misto, type=db.PlaceType.region)
session.add(town)
session.flush()
+ mo.util.log(
+ type=db.LogType.place,
+ what=town.place_id,
+ details={'action': 'new', 'reason': 'init-schools', 'place': db.row2dict(town)},
+ )
+ session.flush()
global new_town_cnt
new_town_cnt += 1
- print(f'Založena obec: {misto} (#{town.place_id})')
- return town
+ print(f'Založena obec: {misto} (#{town.place_id}) v okrese {region.name}')
+ return town, unsure_region
def load_ruian_csv(name):
@@ -232,12 +268,15 @@ def get_old_schools() -> DefaultDict[str, List[ProtoSchool]]:
old_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)
for s in schools:
if not s.red_izo:
- print(f'TODO: Škola #{s.place_id} ({s.place.name}) nemá RED IZO')
+ print(f'# TODO: Škola #{s.place_id} ({s.place.name}) nemá RED IZO')
continue
town = s.place.parent_place
ps = ProtoSchool(
town=town,
+ town_id=town.place_id,
town_name=f'{town.name} ({town.get_code()})',
+ unsure_region=False,
+ short_name=s.place.name,
official_name=s.official_name,
red_izo=s.red_izo,
ico=s.ico,
@@ -260,96 +299,96 @@ def simplify_name(name: str, town: str) -> str:
return name + ', ' + town
-def apply_single_change(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> None:
+def school_url(school_id: int) -> str:
+ return f'{config.WEB_ROOT}org/place/{school_id}'
+
+
+def create_action(ps: ProtoSchool, type: ActionType, msg: str) -> Action:
+ school_id = ps.school.place_id if ps.school else 0
+ print(f'>> {type.name} (#{school_id}, RED_IZO {ps.red_izo}): {msg}')
+ if school_id > 0:
+ print(f'\t# URL: {school_url(school_id)}')
+ logs = session.query(db.Log).filter_by(type=db.LogType.place, id=school_id).all()
+ for log in logs:
+ action = log.details.get('action', '?')
+ reason = log.details.get('reason', '?')
+ if action == 'new' or reason == 'init-schools':
+ pass
+ else:
+ print(f'\t# Log: {log.details}')
+ if ps.unsure_region:
+ print('\t# WARNING: Obec s nejistým regionem')
+ return Action(type=type, school_id=school_id, values={})
+
+
+def add_fields(action: Action, ps: ProtoSchool) -> None:
+ for field in fields:
+ val = getattr(ps, field)
+ action.values[field] = val
+ print(f'\t{field}: {val}')
+
+
+def plan_single_change(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> Optional[Action]:
if new is None:
assert old is not None
assert old.school is not None
- print(f'TODO: Škola #{old.school.place_id} (RED IZO {old.red_izo}) vypadla z rejstříku')
- for field in fields:
- print(f'\t{field}: {getattr(old, field)}')
+ if old.school.place.hidden:
+ return None
+ act = create_action(old, ActionType.DISABLE, 'vypadla z rejstříku')
+ add_fields(act, old)
+ return act
elif old is None:
assert new is not None
- simple_name = simplify_name(new.official_name, new.town.name)
- place = db.Place(
- level=4,
- parent=new.town.place_id,
- name=simple_name,
- type=db.PlaceType.school)
- school = db.School(
- place=place,
- red_izo=new.red_izo,
- ico=new.ico,
- official_name=new.official_name,
- address=new.address,
- is_zs=new.is_zs,
- is_ss=new.is_ss)
- session.add(school)
- session.flush()
-
- if args.update:
- print(f'Škola #{school.place_id} (RED IZO {new.red_izo}): NOVÁ')
- print(f'\tname: {simple_name}')
- for field in fields:
- print(f'\t{field}: {getattr(new, field)}')
-
- mo.util.log(
- type=db.LogType.place,
- what=school.place_id,
- details={'action': 'import-school', 'reason': 'script', 'school': db.row2dict(school)}
- )
-
- global new_school_cnt
- new_school_cnt += 1
+ new.short_name = simplify_name(new.official_name, new.town.name)
+ act = create_action(new, ActionType.ADD, 'nová')
+ add_fields(act, new)
+ return act
else:
assert old.school
- school = old.school
+ if old.official_name != new.official_name or old.town_name != new.town_name:
+ new.short_name = simplify_name(new.official_name, new.town.name)
+ else:
+ new.short_name = old.short_name
changes: List[Tuple[str, str, str]] = []
for field in fields:
if getattr(old, field) != getattr(new, field):
changes.append((field, getattr(old, field), getattr(new, field)))
- school.place.parent = new.town.place_id
- school.ico = new.ico
- school.official_name = new.official_name
- school.address = new.address
- school.is_zs = new.is_zs
- school.is_ss = new.is_ss
-
if changes:
- print(f'Škola #{school.place_id} (RED IZO {new.red_izo}): ZMĚNY')
+ act = create_action(old, ActionType.EDIT, 'změny')
for field, old_val, new_val in changes:
- print(f'\t{field}: {old_val} -> {new_val}')
- assert args.update, "Změny provádíme pouze s přepínačem --update"
-
- mo.util.log(
- type=db.LogType.place,
- what=school.place_id,
- details={'action': 'import-school',
- 'reason': 'script',
- 'changes': {**db.get_object_changes(school), **db.get_object_changes(school.place)}},
- )
+ print(f'\t{field}: {new_val}')
+ print(f'\t#{" " * len(field)} {old_val}')
+ act.values[field] = new_val
- global updated_school_cnt
- updated_school_cnt += 1
+def plan_actions() -> List[Action]:
+ load_ruian()
+ process_schools()
-def apply_changes() -> None:
new_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)
for ps in proto_schools.values():
new_schools[ps.red_izo].append(ps)
old_schools = get_old_schools()
+ actions = []
+
+ def plan(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> None:
+ act = plan_single_change(old, new)
+ if act is not None:
+ actions.append(act)
+
for red_izo in sorted(new_schools.keys()):
new_ps = new_schools[red_izo]
old_ps = sorted(old_schools[red_izo], key=lambda ps: ps.address)
new_ps = sorted(new_ps, key=lambda ps: ps.address)
if len(old_ps) == 0:
for n in new_ps:
- apply_single_change(None, n)
+ plan(None, n)
elif len(old_ps) == 1 and len(new_ps) == 1:
- apply_single_change(old_ps[0], new_ps[0])
+ plan(old_ps[0], new_ps[0])
else:
oi = 0
ni = 0
@@ -357,7 +396,7 @@ def apply_changes() -> None:
old: Optional[ProtoSchool] = old_ps[oi] if oi < len(old_ps) else None
new: Optional[ProtoSchool] = new_ps[ni] if ni < len(new_ps) else None
if old and new and old.address == new.address:
- apply_single_change(old, new)
+ plan(old, new)
oi += 1
ni += 1
elif old and (not new or old.address < new.address):
@@ -372,29 +411,158 @@ def apply_changes() -> None:
print(f'\t{field}: {getattr(new, field)}')
ni += 1
- if args.stop_after is not None and (new_school_cnt + updated_school_cnt) >= args.stop_after:
- return
-
for red_izo in sorted(old_schools.keys()):
if red_izo not in new_schools:
for os in old_schools[red_izo]:
- apply_single_change(os, None)
+ plan(os, None)
+
+
+def read_actions() -> List[Action]:
+ actions = []
+ act = None
+
+ for line in sys.stdin:
+ line = line.rstrip()
+ if (match := re.fullmatch(r'>> ([A-Z]+) \(#(\d+), RED_IZO (\d+)\): .*', line)) is not None:
+ print(f'## {match[1]} {match[2]} {match[3]}')
+ act_type = ActionType.coerce(match[1])
+ act = Action(type=act_type, school_id=int(match[2]), values={})
+ actions.append(act)
+ elif line.startswith('\t#'):
+ pass
+ elif line.startswith('\t'):
+ if (match := re.fullmatch(r'\t(\w+): (.*)', line)) is None:
+ die(f'Cannot parse line: {line}')
+ if act is not None:
+ assert match[1] in fields
+ assert match[1] not in act.values
+ act.values[match[1]] = match[2]
+ print(f'#\t{match[1]}: {match[2]}')
+ else:
+ act = None
+
+ return actions
+
+
+def execute_action(act: Action) -> None:
+ vals = act.values
+
+ if act.school_id > 0:
+ school = session.query(db.School).options(joinedload(db.School.place)).get(act.school_id)
+ assert school is not None
+ else:
+ school = None
+
+ if 'town_id' in vals:
+ town = session.query(db.Place).get(int(vals['town_id']))
+ assert town is not None
+ else:
+ town = None
+
+ if act.type == ActionType.ADD:
+ assert school is None
+ assert town is not None
+ place = db.Place(
+ level=4,
+ parent=town.place_id,
+ name=vals['short_name'],
+ type=db.PlaceType.school)
+ school = db.School(
+ place=place,
+ red_izo=vals['red_izo'],
+ ico=vals['ico'],
+ official_name=vals['official_name'],
+ address=vals['address'],
+ is_zs=bool(vals['is_zs']),
+ is_ss=bool(vals['is_ss']))
+ session.add(school)
+ session.flush()
+
+ print(f'Zakládám školu #{place.place_id}')
+ mo.util.log(
+ type=db.LogType.place,
+ what=place.place_id,
+ details={'action': 'new',
+ 'reason': 'init-schools',
+ 'place': db.row2dict(place),
+ 'school': db.row2dict(school)}
+ )
+
+ global new_school_cnt
+ new_school_cnt += 1
+ elif act.type == ActionType.DISABLE:
+ assert school is not None
+ print(f'Skrývám školu #{act.school_id}')
+ school.place.hidden = True
+ mo.util.log(
+ type=db.LogType.place,
+ what=act.school_id,
+ details={'action': 'disable',
+ 'reason': 'init-schools',
+ 'changes': db.get_object_changes(school.place)},
+ )
+ global disabled_school_cnt
+ disabled_school_cnt += 1
+ elif act.type == ActionType.EDIT:
+ assert school is not None
+ print(f'Upravuji školu #{act.school_id}')
+
+ if town is not None:
+ school.place.parent = town.place_id
+ if 'short_name' in vals:
+ school.place.name = vals['short_name']
+ if 'ico' in vals:
+ school.ico = vals['ico']
+ if 'official_name' in vals:
+ school.official_name = vals['official_name']
+ if 'address' in vals:
+ school.address = vals['address']
+ if 'is_zs' in vals:
+ school.is_zs = bool(vals['is_zs'])
+ if 'is_ss' in vals:
+ school.is_ss = bool(vals['is_ss'])
+
+ mo.util.log(
+ type=db.LogType.place,
+ what=act.school_id,
+ details={'action': 'edit',
+ 'reason': 'init-schools',
+ 'changes': {**db.get_object_changes(school), **db.get_object_changes(school.place)}},
+ )
+
+ global updated_school_cnt
+ updated_school_cnt += 1
+ else:
+ assert False
+
+
+def execute_actions(actions: List[Action]) -> None:
+ for act in actions:
+ execute_action(act)
parser = argparse.ArgumentParser(description='Importuje školy z naparsovaného Rejstříku škol')
parser.add_argument('-n', '--dry-run', default=False, action='store_true', help='pouze ukáže, co by bylo provedeno')
-parser.add_argument('-u', '--update', default=False, action='store_true', help='aktualizuje školy v DB')
-parser.add_argument('--stop-after', type=int, help='zastaví se po daném počtu změn')
+mode = parser.add_argument_group('operace').add_mutually_exclusive_group(required=True)
+mode.add_argument('--plan', default=False, action='store_true', help='pouze naplánuje změny')
+mode.add_argument('--execute', default=False, action='store_true', help='načte plán ze stdinu a provede změny')
+mode.add_argument('--run', default=False, action='store_true', help='změny rovnou provádí')
args = parser.parse_args()
-load_ruian()
-process_schools()
-apply_changes()
+if args.plan:
+ plan_actions()
+elif args.execute:
+ actions = read_actions()
+ execute_actions(actions)
+elif args.run:
+ actions = plan_actions()
+ execute_actions(actions)
if not args.dry_run:
session.commit()
-print(f"Importováno {processed_school_cnt} škol.")
+print(f"Zpracováno {processed_school_cnt} škol z rejstříku.")
print(f"Založeno {new_school_cnt} nových škol a {new_town_cnt} nových obcí.")
print(f"Aktualizováno {updated_school_cnt} škol.")
+print(f"Skryto {disabled_school_cnt} škol.")