Skip to content
Snippets Groups Projects
Select Git revision
  • ae1503ae38191b5ee34fba6775e6edecf412af6c
  • devel default
  • master
  • fo
  • jirka/typing
  • fo-base
  • mj/submit-images
  • jk/issue-96
  • jk/issue-196
  • honza/add-contestant
  • honza/mr7
  • honza/mrf
  • honza/mrd
  • honza/mra
  • honza/mr6
  • honza/submit-images
  • honza/kolo-vs-soutez
  • jh-stress-test-wip
  • shorten-schools
19 results

base.html

Blame
    • Jiří Setnička's avatar
      05b48b1a
      CSS: Změna základního layoutu + přidáno logo MO · 05b48b1a
      Jiří Setnička authored
      * Přidáno logo (skryje se při skrolování)
      * Menu roztaženo po celé šířce (sticky při skrolování)
      * Top level elementy se třídou .content se zobrazují jen ve střední části stránky
      
      Vršek stránky vizuálně a barevně sjednocen s tím, co je na matematickaolympiada.cz
      
      Solve #21
      05b48b1a
      History
      CSS: Změna základního layoutu + přidáno logo MO
      Jiří Setnička authored
      * Přidáno logo (skryje se při skrolování)
      * Menu roztaženo po celé šířce (sticky při skrolování)
      * Top level elementy se třídou .content se zobrazují jen ve střední části stránky
      
      Vršek stránky vizuálně a barevně sjednocen s tím, co je na matematickaolympiada.cz
      
      Solve #21
    init-schools 19.94 KiB
    #!/usr/bin/env python3
    # Fills the database with schools and with the towns in which the schools reside.
    # Reads extra/skoly/SkolyAObory.csv
    #
    # Beware, a trap: the school register is divided into districts by NUTS/LAU,
    # but it files each school under the office where the school is registered,
    # which need not match the school's actual seat at all. We therefore help
    # ourselves, somewhat magically, with the RUIAN code list of municipalities.
    
    import argparse
    from collections import defaultdict
    import csv
    from dataclasses import dataclass
    from enum import auto
    import re
    from sqlalchemy.orm import joinedload
    import sys
    from typing import List, Dict, DefaultDict, Tuple, Optional
    
    import mo.config as config
    import mo.db as db
    import mo.util
    from mo.util import die
    from mo.util_format import timeformat
    
    # Project-provided setup for standalone scripts, followed by the database
    # session that every function in this file uses.
    mo.util.init_standalone()
    session = db.get_session()
    
    # Counters reported in the summary printed at the very end of the script.
    new_town_cnt = 0
    processed_school_cnt = 0
    new_school_cnt = 0
    updated_school_cnt = 0
    disabled_school_cnt = 0
    
    
    @dataclass
    class ProtoSchool:
        # Common in-memory description of one school, built either from a row
        # of the register (process_schools) or from an existing database row
        # (get_old_schools), so that both sources can be compared field by field.

        # Place of the town the school belongs to, plus copies of its id and name
        # (the copies are what gets compared and printed via `fields`).
        town: db.Place
        town_id: int
        town_name: str
        # True when lookup_town() could not reconcile the register with RUIAN.
        unsure_region: bool
        # Display name; stored as the name of the school's Place.
        short_name: str
        official_name: str
        red_izo: str
        ico: str
        address: str
        # Set from the register column 'Druh/Typ': 'B00' sets is_zs, 'C00' sets is_ss.
        is_zs: bool
        is_ss: bool
        # Existing database row, or None for records that come from the register.
        school: Optional[db.School]
    
    
    # (red_izo, address) -> ProtoSchool; filled by process_schools() from the register.
    proto_schools: Dict[Tuple[str, str], ProtoSchool] = {}
    # ProtoSchool attributes that are compared, printed into the plan and accepted
    # back by read_actions().
    fields = ('town_id', 'town_name', 'short_name', 'official_name', 'red_izo', 'ico', 'address', 'is_zs', 'is_ss')
    
    
    class ActionType(db.MOEnum):
        """Kind of change planned for a school.

        The member names appear verbatim in the plan (``>> ADD ...``) and are
        parsed back by ``read_actions`` via ``ActionType.coerce``.
        """
        ADD = auto()
        DISABLE = auto()
        # Was `()`, which is not an auto-generated value like the other members.
        EDIT = auto()
    
    
    @dataclass
    class Action:
        # One planned change to the database, either produced by the planner or
        # parsed back from a textual plan on stdin.
        type: ActionType
        # place_id of the affected school; create_action() stores 0 when the
        # school does not exist yet.
        school_id: Optional[int]
        # Field name (one of `fields`) -> new value. read_actions() stores strings;
        # add_fields() stores the raw ProtoSchool attribute values.
        values: Dict[str, str]
    
    
    def process_schools() -> None:
        """Read the school register CSV and fill ``proto_schools``.

        Every row of ``extra/skoly/SkolyAObory.csv`` describes one school of one
        kind. Rows sharing the same (RED_IZO, building address) are merged into a
        single ``ProtoSchool`` whose ``is_zs``/``is_ss`` flags are combined.
        Towns are resolved (and possibly created) through ``lookup_town``.
        Increments ``processed_school_cnt`` for every row that is not skipped.
        """
        global processed_school_cnt

        with open('extra/skoly/SkolyAObory.csv') as file:
            reader = csv.reader(file, delimiter=',')
            rows = list(reader)
            columns = parse_header(rows.pop(0))
            for f in rows:
                red_izo = f[columns['RED_IZO']]
                ico = f[columns['IČO']]
                nuts = f[columns['Území']]
                druh = f[columns['Druh/Typ']]

                # Skip broken records
                if red_izo == "691016551":
                    continue

                # Normalize IČO: left-pad with zeros to 8 digits
                if len(ico) < 8:
                    ico = "0" * (8 - len(ico)) + ico

                # Address of legal entity
                nazev = f[columns['Plný název']]
                nazev = nazev.replace('\u00b4', "'")
                misto = f[columns['Místo']]
                ulice = f[columns['Ulice']]
                cp = f[columns['č.p.']]
                co = f[columns['č.or.']]

                # Address of school building
                misto2 = f[columns['Škola_místo']]
                ulice2 = f[columns['Škola_ulice']]
                cp2 = f[columns['Škola_č.p.']]
                co2 = f[columns['Škola_č.or.']]
                if misto2 == 'Praha':
                    # Bare "Praha" cannot be resolved to a town, so fall back to the
                    # town of the legal entity. The message now names the school.
                    print(f"WARNING: Škola s RED_IZO {red_izo} má jako obec uvedenu Prahu => volím náhradu {misto}", file=sys.stderr)
                    assert misto != 'Praha', f'Škola s RED_IZO {red_izo} má jako obec uvedenu Prahu dvakrát'
                    misto2 = misto

                addr = make_address(misto, ulice, cp, co)
                addr2 = make_address(misto2, ulice2, cp2, co2)
                # if addr != addr2:
                #     print(f"WARNING: Škola má dvě různé adresy: <{addr}> != <{addr2}>", file=sys.stderr)

                town, unsure_region = lookup_town(misto2, nuts)

                if druh == 'B00':
                    is_zs = True
                elif druh == 'C00':
                    is_zs = False
                elif druh in ['D00', 'E00', 'M40']:
                    # Ignored kinds:
                    #   D00 = conservatory
                    #   E00 = higher vocational school (VOŠ)
                    #   M40 = practical training centre
                    continue
                else:
                    assert False, f"Neznámý druh školy: {druh}"

                key = (red_izo, addr2)
                if key in proto_schools:
                    # Same school seen again under another kind: only merge the flags.
                    ps = proto_schools[key]
                    assert ps.town == town
                    assert ps.ico == ico
                    assert ps.official_name == nazev
                    if is_zs:
                        ps.is_zs = True
                    else:
                        ps.is_ss = True
                else:
                    ps = ProtoSchool(
                        town=town,
                        town_id=town.place_id,
                        town_name=town.name,
                        unsure_region=unsure_region,
                        short_name=nazev,
                        official_name=nazev,
                        red_izo=red_izo,
                        ico=ico,
                        address=addr2,
                        is_zs=is_zs,
                        is_ss=not is_zs,
                        school=None,
                    )
                    proto_schools[key] = ps

                processed_school_cnt += 1
    
    
    def parse_header(header: List[str]) -> Dict[str, int]:
        """Map column titles of the header row to their positions.

        One trailing colon is stripped from a title. A title that is already
        taken is prefixed with underscores until it becomes unique.
        """
        index_by_title: Dict[str, int] = {}
        for position, raw_title in enumerate(header):
            title = raw_title[:-1] if raw_title.endswith(':') else raw_title
            while title in index_by_title:
                title = '_' + title
            index_by_title[title] = position
        return index_by_title
    
    
    def make_address(misto: str, ulice: str, cp: str, co: str) -> str:
        """Compose a one-line address from town, street and the two house numbers.

        Without a street only the town is returned (the numbers are dropped).
        With a street the numbers are rendered as "cp/co" when both are present,
        or as whichever one is present.
        """
        if not ulice:
            return misto

        number = f"{cp}/{co}" if cp and co else (cp or co)
        if not number:
            return f"{ulice}, {misto}"
        return f"{ulice} {number}, {misto}"
    
    
    def lookup_town(misto: str, region_nuts: str) -> Tuple[db.Place, bool]:
        """Find the level-3 Place for town `misto`, creating it when missing.

        `region_nuts` is the district code given by the school register. It is
        reconciled with the codes RUIAN lists for a municipality of that name:

        * register code is among the RUIAN codes -> use it;
        * RUIAN knows no such municipality -> names starting with 'Praha ' are
          looked up as level-2 places by name, anything else trusts the register;
        * RUIAN lists exactly one district -> prefer RUIAN, mark as unsure;
        * RUIAN lists several districts -> keep the register code, mark as unsure.

        Returns the town and the "unsure region" flag. A newly created town is
        flushed, logged and counted in `new_town_cnt`.
        """
        # Note: indexing the defaultdict inserts an empty list for unknown names.
        ruian_nuts = ruian_obec_to_okres_nuts[misto]
        region = None
        unsure_region = False
    
        if region_nuts in ruian_nuts:
            nuts = region_nuts
        elif not ruian_nuts:
            if misto.startswith('Praha '):
                # XXX: Prague districts are not in RUIAN
                region = session.query(db.Place).filter_by(level=2, name=misto).first()
                assert region
            else:
                nuts = region_nuts
                print(f"WARNING: Obec {misto} není v RUIAN", file=sys.stderr)
        elif len(ruian_nuts) == 1:
            nuts = ruian_nuts[0]
            print(f"WARNING: Obec {misto} je podle rejstříku v okrese {region_nuts}, ale pod RUIAN v {nuts} => preferuji RUIAN", file=sys.stderr)
            unsure_region = True
        else:
            nuts = region_nuts
            print(f"WARNING: Obec {misto} je podle rejstříku v okrese {region_nuts}, podle RUIAN je na výběr {ruian_nuts} => dořešit ručně!", file=sys.stderr)
            unsure_region = True
    
        # Every branch above either set `region` directly or chose a `nuts` code.
        if not region:
            region = session.query(db.Place).filter_by(level=2, nuts=nuts).first()
            assert region, f'Chybí region pro NUTS {nuts}'
    
        town = session.query(db.Place).filter_by(level=3, parent=region.place_id, name=misto).first()
        if town is None:
            town = db.Place(level=3, parent=region.place_id, name=misto, type=db.PlaceType.region)
            session.add(town)
            # Flush so that town.place_id is available for the log entry below.
            session.flush()
            mo.util.log(
                type=db.LogType.place,
                what=town.place_id,
                details={'action': 'new', 'reason': 'init-schools', 'place': db.row2dict(town)},
            )
            session.flush()
            global new_town_cnt
            new_town_cnt += 1
            print(f'Založena obec: {misto} (#{town.place_id}) v okrese {region.name}')
        return town, unsure_region
    
    
    def load_ruian_csv(name):
        """Load a semicolon-separated CSV file.

        Returns a pair: a dict mapping each title of the first row to its column
        index, and the list of all remaining rows.
        """
        with open(name) as handle:
            table = list(csv.reader(handle, delimiter=';'))

        header = table[0]
        column_index = {title: position for position, title in enumerate(header)}
        return column_index, table[1:]
    
    
    # Municipality name -> NUTS/LAU codes of every district that contains a
    # municipality of that name; filled by load_ruian().
    ruian_obec_to_okres_nuts: DefaultDict[str, List[str]] = defaultdict(list)
    
    
    def load_ruian() -> None:
        """Fill ``ruian_obec_to_okres_nuts`` from the RUIAN district and municipality tables."""
        district_cols, district_rows = load_ruian_csv('extra/ruian/UI_OKRES.csv')
        district_by_code: Dict[int, List[str]] = {}
        for district in district_rows:
            code = int(district[district_cols['KOD']])
            assert code not in district_by_code
            district_by_code[code] = district

        town_cols, town_rows = load_ruian_csv('extra/ruian/UI_OBEC.csv')
        for town_row in town_rows:
            town_name = town_row[town_cols['NAZEV']]
            district_code = int(town_row[town_cols['OKRES_KOD']])
            if (town_name, district_code) == ('Praha', 9999):
                # Exception: this municipality is not part of any district
                continue
            district = district_by_code[district_code]
            ruian_obec_to_okres_nuts[town_name].append(district[district_cols['NUTS_LAU']])
    
    
    def get_old_schools() -> DefaultDict[str, List[ProtoSchool]]:
        """Load all schools from the database, grouped by RED IZO.

        Schools without a RED IZO are only reported on stdout and left out.
        """
        by_red_izo: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)

        query = session.query(db.School).options(joinedload(db.School.place))
        for school in query.all():
            if school.red_izo:
                parent_town = school.place.parent_place
                proto = ProtoSchool(
                    town=parent_town,
                    town_id=parent_town.place_id,
                    town_name=parent_town.name,
                    unsure_region=False,
                    short_name=school.place.name,
                    official_name=school.official_name,
                    red_izo=school.red_izo,
                    ico=school.ico,
                    address=school.address,
                    is_zs=school.is_zs,
                    is_ss=school.is_ss,
                    school=school,
                )
                by_red_izo[proto.red_izo].append(proto)
            else:
                print(f'# TODO: Škola #{school.place_id} ({school.place.name}) nemá RED IZO')

        return by_red_izo
    
    
    def simplify_name(name: str, town: str) -> str:
        """Derive a short school name from the official one and append the town.

        Generic school-type phrases are removed or abbreviated (case-insensitively),
        a trailing "s.r.o." is dropped, and ", <town>" is appended.
        """
        # Applied in this order; each phrase is matched regardless of case.
        phrase_replacements = (
            ('základní škola', ''),
            ('mateřská škola', ''),
            ('střední škola', ''),
            ('střední průmyslová škola', 'SPŠ'),
            ('gymnázium', 'G'),
        )
        for phrase, replacement in phrase_replacements:
            name = re.sub(phrase, replacement, name, flags=re.IGNORECASE)

        name = re.sub(r',?\s*s\.r\.o\.', "", name)
        return ', '.join([name, town])
    
    
    def school_url(school_id: int) -> str:
        """Return the URL of the school's place page under config.WEB_ROOT."""
        return '{}org/place/{}'.format(config.WEB_ROOT, school_id)
    
    
    def create_action(ps: ProtoSchool, type: ActionType, msg: str) -> Action:
        """Print the plan header for one school and return an empty Action.

        The header line (``>> TYPE (#id, RED_IZO n): msg``) is the one that
        ``read_actions`` parses back. For an existing school, commented lines
        follow with its URL and with earlier log entries that do not originate
        from this script, plus a warning about an unsure region and the place
        note when present.

        Args:
            ps: the school the action is about; ``ps.school`` is None for new ones.
            type: kind of the action (parameter name kept for callers).
            msg: human-readable reason appended to the header line.

        Returns:
            An Action with ``school_id`` 0 for a not-yet-existing school and no values.
        """
        school_id = ps.school.place_id if ps.school else 0
        print(f'>> {type.name} (#{school_id}, RED_IZO {ps.red_izo}): {msg}')
        if school_id > 0:
            print(f'\t# URL: {school_url(school_id)}')
            logs = (
                session.query(db.Log)
                .filter_by(type=db.LogType.place, id=school_id)
                # The relationship to eager-load lives on Log (log.user is read
                # below); LogType is only used as the value of Log.type.
                .options(joinedload(db.Log.user))
                .order_by(db.Log.changed_at)
                .all()
            )
            for log in logs:
                action = log.details.get('action', '?')
                reason = log.details.get('reason', '?')
                # Entries written by this script or by school creation are not shown.
                if action == 'new' or reason == 'init-schools':
                    continue
                print(f'\t# Log ({timeformat(log.changed_at)} {log.user.full_name() if log.user else "system"}): {log.details}')
        if ps.unsure_region:
            print('\t# WARNING: Obec s nejistým regionem')
        if ps.school and ps.school.place.note:
            print(f'\t# NOTE: {ps.school.place.note}')
        return Action(type=type, school_id=school_id, values={})
    
    
    def add_fields(action: Action, ps: ProtoSchool) -> None:
        """Copy every attribute listed in `fields` from `ps` into the action and print it."""
        snapshot = {name: getattr(ps, name) for name in fields}
        action.values.update(snapshot)
        for name, value in snapshot.items():
            print(f'\t{name}: {value}')
    
    
    def plan_single_change(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> Optional[Action]:
        """Plan the change that turns the database record `old` into the register record `new`.

        * only `old` given: the school left the register -> DISABLE, unless its
          place is already hidden (then nothing is planned);
        * only `new` given: ADD with a freshly simplified short name;
        * both given: EDIT listing the differing fields. The short name is
          regenerated only when the official name or the town name changed.
          When nothing differs, or only is_zs/is_ss differ, nothing is planned.

        Returns the planned Action, or None when there is nothing to do.
        """
        if new is None:
            assert old is not None
            assert old.school is not None
            if old.school.place.hidden:
                return None
            act = create_action(old, ActionType.DISABLE, 'vypadla z rejstříku')
            add_fields(act, old)
            return act

        if old is None:
            new.short_name = simplify_name(new.official_name, new.town.name)
            act = create_action(new, ActionType.ADD, 'nová')
            add_fields(act, new)
            return act

        assert old.school
        if old.official_name != new.official_name or old.town_name != new.town_name:
            new.short_name = simplify_name(new.official_name, new.town.name)
        else:
            # Keep the (possibly hand-edited) short name already stored in the database.
            new.short_name = old.short_name

        changes: List[Tuple[str, str, str]] = [
            (field, getattr(old, field), getattr(new, field))
            for field in fields
            if getattr(old, field) != getattr(new, field)
        ]

        changed_fields = {field for field, _, _ in changes}
        if changed_fields <= {'is_zs', 'is_ss'}:
            return None

        act = create_action(old, ActionType.EDIT, 'změny')
        for field, old_val, new_val in changes:
            print(f'\t{field}: {new_val}')
            # The old value goes on a commented line aligned under the new one.
            print(f'\t#{" " * len(field)} {old_val}')
            act.values[field] = new_val
        # Without this return the EDIT action was silently discarded.
        return act
    
    
    def plan_actions() -> List[Action]:
        """Compare the register with the database and plan all changes.

        Loads RUIAN and the register, groups both the new and the existing
        schools by RED IZO and pairs them up:

        * RED IZO unknown to the database -> every register record is an ADD;
        * exactly one record on each side -> they are paired directly;
        * otherwise records are paired by equal address; records left unpaired
          are not planned, only reported as a TODO for manual resolution;
        * RED IZO missing from the register -> every database record is
          offered for DISABLE.

        The plan is printed as a side effect and also returned.
        """
        load_ruian()
        process_schools()

        new_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)
        for ps in proto_schools.values():
            new_schools[ps.red_izo].append(ps)

        old_schools = get_old_schools()

        actions: List[Action] = []

        def plan(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> None:
            act = plan_single_change(old, new)
            if act is not None:
                actions.append(act)

        for red_izo in sorted(new_schools.keys()):
            new_ps = new_schools[red_izo]
            old_ps = sorted(old_schools[red_izo], key=lambda ps: ps.address)
            new_ps = sorted(new_ps, key=lambda ps: ps.address)
            if len(old_ps) == 0:
                for n in new_ps:
                    plan(None, n)
            elif len(old_ps) == 1 and len(new_ps) == 1:
                plan(old_ps[0], new_ps[0])
            else:
                # Merge the two address-sorted lists, pairing equal addresses.
                oi = 0
                ni = 0
                merged_list: List[Tuple[Optional[ProtoSchool], Optional[ProtoSchool]]] = []
                while oi < len(old_ps) or ni < len(new_ps):
                    old: Optional[ProtoSchool] = old_ps[oi] if oi < len(old_ps) else None
                    new: Optional[ProtoSchool] = new_ps[ni] if ni < len(new_ps) else None

                    if old and new and old.address == new.address:
                        merged_list.append((old, new))
                        plan(old, new)
                        oi += 1
                        ni += 1
                    elif old and (not new or old.address < new.address):
                        assert old.school
                        # Already hidden schools are not worth reporting.
                        if not old.school.place.hidden:
                            merged_list.append((old, None))
                        oi += 1
                    else:
                        merged_list.append((None, new))
                        ni += 1

                if any(not(x[0] and x[1]) for x in merged_list):
                    # There are unpaired records, so we want to warn about them
                    print(f'TODO: Školy s RED IZO {red_izo} se nepodařilo spárovat')
                    for old, new in merged_list:
                        if old and new:
                            print(f'    Spárovaná (#{old.school.place_id}):')
                            s = new
                        elif old:
                            print(f'    Jen v OSMO (#{old.school.place_id}):')
                            s = old
                        else:
                            print('    Jen v rejstříku:')
                            s = new
                        if old:
                            print(f'\t# URL: {school_url(old.school.place_id)}')
                            if old.school.place.note:
                                print(f'\t# NOTE: {old.school.place.note}')
                        for field in fields:
                            print(f'\t{field}: {getattr(s, field)}')

        for red_izo in sorted(old_schools.keys()):
            if red_izo not in new_schools:
                for old_school in old_schools[red_izo]:
                    plan(old_school, None)

        # The --run mode executes the returned list; it used to receive None.
        return actions
    
    
    def read_actions() -> List[Action]:
        """Parse a plan (as printed by the planner) from standard input.

        Recognised lines:

        * ``>> TYPE (#id, RED_IZO n): ...`` starts a new action;
        * lines starting with a tab and ``#`` are comments;
        * other tab-indented lines are ``field: value`` pairs of the current
          action; the field must be one of `fields` and may appear only once;
        * any other line ends the current action, so following field lines are
          parsed but ignored until the next header.

        Everything accepted is echoed to stdout prefixed with ``#``. A malformed
        field line is reported through ``die``.
        """
        actions = []
        act: Optional[Action] = None

        for line in sys.stdin:
            line = line.rstrip()
            if (match := re.fullmatch(r'>> ([A-Z]+) \(#(\d+), RED_IZO (\d+)\): .*', line)) is not None:
                print(f'## {match[1]} {match[2]} {match[3]}')
                act_type = ActionType.coerce(match[1])
                act = Action(type=act_type, school_id=int(match[2]), values={})
                actions.append(act)
            elif line.startswith('\t#'):
                pass
            elif line.startswith('\t'):
                # An empty value is printed as "name: " and rstrip() above removes
                # the trailing space, so the separator and value must be optional.
                if (match := re.fullmatch(r'\t(\w+):(?: (.*))?', line)) is None:
                    die(f'Cannot parse line: {line}')
                if act is not None:
                    value = match[2] or ''
                    assert match[1] in fields
                    assert match[1] not in act.values
                    act.values[match[1]] = value
                    print(f'#\t{match[1]}: {value}')
            else:
                act = None

        return actions
    
    
    def _parse_bool(val) -> bool:
        """Convert a plan value to bool.

        Values planned in the same process are real booleans; values read back
        from a textual plan are strings such as 'True' / 'False', for which plain
        ``bool()`` would always give True.
        """
        if isinstance(val, str):
            text = val.strip().lower()
            if text in ('true', '1'):
                return True
            if text in ('false', '0', 'none', ''):
                return False
            raise ValueError(f'Cannot parse boolean value: {val!r}')
        return bool(val)


    def execute_action(act: Action) -> None:
        """Apply one planned action to the database and write a log entry.

        * ADD creates a level-4 Place under the town given by ``town_id``
          together with its School row;
        * DISABLE marks the school's place as hidden;
        * EDIT updates just the fields present in ``act.values``.

        The matching global counter is incremented. Nothing is committed here.
        """
        global new_school_cnt, disabled_school_cnt, updated_school_cnt

        vals = act.values

        if act.school_id > 0:
            school = session.query(db.School).options(joinedload(db.School.place)).get(act.school_id)
            assert school is not None
        else:
            school = None

        if 'town_id' in vals:
            town = session.query(db.Place).get(int(vals['town_id']))
            assert town is not None
        else:
            town = None

        if act.type == ActionType.ADD:
            assert school is None
            assert town is not None
            place = db.Place(
                level=4,
                parent=town.place_id,
                name=vals['short_name'],
                type=db.PlaceType.school)
            school = db.School(
                place=place,
                red_izo=vals['red_izo'],
                ico=vals['ico'],
                official_name=vals['official_name'],
                address=vals['address'],
                is_zs=_parse_bool(vals['is_zs']),
                is_ss=_parse_bool(vals['is_ss']))
            session.add(school)
            # Flush so that place.place_id is assigned before it is printed and logged.
            session.flush()

            print(f'Zakládám školu #{place.place_id}')
            mo.util.log(
                type=db.LogType.place,
                what=place.place_id,
                details={'action': 'new',
                         'reason': 'init-schools',
                         'place': db.row2dict(place),
                         'school': db.row2dict(school)}
            )

            new_school_cnt += 1
        elif act.type == ActionType.DISABLE:
            assert school is not None
            print(f'Skrývám školu #{act.school_id}')
            school.place.hidden = True
            mo.util.log(
                type=db.LogType.place,
                what=act.school_id,
                details={'action': 'disable',
                         'reason': 'init-schools',
                         'changes': db.get_object_changes(school.place)},
            )
            disabled_school_cnt += 1
        elif act.type == ActionType.EDIT:
            assert school is not None
            print(f'Upravuji školu #{act.school_id}')

            if town is not None:
                school.place.parent = town.place_id
            if 'short_name' in vals:
                school.place.name = vals['short_name']
            if 'ico' in vals:
                school.ico = vals['ico']
            if 'official_name' in vals:
                school.official_name = vals['official_name']
            if 'address' in vals:
                school.address = vals['address']
            if 'is_zs' in vals:
                school.is_zs = _parse_bool(vals['is_zs'])
            if 'is_ss' in vals:
                school.is_ss = _parse_bool(vals['is_ss'])

            mo.util.log(
                type=db.LogType.place,
                what=act.school_id,
                details={'action': 'edit',
                         'reason': 'init-schools',
                         'changes': {**db.get_object_changes(school), **db.get_object_changes(school.place)}},
            )

            updated_school_cnt += 1
        else:
            assert False
    
    
    def execute_actions(actions: List[Action]) -> None:
        """Execute the given actions one by one, in list order."""
        for planned in actions:
            execute_action(planned)
    
    
    # Command line: exactly one of --plan / --execute / --run must be given;
    # --dry-run may be combined with any of them.
    parser = argparse.ArgumentParser(description='Importuje školy z naparsovaného Rejstříku škol')
    parser.add_argument('-n', '--dry-run', default=False, action='store_true', help='pouze ukáže, co by bylo provedeno')
    mode = parser.add_mutually_exclusive_group(required=True) if False else parser.add_argument_group('operace').add_mutually_exclusive_group(required=True)
    mode.add_argument('--plan', default=False, action='store_true', help='pouze naplánuje změny')
    mode.add_argument('--execute', default=False, action='store_true', help='načte plán ze stdinu a provede změny')
    mode.add_argument('--run', default=False, action='store_true', help='změny rovnou provádí')
    
    args = parser.parse_args()
    
    if args.plan:
        # Only print the plan; the returned list is not used here.
        plan_actions()
    elif args.execute:
        # Apply a (possibly hand-edited) plan read from stdin.
        actions = read_actions()
        execute_actions(actions)
    elif args.run:
        # Plan and apply in one go.
        actions = plan_actions()
        execute_actions(actions)
    
    # With --dry-run the session is never committed.
    if not args.dry_run:
        session.commit()
    else:
        print('*** Změny neprovedeny***')
    
    # Summary of the counters maintained by the functions above.
    print(f"Zpracováno {processed_school_cnt} škol z rejstříku.")
    print(f"Založeno {new_school_cnt} nových škol a {new_town_cnt} nových obcí.")
    print(f"Aktualizováno {updated_school_cnt} škol.")
    print(f"Skryto {disabled_school_cnt} škol.")