Select Git revision
test_main.cpp
org_contest.py 21.15 KiB
from dataclasses import dataclass
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import flask_wtf.file
import locale
import os
import secrets
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.query import Query
from typing import List, Tuple, Optional, Sequence, Dict
import werkzeug.exceptions
import wtforms
import mo
import mo.csv
import mo.db as db
import mo.imports
import mo.rights
from mo.rights import Right, Rights
import mo.util
from mo.web import app
from mo.web.util import PagerForm
from mo.web.table import CellCheckbox, Table, Column, cell_place_link, cell_user_link, cell_email_link
import wtforms.validators as validators
class ImportForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()])
submit = wtforms.SubmitField('Importovat')
class ParticipantsFilterForm(PagerForm):
school = wtforms.StringField("Škola")
participation_place = wtforms.StringField("Soutěžní místo")
contest_place = wtforms.StringField("Soutěžní oblast")
participation_state = wtforms.SelectField('Stav účasti', choices=[('*', '*')] + list(db.PartState.choices()), default='*')
# format = wtforms.RadioField(choices=[('', 'Zobrazit'), ('csv', 'Stáhnout vše v CSV'), ('tsv', 'Stáhnout vše v TSV')])
submit = wtforms.SubmitField("Zobrazit")
download_csv = wtforms.SubmitField("↓ CSV")
download_tsv = wtforms.SubmitField("↓ TSV")
class ParticipantsActionForm(FlaskForm):
action_on = wtforms.RadioField(
"Provést akci na", validators=[validators.DataRequired()],
choices=[('all', 'všech vyfiltrovaných účastnících'), ('checked', 'označených účastnících')]
# checkboxes are handled not through FlaskForm, see below
)
participation_state = wtforms.SelectField('Stav účasti', choices=list(db.PartState.choices()))
set_participation_state = wtforms.SubmitField("Nastavit stav účasti")
participation_place = wtforms.StringField(
'Soutěžní místo', description='Zadejte kód nebo ID ve tvaru <code>#123</code>'
)
set_participation_place = wtforms.SubmitField("Nastavit soutěžní místo")
contest_place = wtforms.StringField(
'Soutěžní oblast',
description='Musí existovat soutěž v dané oblasti pro stejné kolo. Oblast zadejte pomocí kódu nebo ID ve tvaru <code>#123</code>.'
)
set_contest = wtforms.SubmitField("Přesunout do jiné soutěžní oblasti")
remove_participation = wtforms.SubmitField("Smazat záznam o účasti")
def do_action(self, round: db.Round, rights: Rights, query: Query) -> bool:
"""Do participation modification on partipations from given query
(possibly filtered by checkboxes). `rights` param is used to check rights
for contest of each modified participation or for contest in which
participation is moved to."""
if not self.validate_on_submit():
return False
sess = db.get_session()
# Check that operation is valid
if self.set_participation_state.data:
pass
elif self.set_participation_place.data:
participation_place = db.get_place_by_code(self.participation_place.data)
if not participation_place:
flash('Nenalezeno místo s daným kódem', 'danger')
return False
elif self.set_contest.data:
contest_place = db.get_place_by_code(self.contest_place.data)
if not contest_place:
flash("Nepovedlo se najít zadanou soutěžní oblast", 'danger')
return False
contest = sess.query(db.Contest).filter_by(round_id=round.round_id, place_id=contest_place.place_id).one_or_none()
if not contest:
flash(f"Nepovedlo se najít soutěž v kole {round.round_code()} v oblasti {contest_place.name}", 'danger')
return False
rights.get_for_contest(contest)
if not rights.have_right(Right.manage_contest):
flash(f"Nemáte právo ke správě soutěže v kole {round.round_code()} v oblasti {contest_place.name}, nelze do ní přesunout účastníky", 'danger')
return False
elif self.remove_participation.data:
pass
else:
flash('Neznámá operace', 'danger')
return False
try:
user_ids = list(map(int, request.form.getlist('checked')))
except ValueError:
flash('Data v checkboxech nelze převést na čísla, kontaktujte správce', 'danger')
return False
# Check all participations if we can edit them
ctants = query.all()
rights_cache = set()
for pion, _, _ in ctants:
u = pion.user
if self.action_on.data == 'checked' and u.user_id not in user_ids:
continue
if pion.contest_id in rights_cache:
continue
rights.get_for_contest(pion.contest)
if rights.have_right(Right.manage_contest):
rights_cache.add(pion.contest_id)
continue
flash(
f"Nemáte právo ke správě soutěže v kole {round.round_code()} v oblasti {pion.contest.place.name} "
+ f"(účastník {u.full_name()}). Žádná akce nebyla provedena.", 'danger'
)
return False
count = 0
for pion, _, _ in ctants:
u = pion.user
if self.action_on.data == 'checked' and u.user_id not in user_ids:
continue
if self.remove_participation.data:
sess.delete(pion)
app.logger.info(f"Participation of user {u.user_id} in contest {pion.contest} removed")
mo.util.log(
type=db.LogType.participant,
what=u.user_id,
details={'action': 'participation-removed', 'participation': db.row2dict(pion)},
)
else:
if self.set_participation_state.data:
pion.state = self.participation_state.data
elif self.set_participation_place.data:
pion.place = participation_place
elif self.set_contest.data:
pion.contest = contest
changes = db.get_object_changes(pion)
app.logger.info(f"Participation of user {u.user_id} modified, changes: {changes}")
mo.util.log(
type=db.LogType.participant,
what=u.user_id,
details={'action': 'participation-changed', 'changes': changes},
)
sess.flush()
count += 1
sess.commit()
if count == 0:
flash('Žádní vybraní účastníci', 'warning')
elif self.set_participation_state.data:
flash(f'Nastaven stav {db.part_state_names[self.participation_state.data]} {count} účastníkům', 'success')
elif self.set_participation_place.data:
flash(f'Nastaveno soutěžní místo {participation_place.name} {count} účastníkům', 'success')
elif self.set_contest.data:
flash(f'{count} účastníků přesunuto do soutěže v oblasti {contest_place.name}', 'success')
elif self.remove_participation.data:
flash(f'Odstraněno {count} účastníků z této soutěže', 'success')
return True
def get_contest(id: int) -> db.Contest:
contest = (db.get_session().query(db.Contest)
.options(joinedload(db.Contest.place),
joinedload(db.Contest.round))
.get(id))
if not contest:
raise werkzeug.exceptions.NotFound()
return contest
def get_contest_rr(id: int, right_needed: Optional[Right] = None) -> Tuple[db.Contest, Rights]:
contest = get_contest(id)
rr = Rights(g.user)
rr.get_for_contest(contest)
if not (right_needed is None or rr.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return contest, rr
@app.route('/org/contest/c/<int:id>')
def org_contest(id: int):
contest, rr = get_contest_rr(id, None)
return render_template(
'org_contest.html',
contest=contest,
rights=sorted(rr.current_rights, key=lambda r: r. name),
can_manage=rr.have_right(Right.manage_contest),
)
@app.route('/org/contest/c/<int:id>/import', methods=('GET', 'POST'))
def org_contest_import(id: int):
contest, rr = get_contest_rr(id, Right.manage_contest)
form = ImportForm()
errs = []
if form.validate_on_submit():
tmp_name = secrets.token_hex(16) + '.csv'
tmp_path = os.path.join(app.instance_path, 'imports', tmp_name)
form.file.data.save(tmp_path)
imp = mo.imports.Import(g.user)
if imp.import_contest(contest.round, contest, tmp_path):
flash(f'Účastníci importováni ({imp.cnt_rows} řádků, založeno {imp.cnt_new_users} uživatelů, {imp.cnt_new_participations} účastí)', 'success')
return redirect(url_for('org_contest', id=contest.contest_id))
else:
flash('Došlo k chybě při importu (detaily níže)', 'danger')
errs = imp.errors
return render_template(
'org_contest_import.html',
contest=contest,
form=form,
errs=errs,
)
@app.route('/doc/import_contest')
def doc_import_contest():
return render_template('doc_import_contest.html')
@app.route('/org/contest/import/sablona.csv')
def org_contest_import_template():
out = mo.imports.contest_template()
resp = app.make_response(out)
resp.content_type = 'text/csv; charset=utf=8'
return resp
@app.route('/org/contest/c/<int:id>/ucastnici', methods=('GET', 'POST'))
def org_contest_list(id: int):
contest, rr = get_contest_rr(id)
can_edit = rr.have_right(Right.manage_contest)
format = request.args.get('format', "")
filter = ParticipantsFilterForm(request.args)
filter.validate()
query = get_contestants_query(
round=contest.round, contest=contest,
school=db.get_place_by_code(filter.school.data),
# contest_place=db.get_place_by_code(filter.contest_place.data),
participation_place=db.get_place_by_code(filter.participation_place.data),
participation_state=None if filter.participation_state.data == '*' else filter.participation_state.data
)
action_form = None
if can_edit:
action_form = ParticipantsActionForm()
if action_form.do_action(round=contest.round, rights=rr, query=query):
# Action happened, redirect
return redirect(request.url)
# (count, query) = filter.apply_limits(query, pagesize=50)
count = query.count()
if format == "":
table = make_contestant_table(query, add_checkbox=can_edit)
return render_template(
'org_contest_list.html',
contest=contest,
table=table,
filter=filter, count=count, action_form=action_form,
)
else:
table = make_contestant_table(query)
return table.send_as(format)
contest_list_columns = (
Column(key='first_name', name='krestni', title='Křestní jméno'),
Column(key='last_name', name='prijmeni', title='Příjmení'),
Column(key='email', name='email', title='E-mail'),
Column(key='school', name='skola', title='Škola'),
Column(key='school_code', name='kod_skoly', title='Kód školy'),
Column(key='grade', name='rocnik', title='Ročník'),
Column(key='born_year', name='rok_naroz', title='Rok naroz.'),
Column(key='place_code', name='kod_soutez_mista', title='Sout. místo'),
Column(key='status', name='stav', title='Stav'),
)
def get_contestants_query(
round: db.Round, contest: Optional[db.Contest] = None,
contest_place: Optional[db.Place] = None,
participation_place: Optional[db.Place] = None,
participation_state: Optional[db.PartState] = None,
school: Optional[db.Place] = None) -> Query:
query = (db.get_session()
.query(db.Participation, db.Participant, db.Contest)
.select_from(db.Participation)
.join(db.Participant, db.Participant.user_id == db.Participation.user_id)
.filter(db.Participant.year == round.year))
if contest:
query = query.join(db.Contest, db.Contest.contest_id == contest.contest_id)
else:
query = query.filter(db.Contest.round == round)
query = query.options(joinedload(db.Contest.place))
query = query.filter(db.Participation.contest_id == db.Contest.contest_id)
if contest_place:
query = query.filter(db.Contest.place_id == contest_place.place_id)
if participation_place:
query = query.filter(db.Participation.place_id == participation_place.place_id)
if school:
query = query.filter(db.Participant.school == school.place_id)
if participation_state:
query = query.filter(db.Participation.state == participation_state)
query = query.options(joinedload(db.Participation.user),
joinedload(db.Participation.place),
joinedload(db.Participant.school_place))
return query
def make_contestant_table(query: Query, add_checkbox: bool = False, add_contest_column: bool = False):
ctants = query.all()
rows: List[dict] = []
for pion, pant, ct in ctants:
u = pion.user
rows.append({
'sort_key': (locale.strxfrm(u.last_name), locale.strxfrm(u.first_name), u.user_id),
'first_name': cell_user_link(u, u.first_name),
'last_name': cell_user_link(u, u.last_name),
'email': cell_email_link(u),
'school': pant.school_place.name,
'school_code': cell_place_link(pant.school_place, pant.school_place.get_code()),
'grade': pant.grade,
'born_year': pant.birth_year,
'region_code': cell_place_link(ct.place, ct.place.get_code()),
'place_code': cell_place_link(pion.place, pion.place.get_code()),
'status': pion.state.friendly_name(),
'checkbox': CellCheckbox('checked', u.user_id, False),
})
rows.sort(key=lambda r: r['sort_key'])
cols: Sequence[Column] = contest_list_columns
if add_checkbox:
cols = [Column(key='checkbox', name=' ', title=' ')] + list(cols)
if add_contest_column:
cols = list(cols) + [Column(key='region_code', name='kod_oblasti', title='Oblast')]
return Table(
columns=cols,
rows=rows,
filename='ucastnici',
show_downlink=False, # downlinks are in filter
)
@app.route('/org/contest/c/<int:id>/reseni')
def org_contest_solutions(id: int):
# FIXME: Práva?
# FIXME: Hlavička stránky podle Jirkova předělání
contest, rr = get_contest_rr(id, Right.manage_contest)
format = request.args.get('format', "")
sess = db.get_session()
pions = (sess.query(db.Participation)
.filter_by(contest=contest)
.options(joinedload(db.Participation.user))
.all())
tasks = (sess.query(db.Task)
.filter_by(round=contest.round)
.order_by(db.Task.code)
.all())
pions_subq = (sess.query(db.Participation.user_id)
.filter_by(contest=contest)
.subquery())
sols = (sess.query(db.Solution)
.filter(db.Solution.user_id.in_(pions_subq))
.options(joinedload(db.Solution.last_submit_obj),
joinedload(db.Solution.last_feedback_obj))
.all())
print('XXX pions:', pions) # FIXME
print('XXX tasks:', tasks)
print('XXX sols:', sols)
cols = [ Column(key='name', name='jmeno', title='Jméno') ]
task_sols: Dict[int, Dict[int, db.Solution]] = {}
for t in tasks:
cols.append(Column(key=f't-{t.task_id}', name=t.code))
task_sols[t.task_id] = {}
for s in sols:
task_sols[s.task_id][s.user_id] = s
rows = []
for pion in pions:
user = pion.user
r = {
'name': user.full_name(),
}
for t in tasks:
s = task_sols[t.task_id].get(user.user_id, None)
if s is not None:
cell = '*'
else:
cell = ""
r[f't-{t.task_id}'] = cell
rows.append(r)
print('XXX cols:', cols) # FIXME
print('XXX rows:', rows)
table = Table(
columns=cols,
rows=rows,
filename='reseni',
)
if format == "":
return render_template(
'org_contest_solutions.html',
contest=contest,
table=table,
)
else:
return table.send_as(format)
@dataclass
class SolutionContext:
contest: db.Contest
round: db.Round
pion: db.Participation
task: db.Task
solution: Optional[db.Solution]
allow_view: bool
allow_upload_solutions: bool
allow_upload_feedback: bool
def get_solution_context(contest_id: int, site_id: Optional[int], user_id: int, task_id: int) -> SolutionContext:
sess = db.get_session()
# Nejprve zjistíme, zda existuje soutěž
contest = get_contest(contest_id)
round = contest.round
# Zkontrolujeme, zda se účastník opravdu účastní soutěže
pion = (sess.query(db.Participation)
.filter_by(user_id=user_id, contest_id=contest_id)
.options(joinedload(db.Participation.place))
.one_or_none())
if not pion:
raise werkzeug.exceptions.NotFound()
# A zda soutěží na zadaném soutěžním místě, je-li určeno
if site_id is not None and site_id != pion.site_id:
raise werkzeug.exceptions.NotFound()
# Najdeme úlohu a ověříme, že je součástí soutěže
task = sess.query(db.Task).get(task_id)
if not task or task.round_id != round:
raise werkzeug.exceptions.NotFound()
# Najdeme řešení úlohy (nemusí existovat)
sol = (sess.query(db.Solution)
.filter_by(user_id=user_id, task_id=task_id)
.one_or_none())
# Pokud je uvedeno soutěžní místo, hledáme práva k němu, jinak k soutěži
if site_id is not None:
site = pion.place
else:
site = contest.place
rr = Rights(g.user)
rr.get_for_contest_site(contest, site)
# Kdo má právo na jaké operace
allow_upload_solutions = (rr.have_right(Right.manage_contest)
or (rr.have_right(Right.upload_solutions) and round.state == db.RoundState.running))
allow_upload_feedback = (rr.have_right(Right.manage_contest)
or (rr.have_right(Right.upload_feedback) and round.state == db.RoundState.grading))
allow_view = (rr.have_right(Right.manage_contest)
or (rr.have_right(Right.upload_solutions) and round.state in (db.RoundState.running, db.RoundState.grading, db.RoundState.closed))
or (rr.have_right(Right.upload_feedback) and round.state in (db.RoundState.grading, db.RoundState.closed)))
if not allow_view:
raise werkzeug.exceptions.Forbidden()
return SolutionContext(
contest=contest,
round=round,
pion=pion,
task=task,
solution=sol,
allow_view=allow_view,
allow_upload_solutions=allow_upload_solutions,
allow_upload_feedback=allow_upload_feedback,
)
@app.route('/org/contest/c/<int:contest_id>/submit/<int:user_id>/<int:task_id>/')
def org_submit_list(contest_id, user_id, task_id):
sc = get_solution_context(contest_id, None, user_id, task_id)
# FIXME
return render_template('not_implemented.html')
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/submit/<int:user_id>/<int:task_id>/')
def org_submit_site_list(contest_id, site_id, user_id, task_id):
sc = get_solution_context(contest_id, site_id, user_id, task_id)
# FIXME
return render_template('not_implemented.html')
@app.route('/org/contest/c/<int:id>/proctor-import', methods=('GET', 'POST'))
def org_proctor_import(id: int):
contest, rr = get_contest_rr(id, Right.manage_contest)
form = ImportForm()
errs = []
if form.validate_on_submit():
tmp_name = secrets.token_hex(16) + '.csv'
tmp_path = os.path.join(app.instance_path, 'imports', tmp_name)
form.file.data.save(tmp_path)
imp = mo.imports.Import(g.user)
if imp.import_proctors(contest.round, tmp_path):
flash(f'Dozor importován ({imp.cnt_rows} řádků, založeno {imp.cnt_new_users} uživatelů, {imp.cnt_new_roles} rolí)', 'success')
return redirect(url_for('org_contest', id=contest.contest_id))
else:
flash('Došlo k chybě při importu (detaily níže)', 'danger')
errs = imp.errors
return render_template(
'org_proctor_import.html',
contest=contest,
form=form,
errs=errs,
)
@app.route('/org/contest/import/sablona-dozor.csv')
def org_proctor_import_template():
out = mo.imports.proctor_template()
resp = app.make_response(out)
resp.content_type = 'text/csv; charset=utf=8'
return resp
@app.route('/doc/import-dozor')
def org_proctor_import_help():
return render_template('doc_import_proctor.html')