Select Git revision
org_place.py 17.16 KiB
# Web: Správa míst
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import locale
from markupsafe import Markup
import re
from sqlalchemy import func, and_
from sqlalchemy.orm import joinedload
from typing import List, Optional
import werkzeug.exceptions
import wtforms
import mo
import mo.db as db
import mo.imports
import mo.rights
from mo.rights import Right
import mo.util
from mo.web import app
import mo.web.fields as mo_fields
import wtforms.validators as validators
class PlaceSearchForm(FlaskForm):
query = mo_fields.String(render_kw={'autofocus': True, 'placeholder': 'Kód nebo části názvu'})
submit = wtforms.SubmitField('Hledat')
# URL je explicitně uvedeno v bin/init-schools
@app.route('/org/place/<int:id>/')
def org_place(id: int):
sess = db.get_session()
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
# Formulář nemá side-efekty, takže to může být GET bez CSRF.
search_form = PlaceSearchForm(request.args, meta={'csrf': False})
found_places = None
search_failed = False
search_limited = False
if 'submit' in request.args and search_form.validate():
query = search_form.query.data
query_words = query.split()
if len(query_words) == 1:
found = db.get_place_by_code(query_words[0])
if found is not None:
flash('Nalezeno toto místo', 'info')
return redirect(url_for('org_place', id=found.place_id))
if len(query_words) > 0 and '%' not in query:
max_places = 100
place_q = (sess.query(db.Place)
.filter(db.Place.place_id != place.place_id))
for qw in query_words:
place_q = place_q.filter(func.lower(db.f_unaccent(db.Place.name)).like(func.lower(db.f_unaccent(f'%{qw}%'))))
if place.level > 0:
place_q = place_q.join(db.RegionDescendant, and_(db.RegionDescendant.region == place.place_id,
db.RegionDescendant.descendant == db.Place.place_id))
found_places = (place_q
.options(joinedload(db.Place.parent_place))
.order_by(db.Place.level, db.Place.name, db.Place.place_id)
.limit(max_places)
.all())
if not found_places:
search_failed = True
if len(found_places) == 1:
flash('Nalezeno toto místo', 'info')
return redirect(url_for('org_place', id=found_places[0].place_id))
else:
search_limited = len(found_places) >= max_places
if place.type == db.PlaceType.school:
school = sess.query(db.School).get(place.place_id)
else:
school = None
children = sorted(place.children, key=lambda p: locale.strxfrm(p.name))
rr = g.gatekeeper.rights_for(place)
return render_template(
'org_place.html', place=place, school=school,
can_edit=rr.can_edit_place(place),
can_add_child=rr.can_add_place_child(place),
can_view_school_contestants=rr.have_right(Right.view_school_contestants),
children=children,
search_form=search_form,
found_places=found_places, search_failed=search_failed, search_limited=search_limited,
)
class PlaceEditForm(FlaskForm):
name = mo_fields.String(
'Název', render_kw={'autofocus': True},
validators=[validators.DataRequired()]
)
code = mo_fields.String(
'Kód', filters=[lambda x: x or None], # may be NULL in db
description="Na místo se lze odkazovat kódem z písmen a číslic."
)
type = wtforms.SelectField(
'Typ', choices=db.PlaceType.choices(), coerce=db.PlaceType.coerce
)
nuts = mo_fields.String(
'NUTS', filters=[lambda x: x or None], # may be NULL in db
description="Pro okresy a výše"
)
note = mo_fields.String('Poznámka')
hidden = wtforms.BooleanField(
'Skrýt (účastníkům se místo nebude zobrazovat při výběru školy v přihlašování)'
)
submit = wtforms.SubmitField('Uložit')
def validate_code(form: FlaskForm, field: mo_fields.String) -> None:
code = field.data
if code is None:
pass
elif code.startswith('#'):
raise wtforms.ValidationError('Kód nesmí začínat na znak "#".')
elif re.fullmatch(r'\d+', code):
raise wtforms.ValidationError('Kód nesmí být složen pouze z číslic.')
class PlaceSchoolEditForm(PlaceEditForm):
red_izo = mo_fields.String('RED_IZO')
ico = mo_fields.String('IČO')
official_name = mo_fields.String('Oficiální název')
address = mo_fields.String('Adresa')
is_zs = wtforms.BooleanField('ZŠ')
is_ss = wtforms.BooleanField('SŠ (6-letá a 8-letá gymnázia jsou současně i ZŠ)')
submit = wtforms.SubmitField('Uložit')
def place_breadcrumbs(place: db.Place, action: Optional[str] = None) -> Markup:
elements = []
ancestors: List[db.Place] = g.gatekeeper.get_ancestors(place)
for parent in ancestors:
elements.append((url_for('org_place', id=parent.place_id), parent.name))
if action:
elements.append(('', action))
if len(elements) == 0:
return Markup("")
return Markup(
"\n".join([f"<li><a href='{url}'>{name}</a>" for url, name in elements[:-1]])
+ "<li>" + elements[-1][1]
)
@app.route('/org/place/<int:id>/edit', methods=('GET', 'POST'))
def org_place_edit(id: int):
sess = db.get_session()
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_for(place)
if not rr.can_edit_place(place):
raise werkzeug.exceptions.Forbidden()
if place.type == db.PlaceType.school:
school = sess.query(db.School).get(place.place_id)
# Pass school data as additional dict (data is used after obj)
form = PlaceSchoolEditForm(obj=place, data=db.row2dict(school))
form.name.description = ('Název školy tak, jak se má objevovat ve výsledkové listině. Viz '
+ Markup('<a href="' + url_for('doc_org') + '#kodskoly">pojmenovací konvence</a>.'))
else:
form = PlaceEditForm(obj=place)
school = None
form.code.description += f' Kromě zadaného kódu funguje též #{id}.'
form.type.choices = db.PlaceType.choices(level=place.level)
if form.validate_on_submit():
form.populate_obj(place)
if school:
form.populate_obj(school)
msg = 'Změny místa uloženy'
redirectURL = url_for('org_place', id=id)
if sess.is_modified(place) or school and sess.is_modified(school):
placeChanges = db.get_object_changes(place)
schoolChanges = {}
if school:
if request.form.get('type') != 'school':
# School record removed
mo.util.log(
type=db.LogType.place,
what=school.place_id,
details={'action': 'school-delete', 'school': db.row2dict(school)},
)
app.logger.info(f"Deleting school record for place {place.place_id}")
db.get_session().delete(school)
msg = 'Změny místa uloženy, záznam o škole smazán'
else:
schoolChanges = db.get_object_changes(school)
elif request.form.get('type') == 'school':
# School record created
new_school = db.School()
new_school.place_id = place.place_id
mo.util.log(
type=db.LogType.place,
what=new_school.place_id,
details={'action': 'school-add'},
)
app.logger.info(f"Creating new school for place {place.place_id}")
db.get_session().add(new_school)
# Take org directly to the school edit to fill the data
msg = 'Záznam o škole vytvořen, vyplňte prosím všechna data'
redirectURL = url_for('org_place_edit', id=id)
changes = {**placeChanges, **schoolChanges}
app.logger.info(f"Place {id} modified, changes: {changes}")
mo.util.log(
type=db.LogType.place,
what=id,
details={'action': 'edit', 'changes': changes},
)
db.get_session().commit()
flash(msg, 'success')
else:
flash(u'Žádné změny k uložení', 'info')
return redirect(redirectURL)
return render_template(
'org_place_edit.html', place=place, school=school,
form=form,
)
class PlaceMoveForm(FlaskForm):
new_parent = mo_fields.Place(validators=[validators.DataRequired()], render_kw={'autofocus': True})
submit = wtforms.SubmitField('Najít místo')
reset = wtforms.HiddenField()
move = wtforms.HiddenField()
class PlaceMoveConfirmForm(FlaskForm):
new_parent = mo_fields.Place(widget = wtforms.widgets.HiddenInput())
reset = wtforms.SubmitField('Zrušit')
move = wtforms.SubmitField('Přesunout')
@app.route('/org/place/<int:id>/move', methods=('GET', 'POST'))
def org_place_move(id: int):
sess = db.get_session()
# Tests: can move only existing places that we can edit
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_for(place)
if not rr.can_edit_place(place):
raise werkzeug.exceptions.Forbidden()
new_parents = None
search_error = None
form = PlaceMoveForm()
form_confirm = None
if not form.validate_on_submit():
if form.new_parent.place_error:
search_error = form.new_parent.place_error
else:
if form.reset.data:
return redirect(url_for('org_place_move', id=id))
new_parent = form.new_parent.place
new_parents = g.gatekeeper.get_ancestors(new_parent)
(_, levels) = db.place_type_names_and_levels[place.type]
rr = g.gatekeeper.rights_for(new_parent)
if not rr.can_edit_place(new_parent):
search_error = 'Nemáte právo k editaci vybraného nadřazeného místa, přesun nelze uskutečnit'
elif (new_parent.level + 1) not in levels:
search_error = f'Toto místo ({place.type_name()}) nelze přemístit pod vybrané místo ({new_parent.type_name()}), dostalo by se na nepovolenou úroveň'
elif new_parent.place_id == place.parent:
search_error = 'Žádná změna, místo je zde již umístěno'
elif form.move.data:
# Everything is OK, if submitted with 'move' do the move
place.parent = new_parent.place_id
place.level = new_parent.level + 1
changes = db.get_object_changes(place)
mo.util.log(
type=db.LogType.place,
what=id,
details={'action': 'move', 'changes': changes},
)
app.logger.info(f"Place {id} moved, changes: {changes}")
db.get_session().commit()
flash('Místo úspěšně přesunuto', 'success')
return redirect(url_for('org_place', id=id))
else:
# OK but not confirmed yet, display the confirm form
form_confirm = PlaceMoveConfirmForm()
form_confirm.new_parent.data = form.new_parent.data
# tady se používá hnusný trik, že políčko new_parents z PlaceMoveConfirmForm se
# parsuje jako new_parents z PlaceMoveForm
return render_template(
'org_place_move.html',
place=place, form=form, form_confirm=form_confirm, search_error=search_error,
new_parents=new_parents
)
@app.route('/org/place/<int:id>/delete', methods=('POST',))
def org_place_delete(id: int):
sess = db.get_session()
# Tests: can delete only existing places that we can edit
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_for(place)
if not rr.can_edit_place(place):
raise werkzeug.exceptions.Forbidden()
# Cannot delete place with children
if place.children:
flash("Nelze smazat místo s podřízenými místy", "danger")
return redirect(url_for('org_place', id=id))
# Cannot delete place with contests
if db.get_count(sess.query(db.Contest).filter_by(place_id=id)) > 0:
flash("Nelze smazat místo ke kterému se váže nějaká soutěž ", "danger")
return redirect(url_for('org_place', id=id))
if place.type == db.PlaceType.school:
school = sess.query(db.School).get(place.place_id)
mo.util.log(
type=db.LogType.place,
what=school.place_id,
details={'action': 'school-delete', 'school': db.row2dict(school)},
)
app.logger.info(f"Deleting school record for place {id}")
db.get_session().delete(school)
mo.util.log(
type=db.LogType.place,
what=id,
details={'action': 'delete', 'place': db.row2dict(place)},
)
app.logger.info(f"Deleting place {id}")
parent = place.parent
db.get_session().delete(place)
db.get_session().commit()
flash("Místo smazáno", "success")
return redirect(url_for('org_place', id=parent))
@app.route('/org/place/<int:id>/new-child', methods=('GET', 'POST'))
def org_place_new_child(id: int):
sess = db.get_session()
# Tests: can add new child only under existing places that we can edit
parent_place = sess.query(db.Place).get(id)
if not parent_place:
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_for(parent_place)
if not rr.can_edit_place(parent_place):
raise werkzeug.exceptions.Forbidden()
if not parent_place.can_have_child():
raise werkzeug.exceptions.Forbidden()
form = PlaceEditForm()
form.type.choices = db.PlaceType.choices(level=parent_place.level + 1)
if form.validate_on_submit():
new_place = db.Place()
form.populate_obj(new_place)
new_place.parent = parent_place.place_id
new_place.level = parent_place.level + 1
sess.add(new_place)
sess.flush()
app.logger.info(f"New place created: {db.row2dict(new_place)}")
mo.util.log(
type=db.LogType.place,
what=new_place.place_id,
details={'action': 'new', 'place': db.row2dict(new_place)},
)
redirect_url = url_for('org_place', id=new_place.place_id)
msg = 'Nové místo uloženo'
if new_place.type == db.PlaceType.school:
new_school = db.School()
new_school.place_id = new_place.place_id
mo.util.log(
type=db.LogType.place,
what=new_school.place_id,
details={'action': 'school-add'},
)
app.logger.info(f"Creating new school for place {new_place.place_id}")
sess.add(new_school)
# Take org directly to the school edit to fill the data
msg = 'Záznam o škole vytvořen, vyplňte prosím všechna data'
redirect_url = url_for('org_place_edit', id=new_place.place_id)
sess.commit()
flash(msg, 'success')
return redirect(redirect_url)
return render_template('org_place_new.html', parent_place=parent_place, form=form)
@app.route('/org/place/')
def org_place_root():
root = db.get_root_place()
return redirect(url_for('org_place', id=root.place_id))
@app.route('/org/place/<int:id>/roles')
def org_place_roles(id: int):
sess = db.get_session()
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
parent_ids = [p.place_id for p in g.gatekeeper.get_ancestors(place)]
roles = (sess.query(db.UserRole)
.filter(db.UserRole.place_id.in_(parent_ids))
.options(joinedload(db.UserRole.user),
joinedload(db.UserRole.place),
joinedload(db.UserRole.assigned_by_user))
.all())
roles.sort(key=lambda r: (mo.rights.role_order_by_type[r.role], r.user.sort_key()))
assigned_roles = [r for r in roles if r.place_id == id]
inherited_roles = [r for r in roles if r.place_id != id]
rr = g.gatekeeper.rights_for(place)
rights = sorted(rr.rights, key=lambda r: r.name)
return render_template(
'org_place_roles.html', place=place, rights=rights,
assigned_roles=assigned_roles, inherited_roles=inherited_roles,
roles_by_type=mo.rights.roles_by_type,
)
@app.route('/org/place/<int:id>/contests')
def org_place_contests(id: int):
sess = db.get_session()
place = sess.query(db.Place).get(id)
if not place:
raise werkzeug.exceptions.NotFound()
contests = (sess.query(db.Contest)
.options(joinedload(db.Contest.round))
.filter_by(place=place)
.all())
contests.sort(key=lambda c: (-c.round.year, c.round.category, c.round.seq, c.round.part))
return render_template('org_place_contests.html', place=place, contests=contests)