Skip to content
Snippets Groups Projects
Commit 62a0b6c9 authored by Martin Mareš's avatar Martin Mareš
Browse files

mo.org rozděleno na mo.org_contest a mo.org_place

parent 0d250fff
No related branches found
No related tags found
No related merge requests found
......@@ -63,5 +63,7 @@ app.before_request(init_request)
# Většina webu je v samostatných modulech
import mo.web.main
import mo.web.org
import mo.web.org_contest
import mo.web.org_place
import mo.web.org_users
import mo.web.menu
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import flask_wtf.file
import locale
import os
import secrets
from sqlalchemy.orm import joinedload
from typing import List, Tuple, Optional
import werkzeug.exceptions
import wtforms
from flask import render_template, g, redirect, url_for
import mo
import mo.csv
import mo.db as db
import mo.imports
import mo.rights
import mo.util
from mo.web import app
from mo.web.table import Table, Column, cell_place_link, cell_user_link
import wtforms.validators as validators
@app.route('/org/')
def org_index():
return render_template('org_index.html')
@app.route('/org/place/<int:id>/')
def org_place(id: int):
sess = db.get_session()
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
if place.type == db.PlaceType.school:
school = sess.query(db.School).get(place.place_id)
else:
school = None
children = sorted(place.children, key=lambda p: locale.strxfrm(p.name))
rr = mo.rights.Rights(g.user)
rr.get_for(place)
return render_template(
'org_place.html', place=place, school=school,
rights=sorted(rr.current_rights, key=lambda r: r. name),
can_edit=rr.can_edit_place(place),
children=children
)
class PlaceEditForm(FlaskForm):
name = wtforms.StringField(
'Název',
validators=[validators.DataRequired()]
)
code = wtforms.StringField(
'Kód', filters=[lambda x: x or None], # may be NULL in db
description="Při nevyplnění se použije ID místa"
)
type = wtforms.SelectField(
'Typ', choices=db.PlaceType.choices(), coerce=db.PlaceType.coerce
)
nuts = wtforms.StringField(
'NUTS', filters=[lambda x: x or None], # may be NULL in db
description="Pro okresy a výše"
)
note = wtforms.StringField('Poznámka')
submit = wtforms.SubmitField('Uložit')
class PlaceSchoolEditForm(PlaceEditForm):
red_izo = wtforms.StringField('RED_IZO')
ico = wtforms.StringField('IČO')
official_name = wtforms.StringField('Oficiální název')
address = wtforms.StringField('Adresa')
is_zs = wtforms.BooleanField('')
is_ss = wtforms.BooleanField('')
submit = wtforms.SubmitField('Uložit')
@app.route('/org/place/<int:id>/edit', methods=('GET', 'POST'))
def org_place_edit(id: int):
sess = db.get_session()
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
rr = mo.rights.Rights(g.user)
rr.get_for(place)
if not rr.can_edit_place(place):
raise werkzeug.exceptions.Forbidden()
if place.type == db.PlaceType.school:
school = sess.query(db.School).get(place.place_id)
# Pass school data as additional dict (data is used after obj)
form = PlaceSchoolEditForm(obj=place, data=db.row2dict(school))
else:
form = PlaceEditForm(obj=place)
school = None
form.type.choices = db.PlaceType.choices(level=place.level)
if form.validate_on_submit():
form.populate_obj(place)
if school:
form.populate_obj(school)
msg = 'Změny místa uloženy'
redirectURL = url_for('org_place', id=id)
if sess.is_modified(place) or school and sess.is_modified(school):
placeChanges = db.get_object_changes(place)
schoolChanges = {}
if school:
if request.form.get('type') != 'school':
# School record removed
mo.util.log(
type=db.LogType.place,
what=school.place_id,
details={'action': 'school-delete', 'school': db.row2dict(school)},
)
app.logger.info(f"Deleting school record for place {place.place_id}")
db.get_session().delete(school)
msg = 'Změny místa uloženy, záznam o škole smazán'
else:
schoolChanges = db.get_object_changes(school)
elif request.form.get('type') == 'school':
# School record created
new_school = db.School()
new_school.place_id = place.place_id
mo.util.log(
type=db.LogType.place,
what=new_school.place_id,
details={'action': 'school-add'},
)
app.logger.info(f"Creating new school for place {place.place_id}")
db.get_session().add(new_school)
# Take org directly to the school edit to fill the data
msg = 'Záznam o škole vytvořen, vyplňte prosím všechna data'
redirectURL = url_for('org_place_edit', id=id)
changes = {**placeChanges, **schoolChanges}
app.logger.info(f"Place {id} modified, changes: {changes}")
mo.util.log(
type=db.LogType.place,
what=id,
details={'action': 'edit', 'changes': changes},
)
db.get_session().commit()
flash(msg, 'success')
else:
flash(u'Žádné změny k uložení', 'info')
return redirect(redirectURL)
parents = reversed(db.get_place_parents(place)[1:]) # without place itself and from the top
return render_template(
'org_place_edit.html', place=place, school=school,
parents=parents, form=form
)
class PlaceMoveForm(FlaskForm):
code = wtforms.StringField(validators=[validators.DataRequired()])
submit = wtforms.SubmitField('Najít místo')
reset = wtforms.HiddenField()
move = wtforms.HiddenField()
class PlaceMoveConfirmForm(FlaskForm):
code = wtforms.HiddenField()
reset = wtforms.SubmitField('Zrušit')
move = wtforms.SubmitField('Přesunout')
@app.route('/org/place/<int:id>/move', methods=('GET', 'POST'))
def org_place_move(id: int):
sess = db.get_session()
# Tests: can move only existing places that we can edit
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
rr = mo.rights.Rights(g.user)
rr.get_for(place)
if not rr.can_edit_place(place):
raise werkzeug.exceptions.Forbidden()
parents = reversed(db.get_place_parents(place)[1:]) # without place itself and from the top
new_parents = None
search_error = None
form = PlaceMoveForm()
form_confirm = None
if form.validate_on_submit():
if form.reset.data:
return redirect(url_for('org_place_move', id=id))
new_parent = db.get_place_by_code(form.code.data)
if not new_parent:
search_error = 'Místo s tímto kódem se nepovedlo nalézt'
else:
new_parents = reversed(db.get_place_parents(new_parent))
(_, levels) = db.place_type_names_and_levels[place.type]
rr.get_for(new_parent)
if not rr.can_edit_place(new_parent):
search_error = 'Nemáte právo k editaci vybraného nadřazeného místa, přesun nelze uskutečnit'
elif (new_parent.level + 1) not in levels:
search_error = f'Toto místo ({place.type_name()}) nelze přemístit pod vybrané místo ({new_parent.type_name()}), dostalo by se na nepovolený level'
elif new_parent.place_id == place.parent:
search_error = 'Žádná změna, místo je zde již umístěno'
elif form.move.data:
# Everything is OK, if submitted with 'move' do the move
place.parent = new_parent.place_id
place.level = new_parent.level + 1
changes = db.get_object_changes(place)
mo.util.log(
type=db.LogType.place,
what=id,
details={'action': 'move', 'changes': changes},
)
app.logger.info(f"Place {id} moved, changes: {changes}")
db.get_session().commit()
flash('Místo úspěšně přesunuto', 'success')
return redirect(url_for('org_place', id=id))
else:
# OK but not confirmed yet, display the confirm form
form_confirm = PlaceMoveConfirmForm()
form_confirm.code.data = form.code.data
return render_template(
'org_place_move.html',
place=place, form=form, form_confirm=form_confirm, search_error=search_error,
parents=parents, new_parents=new_parents
)
@app.route('/org/place/<int:id>/delete', methods=('POST',))
def org_place_delete(id: int):
sess = db.get_session()
# Tests: can delete only existing places that we can edit
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
rr = mo.rights.Rights(g.user)
rr.get_for(place)
if not rr.can_edit_place(place):
raise werkzeug.exceptions.Forbidden()
# Cannot delete place with children
if place.children:
flash("Nelze smazat místo s podřízenými místy", "danger")
return redirect(url_for('org_place', id=id))
# Cannot delete place with contests
if sess.query(db.Contest).filter_by(place_id=id).count() > 0:
flash("Nelze smazat místo ke kterému se váže nějaká soutěž ", "danger")
return redirect(url_for('org_place', id=id))
if place.type == db.PlaceType.school:
school = sess.query(db.School).get(place.place_id)
mo.util.log(
type=db.LogType.place,
what=school.place_id,
details={'action': 'school-delete', 'school': db.row2dict(school)},
)
app.logger.info(f"Deleting school record for place {id}")
db.get_session().delete(school)
mo.util.log(
type=db.LogType.place,
what=id,
details={'action': 'delete', 'place': db.row2dict(place)},
)
app.logger.info(f"Deleting place {id}")
parent = place.parent
db.get_session().delete(place)
db.get_session().commit()
flash("Místo smazáno", "success")
return redirect(url_for('org_place', id=parent))
@app.route('/org/place/<int:id>/new-child', methods=('GET', 'POST'))
def org_place_new_child(id: int):
sess = db.get_session()
# Tests: can add new child only under existing places that we can edit
parent_place = sess.query(db.Place).get(id)
if not parent_place:
raise werkzeug.exceptions.NotFound()
rr = mo.rights.Rights(g.user)
rr.get_for(parent_place)
if not rr.can_edit_place(parent_place):
raise werkzeug.exceptions.Forbidden()
if not parent_place.can_have_child():
raise werkzeug.exceptions.Forbidden()
form = PlaceEditForm()
form.type.choices = db.PlaceType.choices(level=parent_place.level + 1)
if form.validate_on_submit():
new_place = db.Place()
form.populate_obj(new_place)
new_place.parent = parent_place.place_id
new_place.level = parent_place.level + 1
sess.add(new_place)
sess.flush()
app.logger.info(f"New place created: {db.row2dict(new_place)}")
mo.util.log(
type=db.LogType.place,
what=new_place.place_id,
details={'action': 'new', 'place': db.row2dict(new_place)},
)
redirect_url = url_for('org_place', id=new_place.place_id)
msg = 'Nové místo uloženo'
if new_place.type == db.PlaceType.school:
new_school = db.School()
new_school.place_id = new_place.place_id
mo.util.log(
type=db.LogType.place,
what=new_school.place_id,
details={'action': 'school-add'},
)
app.logger.info(f"Creating new school for place {new_place.place_id}")
sess.add(new_school)
# Take org directly to the school edit to fill the data
msg = 'Záznam o škole vytvořen, vyplňte prosím všechna data'
redirect_url = url_for('org_place_edit', id=new_place.place_id)
sess.commit()
flash(msg, 'success')
return redirect(redirect_url)
parents = reversed(db.get_place_parents(parent_place))
return render_template('org_place_new.html', parents=parents, form=form)
@app.route('/org/place/')
def org_place_root():
root = db.get_root_place()
return redirect(url_for('org_place', id=root.place_id))
@app.route('/org/place/<int:id>/rights')
def org_place_rights(id: int):
sess = db.get_session()
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
parent_ids = [p.place_id for p in db.get_place_parents(place)]
roles = (sess.query(db.UserRole)
.filter(db.UserRole.place_id.in_(parent_ids))
.options(joinedload(db.UserRole.user))
.all())
rr = mo.rights.Rights(g.user)
rr.get_for(place)
return render_template(
'org_place_rights.html', place=place, rights=rr.current_rights,
roles=roles, roles_by_type=mo.rights.roles_by_type
)
class ImportForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()])
submit = wtforms.SubmitField('Importovat')
@app.route('/org/contest/')
def org_contest_root():
sess = db.get_session()
rounds = sess.query(db.Round).filter_by(year=mo.current_year).order_by(db.Round.year, db.Round.category, db.Round.seq)
return render_template('org_contest_root.html', rounds=rounds, level_names=mo.db.place_level_names)
def get_round(id: int) -> db.Round:
round = db.get_session().query(db.Round).get(id)
if not round:
raise werkzeug.exceptions.NotFound()
return round
def get_round_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db.Round, mo.rights.Rights]:
round = get_round(id)
rr = mo.rights.Rights(g.user)
rr.get_for_round(round)
if not (right_needed is None or rr.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return round, rr
@app.route('/org/contest/r/<int:id>/')
def org_round(id: int):
sess = db.get_session()
round, rr = get_round_rr(id, None)
contests = (sess.query(db.Contest)
.filter_by(round=round)
.options(joinedload(db.Contest.place))
.all())
contests.sort(key=lambda c: locale.strxfrm(c.place.name))
return render_template(
'org_round.html',
round=round,
contests=contests,
level_names=mo.db.place_level_names,
can_manage=rr.have_right(mo.rights.Right.manage_contest),
)
@app.route('/org/contest/r/<int:id>/list')
def org_round_list(id: int):
return render_template('not_implemented.html')
@app.route('/org/contest/r/<int:id>/import', methods=('GET', 'POST'))
def org_round_import(id: int):
round, rr = get_round_rr(id, mo.rights.Right.manage_contest)
form = ImportForm()
errs = []
if form.validate_on_submit():
tmp_name = secrets.token_hex(16) + '.csv'
tmp_path = os.path.join(app.instance_path, 'imports', tmp_name)
form.file.data.save(tmp_path)
app.logger.info('Import: Zpracovávám soubor %s pro round=%s, uid=%s', tmp_name, round.round_code(), g.user.user_id)
imp = mo.imports.Import(g.user)
if imp.import_contest(round, None, tmp_path):
mo.util.log(
type=db.LogType.round,
what=round.round_id,
details={'action': 'import'}
)
db.get_session().commit()
flash('Účastníci importováni', 'success')
return redirect(url_for('org_round', id=round.round_id))
else:
flash('Došlo k chybě při importu (detaily níže)', 'danger')
errs = imp.errors
return render_template(
'org_round_import.html',
round=round,
form=form,
errs=errs,
)
def get_contest(id: int) -> db.Contest:
contest = (db.get_session().query(db.Contest)
.options(joinedload(db.Contest.place),
joinedload(db.Contest.round))
.get(id))
if not contest:
raise werkzeug.exceptions.NotFound()
return contest
def get_contest_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db.Contest, mo.rights.Rights]:
contest = get_contest(id)
rr = mo.rights.Rights(g.user)
rr.get_for_contest(contest)
if not (right_needed is None or rr.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return contest, rr
@app.route('/org/contest/c/<int:id>')
def org_contest(id: int):
contest, rr = get_contest_rr(id, None)
return render_template(
'org_contest.html',
contest=contest,
rights=sorted(rr.current_rights, key=lambda r: r. name),
can_manage=rr.have_right(mo.rights.Right.manage_contest),
)
@app.route('/org/contest/c/<int:id>/import', methods=('GET', 'POST'))
def org_contest_import(id: int):
contest, rr = get_contest_rr(id, mo.rights.Right.manage_contest)
form = ImportForm()
errs = []
if form.validate_on_submit():
tmp_name = secrets.token_hex(16) + '.csv'
tmp_path = os.path.join(app.instance_path, 'imports', tmp_name)
form.file.data.save(tmp_path)
app.logger.info('Import: Zpracovávám soubor %s pro contest_id=%s, uid=%s', tmp_name, contest.contest_id, g.user.user_id)
imp = mo.imports.Import(g.user)
if imp.import_contest(contest.round, contest, tmp_path):
mo.util.log(
type=db.LogType.contest,
what=contest.contest_id,
details={'action': 'import'}
)
db.get_session().commit()
flash('Účastníci importováni', 'success')
return redirect(url_for('org_contest', id=contest.contest_id))
else:
flash('Došlo k chybě při importu (detaily níže)', 'danger')
errs = imp.errors
return render_template(
'org_contest_import.html',
contest=contest,
form=form,
errs=errs,
)
@app.route('/org/contest/import/help.html')
def org_contest_import_help():
return render_template('org_contest_import_help.html')
@app.route('/org/contest/import/sablona.csv')
def org_contest_import_template():
out = mo.imports.contest_template()
resp = app.make_response(out)
resp.content_type = 'text/csv; charset=utf=8'
return resp
contest_list_columns = (
Column(key='first_name', name='krestni', title='Křestní jméno'),
Column(key='last_name', name='prijmeni', title='Příjmení'),
Column(key='email', name='email', title='E-mail'),
Column(key='school', name='skola', title='Škola'),
Column(key='school_code', name='kod_skoly', title='Kód školy'),
Column(key='grade', name='rocnik', title='Ročník'),
Column(key='born_year', name='rok_naroz', title='Rok naroz.'),
Column(key='place_code', name='kod_soutez_mista', title='Sout. místo'),
Column(key='status', name='stav', title='Stav'),
)
@app.route('/org/contest/c/<int:id>/ucastnici')
def org_contest_list(id: int):
contest, rr = get_contest_rr(id, mo.rights.Right.manage_contest)
format = request.args.get('format', "")
ctants = (db.get_session()
.query(db.Participation, db.Participant)
.select_from(db.Participation)
.join(db.Participant, db.Participant.user_id == db.Participation.user_id)
.options(joinedload(db.Participation.user),
joinedload(db.Participation.place),
joinedload(db.Participant.school_place))
.filter(db.Participation.contest == contest)
.filter(db.Participant.year == contest.round.year)
.all())
rows: List[dict] = []
for pion, pant in ctants:
rows.append({
'first_name': pion.user.first_name,
'last_name': pion.user.last_name,
'email': cell_user_link(pion.user, pion.user.email),
'school': pant.school_place.name,
'school_code': cell_place_link(pant.school_place, pant.school_place.get_code()),
'grade': pant.grade,
'born_year': pant.birth_year,
'place_code': pion.place.get_code(),
'status': pion.state.name,
})
rows.sort(key=lambda r: (locale.strxfrm(r['last_name']), locale.strxfrm(r['first_name'])))
table = Table(
columns=contest_list_columns,
rows=rows,
filename='ucastnici',
)
if format == "":
return render_template(
'org_contest_list.html',
contest=contest,
table=table,
)
else:
return table.send_as(format)
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import flask_wtf.file
import locale
import os
import secrets
from sqlalchemy.orm import joinedload
from typing import List, Tuple, Optional
import werkzeug.exceptions
import wtforms
import mo
import mo.csv
import mo.db as db
import mo.imports
import mo.rights
import mo.util
from mo.web import app
from mo.web.table import Table, Column, cell_place_link, cell_user_link
import wtforms.validators as validators
class ImportForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()])
submit = wtforms.SubmitField('Importovat')
@app.route('/org/contest/')
def org_contest_root():
sess = db.get_session()
rounds = sess.query(db.Round).filter_by(year=mo.current_year).order_by(db.Round.year, db.Round.category, db.Round.seq)
return render_template('org_contest_root.html', rounds=rounds, level_names=mo.db.place_level_names)
def get_round(id: int) -> db.Round:
round = db.get_session().query(db.Round).get(id)
if not round:
raise werkzeug.exceptions.NotFound()
return round
def get_round_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db.Round, mo.rights.Rights]:
round = get_round(id)
rr = mo.rights.Rights(g.user)
rr.get_for_round(round)
if not (right_needed is None or rr.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return round, rr
@app.route('/org/contest/r/<int:id>/')
def org_round(id: int):
sess = db.get_session()
round, rr = get_round_rr(id, None)
contests = (sess.query(db.Contest)
.filter_by(round=round)
.options(joinedload(db.Contest.place))
.all())
contests.sort(key=lambda c: locale.strxfrm(c.place.name))
return render_template(
'org_round.html',
round=round,
contests=contests,
level_names=mo.db.place_level_names,
can_manage=rr.have_right(mo.rights.Right.manage_contest),
)
@app.route('/org/contest/r/<int:id>/list')
def org_round_list(id: int):
return render_template('not_implemented.html')
@app.route('/org/contest/r/<int:id>/import', methods=('GET', 'POST'))
def org_round_import(id: int):
round, rr = get_round_rr(id, mo.rights.Right.manage_contest)
form = ImportForm()
errs = []
if form.validate_on_submit():
tmp_name = secrets.token_hex(16) + '.csv'
tmp_path = os.path.join(app.instance_path, 'imports', tmp_name)
form.file.data.save(tmp_path)
app.logger.info('Import: Zpracovávám soubor %s pro round=%s, uid=%s', tmp_name, round.round_code(), g.user.user_id)
imp = mo.imports.Import(g.user)
if imp.import_contest(round, None, tmp_path):
mo.util.log(
type=db.LogType.round,
what=round.round_id,
details={'action': 'import'}
)
db.get_session().commit()
flash('Účastníci importováni', 'success')
return redirect(url_for('org_round', id=round.round_id))
else:
flash('Došlo k chybě při importu (detaily níže)', 'danger')
errs = imp.errors
return render_template(
'org_round_import.html',
round=round,
form=form,
errs=errs,
)
def get_contest(id: int) -> db.Contest:
contest = (db.get_session().query(db.Contest)
.options(joinedload(db.Contest.place),
joinedload(db.Contest.round))
.get(id))
if not contest:
raise werkzeug.exceptions.NotFound()
return contest
def get_contest_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db.Contest, mo.rights.Rights]:
contest = get_contest(id)
rr = mo.rights.Rights(g.user)
rr.get_for_contest(contest)
if not (right_needed is None or rr.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return contest, rr
@app.route('/org/contest/c/<int:id>')
def org_contest(id: int):
contest, rr = get_contest_rr(id, None)
return render_template(
'org_contest.html',
contest=contest,
rights=sorted(rr.current_rights, key=lambda r: r. name),
can_manage=rr.have_right(mo.rights.Right.manage_contest),
)
@app.route('/org/contest/c/<int:id>/import', methods=('GET', 'POST'))
def org_contest_import(id: int):
contest, rr = get_contest_rr(id, mo.rights.Right.manage_contest)
form = ImportForm()
errs = []
if form.validate_on_submit():
tmp_name = secrets.token_hex(16) + '.csv'
tmp_path = os.path.join(app.instance_path, 'imports', tmp_name)
form.file.data.save(tmp_path)
app.logger.info('Import: Zpracovávám soubor %s pro contest_id=%s, uid=%s', tmp_name, contest.contest_id, g.user.user_id)
imp = mo.imports.Import(g.user)
if imp.import_contest(contest.round, contest, tmp_path):
mo.util.log(
type=db.LogType.contest,
what=contest.contest_id,
details={'action': 'import'}
)
db.get_session().commit()
flash('Účastníci importováni', 'success')
return redirect(url_for('org_contest', id=contest.contest_id))
else:
flash('Došlo k chybě při importu (detaily níže)', 'danger')
errs = imp.errors
return render_template(
'org_contest_import.html',
contest=contest,
form=form,
errs=errs,
)
@app.route('/org/contest/import/help.html')
def org_contest_import_help():
return render_template('org_contest_import_help.html')
@app.route('/org/contest/import/sablona.csv')
def org_contest_import_template():
out = mo.imports.contest_template()
resp = app.make_response(out)
resp.content_type = 'text/csv; charset=utf=8'
return resp
contest_list_columns = (
Column(key='first_name', name='krestni', title='Křestní jméno'),
Column(key='last_name', name='prijmeni', title='Příjmení'),
Column(key='email', name='email', title='E-mail'),
Column(key='school', name='skola', title='Škola'),
Column(key='school_code', name='kod_skoly', title='Kód školy'),
Column(key='grade', name='rocnik', title='Ročník'),
Column(key='born_year', name='rok_naroz', title='Rok naroz.'),
Column(key='place_code', name='kod_soutez_mista', title='Sout. místo'),
Column(key='status', name='stav', title='Stav'),
)
@app.route('/org/contest/c/<int:id>/ucastnici')
def org_contest_list(id: int):
contest, rr = get_contest_rr(id, mo.rights.Right.manage_contest)
format = request.args.get('format', "")
ctants = (db.get_session()
.query(db.Participation, db.Participant)
.select_from(db.Participation)
.join(db.Participant, db.Participant.user_id == db.Participation.user_id)
.options(joinedload(db.Participation.user),
joinedload(db.Participation.place),
joinedload(db.Participant.school_place))
.filter(db.Participation.contest == contest)
.filter(db.Participant.year == contest.round.year)
.all())
rows: List[dict] = []
for pion, pant in ctants:
rows.append({
'first_name': pion.user.first_name,
'last_name': pion.user.last_name,
'email': cell_user_link(pion.user, pion.user.email),
'school': pant.school_place.name,
'school_code': cell_place_link(pant.school_place, pant.school_place.get_code()),
'grade': pant.grade,
'born_year': pant.birth_year,
'place_code': pion.place.get_code(),
'status': pion.state.name,
})
rows.sort(key=lambda r: (locale.strxfrm(r['last_name']), locale.strxfrm(r['first_name'])))
table = Table(
columns=contest_list_columns,
rows=rows,
filename='ucastnici',
)
if format == "":
return render_template(
'org_contest_list.html',
contest=contest,
table=table,
)
else:
return table.send_as(format)
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import locale
from sqlalchemy.orm import joinedload
from typing import List, Tuple, Optional
import werkzeug.exceptions
import wtforms
import mo
import mo.csv
import mo.db as db
import mo.imports
import mo.rights
import mo.util
from mo.web import app
import wtforms.validators as validators
@app.route('/org/place/<int:id>/')
def org_place(id: int):
sess = db.get_session()
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
if place.type == db.PlaceType.school:
school = sess.query(db.School).get(place.place_id)
else:
school = None
children = sorted(place.children, key=lambda p: locale.strxfrm(p.name))
rr = mo.rights.Rights(g.user)
rr.get_for(place)
return render_template(
'org_place.html', place=place, school=school,
rights=sorted(rr.current_rights, key=lambda r: r. name),
can_edit=rr.can_edit_place(place),
children=children
)
class PlaceEditForm(FlaskForm):
name = wtforms.StringField(
'Název',
validators=[validators.DataRequired()]
)
code = wtforms.StringField(
'Kód', filters=[lambda x: x or None], # may be NULL in db
description="Při nevyplnění se použije ID místa"
)
type = wtforms.SelectField(
'Typ', choices=db.PlaceType.choices(), coerce=db.PlaceType.coerce
)
nuts = wtforms.StringField(
'NUTS', filters=[lambda x: x or None], # may be NULL in db
description="Pro okresy a výše"
)
note = wtforms.StringField('Poznámka')
submit = wtforms.SubmitField('Uložit')
class PlaceSchoolEditForm(PlaceEditForm):
red_izo = wtforms.StringField('RED_IZO')
ico = wtforms.StringField('IČO')
official_name = wtforms.StringField('Oficiální název')
address = wtforms.StringField('Adresa')
is_zs = wtforms.BooleanField('')
is_ss = wtforms.BooleanField('')
submit = wtforms.SubmitField('Uložit')
@app.route('/org/place/<int:id>/edit', methods=('GET', 'POST'))
def org_place_edit(id: int):
sess = db.get_session()
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
rr = mo.rights.Rights(g.user)
rr.get_for(place)
if not rr.can_edit_place(place):
raise werkzeug.exceptions.Forbidden()
if place.type == db.PlaceType.school:
school = sess.query(db.School).get(place.place_id)
# Pass school data as additional dict (data is used after obj)
form = PlaceSchoolEditForm(obj=place, data=db.row2dict(school))
else:
form = PlaceEditForm(obj=place)
school = None
form.type.choices = db.PlaceType.choices(level=place.level)
if form.validate_on_submit():
form.populate_obj(place)
if school:
form.populate_obj(school)
msg = 'Změny místa uloženy'
redirectURL = url_for('org_place', id=id)
if sess.is_modified(place) or school and sess.is_modified(school):
placeChanges = db.get_object_changes(place)
schoolChanges = {}
if school:
if request.form.get('type') != 'school':
# School record removed
mo.util.log(
type=db.LogType.place,
what=school.place_id,
details={'action': 'school-delete', 'school': db.row2dict(school)},
)
app.logger.info(f"Deleting school record for place {place.place_id}")
db.get_session().delete(school)
msg = 'Změny místa uloženy, záznam o škole smazán'
else:
schoolChanges = db.get_object_changes(school)
elif request.form.get('type') == 'school':
# School record created
new_school = db.School()
new_school.place_id = place.place_id
mo.util.log(
type=db.LogType.place,
what=new_school.place_id,
details={'action': 'school-add'},
)
app.logger.info(f"Creating new school for place {place.place_id}")
db.get_session().add(new_school)
# Take org directly to the school edit to fill the data
msg = 'Záznam o škole vytvořen, vyplňte prosím všechna data'
redirectURL = url_for('org_place_edit', id=id)
changes = {**placeChanges, **schoolChanges}
app.logger.info(f"Place {id} modified, changes: {changes}")
mo.util.log(
type=db.LogType.place,
what=id,
details={'action': 'edit', 'changes': changes},
)
db.get_session().commit()
flash(msg, 'success')
else:
flash(u'Žádné změny k uložení', 'info')
return redirect(redirectURL)
parents = reversed(db.get_place_parents(place)[1:]) # without place itself and from the top
return render_template(
'org_place_edit.html', place=place, school=school,
parents=parents, form=form
)
class PlaceMoveForm(FlaskForm):
code = wtforms.StringField(validators=[validators.DataRequired()])
submit = wtforms.SubmitField('Najít místo')
reset = wtforms.HiddenField()
move = wtforms.HiddenField()
class PlaceMoveConfirmForm(FlaskForm):
code = wtforms.HiddenField()
reset = wtforms.SubmitField('Zrušit')
move = wtforms.SubmitField('Přesunout')
@app.route('/org/place/<int:id>/move', methods=('GET', 'POST'))
def org_place_move(id: int):
sess = db.get_session()
# Tests: can move only existing places that we can edit
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
rr = mo.rights.Rights(g.user)
rr.get_for(place)
if not rr.can_edit_place(place):
raise werkzeug.exceptions.Forbidden()
parents = reversed(db.get_place_parents(place)[1:]) # without place itself and from the top
new_parents = None
search_error = None
form = PlaceMoveForm()
form_confirm = None
if form.validate_on_submit():
if form.reset.data:
return redirect(url_for('org_place_move', id=id))
new_parent = db.get_place_by_code(form.code.data)
if not new_parent:
search_error = 'Místo s tímto kódem se nepovedlo nalézt'
else:
new_parents = reversed(db.get_place_parents(new_parent))
(_, levels) = db.place_type_names_and_levels[place.type]
rr.get_for(new_parent)
if not rr.can_edit_place(new_parent):
search_error = 'Nemáte právo k editaci vybraného nadřazeného místa, přesun nelze uskutečnit'
elif (new_parent.level + 1) not in levels:
search_error = f'Toto místo ({place.type_name()}) nelze přemístit pod vybrané místo ({new_parent.type_name()}), dostalo by se na nepovolený level'
elif new_parent.place_id == place.parent:
search_error = 'Žádná změna, místo je zde již umístěno'
elif form.move.data:
# Everything is OK, if submitted with 'move' do the move
place.parent = new_parent.place_id
place.level = new_parent.level + 1
changes = db.get_object_changes(place)
mo.util.log(
type=db.LogType.place,
what=id,
details={'action': 'move', 'changes': changes},
)
app.logger.info(f"Place {id} moved, changes: {changes}")
db.get_session().commit()
flash('Místo úspěšně přesunuto', 'success')
return redirect(url_for('org_place', id=id))
else:
# OK but not confirmed yet, display the confirm form
form_confirm = PlaceMoveConfirmForm()
form_confirm.code.data = form.code.data
return render_template(
'org_place_move.html',
place=place, form=form, form_confirm=form_confirm, search_error=search_error,
parents=parents, new_parents=new_parents
)
@app.route('/org/place/<int:id>/delete', methods=('POST',))
def org_place_delete(id: int):
sess = db.get_session()
# Tests: can delete only existing places that we can edit
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
rr = mo.rights.Rights(g.user)
rr.get_for(place)
if not rr.can_edit_place(place):
raise werkzeug.exceptions.Forbidden()
# Cannot delete place with children
if place.children:
flash("Nelze smazat místo s podřízenými místy", "danger")
return redirect(url_for('org_place', id=id))
# Cannot delete place with contests
if sess.query(db.Contest).filter_by(place_id=id).count() > 0:
flash("Nelze smazat místo ke kterému se váže nějaká soutěž ", "danger")
return redirect(url_for('org_place', id=id))
if place.type == db.PlaceType.school:
school = sess.query(db.School).get(place.place_id)
mo.util.log(
type=db.LogType.place,
what=school.place_id,
details={'action': 'school-delete', 'school': db.row2dict(school)},
)
app.logger.info(f"Deleting school record for place {id}")
db.get_session().delete(school)
mo.util.log(
type=db.LogType.place,
what=id,
details={'action': 'delete', 'place': db.row2dict(place)},
)
app.logger.info(f"Deleting place {id}")
parent = place.parent
db.get_session().delete(place)
db.get_session().commit()
flash("Místo smazáno", "success")
return redirect(url_for('org_place', id=parent))
@app.route('/org/place/<int:id>/new-child', methods=('GET', 'POST'))
def org_place_new_child(id: int):
sess = db.get_session()
# Tests: can add new child only under existing places that we can edit
parent_place = sess.query(db.Place).get(id)
if not parent_place:
raise werkzeug.exceptions.NotFound()
rr = mo.rights.Rights(g.user)
rr.get_for(parent_place)
if not rr.can_edit_place(parent_place):
raise werkzeug.exceptions.Forbidden()
if not parent_place.can_have_child():
raise werkzeug.exceptions.Forbidden()
form = PlaceEditForm()
form.type.choices = db.PlaceType.choices(level=parent_place.level + 1)
if form.validate_on_submit():
new_place = db.Place()
form.populate_obj(new_place)
new_place.parent = parent_place.place_id
new_place.level = parent_place.level + 1
sess.add(new_place)
sess.flush()
app.logger.info(f"New place created: {db.row2dict(new_place)}")
mo.util.log(
type=db.LogType.place,
what=new_place.place_id,
details={'action': 'new', 'place': db.row2dict(new_place)},
)
redirect_url = url_for('org_place', id=new_place.place_id)
msg = 'Nové místo uloženo'
if new_place.type == db.PlaceType.school:
new_school = db.School()
new_school.place_id = new_place.place_id
mo.util.log(
type=db.LogType.place,
what=new_school.place_id,
details={'action': 'school-add'},
)
app.logger.info(f"Creating new school for place {new_place.place_id}")
sess.add(new_school)
# Take org directly to the school edit to fill the data
msg = 'Záznam o škole vytvořen, vyplňte prosím všechna data'
redirect_url = url_for('org_place_edit', id=new_place.place_id)
sess.commit()
flash(msg, 'success')
return redirect(redirect_url)
parents = reversed(db.get_place_parents(parent_place))
return render_template('org_place_new.html', parents=parents, form=form)
@app.route('/org/place/')
def org_place_root():
root = db.get_root_place()
return redirect(url_for('org_place', id=root.place_id))
@app.route('/org/place/<int:id>/rights')
def org_place_rights(id: int):
sess = db.get_session()
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
parent_ids = [p.place_id for p in db.get_place_parents(place)]
roles = (sess.query(db.UserRole)
.filter(db.UserRole.place_id.in_(parent_ids))
.options(joinedload(db.UserRole.user))
.all())
rr = mo.rights.Rights(g.user)
rr.get_for(place)
return render_template(
'org_place_rights.html', place=place, rights=rr.current_rights,
roles=roles, roles_by_type=mo.rights.roles_by_type
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment