Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • mj/mo-submit
1 result
Select Git revision
Show changes
Commits on Source (9)
......@@ -8,24 +8,46 @@
# číselníkem obcí z RUIANu.
import argparse
from typing import List, Dict, DefaultDict
import sys
from collections import defaultdict
import csv
from dataclasses import dataclass
from pathlib import Path
import re
import csv
from collections import defaultdict
from sqlalchemy.orm import joinedload
import sys
from typing import List, Dict, DefaultDict, Tuple, Optional
import mo.db as db
import mo.util
mo.util.init_standalone()
session = db.get_session()
new_town_cnt = 0
processed_school_cnt = 0
new_school_cnt = 0
updated_school_cnt = 0
@dataclass
class ProtoSchool:
town: db.Place
town_name: str
official_name: str
red_izo: str
ico: str
address: str
is_zs: bool
is_ss: bool
school: Optional[db.School]
# (red_izo, address) -> ProtoSchool
proto_schools: Dict[Tuple[str, str], ProtoSchool] = {}
fields = ('town_name', 'official_name', 'red_izo', 'ico', 'address', 'is_zs', 'is_ss')
def import_schools(path: Path, nuts: str):
def process_schools(path: Path, nuts: str) -> None:
# XXX: Rejstřík škol používá několik chybných/obsoletních NUTS kódů :(
nuts = re.sub('^CZ011', 'CZ010', nuts)
nuts = re.sub('^CZ021', 'CZ020', nuts)
......@@ -68,36 +90,29 @@ def import_schools(path: Path, nuts: str):
else:
assert False, f"Neznámý druh školy: {druh}"
school = (session.query(db.School)
.join(db.Place)
.filter(db.Place.level == 4)
.filter(db.Place.parent == town.place_id)
.filter(db.School.red_izo == red_izo)
.filter(db.School.address == addr2)
.first())
if school:
assert school.official_name == nazev
key = (red_izo, addr2)
if key in proto_schools:
ps = proto_schools[key]
assert ps.town == town
assert ps.ico == ico
assert ps.official_name == nazev
if is_zs:
school.is_zs = True
ps.is_zs = True
else:
school.is_ss = True
ps.is_ss = True
else:
place = db.Place(
level=4,
parent=town.place_id,
name=nazev,
type=db.PlaceType.school)
school = db.School(
place=place,
ps = ProtoSchool(
town=town,
town_name=f'{town.name} ({town.get_code()})',
official_name=nazev,
red_izo=red_izo,
ico=ico,
official_name=nazev,
address=addr2,
is_zs=is_zs,
is_ss=not is_zs)
session.add(school)
global new_school_cnt
new_school_cnt += 1
is_ss=not is_zs,
school=None,
)
proto_schools[key] = ps
global processed_school_cnt
processed_school_cnt += 1
......@@ -132,6 +147,10 @@ def make_address(misto: str, ulice: str, cp: str, co: str) -> str:
def lookup_town(misto: str, region_nuts: str) -> db.Place:
# HACK
if misto == 'Borotice 34':
misto = 'Borotice'
ruian_nuts = ruian_obec_to_okres_nuts[misto]
region = None
......@@ -181,7 +200,7 @@ def load_ruian_csv(name):
ruian_obec_to_okres_nuts: DefaultDict[str, List[str]] = defaultdict(list)
def load_ruian():
def load_ruian() -> None:
ocols, okresy = load_ruian_csv('extra/ruian/UI_OKRES.csv')
okres_by_id: Dict[int, List[str]] = {}
for o in okresy:
......@@ -197,8 +216,151 @@ def load_ruian():
# print(f"{jmeno} -> {okres}")
ruian_obec_to_okres_nuts[jmeno].append(okres[ocols['NUTS_LAU']])
def get_old_schools() -> DefaultDict[str, List[ProtoSchool]]:
schools = session.query(db.School).options(joinedload(db.School.place)).all()
old_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)
for s in schools:
town = s.place.parent_place
ps = ProtoSchool(
town=town,
town_name=f'{town.name} ({town.get_code()})',
official_name=s.official_name,
red_izo=s.red_izo,
ico=s.ico,
address=s.address,
is_zs=s.is_zs,
is_ss=s.is_ss,
school=s,
)
old_schools[ps.red_izo].append(ps)
return old_schools
def apply_single_change(old: Optional[ProtoSchool], new: Optional[ProtoSchool]) -> None:
if new is None:
assert old is not None
assert old.school is not None
print(f'TODO: Škola #{old.school.place_id} (RED IZO {old.red_izo}) vypadla z rejstříku')
for field in fields:
print(f'\t{field}: {getattr(old, field)}')
elif old is None:
assert new is not None
place = db.Place(
level=4,
parent=new.town.place_id,
name=new.official_name,
type=db.PlaceType.school)
school = db.School(
place=place,
red_izo=new.red_izo,
ico=new.ico,
official_name=new.official_name,
address=new.address,
is_zs=new.is_zs,
is_ss=new.is_ss)
session.add(school)
session.flush()
if args.update:
print(f'Škola #{school.place_id} (RED IZO {new.red_izo}): NOVÁ')
for field in fields:
print(f'\t{field}: {getattr(new, field)}')
mo.util.log(
type=db.LogType.place,
what=school.place_id,
details={'action': 'import-school', 'school': db.row2dict(school)}
)
global new_school_cnt
new_school_cnt += 1
else:
assert old.school
school = old.school
changes: List[Tuple[str, str, str]] = []
for field in fields:
if getattr(old, field) != getattr(new, field):
changes.append((field, getattr(old, field), getattr(new, field)))
school.place.parent = new.town.place_id
school.ico = new.ico
school.official_name = new.official_name
school.address = new.address
school.is_zs = new.is_zs
school.is_ss = new.is_ss
if changes:
print(f'Škola #{school.place_id} (RED IZO {new.red_izo}): ZMĚNY')
for field, old_val, new_val in changes:
print(f'\t{field}: {old_val} -> {new_val}')
assert args.update, "Změny provádíme pouze s přepínačem --update"
mo.util.log(
type=db.LogType.place,
what=school.place_id,
details={'action': 'import-school',
'changes': {**db.get_object_changes(school), **db.get_object_changes(school.place)}},
)
global updated_school_cnt
updated_school_cnt += 1
def apply_changes() -> None:
new_schools: DefaultDict[str, List[ProtoSchool]] = defaultdict(list)
for ps in proto_schools.values():
new_schools[ps.red_izo].append(ps)
old_schools = get_old_schools()
for red_izo in sorted(new_schools.keys()):
new_ps = new_schools[red_izo]
old_ps = sorted(old_schools[red_izo], key=lambda ps: ps.address)
new_ps = sorted(new_ps, key=lambda ps: ps.address)
if len(old_ps) == 0:
for n in new_ps:
apply_single_change(None, n)
elif len(old_ps) == 1 and len(new_ps) == 1:
apply_single_change(old_ps[0], new_ps[0])
else:
oi = 0
ni = 0
while oi < len(old_ps) or ni < len(new_ps):
old: Optional[ProtoSchool] = old_ps[oi] if oi < len(old_ps) else None
new: Optional[ProtoSchool] = new_ps[ni] if ni < len(new_ps) else None
if old and new and old.address == new.address:
apply_single_change(old, new)
oi += 1
ni += 1
elif old and (not new or old.address < new.address):
assert old.school
print(f'TODO: Starou školu #{old.school.place_id} (RED IZO {red_izo}) nedokáži spárovat s novou')
for field in fields:
print(f'\t{field}: {getattr(old, field)}')
oi += 1
else:
print(f'TODO: Novou školu (RED IZO {red_izo}) nedokáži spárovat se starou')
for field in fields:
print(f'\t{field}: {getattr(new, field)}')
ni += 1
if args.stop_after is not None and (new_school_cnt + updated_school_cnt) >= args.stop_after:
return
for red_izo in old_schools.keys():
if red_izo not in new_schools:
for os in old_schools[red_izo]:
apply_single_change(os, None)
parser = argparse.ArgumentParser(description='Importuje školy z naparsovaného Rejstříku škol')
parser.add_argument('-n', '--dry-run', default=False, action='store_true', help='pouze ukáže, co by bylo provedeno')
parser.add_argument('-u', '--update', default=False, action='store_true', help='aktualizuje školy v DB')
parser.add_argument('--stop-after', type=int, help='zastaví se po daném počtu změn')
args = parser.parse_args()
......@@ -208,9 +370,13 @@ for path in Path('extra/skoly/parsed').glob('*.tsv'):
m = re.fullmatch(r'^[A-Z]-(CZ\w+)\.tsv', path.name)
assert m is not None
nuts = m[1]
import_schools(path, nuts)
process_schools(path, nuts)
apply_changes()
if not args.dry_run:
session.commit()
print(f"Importováno {processed_school_cnt} škol.")
print(f"Založeno {new_school_cnt} nových škol a {new_town_cnt} nových obcí.")
print(f"Aktualizováno {updated_school_cnt} škol.")
### Dořešit v importu škol ###
### TODO ###
- logovat nejen nový záznam ve schools, ale i v places
- neformální jména nových škol (přidat obec, používat běžné zkratky)
- nějaký mechanismus na review změn
### Poznámky z prvního importu škol ###
WARNING: Obec Černovice je podle rejstříku v okrese CZ0632, podle RUIAN je na výběr ['CZ0641', 'CZ0321', 'CZ0422', 'CZ0633'] => dořešit ručně!
-> přesun do okresu PE
......
#!/bin/bash
set -e
rm -rf extra/parsed
mkdir extra/parsed
rm -rf extra/skoly/parsed
mkdir extra/skoly/parsed
for src in extra/html/*.html ; do
dst=extra/parsed/$(basename $src .html).tsv
for src in extra/skoly/html/*.html ; do
dst=extra/skoly/parsed/$(basename $src .html).tsv
echo -n "$src -> "
./rejskol-parse <$src >$dst
db/skoly/rejskol-parse <$src >$dst
wc -l $dst
done
......@@ -8,7 +8,7 @@ my $mech = WWW::Mechanize->new(autocheck => 1, strict_forms => 1);
$mech->get('https://rejstriky.msmt.cz/rejskol/VREJVerejne/VerejneRozhrani.aspx');
$mech->form_id('form1');
mkdir 'extra/html';
mkdir 'extra/skoly/html';
download_type('B'); # Základní školy
download_type('C'); # Střední školy
exit 0;
......@@ -59,7 +59,7 @@ sub download_region {
sleep 1;
my $resp = $mech->click_button(id => 'btnVybrat');
open my $f, '>:utf8', "extra/html/$type-$nuts.html";
open my $f, '>:utf8', "extra/skoly/html/$type-$nuts.html";
print $f $resp->decoded_content;
close $f;
......
......@@ -18,10 +18,10 @@ my $div_with_tables = $divs[1] // die;
my @last_main = ();
my $tab_count = 0;
for my $table ($div_with_tables->find_by_tag_name('table')) {
my $style = $table->attr('style');
my $main = 0;
if ($tab_count == 0 || defined $style) {
$main = 1;
my $style = $table->attr('style') // "";
my $main = 1;
if ($tab_count == 1 || $style =~ m{#f0f8ff}) {
$main = 0;
}
my $last_out = "";
for my $tr ($table->find_by_tag_name('tr')) {
......
......@@ -125,6 +125,8 @@ school_export_columns = (
Column(key='name', name='nazev'),
Column(key='town_code', name='kod_obce'),
Column(key='town', name='obec'),
Column(key='okres_code', name='kod_okresu'),
Column(key='kraj_code', name='kod_kraje'),
Column(key='red_izo', name='red_izo'),
Column(key='ico', name='ico'),
Column(key='official_name', name='ofic_nazev'),
......@@ -141,11 +143,15 @@ def org_export_schools():
def gen_rows():
town = aliased(db.Place)
for p, s, t in (
sess.query(db.Place, db.School, town)
okres = aliased(db.Place)
kraj = aliased(db.Place)
for p, s, t, o, k in (
sess.query(db.Place, db.School, town, okres, kraj)
.filter(db.Place.type == db.PlaceType.school)
.filter(db.Place.place_id == db.School.place_id)
.filter(db.Place.parent == town.place_id)
.filter(town.parent == okres.place_id)
.filter(okres.parent == kraj.place_id)
.yield_per(100)):
yield Row(keys={
'code': p.get_code(),
......@@ -158,6 +164,8 @@ def org_export_schools():
'is_ss': int(s.is_ss),
'town_code': t.get_code(),
'town': t.name,
'okres_code': o.get_code(),
'kraj_code': k.get_code(),
})
table = Table(school_export_columns, gen_rows(), 'skoly')
......
......@@ -664,3 +664,93 @@ def org_orgs_import():
year=mo.config.CURRENT_YEAR if form.only_this_year.data else None
)
return generic_import_page(form, imp, url_for('org_orgs_import'), template='org_global_orgs_import.html')
class ConfirmDeleteForm(FlaskForm):
delete = SubmitField('Potvrdit smazání')
@app.route('/org/user/<int:user_id>/delete', methods=('GET', 'POST'))
def org_user_delete(user_id: int):
if not g.user.is_admin:
raise werkzeug.exceptions.Forbidden()
sess = db.get_session()
user = sess.query(db.User).get(user_id)
if not user:
raise werkzeug.exceptions.NotFound()
warnings = []
errors = []
pants = sess.query(db.Participant).filter_by(user=user).all()
for pant in pants:
warnings.append(f'Účastní se {pant.year}. ročníku')
pions = (sess.query(db.Participation)
.filter_by(user=user)
.options(joinedload(db.Participation.contest).joinedload(db.Contest.round))
.all())
for pion in pions:
warnings.append(f'Účastní se kola {pion.contest.round.round_code()}')
num_roles = sess.query(db.UserRole).filter_by(user=user).count()
if num_roles > 0:
warnings.append(f'Má přidělené role ({num_roles})')
sols = (sess.query(db.Solution)
.filter_by(user=user)
.options(joinedload(db.Solution.task).joinedload(db.Task.round))
.all())
for sol in sols:
errors.append(f'Odevzdal úlohu {sol.task.code} v kole {sol.task.round.round_code()}')
num_papers = sess.query(db.Paper).filter_by(for_user_obj=user).count()
if num_papers:
errors.append(f'Patří mu řešení/opravy ({num_papers})')
num_uploads = sess.query(db.Paper).filter_by(uploaded_by_obj=user).count()
if num_uploads:
errors.append(f'Nahrál řešení/opravy ({num_uploads})')
logs = sess.query(db.Log).filter_by(user=user).all()
num_good_logs = 0
num_bad_logs = 0
for log in logs:
if log.details.get('reason', "") == 'user-join':
num_good_logs += 1
else:
num_bad_logs += 1
if num_good_logs > 0:
warnings.append(f'Vlastní záznamy v logu z registrace ({num_good_logs})')
if num_bad_logs > 0:
errors.append(f'Vlastní záznamy v logu ({num_bad_logs})')
form = ConfirmDeleteForm()
if form.validate_on_submit() and not errors:
sess.rollback()
conn = sess.connection()
pant_table = db.Participation.__table__
pion_table = db.Participant.__table__
role_table = db.UserRole.__table__
log_table = db.Log.__table__
conn.execute(pant_table.delete().where(pant_table.c.user_id == user.user_id))
conn.execute(pion_table.delete().where(pion_table.c.user_id == user.user_id))
conn.execute(role_table.delete().where(role_table.c.user_id == user.user_id))
conn.execute(log_table.delete().where(log_table.c.changed_by == user.user_id))
sess.commit()
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'delete', 'user': db.row2dict(user)},
)
sess.delete(user)
sess.commit()
app.logger.info(f"Uživatel #{user.user_id} smazán")
flash('Uživatel smazán.', 'danger')
return redirect(url_for('org_orgs') if user.is_org else url_for('org_users'))
return render_template('org_user_delete.html', user=user, form=form, warnings=warnings, errors=errors)
......@@ -31,6 +31,7 @@
{% endif %}
{% if g.user.is_admin %}
<a class="btn btn-default" href="{{ log_url('user', user.user_id) }}">Historie</a>
<a class="btn btn-danger" href="{{ url_for('org_user_delete', user_id=user.user_id) }}">Smazat</a>
{% endif %}
{% if can_incarnate %}
<form action="{{ url_for('incarnate', id=user.user_id) }}" method=POST class='btn-group'>
......
......@@ -39,6 +39,7 @@
{% endif %}
{% if g.user.is_admin %}
<a class="btn btn-default" href="{{ log_url('user', user.user_id) }}">Historie</a>
<a class="btn btn-danger" href="{{ url_for('org_user_delete', user_id=user.user_id) }}">Smazat</a>
{% endif %}
{% if can_incarnate %}
<form action="{{ url_for('incarnate', id=user.user_id) }}" method=POST class='btn-group'>
......
{% extends "base.html" %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Smazat uživatele: {{ user.full_name() }}{% endblock %}
{% block body %}
{% if warnings %}
<h3>Varováni</h3>
<ul>
{% for w in warnings %}
<li>{{ w }}
{% endfor %}
</ul>
{% endif %}
{% if errors %}
<h3>Chyby</h3>
<ul>
{% for e in errors %}
<li>{{ e }}
{% endfor %}
</ul>
{% endif %}
{% if form and not errors %}
{{ wtf.quick_form(form, form_type='simple', button_map={'delete': 'danger'}) }}
{% endif %}
{% endblock %}