# Select Git revision
# org_score.py
# -
# Martin Mareš authored
# user.py 15.45 KiB
from flask import render_template, jsonify, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import flask_wtf.file
from sqlalchemy import and_
from sqlalchemy.orm import joinedload
from typing import List, Tuple
import werkzeug.exceptions
import wtforms
from wtforms.validators import Required
import mo
import mo.config as config
import mo.email
import mo.db as db
import mo.submit
import mo.util
from mo.util import logger
from mo.util_format import time_and_timedelta
from mo.web import app
import mo.web.fields as mo_fields
import mo.web.org_round
import mo.web.util
@app.route('/user/')
def user_index():
    """Show the current user's participations.

    Renders ``user_index.html`` with the rows returned by ``load_pcrs()``.
    When there are no such rows, redirects to the ``user_join`` endpoint
    instead.
    """
    participations = load_pcrs()
    if participations:
        return render_template('user_index.html', pions=participations)

    # Nothing to list yet, so offer the join page.
    return redirect(url_for('user_join'))
def load_pcrs() -> List[Tuple[db.Participation, db.Contest, db.Round]]:
    """Load the current user's participations for the current year.

    Returns:
        A list of (participation, contest, round) rows for ``g.user`` in
        rounds of ``mo.current_year``, ordered by round category, sequence
        number and part. The contest's place is eagerly loaded.
    """
    query = db.get_session().query(db.Participation, db.Contest, db.Round)
    query = query.select_from(db.Participation)
    # A participation is joined to every contest whose master contest it refers to.
    query = query.join(db.Contest, db.Contest.master_contest_id == db.Participation.contest_id)
    query = query.join(db.Round)
    query = query.filter(db.Participation.user == g.user)
    query = query.filter(db.Round.year == mo.current_year)
    query = query.options(joinedload(db.Contest.place))
    query = query.order_by(db.Round.category, db.Round.seq, db.Round.part)
    return query.all()
@app.route('/user/join/')
def user_join():
    """List the rounds the current user can join.

    Offers running rounds of the current year whose enroll mode is
    ``register`` or ``confirm``, leaving out subrounds. The template also
    gets the user's existing participation rows keyed by the round id of
    their contest.
    """
    open_modes = [db.RoundEnrollMode.register, db.RoundEnrollMode.confirm]
    rounds_query = (
        db.get_session().query(db.Round)
        .select_from(db.Round)
        .filter_by(year=mo.current_year)
        .filter(db.Round.enroll_mode.in_(open_modes))
        .filter_by(state=db.RoundState.running)
        .order_by(db.Round.category, db.Round.seq)
    )
    available_rounds: List[db.Round] = [
        candidate for candidate in rounds_query.all()
        if not candidate.is_subround()
    ]

    # Index the (participation, contest, round) rows by the contest's round id.
    pcrs_by_round_id = {}
    for pcr in load_pcrs():
        pcrs_by_round_id[pcr[1].round_id] = pcr

    return render_template(
        'user_join_list.html',
        available_rounds=available_rounds,
        pcrs_by_round_id=pcrs_by_round_id,
    )
class JoinRoundForm(FlaskForm):
    """Form shown when a user joins a round.

    The school, grade and birth-year fields are mandatory. ``user_join_round``
    deletes them from the form instance when the user already has a
    participant record for the year.
    """

    # Entering the school is a JS hack implemented in the template.
    # The fields only define the interface.
    # DataRequired replaces the deprecated legacy alias Required.
    school = mo_fields.School("Škola", validators=[wtforms.validators.DataRequired()])
    town_query = wtforms.HiddenField()
    town_list = wtforms.HiddenField()
    grade = mo_fields.Grade("Třída", validators=[wtforms.validators.DataRequired()])
    birth_year = mo_fields.BirthYear("Rok narození", validators=[wtforms.validators.DataRequired()])
    submit = wtforms.SubmitField('Přihlásit se')
@app.route('/user/join/<int:round_id>/', methods=('GET', 'POST'))
def user_join_round(round_id):
    """Let the current user join the round with the given id.

    On GET the join form is shown. If the user has no participant record for
    the round's year, the form is pre-filled from their most recent
    participant record, if any. On a valid POST the participant record is
    created when missing, the contest is found or created, the participation
    is added, the session is committed and organizers are notified.

    Args:
        round_id: Primary key of the round to join.

    Raises:
        werkzeug.exceptions.NotFound: If no such round exists.
    """
    sess = db.get_session()
    rnd = sess.query(db.Round).get(round_id)
    if not rnd:
        raise werkzeug.exceptions.NotFound()

    joinable_modes = [db.RoundEnrollMode.register, db.RoundEnrollMode.confirm]
    if (rnd.year != mo.current_year
            or rnd.part > 1
            or rnd.enroll_mode not in joinable_modes
            or rnd.state != db.RoundState.running):
        flash('Do této kategorie se není možné přihlásit.', 'danger')
        # Send the user back to the list of joinable rounds, like the
        # "already joined" branch below. The previous target 'user_register'
        # is not an endpoint defined in this module.
        # NOTE(review): confirm that no other module registers 'user_register'.
        return redirect(url_for('user_join'))

    # Both lookups use with_for_update(), presumably to serialise concurrent
    # join attempts by the same user.
    pion = (sess.query(db.Participation)
            .select_from(db.Participation)
            .filter_by(user=g.user)
            .join(db.Participation.contest)
            .filter(db.Contest.round == rnd)
            .with_for_update()
            .one_or_none())
    if pion:
        flash('Do této kategorie už jste přihlášen.', 'info')
        return redirect(url_for('user_join'))

    pant = (sess.query(db.Participant)
            .filter_by(user=g.user, year=rnd.year)
            .with_for_update()
            .one_or_none())

    form = JoinRoundForm()
    if pant:
        # A participant record for this year already exists, so the personal
        # data fields are not asked for again.
        del form.school
        del form.grade
        del form.birth_year

    if form.validate_on_submit():
        if form.submit.data:
            if not pant:
                pant = join_create_pant(form)
                sess.add(pant)
            contest = join_create_contest(rnd, pant)
            join_create_pion(contest)
            sess.commit()

            # Organizers are notified only after the commit has succeeded.
            join_notify(contest)

            msg = 'Přihláška přijata.'
            if rnd.enroll_mode == db.RoundEnrollMode.confirm:
                msg += ' Ještě ji musí potvrdit organizátor soutěže.'
            flash(msg, 'success')
            return redirect(url_for('user_index'))
    elif not pant and request.method == 'GET':
        # Try to pre-fill data from previous years.
        prev_pant = (sess.query(db.Participant)
                     .filter_by(user=g.user)
                     .options(joinedload(db.Participant.school_place, db.Place.parent_place))
                     .order_by(db.Participant.year.desc())
                     .limit(1).one_or_none())
        if prev_pant:
            form.school.data = f'#{prev_pant.school}'
            town = prev_pant.school_place.parent_place
            form.town_query.data = town.name
            form.town_list.data = str(town.place_id)
            form.birth_year.data = prev_pant.birth_year

    return render_template(
        'user_join_round.html',
        round=rnd,
        form=form,
    )
def join_create_pant(form: JoinRoundForm) -> db.Participant:
    """Build a participant record for the current user from the join form.

    The record is created for ``mo.current_year`` and logged, but it is not
    added to the session here; the function only returns it.

    Args:
        form: Join form whose school, grade and birth-year fields are read.

    Returns:
        The new, not yet persisted participant object.
    """
    school_place = form.school.place
    assert school_place is not None

    pant = db.Participant(
        user=g.user,
        year=mo.current_year,
        school_place=school_place,
        grade=form.grade.data,
        birth_year=form.birth_year.data,
    )

    logger.info(f'Join: Účastník #{g.user.user_id} se přihlásil do {pant.year}. ročníku')
    log_details = {
        'action': 'create-participant',
        'reason': 'user-join',
        'new': db.row2dict(pant),
    }
    mo.util.log(type=db.LogType.participant, what=g.user.user_id, details=log_details)
    return pant
def join_create_contest(round: db.Round, pant: db.Participant) -> db.Contest:
    """Find the contest the participant belongs to in a round, creating it if missing.

    The contest place is the participant's school when its level equals the
    round's level, otherwise the single parent place of the school at that
    level.

    Args:
        round: The round being joined; its ``part`` must be at most 1.
        pant: The participant whose school determines the contest place.

    Returns:
        The existing or newly created contest.
    """
    sess = db.get_session()

    if pant.school_place.level == round.level:
        place = pant.school_place
    else:
        ancestors = db.get_place_parents(pant.school_place)
        at_round_level = [anc for anc in ancestors if anc.level == round.level]
        assert len(at_round_level) == 1
        place = at_round_level[0]
    # XXX: The recursive query does not give us a full db.Place, only a named
    # tuple, so we have to work with the ID.
    place_id = place.place_id

    assert round.part <= 1

    c = (sess.query(db.Contest)
         .filter_by(round=round, place_id=place_id)
         .with_for_update()
         .one_or_none())
    if c:
        return c

    # No contest for this round and place yet, so create one.
    c = db.Contest(round=round, place_id=place_id, state=db.RoundState.running)
    sess.add(c)
    # Flush before the log lines below read c.contest_id.
    sess.flush()
    c.master = c

    logger.info(f'Join: Automaticky založena soutěž #{c.contest_id} {round.round_code()} pro místo #{place_id}')
    mo.util.log(
        type=db.LogType.contest,
        what=c.contest_id,
        details={'action': 'created', 'reason': 'user-join'},
    )

    mo.web.org_round.create_subcontests(round, c)
    return c
def join_create_pion(c: db.Contest) -> None:
    """Add the current user's participation in contest ``c`` to the session.

    The participation is ``active`` when the round's enroll mode is
    ``register``; with any other enroll mode it starts as ``registered``.
    The addition is also logged.
    """
    sess = db.get_session()

    direct_enroll = c.round.enroll_mode == db.RoundEnrollMode.register
    state = db.PartState.active if direct_enroll else db.PartState.registered

    p = db.Participation(user=g.user, contest=c, place=c.place, state=state)
    sess.add(p)

    logger.info(f'Join: Účastník #{g.user.user_id} přihlášen do soutěže #{c.contest_id}')
    log_details = {
        'action': 'add-to-contest',
        'reason': 'user-join',
        'new': db.row2dict(p),
    }
    mo.util.log(type=db.LogType.participant, what=g.user.user_id, details=log_details)
def join_notify(c: db.Contest) -> None:
    """E-mail the nearest organizers that the current user joined contest ``c``.

    Starting at the contest's place and walking up through parent places,
    looks for users holding one of the garant roles at that place whose role
    applies to the round and who have e-mail notification enabled. Only the
    first place with at least one such user is notified. If no place has one,
    a warning is logged.
    """
    sess = db.get_session()
    r = c.round
    # The set of roles is the same for every place, so build it once.
    garant_roles = (
        db.RoleType.garant,
        db.RoleType.garant_kraj,
        db.RoleType.garant_okres,
        db.RoleType.garant_skola,
    )

    place = c.place
    while place is not None:
        uroles = (sess.query(db.UserRole)
                  .filter(db.UserRole.role.in_(garant_roles))
                  .filter_by(place_id=place.place_id)
                  .options(joinedload(db.UserRole.user))
                  .all())
        # A set, so a user with several matching roles is mailed only once.
        notify = {
            ur.user for ur in uroles
            if ur.applies_to(at=place, year=r.year, cat=r.category, seq=r.seq) and ur.user.email_notify
        }
        if notify:
            for org in notify:
                logger.info(f'Join: Notifikuji orga <{org.email}> pro místo {place.get_code()}')
                mo.email.send_join_notify_email(org, g.user, c)
            return

        place = place.parent_place

    # Logger.warn is a deprecated alias of Logger.warning.
    logger.warning('Join: Není komu poslat mail')
def get_contest_pion(id: int, require_reg: bool = True) -> Tuple[db.Contest, db.Participation]:
    """Load a contest together with the current user's participation in it.

    Args:
        id: Primary key of the contest.
        require_reg: When true, participations in state ``registered`` or
            ``refused`` are rejected as well.

    Returns:
        The contest (with place and round eagerly loaded) and the user's
        participation in its master contest.

    Raises:
        werkzeug.exceptions.NotFound: If the contest does not exist.
        werkzeug.exceptions.Forbidden: If the user has no participation, or
            its state is rejected because of ``require_reg``.
    """
    sess = db.get_session()

    contest_query = sess.query(db.Contest).options(
        joinedload(db.Contest.place),
        joinedload(db.Contest.round),
    )
    contest = contest_query.get(id)
    if not contest:
        raise werkzeug.exceptions.NotFound()

    # Participations are attached to the master contest.
    pion = (sess.query(db.Participation)
            .filter_by(user=g.user, contest_id=contest.master_contest_id)
            .one_or_none())

    rejected_states = [db.PartState.registered, db.PartState.refused]
    if not pion or (require_reg and pion.state in rejected_states):
        raise werkzeug.exceptions.Forbidden()

    return contest, pion
def get_contest(id: int, require_reg: bool = True) -> db.Contest:
    """Return only the contest from ``get_contest_pion(id, require_reg)``.

    The same access checks apply, so the same exceptions may be raised.
    """
    return get_contest_pion(id, require_reg)[0]
def get_task(contest: db.Contest, id: int) -> db.Task:
    """Load a task and make sure it belongs to the contest's round.

    Raises:
        werkzeug.exceptions.NotFound: If the task does not exist or its round
            differs from the contest's round.
    """
    candidate = db.get_session().query(db.Task).get(id)

    # Let's not forget to check that the task belongs to the contest :)
    if candidate and candidate.round_id == contest.round_id:
        return candidate

    raise werkzeug.exceptions.NotFound()
@app.route('/user/contest/<int:id>/')
def user_contest(id: int):
    """Show a contest overview to the participating user.

    Participations in any state are accepted here (``require_reg=False``).
    The page gets the round's messages ordered by creation time and the
    round's tasks, each paired with the current user's solution if one
    exists.
    """
    sess = db.get_session()
    contest, pion = get_contest_pion(id, require_reg=False)

    messages = (
        sess.query(db.Message)
        .filter_by(round_id=contest.round_id)
        .order_by(db.Message.created_at)
        .all()
    )

    # Outer join, so tasks without a solution by this user are listed too.
    own_solution = and_(db.Solution.task_id == db.Task.task_id, db.Solution.user == g.user)
    task_sols = (
        sess.query(db.Task, db.Solution)
        .select_from(db.Task)
        .outerjoin(db.Solution, own_solution)
        .filter(db.Task.round == contest.round)
        .options(
            joinedload(db.Solution.final_submit_obj),
            joinedload(db.Solution.final_feedback_obj),
        )
        .order_by(db.Task.code)
        .all()
    )

    return render_template(
        'user_contest.html',
        contest=contest,
        part_state=pion.state,
        task_sols=task_sols,
        messages=messages,
        max_submit_size=config.MAX_CONTENT_LENGTH,
    )
@app.route('/user/contest/<int:id>/news')
def user_contest_news(id: int):
    """Return the messages of the contest's round as JSON.

    Each message becomes an object with ``title``, ``date_format`` and
    ``body`` keys; the list is ordered by creation time.
    """
    contest = get_contest(id, require_reg=False)

    def as_payload(msg):
        # JSON-friendly view of one message.
        return {
            'title': msg.title,
            'date_format': time_and_timedelta(msg.created_at),
            'body': msg.html,
        }

    round_messages = (
        db.get_session().query(db.Message)
        .filter_by(round_id=contest.round_id)
        .order_by(db.Message.created_at)
        .all()
    )
    return jsonify([as_payload(msg) for msg in round_messages])
@app.route('/user/contest/<int:id>/task-statement/zadani.pdf')
def user_task_statement(id: int):
    """Send the task statement of the contest's round to the participant.

    Raises:
        werkzeug.exceptions.Forbidden: If the contest reports that the
            statement is not available. The attempt is logged.
    """
    contest = get_contest(id, require_reg=False)

    if not contest.ct_task_statement_available():
        # Logger.warn is a deprecated alias of Logger.warning.
        logger.warning(f'Účastník #{g.user.user_id} chce zadání, na které nemá právo')
        raise werkzeug.exceptions.Forbidden()

    return mo.web.util.send_task_statement(contest.round)
class SubmitForm(FlaskForm):
    """Upload form for a solution: a required file plus an optional note."""

    file = flask_wtf.file.FileField(
        "Soubor",
        validators=[flask_wtf.file.FileRequired()],
        render_kw={'autofocus': True},
    )
    note = wtforms.TextAreaField(
        "Poznámka",
        description="Zde můžete něco vzkázat organizátorům soutěže.",
    )
    submit = wtforms.SubmitField('Odevzdat')
@app.route('/user/contest/<int:contest_id>/task/<int:task_id>/', methods=('GET', 'POST'))
def user_contest_task(contest_id: int, task_id: int):
    """Show one task of a contest and accept solution uploads for it.

    A valid POST, accepted only while the contest allows submitting, stores
    the uploaded paper, makes it the final submit of the user's solution
    (creating the solution if needed) and redirects to the contest page.
    Otherwise the task page is rendered with the user's solution, their
    submitted papers (newest first) and, if the round has messages, the
    round's messages.

    Raises:
        werkzeug.exceptions.Forbidden: If the contest is still in the
            ``preparing`` state.
    """
    contest = get_contest(contest_id)
    task = get_task(contest, task_id)
    sess = db.get_session()

    messages = None
    if contest.round.has_messages:
        messages = (
            sess.query(db.Message)
            .filter_by(round_id=contest.round_id)
            .order_by(db.Message.created_at)
            .all()
        )

    if contest.ct_state() == db.RoundState.preparing:
        # While the round is being prepared or waiting for the statement to be
        # published, we do not even show the page, so that the task name does
        # not give anything away.
        raise werkzeug.exceptions.Forbidden()

    form = SubmitForm()
    if contest.ct_can_submit() and form.validate_on_submit():
        upload_stream = form.file.data.stream
        paper = db.Paper(
            task=task,
            for_user_obj=g.user,
            uploaded_by_obj=g.user,
            type=db.PaperType.solution,
            note=form.note.data,
        )

        try:
            mo.submit.Submitter().submit_paper(paper, upload_stream.name)
        except mo.submit.SubmitException as e:
            flash(f'Chyba: {e}', 'danger')
            return redirect(url_for('user_contest_task', contest_id=contest_id, task_id=task_id))

        sess.add(paper)

        # FIXME: It would be nice to use INSERT ... ON CONFLICT UPDATE
        # (SQLAlchemy can do it, but not via the ORM, only through the core interface)
        solution = (
            sess.query(db.Solution)
            .filter_by(task=task, user=g.user)
            .with_for_update()
            .one_or_none()
        )
        if solution is None:
            solution = db.Solution(task=task, user=g.user)
            sess.add(solution)

        solution.final_submit_obj = paper
        sess.commit()

        if not paper.is_broken():
            flash('Řešení odevzdáno', 'success')
        else:
            flash(
                ('Soubor není korektní PDF, ale přesto jsme ho přijali a pokusíme se ho zpracovat. '
                 'Zkontrolujte prosím, že se na vašem počítači zobrazuje správně.'),
                'warning',
            )
        return redirect(url_for('user_contest', id=contest_id))

    # Plain page view, or a POST that was not accepted.
    current_solution = sess.query(db.Solution).filter_by(task=task, user=g.user).one_or_none()
    submitted_papers = (
        sess.query(db.Paper)
        .filter_by(for_user_obj=g.user, task=task, type=db.PaperType.solution)
        .options(joinedload(db.Paper.uploaded_by_obj))
        .order_by(db.Paper.uploaded_at.desc())
        .all()
    )

    return render_template(
        'user_contest_task.html',
        contest=contest,
        task=task,
        sol=current_solution,
        papers=submitted_papers,
        form=form,
        messages=messages,
    )
@app.route('/user/contest/<int:contest_id>/paper/<int:paper_id>/')
def user_paper(contest_id: int, paper_id: int):
    """Send one of the current user's papers (solution or feedback).

    Solutions may be downloaded while the contest is ``running``, ``grading``
    or ``closed``; feedback only once it is ``closed``.

    Raises:
        werkzeug.exceptions.NotFound: If the paper does not exist.
        werkzeug.exceptions.Forbidden: If the paper belongs to another user
            or to a task of a different round, or if the contest state does
            not allow downloading this type of paper.
        AssertionError: If the paper has an unexpected type.
    """
    sess = db.get_session()
    contest = get_contest(contest_id)

    paper = (sess.query(db.Paper)
             .options(joinedload(db.Paper.task))
             .get(paper_id))

    # XXX: This gives attackers an oracle for probing which IDs are valid,
    # but that does not matter, because IDs are assigned sequentially anyway.
    if paper is None:
        raise werkzeug.exceptions.NotFound()

    if paper.for_user != g.user.user_id:
        # Logger.warn is a deprecated alias of Logger.warning.
        logger.warning(f'Účastník #{g.user.user_id} chce cizí papír')
        raise werkzeug.exceptions.Forbidden()

    if paper.type == db.PaperType.solution:
        allowed_states = [db.RoundState.running, db.RoundState.grading, db.RoundState.closed]
    elif paper.type == db.PaperType.feedback:
        allowed_states = [db.RoundState.closed]
    else:
        # Raised explicitly: a bare `assert False` is stripped under -O and
        # would leave allowed_states unbound.
        raise AssertionError(f'Unexpected paper type: {paper.type!r}')

    if paper.task.round != contest.round:
        logger.warning(f'Účastník #{g.user.user_id} chce papír z jiného kola')
        raise werkzeug.exceptions.Forbidden()

    if contest.ct_state() not in allowed_states:
        raise werkzeug.exceptions.Forbidden()

    return mo.web.util.send_task_paper(paper)