Skip to content
Snippets Groups Projects
Commit 6219ea7b authored by Martin Mareš's avatar Martin Mareš
Browse files

init-schools: Předěláno na dvoufázovou aktualizaci

Změny škol je nyní možné připravit, pak ručně zkontrolovat a provést.
Změny obcí stále probíhají automaticky.

Další úpravy:

  •  Všechny změny v databázi logujeme do DB logu.
  •  U každé školy ukazujeme URL s odkazem na editaci.
  •  Také ukazujeme záznamy v DB logu kromě založení a změn z importu.
parent 0730aa35
Branches
No related tags found
No related merge requests found
...@@ -11,13 +11,16 @@ import argparse ...@@ -11,13 +11,16 @@ import argparse
from collections import defaultdict from collections import defaultdict
import csv import csv
from dataclasses import dataclass from dataclasses import dataclass
from enum import auto
import re import re
from sqlalchemy.orm import joinedload from sqlalchemy.orm import joinedload
import sys import sys
from typing import List, Dict, DefaultDict, Tuple, Optional from typing import List, Dict, DefaultDict, Tuple, Optional
import mo.config as config
import mo.db as db import mo.db as db
import mo.util import mo.util
from mo.util import die
mo.util.init_standalone() mo.util.init_standalone()
session = db.get_session() session = db.get_session()
...@@ -26,12 +29,16 @@ new_town_cnt = 0 ...@@ -26,12 +29,16 @@ new_town_cnt = 0
processed_school_cnt = 0 processed_school_cnt = 0
new_school_cnt = 0 new_school_cnt = 0
updated_school_cnt = 0 updated_school_cnt = 0
disabled_school_cnt = 0
@dataclass @dataclass
class ProtoSchool: class ProtoSchool:
town: db.Place town: db.Place
town_id: int
town_name: str town_name: str
unsure_region: bool
short_name: str
official_name: str official_name: str
red_izo: str red_izo: str
ico: str ico: str
...@@ -43,7 +50,20 @@ class ProtoSchool: ...@@ -43,7 +50,20 @@ class ProtoSchool:
# (red_izo, address) -> ProtoSchool # (red_izo, address) -> ProtoSchool
proto_schools: Dict[Tuple[str, str], ProtoSchool] = {} proto_schools: Dict[Tuple[str, str], ProtoSchool] = {}
fields = ('town_name', 'official_name', 'red_izo', 'ico', 'address', 'is_zs', 'is_ss') fields = ('town_id', 'town_name', 'short_name', 'official_name', 'red_izo', 'ico', 'address', 'is_zs', 'is_ss')
class ActionType(db.MOEnum):
    """Kind of change planned for a single school.

    The member *name* is what gets printed in the ``>> NAME (...)`` header of
    a plan (see create_action) and what read_actions() passes to ``coerce()``
    when parsing the plan back, so the names must stay stable.
    """
    ADD = auto()        # school present in the register, but not in the DB
    DISABLE = auto()    # school in the DB which disappeared from the register
    EDIT = auto()       # school present in both, with differing fields
@dataclass
class Action:
    """One planned change of a school: what to do, to which school, with which values."""
    # Kind of the change (ADD / DISABLE / EDIT).
    type: ActionType
    # place_id of the affected school. create_action() stores 0 here for
    # schools which do not exist in the DB yet; execute_action() treats only
    # values > 0 as an existing school.
    school_id: Optional[int]
    # Field name -> value. NOTE(review): values parsed by read_actions() are
    # strings, but add_fields() / plan_single_change() store the ProtoSchool
    # attribute values as they are (e.g. the int town_id), so the annotation
    # only holds for plans read from stdin — confirm.
    values: Dict[str, str]
def process_schools() -> None: def process_schools() -> None:
...@@ -78,13 +98,17 @@ def process_schools() -> None: ...@@ -78,13 +98,17 @@ def process_schools() -> None:
ulice2 = f[columns['Škola_ulice']] ulice2 = f[columns['Škola_ulice']]
cp2 = f[columns['Škola_č.p.']] cp2 = f[columns['Škola_č.p.']]
co2 = f[columns['Škola_č.or.']] co2 = f[columns['Škola_č.or.']]
if misto2 == 'Praha':
print(f"WARNING: Škola s RED_IZO má jako obec uvedenu Prahu => volím náhradu {misto}", file=sys.stderr)
assert misto != 'Praha', f'Škola s RED_IZO {red_izo} má jako obec uvedenu Prahu dvakrát'
misto2 = misto
addr = make_address(misto, ulice, cp, co) addr = make_address(misto, ulice, cp, co)
addr2 = make_address(misto2, ulice2, cp2, co2) addr2 = make_address(misto2, ulice2, cp2, co2)
# if addr != addr2: # if addr != addr2:
# print(f"WARNING: Škola má dvě různé adresy: <{addr}> != <{addr2}>", file=sys.stderr) # print(f"WARNING: Škola má dvě různé adresy: <{addr}> != <{addr2}>", file=sys.stderr)
town = lookup_town(misto2, nuts) town, unsure_region = lookup_town(misto2, nuts)
if druh == 'B00': if druh == 'B00':
is_zs = True is_zs = True
...@@ -112,7 +136,10 @@ def process_schools() -> None: ...@@ -112,7 +136,10 @@ def process_schools() -> None:
else: else:
ps = ProtoSchool( ps = ProtoSchool(
town=town, town=town,
town_id=town.place_id,
town_name=f'{town.name} ({town.get_code()})', town_name=f'{town.name} ({town.get_code()})',
unsure_region=unsure_region,
short_name=nazev,
official_name=nazev, official_name=nazev,
red_izo=red_izo, red_izo=red_izo,
ico=ico, ico=ico,
...@@ -155,9 +182,10 @@ def make_address(misto: str, ulice: str, cp: str, co: str) -> str: ...@@ -155,9 +182,10 @@ def make_address(misto: str, ulice: str, cp: str, co: str) -> str:
return misto return misto
def lookup_town(misto: str, region_nuts: str) -> db.Place: def lookup_town(misto: str, region_nuts: str) -> Tuple[db.Place, bool]:
ruian_nuts = ruian_obec_to_okres_nuts[misto] ruian_nuts = ruian_obec_to_okres_nuts[misto]
region = None region = None
unsure_region = False
if region_nuts in ruian_nuts: if region_nuts in ruian_nuts:
nuts = region_nuts nuts = region_nuts
...@@ -172,9 +200,11 @@ def lookup_town(misto: str, region_nuts: str) -> db.Place: ...@@ -172,9 +200,11 @@ def lookup_town(misto: str, region_nuts: str) -> db.Place:
elif len(ruian_nuts) == 1: elif len(ruian_nuts) == 1:
nuts = ruian_nuts[0] nuts = ruian_nuts[0]
print(f"WARNING: Obec {misto} je podle rejstříku v okrese {region_nuts}, ale pod RUIAN v {nuts} => preferuji RUIAN", file=sys.stderr) print(f"WARNING: Obec {misto} je podle rejstříku v okrese {region_nuts}, ale pod RUIAN v {nuts} => preferuji RUIAN", file=sys.stderr)
unsure_region = True
else: else:
nuts = region_nuts nuts = region_nuts
print(f"WARNING: Obec {misto} je podle rejstříku v okrese {region_nuts}, podle RUIAN je na výběr {ruian_nuts} => dořešit ručně!", file=sys.stderr) print(f"WARNING: Obec {misto} je podle rejstříku v okrese {region_nuts}, podle RUIAN je na výběr {ruian_nuts} => dořešit ručně!", file=sys.stderr)
unsure_region = True
if not region: if not region:
region = session.query(db.Place).filter_by(level=2, nuts=nuts).first() region = session.query(db.Place).filter_by(level=2, nuts=nuts).first()
...@@ -185,10 +215,16 @@ def lookup_town(misto: str, region_nuts: str) -> db.Place: ...@@ -185,10 +215,16 @@ def lookup_town(misto: str, region_nuts: str) -> db.Place:
town = db.Place(level=3, parent=region.place_id, name=misto, type=db.PlaceType.region) town = db.Place(level=3, parent=region.place_id, name=misto, type=db.PlaceType.region)
session.add(town) session.add(town)
session.flush() session.flush()
mo.util.log(
type=db.LogType.place,
what=town.place_id,
details={'action': 'new', 'reason': 'init-schools', 'place': db.row2dict(town)},
)
session.flush()
global new_town_cnt global new_town_cnt
new_town_cnt += 1 new_town_cnt += 1
print(f'Založena obec: {misto} (#{town.place_id})') print(f'Založena obec: {misto} (#{town.place_id}) v okrese {region.name}')
return town return town, unsure_region
def load_ruian_csv(name): def load_ruian_csv(name):
...@@ -232,12 +268,15 @@ def get_old_schools() -> DefaultDict[str, List[ProtoSchool]]: ...@@ -232,12 +268,15 @@ def get_old_schools() -> DefaultDict[str, List[ProtoSchool]]:
old_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list) old_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)
for s in schools: for s in schools:
if not s.red_izo: if not s.red_izo:
print(f'TODO: Škola #{s.place_id} ({s.place.name}) nemá RED IZO') print(f'# TODO: Škola #{s.place_id} ({s.place.name}) nemá RED IZO')
continue continue
town = s.place.parent_place town = s.place.parent_place
ps = ProtoSchool( ps = ProtoSchool(
town=town, town=town,
town_id=town.place_id,
town_name=f'{town.name} ({town.get_code()})', town_name=f'{town.name} ({town.get_code()})',
unsure_region=False,
short_name=s.place.name,
official_name=s.official_name, official_name=s.official_name,
red_izo=s.red_izo, red_izo=s.red_izo,
ico=s.ico, ico=s.ico,
...@@ -260,96 +299,96 @@ def simplify_name(name: str, town: str) -> str: ...@@ -260,96 +299,96 @@ def simplify_name(name: str, town: str) -> str:
return name + ', ' + town return name + ', ' + town
def school_url(school_id: int) -> str:
    """Return the URL of the web page of the school (place) with the given ID."""
    web_root = config.WEB_ROOT
    return f'{web_root}org/place/{school_id}'
def create_action(ps: ProtoSchool, type: ActionType, msg: str) -> Action:
    """Print the header of a planned action and return an Action with no values yet.

    The header line has the form ``>> TYPE (#id, RED_IZO ...): msg``; for
    schools which are not in the DB yet, the ID is 0. The header is followed
    by tab-indented ``#`` lines meant for the human reviewing the plan: the
    URL of the school and its DB log records, except for records of creation
    and of changes made by this script.
    """
    existing = ps.school
    school_id = existing.place_id if existing else 0
    print(f'>> {type.name} (#{school_id}, RED_IZO {ps.red_izo}): {msg}')

    if school_id > 0:
        print(f'\t# URL: {school_url(school_id)}')
        log_query = session.query(db.Log).filter_by(type=db.LogType.place, id=school_id)
        for entry in log_query.all():
            details = entry.details
            made_by_import = details.get('reason', '?') == 'init-schools'
            is_creation = details.get('action', '?') == 'new'
            # Only changes done by somebody else are interesting for the review
            if not (is_creation or made_by_import):
                print(f'\t# Log: {details}')

    if ps.unsure_region:
        print('\t# WARNING: Obec s nejistým regionem')

    return Action(type=type, school_id=school_id, values={})
def add_fields(action: Action, ps: ProtoSchool) -> None:
    """Copy every attribute listed in `fields` from `ps` to the action and print it.

    Each attribute is printed as a tab-indented ``name: value`` line.
    """
    for name in fields:
        value = getattr(ps, name)
        print(f'\t{name}: {value}')
        action.values[name] = value
def plan_single_change(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> Optional[Action]:
    """Plan the change which turns the school `old` (from the DB) into `new` (from the register).

    At least one of the arguments must be given:

      - only `old`: the school left the register, plan DISABLE
        (unless its place is already hidden),
      - only `new`: the school is not in the DB yet, plan ADD,
      - both: compare all `fields` and plan EDIT with the differing ones.

    The plan is printed to stdout as a side effect. Returns the planned
    Action or None if there is nothing to do.
    """
    if new is None:
        assert old is not None
        assert old.school is not None
        if old.school.place.hidden:
            # Already hidden, nothing more to disable
            return None
        act = create_action(old, ActionType.DISABLE, 'vypadla z rejstříku')
        add_fields(act, old)
        return act
    elif old is None:
        new.short_name = simplify_name(new.official_name, new.town.name)
        act = create_action(new, ActionType.ADD, 'nová')
        add_fields(act, new)
        return act
    else:
        assert old.school

        # The short name is derived from the official name and the town, so it
        # is regenerated only when one of them changes. Otherwise we keep the
        # current one, which is then not reported as a change.
        if old.official_name != new.official_name or old.town_name != new.town_name:
            new.short_name = simplify_name(new.official_name, new.town.name)
        else:
            new.short_name = old.short_name

        changes: List[Tuple[str, str, str]] = [
            (field, getattr(old, field), getattr(new, field))
            for field in fields
            if getattr(old, field) != getattr(new, field)
        ]
        if not changes:
            return None

        act = create_action(old, ActionType.EDIT, 'změny')
        for field, old_val, new_val in changes:
            print(f'\t{field}: {new_val}')
            # The old value goes to a comment line, aligned under the new one
            print(f'\t#{" " * len(field)} {old_val}')
            act.values[field] = new_val
        # Without this return, the planned edit would be silently dropped
        return act
def plan_actions() -> List[Action]:
load_ruian()
process_schools()
def apply_changes() -> None:
new_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list) new_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)
for ps in proto_schools.values(): for ps in proto_schools.values():
new_schools[ps.red_izo].append(ps) new_schools[ps.red_izo].append(ps)
old_schools = get_old_schools() old_schools = get_old_schools()
actions = []
def plan(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> None:
act = plan_single_change(old, new)
if act is not None:
actions.append(act)
for red_izo in sorted(new_schools.keys()): for red_izo in sorted(new_schools.keys()):
new_ps = new_schools[red_izo] new_ps = new_schools[red_izo]
old_ps = sorted(old_schools[red_izo], key=lambda ps: ps.address) old_ps = sorted(old_schools[red_izo], key=lambda ps: ps.address)
new_ps = sorted(new_ps, key=lambda ps: ps.address) new_ps = sorted(new_ps, key=lambda ps: ps.address)
if len(old_ps) == 0: if len(old_ps) == 0:
for n in new_ps: for n in new_ps:
apply_single_change(None, n) plan(None, n)
elif len(old_ps) == 1 and len(new_ps) == 1: elif len(old_ps) == 1 and len(new_ps) == 1:
apply_single_change(old_ps[0], new_ps[0]) plan(old_ps[0], new_ps[0])
else: else:
oi = 0 oi = 0
ni = 0 ni = 0
...@@ -357,7 +396,7 @@ def apply_changes() -> None: ...@@ -357,7 +396,7 @@ def apply_changes() -> None:
old: Optional[ProtoSchool] = old_ps[oi] if oi < len(old_ps) else None old: Optional[ProtoSchool] = old_ps[oi] if oi < len(old_ps) else None
new: Optional[ProtoSchool] = new_ps[ni] if ni < len(new_ps) else None new: Optional[ProtoSchool] = new_ps[ni] if ni < len(new_ps) else None
if old and new and old.address == new.address: if old and new and old.address == new.address:
apply_single_change(old, new) plan(old, new)
oi += 1 oi += 1
ni += 1 ni += 1
elif old and (not new or old.address < new.address): elif old and (not new or old.address < new.address):
...@@ -372,29 +411,158 @@ def apply_changes() -> None: ...@@ -372,29 +411,158 @@ def apply_changes() -> None:
print(f'\t{field}: {getattr(new, field)}') print(f'\t{field}: {getattr(new, field)}')
ni += 1 ni += 1
if args.stop_after is not None and (new_school_cnt + updated_school_cnt) >= args.stop_after:
return
for red_izo in sorted(old_schools.keys()): for red_izo in sorted(old_schools.keys()):
if red_izo not in new_schools: if red_izo not in new_schools:
for os in old_schools[red_izo]: for os in old_schools[red_izo]:
apply_single_change(os, None) plan(os, None)
def read_actions() -> List[Action]:
    """Read a plan in the format printed by the planning phase from stdin.

    Recognized lines:

      - ``>> TYPE (#id, RED_IZO n): message`` starts a new action,
      - tab + ``#`` is a comment, which is skipped,
      - tab + ``field: value`` adds a value to the current action,
      - any other line ends the current action; field lines which follow
        before the next header are parsed, but not stored.

    What was understood is echoed to stdout on lines starting with ``#``.
    Malformed field lines, unknown fields and repeated fields are reported
    using die().
    """
    header_re = re.compile(r'>> ([A-Z]+) \(#(\d+), RED_IZO (\d+)\): .*')
    # The space after the colon is optional: when the value is empty,
    # rstrip() below removes it together with the newline.
    field_re = re.compile(r'\t(\w+): ?(.*)')

    actions: List[Action] = []
    act: Optional[Action] = None

    for line in sys.stdin:
        line = line.rstrip()
        if (match := header_re.fullmatch(line)) is not None:
            print(f'## {match[1]} {match[2]} {match[3]}')
            act_type = ActionType.coerce(match[1])
            act = Action(type=act_type, school_id=int(match[2]), values={})
            actions.append(act)
        elif line.startswith('\t#'):
            pass
        elif line.startswith('\t'):
            if (match := field_re.fullmatch(line)) is None:
                die(f'Cannot parse line: {line}')
            if act is not None:
                key, value = match[1], match[2]
                # The plan can be edited by hand, so validate it explicitly
                # (asserts would disappear when running with -O).
                if key not in fields:
                    die(f'Unknown field {key} in line: {line}')
                if key in act.values:
                    die(f'Repeated field {key} in line: {line}')
                act.values[key] = value
                print(f'#\t{key}: {value}')
        else:
            act = None

    return actions
def _parse_bool(val: object) -> bool:
    """Convert a planned boolean value to bool.

    Plans read from stdin carry the strings 'True' / 'False', for which a
    plain bool() would always give True. Values of other types are converted
    by bool().

    Raises ValueError for any other string.
    """
    if isinstance(val, str):
        if val == 'True':
            return True
        if val == 'False':
            return False
        raise ValueError(f'Invalid boolean value: {val!r}')
    return bool(val)


def execute_action(act: Action) -> None:
    """Carry out one planned action in the DB session and log it to the DB log.

    ADD creates a new place with a school, DISABLE hides the place of the
    school, EDIT updates those attributes which are present in `act.values`.
    The matching global counter is incremented. Nothing is committed here.

    Raises AssertionError if the action is inconsistent with the DB (missing
    school or town, school ID given for ADD) or if its type is unknown.
    """
    global new_school_cnt, disabled_school_cnt, updated_school_cnt

    vals = act.values

    # ID 0 (or a missing ID) stands for a school which is not in the DB yet
    if act.school_id is not None and act.school_id > 0:
        school = session.query(db.School).options(joinedload(db.School.place)).get(act.school_id)
        assert school is not None
    else:
        school = None

    if 'town_id' in vals:
        town = session.query(db.Place).get(int(vals['town_id']))
        assert town is not None
    else:
        town = None

    if act.type == ActionType.ADD:
        assert school is None
        assert town is not None
        place = db.Place(
            level=4,
            parent=town.place_id,
            name=vals['short_name'],
            type=db.PlaceType.school)
        school = db.School(
            place=place,
            red_izo=vals['red_izo'],
            ico=vals['ico'],
            official_name=vals['official_name'],
            address=vals['address'],
            is_zs=_parse_bool(vals['is_zs']),
            is_ss=_parse_bool(vals['is_ss']))
        session.add(school)
        # Flush before the place_id is printed and logged below
        session.flush()
        print(f'Zakládám školu #{place.place_id}')

        mo.util.log(
            type=db.LogType.place,
            what=place.place_id,
            details={'action': 'new',
                     'reason': 'init-schools',
                     'place': db.row2dict(place),
                     'school': db.row2dict(school)}
        )
        new_school_cnt += 1

    elif act.type == ActionType.DISABLE:
        assert school is not None
        print(f'Skrývám školu #{act.school_id}')
        school.place.hidden = True

        mo.util.log(
            type=db.LogType.place,
            what=act.school_id,
            details={'action': 'disable',
                     'reason': 'init-schools',
                     'changes': db.get_object_changes(school.place)},
        )
        disabled_school_cnt += 1

    elif act.type == ActionType.EDIT:
        assert school is not None
        print(f'Upravuji školu #{act.school_id}')

        # Only the fields listed in the plan are touched
        if town is not None:
            school.place.parent = town.place_id
        if 'short_name' in vals:
            school.place.name = vals['short_name']
        if 'ico' in vals:
            school.ico = vals['ico']
        if 'official_name' in vals:
            school.official_name = vals['official_name']
        if 'address' in vals:
            school.address = vals['address']
        if 'is_zs' in vals:
            school.is_zs = _parse_bool(vals['is_zs'])
        if 'is_ss' in vals:
            school.is_ss = _parse_bool(vals['is_ss'])

        mo.util.log(
            type=db.LogType.place,
            what=act.school_id,
            details={'action': 'edit',
                     'reason': 'init-schools',
                     'changes': {**db.get_object_changes(school), **db.get_object_changes(school.place)}},
        )
        updated_school_cnt += 1

    else:
        # Raised explicitly, so that the check survives running with -O
        raise AssertionError(f'Unknown action type: {act.type}')
def execute_actions(actions: List[Action]) -> None:
    """Execute the given actions one by one, in the order in which they are listed."""
    for planned in actions:
        execute_action(planned)
# Command line: exactly one of the operation switches must be given.
parser = argparse.ArgumentParser(
    description='Importuje školy z naparsovaného Rejstříku škol',
)
parser.add_argument(
    '-n', '--dry-run',
    default=False, action='store_true',
    help='pouze ukáže, co by bylo provedeno',
)
mode = parser.add_argument_group('operace').add_mutually_exclusive_group(required=True)
mode.add_argument(
    '--plan',
    default=False, action='store_true',
    help='pouze naplánuje změny',
)
mode.add_argument(
    '--execute',
    default=False, action='store_true',
    help='načte plán ze stdinu a provede změny',
)
mode.add_argument(
    '--run',
    default=False, action='store_true',
    help='změny rovnou provádí',
)

args = parser.parse_args()

if args.plan:
    # Planning only: the plan is printed, nothing is executed
    plan_actions()
else:
    # The operation group is required, so this is either --execute (plan
    # read from stdin) or --run (plan computed right now).
    actions = read_actions() if args.execute else plan_actions()
    execute_actions(actions)

# With --dry-run, the session is never committed
if not args.dry_run:
    session.commit()

print("\n".join([
    f"Zpracováno {processed_school_cnt} škol z rejstříku.",
    f"Založeno {new_school_cnt} nových škol a {new_town_cnt} nových obcí.",
    f"Aktualizováno {updated_school_cnt} škol.",
    f"Skryto {disabled_school_cnt} škol.",
]))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment