# Repository viewer header (not part of the program): "Select Git revision"
# Authored by Jiří Setnička
# org_round.py, 24.04 KiB
from dataclasses import dataclass, field
import decimal
from flask import render_template, g, redirect, flash, request
import locale
import flask_wtf.file
from flask_wtf.form import FlaskForm
import bleach
from bleach.sanitizer import ALLOWED_TAGS
import markdown
import os
from sqlalchemy import func
from sqlalchemy.orm import joinedload, aliased
from sqlalchemy.sql.functions import coalesce
from typing import Optional, List, Dict, Tuple, Set
import werkzeug.exceptions
import wtforms
from wtforms import validators, ValidationError
from wtforms.widgets.html5 import NumberInput
import mo.config as config
import mo.db as db
import mo.imports
from mo.rights import Right
import mo.util
from mo.web import app
import mo.web.fields as mo_fields
from mo.web.org_contest import get_context
@app.route('/org/contest/')
def org_rounds():
    """Render the list of rounds belonging to ``config.CURRENT_YEAR``."""
    ordering = (db.Round.year, db.Round.category, db.Round.seq, db.Round.part)
    rounds = (
        db.get_session()
        .query(db.Round)
        .filter_by(year=config.CURRENT_YEAR)
        .order_by(*ordering)
    )
    return render_template('org_rounds.html', rounds=rounds, history=False)
@app.route('/org/contest/history')
def org_rounds_history():
    """Render the list of rounds of all years, newest year first."""
    all_rounds = db.get_session().query(db.Round)
    rounds = all_rounds.order_by(
        db.Round.year.desc(),
        db.Round.category,
        db.Round.seq,
        db.Round.part,
    )
    return render_template('org_rounds.html', rounds=rounds, history=True)
class TaskDeleteForm(FlaskForm):
    """Form carrying the ID of a task to delete together with its submit button."""

    # ID of the task that should be deleted
    delete_task_id = wtforms.IntegerField()
    # Submit button of the deletion form
    delete_task = wtforms.SubmitField(label='Smazat úlohu')
def delete_task(round_id: int, form: TaskDeleteForm) -> bool:
    """Delete the task named by *form* from the round *round_id*.

    Nothing happens unless the request is a POST containing ``delete_task_id``
    and the form validates.  The task is deleted only when no solution, paper
    or points-history row refers to it; otherwise an error is flashed.

    Returns True when the task was deleted and committed, False otherwise.
    """
    if (request.method != 'POST'
            or 'delete_task_id' not in request.form
            or not form.validate_on_submit()):
        return False

    sess = db.get_session()
    task = (
        sess.query(db.Task)
        .filter_by(round_id=round_id, task_id=form.delete_task_id.data)
        .first()
    )

    if not task:
        flash('Úloha s daným ID v tomto kole neexistuje', 'danger')
        return False

    # Refuse to delete a task that other records still depend on
    if sess.query(db.Solution).filter_by(task_id=task.task_id).first() is not None:
        flash(f'Úlohu {task.code} nelze smazat, existují řešení vázající se na ni', 'danger')
        return False
    if sess.query(db.Paper).filter_by(for_task=task.task_id).first() is not None:
        flash(f'Úlohu {task.code} nelze smazat, existují papíry vázající se na ni', 'danger')
        return False
    if sess.query(db.PointsHistory).filter_by(task_id=task.task_id).first() is not None:
        flash(f'Úlohu {task.code} nelze smazat, existují přidělené body vázající se na ni', 'danger')
        return False

    sess.delete(task)
    mo.util.log(
        type=db.LogType.task,
        what=task.task_id,
        details={'action': 'delete', 'task': db.row2dict(task)},
    )
    app.logger.info(f"Úloha {task.code} ({task.task_id}) smazána: {db.row2dict(task)}")
    sess.commit()
    flash(f'Úloha {task.code} úspěšně smazána', 'success')
    return True
class AddContestForm(FlaskForm):
    """Form for creating a new contest at a chosen place."""

    # Place in which the contest is to be created (required)
    place = mo_fields.Place(
        'Nová soutěž v oblasti:',
        validators=[validators.Required()],
    )
    create_contest = wtforms.SubmitField(label='Založit')
def add_contest(round: db.Round, form: AddContestForm) -> bool:
    """Create a contest of *round* at the place selected in *form*.

    Nothing happens unless the request is a POST containing ``create_contest``
    and the form validates.  The place must be on the round's level, must not
    have a contest in this round yet, and the current user must hold
    ``Right.add_contest`` for the new contest.

    Returns True when the contest (and its sub-contests) were committed,
    False otherwise.
    """
    if (request.method != 'POST'
            or 'create_contest' not in request.form
            or not form.validate_on_submit()):
        return False

    place: db.Place = form.place.place
    if place.level != round.level:
        flash(f'{place.type_name().title()} {place.name} není {round.get_level().name}', 'danger')
        return False

    sess = db.get_session()
    existing = sess.query(db.Contest).filter_by(round=round, place=place).one_or_none()
    if existing:
        flash(f'Pro {place.type_name()} {place.name} už toto kolo existuje', 'danger')
        return False

    # Initial state of the contest: a round in the "delegate" state starts
    # its contests as "preparing", any other round state is copied.
    state = db.RoundState.preparing if round.state == db.RoundState.delegate else round.state

    # The contest is always created in the master round
    contest = db.Contest(round=round.master, place=place, state=state)

    contest_rights = g.gatekeeper.rights_for_contest(contest)
    if not contest_rights.have_right(Right.add_contest):
        flash(f'Vaše role nedovoluje vytvořit soutěž {place.name_locative()}', 'danger')
        return False

    # First flush assigns contest_id, which is then stored as the contest's
    # own master_contest_id by the second flush.
    sess.add(contest)
    sess.flush()
    contest.master_contest_id = contest.contest_id
    sess.add(contest)
    sess.flush()

    mo.util.log(
        type=db.LogType.contest,
        what=contest.contest_id,
        details={'action': 'add', 'contest': db.row2dict(contest)},
    )
    app.logger.info(f"Soutěž #{contest.contest_id} založena: {db.row2dict(contest)}")

    create_subcontests(round.master, contest)

    sess.commit()
    flash(f'Založena soutěž {place.name_locative()}', 'success')
    return True
# XXX: Also used by participant registration
def create_subcontests(master_round: db.Round, master_contest: db.Contest):
    """Create a contest in every round of ``master_round.get_group_rounds()``.

    Each new contest copies the place and state of *master_contest* and points
    to it through ``master_contest_id``.  Does nothing when
    ``master_round.part`` is 0.  The session is flushed but not committed.
    """
    if master_round.part == 0:
        return

    sess = db.get_session()
    for subround in master_round.get_group_rounds():
        subcontest = db.Contest(
            round_id=subround.round_id,
            master_contest_id=master_contest.contest_id,
            place_id=master_contest.place_id,
            state=master_contest.state,
        )
        sess.add(subcontest)
        # Flush so that the new contest_id is available for logging
        sess.flush()

        mo.util.log(
            type=db.LogType.contest,
            what=subcontest.contest_id,
            details={'action': 'add', 'contest': db.row2dict(subcontest)},
        )
        app.logger.info(f"Podsoutěž #{subcontest.contest_id} založena: {db.row2dict(subcontest)}")
@dataclass
class ContestStat:
    """Statistics for one row of the round overview (a contest or a region)."""

    # Place the row describes
    region: db.Place
    # The contest itself when the row stands for a single contest
    contest: Optional[db.Contest] = None
    # Number of contests counted in this row
    num_contests: int = 0
    # States of the contests counted in this row
    contest_states: Set[db.RoundState] = field(default_factory=set)
    # Participants in state PartState.active
    num_active_pants: int = 0
    # Participants in state PartState.registered
    num_unconfirmed_pants: int = 0
def region_stats(round: db.Round, region: db.Place) -> List[ContestStat]:
    """Collect per-contest or per-sub-region statistics of *round* in *region*.

    When *region* is at most one level above the round's level (or the region
    is on level 2 and the round on level 4), one entry per contest is
    produced.  Otherwise one entry per direct sub-region is produced and the
    contest counts and states are filled in from ``db.RegionContestStat``.
    Participant counts always come from ``db.RegionParticipantStat`` of the
    master round.

    Returns the entries sorted by region name using the current locale; an
    empty list when *region* is below the round's level or nothing was found.
    """
    stats: Dict[int, ContestStat] = {}
    sess = db.get_session()

    if region.level > round.level:
        return []

    have_contests = (
        region.level >= round.level - 1
        or (region.level == 2 and round.level == 4)
    )

    if have_contests:
        # List individual contests
        q = sess.query(db.Contest).filter_by(round=round)
        q = db.filter_place_nth_parent(q, db.Contest.place_id, round.level - region.level, region.place_id)
        q = q.options(joinedload(db.Contest.place))
        for c in q.all():
            stats[c.place.place_id] = ContestStat(region=c.place, contest=c, num_contests=1)
    else:
        # List sub-regions
        for sub in sess.query(db.Place).filter(db.Place.parent_place == region).all():
            stats[sub.place_id] = ContestStat(region=sub)

    if not stats:
        # Nothing to show: skip the statistics queries, which would only
        # filter on an empty list of region IDs.
        return []

    # Keys of `stats` are exactly the place IDs of the listed regions
    region_ids = list(stats)

    if not have_contests:
        rcs = (sess.query(db.RegionContestStat)
               .filter_by(round=round)
               .filter(db.RegionContestStat.region.in_(region_ids))
               .all())
        for r in rcs:
            stats[r.region].num_contests += r.count
            stats[r.region].contest_states.add(r.state)

    rs = (sess.query(db.RegionParticipantStat)
          .filter_by(round_id=round.master_round_id)
          .filter(db.RegionParticipantStat.region.in_(region_ids))
          .all())
    for r in rs:
        if r.state == db.PartState.active:
            stats[r.region].num_active_pants = r.count
        elif r.state == db.PartState.registered:
            stats[r.region].num_unconfirmed_pants = r.count

    return sorted(stats.values(), key=lambda s: locale.strxfrm(s.region.name or ""))
def region_totals(region: db.Place, stats: List[ContestStat]) -> ContestStat:
    """Sum contest and participant counts of *stats* into one entry for *region*."""
    total = ContestStat(region=region)
    for entry in stats:
        total.num_contests += entry.num_contests
        total.num_active_pants += entry.num_active_pants
        total.num_unconfirmed_pants += entry.num_unconfirmed_pants
    return total
def task_stats(round: db.Round, region: db.Place) -> List[Tuple[db.Task, int]]:
    """Pair every task of *round* (ordered by code) with its count in *region*.

    Counts come from ``db.RegionTaskStat``; tasks without a row get 0.
    """
    sess = db.get_session()

    tasks = sorted(
        sess.query(db.Task).filter_by(round=round).all(),
        key=lambda task: task.code,
    )

    stat_rows = (
        sess.query(db.RegionTaskStat)
        .filter_by(round=round, region=region.place_id)
        .all()
    )
    count_by_id = {}
    for row in stat_rows:
        count_by_id[row.task_id] = row.count

    return [(task, count_by_id.get(task.task_id, 0)) for task in tasks]
@app.route('/org/contest/r/<int:round_id>/', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/h/<int:hier_id>', methods=('GET', 'POST'))
def org_round(round_id: int, hier_id: Optional[int] = None):
    """Round overview page.

    Also processes the two POST actions embedded in the page: deleting a task
    (only with ``Right.manage_round``) and creating a contest.  After a
    successful action the user is redirected back to this page.
    """
    ctx = get_context(round_id=round_id, hier_id=hier_id)
    round, rights = ctx.round, ctx.rights

    form_delete_task = TaskDeleteForm()
    may_manage = rights.have_right(Right.manage_round)
    if may_manage and delete_task(round_id, form_delete_task):
        return redirect(ctx.url_for('org_round'))

    form_add_contest = AddContestForm()
    form_add_contest.place.label.text = "Nová soutěž " + round.get_level().in_name()
    # add_contest() checks the user's rights itself
    if add_contest(round, form_add_contest):
        return redirect(ctx.url_for('org_round'))

    group_rounds = round.get_group_rounds(True)
    group_rounds.sort(key=lambda gr: gr.round_code())

    # Without a selected place in the hierarchy, show the root place
    region = ctx.hier_place or db.get_root_place()
    reg_stats = region_stats(round, region)
    reg_total = region_totals(region, reg_stats)
    task_info = task_stats(round, region)
    role_names = [role.friendly_name() for role in rights.get_roles()]

    return render_template(
        'org_round.html',
        ctx=ctx,
        rights=rights,
        round=round,
        group_rounds=group_rounds,
        roles=role_names,
        reg_stats=reg_stats,
        reg_total=reg_total,
        task_info=task_info,
        form_delete_task=form_delete_task,
        form_add_contest=form_add_contest,
        statement_exists=mo.web.util.task_statement_exists(round),
    )
class TaskEditForm(FlaskForm):
    """Form for creating or editing a task.

    The constructor takes the points step as its first argument and uses it
    as the step of the ``max_points`` number input.
    """

    code = wtforms.StringField(
        'Kód úlohy',
        validators=[
            validators.Required(),
            validators.Regexp(r'^[A-Za-z0-9-]+$', message="Kód úlohy smí obsahovat jen nediakritická písmena, čísla a znak -"),
        ],
        render_kw={'autofocus': True},
    )
    name = wtforms.StringField('Název úlohy')
    type = wtforms.SelectField(
        'Typ úlohy',
        choices=db.TaskType.choices(),
        coerce=db.TaskType.coerce,
    )
    max_points = mo_fields.Points(
        'Maximum bodů',
        validators=[validators.Optional(), validators.NumberRange(min=0)],
        description="Při nastavení maxima nelze udělit více bodů, pro zrušení uložte prázdnou hodnotu",
    )
    submit = wtforms.SubmitField('Uložit')

    def __init__(self, points_step: decimal.Decimal, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The widget is set per instance because the step differs between rounds
        self.max_points.widget = NumberInput(min=0, step=points_step)
@app.route('/org/contest/r/<int:round_id>/task/new', methods=('GET', 'POST'))
def org_round_task_new(round_id: int):
    """Create a new task in a round (requires ``Right.manage_round``).

    A task whose code already exists in the round is rejected and the form is
    shown again; on success the user is redirected to the round page.
    """
    sess = db.get_session()
    ctx = get_context(round_id=round_id, right_needed=Right.manage_round)
    form = TaskEditForm(ctx.master_round.points_step)

    def show_form():
        return render_template(
            'org_round_task_edit.html',
            ctx=ctx, form=form,
        )

    if not form.validate_on_submit():
        return show_form()

    task = db.Task()
    task.round = ctx.round
    form.populate_obj(task)

    code_taken = sess.query(db.Task).filter_by(round_id=round_id, code=task.code).first()
    if code_taken:
        flash('Úloha se stejným kódem již v tomto kole existuje', 'danger')
        return show_form()

    sess.add(task)
    # Flush so that task_id is assigned before it is logged
    sess.flush()
    mo.util.log(
        type=db.LogType.task,
        what=task.task_id,
        details={'action': 'add', 'task': db.row2dict(task)},
    )
    sess.commit()
    app.logger.info(f"Úloha {task.code} ({task.task_id}) přidána: {db.row2dict(task)}")
    flash('Nová úloha přidána', 'success')
    return redirect(ctx.url_for('org_round'))
@app.route('/org/contest/r/<int:round_id>/task/<int:task_id>/edit', methods=('GET', 'POST'))
def org_round_task_edit(round_id: int, task_id: int):
    """Edit an existing task of a round (requires ``Right.manage_round``).

    A code already used by another task of the round is rejected and the form
    is shown again.  Otherwise changes, if any, are logged and committed and
    the user is redirected to the round page.
    """
    sess = db.get_session()
    ctx = get_context(round_id=round_id, task_id=task_id, right_needed=Right.manage_round)
    task = ctx.task
    assert task

    form = TaskEditForm(ctx.master_round.points_step, obj=task)

    def show_form():
        return render_template(
            'org_round_task_edit.html',
            ctx=ctx, form=form,
        )

    if not form.validate_on_submit():
        return show_form()

    # Look for a different task of the same round that already uses the code
    clash = (
        sess.query(db.Task)
        .filter(
            db.Task.task_id != task_id,
            db.Task.round_id == round_id,
            db.Task.code == form.code.data,
        )
        .first()
    )
    if clash:
        flash('Úloha se stejným kódem již v tomto kole existuje', 'danger')
        return show_form()

    form.populate_obj(task)
    if not sess.is_modified(task):
        flash('Žádné změny k uložení', 'info')
    else:
        changes = db.get_object_changes(task)
        mo.util.log(
            type=db.LogType.task,
            what=task_id,
            details={'action': 'edit', 'changes': changes},
        )
        sess.commit()
        app.logger.info(f"Úloha {task.code} ({task_id}) modifikována, změny: {changes}")
        flash('Změny úlohy uloženy', 'success')

    return redirect(ctx.url_for('org_round', task_id=None))
class RoundEditForm(FlaskForm):
    """Form for editing the settings of a round.

    ``_for_round`` may be set by the caller to the edited round; when set,
    ``validate_state`` also checks that the round has at least one task.
    """

    _for_round: Optional[db.Round] = None

    name = wtforms.StringField("Název", render_kw={'autofocus': True})
    code = wtforms.StringField(
        "Kód",
        description="Kód kola používaný v kódech úloh ('1', 'S' apod.). Není-li vyplněn, použije se pořadí kola v kategorii.",
    )
    state = wtforms.SelectField(
        "Stav kola",
        choices=db.RoundState.choices(),
        coerce=db.RoundState.coerce,
        description="Stav soutěží ve všech oblastech kola. Pokud zvolíme 'po oblastech', každá soutěž si svůj stav určuje sama.",
    )
    # Only desktop Firefox does not support the datetime-local field nowadays,
    # other browsers do provide a date and time picker UI :(
    ct_tasks_start = mo_fields.DateTime("Čas zveřejnění úloh pro účastníky", validators=[validators.Optional()])
    pr_tasks_start = mo_fields.DateTime("Čas zveřejnění úloh pro dozor", validators=[validators.Optional()])
    ct_submit_end = mo_fields.DateTime("Konec odevzdávání pro účastníky", validators=[validators.Optional()])
    pr_submit_end = mo_fields.DateTime("Konec odevzdávání pro dozor", validators=[validators.Optional()])
    score_mode = wtforms.SelectField(
        "Výsledková listina",
        choices=db.RoundScoreMode.choices(),
        coerce=db.RoundScoreMode.coerce,
    )
    score_winner_limit = mo_fields.Points(
        "Hranice bodů pro vítěze",
        validators=[validators.Optional(), validators.NumberRange(min=0)],
        description="Řešitelé s alespoň tolika body budou označeni za vítěze, prázdná hodnota = žádné neoznačovat",
    )
    score_successful_limit = mo_fields.Points(
        "Hranice bodů pro úspěšné řešitele",
        validators=[validators.Optional(), validators.NumberRange(min=0)],
        description="Řešitelé s alespoň tolika body budou označeni za úspěšné řešitele, prázdná hodnota = žádné neoznačovat",
    )
    points_step = wtforms.SelectField(
        "Přesnost bodování",
        choices=db.round_points_step_choices,
        coerce=decimal.Decimal,
        description="Ovlivňuje možnost zadávání nových bodů, již uložené body nezmění",
    )
    enroll_mode = wtforms.SelectField(
        "Režim přihlašování",
        choices=db.RoundEnrollMode.choices(),
        coerce=db.RoundEnrollMode.coerce,
    )
    enroll_advert = wtforms.StringField("Popis v přihlášce")
    has_messages = wtforms.BooleanField("Zprávičky pro účastníky (aktivuje možnost vytvářet novinky zobrazované účastníkům)")
    submit = wtforms.SubmitField('Uložit')

    def validate_state(self, field):
        """Allow a state other than "preparing" only with a start time and tasks."""
        if field.data == db.RoundState.preparing:
            return
        if self.ct_tasks_start.data is None:
            raise ValidationError('Není-li nastaven času začátku soutěže, stav musí být "připravuje se"')
        if self._for_round is None:
            return
        num_tasks = db.get_session().query(db.Task).filter_by(round=self._for_round).count()
        if num_tasks == 0:
            raise ValidationError('Nejsou-li definovány žádné úlohy, stav musí být "připravuje se"')

    def abstract_validate_time_order(self, field):
        """Reject an end time that precedes either of the task start times."""
        if field.data is None:
            return
        for start in (self.ct_tasks_start, self.pr_tasks_start):
            if start.data is not None and start.data > field.data:
                raise ValidationError('Soutěž nesmí skončit dříve než začne.')

    def validate_ct_submit_end(self, field):
        self.abstract_validate_time_order(field)

    def validate_pr_submit_end(self, field):
        self.abstract_validate_time_order(field)
@app.route('/org/contest/r/<int:round_id>/edit', methods=('GET', 'POST'))
def org_round_edit(round_id: int):
    """Edit the settings of a round (requires ``Right.manage_round``).

    Changes are logged and committed.  When the state changed to anything
    other than ``RoundState.delegate``, the new state is also written to all
    contests of the round whose state differs.
    """
    sess = db.get_session()
    ctx = get_context(round_id=round_id, right_needed=Right.manage_round)
    round = ctx.round

    form = RoundEditForm(obj=round)
    form._for_round = round
    if round.is_subround():
        # A sub-round has neither score-table settings nor enrollment settings
        for field_name in ('score_mode', 'score_winner_limit', 'score_successful_limit',
                           'points_step', 'enroll_mode'):
            delattr(form, field_name)

    if not form.validate_on_submit():
        return render_template(
            'org_round_edit.html',
            ctx=ctx,
            round=round,
            form=form,
        )

    form.populate_obj(round)

    if not sess.is_modified(round):
        flash('Žádné změny k uložení', 'info')
        return redirect(ctx.url_for('org_round'))

    changes = db.get_object_changes(round)
    app.logger.info(f"Round #{round_id} modified, changes: {changes}")
    mo.util.log(
        type=db.LogType.round,
        what=round_id,
        details={'action': 'edit', 'changes': changes},
    )

    if 'state' in changes and round.state != db.RoundState.delegate:
        # Propagate the state change to all contests of the round
        outdated = sess.query(db.Contest).filter_by(round=round).filter(db.Contest.state != round.state)
        for contest in outdated:
            contest.state = round.state
            ct_changes = db.get_object_changes(contest)
            app.logger.info(f"Change propagated to contest #{contest.contest_id}: {ct_changes}")
            mo.util.log(
                type=db.LogType.contest,
                what=contest.contest_id,
                details={'action': 'propagate', 'changes': ct_changes},
            )

    sess.commit()
    flash('Změny kola uloženy', 'success')
    return redirect(ctx.url_for('org_round'))
@app.route('/org/contest/r/<int:round_id>/task-statement/zadani.pdf')
def org_task_statement(round_id: int):
    """Send the task statement of a round to an organizer.

    Raises:
        werkzeug.exceptions.Forbidden: when ``ctx.rights.can_view_statement()``
            is false; the attempt is logged as a warning.
    """
    ctx = get_context(round_id=round_id)

    if not ctx.rights.can_view_statement():
        # Logger.warn() is a deprecated alias; warning() is the supported name
        app.logger.warning(f'Organizátor #{g.user.user_id} chce zadání, na které nemá právo')
        raise werkzeug.exceptions.Forbidden()

    return mo.web.util.send_task_statement(ctx.round)
class StatementEditForm(FlaskForm):
    """Form for uploading or deleting the task statement file of a round."""

    file = flask_wtf.file.FileField(
        "Soubor",
        render_kw={'autofocus': True},
    )
    # Two submit buttons; the view tells them apart by their .data
    upload = wtforms.SubmitField(label='Nahrát')
    delete = wtforms.SubmitField(label='Smazat')
@app.route('/org/contest/r/<int:round_id>/task-statement/edit', methods=('GET', 'POST'))
def org_edit_statement(round_id: int):
    """Upload or delete the task statement of a round (requires ``Right.manage_round``).

    An uploaded file is linked into a per-round directory under the
    ``statements`` data directory and its relative path is stored in
    ``round.tasks_file``; deleting only clears ``round.tasks_file``.
    """
    sess = db.get_session()
    ctx = get_context(round_id=round_id, right_needed=Right.manage_round)
    round = ctx.round

    def log_changes():
        # Log pending modifications of the round, if there are any
        if not sess.is_modified(round):
            return
        changes = db.get_object_changes(round)
        app.logger.info(f"Kolo #{round_id} změněno, změny: {changes}")
        mo.util.log(
            type=db.LogType.round,
            what=round_id,
            details={'action': 'edit', 'changes': changes},
        )

    form = StatementEditForm()
    if form.validate_on_submit():
        if form.upload.data:
            if form.file.data is None:
                flash('Vyberte si prosím soubor', 'danger')
            else:
                stream = form.file.data.stream
                secure_category = werkzeug.utils.secure_filename(round.category)
                stmt_dir = f'{round.year}-{secure_category}-{round.seq}'
                full_dir = os.path.join(mo.util.data_dir('statements'), stmt_dir)
                os.makedirs(full_dir, exist_ok=True)
                full_name = mo.util.link_to_dir(stream.name, full_dir, suffix='.pdf')

                # Store the path relative to the statements directory
                file_name = os.path.join(stmt_dir, os.path.basename(full_name))
                app.logger.info(f'Nahráno zadání: {file_name}')

                round.tasks_file = file_name
                log_changes()
                sess.commit()
                flash('Zadání nahráno', 'success')
                return redirect(ctx.url_for('org_round'))

        if form.delete.data:
            round.tasks_file = None
            log_changes()
            sess.commit()
            flash('Zadání smazáno', 'success')
            return redirect(ctx.url_for('org_round'))

    return render_template(
        'org_edit_statement.html',
        ctx=ctx,
        round=round,
        form=form,
    )
class MessageAddForm(FlaskForm):
    """Form for writing a new message for participants, with a preview button."""

    title = wtforms.StringField(
        'Nadpis',
        validators=[validators.Required()],
    )
    # Message body written in Markdown
    markdown = wtforms.TextAreaField(
        'Text zprávičky',
        description='Zprávičky lze formátovat pomocí Markdownu',
        validators=[validators.Required()],
        render_kw={'rows': 10},
    )
    # Two submit buttons; the view tells them apart by their .data
    submit = wtforms.SubmitField(label='Vložit zprávičku')
    preview = wtforms.SubmitField(label='Zobrazit náhled')
class MessageRemoveForm(FlaskForm):
    """Form carrying the ID of a message to remove together with its submit button."""

    message_id = wtforms.IntegerField(
        validators=[validators.Required()],
    )
    message_remove = wtforms.SubmitField()
@app.route('/org/contest/r/<int:round_id>/messages/', methods=('GET', 'POST'))
def org_round_messages(round_id: int):
    """List the messages of a round and, with ``Right.manage_round``, manage them.

    Users holding ``Right.manage_round`` get forms for adding (with optional
    preview) and removing messages.  A round without ``has_messages`` only
    redirects to the round page with a warning.

    Raises:
        werkzeug.exceptions.NotFound: when the message to remove does not
            exist or belongs to a different round.
    """
    sess = db.get_session()
    ctx = get_context(round_id=round_id)
    round = ctx.round

    if not round.has_messages:
        flash('Toto kolo nemá aktivní zprávičky pro účastníky, aktivujte je v nastavení kola', 'warning')
        return redirect(ctx.url_for('org_round'))

    messages = sess.query(db.Message).filter_by(round_id=round_id).order_by(db.Message.created_at).all()

    add_form: Optional[MessageAddForm] = None
    remove_form: Optional[MessageRemoveForm] = None
    preview: Optional[db.Message] = None

    if ctx.rights.have_right(Right.manage_round):
        add_form = MessageAddForm()
        remove_form = MessageRemoveForm()

        if remove_form.validate_on_submit() and remove_form.message_remove.data:
            msg = sess.query(db.Message).get(remove_form.message_id.data)
            if not msg or msg.round_id != round_id:
                raise werkzeug.exceptions.NotFound()
            sess.delete(msg)
            sess.commit()
            app.logger.info(f"Zprávička pro kolo {round_id} odstraněna: {db.row2dict(msg)}")
            flash('Zprávička odstraněna', 'success')
            return redirect(ctx.url_for('org_round_messages'))

        if add_form.validate_on_submit():
            msg = db.Message(
                round_id=round_id,
                created_by=g.user.user_id,
                created_at=mo.now,
            )
            add_form.populate_obj(msg)
            # Render Markdown to HTML and strip everything except allowed tags.
            # ALLOWED_TAGS is a list in older bleach releases and a frozenset
            # in newer ones, so convert it before appending 'p'.
            msg.html = bleach.clean(
                markdown.markdown(msg.markdown),
                tags=list(ALLOWED_TAGS) + ['p'],
            )

            if add_form.preview.data:
                # Preview only: the message is rendered but not stored
                preview = msg
            elif add_form.submit.data:
                sess.add(msg)
                sess.commit()
                app.logger.info(f"Vložena nová zprávička pro kolo {round_id}: {db.row2dict(msg)}")
                flash('Zprávička úspěšně vložena', 'success')
                return redirect(ctx.url_for('org_round_messages'))

    return render_template(
        'org_round_messages.html',
        ctx=ctx,
        round=round, messages=messages,
        add_form=add_form, remove_form=remove_form,
        preview=preview,
    )