Skip to content
Snippets Groups Projects

WIP: Zárodek uživatelské části webu a submitování

2 open threads
Compare and Show latest version
1 file
+ 18
0
Compare changes
  • Side-by-side
  • Inline
+ 413
142
from dataclasses import dataclass
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import flask_wtf.file
import locale
import os
import secrets
from sqlalchemy import func, and_
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.query import Query
from typing import List, Tuple, Optional, Sequence
from typing import List, Tuple, Optional, Sequence, Dict
import werkzeug.exceptions
import wtforms
@@ -14,9 +16,12 @@ import mo
import mo.csv
import mo.db as db
import mo.imports
import mo.jobs.submit
import mo.rights
from mo.rights import Right, Rights
import mo.util
from mo.web import app
import mo.web.util
from mo.web.util import PagerForm
from mo.web.table import CellCheckbox, Table, Column, cell_place_link, cell_user_link, cell_email_link
import wtforms.validators as validators
@@ -42,7 +47,7 @@ class ParticipantsFilterForm(PagerForm):
class ParticipantsActionForm(FlaskForm):
action_on = wtforms.RadioField(
"Provést akci na", validators=[validators.DataRequired()],
choices=[('all', 'všech vyfiltrovaných soutěžících'), ('checked', 'označených soutěžících')]
choices=[('all', 'všech vyfiltrovaných účastnících'), ('checked', 'označených účastnících')]
# checkboxes are handled not through FlaskForm, see below
)
@@ -62,11 +67,11 @@ class ParticipantsActionForm(FlaskForm):
remove_participation = wtforms.SubmitField("Smazat záznam o účasti")
def do_action(self, round: db.Round, rights: mo.rights.Rights, query: Query) -> bool:
def do_action(self, round: db.Round, rights: Rights, query: Query) -> bool:
"""Do participation modification on partipations from given query
(possibly filtered by checkboxes). Expects that rights for round/contest
are checked before calling this function, `rights` param are used only
for checking that we can move participation to another contest."""
(possibly filtered by checkboxes). `rights` param is used to check rights
for contest of each modified participation or for contest in which
participation is moved to."""
if not self.validate_on_submit():
return False
@@ -91,8 +96,9 @@ class ParticipantsActionForm(FlaskForm):
flash(f"Nepovedlo se najít soutěž v kole {round.round_code()} v oblasti {contest_place.name}", 'danger')
return False
rights.get_for_contest(contest)
if not rights.have_right(mo.rights.Right.manage_contest):
flash(f"Nemáte právo ke správě soutěže v kole {round.round_code()} v oblasti {contest_place.name}, nelze do ní přesunout soutěžící", 'danger')
if not rights.have_right(Right.manage_contest):
flash(f"Nemáte právo ke správě soutěže v kole {round.round_code()} v oblasti {contest_place.name}, nelze do ní přesunout účastníky", 'danger')
return False
elif self.remove_participation.data:
pass
else:
@@ -105,8 +111,26 @@ class ParticipantsActionForm(FlaskForm):
flash('Data v checkboxech nelze převést na čísla, kontaktujte správce', 'danger')
return False
count = 0
# Check all participations if we can edit them
ctants = query.all()
rights_cache = set()
for pion, _, _ in ctants:
u = pion.user
if self.action_on.data == 'checked' and u.user_id not in user_ids:
continue
if pion.contest_id in rights_cache:
continue
rights.get_for_contest(pion.contest)
if rights.have_right(Right.manage_contest):
rights_cache.add(pion.contest_id)
continue
flash(
f"Nemáte právo ke správě soutěže v kole {round.round_code()} v oblasti {pion.contest.place.name} "
+ f"(účastník {u.full_name()}). Žádná akce nebyla provedena.", 'danger'
)
return False
count = 0
for pion, _, _ in ctants:
u = pion.user
if self.action_on.data == 'checked' and u.user_id not in user_ids:
@@ -140,130 +164,19 @@ class ParticipantsActionForm(FlaskForm):
sess.commit()
if count == 0:
flash('Žádní vybraní soutěžící', 'warning')
flash('Žádní vybraní účastníci', 'warning')
elif self.set_participation_state.data:
flash(f'Nastaven stav {db.part_state_names[self.participation_state.data]} {count} řešitelům', 'success')
flash(f'Nastaven stav {db.part_state_names[self.participation_state.data]} {count} účastníkům', 'success')
elif self.set_participation_place.data:
flash(f'Nastaveno soutěžní místo {participation_place.name} {count} řešitelům', 'success')
flash(f'Nastaveno soutěžní místo {participation_place.name} {count} účastníkům', 'success')
elif self.set_contest.data:
flash(f'{count} řešitelů přesunuto do soutěže v oblasti {contest_place.name}', 'success')
flash(f'{count} účastníků přesunuto do soutěže v oblasti {contest_place.name}', 'success')
elif self.remove_participation.data:
flash(f'Odstraněno {count} soutěžících z této soutěže', 'success')
flash(f'Odstraněno {count} účastníků z této soutěže', 'success')
return True
@app.route('/org/contest/')
def org_contest_root():
sess = db.get_session()
rounds = sess.query(db.Round).filter_by(year=mo.current_year).order_by(db.Round.year, db.Round.category, db.Round.seq)
return render_template('org_contest_root.html', rounds=rounds, level_names=mo.db.place_level_names)
def get_round(id: int) -> db.Round:
round = db.get_session().query(db.Round).get(id)
if not round:
raise werkzeug.exceptions.NotFound()
return round
def get_round_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db.Round, mo.rights.Rights]:
round = get_round(id)
rr = mo.rights.Rights(g.user)
rr.get_for_round(round)
if not (right_needed is None or rr.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return round, rr
@app.route('/org/contest/r/<int:id>/')
def org_round(id: int):
sess = db.get_session()
round, rr = get_round_rr(id, None)
contests = (sess.query(db.Contest)
.filter_by(round=round)
.options(joinedload(db.Contest.place))
.all())
contests.sort(key=lambda c: locale.strxfrm(c.place.name))
return render_template(
'org_round.html',
round=round,
contests=contests,
level_names=mo.db.place_level_names,
can_manage=rr.have_right(mo.rights.Right.manage_contest),
)
@app.route('/org/contest/r/<int:id>/list', methods=('GET', 'POST'))
def org_round_list(id: int):
round, rr = get_round_rr(id, mo.rights.Right.manage_contest)
format = request.args.get('format', "")
filter = ParticipantsFilterForm(request.args)
filter.validate()
query = get_contestants_query(
round=round,
school=db.get_place_by_code(filter.school.data),
contest_place=db.get_place_by_code(filter.contest_place.data),
participation_place=db.get_place_by_code(filter.participation_place.data),
participation_state=None if filter.participation_state.data == '*' else filter.participation_state.data
)
action_form = ParticipantsActionForm()
if action_form.do_action(round=round, rights=rr, query=query):
# Action happened, redirect
return redirect(request.url)
(count, query) = filter.apply_limits(query, pagesize=50)
# count = query.count()
if format == "":
table = make_contestant_table(query, add_contest_column=True, add_checkbox=True)
return render_template(
'org_round_list.html',
round=round,
table=table,
filter=filter, count=count, action_form=action_form,
)
else:
table = make_contestant_table(query)
return table.send_as(format)
@app.route('/org/contest/r/<int:id>/import', methods=('GET', 'POST'))
def org_round_import(id: int):
round, rr = get_round_rr(id, mo.rights.Right.manage_contest)
form = ImportForm()
errs = []
if form.validate_on_submit():
tmp_name = secrets.token_hex(16) + '.csv'
tmp_path = os.path.join(app.instance_path, 'imports', tmp_name)
form.file.data.save(tmp_path)
imp = mo.imports.Import(g.user)
if imp.import_contest(round, None, tmp_path):
flash(f'Účastníci importováni ({imp.cnt_rows} řádků, založeno {imp.cnt_new_users} uživatelů, {imp.cnt_new_participations} účastí)', 'success')
return redirect(url_for('org_round', id=round.round_id))
else:
flash('Došlo k chybě při importu (detaily níže)', 'danger')
errs = imp.errors
return render_template(
'org_round_import.html',
round=round,
form=form,
errs=errs,
)
def get_contest(id: int) -> db.Contest:
contest = (db.get_session().query(db.Contest)
.options(joinedload(db.Contest.place),
@@ -274,10 +187,10 @@ def get_contest(id: int) -> db.Contest:
return contest
def get_contest_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db.Contest, mo.rights.Rights]:
def get_contest_rr(id: int, right_needed: Optional[Right] = None) -> Tuple[db.Contest, Rights]:
contest = get_contest(id)
rr = mo.rights.Rights(g.user)
rr = Rights(g.user)
rr.get_for_contest(contest)
if not (right_needed is None or rr.have_right(right_needed)):
@@ -286,21 +199,73 @@ def get_contest_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db
return contest, rr
def get_contest_site_rr(id: int, site_id: Optional[int], right_needed: Optional[Right] = None) -> Tuple[db.Contest, db.Place, Rights]:
if site_id is None:
contest, rr = get_contest_rr(id, right_needed)
return contest, None, rr
contest = get_contest(id)
site = db.get_session().query(db.Place).get(site_id)
if not site:
raise werkzeug.exceptions.NotFound()
rr = Rights(g.user)
rr.get_for_contest_site(contest, site)
if not (right_needed is None or rr.have_right(right_needed)):
raise werkzeug.exceptions.Forbidden()
return contest, site, rr
@app.route('/org/contest/c/<int:id>')
def org_contest(id: int):
contest, rr = get_contest_rr(id, None)
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/')
def org_contest(id: int, site_id: Optional[int] = None):
sess = db.get_session()
contest, site, rr = get_contest_site_rr(id, site_id, None)
tasks_subq = sess.query(db.Task.task_id).filter_by(round=contest.round)
pions_subq = sess.query(db.Participation.user_id).filter_by(contest=contest)
if site:
pions_subq = pions_subq.filter_by(place=site)
sol_counts_q = (
sess.query(db.Solution.task_id, func.count(db.Solution.task_id))
.filter(
db.Solution.task_id.in_(tasks_subq),
db.Solution.user_id.in_(pions_subq),
)
)
sol_counts = {}
for task_id, count in sol_counts_q.group_by(db.Solution.task_id).all():
sol_counts[task_id] = count
tasks = sess.query(db.Task).filter_by(round=contest.round).all()
tasks.sort(key=lambda t: t.code)
for task in tasks:
task.sol_count = sol_counts[task.task_id] if task.task_id in sol_counts else 0
places_counts = None
if not site_id:
places_counts = (
sess.query(db.Place, func.count('*'))
.select_from(db.Participation).join(db.Place)
.group_by(db.Place)
.filter(db.Participation.contest_id == id).all()
)
return render_template(
'org_contest.html',
contest=contest,
contest=contest, site=site,
rights=sorted(rr.current_rights, key=lambda r: r. name),
can_manage=rr.have_right(mo.rights.Right.manage_contest),
can_manage=rr.have_right(Right.manage_contest),
tasks=tasks, places_counts=places_counts,
)
@app.route('/org/contest/c/<int:id>/import', methods=('GET', 'POST'))
def org_contest_import(id: int):
contest, rr = get_contest_rr(id, mo.rights.Right.manage_contest)
contest, rr = get_contest_rr(id, Right.manage_contest)
form = ImportForm()
errs = []
@@ -339,8 +304,10 @@ def org_contest_import_template():
@app.route('/org/contest/c/<int:id>/ucastnici', methods=('GET', 'POST'))
def org_contest_list(id: int):
contest, rr = get_contest_rr(id, mo.rights.Right.manage_contest)
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/ucastnici', methods=('GET', 'POST'))
def org_contest_list(id: int, site_id: Optional[int] = None):
contest, site, rr = get_contest_site_rr(id, site_id)
can_edit = rr.have_right(Right.manage_contest)
format = request.args.get('format', "")
filter = ParticipantsFilterForm(request.args)
@@ -349,10 +316,12 @@ def org_contest_list(id: int):
round=contest.round, contest=contest,
school=db.get_place_by_code(filter.school.data),
# contest_place=db.get_place_by_code(filter.contest_place.data),
participation_place=db.get_place_by_code(filter.participation_place.data),
participation_place=site if site else db.get_place_by_code(filter.participation_place.data),
participation_state=None if filter.participation_state.data == '*' else filter.participation_state.data
)
action_form = None
if can_edit:
action_form = ParticipantsActionForm()
if action_form.do_action(round=contest.round, rights=rr, query=query):
# Action happened, redirect
@@ -362,10 +331,10 @@ def org_contest_list(id: int):
count = query.count()
if format == "":
table = make_contestant_table(query, add_checkbox=True)
table = make_contestant_table(query, add_checkbox=can_edit)
return render_template(
'org_contest_list.html',
contest=contest,
contest=contest, site=site,
table=table,
filter=filter, count=count, action_form=action_form,
)
@@ -456,9 +425,212 @@ def make_contestant_table(query: Query, add_checkbox: bool = False, add_contest_
)
@dataclass
class SolutionContext:
contest: db.Contest
round: db.Round
pion: db.Participation
user: Optional[db.User]
task: Optional[db.Task]
site: Optional[db.Place]
allow_view: bool
allow_upload_solutions: bool
allow_upload_feedback: bool
def get_solution_context(contest_id: int, user_id: Optional[int], task_id: Optional[int], site_id: Optional[int]) -> SolutionContext:
sess = db.get_session()
# Nejprve zjistíme, zda existuje soutěž
contest = get_contest(contest_id)
round = contest.round
# Najdeme úlohu a ověříme, že je součástí soutěže
if task_id is not None:
task = sess.query(db.Task).get(task_id)
if not task or task.round != round:
raise werkzeug.exceptions.NotFound()
else:
task = None
site = None
user = None
if user_id is not None:
# Zkontrolujeme, zda se účastník opravdu účastní soutěže
pion = (sess.query(db.Participation)
.filter_by(user_id=user_id, contest_id=contest_id)
.options(joinedload(db.Participation.place),
joinedload(db.Participation.user))
.one_or_none())
if not pion:
raise werkzeug.exceptions.NotFound()
user = pion.user
# A zda soutěží na zadaném soutěžním místě, je-li určeno
if site_id is not None and site_id != pion.place_id:
raise werkzeug.exceptions.NotFound()
# Pokud je uvedeno soutěžní místo, hledáme práva k němu, jinak k soutěži
if site_id is not None:
site = pion.place
else:
pion = None
if site_id is not None:
site = sess.query(db.Place).get(site_id)
if not site:
raise werkzeug.exceptions.NotFound()
rr = Rights(g.user)
rr.get_for_contest_site(contest, site or contest.place)
# Kdo má právo na jaké operace
allow_upload_solutions = (rr.have_right(Right.manage_contest)
or (rr.have_right(Right.upload_solutions) and round.state == db.RoundState.running))
allow_upload_feedback = (rr.have_right(Right.manage_contest)
or (rr.have_right(Right.upload_feedback) and round.state == db.RoundState.grading))
allow_view = (rr.have_right(Right.manage_contest)
or (rr.have_right(Right.upload_solutions) and round.state in (db.RoundState.running, db.RoundState.grading, db.RoundState.closed))
or (rr.have_right(Right.upload_feedback) and round.state in (db.RoundState.grading, db.RoundState.closed)))
if not allow_view:
raise werkzeug.exceptions.Forbidden()
return SolutionContext(
contest=contest,
round=round,
pion=pion,
user=user,
task=task,
site=site,
allow_view=allow_view,
allow_upload_solutions=allow_upload_solutions,
allow_upload_feedback=allow_upload_feedback,
)
class SubmitForm(FlaskForm):
file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()])
note = wtforms.TextAreaField("Poznámka")
submit_sol = wtforms.SubmitField('Odevzdat řešení')
submit_fb = wtforms.SubmitField('Odevzdat opravené')
@app.route('/org/contest/c/<int:contest_id>/submit/<int:user_id>/<int:task_id>/', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/submit/<int:user_id>/<int:task_id>/', methods=('GET', 'POST'))
def org_submit_list(contest_id: int, user_id: int, task_id: int, site_id: Optional[int] = None):
sc = get_solution_context(contest_id, user_id, task_id, site_id)
sess = db.get_session()
form: Optional[SubmitForm] = None
if sc.allow_upload_solutions or sc.allow_upload_feedback:
form = SubmitForm()
if not sc.allow_upload_solutions:
del form.submit_sol
if not sc.allow_upload_feedback:
del form.submit_fb
if form.validate_on_submit():
# FIXME: Viz komentář o efektivitě v user_contest_task
tmp_name = secrets.token_hex(16)
tmp_path = os.path.join(app.instance_path, 'tmp', tmp_name)
form.file.data.save(tmp_path)
if sc.allow_upload_solutions and form.submit_sol.data:
type = db.PaperType.solution
elif sc.allow_upload_feedback and form.submit_fb.data:
type = db.PaperType.feedback
else:
raise werkzeug.exceptions.Forbidden()
assert sc.task is not None and sc.user is not None
paper = db.Paper(task=sc.task, for_user_obj=sc.user, uploaded_by_obj=g.user, type=type, note=form.note.data)
submitter = mo.submit.Submitter(instance_path=app.instance_path)
self_url = url_for('org_submit_list', contest_id=contest_id, user_id=user_id, task_id=task_id, site_id=site_id)
try:
submitter.submit_paper(paper, tmp_path)
except mo.submit.SubmitException as e:
flash(f'Chyba: {e}', 'danger')
# FIXME: Tady nemažeme tmpfile, zatím si ho chceme nechat pro analýzu.
return redirect(self_url)
sess.add(paper)
# FIXME: Bylo by hezké použít INSERT ... ON CONFLICT UPDATE
# (SQLAlchemy to umí, ale ne přes ORM, jen core rozhraním)
sol = (sess.query(db.Solution)
.filter_by(user_id=user_id, task_id=task_id)
.with_for_update()
.one_or_none())
if sol is None:
sol = db.Solution(task=sc.task, user=sc.user)
sess.add(sol)
if type == db.PaperType.solution:
sol.final_submit_obj = paper
else:
sol.final_feedback_obj = paper
sess.commit()
if type == db.PaperType.solution:
flash('Řešení odevzdáno', 'success')
else:
flash('Opravené řešení odevzdáno', 'success')
return redirect(self_url)
# Najdeme řešení úlohy (nemusí existovat)
sol = (sess.query(db.Solution)
.filter_by(user_id=user_id, task_id=task_id)
.one_or_none())
papers = (sess.query(db.Paper)
.filter_by(for_user_obj=sc.user, task=sc.task)
.options(joinedload(db.Paper.uploaded_by_obj))
.order_by(db.Paper.uploaded_at.desc())
.all())
sol_papers = [p for p in papers if p.type == db.PaperType.solution]
fb_papers = [p for p in papers if p.type == db.PaperType.feedback]
def paper_link(paper: db.Paper) -> str:
return url_for('org_submit_paper',
contest_id=sc.contest.contest_id,
paper_id=paper.paper_id,
site_id=site_id,
filename=mo.web.util.task_paper_filename(paper))
return render_template(
'org_submit_list.html',
sc=sc,
solution=sol,
sol_papers=sol_papers,
fb_papers=fb_papers,
for_site=(site_id is not None),
paper_link=paper_link,
form=form,
)
@app.route('/org/contest/c/<int:contest_id>/paper/<int:paper_id>/<filename>')
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/paper/<int:paper_id>/<filename>')
def org_submit_paper(contest_id: int, paper_id: int, filename: str, site_id: Optional[int] = None):
paper = (db.get_session().query(db.Paper)
.options(joinedload(db.Paper.task)) # pro task_paper_filename()
.get(paper_id))
if not paper:
raise werkzeug.exceptions.NotFound()
if filename != mo.web.util.task_paper_filename(paper):
raise werkzeug.exceptions.NotFound()
get_solution_context(contest_id, paper.for_user, paper.for_task, site_id)
return mo.web.util.send_task_paper(paper)
@app.route('/org/contest/c/<int:id>/proctor-import', methods=('GET', 'POST'))
def org_proctor_import(id: int):
contest, rr = get_contest_rr(id, mo.rights.Right.manage_contest)
contest, rr = get_contest_rr(id, Right.manage_contest)
form = ImportForm()
errs = []
@@ -494,3 +666,102 @@ def org_proctor_import_template():
@app.route('/doc/import-dozor')
def org_proctor_import_help():
return render_template('doc_import_proctor.html')
def get_solutions_query(
task: db.Task,
for_contest: Optional[db.Contest] = None,
for_site: Optional[db.Place] = None) -> Query:
sess = db.get_session()
query = (sess.query(db.Participation, db.Solution)
.select_from(db.Participation)
.outerjoin(db.Solution, and_(db.Solution.task_id == task.task_id, db.Solution.user_id == db.Participation.user_id))
.options(joinedload(db.Solution.user),
joinedload(db.Solution.final_submit_obj),
joinedload(db.Solution.final_feedback_obj)))
if for_contest:
query = query.filter(db.Participation.contest == for_contest)
if for_site:
query = query.filter(db.Participation.place == for_site)
return query
@app.route('/org/contest/c/<int:contest_id>/task/<int:task_id>/')
@app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/task/<int:task_id>/')
def org_contest_task_submits(contest_id: int, task_id: int, site_id: Optional[int] = None):
sc = get_solution_context(contest_id, None, task_id, site_id)
assert sc.task is not None
q = get_solutions_query(sc.task, for_contest=sc.contest, for_site=sc.site)
rows: List[Tuple[db.Participation, db.Solution]] = q.all()
def paper_link(paper: db.Paper) -> str:
return url_for('org_submit_paper',
contest_id=sc.contest.contest_id,
paper_id=paper.paper_id,
site_id=site_id,
filename=mo.web.util.task_paper_filename(paper))
return render_template(
"org_contest_task.html",
sc=sc, rows=rows,
paper_link=paper_link,
)
@app.route('/org/contest/c/<int:id>/reseni', methods=('GET', 'POST'))
@app.route('/org/contest/c/<int:id>/site/<int:site_id>/reseni', methods=('GET', 'POST'))
def org_contest_solutions(id: int, site_id: Optional[int] = None):
sc = get_solution_context(id, None, None, site_id)
sess = db.get_session()
pions_subq = sess.query(db.Participation.user_id).filter_by(contest=sc.contest)
if sc.site:
pions_subq = pions_subq.filter_by(place=sc.site)
pions_subq = pions_subq.subquery()
pions = (sess.query(db.Participation)
.filter(db.Participation.user_id.in_(pions_subq))
.options(joinedload(db.Participation.user))
.all())
tasks_subq = sess.query(db.Task.task_id).filter_by(round=sc.round).subquery()
tasks = (sess.query(db.Task)
.filter_by(round=sc.round)
.order_by(db.Task.code)
.all())
sols = sess.query(db.Solution).filter(
db.Solution.user_id.in_(pions_subq),
db.Solution.task_id.in_(tasks_subq)
).options(
joinedload(db.Solution.final_submit_obj),
joinedload(db.Solution.final_feedback_obj)
).all()
if request.method == 'POST' and 'download' in request.form:
paper_ids = [sol.final_submit for sol in sols if sol.final_submit is not None]
mo.jobs.submit.schedule_download_submits(paper_ids, 'Odevzdaná řešení', g.user)
flash('Příprava řešení ke stažení zahájena.', 'success')
return redirect(url_for('org_jobs'))
task_sols: Dict[int, Dict[int, db.Solution]] = {}
for t in tasks:
task_sols[t.task_id] = {}
for s in sols:
task_sols[s.task_id][s.user_id] = s
def paper_link(paper: db.Paper) -> str:
return url_for('org_submit_paper',
contest_id=sc.contest.contest_id,
paper_id=paper.paper_id,
site_id=site_id,
filename=mo.web.util.task_paper_filename(paper))
return render_template(
'org_contest_solutions.html',
contest=sc.contest, site=sc.site,
pions=pions, tasks=tasks, tasks_sols=task_sols,
paper_link=paper_link,
)
Loading