Select Git revision
update-users.py
-
Martin Mareš authoredMartin Mareš authored
org_contest.py 70.90 KiB
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import flask_wtf.file
import locale
from markupsafe import Markup
from sqlalchemy import func, and_, select
from sqlalchemy.orm import joinedload, aliased
from sqlalchemy.orm.query import Query
from sqlalchemy.dialects.postgresql import insert as pgsql_insert
import sqlalchemy.sql.schema
from typing import Any, List, Tuple, Optional, Dict
import urllib.parse
import werkzeug.exceptions
import wtforms
import wtforms.validators as validators
from wtforms.widgets.html5 import NumberInput
import mo
from mo.csv import FileFormat
import mo.config as config
import mo.db as db
from mo.imports import ImportType, create_import
import mo.jobs.submit
from mo.rights import Right, RoundRights
import mo.util
from mo.util_format import inflect_number, inflect_by_number
from mo.web import app
import mo.web.fields as mo_fields
import mo.web.util
from mo.web.util import PagerForm
from mo.web.table import CellCheckbox, Table, Row, Column, cell_pion_link, cell_place_link, cell_email_link_flags
class Context:
# Kolo máme vždy
round: db.Round
master_round: db.Round
# Vypočtená práva: Může být implementováno i jako ContestRights
rights: RoundRights
# Můžeme mít vybranou soutěž a místo
# Pro nedělená kola platí contest == master_contest.
# Operace s účastníky by měly probíhat vždy přes master_contest.
contest: Optional[db.Contest] = None
master_contest: Optional[db.Contest] = None
site: Optional[db.Place] = None
# Můžeme se omezit na soutěže v dané oblasti
hier_place: Optional[db.Place] = None
# Účastník a úloha
pion: Optional[db.Participation] = None
user: Optional[db.User] = None
task: Optional[db.Task] = None
# IDčka, jak je předáváme do URL
round_id: Optional[int] = None
ct_id: Optional[int] = None
site_id: Optional[int] = None
hier_id: Optional[int] = None
user_id: Optional[int] = None
task_id: Optional[int] = None
def url_for(self, endpoint: str, **kwargs):
a = {}
round_id = kwargs.get('round_id', self.round_id)
ct_id = kwargs.get('ct_id', self.ct_id)
if ct_id is not None:
a['ct_id'] = ct_id
else:
assert round_id is not None
a['round_id'] = round_id
for arg in ('site_id', 'hier_id', 'user_id', 'task_id'):
val = getattr(self, arg)
if val is not None:
a[arg] = val
for arg, val in kwargs.items():
a[arg] = val
return url_for(endpoint, **a)
def url_home(self):
if self.ct_id:
return url_for('org_contest', ct_id=self.ct_id)
else:
return url_for('org_round', round_id=self.round_id, hier_id=self.hier_id)
def breadcrumbs(self, table: bool = False, action: Optional[str] = None) -> Markup:
elements = [(url_for('org_rounds'), 'Soutěže')]
elements.append((url_for('org_round', round_id=self.round_id), self.round.round_code()))
if self.hier_place:
parents = g.gatekeeper.get_parents(self.hier_place)
parents = sorted(parents, key=lambda p: p.level)
for p in parents[1:]:
elements.append((url_for('org_round', round_id=self.round_id, hier_id=p.place_id), p.name or '???'))
if self.contest:
if self.round.level >= 2:
parents = g.gatekeeper.get_parents(self.contest.place)
parents = sorted(parents, key=lambda p: p.level)
for i in range(1, len(parents) - 1):
p = parents[i]
if p.level >= 3:
break
elements.append((url_for('org_round', round_id=self.round_id, hier_id=p.place_id), db.Place.get_code(p)))
elements.append((url_for('org_contest', ct_id=self.ct_id), self.contest.place.name or '???'))
if self.site:
elements.append((url_for('org_contest', ct_id=self.ct_id, site_id=self.site_id), f"soutěžní místo {self.site.name}"))
if self.task:
elements.append((
url_for('org_contest_task', ct_id=self.ct_id, site_id=self.site_id, task_id=self.task_id) if self.contest
else url_for('org_round_task_edit', round_id=self.round_id, task_id=self.task_id),
f"{self.task.code} {self.task.name}"
))
if self.user:
elements.append((url_for('org_contest_user', ct_id=self.ct_id, user_id=self.user_id), self.user.full_name()))
if table:
if self.contest:
elements.append((url_for('org_generic_list', ct_id=self.ct_id, site=self.site_id), "Seznam účastníků"))
else:
elements.append((url_for('org_generic_list', round_id=self.round_id), "Seznam účastníků"))
if action:
elements.append(('', action))
return Markup(
"\n".join([f"<li><a href='{url}'>{name}</a>" for url, name in elements[:-1]])
+ "<li>" + elements[-1][1]
)
def get_context(round_id: Optional[int] = None,
ct_id: Optional[int] = None,
site_id: Optional[int] = None,
hier_id: Optional[int] = None,
user_id: Optional[int] = None,
task_id: Optional[int] = None,
right_needed: Optional[Right] = None,
) -> Context:
ctx = Context()
ctx.round_id = round_id
ctx.ct_id = ct_id
ctx.site_id = site_id
ctx.hier_id = hier_id
ctx.user_id = user_id
ctx.task_id = task_id
sess = db.get_session()
if site_id is not None:
assert ct_id is not None
ctx.site = db.get_session().query(db.Place).get(site_id)
if not ctx.site:
raise werkzeug.exceptions.NotFound()
if hier_id is not None:
assert ct_id is None
ctx.hier_place = db.get_session().query(db.Place).get(hier_id)
if not ctx.hier_place:
raise werkzeug.exceptions.NotFound()
if ct_id is not None:
ctx.contest = (db.get_session().query(db.Contest)
.options(joinedload(db.Contest.place),
joinedload(db.Contest.round),
joinedload(db.Contest.master).joinedload(db.Contest.round))
.get(ct_id))
if not ctx.contest:
raise werkzeug.exceptions.NotFound()
ctx.master_contest = ctx.contest.master
ctx.round, ctx.master_round = ctx.contest.round, ctx.master_contest.round
if round_id is not None and ctx.round.round_id != round_id:
raise werkzeug.exceptions.NotFound()
ctx.round_id = ctx.round.round_id
ctx.rights = g.gatekeeper.rights_for_contest(ctx.contest, ctx.site)
else:
ctx.round = sess.query(db.Round).options(joinedload(db.Round.master)).get(round_id)
if not ctx.round:
raise werkzeug.exceptions.NotFound()
if hier_id is not None and ctx.hier_place.level > ctx.round.level:
raise werkzeug.exceptions.NotFound()
ctx.master_round = ctx.round.master
ctx.rights = g.gatekeeper.rights_for_round(ctx.round, for_place=ctx.hier_place)
# Zkontrolujeme, zda se účastník opravdu účastní soutěže
if user_id is not None:
assert ctx.master_contest is not None
ctx.pion = (sess.query(db.Participation)
.filter_by(user_id=user_id, contest_id=ctx.master_contest.contest_id)
.options(joinedload(db.Participation.place),
joinedload(db.Participation.user))
.one_or_none())
if not ctx.pion:
raise werkzeug.exceptions.NotFound()
ctx.user = ctx.pion.user
# A zda soutěží na zadaném soutěžním místě, je-li určeno
if site_id is not None and site_id != ctx.pion.place_id:
raise werkzeug.exceptions.NotFound()
# Najdeme úlohu a ověříme, že je součástí kola
if task_id is not None:
ctx.task = sess.query(db.Task).get(task_id)
if not ctx.task or ctx.task.round != ctx.round:
raise werkzeug.exceptions.NotFound()
if not (right_needed is None or ctx.rights.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return ctx
class ParticipantsFilterForm(PagerForm):
school = mo_fields.School()
participation_place = mo_fields.Place("Soutěžní místo", render_kw={'autofocus': True})
contest_place = mo_fields.Place("Soutěžní oblast", render_kw={'autofocus': True})
participation_state = wtforms.SelectField('Stav účasti', choices=[('*', '*')] + list(db.PartState.choices()), default='*')
# format = wtforms.RadioField(choices=[('', 'Zobrazit'), ('csv', 'Stáhnout vše v CSV'), ('tsv', 'Stáhnout vše v TSV')])
submit = wtforms.SubmitField("Zobrazit")
download_csv = wtforms.SubmitField("↓ CSV")
download_tsv = wtforms.SubmitField("↓ TSV")
class ParticipantsActionForm(FlaskForm):
action_on = wtforms.RadioField(
"Provést akci na", validators=[validators.DataRequired()],
choices=[('all', 'všech vyfiltrovaných účastnících'), ('checked', 'zaškrtnutých účastnících')],
default='checked',
# checkboxes are handled not through FlaskForm, see below
)
participation_state = wtforms.SelectField('Stav účasti', choices=db.PartState.choices(), coerce=db.PartState.coerce)
set_participation_state = wtforms.SubmitField("Nastavit stav účasti")
participation_place = mo_fields.Place(
'Soutěžní místo', description='Zadejte kód místa'
)
set_participation_place = wtforms.SubmitField("Nastavit soutěžní místo")
contest_place = mo_fields.Place(
'Soutěžní oblast',
description='Musí existovat soutěž v dané oblasti pro stejné kolo. Oblast zadejte pomocí kódu.'
)
set_contest = wtforms.SubmitField("Přesunout do jiné soutěžní oblasti")
remove_participation = wtforms.SubmitField("Smazat záznam o účasti")
def do_action(self, round: db.Round, query: Query) -> bool:
"""Do participation modification on partipations from given query
(possibly filtered by checkboxes)."""
if not self.validate_on_submit():
return False
sess = db.get_session()
# Check that operation is valid
if self.set_participation_state.data:
pass
elif self.set_participation_place.data:
participation_place = self.participation_place.place
elif self.set_contest.data:
contest_place = self.contest_place.place
# Contest hledáme vždy v master kole, abychom náhodou nepřesunuli účastníky do soutěže v podkole
contest = sess.query(db.Contest).filter_by(round_id=round.master_round_id, place_id=contest_place.place_id).one_or_none()
if not contest:
flash(f"Nepovedlo se najít soutěž v kole {round.round_code_short()} {contest_place.name_locative()}", 'danger')
return False
rr = g.gatekeeper.rights_for_contest(contest)
if not rr.have_right(Right.manage_contest):
flash(f"Nemáte právo ke správě soutěže v kole {round.round_code_short()} {contest_place.name_locative()}, nelze do ní přesunout účastníky", 'danger')
return False
elif self.remove_participation.data:
pass
else:
flash('Neznámá operace', 'danger')
return False
try:
user_ids = list(map(int, request.form.getlist('checked')))
except ValueError:
flash('Data v checkboxech nelze převést na čísla, kontaktujte správce', 'danger')
return False
ctants: List[Tuple[db.Participation, Any, Any]] = query.all()
count = 0
unchanged = 0
for pion, _, _ in ctants:
u = pion.user
if self.action_on.data == 'checked' and u.user_id not in user_ids:
continue
if self.remove_participation.data:
sess.delete(pion)
app.logger.info(f"Participation of user {u.user_id} in contest {pion.contest} removed")
mo.util.log(
type=db.LogType.participant,
what=u.user_id,
details={'action': 'participation-removed', 'participation': db.row2dict(pion)},
)
count += 1
else:
if self.set_participation_state.data:
pion.state = self.participation_state.data
elif self.set_participation_place.data:
assert participation_place
pion.place_id = participation_place.place_id
elif self.set_contest.data:
pion.contest_id = contest.contest_id
if sess.is_modified(pion):
changes = db.get_object_changes(pion)
app.logger.info(f"Participation of user {u.user_id} modified, changes: {changes}")
mo.util.log(
type=db.LogType.participant,
what=u.user_id,
details={'action': 'participation-changed', 'changes': changes},
)
sess.flush()
count += 1
else:
unchanged += 1
sess.commit()
if count + unchanged == 0:
flash('Žádní vybraní účastníci', 'warning')
elif count == 0:
flash('Žádné změny k uložení', 'info')
elif self.set_participation_state.data:
flash(
f'Nastaven stav {db.part_state_names[self.participation_state.data]} '
+ inflect_number(count, 'účastníkovi', 'účastníkům', 'účastníkům'),
'success'
)
elif self.set_participation_place.data:
assert participation_place
flash(
f'Nastaveno soutěžní místo {participation_place.name} '
+ inflect_number(count, 'účastníkovi', 'účastníkům', 'účastníkům'),
'success'
)
elif self.set_contest.data:
assert contest_place
flash(
inflect_number(count, 'účastník přesunut', 'účastníci přesunuti', 'účastníků přesunuto')
+ f' do soutěže {contest_place.name_locative()}',
'success'
)
elif self.remove_participation.data:
flash(
inflect_number(count, 'účastník odstraněn', 'účastníci odstraněni', 'účastníků odstraněno')
+ ' z této soutěže',
'success'
)
return True
@app.route('/org/contest/c/<int:ct_id>/')
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/')
def org_contest(ct_id: int, site_id: Optional[int] = None):
sess = db.get_session()
ctx = get_context(ct_id=ct_id, site_id=site_id)
contest = ctx.contest
assert contest
rights = ctx.rights
tasks_subq = sess.query(db.Task.task_id).filter_by(round=ctx.round)
pions_subq = sess.query(db.Participation.user_id).filter_by(contest=ctx.master_contest)
if ctx.site:
pions_subq = pions_subq.filter_by(place=ctx.site)
sol_counts_q = (
sess.query(db.Solution.task_id, func.count(db.Solution.task_id))
.filter(
db.Solution.task_id.in_(tasks_subq),
db.Solution.user_id.in_(pions_subq),
)
)
sol_counts = {}
for task_id, count in sol_counts_q.group_by(db.Solution.task_id).all():
sol_counts[task_id] = count
tasks = sess.query(db.Task).filter_by(round=ctx.round).all()
tasks.sort(key=lambda t: t.code)
for task in tasks:
task.sol_count = sol_counts[task.task_id] if task.task_id in sol_counts else 0
places_counts = None
if not site_id:
places_counts = (
sess.query(db.Place, func.count('*'))
.select_from(db.Participation).join(db.Place)
.group_by(db.Place)
.filter(db.Participation.contest == ctx.master_contest).all()
)
group_contests = contest.get_group_contests(True)
group_contests.sort(key=lambda c: c.round.round_code())
return render_template(
'org_contest.html',
ctx=ctx, rights=rights,
round=ctx.round, contest=contest, site=ctx.site,
group_contests=group_contests,
rights_list=sorted(rights.rights, key=lambda r: r.name),
roles=[r.friendly_name() for r in rights.get_roles()],
tasks=tasks, places_counts=places_counts,
)
@app.route('/doc/import')
def doc_import():
return render_template('doc_import.html')
class ImportForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", render_kw={'autofocus': True})
typ = wtforms.SelectField(
"Typ dat",
choices=[(x.name, x.friendly_name()) for x in (ImportType.participants, ImportType.proctors, ImportType.judges)],
coerce=ImportType.coerce,
default=ImportType.participants,
)
fmt = wtforms.SelectField(
"Formát souboru",
choices=FileFormat.choices(), coerce=FileFormat.coerce,
default=FileFormat.cs_csv,
)
submit = wtforms.SubmitField('Importovat')
get_template = wtforms.SubmitField('Stáhnout šablonu')
@app.route('/org/contest/c/<int:ct_id>/import', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/import', methods=('GET', 'POST'))
def org_generic_import(round_id: Optional[int] = None, ct_id: Optional[int] = None):
ctx = get_context(round_id=round_id, ct_id=ct_id, right_needed=Right.manage_contest)
round, contest = ctx.master_round, ctx.master_contest
form = ImportForm()
errs = []
warnings = []
if form.validate_on_submit():
fmt = form.fmt.data
imp = create_import(user=g.user, type=form.typ.data, fmt=fmt, round=round, contest=contest)
if form.submit.data:
if form.file.data is not None:
file = form.file.data.stream
import_tmp = mo.util.link_to_dir(file.name, mo.util.data_dir('imports'), suffix='.csv')
if imp.run(import_tmp):
if imp.cnt_rows == 0:
flash('Soubor neobsahoval žádné řádky s daty', 'danger')
else:
flash(f'Importováno ({imp.cnt_rows} řádků, založeno {imp.cnt_new_users} uživatelů, {imp.cnt_new_participations} účastí, {imp.cnt_new_roles} rolí)', 'success')
return redirect(ctx.url_home())
else:
errs = imp.errors
warnings = imp.warnings
else:
flash('Vyberte si prosím soubor', 'danger')
elif form.get_template.data:
out = imp.get_template()
resp = app.make_response(out)
resp.content_type = fmt.get_content_type()
resp.headers.add('Content-Disposition', 'attachment; filename=OSMO-' + imp.template_basename + '.' + fmt.get_extension())
return resp
return render_template(
'org_generic_import.html',
ctx=ctx,
contest=contest,
round=round,
form=form,
errs=errs,
warnings=warnings
)
# URL je explicitně uvedeno v mo.email.contestant_list_url
@app.route('/org/contest/c/<int:ct_id>/participants', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/participants', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/participants/emails', endpoint="org_generic_list_emails")
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/participants/emails', endpoint="org_generic_list_emails")
@app.route('/org/contest/r/<int:round_id>/participants', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/participants/emails', endpoint="org_generic_list_emails")
@app.route('/org/contest/r/<int:round_id>/h/<int:hier_id>/participants', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/h/<int:hier_id>/participants/emails', endpoint="org_generic_list_emails")
def org_generic_list(round_id: Optional[int] = None, hier_id: Optional[int] = None,
ct_id: Optional[int] = None, site_id: Optional[int] = None):
ctx = get_context(round_id=round_id, hier_id=hier_id, ct_id=ct_id, site_id=site_id, right_needed=Right.view_contestants)
round, contest = ctx.master_round, ctx.master_contest
rr = ctx.rights
can_edit = rr.have_right(Right.manage_contest) and request.endpoint != 'org_generic_list_emails'
format = request.args.get('format', "")
filter = ParticipantsFilterForm(formdata=request.args)
if request.args:
filter.validate()
query = get_contestants_query(
ctx=ctx,
school=filter.school.place,
contest_place=filter.contest_place.place,
participation_place=filter.participation_place.place,
participation_state=mo.util.star_is_none(filter.participation_state.data),
)
action_form = None
if can_edit:
action_form = ParticipantsActionForm()
if action_form.do_action(round=round, query=query):
# Action happened, redirect
return redirect(request.url)
if format == "":
table = None
emails = None
mailto_link = None
if request.endpoint == 'org_generic_list_emails':
if contest:
subj = f'{contest.round.name} {contest.round.category} {contest.place.name_locative()}'
else:
subj = f'{round.name} kategorie {round.category}'
(emails, mailto_link) = get_contestant_emails(query, mailto_subject=subj)
count = len(emails)
else:
(count, query) = filter.apply_limits(query, pagesize=50)
table = make_contestant_table(query, round, add_contest_column=(contest is None), add_checkbox=can_edit)
return render_template(
'org_generic_list.html',
ctx=ctx,
contest=contest, round=round, site=ctx.site,
table=table, emails=emails, mailto_link=mailto_link,
filter=filter, count=count, action_form=action_form,
)
else:
table = make_contestant_table(query, round, is_export=True)
return table.send_as(format)
contest_list_columns = (
Column(key='first_name', name='krestni', title='Křestní jméno'),
Column(key='last_name', name='prijmeni', title='Příjmení'),
Column(key='email', name='email', title='E-mail'),
Column(key='school', name='skola', title='Škola'),
Column(key='school_code', name='kod_skoly', title='Kód školy'),
Column(key='grade', name='rocnik', title='Ročník'),
Column(key='born_year', name='rok_naroz', title='Rok naroz.'),
Column(key='place_code', name='kod_soutez_mista', title='Sout. místo'),
Column(key='status', name='stav', title='Stav'),
)
def get_contestants_query(
ctx: Context,
contest_place: Optional[db.Place] = None,
participation_place: Optional[db.Place] = None,
participation_state: Optional[db.PartState] = None,
school: Optional[db.Place] = None) -> Query:
query = (db.get_session()
.query(db.Participation, db.Participant, db.Contest)
.select_from(db.Participation)
.join(db.Participant, db.Participant.user_id == db.Participation.user_id)
.join(db.User, db.User.user_id == db.Participation.user_id)
.join(db.Contest)
.filter(db.Participant.year == ctx.round.year)
.filter(db.Participation.contest_id == db.Contest.contest_id))
if ctx.contest:
query = query.filter(db.Contest.contest_id == ctx.contest.contest_id)
if ctx.site:
query = query.filter(db.Participation.place_id == ctx.site.place_id)
else:
query = query.filter(db.Contest.round == ctx.round)
if ctx.hier_place:
query = db.filter_place_nth_parent(query, db.Contest.place_id, ctx.round.level - ctx.hier_place.level, ctx.hier_place.place_id)
query = query.options(joinedload(db.Contest.place))
if contest_place:
query = query.filter(db.Contest.place_id == contest_place.place_id)
if participation_place:
query = query.filter(db.Participation.place_id == participation_place.place_id)
if school:
query = query.filter(db.Participant.school == school.place_id)
if participation_state:
query = query.filter(db.Participation.state == participation_state)
query = query.options(
joinedload(db.Participation.user),
joinedload(db.Participation.place),
joinedload(db.Participant.school_place)
).order_by(db.User.last_name, db.User.first_name)
return query
def make_contestant_table(query: Query, round: db.Round, add_checkbox: bool = False, add_contest_column: bool = False, is_export: bool = False):
ctants = query.all()
rows: List[Row] = []
for pion, pant, ct in ctants:
u = pion.user
html_attr = {
'class': 'state-' + pion.state.name
}
rows.append(Row(
keys={
'sort_key': u.sort_key(),
'user_id': u.user_id,
'first_name': cell_pion_link(u, pion.contest_id, u.first_name),
'last_name': cell_pion_link(u, pion.contest_id, u.last_name),
'email': cell_email_link_flags(u),
'school': pant.school_place.name,
'school_code': cell_place_link(pant.school_place, pant.school_place.get_code()),
'grade': pant.grade,
'born_year': pant.birth_year,
'region_code': cell_place_link(ct.place, ct.place.get_code()),
'place_code': cell_place_link(pion.place, pion.place.get_code()),
'status': pion.state.friendly_name(),
'checkbox': CellCheckbox('checked', u.user_id, False),
},
html_attr=html_attr,
))
rows.sort(key=lambda r: r.keys['sort_key'])
cols: List[Column] = list(contest_list_columns)
if add_checkbox:
cols = [Column(key='checkbox', name=' ', title=' ')] + cols
if add_contest_column:
cols.append(Column(key='region_code', name='kod_oblasti', title=round.get_level().name.title()))
if is_export:
cols.append(Column(key='user_id', name='user_id'))
return Table(
columns=cols,
rows=rows,
filename='ucastnici',
show_downlink=False, # downlinks are in filter
)
def get_contestant_emails(query: Query, mailto_subject: str = '[OSMO] Zpráva pro účastníky') -> Tuple[List[str], str]:
users = [pion.user for (pion, _, _) in query.all()]
emails = [f'{u.first_name} {u.last_name} <{u.email}>' for u in users]
mailto_link = (
'mailto:' + urllib.parse.quote(config.MAIL_CONTACT, safe='@')
+ '?subject=' + urllib.parse.quote(mailto_subject)
+ '&bcc=' + ','.join([urllib.parse.quote(email, safe='@') for email in emails])
)
return (emails, mailto_link)
class SubmitForm(FlaskForm):
note = wtforms.TextAreaField("Poznámka pro účastníka", description="Viditelná účastníkovi po uzavření kola", render_kw={'autofocus': True})
org_note = wtforms.TextAreaField("Interní poznámka", description="Viditelná jen organizátorům")
points = mo_fields.Points(description="Účastník po uzavření kola uvidí jen naposledy zadané body", validators=[validators.Optional()])
submit = wtforms.SubmitField('Uložit')
file = flask_wtf.file.FileField("Soubor")
file_note = wtforms.TextAreaField("Poznámka k souboru")
submit_sol = wtforms.SubmitField('Uložit a nahrát soubor jako řešení')
submit_fb = wtforms.SubmitField('Uložit a nahrát soubor jako opravu')
delete = wtforms.SubmitField('Smazat řešení')
class SetFinalForm(FlaskForm):
type = wtforms.StringField()
paper_id = wtforms.IntegerField()
submit_final = wtforms.SubmitField("Prohlásit za finální")
@app.route('/org/contest/c/<int:ct_id>/submit/<int:user_id>/<int:task_id>/', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/submit/<int:user_id>/<int:task_id>/', methods=('GET', 'POST'))
def org_submit_list(ct_id: int, user_id: int, task_id: int, site_id: Optional[int] = None):
ctx = get_context(ct_id=ct_id, site_id=site_id, user_id=user_id, task_id=task_id, right_needed=Right.view_submits)
assert ctx.contest and ctx.user and ctx.task
rights = ctx.rights
sess = db.get_session()
self_url = ctx.url_for('org_submit_list')
# Najdeme řešení úlohy (nemusí existovat)
sol = (sess.query(db.Solution)
.filter_by(user_id=user_id, task_id=task_id)
.one_or_none())
set_final_form: Optional[SetFinalForm] = None
if sol and rights.can_upload_feedback():
set_final_form = SetFinalForm()
if set_final_form.validate_on_submit() and set_final_form.submit_final.data:
is_submit = set_final_form.type.data == "submit"
is_feedback = set_final_form.type.data == "feedback"
paper = sess.query(db.Paper).get(set_final_form.paper_id.data)
if not paper:
flash('Chyba: Papír s takovým ID neexistuje', 'danger')
elif paper.for_user != user_id or paper.for_task != task_id:
flash('Chyba: Papír nepatří k dané úloze a uživateli', 'danger')
elif (is_submit and sol.final_submit_obj == paper) or (is_feedback and sol.final_feedback_obj == paper):
flash('Žádná změna', 'warning')
elif is_submit:
app.logger.info(f"Finální submit úlohy {task_id} pro uživatele {user_id} změněn na {paper.paper_id}")
mo.util.log(
type=db.LogType.participant,
what=user_id,
details={
'action': 'task-final-submit-changed',
'task': task_id,
'old_paper': sol.final_submit,
'new_paper': paper.paper_id
},
)
sol.final_submit = paper.paper_id
sess.commit()
flash('Finální řešení změněno', 'success')
elif is_feedback:
app.logger.info(f"Finální feedback úlohy {task_id} pro uživatele {user_id} změněn na {paper.paper_id}")
mo.util.log(
type=db.LogType.participant,
what=user_id,
details={
'action': 'task-final-feedback-changed',
'task': task_id,
'old_paper': sol.final_feedback,
'new_paper': paper.paper_id
},
)
sol.final_feedback = paper.paper_id
sess.commit()
flash('Finální oprava změněna', 'success')
else:
flash('Chyba: Neplatná akce', 'danger')
return redirect(self_url)
form = SubmitForm(obj=sol)
form.points.widget = NumberInput(min=0, max=ctx.task.max_points, step=ctx.master_round.points_step) # min a max v HTML
if form.validate_on_submit():
if sol and form.delete.data:
if sol.final_submit or sol.final_feedback:
flash('Nelze smazat řešení, ke kterému již byl odevzdán soubor', 'danger')
else:
flash('Řešení smazáno', 'success')
sess.delete(sol)
mo.util.log(
type=db.LogType.participant,
what=ctx.user.user_id,
details={
'action': 'solution-removed',
'task': task_id,
},
)
sess.commit()
app.logger.info(f"Řešení úlohy {ctx.task.code} od účastníka {ctx.user.user_id} smazáno")
return redirect(self_url)
points = form.points.data
# Checks
if (form.submit_sol.data or form.submit_fb.data) and form.file.data is None:
flash('Schází soubor k nahrání, žádné změny nebyly uloženy', 'danger')
return redirect(self_url)
if points:
error = mo.util.check_points(points, for_task=ctx.task, for_round=ctx.round)
if error:
flash(error, 'danger')
return redirect(self_url)
if not sol and (rights.can_edit_points() or rights.can_upload_solutions() or rights.can_upload_feedback()):
flash('Řešení založeno', 'success')
sol = db.Solution(task=ctx.task, user=ctx.user)
sess.add(sol)
mo.util.log(
type=db.LogType.participant,
what=ctx.user.user_id,
details={
'action': 'solution-created',
'task': task_id,
},
)
sess.commit()
app.logger.info(f"Řešení úlohy {ctx.task.code} od účastníka {ctx.user.user_id} založeno")
# Edit sol and points
if sol and rights.can_edit_points():
# Sol edit
sol.note = form.note.data
sol.org_note = form.org_note.data
if sess.is_modified(sol):
flash('Změny hodnocení uloženy', 'success')
# Points
if points != sol.points:
sol.points = points
sess.add(db.PointsHistory(
task=ctx.task,
participant=sol.user,
user=g.user,
points_at=mo.now,
points=points,
))
flash('Body uloženy', 'success')
# Save changes and commit
if sess.is_modified(sol):
changes = db.get_object_changes(sol)
mo.util.log(
type=db.LogType.participant,
what=ctx.user.user_id,
details={
'action': 'solution-edit',
'task': task_id,
'changes': changes
},
)
sess.commit()
app.logger.info(f"Řešení úlohy {ctx.task.code} od účastníka {ctx.user.user_id} modifikováno, změny: {changes}")
if (form.submit_sol.data and rights.can_upload_solutions()) or (form.submit_fb.data and rights.can_upload_feedback()):
file = form.file.data.stream
if rights.can_upload_solutions() and form.submit_sol.data:
type = db.PaperType.solution
elif rights.can_upload_feedback() and form.submit_fb.data:
type = db.PaperType.feedback
else:
raise werkzeug.exceptions.Forbidden()
assert ctx.task is not None and ctx.user is not None
paper = db.Paper(task=ctx.task, for_user_obj=ctx.user, uploaded_by_obj=g.user, type=type, note=form.file_note.data)
submitter = mo.submit.Submitter()
self_url = url_for('org_submit_list', ct_id=ct_id, user_id=user_id, task_id=task_id, site_id=site_id)
try:
submitter.submit_paper(paper, file.name)
except mo.submit.SubmitException as e:
flash(f'Chyba: {e}', 'danger')
return redirect(self_url)
sess.add(paper)
if type == db.PaperType.solution:
sol.final_submit_obj = paper
else:
sol.final_feedback_obj = paper
sess.commit()
if type == db.PaperType.solution:
prefix = 'Řešení'
else:
prefix = 'Opravené řešení'
if paper.is_broken():
flash(prefix + ' není korektní PDF, ale přesto jsme ho přijali a pokusíme se ho zpracovat. ' +
'Zkontrolujte prosím, že se na vašem počítači zobrazuje správně.',
'warning')
else:
flash(prefix + ' odevzdáno', 'success')
return redirect(self_url)
papers = (sess.query(db.Paper)
.filter_by(for_user_obj=ctx.user, task=ctx.task)
.options(joinedload(db.Paper.uploaded_by_obj))
.order_by(db.Paper.uploaded_at.desc())
.all())
sol_papers = [p for p in papers if p.type == db.PaperType.solution]
fb_papers = [p for p in papers if p.type == db.PaperType.feedback]
points_history = (sess.query(db.PointsHistory)
.filter_by(task=ctx.task, participant=ctx.user)
.options(joinedload(db.PointsHistory.user))
.order_by(db.PointsHistory.points_at.desc())
.all())
return render_template(
'org_submit_list.html',
ctx=ctx, rights=rights,
solution=sol,
sol_papers=sol_papers,
fb_papers=fb_papers,
points_history=points_history,
for_site=(site_id is not None),
paper_link=lambda p: mo.web.util.org_paper_link(ctx.contest, ctx.site, ctx.user, p),
orig_paper_link=lambda p: mo.web.util.org_paper_link(ctx.contest, ctx.site, ctx.user, p, orig=True),
form=form,
set_final_form=set_final_form,
)
class SubmitEditForm(FlaskForm):
note = wtforms.TextAreaField("Poznámka pro účastníka", description="Viditelná účastníkovi po uzavření kola", render_kw={"rows": 8, 'autofocus': True})
org_note = wtforms.TextAreaField("Interní poznámka", description="Viditelná jen organizátorům", render_kw={"rows": 8})
submit = wtforms.SubmitField("Uložit")
@app.route('/org/contest/c/<int:ct_id>/paper/<int:paper_id>/<filename>', endpoint='org_submit_paper')
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/paper/<int:paper_id>/<filename>', endpoint='org_submit_paper')
@app.route('/org/contest/c/<int:ct_id>/paper/orig/<int:paper_id>/<filename>', endpoint='org_submit_paper_orig')
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/paper/orig/<int:paper_id>/<filename>', endpoint='org_submit_paper_orig')
def org_submit_paper(ct_id: int, paper_id: int, filename: str, site_id: Optional[int] = None):
paper = (db.get_session().query(db.Paper)
.options(joinedload(db.Paper.task)) # pro task_paper_filename()
.get(paper_id))
if not paper:
raise werkzeug.exceptions.NotFound()
if not filename.endswith('.pdf'):
raise werkzeug.exceptions.NotFound()
get_context(ct_id=ct_id, user_id=paper.for_user, task_id=paper.for_task, site_id=site_id, right_needed=Right.view_submits)
return mo.web.util.send_task_paper(paper, (request.endpoint == 'org_submit_paper_orig'))
def get_solutions_query(
task: db.Task,
for_contest: Optional[db.Contest] = None,
for_site: Optional[db.Place] = None) -> Query:
sess = db.get_session()
query = (sess.query(db.Participation, db.Solution)
.select_from(db.Participation)
.outerjoin(db.Solution, and_(db.Solution.task_id == task.task_id, db.Solution.user_id == db.Participation.user_id))
.options(joinedload(db.Solution.user),
joinedload(db.Solution.final_submit_obj),
joinedload(db.Solution.final_feedback_obj)))
if for_contest:
query = query.filter(db.Participation.contest == for_contest)
if for_site:
query = query.filter(db.Participation.place == for_site)
return query
class TaskPointsForm(FlaskForm):
submit = wtforms.SubmitField("Uložit body")
class TaskCreateForm(FlaskForm):
submit = wtforms.SubmitField("Založit označená řešení")
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/')
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/task/<int:task_id>/')
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/points', methods=('GET', 'POST'), endpoint="org_contest_task_points")
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/create', methods=('GET', 'POST'), endpoint="org_contest_task_create")
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/task/<int:task_id>/create', methods=('GET', 'POST'), endpoint="org_contest_task_create")
def org_contest_task(ct_id: int, task_id: int, site_id: Optional[int] = None):
ctx = get_context(ct_id=ct_id, site_id=site_id, task_id=task_id, right_needed=Right.view_submits)
assert ctx.contest and ctx.task
action_create = request.endpoint == "org_contest_task_create"
action_points = request.endpoint == "org_contest_task_points"
if action_create and not ctx.rights.can_create_solutions():
raise werkzeug.exceptions.Forbidden()
if action_points and not ctx.rights.can_edit_points():
raise werkzeug.exceptions.Forbidden()
sess = db.get_session()
q = get_solutions_query(ctx.task, for_contest=ctx.master_contest, for_site=ctx.site)
rows: List[Tuple[db.Participation, db.Solution]] = q.all()
rows.sort(key=lambda r: r[0].user.sort_key())
points_form: Optional[TaskPointsForm] = None
create_form: Optional[TaskCreateForm] = None
if action_create:
create_form = TaskCreateForm()
if create_form.validate_on_submit():
new_sol_count = 0
for pion, sol in rows:
if sol:
continue # již existuje
if not request.form.get(f"create_sol_{pion.user_id}"):
continue # nikdo nežádá o vytvoření
sol = db.Solution(task=ctx.task, user=pion.user)
sess.add(sol)
mo.util.log(
type=db.LogType.participant,
what=pion.user_id,
details={
'action': 'solution-created',
'task': task_id,
},
)
app.logger.info(f"Řešení úlohy {ctx.task.code} od účastníka {pion.user_id} založeno")
new_sol_count += 1
if new_sol_count > 0:
sess.commit()
flash(inflect_by_number(new_sol_count, "Založeno", "Založena", "Založeno") + ' '
+ inflect_number(new_sol_count, "nové řešení", "nová řešení", "nových řešení"),
"success")
else:
flash("Žádné změny k uložení", "info")
return redirect(ctx.url_for('org_contest_task'))
if action_points:
points_form = TaskPointsForm()
if points_form.validate_on_submit():
count = 0
ok = True
for _, sol in rows:
if sol is None:
continue
points, error = mo.util.parse_points(request.form.get(f"points_{sol.user_id}"), for_task=ctx.task, for_round=ctx.round)
if error:
flash(f'{sol.user.first_name} {sol.user.last_name}: {error}', 'danger')
ok = False
if ok and points != sol.points:
# Save points
sol.points = points
sess.add(db.PointsHistory(
task=ctx.task,
participant=sol.user,
user=g.user,
points_at=mo.now,
points=points,
))
count += 1
if ok:
if count > 0:
sess.commit()
flash("Změněny body u " + inflect_number(count, "řešení", "řešení", "řešení"), "success")
else:
flash("Žádné změny k uložení", "info")
return redirect(ctx.url_for('org_contest_task'))
# Count papers for each solution
paper_counts = {}
for user_id, type, count in (
db.get_session().query(db.Paper.for_user, db.Paper.type, func.count(db.Paper.type))
.filter_by(task=ctx.task)
.group_by(db.Paper.for_user, db.Paper.type)
.all()
):
paper_counts[(user_id, type.name)] = count
return render_template(
"org_contest_task.html",
ctx=ctx, rights=ctx.rights,
round=ctx.round, contest=ctx.contest,
rows=rows, paper_counts=paper_counts,
paper_link=lambda u, p: mo.web.util.org_paper_link(ctx.contest, ctx.site, u, p),
points_form=points_form, create_form=create_form, request_form=request.form,
)
class ContestSolutionsEditForm(FlaskForm):
submit = wtforms.SubmitField("Založit označená řešení")
@app.route('/org/contest/c/<int:ct_id>/solutions', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/solutions', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/solutions/edit', methods=('GET', 'POST'), endpoint="org_contest_solutions_edit")
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/solutions/edit', methods=('GET', 'POST'), endpoint="org_contest_solutions_edit")
def org_contest_solutions(ct_id: int, site_id: Optional[int] = None):
sess = db.get_session()
ctx = get_context(ct_id=ct_id, site_id=site_id, right_needed=Right.view_submits)
assert ctx.contest
edit_action = request.endpoint == "org_contest_solutions_edit"
if edit_action and not ctx.rights.can_create_solutions():
raise werkzeug.exceptions.Forbidden()
pions_subq = sess.query(db.Participation.user_id).filter_by(contest=ctx.master_contest)
if ctx.site:
pions_subq = pions_subq.filter_by(place=ctx.site)
pions_subq = pions_subq.subquery()
pions = (sess.query(db.Participation)
.filter(
db.Participation.contest == ctx.master_contest,
db.Participation.user_id.in_(pions_subq),
).options(joinedload(db.Participation.user))
.all())
pions.sort(key=lambda p: p.user.sort_key())
tasks_subq = sess.query(db.Task.task_id).filter_by(round=ctx.round).subquery()
tasks = (sess.query(db.Task)
.filter_by(round=ctx.round)
.order_by(db.Task.code)
.all())
sols = sess.query(db.Solution).filter(
db.Solution.user_id.in_(pions_subq),
db.Solution.task_id.in_(tasks_subq)
).options(
joinedload(db.Solution.final_submit_obj),
joinedload(db.Solution.final_feedback_obj)
).all()
# Count papers for each task and solution
paper_counts = {}
for user_id, task_id, type, count in (
db.get_session().query(db.Paper.for_user, db.Paper.for_task, db.Paper.type, func.count(db.Paper.type))
.filter(db.Paper.for_task.in_(tasks_subq))
.group_by(db.Paper.for_user, db.Paper.for_task, db.Paper.type)
.all()
):
paper_counts[(user_id, task_id, type.name)] = count
task_sols: Dict[int, Dict[int, db.Solution]] = {}
for t in tasks:
task_sols[t.task_id] = {}
for s in sols:
task_sols[s.task_id][s.user_id] = s
edit_form: Optional[ContestSolutionsEditForm] = None
if edit_action:
edit_form = ContestSolutionsEditForm()
if edit_form.validate_on_submit():
new_sol_count = 0
for task in tasks:
for pion in pions:
if pion.user_id in task_sols[task.task_id]:
continue # již existuje
if not request.form.get(f"create_sol_{task.task_id}_{pion.user_id}"):
continue # nikdo nežádá o vytvoření
sol = db.Solution(task=task, user=pion.user)
sess.add(sol)
mo.util.log(
type=db.LogType.participant,
what=pion.user_id,
details={
'action': 'solution-created',
'task': task.task_id,
},
)
app.logger.info(f"Řešení úlohy {task.code} od účastníka {pion.user_id} založeno")
new_sol_count += 1
if new_sol_count > 0:
sess.commit()
flash(inflect_by_number(new_sol_count, "Založeno", "Založena", "Založeno") + ' '
+ inflect_number(new_sol_count, "nové řešení", "nová řešení", "nových řešení"),
"success")
else:
flash("Žádné změny k uložení", "info")
return redirect(ctx.url_for('org_contest_solutions'))
return render_template(
'org_contest_solutions.html',
ctx=ctx,
contest=ctx.contest, site=ctx.site, rights=ctx.rights,
pions=pions, tasks=tasks, tasks_sols=task_sols, paper_counts=paper_counts,
paper_link=lambda u, p: mo.web.util.org_paper_link(ctx.contest, ctx.site, u, p),
edit_form=edit_form,
)
class DownloadSubmitsForm(FlaskForm):
min_points = mo_fields.Points(
'Minimální počet bodů v kole', render_kw={'autofocus': True},
description='Je-li uveden, řešení účastníků, kteří ve výsledcích celého kola dostali méně bodů, se nestahují.',
validators=[validators.Optional()]
)
download_sol = wtforms.SubmitField('Stáhnout všechna účastnická řešení')
download_fb = wtforms.SubmitField('Stáhnout všechna opravená řešení')
download_sol_mix = wtforms.SubmitField('Stáhnout účastnická/opravená')
download_fb_mix = wtforms.SubmitField('Stáhnout opravená/účastnická')
def download_submits(form: DownloadSubmitsForm, round: db.Round, sol_query, pion_query, subj_suffix: str, want_subdirs: bool) -> bool:
if not form.validate_on_submit():
return False
sols = sol_query.all()
if form.min_points.data is not None:
# Každému účastníkovi z vybrané množiny posčítáme body za všechny úlohy kola
pts = (db.get_session().query(db.Solution.user_id, func.sum(db.Solution.points))
.select_from(db.Solution)
.join(db.Task, and_(db.Task.task_id == db.Solution.task_id, db.Task.round == round))
.filter(db.Solution.user_id.in_(pion_query.subquery()))
.group_by(db.Solution.user_id)
.all())
pts_dict = {uid: pt or 0 for uid, pt in pts}
sols = [s for s in sols if pts_dict[s.user_id] >= form.min_points.data]
if form.download_sol.data:
paper_ids = [s.final_submit for s in sols]
subj_prefix = 'Odevzdaná'
elif form.download_fb.data:
paper_ids = [s.final_feedback for s in sols]
subj_prefix = 'Opravená'
elif form.download_sol_mix:
paper_ids = [s.final_submit or s.final_feedback for s in sols]
subj_prefix = 'Odevzdaná/opravená'
elif form.download_fb_mix:
paper_ids = [s.final_feedback or s.final_submit for s in sols]
subj_prefix = 'Opravená/odevzdaná'
else:
return False
paper_ids = [p for p in paper_ids if p is not None]
mo.jobs.submit.schedule_download_submits(paper_ids, f'{subj_prefix} {subj_suffix}', g.user, want_subdirs)
flash('Příprava řešení ke stažení zahájena.', 'success')
return True
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
def org_generic_batch_download(task_id: int, round_id: Optional[int] = None, ct_id: Optional[int] = None, site_id: Optional[int] = None):
sess = db.get_session()
ctx = get_context(round_id=round_id, ct_id=ct_id, site_id=site_id, task_id=task_id, right_needed=Right.view_submits)
round, contest, site, task = ctx.round, ctx.contest, ctx.site, ctx.task
assert task
pion_query = sess.query(db.Participation.user_id).select_from(db.Participation)
if contest:
pion_query = pion_query.filter_by(contest_id=contest.master_contest_id)
if site:
pion_query = pion_query.filter_by(place=site)
else:
pion_query = pion_query.join(db.Contest).filter(db.Contest.round_id == round.master_round_id)
sol_query = (sess.query(db.Solution)
.select_from(db.Solution)
.filter(db.Solution.task == task))
if contest is not None:
sol_query = sol_query.filter(db.Solution.user_id.in_(pion_query.subquery()))
form = DownloadSubmitsForm()
if request.method == 'POST':
subj = f'řešení {task.code}'
if site is not None:
subj = f'{subj} ({site.name})'
elif contest is not None:
subj = f'{subj} ({contest.place.name})'
if download_submits(form, round, sol_query, pion_query, subj, contest is None):
return redirect(url_for('org_jobs'))
sol_paper = aliased(db.Paper)
fb_paper = aliased(db.Paper)
sol_query = (sol_query.with_entities(func.count(db.Solution.user_id),
func.count(sol_paper.paper_id),
func.count(fb_paper.paper_id),
func.sum(sol_paper.bytes),
func.sum(fb_paper.bytes))
.outerjoin(sol_paper, sol_paper.paper_id == db.Solution.final_submit)
.outerjoin(fb_paper, fb_paper.paper_id == db.Solution.final_feedback))
submit_count, sol_count, fb_count, sol_size, fb_size = sol_query.one()
pion_count = pion_query.with_entities(func.count(db.Participation.user_id)).scalar()
return render_template(
'org_generic_batch_download.html',
ctx=ctx,
round=round, contest=contest, site=site, task=task,
submit_count=submit_count,
pion_count=pion_count,
sol_count=sol_count, fb_count=fb_count,
sol_size=sol_size, fb_size=fb_size,
form=form,
)
class UploadSubmitsForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()], render_kw={'autofocus': True})
submit = wtforms.SubmitField('Odeslat')
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
def org_generic_batch_upload(task_id: int, round_id: Optional[int] = None, ct_id: Optional[int] = None, site_id: Optional[int] = None):
ctx = get_context(round_id=round_id, ct_id=ct_id, site_id=site_id, task_id=task_id)
round, contest, site, task = ctx.round, ctx.contest, ctx.site, ctx.task
assert task
# Základní kontrola, zda vůbec chceme akci spustit. Zbytek se kontrole uvnitř jobu.
# Zatím neumíme dávkově nahrávat řešení.
if not ctx.rights.offer_upload_feedback():
raise werkzeug.exceptions.Forbidden()
request.custom_max_content_length = mo.config.MAX_BATCH_CONTENT_LENGTH
form = UploadSubmitsForm()
if form.validate_on_submit():
file = form.file.data.stream
mo.jobs.submit.schedule_upload_feedback(round, file.name, f'Nahrání opravených řešení {round.round_code()}',
for_user=g.user,
only_contest=contest, only_site=site, only_task=task)
return redirect(url_for('org_jobs'))
return render_template(
'org_generic_batch_upload.html',
ctx=ctx,
round=round, contest=contest, site=site, task=task,
max_size=mo.config.MAX_BATCH_CONTENT_LENGTH,
form=form,
)
class BatchPointsForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", render_kw={'autofocus': True})
fmt = wtforms.SelectField(
"Formát souboru",
choices=FileFormat.choices(), coerce=FileFormat.coerce,
default=FileFormat.cs_csv,
)
add_del_sols = wtforms.BooleanField('Zakládat / mazat řešení', description='Xyzzy')
submit = wtforms.SubmitField('Nahrát body')
get_template = wtforms.SubmitField('Stáhnout šablonu')
@app.route('/org/contest/c/<int:ct_id>/task/<int:task_id>/batch-points', methods=('GET', 'POST'))
@app.route('/org/contest/r/<int:round_id>/task/<int:task_id>/batch-points', methods=('GET', 'POST'))
def org_generic_batch_points(task_id: int, round_id: Optional[int] = None, ct_id: Optional[int] = None):
ctx = get_context(round_id=round_id, ct_id=ct_id, task_id=task_id)
round, contest, task = ctx.round, ctx.contest, ctx.task
if not ctx.rights.can_edit_points():
raise werkzeug.exceptions.Forbidden()
form = BatchPointsForm()
errs = []
warnings = []
if form.validate_on_submit():
fmt = form.fmt.data
imp = create_import(user=g.user, type=ImportType.points, fmt=fmt, round=round, contest=contest, task=task, allow_add_del=form.add_del_sols.data)
if form.submit.data:
if form.file.data is not None:
file = form.file.data.stream
import_tmp = mo.util.link_to_dir(file.name, mo.util.data_dir('imports'), suffix='.csv')
if imp.run(import_tmp):
if imp.cnt_rows == 0:
flash('Soubor neobsahoval žádné řádky s daty', 'danger')
else:
flash(f'Importováno ({imp.cnt_rows} řádků, {imp.cnt_set_points} řešení přebodováno, {imp.cnt_add_sols} založeno a {imp.cnt_del_sols} smazáno)', 'success')
return redirect(ctx.url_home())
else:
errs = imp.errors
warnings = imp.warnings
else:
flash('Vyberte si prosím soubor', 'danger')
elif form.get_template.data:
out = imp.get_template()
resp = app.make_response(out)
resp.content_type = fmt.get_content_type()
resp.headers.add('Content-Disposition', 'attachment; filename=OSMO-' + imp.template_basename + '.' + fmt.get_extension())
return resp
return render_template(
'org_generic_batch_points.html',
ctx=ctx,
round=round, contest=contest, task=task,
form=form,
errs=errs,
warnings=warnings
)
@app.route('/org/contest/c/<int:ct_id>/user/<int:user_id>')
def org_contest_user(ct_id: int, user_id: int):
ctx = get_context(ct_id=ct_id, user_id=user_id, right_needed=Right.view_contestants)
assert ctx.contest
sess = db.get_session()
pant = sess.query(db.Participant).filter_by(
user_id=user_id, year=ctx.round.year
).options(joinedload(db.Participant.school_place)).one_or_none()
if not pant:
raise werkzeug.exceptions.NotFound()
task_sols = sess.query(db.Task, db.Solution).select_from(db.Task).outerjoin(
db.Solution, and_(db.Solution.task_id == db.Task.task_id, db.Solution.user == ctx.user)
).filter(db.Task.round == ctx.round).options(
joinedload(db.Solution.final_submit_obj),
joinedload(db.Solution.final_feedback_obj)
).order_by(db.Task.code).all()
# Count papers for each task and solution
tasks_subq = sess.query(db.Task.task_id).filter_by(round=ctx.round).subquery()
paper_counts = {}
for task_id, type, count in (
db.get_session().query(db.Paper.for_task, db.Paper.type, func.count(db.Paper.type))
.filter(
db.Paper.for_user == user_id,
db.Paper.for_task.in_(tasks_subq)
).group_by(db.Paper.for_task, db.Paper.type)
.all()
):
paper_counts[(task_id, type.name)] = count
return render_template(
'org_contest_user.html',
ctx=ctx, rights=ctx.rights,
pant=pant, task_sols=task_sols,
paper_link=lambda u, p: mo.web.util.org_paper_link(ctx.contest, None, u, p),
paper_counts=paper_counts,
)
class AdvanceForm(FlaskForm):
boundary = mo_fields.Points(
'Bodová hranice', render_kw={'autofocus': True},
description="Postoupí všichni účastníci, kteří v minulém kole získali aspoň tolik bodů.",
validators=[validators.InputRequired()]
)
status = wtforms.HiddenField()
preview = wtforms.SubmitField('Zobrazit návrh')
execute = wtforms.SubmitField('Provést')
@app.route('/org/contest/c/<int:ct_id>/advance', methods=('GET', 'POST'))
def org_contest_advance(ct_id: int):
sess = db.get_session()
conn = sess.connection()
ctx = get_context(ct_id=ct_id, right_needed=Right.manage_contest)
contest, master_contest = ctx.contest, ctx.master_contest
round = ctx.round
assert contest and master_contest
def redirect_back():
return redirect(ctx.url_for('org_contest'))
if round.state != db.RoundState.preparing:
flash('Aktuální kolo není ve stavu přípravy', 'danger')
return redirect_back()
prev_round = sess.query(db.Round).filter_by(
year=round.year, category=round.category, seq=round.seq - 1
).filter(db.Round.master_round_id == db.Round.round_id).one_or_none()
if prev_round is None:
flash('Předchozí kolo nenalezeno', 'danger')
return redirect_back()
elif prev_round.state != db.RoundState.closed:
# FIXME: Možná kontrolovat stav uzavření všech kol ve skupině kol?
flash('Předchozí kolo dosud nebylo ukončeno', 'danger')
return redirect_back()
elif prev_round.level < round.level:
flash('Předchozí kolo se koná ve vyšší oblasti než toto kolo', 'danger')
return redirect_back()
prev_contests: List[db.Contest] = []
accept_by_place_id: Dict[int, int] = {}
reject_by_place_id: Dict[int, int] = {}
form = AdvanceForm()
if form.validate_on_submit():
desc_cte = db.place_descendant_cte(contest.place, max_level=prev_round.level)
prev_contests = (sess.query(db.Contest)
.filter(db.Contest.round == prev_round)
.filter(db.Contest.place_id.in_(select([desc_cte])))
.options(joinedload(db.Contest.place))
.all())
prev_contests.sort(key=lambda c: locale.strxfrm(c.place.name or ""))
accept_by_place_id = {c.place_id: 0 for c in prev_contests}
reject_by_place_id = {c.place_id: 0 for c in prev_contests}
prev_pion_query = (sess.query(db.Participation)
.filter(db.Participation.contest_id.in_([c.contest_id for c in prev_contests]))
.filter_by(state=db.PartState.active))
prev_pions = prev_pion_query.all()
if form.boundary.data > 0:
round_subquery = sess.query(db.Round.round_id).filter_by(master_round_id=prev_round.round_id).subquery()
accept_uids = (sess.query(db.Solution.user_id)
.select_from(db.Solution)
# Vybíráme úlohy, jejich round patří do stejné skupiny kol jako prev_round
.join(db.Task, and_(db.Task.task_id == db.Solution.task_id, db.Task.round_id.in_(round_subquery)))
.filter(db.Solution.user_id.in_(prev_pion_query.with_entities(db.Participation.user_id).subquery()))
.group_by(db.Solution.user_id)
.having(func.sum(db.Solution.points) >= form.boundary.data)
.all())
accept_uids = [a[0] for a in accept_uids]
else:
accept_uids = None
want_execute = form.execute.data
if want_execute:
app.logger.info(f'Postup: Z kola #{prev_round.round_id} do #{round.master_round_id}, soutěž #{master_contest.contest_id}')
mo.util.log(
type=db.LogType.contest,
what=master_contest.contest_id,
details={'action': 'advance'},
)
really_inserted = 0
for pp in prev_pions:
# This incurs no real queries as we have all the contests cached
prev_place_id = sess.query(db.Contest).get(pp.contest_id).place_id
if accept_uids and pp.user_id not in accept_uids:
reject_by_place_id[prev_place_id] += 1
continue
accept_by_place_id[prev_place_id] += 1
if want_execute:
# ORM neumí ON CONFLICT DO NOTHING, takže musíme o vrstvu níže
res = conn.execute(
pgsql_insert(db.Participation.__table__)
.values(
user_id=pp.user_id,
contest_id=contest.contest_id,
place_id=contest.place.place_id,
state=db.PartState.active,
)
.on_conflict_do_nothing()
.returning(db.Participation.contest_id)
)
inserted = res.fetchall()
if inserted:
# Opravdu došlo ke vložení
really_inserted += 1
app.logger.info(f'Postup: Založena účast user=#{pp.user_id} contest=#{ct_id} place=#{contest.place_id}')
mo.util.log(
type=db.LogType.participant,
what=pp.user_id,
details={
'action': 'add-to-contest',
# Tady nemůžeme použít obvyklé row2dict, neboť nemáme v ruce ORMový objekt
'new': {
'contest_id': contest.contest_id,
'place_id': contest.place_id,
},
},
)
if want_execute:
sess.commit()
msg = (inflect_by_number(really_inserted, 'Pozván', 'Pozváni', 'Pozváno')
+ ' '
+ inflect_number(really_inserted, 'nový soutěžící', 'noví soutěžící', 'nových soutěžících')
+ '.')
flash(msg, 'success')
return redirect_back()
return render_template(
'org_contest_advance.html',
ctx=ctx,
contest=contest,
round=contest.round,
prev_round=prev_round,
prev_contests=prev_contests,
accept_by_place_id=accept_by_place_id,
reject_by_place_id=reject_by_place_id,
form=form,
)
class ContestEditForm(FlaskForm):
state = wtforms.SelectField("Stav soutěže",
choices=[ch for ch in db.RoundState.choices() if ch[0] != 'delegate'],
coerce=db.RoundState.coerce)
submit = wtforms.SubmitField('Uložit')
@app.route('/org/contest/c/<int:ct_id>/edit', methods=('GET', 'POST'))
def org_contest_edit(ct_id: int):
sess = db.get_session()
ctx = get_context(ct_id=ct_id, right_needed=Right.manage_contest)
contest = ctx.contest
round = ctx.round
assert contest and round
form = ContestEditForm(obj=contest)
if round.state != db.RoundState.delegate:
form.state.render_kw = {'disabled': ""}
form.state.description = 'Nastavení kola neumožňuje měnit stav soutěže.'
if form.validate_on_submit():
form.populate_obj(contest)
if sess.is_modified(contest):
changes = db.get_object_changes(contest)
if 'state' in changes and round.state != db.RoundState.delegate:
flash("Nastavení kola neumožňuje měnit stav soutěže", "danger")
return redirect(url_for('org_contest', ct_id=ct_id))
app.logger.info(f"Contest #{ct_id} modified, changes: {changes}")
mo.util.log(
type=db.LogType.contest,
what=ct_id,
details={'action': 'edit', 'changes': changes},
)
sess.commit()
flash('Změny soutěže uloženy', 'success')
else:
flash('Žádné změny k uložení', 'info')
return redirect(ctx.url_for('org_contest'))
return render_template(
'org_contest_edit.html',
ctx=ctx,
round=round,
contest=contest,
form=form,
)
class ParticipantAddForm(FlaskForm):
email = mo_fields.Email(validators=[validators.Required()])
first_name = mo_fields.FirstName(validators=[validators.Optional()])
last_name = mo_fields.LastName(validators=[validators.Optional()])
school = mo_fields.School(validators=[validators.Optional()])
grade = mo_fields.Grade(validators=[validators.Optional()])
birth_year = mo_fields.BirthYear(validators=[validators.Optional()])
participation_place = mo_fields.Place("Kód soutěžního místa")
save = wtforms.SubmitField("Přidat")
def set_descriptions(self, contest: db.Contest):
self.school.description = f'Kód školy najdete v <a href="{url_for("org_place", id=contest.place.place_id)}">katalogu míst</a>.'
self.participation_place.description = f'Pokud účastník soutěží někde jinde než {contest.place.name_locative()}, vyplňte <a href="{url_for("org_place", id=contest.place.place_id)}">kód místa</a>. Dozor na tomto místě pak může za účastníka odevzdávat řešení.'
@app.route('/org/contest/c/<int:ct_id>/participants/new', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:ct_id>/site/<int:site_id>/participants/new', methods=('GET', 'POST'))
def org_contest_add_user(ct_id: int, site_id: Optional[int] = None):
ctx = get_context(ct_id=ct_id, site_id=site_id, right_needed=Right.manage_contest)
contest = ctx.master_contest
assert contest
form = ParticipantAddForm()
if site_id is not None:
if not form.is_submitted():
form.participation_place.process_data(site_id)
form.participation_place.render_kw = {"readonly": True}
form.set_descriptions(contest)
if form.validate_on_submit():
try:
user, is_new_user = mo.users.find_or_create_user(form.email.data, form.first_name.data, form.last_name.data, False, reason='web')
participant, is_new_participant = mo.users.find_or_create_participant(user, contest.round.year, form.school.get_place_id(), form.birth_year.data, form.grade.data, reason='web')
participation, is_new_participation = mo.users.find_or_create_participation(user, contest, form.participation_place.get_place(), reason='web')
except mo.CheckError as e:
db.get_session().rollback()
flash(f"{e}", "danger")
else:
db.get_session().commit()
if is_new_user:
flash("Založen nový uživatel.", "info")
token = mo.users.make_activation_token(user)
mo.email.send_new_account_email(user, token)
if is_new_participant:
flash("Založena nová registrace do ročníku.", "info")
if is_new_participation:
flash("Uživatel přihlášen do soutěže.", "info")
else:
flash("Žádná změna. Uživatel už byl přihlášen.", "info")
return redirect(ctx.url_for('org_generic_list'))
return render_template(
'org_contest_add_user.html',
ctx=ctx,
contest=contest, round=ctx.master_round, site=ctx.site,
form=form
)