diff --git a/mo/web/__init__.py b/mo/web/__init__.py index d4662954de068d4b8111a8cccd98d2d8973dc19b..294c8509b9a2bee3928f7768e1beef80df566eeb 100644 --- a/mo/web/__init__.py +++ b/mo/web/__init__.py @@ -63,5 +63,7 @@ app.before_request(init_request) # Většina webu je v samostatných modulech import mo.web.main import mo.web.org +import mo.web.org_contest +import mo.web.org_place import mo.web.org_users import mo.web.menu diff --git a/mo/web/org.py b/mo/web/org.py index e817a9a372efc5169a474cd39e797ef79f19dc76..bde5ffc810062d23c4a87a43d724cd06e398f982 100644 --- a/mo/web/org.py +++ b/mo/web/org.py @@ -1,612 +1,8 @@ -from flask import render_template, g, redirect, url_for, flash, request -from flask_wtf import FlaskForm -import flask_wtf.file -import locale -import os -import secrets -from sqlalchemy.orm import joinedload -from typing import List, Tuple, Optional -import werkzeug.exceptions -import wtforms +from flask import render_template, g, redirect, url_for -import mo -import mo.csv -import mo.db as db -import mo.imports -import mo.rights -import mo.util from mo.web import app -from mo.web.table import Table, Column, cell_place_link, cell_user_link -import wtforms.validators as validators @app.route('/org/') def org_index(): return render_template('org_index.html') - - -@app.route('/org/place/<int:id>/') -def org_place(id: int): - sess = db.get_session() - - place = sess.query(db.Place).get(id) - if not place: - raise werkzeug.exceptions.NotFound() - - if place.type == db.PlaceType.school: - school = sess.query(db.School).get(place.place_id) - else: - school = None - - children = sorted(place.children, key=lambda p: locale.strxfrm(p.name)) - - rr = mo.rights.Rights(g.user) - rr.get_for(place) - - return render_template( - 'org_place.html', place=place, school=school, - rights=sorted(rr.current_rights, key=lambda r: r. name), - can_edit=rr.can_edit_place(place), - children=children - ) - - -class PlaceEditForm(FlaskForm): - name = wtforms.StringField( - 'Název', - validators=[validators.DataRequired()] - ) - code = wtforms.StringField( - 'Kód', filters=[lambda x: x or None], # may be NULL in db - description="Při nevyplnění se použije ID místa" - ) - type = wtforms.SelectField( - 'Typ', choices=db.PlaceType.choices(), coerce=db.PlaceType.coerce - ) - nuts = wtforms.StringField( - 'NUTS', filters=[lambda x: x or None], # may be NULL in db - description="Pro okresy a výše" - ) - note = wtforms.StringField('Poznámka') - submit = wtforms.SubmitField('Uložit') - - -class PlaceSchoolEditForm(PlaceEditForm): - red_izo = wtforms.StringField('RED_IZO') - ico = wtforms.StringField('IČO') - official_name = wtforms.StringField('Oficiální název') - address = wtforms.StringField('Adresa') - is_zs = wtforms.BooleanField('ZŠ') - is_ss = wtforms.BooleanField('SŠ') - submit = wtforms.SubmitField('Uložit') - - -@app.route('/org/place/<int:id>/edit', methods=('GET', 'POST')) -def org_place_edit(id: int): - sess = db.get_session() - - place = sess.query(db.Place).get(id) - if not place: - raise werkzeug.exceptions.NotFound() - - rr = mo.rights.Rights(g.user) - rr.get_for(place) - - if not rr.can_edit_place(place): - raise werkzeug.exceptions.Forbidden() - - if place.type == db.PlaceType.school: - school = sess.query(db.School).get(place.place_id) - # Pass school data as additional dict (data is used after obj) - form = PlaceSchoolEditForm(obj=place, data=db.row2dict(school)) - else: - form = PlaceEditForm(obj=place) - school = None - - form.type.choices = db.PlaceType.choices(level=place.level) - - if form.validate_on_submit(): - form.populate_obj(place) - if school: - form.populate_obj(school) - - msg = 'Změny místa uloženy' - redirectURL = url_for('org_place', id=id) - - if sess.is_modified(place) or school and sess.is_modified(school): - placeChanges = db.get_object_changes(place) - schoolChanges = {} - if school: - if request.form.get('type') != 'school': - # School record removed - mo.util.log( - type=db.LogType.place, - what=school.place_id, - details={'action': 'school-delete', 'school': db.row2dict(school)}, - ) - app.logger.info(f"Deleting school record for place {place.place_id}") - db.get_session().delete(school) - msg = 'Změny místa uloženy, záznam o škole smazán' - else: - schoolChanges = db.get_object_changes(school) - elif request.form.get('type') == 'school': - # School record created - new_school = db.School() - new_school.place_id = place.place_id - mo.util.log( - type=db.LogType.place, - what=new_school.place_id, - details={'action': 'school-add'}, - ) - app.logger.info(f"Creating new school for place {place.place_id}") - db.get_session().add(new_school) - # Take org directly to the school edit to fill the data - msg = 'Záznam o škole vytvořen, vyplňte prosím všechna data' - redirectURL = url_for('org_place_edit', id=id) - - changes = {**placeChanges, **schoolChanges} - app.logger.info(f"Place {id} modified, changes: {changes}") - mo.util.log( - type=db.LogType.place, - what=id, - details={'action': 'edit', 'changes': changes}, - ) - db.get_session().commit() - flash(msg, 'success') - else: - flash(u'Žádné změny k uložení', 'info') - - return redirect(redirectURL) - - parents = reversed(db.get_place_parents(place)[1:]) # without place itself and from the top - - return render_template( - 'org_place_edit.html', place=place, school=school, - parents=parents, form=form - ) - - -class PlaceMoveForm(FlaskForm): - code = wtforms.StringField(validators=[validators.DataRequired()]) - submit = wtforms.SubmitField('Najít místo') - reset = wtforms.HiddenField() - move = wtforms.HiddenField() - - -class PlaceMoveConfirmForm(FlaskForm): - code = wtforms.HiddenField() - reset = wtforms.SubmitField('Zrušit') - move = wtforms.SubmitField('Přesunout') - - -@app.route('/org/place/<int:id>/move', methods=('GET', 'POST')) -def org_place_move(id: int): - sess = db.get_session() - - # Tests: can move only existing places that we can edit - place = sess.query(db.Place).get(id) - if not place: - raise werkzeug.exceptions.NotFound() - rr = mo.rights.Rights(g.user) - rr.get_for(place) - if not rr.can_edit_place(place): - raise werkzeug.exceptions.Forbidden() - - parents = reversed(db.get_place_parents(place)[1:]) # without place itself and from the top - new_parents = None - search_error = None - - form = PlaceMoveForm() - form_confirm = None - if form.validate_on_submit(): - if form.reset.data: - return redirect(url_for('org_place_move', id=id)) - - new_parent = db.get_place_by_code(form.code.data) - if not new_parent: - search_error = 'Místo s tímto kódem se nepovedlo nalézt' - else: - new_parents = reversed(db.get_place_parents(new_parent)) - (_, levels) = db.place_type_names_and_levels[place.type] - - rr.get_for(new_parent) - if not rr.can_edit_place(new_parent): - search_error = 'Nemáte právo k editaci vybraného nadřazeného místa, přesun nelze uskutečnit' - elif (new_parent.level + 1) not in levels: - search_error = f'Toto místo ({place.type_name()}) nelze přemístit pod vybrané místo ({new_parent.type_name()}), dostalo by se na nepovolený level' - elif new_parent.place_id == place.parent: - search_error = 'Žádná změna, místo je zde již umístěno' - elif form.move.data: - # Everything is OK, if submitted with 'move' do the move - place.parent = new_parent.place_id - place.level = new_parent.level + 1 - changes = db.get_object_changes(place) - mo.util.log( - type=db.LogType.place, - what=id, - details={'action': 'move', 'changes': changes}, - ) - app.logger.info(f"Place {id} moved, changes: {changes}") - db.get_session().commit() - flash('Místo úspěšně přesunuto', 'success') - return redirect(url_for('org_place', id=id)) - else: - # OK but not confirmed yet, display the confirm form - form_confirm = PlaceMoveConfirmForm() - form_confirm.code.data = form.code.data - - return render_template( - 'org_place_move.html', - place=place, form=form, form_confirm=form_confirm, search_error=search_error, - parents=parents, new_parents=new_parents - ) - - -@app.route('/org/place/<int:id>/delete', methods=('POST',)) -def org_place_delete(id: int): - sess = db.get_session() - - # Tests: can delete only existing places that we can edit - place = sess.query(db.Place).get(id) - if not place: - raise werkzeug.exceptions.NotFound() - rr = mo.rights.Rights(g.user) - rr.get_for(place) - if not rr.can_edit_place(place): - raise werkzeug.exceptions.Forbidden() - - # Cannot delete place with children - if place.children: - flash("Nelze smazat místo s podřízenými místy", "danger") - return redirect(url_for('org_place', id=id)) - - # Cannot delete place with contests - if sess.query(db.Contest).filter_by(place_id=id).count() > 0: - flash("Nelze smazat místo ke kterému se váže nějaká soutěž ", "danger") - return redirect(url_for('org_place', id=id)) - - if place.type == db.PlaceType.school: - school = sess.query(db.School).get(place.place_id) - mo.util.log( - type=db.LogType.place, - what=school.place_id, - details={'action': 'school-delete', 'school': db.row2dict(school)}, - ) - app.logger.info(f"Deleting school record for place {id}") - db.get_session().delete(school) - - mo.util.log( - type=db.LogType.place, - what=id, - details={'action': 'delete', 'place': db.row2dict(place)}, - ) - app.logger.info(f"Deleting place {id}") - - parent = place.parent - db.get_session().delete(place) - db.get_session().commit() - - flash("Místo smazáno", "success") - return redirect(url_for('org_place', id=parent)) - - -@app.route('/org/place/<int:id>/new-child', methods=('GET', 'POST')) -def org_place_new_child(id: int): - sess = db.get_session() - - # Tests: can add new child only under existing places that we can edit - parent_place = sess.query(db.Place).get(id) - if not parent_place: - raise werkzeug.exceptions.NotFound() - rr = mo.rights.Rights(g.user) - rr.get_for(parent_place) - if not rr.can_edit_place(parent_place): - raise werkzeug.exceptions.Forbidden() - if not parent_place.can_have_child(): - raise werkzeug.exceptions.Forbidden() - - form = PlaceEditForm() - form.type.choices = db.PlaceType.choices(level=parent_place.level + 1) - - if form.validate_on_submit(): - new_place = db.Place() - form.populate_obj(new_place) - new_place.parent = parent_place.place_id - new_place.level = parent_place.level + 1 - sess.add(new_place) - sess.flush() - - app.logger.info(f"New place created: {db.row2dict(new_place)}") - mo.util.log( - type=db.LogType.place, - what=new_place.place_id, - details={'action': 'new', 'place': db.row2dict(new_place)}, - ) - - redirect_url = url_for('org_place', id=new_place.place_id) - msg = 'Nové místo uloženo' - - if new_place.type == db.PlaceType.school: - new_school = db.School() - new_school.place_id = new_place.place_id - mo.util.log( - type=db.LogType.place, - what=new_school.place_id, - details={'action': 'school-add'}, - ) - app.logger.info(f"Creating new school for place {new_place.place_id}") - sess.add(new_school) - # Take org directly to the school edit to fill the data - msg = 'Záznam o škole vytvořen, vyplňte prosím všechna data' - redirect_url = url_for('org_place_edit', id=new_place.place_id) - - sess.commit() - flash(msg, 'success') - return redirect(redirect_url) - - parents = reversed(db.get_place_parents(parent_place)) - - return render_template('org_place_new.html', parents=parents, form=form) - - -@app.route('/org/place/') -def org_place_root(): - root = db.get_root_place() - return redirect(url_for('org_place', id=root.place_id)) - - -@app.route('/org/place/<int:id>/rights') -def org_place_rights(id: int): - sess = db.get_session() - - place = sess.query(db.Place).get(id) - if not place: - raise werkzeug.exceptions.NotFound() - - parent_ids = [p.place_id for p in db.get_place_parents(place)] - roles = (sess.query(db.UserRole) - .filter(db.UserRole.place_id.in_(parent_ids)) - .options(joinedload(db.UserRole.user)) - .all()) - - rr = mo.rights.Rights(g.user) - rr.get_for(place) - - return render_template( - 'org_place_rights.html', place=place, rights=rr.current_rights, - roles=roles, roles_by_type=mo.rights.roles_by_type - ) - - -class ImportForm(FlaskForm): - file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()]) - submit = wtforms.SubmitField('Importovat') - - -@app.route('/org/contest/') -def org_contest_root(): - sess = db.get_session() - - rounds = sess.query(db.Round).filter_by(year=mo.current_year).order_by(db.Round.year, db.Round.category, db.Round.seq) - return render_template('org_contest_root.html', rounds=rounds, level_names=mo.db.place_level_names) - - -def get_round(id: int) -> db.Round: - round = db.get_session().query(db.Round).get(id) - if not round: - raise werkzeug.exceptions.NotFound() - return round - - -def get_round_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db.Round, mo.rights.Rights]: - round = get_round(id) - - rr = mo.rights.Rights(g.user) - rr.get_for_round(round) - - if not (right_needed is None or rr.have_right(right_needed)): - raise werkzeug.exceptions.Forbidden() - - return round, rr - - -@app.route('/org/contest/r/<int:id>/') -def org_round(id: int): - sess = db.get_session() - round, rr = get_round_rr(id, None) - - contests = (sess.query(db.Contest) - .filter_by(round=round) - .options(joinedload(db.Contest.place)) - .all()) - - contests.sort(key=lambda c: locale.strxfrm(c.place.name)) - - return render_template( - 'org_round.html', - round=round, - contests=contests, - level_names=mo.db.place_level_names, - can_manage=rr.have_right(mo.rights.Right.manage_contest), - ) - - -@app.route('/org/contest/r/<int:id>/list') -def org_round_list(id: int): - return render_template('not_implemented.html') - - -@app.route('/org/contest/r/<int:id>/import', methods=('GET', 'POST')) -def org_round_import(id: int): - round, rr = get_round_rr(id, mo.rights.Right.manage_contest) - - form = ImportForm() - errs = [] - if form.validate_on_submit(): - tmp_name = secrets.token_hex(16) + '.csv' - tmp_path = os.path.join(app.instance_path, 'imports', tmp_name) - form.file.data.save(tmp_path) - app.logger.info('Import: Zpracovávám soubor %s pro round=%s, uid=%s', tmp_name, round.round_code(), g.user.user_id) - - imp = mo.imports.Import(g.user) - if imp.import_contest(round, None, tmp_path): - mo.util.log( - type=db.LogType.round, - what=round.round_id, - details={'action': 'import'} - ) - db.get_session().commit() - flash('Účastníci importováni', 'success') - return redirect(url_for('org_round', id=round.round_id)) - else: - flash('Došlo k chybě při importu (detaily níže)', 'danger') - errs = imp.errors - - return render_template( - 'org_round_import.html', - round=round, - form=form, - errs=errs, - ) - - -def get_contest(id: int) -> db.Contest: - contest = (db.get_session().query(db.Contest) - .options(joinedload(db.Contest.place), - joinedload(db.Contest.round)) - .get(id)) - if not contest: - raise werkzeug.exceptions.NotFound() - return contest - - -def get_contest_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db.Contest, mo.rights.Rights]: - contest = get_contest(id) - - rr = mo.rights.Rights(g.user) - rr.get_for_contest(contest) - - if not (right_needed is None or rr.have_right(right_needed)): - raise werkzeug.exceptions.Forbidden() - - return contest, rr - - -@app.route('/org/contest/c/<int:id>') -def org_contest(id: int): - contest, rr = get_contest_rr(id, None) - - return render_template( - 'org_contest.html', - contest=contest, - rights=sorted(rr.current_rights, key=lambda r: r. name), - can_manage=rr.have_right(mo.rights.Right.manage_contest), - ) - - -@app.route('/org/contest/c/<int:id>/import', methods=('GET', 'POST')) -def org_contest_import(id: int): - contest, rr = get_contest_rr(id, mo.rights.Right.manage_contest) - - form = ImportForm() - errs = [] - if form.validate_on_submit(): - tmp_name = secrets.token_hex(16) + '.csv' - tmp_path = os.path.join(app.instance_path, 'imports', tmp_name) - form.file.data.save(tmp_path) - app.logger.info('Import: Zpracovávám soubor %s pro contest_id=%s, uid=%s', tmp_name, contest.contest_id, g.user.user_id) - - imp = mo.imports.Import(g.user) - if imp.import_contest(contest.round, contest, tmp_path): - mo.util.log( - type=db.LogType.contest, - what=contest.contest_id, - details={'action': 'import'} - ) - db.get_session().commit() - flash('Účastníci importováni', 'success') - return redirect(url_for('org_contest', id=contest.contest_id)) - else: - flash('Došlo k chybě při importu (detaily níže)', 'danger') - errs = imp.errors - - return render_template( - 'org_contest_import.html', - contest=contest, - form=form, - errs=errs, - ) - - -@app.route('/org/contest/import/help.html') -def org_contest_import_help(): - return render_template('org_contest_import_help.html') - - -@app.route('/org/contest/import/sablona.csv') -def org_contest_import_template(): - out = mo.imports.contest_template() - resp = app.make_response(out) - resp.content_type = 'text/csv; charset=utf=8' - return resp - - -contest_list_columns = ( - Column(key='first_name', name='krestni', title='Křestní jméno'), - Column(key='last_name', name='prijmeni', title='Příjmení'), - Column(key='email', name='email', title='E-mail'), - Column(key='school', name='skola', title='Škola'), - Column(key='school_code', name='kod_skoly', title='Kód školy'), - Column(key='grade', name='rocnik', title='Ročník'), - Column(key='born_year', name='rok_naroz', title='Rok naroz.'), - Column(key='place_code', name='kod_soutez_mista', title='Sout. místo'), - Column(key='status', name='stav', title='Stav'), -) - - -@app.route('/org/contest/c/<int:id>/ucastnici') -def org_contest_list(id: int): - contest, rr = get_contest_rr(id, mo.rights.Right.manage_contest) - format = request.args.get('format', "") - - ctants = (db.get_session() - .query(db.Participation, db.Participant) - .select_from(db.Participation) - .join(db.Participant, db.Participant.user_id == db.Participation.user_id) - .options(joinedload(db.Participation.user), - joinedload(db.Participation.place), - joinedload(db.Participant.school_place)) - .filter(db.Participation.contest == contest) - .filter(db.Participant.year == contest.round.year) - .all()) - - rows: List[dict] = [] - for pion, pant in ctants: - rows.append({ - 'first_name': pion.user.first_name, - 'last_name': pion.user.last_name, - 'email': cell_user_link(pion.user, pion.user.email), - 'school': pant.school_place.name, - 'school_code': cell_place_link(pant.school_place, pant.school_place.get_code()), - 'grade': pant.grade, - 'born_year': pant.birth_year, - 'place_code': pion.place.get_code(), - 'status': pion.state.name, - }) - - rows.sort(key=lambda r: (locale.strxfrm(r['last_name']), locale.strxfrm(r['first_name']))) - - table = Table( - columns=contest_list_columns, - rows=rows, - filename='ucastnici', - ) - - if format == "": - return render_template( - 'org_contest_list.html', - contest=contest, - table=table, - ) - else: - return table.send_as(format) diff --git a/mo/web/org_contest.py b/mo/web/org_contest.py new file mode 100644 index 0000000000000000000000000000000000000000..70ef811743d3cdc011b357c4aed2c3d2f89ea00c --- /dev/null +++ b/mo/web/org_contest.py @@ -0,0 +1,254 @@ +from flask import render_template, g, redirect, url_for, flash, request +from flask_wtf import FlaskForm +import flask_wtf.file +import locale +import os +import secrets +from sqlalchemy.orm import joinedload +from typing import List, Tuple, Optional +import werkzeug.exceptions +import wtforms + +import mo +import mo.csv +import mo.db as db +import mo.imports +import mo.rights +import mo.util +from mo.web import app +from mo.web.table import Table, Column, cell_place_link, cell_user_link +import wtforms.validators as validators + + +class ImportForm(FlaskForm): + file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()]) + submit = wtforms.SubmitField('Importovat') + + +@app.route('/org/contest/') +def org_contest_root(): + sess = db.get_session() + + rounds = sess.query(db.Round).filter_by(year=mo.current_year).order_by(db.Round.year, db.Round.category, db.Round.seq) + return render_template('org_contest_root.html', rounds=rounds, level_names=mo.db.place_level_names) + + +def get_round(id: int) -> db.Round: + round = db.get_session().query(db.Round).get(id) + if not round: + raise werkzeug.exceptions.NotFound() + return round + + +def get_round_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db.Round, mo.rights.Rights]: + round = get_round(id) + + rr = mo.rights.Rights(g.user) + rr.get_for_round(round) + + if not (right_needed is None or rr.have_right(right_needed)): + raise werkzeug.exceptions.Forbidden() + + return round, rr + + +@app.route('/org/contest/r/<int:id>/') +def org_round(id: int): + sess = db.get_session() + round, rr = get_round_rr(id, None) + + contests = (sess.query(db.Contest) + .filter_by(round=round) + .options(joinedload(db.Contest.place)) + .all()) + + contests.sort(key=lambda c: locale.strxfrm(c.place.name)) + + return render_template( + 'org_round.html', + round=round, + contests=contests, + level_names=mo.db.place_level_names, + can_manage=rr.have_right(mo.rights.Right.manage_contest), + ) + + +@app.route('/org/contest/r/<int:id>/list') +def org_round_list(id: int): + return render_template('not_implemented.html') + + +@app.route('/org/contest/r/<int:id>/import', methods=('GET', 'POST')) +def org_round_import(id: int): + round, rr = get_round_rr(id, mo.rights.Right.manage_contest) + + form = ImportForm() + errs = [] + if form.validate_on_submit(): + tmp_name = secrets.token_hex(16) + '.csv' + tmp_path = os.path.join(app.instance_path, 'imports', tmp_name) + form.file.data.save(tmp_path) + app.logger.info('Import: Zpracovávám soubor %s pro round=%s, uid=%s', tmp_name, round.round_code(), g.user.user_id) + + imp = mo.imports.Import(g.user) + if imp.import_contest(round, None, tmp_path): + mo.util.log( + type=db.LogType.round, + what=round.round_id, + details={'action': 'import'} + ) + db.get_session().commit() + flash('Účastníci importováni', 'success') + return redirect(url_for('org_round', id=round.round_id)) + else: + flash('Došlo k chybě při importu (detaily níže)', 'danger') + errs = imp.errors + + return render_template( + 'org_round_import.html', + round=round, + form=form, + errs=errs, + ) + + +def get_contest(id: int) -> db.Contest: + contest = (db.get_session().query(db.Contest) + .options(joinedload(db.Contest.place), + joinedload(db.Contest.round)) + .get(id)) + if not contest: + raise werkzeug.exceptions.NotFound() + return contest + + +def get_contest_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db.Contest, mo.rights.Rights]: + contest = get_contest(id) + + rr = mo.rights.Rights(g.user) + rr.get_for_contest(contest) + + if not (right_needed is None or rr.have_right(right_needed)): + raise werkzeug.exceptions.Forbidden() + + return contest, rr + + +@app.route('/org/contest/c/<int:id>') +def org_contest(id: int): + contest, rr = get_contest_rr(id, None) + + return render_template( + 'org_contest.html', + contest=contest, + rights=sorted(rr.current_rights, key=lambda r: r. name), + can_manage=rr.have_right(mo.rights.Right.manage_contest), + ) + + +@app.route('/org/contest/c/<int:id>/import', methods=('GET', 'POST')) +def org_contest_import(id: int): + contest, rr = get_contest_rr(id, mo.rights.Right.manage_contest) + + form = ImportForm() + errs = [] + if form.validate_on_submit(): + tmp_name = secrets.token_hex(16) + '.csv' + tmp_path = os.path.join(app.instance_path, 'imports', tmp_name) + form.file.data.save(tmp_path) + app.logger.info('Import: Zpracovávám soubor %s pro contest_id=%s, uid=%s', tmp_name, contest.contest_id, g.user.user_id) + + imp = mo.imports.Import(g.user) + if imp.import_contest(contest.round, contest, tmp_path): + mo.util.log( + type=db.LogType.contest, + what=contest.contest_id, + details={'action': 'import'} + ) + db.get_session().commit() + flash('Účastníci importováni', 'success') + return redirect(url_for('org_contest', id=contest.contest_id)) + else: + flash('Došlo k chybě při importu (detaily níže)', 'danger') + errs = imp.errors + + return render_template( + 'org_contest_import.html', + contest=contest, + form=form, + errs=errs, + ) + + +@app.route('/org/contest/import/help.html') +def org_contest_import_help(): + return render_template('org_contest_import_help.html') + + +@app.route('/org/contest/import/sablona.csv') +def org_contest_import_template(): + out = mo.imports.contest_template() + resp = app.make_response(out) + resp.content_type = 'text/csv; charset=utf=8' + return resp + + +contest_list_columns = ( + Column(key='first_name', name='krestni', title='Křestní jméno'), + Column(key='last_name', name='prijmeni', title='Příjmení'), + Column(key='email', name='email', title='E-mail'), + Column(key='school', name='skola', title='Škola'), + Column(key='school_code', name='kod_skoly', title='Kód školy'), + Column(key='grade', name='rocnik', title='Ročník'), + Column(key='born_year', name='rok_naroz', title='Rok naroz.'), + Column(key='place_code', name='kod_soutez_mista', title='Sout. místo'), + Column(key='status', name='stav', title='Stav'), +) + + +@app.route('/org/contest/c/<int:id>/ucastnici') +def org_contest_list(id: int): + contest, rr = get_contest_rr(id, mo.rights.Right.manage_contest) + format = request.args.get('format', "") + + ctants = (db.get_session() + .query(db.Participation, db.Participant) + .select_from(db.Participation) + .join(db.Participant, db.Participant.user_id == db.Participation.user_id) + .options(joinedload(db.Participation.user), + joinedload(db.Participation.place), + joinedload(db.Participant.school_place)) + .filter(db.Participation.contest == contest) + .filter(db.Participant.year == contest.round.year) + .all()) + + rows: List[dict] = [] + for pion, pant in ctants: + rows.append({ + 'first_name': pion.user.first_name, + 'last_name': pion.user.last_name, + 'email': cell_user_link(pion.user, pion.user.email), + 'school': pant.school_place.name, + 'school_code': cell_place_link(pant.school_place, pant.school_place.get_code()), + 'grade': pant.grade, + 'born_year': pant.birth_year, + 'place_code': pion.place.get_code(), + 'status': pion.state.name, + }) + + rows.sort(key=lambda r: (locale.strxfrm(r['last_name']), locale.strxfrm(r['first_name']))) + + table = Table( + columns=contest_list_columns, + rows=rows, + filename='ucastnici', + ) + + if format == "": + return render_template( + 'org_contest_list.html', + contest=contest, + table=table, + ) + else: + return table.send_as(format) diff --git a/mo/web/org_place.py b/mo/web/org_place.py new file mode 100644 index 0000000000000000000000000000000000000000..4c23d1c67a86e7bf1d0ba81c315754938d10b26c --- /dev/null +++ b/mo/web/org_place.py @@ -0,0 +1,369 @@ +from flask import render_template, g, redirect, url_for, flash, request +from flask_wtf import FlaskForm +import locale +from sqlalchemy.orm import joinedload +from typing import List, Tuple, Optional +import werkzeug.exceptions +import wtforms + +import mo +import mo.csv +import mo.db as db +import mo.imports +import mo.rights +import mo.util +from mo.web import app +import wtforms.validators as validators + + +@app.route('/org/place/<int:id>/') +def org_place(id: int): + sess = db.get_session() + + place = sess.query(db.Place).get(id) + if not place: + raise werkzeug.exceptions.NotFound() + + if place.type == db.PlaceType.school: + school = sess.query(db.School).get(place.place_id) + else: + school = None + + children = sorted(place.children, key=lambda p: locale.strxfrm(p.name)) + + rr = mo.rights.Rights(g.user) + rr.get_for(place) + + return render_template( + 'org_place.html', place=place, school=school, + rights=sorted(rr.current_rights, key=lambda r: r. name), + can_edit=rr.can_edit_place(place), + children=children + ) + + +class PlaceEditForm(FlaskForm): + name = wtforms.StringField( + 'Název', + validators=[validators.DataRequired()] + ) + code = wtforms.StringField( + 'Kód', filters=[lambda x: x or None], # may be NULL in db + description="Při nevyplnění se použije ID místa" + ) + type = wtforms.SelectField( + 'Typ', choices=db.PlaceType.choices(), coerce=db.PlaceType.coerce + ) + nuts = wtforms.StringField( + 'NUTS', filters=[lambda x: x or None], # may be NULL in db + description="Pro okresy a výše" + ) + note = wtforms.StringField('Poznámka') + submit = wtforms.SubmitField('Uložit') + + +class PlaceSchoolEditForm(PlaceEditForm): + red_izo = wtforms.StringField('RED_IZO') + ico = wtforms.StringField('IČO') + official_name = wtforms.StringField('Oficiální název') + address = wtforms.StringField('Adresa') + is_zs = wtforms.BooleanField('ZŠ') + is_ss = wtforms.BooleanField('SŠ') + submit = wtforms.SubmitField('Uložit') + + +@app.route('/org/place/<int:id>/edit', methods=('GET', 'POST')) +def org_place_edit(id: int): + sess = db.get_session() + + place = sess.query(db.Place).get(id) + if not place: + raise werkzeug.exceptions.NotFound() + + rr = mo.rights.Rights(g.user) + rr.get_for(place) + + if not rr.can_edit_place(place): + raise werkzeug.exceptions.Forbidden() + + if place.type == db.PlaceType.school: + school = sess.query(db.School).get(place.place_id) + # Pass school data as additional dict (data is used after obj) + form = PlaceSchoolEditForm(obj=place, data=db.row2dict(school)) + else: + form = PlaceEditForm(obj=place) + school = None + + form.type.choices = db.PlaceType.choices(level=place.level) + + if form.validate_on_submit(): + form.populate_obj(place) + if school: + form.populate_obj(school) + + msg = 'Změny místa uloženy' + redirectURL = url_for('org_place', id=id) + + if sess.is_modified(place) or school and sess.is_modified(school): + placeChanges = db.get_object_changes(place) + schoolChanges = {} + if school: + if request.form.get('type') != 'school': + # School record removed + mo.util.log( + type=db.LogType.place, + what=school.place_id, + details={'action': 'school-delete', 'school': db.row2dict(school)}, + ) + app.logger.info(f"Deleting school record for place {place.place_id}") + db.get_session().delete(school) + msg = 'Změny místa uloženy, záznam o škole smazán' + else: + schoolChanges = db.get_object_changes(school) + elif request.form.get('type') == 'school': + # School record created + new_school = db.School() + new_school.place_id = place.place_id + mo.util.log( + type=db.LogType.place, + what=new_school.place_id, + details={'action': 'school-add'}, + ) + app.logger.info(f"Creating new school for place {place.place_id}") + db.get_session().add(new_school) + # Take org directly to the school edit to fill the data + msg = 'Záznam o škole vytvořen, vyplňte prosím všechna data' + redirectURL = url_for('org_place_edit', id=id) + + changes = {**placeChanges, **schoolChanges} + app.logger.info(f"Place {id} modified, changes: {changes}") + mo.util.log( + type=db.LogType.place, + what=id, + details={'action': 'edit', 'changes': changes}, + ) + db.get_session().commit() + flash(msg, 'success') + else: + flash(u'Žádné změny k uložení', 'info') + + return redirect(redirectURL) + + parents = reversed(db.get_place_parents(place)[1:]) # without place itself and from the top + + return render_template( + 'org_place_edit.html', place=place, school=school, + parents=parents, form=form + ) + + +class PlaceMoveForm(FlaskForm): + code = wtforms.StringField(validators=[validators.DataRequired()]) + submit = wtforms.SubmitField('Najít místo') + reset = wtforms.HiddenField() + move = wtforms.HiddenField() + + +class PlaceMoveConfirmForm(FlaskForm): + code = wtforms.HiddenField() + reset = wtforms.SubmitField('Zrušit') + move = wtforms.SubmitField('Přesunout') + + +@app.route('/org/place/<int:id>/move', methods=('GET', 'POST')) +def org_place_move(id: int): + sess = db.get_session() + + # Tests: can move only existing places that we can edit + place = sess.query(db.Place).get(id) + if not place: + raise werkzeug.exceptions.NotFound() + rr = mo.rights.Rights(g.user) + rr.get_for(place) + if not rr.can_edit_place(place): + raise werkzeug.exceptions.Forbidden() + + parents = reversed(db.get_place_parents(place)[1:]) # without place itself and from the top + new_parents = None + search_error = None + + form = PlaceMoveForm() + form_confirm = None + if form.validate_on_submit(): + if form.reset.data: + return redirect(url_for('org_place_move', id=id)) + + new_parent = db.get_place_by_code(form.code.data) + if not new_parent: + search_error = 'Místo s tímto kódem se nepovedlo nalézt' + else: + new_parents = reversed(db.get_place_parents(new_parent)) + (_, levels) = db.place_type_names_and_levels[place.type] + + rr.get_for(new_parent) + if not rr.can_edit_place(new_parent): + search_error = 'Nemáte právo k editaci vybraného nadřazeného místa, přesun nelze uskutečnit' + elif (new_parent.level + 1) not in levels: + search_error = f'Toto místo ({place.type_name()}) nelze přemístit pod vybrané místo ({new_parent.type_name()}), dostalo by se na nepovolený level' + elif new_parent.place_id == place.parent: + search_error = 'Žádná změna, místo je zde již umístěno' + elif form.move.data: + # Everything is OK, if submitted with 'move' do the move + place.parent = new_parent.place_id + place.level = new_parent.level + 1 + changes = db.get_object_changes(place) + mo.util.log( + type=db.LogType.place, + what=id, + details={'action': 'move', 'changes': changes}, + ) + app.logger.info(f"Place {id} moved, changes: {changes}") + db.get_session().commit() + flash('Místo úspěšně přesunuto', 'success') + return redirect(url_for('org_place', id=id)) + else: + # OK but not confirmed yet, display the confirm form + form_confirm = PlaceMoveConfirmForm() + form_confirm.code.data = form.code.data + + return render_template( + 'org_place_move.html', + place=place, form=form, form_confirm=form_confirm, search_error=search_error, + parents=parents, new_parents=new_parents + ) + + +@app.route('/org/place/<int:id>/delete', methods=('POST',)) +def org_place_delete(id: int): + sess = db.get_session() + + # Tests: can delete only existing places that we can edit + place = sess.query(db.Place).get(id) + if not place: + raise werkzeug.exceptions.NotFound() + rr = mo.rights.Rights(g.user) + rr.get_for(place) + if not rr.can_edit_place(place): + raise werkzeug.exceptions.Forbidden() + + # Cannot delete place with children + if place.children: + flash("Nelze smazat místo s podřízenými místy", "danger") + return redirect(url_for('org_place', id=id)) + + # Cannot delete place with contests + if sess.query(db.Contest).filter_by(place_id=id).count() > 0: + flash("Nelze smazat místo ke kterému se váže nějaká soutěž ", "danger") + return redirect(url_for('org_place', id=id)) + + if place.type == db.PlaceType.school: + school = sess.query(db.School).get(place.place_id) + mo.util.log( + type=db.LogType.place, + what=school.place_id, + details={'action': 'school-delete', 'school': db.row2dict(school)}, + ) + app.logger.info(f"Deleting school record for place {id}") + db.get_session().delete(school) + + mo.util.log( + type=db.LogType.place, + what=id, + details={'action': 'delete', 'place': db.row2dict(place)}, + ) + app.logger.info(f"Deleting place {id}") + + parent = place.parent + db.get_session().delete(place) + db.get_session().commit() + + flash("Místo smazáno", "success") + return redirect(url_for('org_place', id=parent)) + + +@app.route('/org/place/<int:id>/new-child', methods=('GET', 'POST')) +def org_place_new_child(id: int): + sess = db.get_session() + + # Tests: can add new child only under existing places that we can edit + parent_place = sess.query(db.Place).get(id) + if not parent_place: + raise werkzeug.exceptions.NotFound() + rr = mo.rights.Rights(g.user) + rr.get_for(parent_place) + if not rr.can_edit_place(parent_place): + raise werkzeug.exceptions.Forbidden() + if not parent_place.can_have_child(): + raise werkzeug.exceptions.Forbidden() + + form = PlaceEditForm() + form.type.choices = db.PlaceType.choices(level=parent_place.level + 1) + + if form.validate_on_submit(): + new_place = db.Place() + form.populate_obj(new_place) + new_place.parent = parent_place.place_id + new_place.level = parent_place.level + 1 + sess.add(new_place) + sess.flush() + + app.logger.info(f"New place created: {db.row2dict(new_place)}") + mo.util.log( + type=db.LogType.place, + what=new_place.place_id, + details={'action': 'new', 'place': db.row2dict(new_place)}, + ) + + redirect_url = url_for('org_place', id=new_place.place_id) + msg = 'Nové místo uloženo' + + if new_place.type == db.PlaceType.school: + new_school = db.School() + new_school.place_id = new_place.place_id + mo.util.log( + type=db.LogType.place, + what=new_school.place_id, + details={'action': 'school-add'}, + ) + app.logger.info(f"Creating new school for place {new_place.place_id}") + sess.add(new_school) + # Take org directly to the school edit to fill the data + msg = 'Záznam o škole vytvořen, vyplňte prosím všechna data' + redirect_url = url_for('org_place_edit', id=new_place.place_id) + + sess.commit() + flash(msg, 'success') + return redirect(redirect_url) + + parents = reversed(db.get_place_parents(parent_place)) + + return render_template('org_place_new.html', parents=parents, form=form) + + +@app.route('/org/place/') +def org_place_root(): + root = db.get_root_place() + return redirect(url_for('org_place', id=root.place_id)) + + +@app.route('/org/place/<int:id>/rights') +def org_place_rights(id: int): + sess = db.get_session() + + place = sess.query(db.Place).get(id) + if not place: + raise werkzeug.exceptions.NotFound() + + parent_ids = [p.place_id for p in db.get_place_parents(place)] + roles = (sess.query(db.UserRole) + .filter(db.UserRole.place_id.in_(parent_ids)) + .options(joinedload(db.UserRole.user)) + .all()) + + rr = mo.rights.Rights(g.user) + rr.get_for(place) + + return render_template( + 'org_place_rights.html', place=place, rights=rr.current_rights, + roles=roles, roles_by_type=mo.rights.roles_by_type + )