Select Git revision
-
Jiří Kalvoda authoredJiří Kalvoda authored
org_contest.py 68.94 KiB
from dataclasses import dataclass
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import flask_wtf.file
import locale
from markupsafe import Markup
from sqlalchemy import func, and_, select
from sqlalchemy.orm import joinedload, aliased
from sqlalchemy.orm.query import Query
from sqlalchemy.dialects.postgresql import insert as pgsql_insert
from typing import Any, List, Tuple, Optional, Sequence, Dict
import urllib.parse
import werkzeug.exceptions
import wtforms
import mo
from mo.csv import FileFormat
import mo.config as config
import mo.db as db
from mo.imports import ImportType, create_import
import mo.jobs.submit
from mo.rights import Right, ContestRights
import mo.util
from mo.util_format import inflect_number, inflect_by_number
from mo.web import app
import mo.web.util
from mo.web.util import PagerForm
from mo.web.table import CellCheckbox, Table, Row, Column, cell_pion_link, cell_place_link, cell_email_link
import wtforms.validators as validators
from wtforms.fields.html5 import IntegerField
from wtforms.widgets.html5 import NumberInput
class ImportForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", render_kw={'autofocus': True})
typ = wtforms.SelectField(
"Typ dat",
choices=[(x.name, x.friendly_name()) for x in (ImportType.participants, ImportType.proctors, ImportType.judges)],
coerce=ImportType.coerce,
default=ImportType.participants,
)
fmt = wtforms.SelectField(
"Formát souboru",
choices=FileFormat.choices(), coerce=FileFormat.coerce,
default=FileFormat.cs_csv,
)
submit = wtforms.SubmitField('Importovat')
get_template = wtforms.SubmitField('Stáhnout šablonu')
class ParticipantsFilterForm(PagerForm):
school = wtforms.StringField("Škola")
participation_place = wtforms.StringField("Soutěžní místo", render_kw={'autofocus': True})
contest_place = wtforms.StringField("Soutěžní oblast", render_kw={'autofocus': True})
participation_state = wtforms.SelectField('Stav účasti', choices=[('*', '*')] + list(db.PartState.choices()), default='*')
# format = wtforms.RadioField(choices=[('', 'Zobrazit'), ('csv', 'Stáhnout vše v CSV'), ('tsv', 'Stáhnout vše v TSV')])
submit = wtforms.SubmitField("Zobrazit")
download_csv = wtforms.SubmitField("↓ CSV")
download_tsv = wtforms.SubmitField("↓ TSV")
# Výstupní hodnoty filtru, None při nepoužitém filtru, prázdná db hodnota při
# nepovedené filtraci (neexistující místo a podobně)
f_school: Optional[db.Place] = None
f_participation_place: Optional[db.Place] = None
f_contest_place: Optional[db.Place] = None
f_participation_state: Optional[db.PartState] = None
def validate(self):
if self.school.data:
self.f_school = db.get_place_by_code(self.school.data)
if not self.f_school:
flash(f"Zadaná škola '{self.school.data}' neexistuje", "danger")
self.f_school = db.School()
if self.participation_place.data:
self.f_participation_place = db.get_place_by_code(self.participation_place.data)
if not self.f_participation_place:
flash(f"Zadané soutěžní místo '{self.participation_place.data}' neexistuje", "danger")
self.f_participation_place = db.Place()
if self.contest_place.data:
self.f_contest_place = db.get_place_by_code(self.contest_place.data)
if not self.f_contest_place:
flash(f"Zadaná soutěžní oblast '{self.contest_place.data}' neexistuje", "danger")
self.f_contest_place = db.Place()
self.f_participation_state = None if self.participation_state.data == '*' else self.participation_state.data
class ParticipantsActionForm(FlaskForm):
action_on = wtforms.RadioField(
"Provést akci na", validators=[validators.DataRequired()],
choices=[('all', 'všech vyfiltrovaných účastnících'), ('checked', 'označených účastnících')]
# checkboxes are handled not through FlaskForm, see below
)
participation_state = wtforms.SelectField('Stav účasti', choices=db.PartState.choices(), coerce=db.PartState.coerce)
set_participation_state = wtforms.SubmitField("Nastavit stav účasti")
participation_place = wtforms.StringField(
'Soutěžní místo', description='Zadejte kód místa'
)
set_participation_place = wtforms.SubmitField("Nastavit soutěžní místo")
contest_place = wtforms.StringField(
'Soutěžní oblast',
description='Musí existovat soutěž v dané oblasti pro stejné kolo. Oblast zadejte pomocí kódu.'
)
set_contest = wtforms.SubmitField("Přesunout do jiné soutěžní oblasti")
remove_participation = wtforms.SubmitField("Smazat záznam o účasti")
def do_action(self, round: db.Round, query: Query) -> bool:
"""Do participation modification on partipations from given query
(possibly filtered by checkboxes)."""
if not self.validate_on_submit():
return False
sess = db.get_session()
# Check that operation is valid
if self.set_participation_state.data:
pass
elif self.set_participation_place.data:
participation_place = db.get_place_by_code(self.participation_place.data)
if not participation_place:
flash('Nenalezeno místo s daným kódem', 'danger')
return False
elif self.set_contest.data:
contest_place = db.get_place_by_code(self.contest_place.data)
if not contest_place:
flash("Nepovedlo se najít zadanou soutěžní oblast", 'danger')
return False
# Contest hledáme vždy v master kole, abychom náhodou nepřesunuli účastníky do soutěže v podkole
contest = sess.query(db.Contest).filter_by(round_id=round.master_round_id, place_id=contest_place.place_id).one_or_none()
if not contest:
flash(f"Nepovedlo se najít soutěž v kole {round.round_code_short()} v oblasti {contest_place.name}", 'danger')
return False
rr = g.gatekeeper.rights_for_contest(contest)
if not rr.have_right(Right.manage_contest):
flash(f"Nemáte právo ke správě soutěže v kole {round.round_code_short()} v oblasti {contest_place.name}, nelze do ní přesunout účastníky", 'danger')
return False
elif self.remove_participation.data:
pass
else:
flash('Neznámá operace', 'danger')
return False
try:
user_ids = list(map(int, request.form.getlist('checked')))
except ValueError:
flash('Data v checkboxech nelze převést na čísla, kontaktujte správce', 'danger')
return False
# Check all participations if we can edit them
ctants: List[Tuple[db.Participation, Any, Any]] = query.all()
for pion, _, _ in ctants:
u = pion.user
if self.action_on.data == 'checked' and u.user_id not in user_ids:
continue
rr = g.gatekeeper.rights_for_contest(pion.contest)
if not rr.have_right(Right.manage_contest):
flash(
f"Nemáte právo ke správě soutěže v kole {round.round_code_short()} v oblasti {pion.contest.place.name} "
+ f"(účastník {u.full_name()}). Žádná akce nebyla provedena.", 'danger'
)
return False
count = 0
unchanged = 0
for pion, _, _ in ctants:
u = pion.user
if self.action_on.data == 'checked' and u.user_id not in user_ids:
continue
if self.remove_participation.data:
sess.delete(pion)
app.logger.info(f"Participation of user {u.user_id} in contest {pion.contest} removed")
mo.util.log(
type=db.LogType.participant,
what=u.user_id,
details={'action': 'participation-removed', 'participation': db.row2dict(pion)},
)
count += 1
else:
if self.set_participation_state.data:
pion.state = self.participation_state.data
elif self.set_participation_place.data:
pion.place_id = participation_place.place_id
elif self.set_contest.data:
pion.contest_id = contest.contest_id
if sess.is_modified(pion):
changes = db.get_object_changes(pion)
app.logger.info(f"Participation of user {u.user_id} modified, changes: {changes}")
mo.util.log(
type=db.LogType.participant,
what=u.user_id,
details={'action': 'participation-changed', 'changes': changes},
)
sess.flush()
count += 1
else:
unchanged += 1
sess.commit()
if count + unchanged == 0:
flash('Žádní vybraní účastníci', 'warning')
elif count == 0:
flash('Žádné změny k uložení', 'info')
elif self.set_participation_state.data:
flash(
f'Nastaven stav {db.part_state_names[self.participation_state.data]} '
+ inflect_number(count, 'účastníkovi', 'účastníkům', 'účastníkům'),
'success'
)
elif self.set_participation_place.data:
flash(
f'Nastaveno soutěžní místo {participation_place.name} '
+ inflect_number(count, 'účastníkovi', 'účastníkům', 'účastníkům'),
'success'
)
elif self.set_contest.data:
flash(
inflect_number(count, 'účastník přesunut', 'účastníci přesunuti', 'účastníků přesunuto')
+ f' do soutěže v oblasti {contest_place.name}',
'success'
)
elif self.remove_participation.data:
flash(
inflect_number(count, 'účastník odstraněn', 'účastníci odstraněni', 'účastníků odstraněno')
+ ' z této soutěže',
'success'
)
return True
def get_contest(id: int) -> Tuple[db.Contest, db.Contest]:
"""Vrací contest a master_contest pro zadané contest_id.
Pro nedělená kola platí contest == master_contest.
Operace s účastníky by měly probíhat vždy přes master_contest."""
contest = (db.get_session().query(db.Contest)
.options(joinedload(db.Contest.place),
joinedload(db.Contest.round),
joinedload(db.Contest.master).joinedload(db.Contest.round))
.get(id))
if not contest:
raise werkzeug.exceptions.NotFound()
return contest, contest.master
def get_contest_rr(id: int, right_needed: Optional[Right] = None) -> Tuple[db.Contest, db.Contest, ContestRights]:
"""Vrací contest, master_contest a ContestRights objekt pro zadané contest_id.
Pro nedělená kola platí contest == master_contest.
Operace s účastníky by měly probíhat vždy přes master_contest."""
contest, master_contest = get_contest(id)
rr = g.gatekeeper.rights_for_contest(contest)
if not (right_needed is None or rr.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return contest, master_contest, rr
def get_contest_site_rr(id: int, site_id: Optional[int], right_needed: Optional[Right] = None) -> Tuple[db.Contest, db.Contest, Optional[db.Place], ContestRights]:
"""Vrací contest, master_contest, optional site a ContestRights objekt pro zadané contest_id a site_id.
Pro nedělená kola platí contest == master_contest.
Operace s účastníky by měly probíhat vždy přes master_contest."""
if site_id is None:
contest, master_contest, rr = get_contest_rr(id, right_needed)
return contest, master_contest, None, rr
contest, master_contest = get_contest(id)
site = db.get_session().query(db.Place).get(site_id)
if not site:
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_for_contest(contest, site)
if not (right_needed is None or rr.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return contest, master_contest, site, rr
def contest_breadcrumbs(
round: Optional[db.Round] = None, contest: Optional[db.Contest] = None,
site: Optional[db.Place] = None, task: Optional[db.Task] = None,
user: Optional[db.User] = None, action: Optional[str] = None
) -> Markup:
elements = [(url_for('org_rounds'), 'Soutěže')]
round_id = None
if round:
round_id = round.round_id
elements.append((url_for('org_round', id=round_id), round.round_code()))
ct_id = None
if contest:
ct_id = contest.contest_id
elements.append((url_for('org_contest', id=ct_id), contest.place.name))
site_id = None
if site:
site_id = site.place_id
elements.append((url_for('org_contest', id=ct_id, site_id=site_id), f"soutěžní místo {site.name}"))
if task:
task_id = task.task_id
elements.append((
url_for('org_contest_task', contest_id=ct_id, site_id=site_id, task_id=task_id) if ct_id
else url_for('org_round_task_edit', id=round_id, task_id=task_id),
f"{task.code} {task.name}"
))
if user:
user_id = user.user_id
elements.append((url_for('org_contest_user', contest_id=ct_id, user_id=user_id), user.full_name()))
if action:
elements.append(('', action))
return Markup(
"\n".join([f"<li><a href='{url}'>{name}</a>" for url, name in elements[:-1]])
+ "<li>" + elements[-1][1]
)
@app.route('/org/contest/c/<int:id>')
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/')
def org_contest(id: int, site_id: Optional[int] = None):
sess = db.get_session()
contest, master_contest, site, rr = get_contest_site_rr(id, site_id, None)
round = contest.round
tasks_subq = sess.query(db.Task.task_id).filter_by(round=contest.round)
pions_subq = sess.query(db.Participation.user_id).filter_by(contest=master_contest)
if site:
pions_subq = pions_subq.filter_by(place=site)
sol_counts_q = (
sess.query(db.Solution.task_id, func.count(db.Solution.task_id))
.filter(
db.Solution.task_id.in_(tasks_subq),
db.Solution.user_id.in_(pions_subq),
)
)
sol_counts = {}
for task_id, count in sol_counts_q.group_by(db.Solution.task_id).all():
sol_counts[task_id] = count
tasks = sess.query(db.Task).filter_by(round=round).all()
tasks.sort(key=lambda t: t.code)
for task in tasks:
task.sol_count = sol_counts[task.task_id] if task.task_id in sol_counts else 0
places_counts = None
if not site_id:
places_counts = (
sess.query(db.Place, func.count('*'))
.select_from(db.Participation).join(db.Place)
.group_by(db.Place)
.filter(db.Participation.contest == master_contest).all()
)
group_contests = contest.get_group_contests(True)
group_contests.sort(key=lambda c: c.round.round_code())
return render_template(
'org_contest.html',
contest=contest, group_contests=group_contests, site=site,
rights=sorted(rr.rights, key=lambda r: r.name),
roles=[r.friendly_name() for r in rr.get_roles()],
can_manage=rr.have_right(Right.manage_contest),
can_upload=rr.can_upload_feedback(),
can_edit_points=rr.can_edit_points(),
can_create_solutions=rr.can_upload_feedback() or rr.can_upload_solutions(),
can_view_statement=rr.can_view_statement(),
tasks=tasks, places_counts=places_counts,
edit_form=ContestEditForm(obj=contest),
)
def generic_import(round: db.Round, master_round: db.Round, contest: Optional[db.Contest], master_contest: Optional[db.Contest]):
"""Společná funkce pro importování do soutěží a kol"""
form = ImportForm()
errs = []
warnings = []
if form.validate_on_submit():
fmt = form.fmt.data
imp = create_import(user=g.user, type=form.typ.data, fmt=fmt, round=master_round, contest=master_contest)
if form.submit.data:
if form.file.data is not None:
file = form.file.data.stream
import_tmp = mo.util.link_to_dir(file.name, mo.util.data_dir('imports'), suffix='.csv')
if imp.run(import_tmp):
if imp.cnt_rows == 0:
flash('Soubor neobsahoval žádné řádky s daty', 'danger')
else:
flash(f'Importováno ({imp.cnt_rows} řádků, založeno {imp.cnt_new_users} uživatelů, {imp.cnt_new_participations} účastí, {imp.cnt_new_roles} rolí)', 'success')
if contest is not None:
return redirect(url_for('org_contest', id=contest.contest_id))
else:
return redirect(url_for('org_round', id=round.round_id))
else:
errs = imp.errors
warnings = imp.warnings
else:
flash('Vyberte si prosím soubor', 'danger')
elif form.get_template.data:
out = imp.get_template()
resp = app.make_response(out)
resp.content_type = fmt.get_content_type()
resp.headers.add('Content-Disposition', 'attachment; filename=OSMO-' + imp.template_basename + '.' + fmt.get_extension())
return resp
return render_template(
'org_generic_import.html',
contest=contest,
round=round,
form=form,
errs=errs,
warnings=warnings
)
@app.route('/doc/import')
def doc_import():
return render_template('doc_import.html')
@app.route('/org/contest/c/<int:id>/import', methods=('GET', 'POST'))
def org_contest_import(id: int):
contest, master_contest, rr = get_contest_rr(id, Right.manage_contest)
return generic_import(
round=contest.round, master_round=master_contest.round,
contest=contest, master_contest=master_contest
)
@app.route('/org/contest/c/<int:id>/ucastnici', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/ucastnici', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:id>/ucastnici/emails', endpoint="org_contest_list_emails")
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/ucastnici/emails', endpoint="org_contest_list_emails")
def org_contest_list(id: int, site_id: Optional[int] = None):
contest, master_contest, site, rr = get_contest_site_rr(id, site_id, Right.view_contestants)
can_edit = rr.have_right(Right.manage_contest) and request.endpoint != 'org_contest_list_emails'
format = request.args.get('format', "")
filter = ParticipantsFilterForm(request.args)
filter.validate()
query = get_contestants_query(
round=master_contest.round, contest=master_contest, site=site,
school=filter.f_school,
# contest_place=filter.f_contest_place,
participation_place=filter.f_participation_place,
participation_state=filter.f_participation_state,
)
action_form = None
if can_edit:
action_form = ParticipantsActionForm()
if action_form.do_action(round=contest.round, query=query):
# Action happened, redirect
return redirect(request.url)
if format == "":
table = None
emails = None
mailto_link = None
if request.endpoint == 'org_contest_list_emails':
(emails, mailto_link) = get_contestant_emails(query)
count = len(emails)
else:
# (count, query) = filter.apply_limits(query, pagesize=50)
count = db.get_count(query)
table = make_contestant_table(query, add_checkbox=can_edit)
return render_template(
'org_contest_list.html',
contest=contest, site=site,
table=table, emails=emails, mailto_link=mailto_link,
filter=filter, count=count, action_form=action_form,
)
else:
table = make_contestant_table(query)
return table.send_as(format)
contest_list_columns = (
Column(key='first_name', name='krestni', title='Křestní jméno'),
Column(key='last_name', name='prijmeni', title='Příjmení'),
Column(key='email', name='email', title='E-mail'),
Column(key='school', name='skola', title='Škola'),
Column(key='school_code', name='kod_skoly', title='Kód školy'),
Column(key='grade', name='rocnik', title='Ročník'),
Column(key='born_year', name='rok_naroz', title='Rok naroz.'),
Column(key='place_code', name='kod_soutez_mista', title='Sout. místo'),
Column(key='status', name='stav', title='Stav'),
)
def get_contestants_query(
round: db.Round, contest: Optional[db.Contest] = None,
site: Optional[db.Place] = None,
contest_place: Optional[db.Place] = None,
participation_place: Optional[db.Place] = None,
participation_state: Optional[db.PartState] = None,
school: Optional[db.Place] = None) -> Query:
query = (db.get_session()
.query(db.Participation, db.Participant, db.Contest)
.select_from(db.Participation)
.join(db.Participant, db.Participant.user_id == db.Participation.user_id)
.join(db.User, db.User.user_id == db.Participation.user_id)
.filter(db.Participant.year == round.year))
if contest:
query = query.join(db.Contest, db.Contest.contest_id == contest.contest_id)
else:
query = query.filter(db.Contest.round == round)
query = query.options(joinedload(db.Contest.place))
query = query.filter(db.Participation.contest_id == db.Contest.contest_id)
if site:
query = query.filter(db.Participation.place_id == site.place_id)
if contest_place:
query = query.filter(db.Contest.place_id == contest_place.place_id)
if participation_place:
query = query.filter(db.Participation.place_id == participation_place.place_id)
if school:
query = query.filter(db.Participant.school == school.place_id)
if participation_state:
query = query.filter(db.Participation.state == participation_state)
query = query.options(
joinedload(db.Participation.user),
joinedload(db.Participation.place),
joinedload(db.Participant.school_place)
).order_by(db.User.last_name, db.User.first_name)
return query
def make_contestant_table(query: Query, add_checkbox: bool = False, add_contest_column: bool = False):
ctants = query.all()
rows: List[Row] = []
for pion, pant, ct in ctants:
u = pion.user
html_attr = {
'class': 'state-' + pion.state.name
}
if u.is_test:
html_attr['class'] += ' testuser'
html_attr['title'] = 'Testovací uživatel'
rows.append(Row(
keys={
'sort_key': u.sort_key(),
'first_name': cell_pion_link(u, pion.contest_id, u.first_name),
'last_name': cell_pion_link(u, pion.contest_id, u.last_name),
'email': cell_email_link(u),
'school': pant.school_place.name,
'school_code': cell_place_link(pant.school_place, pant.school_place.get_code()),
'grade': pant.grade,
'born_year': pant.birth_year,
'region_code': cell_place_link(ct.place, ct.place.get_code()),
'place_code': cell_place_link(pion.place, pion.place.get_code()),
'status': pion.state.friendly_name(),
'checkbox': CellCheckbox('checked', u.user_id, False),
},
html_attr=html_attr,
))
rows.sort(key=lambda r: r.keys['sort_key'])
cols: Sequence[Column] = contest_list_columns
if add_checkbox:
cols = [Column(key='checkbox', name=' ', title=' ')] + list(cols)
if add_contest_column:
cols = list(cols) + [Column(key='region_code', name='kod_oblasti', title='Oblast')]
return Table(
columns=cols,
rows=rows,
filename='ucastnici',
show_downlink=False, # downlinks are in filter
)
def get_contestant_emails(query: Query, mailto_subject: str = '[OSMO] Zpráva pro účastníky') -> Tuple[List[str], str]:
users = [pion.user for (pion, _, _) in query.all()]
emails = [f'{u.first_name} {u.last_name} <{u.email}>' for u in users]
mailto_link = (
'mailto:' + urllib.parse.quote(config.MAIL_CONTACT, safe='@')
+ '?subject=' + urllib.parse.quote(mailto_subject)
+ '&bcc=' + ','.join([urllib.parse.quote(email, safe='@') for email in emails])
)
return (emails, mailto_link)
@dataclass
class SolutionContext:
contest: db.Contest
master_contest: db.Contest
round: db.Round
master_round: db.Round
pion: Optional[db.Participation]
user: Optional[db.User]
task: Optional[db.Task]
site: Optional[db.Place]
allow_view: bool
allow_upload_solutions: bool
allow_upload_feedback: bool
allow_create_solutions: bool
allow_edit_points: bool
def get_solution_context(contest_id: int, user_id: Optional[int], task_id: Optional[int], site_id: Optional[int]) -> SolutionContext:
sess = db.get_session()
# Nejprve zjistíme, zda existuje soutěž
contest, master_contest = get_contest(contest_id)
round = contest.round
master_round = master_contest.round
# Najdeme úlohu a ověříme, že je součástí soutěže
if task_id is not None:
task = sess.query(db.Task).get(task_id)
if not task or task.round != round:
raise werkzeug.exceptions.NotFound()
else:
task = None
site = None
user = None
if user_id is not None:
# Zkontrolujeme, zda se účastník opravdu účastní soutěže
pion = (sess.query(db.Participation)
.filter_by(user_id=user_id, contest_id=master_contest.contest_id)
.options(joinedload(db.Participation.place),
joinedload(db.Participation.user))
.one_or_none())
if not pion:
raise werkzeug.exceptions.NotFound()
user = pion.user
# A zda soutěží na zadaném soutěžním místě, je-li určeno
if site_id is not None and site_id != pion.place_id:
raise werkzeug.exceptions.NotFound()
# Pokud je uvedeno soutěžní místo, hledáme práva k němu, jinak k soutěži
if site_id is not None:
site = pion.place
else:
pion = None
if site_id is not None:
site = sess.query(db.Place).get(site_id)
if not site:
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_for_contest(contest, site)
allow_view = rr.have_right(Right.view_submits)
if not allow_view:
raise werkzeug.exceptions.Forbidden()
allow_upload_solutions = rr.can_upload_solutions()
allow_upload_feedback = rr.can_upload_feedback()
return SolutionContext(
contest=contest, master_contest=master_contest,
round=round, master_round=master_round,
pion=pion,
user=user,
task=task,
site=site,
# XXX: Potřebujeme tohle všechno? Nechceme spíš vracet rr a nechat každého, ať na něm volá metody?
allow_view=allow_view,
allow_upload_solutions=allow_upload_solutions,
allow_upload_feedback=allow_upload_feedback,
allow_create_solutions=allow_upload_solutions or allow_upload_feedback,
allow_edit_points=rr.can_edit_points(),
)
class SubmitForm(FlaskForm):
note = wtforms.TextAreaField("Poznámka pro účastníka", description="Viditelná účastníkovi po uzavření kola", render_kw={'autofocus': True})
org_note = wtforms.TextAreaField("Interní poznámka", description="Viditelná jen organizátorům")
# Validátory k points budou přidány podle počtu maximálních bodů úlohy v org_submit_list
points = IntegerField('Body', description="Účastník po uzavření kola uvidí jen naposledy zadané body")
submit = wtforms.SubmitField('Uložit')
file = flask_wtf.file.FileField("Soubor")
file_note = wtforms.TextAreaField("Poznámka k souboru")
submit_sol = wtforms.SubmitField('Uložit a nahrát soubor jako řešení')
submit_fb = wtforms.SubmitField('Uložit a nahrát soubor jako opravu')
delete = wtforms.SubmitField('Smazat řešení')
class SetFinalForm(FlaskForm):
type = wtforms.StringField()
paper_id = wtforms.IntegerField()
submit_final = wtforms.SubmitField("Prohlásit za finální")
@app.route('/org/contest/c/<int:contest_id>/submit/<int:user_id>/<int:task_id>/', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/submit/<int:user_id>/<int:task_id>/', methods=('GET', 'POST'))
def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Optional[int] = None):
sc = get_solution_context(contest_id, user_id, task_id, site_id)
assert sc.user is not None
sess = db.get_session()
self_url = url_for('org_submit_list', contest_id=contest_id, user_id=user_id, task_id=task_id, site_id=site_id)
# Najdeme řešení úlohy (nemusí existovat)
sol = (sess.query(db.Solution)
.filter_by(user_id=user_id, task_id=task_id)
.one_or_none())
set_final_form: Optional[SetFinalForm] = None
if sol and sc.allow_upload_feedback:
set_final_form = SetFinalForm()
if set_final_form.validate_on_submit() and set_final_form.submit_final.data:
is_submit = set_final_form.type.data == "submit"
is_feedback = set_final_form.type.data == "feedback"
paper = sess.query(db.Paper).get(set_final_form.paper_id.data)
if not paper:
flash('Chyba: Papír s takovým ID neexistuje', 'danger')
elif paper.for_user != user_id or paper.for_task != task_id:
flash('Chyba: Papír nepatří k dané úloze a uživateli', 'danger')
elif (is_submit and sol.final_submit_obj == paper) or (is_feedback and sol.final_feedback_obj == paper):
flash('Žádná změna', 'warning')
elif is_submit:
app.logger.info(f"Finální submit úlohy {task_id} pro uživatele {user_id} změněn na {paper.paper_id}")
mo.util.log(
type=db.LogType.participant,
what=user_id,
details={
'action': 'task-final-submit-changed',
'task': task_id,
'old_paper': sol.final_submit,
'new_paper': paper.paper_id
},
)
sol.final_submit = paper.paper_id
sess.commit()
flash('Finální řešení změněno', 'success')
elif is_feedback:
app.logger.info(f"Finální feedback úlohy {task_id} pro uživatele {user_id} změněn na {paper.paper_id}")
mo.util.log(
type=db.LogType.participant,
what=user_id,
details={
'action': 'task-final-feedback-changed',
'task': task_id,
'old_paper': sol.final_feedback,
'new_paper': paper.paper_id
},
)
sol.final_feedback = paper.paper_id
sess.commit()
flash('Finální oprava změněna', 'success')
else:
flash('Chyba: Neplatná akce', 'danger')
return redirect(self_url)
form = SubmitForm(obj=sol)
form.points.validators = [
validators.Optional(),
validators.NumberRange(min=0, max=sc.task.max_points, message="Počet bodů musí být mezi %(min)s a %(max)s")
]
form.points.widget = NumberInput(min=0, max=sc.task.max_points) # min a max v HTML
if form.validate_on_submit():
if sol and form.delete.data:
if sol.final_submit or sol.final_feedback:
flash('Nelze smazat řešení, ke kterému již byl odevzdán soubor', 'danger')
else:
flash('Řešení smazáno', 'success')
sess.delete(sol)
mo.util.log(
type=db.LogType.participant,
what=sc.user.user_id,
details={
'action': 'solution-removed',
'task': task_id,
},
)
sess.commit()
app.logger.info(f"Řešení úlohy {sc.task.code} od účastníka {sc.user.user_id} smazáno")
return redirect(self_url)
points = form.points.data
# Checks
if (form.submit_sol.data or form.submit_fb.data) and form.file.data is None:
flash('Schází soubor k nahrání, žádné změny nebyly uloženy', 'danger')
return redirect(self_url)
if not sol and (sc.allow_edit_points or sc.allow_upload_solutions or sc.allow_upload_feedback):
flash('Řešení založeno', 'success')
sol = db.Solution(task=sc.task, user=sc.user)
sess.add(sol)
mo.util.log(
type=db.LogType.participant,
what=sc.user.user_id,
details={
'action': 'solution-created',
'task': task_id,
},
)
sess.commit()
app.logger.info(f"Řešení úlohy {sc.task.code} od účastníka {sc.user.user_id} založeno")
# Edit sol and points
if sol and sc.allow_edit_points:
# Sol edit
sol.note = form.note.data
sol.org_note = form.org_note.data
if sess.is_modified(sol):
flash('Změny hodnocení uloženy', 'success')
# Points
if points != sol.points:
sol.points = points
sess.add(db.PointsHistory(
task=sc.task,
participant=sol.user,
user=g.user,
points_at=mo.now,
points=points,
))
flash('Body uloženy', 'success')
# Save changes and commit
if sess.is_modified(sol):
changes = db.get_object_changes(sol)
mo.util.log(
type=db.LogType.participant,
what=sc.user.user_id,
details={
'action': 'solution-edit',
'task': task_id,
'changes': changes
},
)
sess.commit()
app.logger.info(f"Řešení úlohy {sc.task.code} od účastníka {sc.user.user_id} modifikováno, změny: {changes}")
if (form.submit_sol.data and sc.allow_upload_solutions) or (form.submit_fb.data and sc.allow_upload_feedback):
file = form.file.data.stream
if sc.allow_upload_solutions and form.submit_sol.data:
type = db.PaperType.solution
elif sc.allow_upload_feedback and form.submit_fb.data:
type = db.PaperType.feedback
else:
raise werkzeug.exceptions.Forbidden()
assert sc.task is not None and sc.user is not None
paper = db.Paper(task=sc.task, for_user_obj=sc.user, uploaded_by_obj=g.user, type=type, note=form.file_note.data)
submitter = mo.submit.Submitter()
self_url = url_for('org_submit_list', contest_id=contest_id, user_id=user_id, task_id=task_id, site_id=site_id)
try:
submitter.submit_paper(paper, file.name)
except mo.submit.SubmitException as e:
flash(f'Chyba: {e}', 'danger')
return redirect(self_url)
sess.add(paper)
if type == db.PaperType.solution:
sol.final_submit_obj = paper
else:
sol.final_feedback_obj = paper
sess.commit()
if type == db.PaperType.solution:
prefix = 'Řešení'
else:
prefix = 'Opravené řešení'
if paper.is_broken():
flash(prefix + ' není korektní PDF, ale přesto jsme ho přijali a pokusíme se ho zpracovat. ' +
'Zkontrolujte prosím, že se na vašem počítači zobrazuje správně.',
'warning')
else:
flash(prefix + ' odevzdáno', 'success')
return redirect(self_url)
papers = (sess.query(db.Paper)
.filter_by(for_user_obj=sc.user, task=sc.task)
.options(joinedload(db.Paper.uploaded_by_obj))
.order_by(db.Paper.uploaded_at.desc())
.all())
sol_papers = [p for p in papers if p.type == db.PaperType.solution]
fb_papers = [p for p in papers if p.type == db.PaperType.feedback]
points_history = (sess.query(db.PointsHistory)
.filter_by(task=sc.task, participant=sc.user)
.options(joinedload(db.PointsHistory.user))
.order_by(db.PointsHistory.points_at.desc())
.all())
return render_template(
'org_submit_list.html',
sc=sc,
solution=sol,
sol_papers=sol_papers,
fb_papers=fb_papers,
points_history=points_history,
for_site=(site_id is not None),
paper_link=lambda p: mo.web.util.org_paper_link(sc.contest, sc.site, sc.user, p),
orig_paper_link=lambda p: mo.web.util.org_paper_link(sc.contest, sc.site, sc.user, p, orig=True),
form=form,
set_final_form=set_final_form,
)
class SubmitEditForm(FlaskForm):
note = wtforms.TextAreaField("Poznámka pro účastníka", description="Viditelná účastníkovi po uzavření kola", render_kw={"rows": 8, 'autofocus': True})
org_note = wtforms.TextAreaField("Interní poznámka", description="Viditelná jen organizátorům", render_kw={"rows": 8})
submit = wtforms.SubmitField("Uložit")
@app.route('/org/contest/c/<int:contest_id>/paper/<int:paper_id>/<filename>', endpoint='org_submit_paper')
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/paper/<int:paper_id>/<filename>', endpoint='org_submit_paper')
@app.route('/org/contest/c/<int:contest_id>/paper/orig/<int:paper_id>/<filename>', endpoint='org_submit_paper_orig')
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/paper/orig/<int:paper_id>/<filename>', endpoint='org_submit_paper_orig')
def org_submit_paper(contest_id: int, paper_id: int, filename: str, site_id: Optional[int] = None):
paper = (db.get_session().query(db.Paper)
.options(joinedload(db.Paper.task)) # pro task_paper_filename()
.get(paper_id))
if not paper:
raise werkzeug.exceptions.NotFound()
if not filename.endswith('.pdf'):
raise werkzeug.exceptions.NotFound()
get_solution_context(contest_id, paper.for_user, paper.for_task, site_id)
return mo.web.util.send_task_paper(paper, (request.endpoint == 'org_submit_paper_orig'))
def get_solutions_query(
task: db.Task,
for_contest: Optional[db.Contest] = None,
for_site: Optional[db.Place] = None) -> Query:
sess = db.get_session()
query = (sess.query(db.Participation, db.Solution)
.select_from(db.Participation)
.outerjoin(db.Solution, and_(db.Solution.task_id == task.task_id, db.Solution.user_id == db.Participation.user_id))
.options(joinedload(db.Solution.user),
joinedload(db.Solution.final_submit_obj),
joinedload(db.Solution.final_feedback_obj)))
if for_contest:
query = query.filter(db.Participation.contest == for_contest)
if for_site:
query = query.filter(db.Participation.place == for_site)
return query
class TaskPointsForm(FlaskForm):
submit = wtforms.SubmitField("Uložit body")
class TaskCreateForm(FlaskForm):
submit = wtforms.SubmitField("Založit označená řešení")
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/')
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/task/<int:task_id>/')
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/points', methods=('GET', 'POST'), endpoint="org_contest_task_points")
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/create', methods=('GET', 'POST'), endpoint="org_contest_task_create")
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/task/<int:task_id>/create', methods=('GET', 'POST'), endpoint="org_contest_task_create")
def org_contest_task(contest_id: int, task_id: int, site_id: Optional[int] = None):
sc = get_solution_context(contest_id, None, task_id, site_id)
action_create = request.endpoint == "org_contest_task_create"
action_points = request.endpoint == "org_contest_task_points"
if action_create and not sc.allow_create_solutions:
raise werkzeug.exceptions.Forbidden()
if action_points and not sc.allow_edit_points:
raise werkzeug.exceptions.Forbidden()
sess = db.get_session()
q = get_solutions_query(sc.task, for_contest=sc.master_contest, for_site=sc.site)
rows: List[Tuple[db.Participation, db.Solution]] = q.all()
rows.sort(key=lambda r: r[0].user.sort_key())
points_form: Optional[TaskPointsForm] = None
create_form: Optional[TaskCreateForm] = None
if action_create:
create_form = TaskCreateForm()
if create_form.validate_on_submit():
new_sol_count = 0
for pion, sol in rows:
if sol:
continue # již existuje
if not request.form.get(f"create_sol_{pion.user_id}"):
continue # nikdo nežádá o vytvoření
sol = db.Solution(task=sc.task, user=pion.user)
sess.add(sol)
mo.util.log(
type=db.LogType.participant,
what=pion.user_id,
details={
'action': 'solution-created',
'task': task_id,
},
)
app.logger.info(f"Řešení úlohy {sc.task.code} od účastníka {pion.user_id} založeno")
new_sol_count += 1
if new_sol_count > 0:
sess.commit()
flash(inflect_by_number(new_sol_count, "Založeno", "Založena", "Založeno") + ' '
+ inflect_number(new_sol_count, "nové řešení", "nová řešení", "nových řešení"),
"success")
else:
flash("Žádné změny k uložení", "info")
return redirect(url_for('org_contest_task', contest_id=contest_id, task_id=task_id, site_id=site_id))
if action_points:
points_form = TaskPointsForm()
if points_form.validate_on_submit():
count = 0
ok = True
for _, sol in rows:
if sol is None:
continue
points = request.form.get(f"points_{sol.user_id}", type=int)
if points and points < 0:
flash('Nelze zadat záporné body', 'danger')
ok = False
break
elif points and sc.task.max_points is not None and points > sc.task.max_points:
flash(f'Maximální počet bodů za úlohu je {sc.task.max_points}, nelze zadat více', 'danger')
ok = False
break
if points != sol.points:
# Save points
sol.points = points
sess.add(db.PointsHistory(
task=sc.task,
participant=sol.user,
user=g.user,
points_at=mo.now,
points=points,
))
count += 1
if ok:
if count > 0:
sess.commit()
flash("Změněny body u " + inflect_number(count, "řešení", "řešení", "řešení"), "success")
else:
flash("Žádné změny k uložení", "info")
return redirect(url_for('org_contest_task', contest_id=contest_id, task_id=task_id))
# Count papers for each solution
paper_counts = {}
for user_id, type, count in (
db.get_session().query(db.Paper.for_user, db.Paper.type, func.count(db.Paper.type))
.filter_by(task=sc.task)
.group_by(db.Paper.for_user, db.Paper.type)
.all()
):
paper_counts[(user_id, type.name)] = count
return render_template(
"org_contest_task.html",
sc=sc, rows=rows, paper_counts=paper_counts,
paper_link=lambda u, p: mo.web.util.org_paper_link(sc.contest, sc.site, u, p),
points_form=points_form, create_form=create_form, request_form=request.form,
)
class ContestSolutionsEditForm(FlaskForm):
submit = wtforms.SubmitField("Založit označená řešení")
@app.route('/org/contest/c/<int:id>/solutions', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/solutions', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:id>/solutions/edit', methods=('GET', 'POST'), endpoint="org_contest_solutions_edit")
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/solutions/edit', methods=('GET', 'POST'), endpoint="org_contest_solutions_edit")
def org_contest_solutions(id: int, site_id: Optional[int] = None):
sc = get_solution_context(id, None, None, site_id)
sess = db.get_session()
edit_action = request.endpoint == "org_contest_solutions_edit"
if edit_action and not sc.allow_create_solutions:
raise werkzeug.exceptions.Forbidden()
pions_subq = sess.query(db.Participation.user_id).filter_by(contest=sc.master_contest)
if sc.site:
pions_subq = pions_subq.filter_by(place=sc.site)
pions_subq = pions_subq.subquery()
pions = (sess.query(db.Participation)
.filter(
db.Participation.contest == sc.master_contest,
db.Participation.user_id.in_(pions_subq),
).options(joinedload(db.Participation.user))
.all())
pions.sort(key=lambda p: p.user.sort_key())
tasks_subq = sess.query(db.Task.task_id).filter_by(round=sc.round).subquery()
tasks = (sess.query(db.Task)
.filter_by(round=sc.round)
.order_by(db.Task.code)
.all())
sols = sess.query(db.Solution).filter(
db.Solution.user_id.in_(pions_subq),
db.Solution.task_id.in_(tasks_subq)
).options(
joinedload(db.Solution.final_submit_obj),
joinedload(db.Solution.final_feedback_obj)
).all()
# Count papers for each task and solution
paper_counts = {}
for user_id, task_id, type, count in (
db.get_session().query(db.Paper.for_user, db.Paper.for_task, db.Paper.type, func.count(db.Paper.type))
.filter(db.Paper.for_task.in_(tasks_subq))
.group_by(db.Paper.for_user, db.Paper.for_task, db.Paper.type)
.all()
):
paper_counts[(user_id, task_id, type.name)] = count
task_sols: Dict[int, Dict[int, db.Solution]] = {}
for t in tasks:
task_sols[t.task_id] = {}
for s in sols:
task_sols[s.task_id][s.user_id] = s
edit_form: Optional[ContestSolutionsEditForm] = None
if edit_action:
edit_form = ContestSolutionsEditForm()
if edit_form.validate_on_submit():
new_sol_count = 0
for task in tasks:
for pion in pions:
if pion.user_id in task_sols[task.task_id]:
continue # již existuje
if not request.form.get(f"create_sol_{task.task_id}_{pion.user_id}"):
continue # nikdo nežádá o vytvoření
sol = db.Solution(task=task, user=pion.user)
sess.add(sol)
mo.util.log(
type=db.LogType.participant,
what=pion.user_id,
details={
'action': 'solution-created',
'task': task.task_id,
},
)
app.logger.info(f"Řešení úlohy {task.code} od účastníka {pion.user_id} založeno")
new_sol_count += 1
if new_sol_count > 0:
sess.commit()
flash(inflect_by_number(new_sol_count, "Založeno", "Založena", "Založeno") + ' '
+ inflect_number(new_sol_count, "nové řešení", "nová řešení", "nových řešení"),
"success")
else:
flash("Žádné změny k uložení", "info")
return redirect(url_for('org_contest_solutions', id=id, site_id=site_id))
return render_template(
'org_contest_solutions.html',
contest=sc.contest, site=sc.site, sc=sc,
pions=pions, tasks=tasks, tasks_sols=task_sols, paper_counts=paper_counts,
paper_link=lambda u, p: mo.web.util.org_paper_link(sc.contest, sc.site, u, p),
edit_form=edit_form,
)
class DownloadSubmitsForm(FlaskForm):
min_points = wtforms.IntegerField(
'Minimální počet bodů v kole', render_kw={'autofocus': True},
description='Je-li uveden, řešení účastníků, kteří ve výsledcích celého kola dostali méně bodů, se nestahují.',
validators=[validators.Optional()]
)
download_sol = wtforms.SubmitField('Stáhnout všechna účastnická řešení')
download_fb = wtforms.SubmitField('Stáhnout všechna opravená řešení')
download_sol_mix = wtforms.SubmitField('Stáhnout účastnická/opravená')
download_fb_mix = wtforms.SubmitField('Stáhnout opravená/účastnická')
def download_submits(form: DownloadSubmitsForm, round: db.Round, sol_query, pion_query, subj_suffix: str, want_subdirs: bool) -> bool:
if not form.validate_on_submit():
return False
sols = sol_query.all()
if form.min_points.data is not None:
# Každému účastníkovi z vybrané množiny posčítáme body za všechny úlohy kola
pts = (db.get_session().query(db.Solution.user_id, func.sum(db.Solution.points))
.select_from(db.Solution)
.join(db.Task, and_(db.Task.task_id == db.Solution.task_id, db.Task.round == round))
.filter(db.Solution.user_id.in_(pion_query.subquery()))
.group_by(db.Solution.user_id)
.all())
pts_dict = {uid: pt or 0 for uid, pt in pts}
sols = [s for s in sols if pts_dict[s.user_id] >= form.min_points.data]
if form.download_sol.data:
paper_ids = [s.final_submit for s in sols]
subj_prefix = 'Odevzdaná'
elif form.download_fb.data:
paper_ids = [s.final_feedback for s in sols]
subj_prefix = 'Opravená'
elif form.download_sol_mix:
paper_ids = [s.final_submit or s.final_feedback for s in sols]
subj_prefix = 'Odevzdaná/opravená'
elif form.download_fb_mix:
paper_ids = [s.final_feedback or s.final_submit for s in sols]
subj_prefix = 'Opravená/odevzdaná'
else:
return False
paper_ids = [p for p in paper_ids if p is not None]
mo.jobs.submit.schedule_download_submits(paper_ids, f'{subj_prefix} {subj_suffix}', g.user, want_subdirs)
flash('Příprava řešení ke stažení zahájena.', 'success')
return True
def generic_batch_download(round: db.Round, contest: Optional[db.Contest], site: Optional[db.Place], task: db.Task):
"""Společná funkce pro download submitů/feedbacku do soutěží a kol."""
sess = db.get_session()
pion_query = sess.query(db.Participation.user_id).select_from(db.Participation)
if contest is not None:
pion_query = pion_query.filter_by(contest_id=contest.master_contest_id)
if site is not None:
pion_query = pion_query.filter_by(place=site)
else:
pion_query = pion_query.join(db.Contest).filter(db.Contest.round_id == round.master_round_id)
sol_query = (sess.query(db.Solution)
.select_from(db.Solution)
.filter(db.Solution.task == task))
if contest is not None:
sol_query = sol_query.filter(db.Solution.user_id.in_(pion_query.subquery()))
form = DownloadSubmitsForm()
if request.method == 'POST':
subj = f'řešení {task.code}'
if site is not None:
subj = f'{subj} ({site.name})'
elif contest is not None:
subj = f'{subj} ({contest.place.name})'
if download_submits(form, round, sol_query, pion_query, subj, contest is None):
return redirect(url_for('org_jobs'))
sol_paper = aliased(db.Paper)
fb_paper = aliased(db.Paper)
sol_query = (sol_query.with_entities(func.count(db.Solution.user_id),
func.count(sol_paper.paper_id),
func.count(fb_paper.paper_id),
func.sum(sol_paper.bytes),
func.sum(fb_paper.bytes))
.outerjoin(sol_paper, sol_paper.paper_id == db.Solution.final_submit)
.outerjoin(fb_paper, fb_paper.paper_id == db.Solution.final_feedback))
submit_count, sol_count, fb_count, sol_size, fb_size = sol_query.one()
pion_count = pion_query.with_entities(func.count(db.Participation.user_id)).scalar()
return render_template(
'org_generic_batch_download.html',
round=round, contest=contest, site=site, task=task,
submit_count=submit_count,
pion_count=pion_count,
sol_count=sol_count, fb_count=fb_count,
sol_size=sol_size, fb_size=fb_size,
form=form,
)
class UploadSubmitsForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()], render_kw={'autofocus': True})
submit = wtforms.SubmitField('Odeslat')
def generic_batch_upload(round: db.Round, contest: Optional[db.Contest], site: Optional[db.Place], task: db.Task,
offer_upload_solutions: bool, offer_upload_feedback: bool):
"""Společná funkce pro upload feedbacku do soutěží a kol."""
# Základní kontrola, zda vůbec chceme akci spustit.
# Zatím neumíme dávkově nahrávat řešení.
if not offer_upload_feedback:
raise werkzeug.exceptions.Forbidden()
request.custom_max_content_length = mo.config.MAX_BATCH_CONTENT_LENGTH
form = UploadSubmitsForm()
if form.validate_on_submit():
file = form.file.data.stream
mo.jobs.submit.schedule_upload_feedback(round, file.name, f'Nahrání opravených řešení {round.round_code()}',
for_user=g.user,
only_contest=contest, only_site=site, only_task=task)
return redirect(url_for('org_jobs'))
return render_template(
'org_generic_batch_upload.html',
round=round, contest=contest, site=site, task=task,
max_size=mo.config.MAX_BATCH_CONTENT_LENGTH,
form=form,
)
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
def org_contest_task_download(contest_id: int, task_id: int, site_id: Optional[int] = None):
sc = get_solution_context(contest_id, None, task_id, site_id)
assert sc.task is not None
return generic_batch_download(round=sc.round, contest=sc.contest, site=sc.site, task=sc.task)
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
def org_contest_task_upload(contest_id: int, task_id: int, site_id: Optional[int] = None):
sc = get_solution_context(contest_id, None, task_id, site_id)
assert sc.task is not None
return generic_batch_upload(round=sc.round, contest=sc.contest, site=sc.site, task=sc.task,
offer_upload_solutions=sc.allow_upload_solutions,
offer_upload_feedback=sc.allow_upload_feedback)
class BatchPointsForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", render_kw={'autofocus': True})
fmt = wtforms.SelectField(
"Formát souboru",
choices=FileFormat.choices(), coerce=FileFormat.coerce,
default=FileFormat.cs_csv,
)
add_del_sols = wtforms.BooleanField('Zakládat / mazat řešení', description='Xyzzy')
submit = wtforms.SubmitField('Nahrát body')
get_template = wtforms.SubmitField('Stáhnout šablonu')
def generic_batch_points(round: db.Round, contest: Optional[db.Contest], task: db.Task):
"""Společná funkce pro download/upload bodů do soutěží a kol."""
form = BatchPointsForm()
errs = []
warnings = []
if form.validate_on_submit():
fmt = form.fmt.data
imp = create_import(user=g.user, type=ImportType.points, fmt=fmt, round=round, contest=contest, task=task, allow_add_del=form.add_del_sols.data)
if form.submit.data:
if form.file.data is not None:
file = form.file.data.stream
import_tmp = mo.util.link_to_dir(file.name, mo.util.data_dir('imports'), suffix='.csv')
if imp.run(import_tmp):
if imp.cnt_rows == 0:
flash('Soubor neobsahoval žádné řádky s daty', 'danger')
else:
flash(f'Importováno ({imp.cnt_rows} řádků, {imp.cnt_set_points} řešení přebodováno, {imp.cnt_add_sols} založeno a {imp.cnt_del_sols} smazáno)', 'success')
if contest is not None:
return redirect(url_for('org_contest', id=contest.contest_id))
else:
return redirect(url_for('org_round', id=round.round_id))
else:
errs = imp.errors
warnings = imp.warnings
else:
flash('Vyberte si prosím soubor', 'danger')
elif form.get_template.data:
out = imp.get_template()
resp = app.make_response(out)
resp.content_type = fmt.get_content_type()
resp.headers.add('Content-Disposition', 'attachment; filename=OSMO-' + imp.template_basename + '.' + fmt.get_extension())
return resp
return render_template(
'org_generic_batch_points.html',
round=round, contest=contest, task=task,
form=form,
errs=errs,
warnings=warnings
)
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/batch-points', methods=('GET', 'POST'))
def org_contest_task_batch_points(contest_id: int, task_id: int):
sc = get_solution_context(contest_id, None, task_id, None)
assert sc.task is not None
if not sc.allow_edit_points:
raise werkzeug.exceptions.Forbidden()
return generic_batch_points(round=sc.round, contest=sc.contest, task=sc.task)
@app.route('/org/contest/c/<int:contest_id>/user/<int:user_id>')
def org_contest_user(contest_id: int, user_id: int):
sc = get_solution_context(contest_id, user_id, None, None)
sess = db.get_session()
pant = sess.query(db.Participant).filter_by(
user_id=user_id, year=sc.round.year
).options(joinedload(db.Participant.school_place)).one_or_none()
if not pant:
raise werkzeug.exceptions.NotFound()
task_sols = sess.query(db.Task, db.Solution).select_from(db.Task).outerjoin(
db.Solution, and_(db.Solution.task_id == db.Task.task_id, db.Solution.user == sc.user)
).filter(db.Task.round == sc.round).options(
joinedload(db.Solution.final_submit_obj),
joinedload(db.Solution.final_feedback_obj)
).order_by(db.Task.code).all()
# Count papers for each task and solution
tasks_subq = sess.query(db.Task.task_id).filter_by(round=sc.round).subquery()
paper_counts = {}
for task_id, type, count in (
db.get_session().query(db.Paper.for_task, db.Paper.type, func.count(db.Paper.type))
.filter(
db.Paper.for_user == user_id,
db.Paper.for_task.in_(tasks_subq)
).group_by(db.Paper.for_task, db.Paper.type)
.all()
):
paper_counts[(task_id, type.name)] = count
return render_template(
'org_contest_user.html',
sc=sc, pant=pant, task_sols=task_sols,
paper_link=lambda u, p: mo.web.util.org_paper_link(sc.contest, None, u, p),
paper_counts=paper_counts,
)
class AdvanceForm(FlaskForm):
boundary = IntegerField(
'Bodová hranice', render_kw={'autofocus': True},
description="Postoupí všichni účastníci, kteří v minulém kole získali aspoň tolik bodů.",
validators=[validators.InputRequired()]
)
status = wtforms.HiddenField()
preview = wtforms.SubmitField('Zobrazit návrh')
execute = wtforms.SubmitField('Provést')
@app.route('/org/contest/c/<int:contest_id>/advance', methods=('GET', 'POST'))
def org_contest_advance(contest_id: int):
sess = db.get_session()
conn = sess.connection()
contest, master_contest, rr = get_contest_rr(contest_id, Right.manage_contest)
def redirect_back():
return redirect(url_for('org_contest', id=contest_id))
round = contest.round
if round.state != db.RoundState.preparing:
flash('Aktuální kolo není ve stavu přípravy', 'danger')
return redirect_back()
prev_round = sess.query(db.Round).filter_by(
year=round.year, category=round.category, seq=round.seq - 1
).filter(db.Round.master_round_id == db.Round.round_id).one_or_none()
if prev_round is None:
flash('Předchozí kolo nenalezeno', 'danger')
return redirect_back()
elif prev_round.state != db.RoundState.closed:
# FIXME: Možná kontrolovat stav uzavření všech kol ve skupině kol?
flash('Předchozí kolo dosud nebylo ukončeno', 'danger')
return redirect_back()
elif prev_round.level < round.level:
flash('Předchozí kolo se koná ve vyšší oblasti než toto kolo', 'danger')
return redirect_back()
prev_contests: List[db.Contest] = []
accept_by_place_id: Dict[int, int] = {}
reject_by_place_id: Dict[int, int] = {}
form = AdvanceForm()
if form.validate_on_submit():
desc_cte = db.place_descendant_cte(contest.place, max_level=prev_round.level)
prev_contests = (sess.query(db.Contest)
.filter(db.Contest.round == prev_round)
.filter(db.Contest.place_id.in_(select([desc_cte])))
.options(joinedload(db.Contest.place))
.all())
prev_contests.sort(key=lambda c: locale.strxfrm(c.place.name or ""))
accept_by_place_id = {c.place_id: 0 for c in prev_contests}
reject_by_place_id = {c.place_id: 0 for c in prev_contests}
prev_pion_query = (sess.query(db.Participation)
.filter(db.Participation.contest_id.in_([c.contest_id for c in prev_contests]))
.filter(db.Participation.state.in_((db.PartState.registered, db.PartState.invited, db.PartState.present))))
prev_pions = prev_pion_query.all()
if form.boundary.data > 0:
round_subquery = sess.query(db.Round.round_id).filter_by(master_round_id=prev_round.round_id).subquery()
accept_uids = (sess.query(db.Solution.user_id)
.select_from(db.Solution)
# Vybíráme úlohy, jejich round patří do stejné skupiny kol jako prev_round
.join(db.Task, and_(db.Task.task_id == db.Solution.task_id, db.Task.round_id.in_(round_subquery)))
.filter(db.Solution.user_id.in_(prev_pion_query.with_entities(db.Participation.user_id).subquery()))
.group_by(db.Solution.user_id)
.having(func.sum(db.Solution.points) >= form.boundary.data)
.all())
accept_uids = [a[0] for a in accept_uids]
else:
accept_uids = None
want_execute = form.execute.data
if want_execute:
app.logger.info(f'Postup: Z kola #{prev_round.round_id} do #{round.master_round_id}, soutěž #{master_contest.contest_id}')
mo.util.log(
type=db.LogType.contest,
what=master_contest.contest_id,
details={'action': 'advance'},
)
really_inserted = 0
for pp in prev_pions:
# This incurs no real queries as we have all the contests cached
prev_place_id = sess.query(db.Contest).get(pp.contest_id).place_id
if accept_uids and pp.user_id not in accept_uids:
reject_by_place_id[prev_place_id] += 1
continue
accept_by_place_id[prev_place_id] += 1
if want_execute:
# ORM neumí ON CONFLICT DO NOTHING, takže musíme o vrstvu níže
res = conn.execute(
pgsql_insert(db.Participation.__table__)
.values(
user_id=pp.user_id,
contest_id=contest.contest_id,
place_id=contest.place.place_id,
state=db.PartState.invited,
)
.on_conflict_do_nothing()
.returning(db.Participation.contest_id)
)
inserted = res.fetchall()
if inserted:
# Opravdu došlo ke vložení
really_inserted += 1
app.logger.info(f'Postup: Založena účast user=#{pp.user_id} contest=#{contest_id} place=#{contest.place_id}')
mo.util.log(
type=db.LogType.participant,
what=pp.user_id,
details={
'action': 'add-to-contest',
# Tady nemůžeme použít obvyklé row2dict, neboť nemáme v ruce ORMový objekt
'new': {
'contest_id': contest.contest_id,
'place_id': contest.place_id,
},
},
)
if want_execute:
sess.commit()
msg = (inflect_by_number(really_inserted, 'Pozván', 'Pozváni', 'Pozváno')
+ ' '
+ inflect_number(really_inserted, 'nový soutěžící', 'noví soutěžící', 'nových soutěžících')
+ '.')
flash(msg, 'success')
return redirect_back()
return render_template(
'org_contest_advance.html',
contest=contest,
round=contest.round,
prev_round=prev_round,
prev_contests=prev_contests,
accept_by_place_id=accept_by_place_id,
reject_by_place_id=reject_by_place_id,
form=form,
)
class ContestEditForm(FlaskForm):
state = wtforms.SelectField("Stav soutěže",
choices=[ch for ch in db.RoundState.choices() if ch[0] != 'delegate'],
coerce=db.RoundState.coerce)
submit = wtforms.SubmitField('Uložit')
@app.route('/org/contest/c/<int:id>/edit', methods=('GET', 'POST'))
def org_contest_edit(id: int):
sess = db.get_session()
contest, _, rr = get_contest_rr(id, Right.manage_contest)
round = contest.round
form = ContestEditForm(obj=contest)
if round.state != db.RoundState.delegate:
form.state.render_kw = {'disabled': ""}
form.state.description = 'Nastavení kola neumožňuje měnit stav soutěže.'
if form.validate_on_submit():
form.populate_obj(contest)
if sess.is_modified(contest):
changes = db.get_object_changes(contest)
if 'state' in changes and round.state != db.RoundState.delegate:
flash("Nastavení kola neumožňuje měnit stav soutěže", "danger")
return redirect(url_for('org_contest', id=id))
app.logger.info(f"Contest #{id} modified, changes: {changes}")
mo.util.log(
type=db.LogType.contest,
what=id,
details={'action': 'edit', 'changes': changes},
)
sess.commit()
flash('Změny soutěže uloženy', 'success')
else:
flash(u'Žádné změny k uložení', 'info')
return redirect(url_for('org_contest', id=id))
return render_template(
'org_contest_edit.html',
round=round,
contest=contest,
form=form,
)