Skip to content
Snippets Groups Projects

Reforma orgovského rozhraní ke kolům a soutěžím

1 file
+ 2
2
Compare changes
  • Side-by-side
  • Inline
+ 434
434
from dataclasses import dataclass
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import flask_wtf.file
@@ -8,10 +7,13 @@ from sqlalchemy import func, and_, select
from sqlalchemy.orm import joinedload, aliased
from sqlalchemy.orm.query import Query
from sqlalchemy.dialects.postgresql import insert as pgsql_insert
from typing import Any, List, Tuple, Optional, Sequence, Dict
import sqlalchemy.sql.schema
from typing import Any, List, Tuple, Optional, Dict
import urllib.parse
import werkzeug.exceptions
import wtforms
import wtforms.validators as validators
from wtforms.widgets.html5 import NumberInput
import mo
from mo.csv import FileFormat
@@ -19,33 +21,190 @@ import mo.config as config
import mo.db as db
from mo.imports import ImportType, create_import
import mo.jobs.submit
from mo.rights import Right, ContestRights
from mo.rights import Right, RoundRights
import mo.util
from mo.util_format import inflect_number, inflect_by_number
from mo.web import app
import mo.web.fields as mo_fields
import mo.web.util
from mo.web.util import PagerForm
from mo.web.table import CellCheckbox, Table, Row, Column, cell_pion_link, cell_place_link, cell_email_link, cell_email_link_flags
import wtforms.validators as validators
from wtforms.widgets.html5 import NumberInput
from mo.web.table import CellCheckbox, Table, Row, Column, cell_pion_link, cell_place_link, cell_email_link_flags
class ImportForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", render_kw={'autofocus': True})
typ = wtforms.SelectField(
"Typ dat",
choices=[(x.name, x.friendly_name()) for x in (ImportType.participants, ImportType.proctors, ImportType.judges)],
coerce=ImportType.coerce,
default=ImportType.participants,
)
fmt = wtforms.SelectField(
"Formát souboru",
choices=FileFormat.choices(), coerce=FileFormat.coerce,
default=FileFormat.cs_csv,
class Context:
# Kolo máme vždy
round: db.Round
master_round: db.Round
# Vypočtená práva: Může být implementováno i jako ContestRights
rights: RoundRights
# Můžeme mít vybranou soutěž a místo
# Pro nedělená kola platí contest == master_contest.
# Operace s účastníky by měly probíhat vždy přes master_contest.
contest: Optional[db.Contest] = None
master_contest: Optional[db.Contest] = None
site: Optional[db.Place] = None
# Můžeme se omezit na soutěže v dané oblasti
hier_place: Optional[db.Place] = None
# Účastník a úloha
pion: Optional[db.Participation] = None
user: Optional[db.User] = None
task: Optional[db.Task] = None
# IDčka, jak je předáváme do URL
round_id: Optional[int] = None
ct_id: Optional[int] = None
site_id: Optional[int] = None
hier_id: Optional[int] = None
user_id: Optional[int] = None
task_id: Optional[int] = None
def url_for(self, endpoint: str, **kwargs):
a = {}
round_id = kwargs.get('round_id', self.round_id)
ct_id = kwargs.get('ct_id', self.ct_id)
if ct_id is not None:
a['ct_id'] = ct_id
else:
assert round_id is not None
a['round_id'] = round_id
for arg in ('site_id', 'hier_id', 'user_id', 'task_id'):
val = getattr(self, arg)
if val is not None:
a[arg] = val
for arg, val in kwargs.items():
a[arg] = val
return url_for(endpoint, **a)
def url_home(self):
if self.ct_id:
return url_for('org_contest', ct_id=self.ct_id)
else:
return url_for('org_round', round_id=self.round_id, hier_id=self.hier_id)
def breadcrumbs(self, table: bool = False, action: Optional[str] = None) -> Markup:
elements = [(url_for('org_rounds'), 'Soutěže')]
elements.append((url_for('org_round', round_id=self.round_id), self.round.round_code()))
if self.hier_place:
parents = g.gatekeeper.get_ancestors(self.hier_place)
for p in parents[1:]:
elements.append((url_for('org_round', round_id=self.round_id, hier_id=p.place_id), p.name or '???'))
if self.contest:
if self.round.level >= 2:
parents = g.gatekeeper.get_ancestors(self.contest.place)
for i in range(1, len(parents) - 1):
p = parents[i]
if p.level >= 3:
break
elements.append((url_for('org_round', round_id=self.round_id, hier_id=p.place_id), db.Place.get_code(p)))
elements.append((url_for('org_contest', ct_id=self.ct_id), self.contest.place.name or '???'))
if self.site:
elements.append((url_for('org_contest', ct_id=self.ct_id, site_id=self.site_id), f"soutěžní místo {self.site.name}"))
if self.task:
elements.append((
url_for('org_contest_task', ct_id=self.ct_id, site_id=self.site_id, task_id=self.task_id) if self.contest
else url_for('org_round_task_edit', round_id=self.round_id, task_id=self.task_id),
f"{self.task.code} {self.task.name}"
))
if self.user:
elements.append((url_for('org_contest_user', ct_id=self.ct_id, user_id=self.user_id), self.user.full_name()))
if table:
if self.contest:
elements.append((url_for('org_generic_list', ct_id=self.ct_id, site=self.site_id), "Seznam účastníků"))
else:
elements.append((url_for('org_generic_list', round_id=self.round_id), "Seznam účastníků"))
if action:
elements.append(('', action))
return Markup(
"\n".join([f"<li><a href='{url}'>{name}</a>" for url, name in elements[:-1]])
+ "<li>" + elements[-1][1]
)
submit = wtforms.SubmitField('Importovat')
get_template = wtforms.SubmitField('Stáhnout šablonu')
def get_context(round_id: Optional[int] = None,
ct_id: Optional[int] = None,
site_id: Optional[int] = None,
hier_id: Optional[int] = None,
user_id: Optional[int] = None,
task_id: Optional[int] = None,
right_needed: Optional[Right] = None,
) -> Context:
ctx = Context()
ctx.round_id = round_id
ctx.ct_id = ct_id
ctx.site_id = site_id
ctx.hier_id = hier_id
ctx.user_id = user_id
ctx.task_id = task_id
sess = db.get_session()
if site_id is not None:
assert ct_id is not None
ctx.site = db.get_session().query(db.Place).get(site_id)
if not ctx.site:
raise werkzeug.exceptions.NotFound()
if hier_id is not None:
assert ct_id is None
ctx.hier_place = db.get_session().query(db.Place).get(hier_id)
if not ctx.hier_place:
raise werkzeug.exceptions.NotFound()
if ct_id is not None:
ctx.contest = (db.get_session().query(db.Contest)
.options(joinedload(db.Contest.place),
joinedload(db.Contest.round),
joinedload(db.Contest.master).joinedload(db.Contest.round))
.get(ct_id))
if not ctx.contest:
raise werkzeug.exceptions.NotFound()
ctx.master_contest = ctx.contest.master
ctx.round, ctx.master_round = ctx.contest.round, ctx.master_contest.round
if round_id is not None and ctx.round.round_id != round_id:
raise werkzeug.exceptions.NotFound()
ctx.round_id = ctx.round.round_id
ctx.rights = g.gatekeeper.rights_for_contest(ctx.contest, ctx.site)
else:
ctx.round = sess.query(db.Round).options(joinedload(db.Round.master)).get(round_id)
if not ctx.round:
raise werkzeug.exceptions.NotFound()
if hier_id is not None and ctx.hier_place.level > ctx.round.level:
raise werkzeug.exceptions.NotFound()
ctx.master_round = ctx.round.master
ctx.rights = g.gatekeeper.rights_for_round(ctx.round, for_place=ctx.hier_place)
# Zkontrolujeme, zda se účastník opravdu účastní soutěže
if user_id is not None:
assert ctx.master_contest is not None
ctx.pion = (sess.query(db.Participation)
.filter_by(user_id=user_id, contest_id=ctx.master_contest.contest_id)
.options(joinedload(db.Participation.place),
joinedload(db.Participation.user))
.one_or_none())
if not ctx.pion:
raise werkzeug.exceptions.NotFound()
ctx.user = ctx.pion.user
# A zda soutěží na zadaném soutěžním místě, je-li určeno
if site_id is not None and site_id != ctx.pion.place_id:
raise werkzeug.exceptions.NotFound()
# Najdeme úlohu a ověříme, že je součástí kola
if task_id is not None:
ctx.task = sess.query(db.Task).get(task_id)
if not ctx.task or ctx.task.round != ctx.round:
raise werkzeug.exceptions.NotFound()
if not (right_needed is None or ctx.rights.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return ctx
class ParticipantsFilterForm(PagerForm):
@@ -59,6 +218,7 @@ class ParticipantsFilterForm(PagerForm):
download_csv = wtforms.SubmitField("↓ CSV")
download_tsv = wtforms.SubmitField("↓ TSV")
class ParticipantsActionForm(FlaskForm):
action_on = wtforms.RadioField(
"Provést akci na", validators=[validators.DataRequired()],
@@ -120,20 +280,7 @@ class ParticipantsActionForm(FlaskForm):
flash('Data v checkboxech nelze převést na čísla, kontaktujte správce', 'danger')
return False
# Check all participations if we can edit them
ctants: List[Tuple[db.Participation, Any, Any]] = query.all()
for pion, _, _ in ctants:
u = pion.user
if self.action_on.data == 'checked' and u.user_id not in user_ids:
continue
rr = g.gatekeeper.rights_for_contest(pion.contest)
if not rr.have_right(Right.manage_contest):
flash(
f"Nemáte právo ke správě soutěže v kole {round.round_code_short()} {pion.contest.place.name_locative()} "
+ f"(účastník {u.full_name()}). Žádná akce nebyla provedena.", 'danger'
)
return False
count = 0
unchanged = 0
for pion, _, _ in ctants:
@@ -154,6 +301,7 @@ class ParticipantsActionForm(FlaskForm):
if self.set_participation_state.data:
pion.state = self.participation_state.data
elif self.set_participation_place.data:
assert participation_place
pion.place_id = participation_place.place_id
elif self.set_contest.data:
pion.contest_id = contest.contest_id
@@ -183,12 +331,14 @@ class ParticipantsActionForm(FlaskForm):
'success'
)
elif self.set_participation_place.data:
assert participation_place
flash(
f'Nastaveno soutěžní místo {participation_place.name} '
+ inflect_number(count, 'účastníkovi', 'účastníkům', 'účastníkům'),
'success'
)
elif self.set_contest.data:
assert contest_place
flash(
inflect_number(count, 'účastník přesunut', 'účastníci přesunuti', 'účastníků přesunuto')
+ f' do soutěže {contest_place.name_locative()}',
@@ -204,109 +354,19 @@ class ParticipantsActionForm(FlaskForm):
return True
def get_contest(id: int) -> Tuple[db.Contest, db.Contest]:
"""Vrací contest a master_contest pro zadané contest_id.
Pro nedělená kola platí contest == master_contest.
Operace s účastníky by měly probíhat vždy přes master_contest."""
contest = (db.get_session().query(db.Contest)
.options(joinedload(db.Contest.place),
joinedload(db.Contest.round),
joinedload(db.Contest.master).joinedload(db.Contest.round))
.get(id))
if not contest:
raise werkzeug.exceptions.NotFound()
return contest, contest.master
def get_contest_rr(id: int, right_needed: Optional[Right] = None) -> Tuple[db.Contest, db.Contest, ContestRights]:
"""Vrací contest, master_contest a ContestRights objekt pro zadané contest_id.
Pro nedělená kola platí contest == master_contest.
Operace s účastníky by měly probíhat vždy přes master_contest."""
contest, master_contest = get_contest(id)
rr = g.gatekeeper.rights_for_contest(contest)
if not (right_needed is None or rr.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return contest, master_contest, rr
def get_contest_site_rr(id: int, site_id: Optional[int], right_needed: Optional[Right] = None) -> Tuple[db.Contest, db.Contest, Optional[db.Place], ContestRights]:
"""Vrací contest, master_contest, optional site a ContestRights objekt pro zadané contest_id a site_id.
Pro nedělená kola platí contest == master_contest.
Operace s účastníky by měly probíhat vždy přes master_contest."""
if site_id is None:
contest, master_contest, rr = get_contest_rr(id, right_needed)
return contest, master_contest, None, rr
contest, master_contest = get_contest(id)
site = db.get_session().query(db.Place).get(site_id)
if not site:
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_for_contest(contest, site)
if not (right_needed is None or rr.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return contest, master_contest, site, rr
def contest_breadcrumbs(
round: Optional[db.Round] = None, contest: Optional[db.Contest] = None,
site: Optional[db.Place] = None, task: Optional[db.Task] = None,
user: Optional[db.User] = None, action: Optional[str] = None,
table: Optional[bool] = False
) -> Markup:
elements = [(url_for('org_rounds'), 'Soutěže')]
round_id = None
if round:
round_id = round.round_id
elements.append((url_for('org_round', id=round_id), round.round_code()))
ct_id = None
if contest:
ct_id = contest.contest_id
elements.append((url_for('org_contest', id=ct_id), contest.place.name))
site_id = None
if site:
site_id = site.place_id
elements.append((url_for('org_contest', id=ct_id, site_id=site_id), f"soutěžní místo {site.name}"))
if task:
task_id = task.task_id
elements.append((
url_for('org_contest_task', contest_id=ct_id, site_id=site_id, task_id=task_id) if ct_id
else url_for('org_round_task_edit', id=round_id, task_id=task_id),
f"{task.code} {task.name}"
))
if user:
user_id = user.user_id
elements.append((url_for('org_contest_user', contest_id=ct_id, user_id=user_id), user.full_name()))
if table:
if contest:
elements.append((url_for('org_contest_list', id=ct_id, site=site_id), "Seznam účastníků"))
else:
elements.append((url_for('org_round_list', id=round_id), "Seznam účastníků"))
if action:
elements.append(('', action))
return Markup(
"\n".join([f"<li><a href='{url}'>{name}</a>" for url, name in elements[:-1]])
+ "<li>" + elements[-1][1]
)
@app.route('/org/contest/c/<int:id>')
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/')
def org_contest(id: int, site_id: Optional[int] = None):
@app.route('/org/contest/c/<int:ct_id>/')
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/')
def org_contest(ct_id: int, site_id: Optional[int] = None):
sess = db.get_session()
contest, master_contest, site, rr = get_contest_site_rr(id, site_id, None)
round = contest.round
tasks_subq = sess.query(db.Task.task_id).filter_by(round=contest.round)
pions_subq = sess.query(db.Participation.user_id).filter_by(contest=master_contest)
if site:
pions_subq = pions_subq.filter_by(place=site)
ctx = get_context(ct_id=ct_id, site_id=site_id)
contest = ctx.contest
assert contest
rights = ctx.rights
tasks_subq = sess.query(db.Task.task_id).filter_by(round=ctx.round)
pions_subq = sess.query(db.Participation.user_id).filter_by(contest=ctx.master_contest)
if ctx.site:
pions_subq = pions_subq.filter_by(place=ctx.site)
sol_counts_q = (
sess.query(db.Solution.task_id, func.count(db.Solution.task_id))
.filter(
@@ -319,7 +379,7 @@ def org_contest(id: int, site_id: Optional[int] = None):
for task_id, count in sol_counts_q.group_by(db.Solution.task_id).all():
sol_counts[task_id] = count
tasks = sess.query(db.Task).filter_by(round=round).all()
tasks = sess.query(db.Task).filter_by(round=ctx.round).all()
tasks.sort(key=lambda t: t.code)
for task in tasks:
task.sol_count = sol_counts[task.task_id] if task.task_id in sol_counts else 0
@@ -330,7 +390,7 @@ def org_contest(id: int, site_id: Optional[int] = None):
sess.query(db.Place, func.count('*'))
.select_from(db.Participation).join(db.Place)
.group_by(db.Place)
.filter(db.Participation.contest == master_contest).all()
.filter(db.Participation.contest == ctx.master_contest).all()
)
group_contests = contest.get_group_contests(True)
@@ -338,27 +398,50 @@ def org_contest(id: int, site_id: Optional[int] = None):
return render_template(
'org_contest.html',
contest=contest, group_contests=group_contests, site=site,
rights=sorted(rr.rights, key=lambda r: r.name),
roles=[r.friendly_name() for r in rr.get_roles()],
can_manage=rr.have_right(Right.manage_contest),
can_upload=rr.can_upload_feedback(),
can_edit_points=rr.can_edit_points(),
can_create_solutions=rr.can_upload_feedback() or rr.can_upload_solutions(),
can_view_statement=rr.can_view_statement(),
ctx=ctx, rights=rights,
round=ctx.round, contest=contest, site=ctx.site,
group_contests=group_contests,
rights_list=sorted(rights.rights, key=lambda r: r.name),
roles=[r.friendly_name() for r in rights.get_roles()],
tasks=tasks, places_counts=places_counts,
)
def generic_import(round: db.Round, master_round: db.Round, contest: Optional[db.Contest], master_contest: Optional[db.Contest]):
"""Společná funkce pro importování do soutěží a kol"""
@app.route('/doc/import')
def doc_import():
return render_template('doc_import.html')
class ImportForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", render_kw={'autofocus': True})
typ = wtforms.SelectField(
"Typ dat",
choices=[(x.name, x.friendly_name()) for x in (ImportType.participants, ImportType.proctors, ImportType.judges)],
coerce=ImportType.coerce,
default=ImportType.participants,
)
fmt = wtforms.SelectField(
"Formát souboru",
choices=FileFormat.choices(), coerce=FileFormat.coerce,
default=FileFormat.cs_csv,
)
submit = wtforms.SubmitField('Importovat')
get_template = wtforms.SubmitField('Stáhnout šablonu')
@app.route('/org/contest/c/<int:ct_id>/import', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/import', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/h/<int:hier_id>/import', methods=('GET', 'POST'))
def org_generic_import(round_id: Optional[int] = None, hier_id: Optional[int] = None, ct_id: Optional[int] = None):
ctx = get_context(round_id=round_id, hier_id=hier_id, ct_id=ct_id, right_needed=Right.manage_contest)
round, contest = ctx.master_round, ctx.master_contest
form = ImportForm()
errs = []
warnings = []
if form.validate_on_submit():
fmt = form.fmt.data
imp = create_import(user=g.user, type=form.typ.data, fmt=fmt, round=master_round, contest=master_contest)
imp = create_import(user=g.user, type=form.typ.data, fmt=fmt, round=round, contest=contest, only_region=ctx.hier_place)
if form.submit.data:
if form.file.data is not None:
file = form.file.data.stream
@@ -369,10 +452,7 @@ def generic_import(round: db.Round, master_round: db.Round, contest: Optional[db
flash('Soubor neobsahoval žádné řádky s daty', 'danger')
else:
flash(f'Importováno ({imp.cnt_rows} řádků, založeno {imp.cnt_new_users} uživatelů, {imp.cnt_new_participations} účastí, {imp.cnt_new_roles} rolí)', 'success')
if contest is not None:
return redirect(url_for('org_contest', id=contest.contest_id))
else:
return redirect(url_for('org_round', id=round.round_id))
return redirect(ctx.url_home())
else:
errs = imp.errors
warnings = imp.warnings
@@ -387,6 +467,7 @@ def generic_import(round: db.Round, master_round: db.Round, contest: Optional[db
return render_template(
'org_generic_import.html',
ctx=ctx,
contest=contest,
round=round,
form=form,
@@ -395,28 +476,21 @@ def generic_import(round: db.Round, master_round: db.Round, contest: Optional[db
)
@app.route('/doc/import')
def doc_import():
return render_template('doc_import.html')
@app.route('/org/contest/c/<int:id>/import', methods=('GET', 'POST'))
def org_contest_import(id: int):
contest, master_contest, rr = get_contest_rr(id, Right.manage_contest)
return generic_import(
round=contest.round, master_round=master_contest.round,
contest=contest, master_contest=master_contest
)
# URL je explicitně uvedeno v mo.email.contestant_list_url
@app.route('/org/contest/c/<int:id>/participants', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/participants', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:id>/participants/emails', endpoint="org_contest_list_emails")
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/participants/emails', endpoint="org_contest_list_emails")
def org_contest_list(id: int, site_id: Optional[int] = None):
contest, master_contest, site, rr = get_contest_site_rr(id, site_id, Right.view_contestants)
can_edit = rr.have_right(Right.manage_contest) and request.endpoint != 'org_contest_list_emails'
@app.route('/org/contest/c/<int:ct_id>/participants', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/participants', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/participants/emails', endpoint="org_generic_list_emails")
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/participants/emails', endpoint="org_generic_list_emails")
@app.route('/org/contest/r/<int:round_id>/participants', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/participants/emails', endpoint="org_generic_list_emails")
@app.route('/org/contest/r/<int:round_id>/h/<int:hier_id>/participants', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/h/<int:hier_id>/participants/emails', endpoint="org_generic_list_emails")
def org_generic_list(round_id: Optional[int] = None, hier_id: Optional[int] = None,
ct_id: Optional[int] = None, site_id: Optional[int] = None):
ctx = get_context(round_id=round_id, hier_id=hier_id, ct_id=ct_id, site_id=site_id, right_needed=Right.view_contestants)
round, contest = ctx.master_round, ctx.master_contest
rr = ctx.rights
can_edit = rr.have_right(Right.manage_contest) and request.endpoint != 'org_generic_list_emails'
format = request.args.get('format', "")
filter = ParticipantsFilterForm(formdata=request.args)
@@ -424,7 +498,7 @@ def org_contest_list(id: int, site_id: Optional[int] = None):
filter.validate()
query = get_contestants_query(
round=master_contest.round, contest=master_contest, site=site,
ctx=ctx,
school=filter.school.place,
contest_place=filter.contest_place.place,
participation_place=filter.participation_place.place,
@@ -434,7 +508,7 @@ def org_contest_list(id: int, site_id: Optional[int] = None):
action_form = None
if can_edit:
action_form = ParticipantsActionForm()
if action_form.do_action(round=contest.round, query=query):
if action_form.do_action(round=round, query=query):
# Action happened, redirect
return redirect(request.url)
@@ -442,23 +516,26 @@ def org_contest_list(id: int, site_id: Optional[int] = None):
table = None
emails = None
mailto_link = None
if request.endpoint == 'org_contest_list_emails':
(emails, mailto_link) = get_contestant_emails(query,
mailto_subject=f'{contest.round.name} {contest.round.category} {contest.place.name_locative()}')
if request.endpoint == 'org_generic_list_emails':
if contest:
subj = f'{contest.round.name} {contest.round.category} {contest.place.name_locative()}'
else:
subj = f'{round.name} kategorie {round.category}'
(emails, mailto_link) = get_contestant_emails(query, mailto_subject=subj)
count = len(emails)
else:
# (count, query) = filter.apply_limits(query, pagesize=50)
count = db.get_count(query)
table = make_contestant_table(query, master_contest.round, add_checkbox=can_edit)
(count, query) = filter.apply_limits(query, pagesize=50)
table = make_contestant_table(query, round, add_contest_column=(contest is None), add_checkbox=can_edit)
return render_template(
'org_contest_list.html',
contest=contest, site=site,
'org_generic_list.html',
ctx=ctx,
contest=contest, round=round, site=ctx.site,
table=table, emails=emails, mailto_link=mailto_link,
filter=filter, count=count, action_form=action_form,
)
else:
table = make_contestant_table(query, master_contest.round, is_export=True)
table = make_contestant_table(query, round, is_export=True)
return table.send_as(format)
@@ -476,8 +553,7 @@ contest_list_columns = (
def get_contestants_query(
round: db.Round, contest: Optional[db.Contest] = None,
site: Optional[db.Place] = None,
ctx: Context,
contest_place: Optional[db.Place] = None,
participation_place: Optional[db.Place] = None,
participation_state: Optional[db.PartState] = None,
@@ -487,15 +563,18 @@ def get_contestants_query(
.select_from(db.Participation)
.join(db.Participant, db.Participant.user_id == db.Participation.user_id)
.join(db.User, db.User.user_id == db.Participation.user_id)
.filter(db.Participant.year == round.year))
if contest:
query = query.join(db.Contest, db.Contest.contest_id == contest.contest_id)
.join(db.Contest)
.filter(db.Participant.year == ctx.round.year)
.filter(db.Participation.contest_id == db.Contest.contest_id))
if ctx.contest:
query = query.filter(db.Contest.contest_id == ctx.contest.contest_id)
if ctx.site:
query = query.filter(db.Participation.place_id == ctx.site.place_id)
else:
query = query.filter(db.Contest.round == round)
query = query.filter(db.Contest.round == ctx.round)
if ctx.hier_place:
query = db.filter_place_nth_parent(query, db.Contest.place_id, ctx.round.level - ctx.hier_place.level, ctx.hier_place.place_id)
query = query.options(joinedload(db.Contest.place))
query = query.filter(db.Participation.contest_id == db.Contest.contest_id)
if site:
query = query.filter(db.Participation.place_id == site.place_id)
if contest_place:
query = query.filter(db.Contest.place_id == contest_place.place_id)
if participation_place:
@@ -570,91 +649,6 @@ def get_contestant_emails(query: Query, mailto_subject: str = '[OSMO] Zpráva pr
return (emails, mailto_link)
@dataclass
class SolutionContext:
contest: db.Contest
master_contest: db.Contest
round: db.Round
master_round: db.Round
pion: Optional[db.Participation]
user: Optional[db.User]
task: Optional[db.Task]
site: Optional[db.Place]
allow_view: bool
allow_upload_solutions: bool
allow_upload_feedback: bool
allow_create_solutions: bool
allow_edit_points: bool
def get_solution_context(contest_id: int, user_id: Optional[int], task_id: Optional[int], site_id: Optional[int]) -> SolutionContext:
sess = db.get_session()
# Nejprve zjistíme, zda existuje soutěž
contest, master_contest = get_contest(contest_id)
round = contest.round
master_round = master_contest.round
# Najdeme úlohu a ověříme, že je součástí soutěže
if task_id is not None:
task = sess.query(db.Task).get(task_id)
if not task or task.round != round:
raise werkzeug.exceptions.NotFound()
else:
task = None
site = None
user = None
if user_id is not None:
# Zkontrolujeme, zda se účastník opravdu účastní soutěže
pion = (sess.query(db.Participation)
.filter_by(user_id=user_id, contest_id=master_contest.contest_id)
.options(joinedload(db.Participation.place),
joinedload(db.Participation.user))
.one_or_none())
if not pion:
raise werkzeug.exceptions.NotFound()
user = pion.user
# A zda soutěží na zadaném soutěžním místě, je-li určeno
if site_id is not None and site_id != pion.place_id:
raise werkzeug.exceptions.NotFound()
# Pokud je uvedeno soutěžní místo, hledáme práva k němu, jinak k soutěži
if site_id is not None:
site = pion.place
else:
pion = None
if site_id is not None:
site = sess.query(db.Place).get(site_id)
if not site:
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_for_contest(contest, site)
allow_view = rr.have_right(Right.view_submits)
if not allow_view:
raise werkzeug.exceptions.Forbidden()
allow_upload_solutions = rr.can_upload_solutions()
allow_upload_feedback = rr.can_upload_feedback()
return SolutionContext(
contest=contest, master_contest=master_contest,
round=round, master_round=master_round,
pion=pion,
user=user,
task=task,
site=site,
# XXX: Potřebujeme tohle všechno? Nechceme spíš vracet rr a nechat každého, ať na něm volá metody?
allow_view=allow_view,
allow_upload_solutions=allow_upload_solutions,
allow_upload_feedback=allow_upload_feedback,
allow_create_solutions=allow_upload_solutions or allow_upload_feedback,
allow_edit_points=rr.can_edit_points(),
)
class SubmitForm(FlaskForm):
note = wtforms.TextAreaField("Poznámka pro účastníka", description="Viditelná účastníkovi po uzavření kola", render_kw={'autofocus': True})
org_note = wtforms.TextAreaField("Interní poznámka", description="Viditelná jen organizátorům")
@@ -674,15 +668,15 @@ class SetFinalForm(FlaskForm):
submit_final = wtforms.SubmitField("Prohlásit za finální")
@app.route('/org/contest/c/<int:contest_id>/submit/<int:user_id>/<int:task_id>/', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/submit/<int:user_id>/<int:task_id>/', methods=('GET', 'POST'))
def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Optional[int] = None):
sc = get_solution_context(contest_id, user_id, task_id, site_id)
assert sc.user is not None
assert sc.task is not None
@app.route('/org/contest/c/<int:ct_id>/submit/<int:user_id>/<int:task_id>/', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/submit/<int:user_id>/<int:task_id>/', methods=('GET', 'POST'))
def org_submit_list(ct_id: int, user_id: int, task_id: int, site_id: Optional[int] = None):
ctx = get_context(ct_id=ct_id, site_id=site_id, user_id=user_id, task_id=task_id, right_needed=Right.view_submits)
assert ctx.contest and ctx.user and ctx.task
rights = ctx.rights
sess = db.get_session()
self_url = url_for('org_submit_list', contest_id=contest_id, user_id=user_id, task_id=task_id, site_id=site_id)
self_url = ctx.url_for('org_submit_list')
# Najdeme řešení úlohy (nemusí existovat)
sol = (sess.query(db.Solution)
@@ -690,7 +684,7 @@ def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Option
.one_or_none())
set_final_form: Optional[SetFinalForm] = None
if sol and sc.allow_upload_feedback:
if sol and rights.can_upload_feedback():
set_final_form = SetFinalForm()
if set_final_form.validate_on_submit() and set_final_form.submit_final.data:
is_submit = set_final_form.type.data == "submit"
@@ -737,7 +731,7 @@ def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Option
return redirect(self_url)
form = SubmitForm(obj=sol)
form.points.widget = NumberInput(min=0, max=sc.task.max_points, step=sc.master_round.points_step) # min a max v HTML
form.points.widget = NumberInput(min=0, max=ctx.task.max_points, step=ctx.master_round.points_step) # min a max v HTML
if form.validate_on_submit():
if sol and form.delete.data:
if sol.final_submit or sol.final_feedback:
@@ -747,14 +741,14 @@ def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Option
sess.delete(sol)
mo.util.log(
type=db.LogType.participant,
what=sc.user.user_id,
what=ctx.user.user_id,
details={
'action': 'solution-removed',
'task': task_id,
},
)
sess.commit()
app.logger.info(f"Řešení úlohy {sc.task.code} od účastníka {sc.user.user_id} smazáno")
app.logger.info(f"Řešení úlohy {ctx.task.code} od účastníka {ctx.user.user_id} smazáno")
return redirect(self_url)
points = form.points.data
@@ -763,28 +757,28 @@ def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Option
flash('Schází soubor k nahrání, žádné změny nebyly uloženy', 'danger')
return redirect(self_url)
if points:
error = mo.util.check_points(points, for_task=sc.task, for_round=sc.round)
error = mo.util.check_points(points, for_task=ctx.task, for_round=ctx.round)
if error:
flash(error, 'danger')
return redirect(self_url)
if not sol and (sc.allow_edit_points or sc.allow_upload_solutions or sc.allow_upload_feedback):
if not sol and (rights.can_edit_points() or rights.can_upload_solutions() or rights.can_upload_feedback()):
flash('Řešení založeno', 'success')
sol = db.Solution(task=sc.task, user=sc.user)
sol = db.Solution(task=ctx.task, user=ctx.user)
sess.add(sol)
mo.util.log(
type=db.LogType.participant,
what=sc.user.user_id,
what=ctx.user.user_id,
details={
'action': 'solution-created',
'task': task_id,
},
)
sess.commit()
app.logger.info(f"Řešení úlohy {sc.task.code} od účastníka {sc.user.user_id} založeno")
app.logger.info(f"Řešení úlohy {ctx.task.code} od účastníka {ctx.user.user_id} založeno")
# Edit sol and points
if sol and sc.allow_edit_points:
if sol and rights.can_edit_points():
# Sol edit
sol.note = form.note.data
sol.org_note = form.org_note.data
@@ -794,7 +788,7 @@ def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Option
if points != sol.points:
sol.points = points
sess.add(db.PointsHistory(
task=sc.task,
task=ctx.task,
participant=sol.user,
user=g.user,
points_at=mo.now,
@@ -806,7 +800,7 @@ def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Option
changes = db.get_object_changes(sol)
mo.util.log(
type=db.LogType.participant,
what=sc.user.user_id,
what=ctx.user.user_id,
details={
'action': 'solution-edit',
'task': task_id,
@@ -814,22 +808,22 @@ def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Option
},
)
sess.commit()
app.logger.info(f"Řešení úlohy {sc.task.code} od účastníka {sc.user.user_id} modifikováno, změny: {changes}")
app.logger.info(f"Řešení úlohy {ctx.task.code} od účastníka {ctx.user.user_id} modifikováno, změny: {changes}")
if (form.submit_sol.data and sc.allow_upload_solutions) or (form.submit_fb.data and sc.allow_upload_feedback):
if (form.submit_sol.data and rights.can_upload_solutions()) or (form.submit_fb.data and rights.can_upload_feedback()):
file = form.file.data.stream
if sc.allow_upload_solutions and form.submit_sol.data:
if rights.can_upload_solutions() and form.submit_sol.data:
type = db.PaperType.solution
elif sc.allow_upload_feedback and form.submit_fb.data:
elif rights.can_upload_feedback() and form.submit_fb.data:
type = db.PaperType.feedback
else:
raise werkzeug.exceptions.Forbidden()
assert sc.task is not None and sc.user is not None
paper = db.Paper(task=sc.task, for_user_obj=sc.user, uploaded_by_obj=g.user, type=type, note=form.file_note.data)
assert ctx.task is not None and ctx.user is not None
paper = db.Paper(task=ctx.task, for_user_obj=ctx.user, uploaded_by_obj=g.user, type=type, note=form.file_note.data)
submitter = mo.submit.Submitter()
self_url = url_for('org_submit_list', contest_id=contest_id, user_id=user_id, task_id=task_id, site_id=site_id)
self_url = url_for('org_submit_list', ct_id=ct_id, user_id=user_id, task_id=task_id, site_id=site_id)
try:
submitter.submit_paper(paper, file.name)
@@ -859,7 +853,7 @@ def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Option
return redirect(self_url)
papers = (sess.query(db.Paper)
.filter_by(for_user_obj=sc.user, task=sc.task)
.filter_by(for_user_obj=ctx.user, task=ctx.task)
.options(joinedload(db.Paper.uploaded_by_obj))
.order_by(db.Paper.uploaded_at.desc())
.all())
@@ -868,21 +862,21 @@ def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Option
fb_papers = [p for p in papers if p.type == db.PaperType.feedback]
points_history = (sess.query(db.PointsHistory)
.filter_by(task=sc.task, participant=sc.user)
.filter_by(task=ctx.task, participant=ctx.user)
.options(joinedload(db.PointsHistory.user))
.order_by(db.PointsHistory.points_at.desc())
.all())
return render_template(
'org_submit_list.html',
sc=sc,
ctx=ctx, rights=rights,
solution=sol,
sol_papers=sol_papers,
fb_papers=fb_papers,
points_history=points_history,
for_site=(site_id is not None),
paper_link=lambda p: mo.web.util.org_paper_link(sc.contest, sc.site, sc.user, p),
orig_paper_link=lambda p: mo.web.util.org_paper_link(sc.contest, sc.site, sc.user, p, orig=True),
paper_link=lambda p: mo.web.util.org_paper_link(ctx.contest, ctx.site, ctx.user, p),
orig_paper_link=lambda p: mo.web.util.org_paper_link(ctx.contest, ctx.site, ctx.user, p, orig=True),
form=form,
set_final_form=set_final_form,
)
@@ -894,11 +888,11 @@ class SubmitEditForm(FlaskForm):
submit = wtforms.SubmitField("Uložit")
@app.route('/org/contest/c/<int:contest_id>/paper/<int:paper_id>/<filename>', endpoint='org_submit_paper')
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/paper/<int:paper_id>/<filename>', endpoint='org_submit_paper')
@app.route('/org/contest/c/<int:contest_id>/paper/orig/<int:paper_id>/<filename>', endpoint='org_submit_paper_orig')
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/paper/orig/<int:paper_id>/<filename>', endpoint='org_submit_paper_orig')
def org_submit_paper(contest_id: int, paper_id: int, filename: str, site_id: Optional[int] = None):
@app.route('/org/contest/c/<int:ct_id>/paper/<int:paper_id>/<filename>', endpoint='org_submit_paper')
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/paper/<int:paper_id>/<filename>', endpoint='org_submit_paper')
@app.route('/org/contest/c/<int:ct_id>/paper/orig/<int:paper_id>/<filename>', endpoint='org_submit_paper_orig')
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/paper/orig/<int:paper_id>/<filename>', endpoint='org_submit_paper_orig')
def org_submit_paper(ct_id: int, paper_id: int, filename: str, site_id: Optional[int] = None):
paper = (db.get_session().query(db.Paper)
.options(joinedload(db.Paper.task)) # pro task_paper_filename()
.get(paper_id))
@@ -908,7 +902,7 @@ def org_submit_paper(contest_id: int, paper_id: int, filename: str, site_id: Opt
if not filename.endswith('.pdf'):
raise werkzeug.exceptions.NotFound()
get_solution_context(contest_id, paper.for_user, paper.for_task, site_id)
get_context(ct_id=ct_id, user_id=paper.for_user, task_id=paper.for_task, site_id=site_id, right_needed=Right.view_submits)
return mo.web.util.send_task_paper(paper, (request.endpoint == 'org_submit_paper_orig'))
@@ -941,25 +935,25 @@ class TaskCreateForm(FlaskForm):
submit = wtforms.SubmitField("Založit označená řešení")
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/')
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/task/<int:task_id>/')
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/points', methods=('GET', 'POST'), endpoint="org_contest_task_points")
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/create', methods=('GET', 'POST'), endpoint="org_contest_task_create")
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/task/<int:task_id>/create', methods=('GET', 'POST'), endpoint="org_contest_task_create")
def org_contest_task(contest_id: int, task_id: int, site_id: Optional[int] = None):
sc = get_solution_context(contest_id, None, task_id, site_id)
assert sc.task is not None
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/')
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/task/<int:task_id>/')
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/points', methods=('GET', 'POST'), endpoint="org_contest_task_points")
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/create', methods=('GET', 'POST'), endpoint="org_contest_task_create")
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/task/<int:task_id>/create', methods=('GET', 'POST'), endpoint="org_contest_task_create")
def org_contest_task(ct_id: int, task_id: int, site_id: Optional[int] = None):
ctx = get_context(ct_id=ct_id, site_id=site_id, task_id=task_id, right_needed=Right.view_submits)
assert ctx.contest and ctx.task
action_create = request.endpoint == "org_contest_task_create"
action_points = request.endpoint == "org_contest_task_points"
if action_create and not sc.allow_create_solutions:
if action_create and not ctx.rights.can_create_solutions():
raise werkzeug.exceptions.Forbidden()
if action_points and not sc.allow_edit_points:
if action_points and not ctx.rights.can_edit_points():
raise werkzeug.exceptions.Forbidden()
sess = db.get_session()
q = get_solutions_query(sc.task, for_contest=sc.master_contest, for_site=sc.site)
q = get_solutions_query(ctx.task, for_contest=ctx.master_contest, for_site=ctx.site)
rows: List[Tuple[db.Participation, db.Solution]] = q.all()
rows.sort(key=lambda r: r[0].user.sort_key())
@@ -976,7 +970,7 @@ def org_contest_task(contest_id: int, task_id: int, site_id: Optional[int] = Non
if not request.form.get(f"create_sol_{pion.user_id}"):
continue # nikdo nežádá o vytvoření
sol = db.Solution(task=sc.task, user=pion.user)
sol = db.Solution(task=ctx.task, user=pion.user)
sess.add(sol)
mo.util.log(
type=db.LogType.participant,
@@ -986,7 +980,7 @@ def org_contest_task(contest_id: int, task_id: int, site_id: Optional[int] = Non
'task': task_id,
},
)
app.logger.info(f"Řešení úlohy {sc.task.code} od účastníka {pion.user_id} založeno")
app.logger.info(f"Řešení úlohy {ctx.task.code} od účastníka {pion.user_id} založeno")
new_sol_count += 1
if new_sol_count > 0:
@@ -996,7 +990,7 @@ def org_contest_task(contest_id: int, task_id: int, site_id: Optional[int] = Non
"success")
else:
flash("Žádné změny k uložení", "info")
return redirect(url_for('org_contest_task', contest_id=contest_id, task_id=task_id, site_id=site_id))
return redirect(ctx.url_for('org_contest_task'))
if action_points:
points_form = TaskPointsForm()
@@ -1007,7 +1001,7 @@ def org_contest_task(contest_id: int, task_id: int, site_id: Optional[int] = Non
if sol is None:
continue
points, error = mo.util.parse_points(request.form.get(f"points_{sol.user_id}"), for_task=sc.task, for_round=sc.round)
points, error = mo.util.parse_points(request.form.get(f"points_{sol.user_id}"), for_task=ctx.task, for_round=ctx.round)
if error:
flash(f'{sol.user.first_name} {sol.user.last_name}: {error}', 'danger')
ok = False
@@ -1016,7 +1010,7 @@ def org_contest_task(contest_id: int, task_id: int, site_id: Optional[int] = Non
# Save points
sol.points = points
sess.add(db.PointsHistory(
task=sc.task,
task=ctx.task,
participant=sol.user,
user=g.user,
points_at=mo.now,
@@ -1029,13 +1023,13 @@ def org_contest_task(contest_id: int, task_id: int, site_id: Optional[int] = Non
flash("Změněny body u " + inflect_number(count, "řešení", "řešení", "řešení"), "success")
else:
flash("Žádné změny k uložení", "info")
return redirect(url_for('org_contest_task', contest_id=contest_id, task_id=task_id))
return redirect(ctx.url_for('org_contest_task'))
# Count papers for each solution
paper_counts = {}
for user_id, type, count in (
db.get_session().query(db.Paper.for_user, db.Paper.type, func.count(db.Paper.type))
.filter_by(task=sc.task)
.filter_by(task=ctx.task)
.group_by(db.Paper.for_user, db.Paper.type)
.all()
):
@@ -1043,8 +1037,10 @@ def org_contest_task(contest_id: int, task_id: int, site_id: Optional[int] = Non
return render_template(
"org_contest_task.html",
sc=sc, rows=rows, paper_counts=paper_counts,
paper_link=lambda u, p: mo.web.util.org_paper_link(sc.contest, sc.site, u, p),
ctx=ctx, rights=ctx.rights,
round=ctx.round, contest=ctx.contest,
rows=rows, paper_counts=paper_counts,
paper_link=lambda u, p: mo.web.util.org_paper_link(ctx.contest, ctx.site, u, p),
points_form=points_form, create_form=create_form, request_form=request.form,
)
@@ -1053,33 +1049,34 @@ class ContestSolutionsEditForm(FlaskForm):
submit = wtforms.SubmitField("Založit označená řešení")
@app.route('/org/contest/c/<int:id>/solutions', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/solutions', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:id>/solutions/edit', methods=('GET', 'POST'), endpoint="org_contest_solutions_edit")
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/solutions/edit', methods=('GET', 'POST'), endpoint="org_contest_solutions_edit")
def org_contest_solutions(id: int, site_id: Optional[int] = None):
sc = get_solution_context(id, None, None, site_id)
@app.route('/org/contest/c/<int:ct_id>/solutions', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/solutions', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/solutions/edit', methods=('GET', 'POST'), endpoint="org_contest_solutions_edit")
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/solutions/edit', methods=('GET', 'POST'), endpoint="org_contest_solutions_edit")
def org_contest_solutions(ct_id: int, site_id: Optional[int] = None):
sess = db.get_session()
ctx = get_context(ct_id=ct_id, site_id=site_id, right_needed=Right.view_submits)
assert ctx.contest
edit_action = request.endpoint == "org_contest_solutions_edit"
if edit_action and not sc.allow_create_solutions:
if edit_action and not ctx.rights.can_create_solutions():
raise werkzeug.exceptions.Forbidden()
pions_subq = sess.query(db.Participation.user_id).filter_by(contest=sc.master_contest)
if sc.site:
pions_subq = pions_subq.filter_by(place=sc.site)
pions_subq = sess.query(db.Participation.user_id).filter_by(contest=ctx.master_contest)
if ctx.site:
pions_subq = pions_subq.filter_by(place=ctx.site)
pions_subq = pions_subq.subquery()
pions = (sess.query(db.Participation)
.filter(
db.Participation.contest == sc.master_contest,
db.Participation.contest == ctx.master_contest,
db.Participation.user_id.in_(pions_subq),
).options(joinedload(db.Participation.user))
.all())
pions.sort(key=lambda p: p.user.sort_key())
tasks_subq = sess.query(db.Task.task_id).filter_by(round=sc.round).subquery()
tasks_subq = sess.query(db.Task.task_id).filter_by(round=ctx.round).subquery()
tasks = (sess.query(db.Task)
.filter_by(round=sc.round)
.filter_by(round=ctx.round)
.order_by(db.Task.code)
.all())
@@ -1139,13 +1136,14 @@ def org_contest_solutions(id: int, site_id: Optional[int] = None):
"success")
else:
flash("Žádné změny k uložení", "info")
return redirect(url_for('org_contest_solutions', id=id, site_id=site_id))
return redirect(ctx.url_for('org_contest_solutions'))
return render_template(
'org_contest_solutions.html',
contest=sc.contest, site=sc.site, sc=sc,
ctx=ctx,
contest=ctx.contest, site=ctx.site, rights=ctx.rights,
pions=pions, tasks=tasks, tasks_sols=task_sols, paper_counts=paper_counts,
paper_link=lambda u, p: mo.web.util.org_paper_link(sc.contest, sc.site, u, p),
paper_link=lambda u, p: mo.web.util.org_paper_link(ctx.contest, ctx.site, u, p),
edit_form=edit_form,
)
@@ -1200,23 +1198,30 @@ def download_submits(form: DownloadSubmitsForm, round: db.Round, sol_query, pion
return True
def generic_batch_download(round: db.Round, contest: Optional[db.Contest], site: Optional[db.Place], task: db.Task):
"""Společná funkce pro download submitů/feedbacku do soutěží a kol."""
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/h/<int:hier_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
def org_generic_batch_download(task_id: int, round_id: Optional[int] = None, hier_id: Optional[int] = None, ct_id: Optional[int] = None, site_id: Optional[int] = None):
sess = db.get_session()
ctx = get_context(round_id=round_id, hier_id=hier_id, ct_id=ct_id, site_id=site_id, task_id=task_id, right_needed=Right.view_submits)
round, hier_place, contest, site, task = ctx.round, ctx.hier_place, ctx.contest, ctx.site, ctx.task
assert task
pion_query = sess.query(db.Participation.user_id).select_from(db.Participation)
if contest is not None:
if contest:
pion_query = pion_query.filter_by(contest_id=contest.master_contest_id)
if site is not None:
if site:
pion_query = pion_query.filter_by(place=site)
else:
pion_query = pion_query.join(db.Contest).filter(db.Contest.round_id == round.master_round_id)
if hier_place:
pion_query = db.filter_place_nth_parent(pion_query, db.Contest.place_id, round.level - hier_place.level, hier_place.place_id)
sol_query = (sess.query(db.Solution)
.select_from(db.Solution)
.filter(db.Solution.task == task))
if contest is not None:
if contest or hier_place:
sol_query = sol_query.filter(db.Solution.user_id.in_(pion_query.subquery()))
form = DownloadSubmitsForm()
@@ -1226,6 +1231,8 @@ def generic_batch_download(round: db.Round, contest: Optional[db.Contest], site:
subj = f'{subj} ({site.name})'
elif contest is not None:
subj = f'{subj} ({contest.place.name})'
elif hier_place is not None:
subj = f'{subj} ({hier_place.name})'
if download_submits(form, round, sol_query, pion_query, subj, contest is None):
return redirect(url_for('org_jobs'))
@@ -1244,7 +1251,8 @@ def generic_batch_download(round: db.Round, contest: Optional[db.Contest], site:
return render_template(
'org_generic_batch_download.html',
round=round, contest=contest, site=site, task=task,
ctx=ctx,
task=task,
submit_count=submit_count,
pion_count=pion_count,
sol_count=sol_count, fb_count=fb_count,
@@ -1258,13 +1266,18 @@ class UploadSubmitsForm(FlaskForm):
submit = wtforms.SubmitField('Odeslat')
def generic_batch_upload(round: db.Round, contest: Optional[db.Contest], site: Optional[db.Place], task: db.Task,
offer_upload_solutions: bool, offer_upload_feedback: bool):
"""Společná funkce pro upload feedbacku do soutěží a kol."""
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/h/<int:hier_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
def org_generic_batch_upload(task_id: int, round_id: Optional[int] = None, hier_id: Optional[int] = None, ct_id: Optional[int] = None, site_id: Optional[int] = None):
ctx = get_context(round_id=round_id, hier_id=hier_id, ct_id=ct_id, site_id=site_id, task_id=task_id)
round, hier_place, contest, site, task = ctx.round, ctx.hier_place, ctx.contest, ctx.site, ctx.task
assert task
# Základní kontrola, zda vůbec chceme akci spustit.
# Základní kontrola, zda vůbec chceme akci spustit. Zbytek se kontrole uvnitř jobu.
# Zatím neumíme dávkově nahrávat řešení.
if not offer_upload_feedback:
if not ctx.rights.offer_upload_feedback():
raise werkzeug.exceptions.Forbidden()
request.custom_max_content_length = mo.config.MAX_BATCH_CONTENT_LENGTH
@@ -1274,35 +1287,18 @@ def generic_batch_upload(round: db.Round, contest: Optional[db.Contest], site: O
file = form.file.data.stream
mo.jobs.submit.schedule_upload_feedback(round, file.name, f'Nahrání opravených řešení {round.round_code()}',
for_user=g.user,
only_contest=contest, only_site=site, only_task=task)
only_contest=contest, only_site=site, only_region=hier_place, only_task=task)
return redirect(url_for('org_jobs'))
return render_template(
'org_generic_batch_upload.html',
round=round, contest=contest, site=site, task=task,
ctx=ctx,
task=task,
max_size=mo.config.MAX_BATCH_CONTENT_LENGTH,
form=form,
)
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
def org_contest_task_download(contest_id: int, task_id: int, site_id: Optional[int] = None):
sc = get_solution_context(contest_id, None, task_id, site_id)
assert sc.task is not None
return generic_batch_download(round=sc.round, contest=sc.contest, site=sc.site, task=sc.task)
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
def org_contest_task_upload(contest_id: int, task_id: int, site_id: Optional[int] = None):
sc = get_solution_context(contest_id, None, task_id, site_id)
assert sc.task is not None
return generic_batch_upload(round=sc.round, contest=sc.contest, site=sc.site, task=sc.task,
offer_upload_solutions=sc.allow_upload_solutions,
offer_upload_feedback=sc.allow_upload_feedback)
class BatchPointsForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", render_kw={'autofocus': True})
fmt = wtforms.SelectField(
@@ -1315,15 +1311,22 @@ class BatchPointsForm(FlaskForm):
get_template = wtforms.SubmitField('Stáhnout šablonu')
def generic_batch_points(round: db.Round, contest: Optional[db.Contest], task: db.Task):
"""Společná funkce pro download/upload bodů do soutěží a kol."""
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/batch-points', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/task/<int:task_id>/batch-points', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/h/<int:hier_id>/task/<int:task_id>/batch-points', methods=('GET', 'POST'))
def org_generic_batch_points(task_id: int, round_id: Optional[int] = None, hier_id: Optional[int] = None, ct_id: Optional[int] = None):
ctx = get_context(round_id=round_id, hier_id=hier_id, ct_id=ct_id, task_id=task_id)
round, hier_place, contest, task = ctx.round, ctx.hier_place, ctx.contest, ctx.task
if not ctx.rights.can_edit_points():
raise werkzeug.exceptions.Forbidden()
form = BatchPointsForm()
errs = []
warnings = []
if form.validate_on_submit():
fmt = form.fmt.data
imp = create_import(user=g.user, type=ImportType.points, fmt=fmt, round=round, contest=contest, task=task, allow_add_del=form.add_del_sols.data)
imp = create_import(user=g.user, type=ImportType.points, fmt=fmt, round=round, only_region=hier_place, contest=contest, task=task, allow_add_del=form.add_del_sols.data)
if form.submit.data:
if form.file.data is not None:
file = form.file.data.stream
@@ -1334,10 +1337,7 @@ def generic_batch_points(round: db.Round, contest: Optional[db.Contest], task: d
flash('Soubor neobsahoval žádné řádky s daty', 'danger')
else:
flash(f'Importováno ({imp.cnt_rows} řádků, {imp.cnt_set_points} řešení přebodováno, {imp.cnt_add_sols} založeno a {imp.cnt_del_sols} smazáno)', 'success')
if contest is not None:
return redirect(url_for('org_contest', id=contest.contest_id))
else:
return redirect(url_for('org_round', id=round.round_id))
return redirect(ctx.url_home())
else:
errs = imp.errors
warnings = imp.warnings
@@ -1352,6 +1352,7 @@ def generic_batch_points(round: db.Round, contest: Optional[db.Contest], task: d
return render_template(
'org_generic_batch_points.html',
ctx=ctx,
round=round, contest=contest, task=task,
form=form,
errs=errs,
@@ -1359,37 +1360,27 @@ def generic_batch_points(round: db.Round, contest: Optional[db.Contest], task: d
)
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/batch-points', methods=('GET', 'POST'))
def org_contest_task_batch_points(contest_id: int, task_id: int):
sc = get_solution_context(contest_id, None, task_id, None)
assert sc.task is not None
if not sc.allow_edit_points:
raise werkzeug.exceptions.Forbidden()
return generic_batch_points(round=sc.round, contest=sc.contest, task=sc.task)
@app.route('/org/contest/c/<int:contest_id>/user/<int:user_id>')
def org_contest_user(contest_id: int, user_id: int):
sc = get_solution_context(contest_id, user_id, None, None)
@app.route('/org/contest/c/<int:ct_id>/user/<int:user_id>')
def org_contest_user(ct_id: int, user_id: int):
ctx = get_context(ct_id=ct_id, user_id=user_id, right_needed=Right.view_contestants)
assert ctx.contest
sess = db.get_session()
pant = sess.query(db.Participant).filter_by(
user_id=user_id, year=sc.round.year
user_id=user_id, year=ctx.round.year
).options(joinedload(db.Participant.school_place)).one_or_none()
if not pant:
raise werkzeug.exceptions.NotFound()
task_sols = sess.query(db.Task, db.Solution).select_from(db.Task).outerjoin(
db.Solution, and_(db.Solution.task_id == db.Task.task_id, db.Solution.user == sc.user)
).filter(db.Task.round == sc.round).options(
db.Solution, and_(db.Solution.task_id == db.Task.task_id, db.Solution.user == ctx.user)
).filter(db.Task.round == ctx.round).options(
joinedload(db.Solution.final_submit_obj),
joinedload(db.Solution.final_feedback_obj)
).order_by(db.Task.code).all()
# Count papers for each task and solution
tasks_subq = sess.query(db.Task.task_id).filter_by(round=sc.round).subquery()
tasks_subq = sess.query(db.Task.task_id).filter_by(round=ctx.round).subquery()
paper_counts = {}
for task_id, type, count in (
db.get_session().query(db.Paper.for_task, db.Paper.type, func.count(db.Paper.type))
@@ -1403,8 +1394,9 @@ def org_contest_user(contest_id: int, user_id: int):
return render_template(
'org_contest_user.html',
sc=sc, pant=pant, task_sols=task_sols,
paper_link=lambda u, p: mo.web.util.org_paper_link(sc.contest, None, u, p),
ctx=ctx, rights=ctx.rights,
pant=pant, task_sols=task_sols,
paper_link=lambda u, p: mo.web.util.org_paper_link(ctx.contest, None, u, p),
paper_counts=paper_counts,
)
@@ -1420,16 +1412,18 @@ class AdvanceForm(FlaskForm):
execute = wtforms.SubmitField('Provést')
@app.route('/org/contest/c/<int:contest_id>/advance', methods=('GET', 'POST'))
def org_contest_advance(contest_id: int):
@app.route('/org/contest/c/<int:ct_id>/advance', methods=('GET', 'POST'))
def org_contest_advance(ct_id: int):
sess = db.get_session()
conn = sess.connection()
contest, master_contest, rr = get_contest_rr(contest_id, Right.manage_contest)
ctx = get_context(ct_id=ct_id, right_needed=Right.manage_contest)
contest, master_contest = ctx.contest, ctx.master_contest
round = ctx.round
assert contest and master_contest
def redirect_back():
return redirect(url_for('org_contest', id=contest_id))
return redirect(ctx.url_for('org_contest'))
round = contest.round
if round.state != db.RoundState.preparing:
flash('Aktuální kolo není ve stavu přípravy', 'danger')
return redirect_back()
@@ -1519,7 +1513,7 @@ def org_contest_advance(contest_id: int):
if inserted:
# Opravdu došlo ke vložení
really_inserted += 1
app.logger.info(f'Postup: Založena účast user=#{pp.user_id} contest=#{contest_id} place=#{contest.place_id}')
app.logger.info(f'Postup: Založena účast user=#{pp.user_id} contest=#{ct_id} place=#{contest.place_id}')
mo.util.log(
type=db.LogType.participant,
what=pp.user_id,
@@ -1544,6 +1538,7 @@ def org_contest_advance(contest_id: int):
return render_template(
'org_contest_advance.html',
ctx=ctx,
contest=contest,
round=contest.round,
prev_round=prev_round,
@@ -1561,11 +1556,13 @@ class ContestEditForm(FlaskForm):
submit = wtforms.SubmitField('Uložit')
@app.route('/org/contest/c/<int:id>/edit', methods=('GET', 'POST'))
def org_contest_edit(id: int):
@app.route('/org/contest/c/<int:ct_id>/edit', methods=('GET', 'POST'))
def org_contest_edit(ct_id: int):
sess = db.get_session()
contest, _, rr = get_contest_rr(id, Right.manage_contest)
round = contest.round
ctx = get_context(ct_id=ct_id, right_needed=Right.manage_contest)
contest = ctx.contest
round = ctx.round
assert contest and round
form = ContestEditForm(obj=contest)
if round.state != db.RoundState.delegate:
@@ -1580,31 +1577,31 @@ def org_contest_edit(id: int):
if 'state' in changes and round.state != db.RoundState.delegate:
flash("Nastavení kola neumožňuje měnit stav soutěže", "danger")
return redirect(url_for('org_contest', id=id))
return redirect(url_for('org_contest', ct_id=ct_id))
app.logger.info(f"Contest #{id} modified, changes: {changes}")
app.logger.info(f"Contest #{ct_id} modified, changes: {changes}")
mo.util.log(
type=db.LogType.contest,
what=id,
what=ct_id,
details={'action': 'edit', 'changes': changes},
)
sess.commit()
flash('Změny soutěže uloženy', 'success')
else:
flash(u'Žádné změny k uložení', 'info')
flash('Žádné změny k uložení', 'info')
return redirect(url_for('org_contest', id=id))
return redirect(ctx.url_for('org_contest'))
return render_template(
'org_contest_edit.html',
ctx=ctx,
round=round,
contest=contest,
form=form,
)
class ParticipantAddForm(FlaskForm):
email = mo_fields.Email(validators=[validators.Required()])
first_name = mo_fields.FirstName(validators=[validators.Optional()])
@@ -1620,17 +1617,19 @@ class ParticipantAddForm(FlaskForm):
self.participation_place.description = f'Pokud účastník soutěží někde jinde než {contest.place.name_locative()}, vyplňte <a href="{url_for("org_place", id=contest.place.place_id)}">kód místa</a>. Dozor na tomto místě pak může za účastníka odevzdávat řešení.'
@app.route('/org/contest/c/<int:id>/participants/new', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/participants/new', methods=('GET', 'POST'))
def org_contest_add_user(id: int, site_id: Optional[int] = None):
contest, master_contest, site, rr = get_contest_site_rr(id, site_id, right_needed=Right.manage_contest)
@app.route('/org/contest/c/<int:ct_id>/participants/new', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/participants/new', methods=('GET', 'POST'))
def org_contest_add_user(ct_id: int, site_id: Optional[int] = None):
ctx = get_context(ct_id=ct_id, site_id=site_id, right_needed=Right.manage_contest)
contest = ctx.master_contest
assert contest
form = ParticipantAddForm()
if site_id is not None:
if not form.is_submitted():
form.participation_place.process_data(site_id)
form.participation_place.render_kw = {"readonly": True}
form.set_descriptions(master_contest)
form.set_descriptions(contest)
if form.validate_on_submit():
try:
@@ -1652,10 +1651,11 @@ def org_contest_add_user(id: int, site_id: Optional[int] = None):
flash("Uživatel přihlášen do soutěže.", "info")
else:
flash("Žádná změna. Uživatel už byl přihlášen.", "info")
return redirect(url_for('org_contest_list', id=id, site_id=site_id))
return redirect(ctx.url_for('org_generic_list'))
return render_template(
'org_contest_add_user.html',
contest=master_contest, site=site,
ctx=ctx,
contest=contest, round=ctx.master_round, site=ctx.site,
form=form
)
Loading