from dataclasses import dataclass
from flask import render_template, jsonify, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import flask_wtf.file
import hashlib
import hmac
from sqlalchemy import and_
from sqlalchemy.orm import joinedload
from typing import List, Tuple, Optional, Dict, Any
import werkzeug.exceptions
import wtforms
from wtforms.validators import Required

import mo.config as config
import mo.email
import mo.db as db
import mo.submit
import mo.util
from mo.util import logger
from mo.util_format import time_and_timedelta
from mo.web import app
import mo.web.fields as mo_fields
import mo.web.org_round
from mo.web.table import Column, Row, OrderCell, Table
import mo.web.util


@app.route('/user/')
def user_index():
    """List the current user's participations, or redirect to the join page if there are none."""
    pcrs = load_pcrs()
    if not pcrs:
        return redirect(url_for('user_join'))

    return render_template(
        'user_index.html',
        pions=pcrs,
    )


def load_pcrs() -> List[Tuple[db.Participation, db.Contest, db.Round]]:
    """Load (participation, contest, round) triples of the current user for the current year.

    Contests are matched to participations through their master contest and
    the result is ordered by round category, sequence number and part.
    """
    return (db.get_session().query(db.Participation, db.Contest, db.Round)
            .select_from(db.Participation)
            .join(db.Contest, db.Contest.master_contest_id == db.Participation.contest_id)
            .join(db.Round)
            .filter(db.Participation.user == g.user)
            .filter(db.Round.year == config.CURRENT_YEAR)
            .options(joinedload(db.Contest.place))
            .order_by(db.Round.category, db.Round.seq, db.Round.part)
            .all())


@app.route('/user/join/')
def user_join():
    """Show running rounds of the current year that the user may enroll in."""
    available_rounds: List[db.Round] = (
        db.get_session().query(db.Round)
        .select_from(db.Round)
        .filter_by(year=config.CURRENT_YEAR)
        .filter(db.Round.enroll_mode.in_([db.RoundEnrollMode.register, db.RoundEnrollMode.confirm]))
        .filter_by(state=db.RoundState.running)
        .order_by(db.Round.category, db.Round.seq)
        .all())
    available_rounds = [r for r in available_rounds if not r.is_subround()]

    pcrs = load_pcrs()
    # Index by the round of the contest so the template can look up existing enrollments.
    pcrs_by_round_id = {pcr[1].round_id: pcr for pcr in pcrs}

    return render_template(
        'user_join_list.html',
        available_rounds=available_rounds,
        pcrs_by_round_id=pcrs_by_round_id,
    )


class JoinRoundForm(FlaskForm):
    """Enrollment form asking for school, grade and birth year."""

    # Entering the school is a JS hack implemented in the template.
    # The fields only define the interface.
    school = mo_fields.School("Škola", validators=[Required()])
    town_query = wtforms.HiddenField()
    town_list = wtforms.HiddenField()

    grade = mo_fields.Grade("Třída", validators=[Required()])
    birth_year = mo_fields.BirthYear("Rok narození", validators=[Required()])
    submit = wtforms.SubmitField('Přihlásit se')


@app.route('/user/join/<int:round_id>/', methods=('GET', 'POST'))
def user_join_round(round_id):
    """Enroll the current user in the given round.

    Raises NotFound for an unknown round. Redirects back to the join list
    (with a flash message) when the round does not accept enrollment or the
    user is already enrolled. On a valid submit, creates the participant
    record (if missing), the contest (if missing) and the participation,
    commits, notifies organizers and redirects to the user index.
    """
    sess = db.get_session()
    round = sess.query(db.Round).get(round_id)
    if not round:
        raise werkzeug.exceptions.NotFound()

    if (round.year != config.CURRENT_YEAR
            or round.part > 1
            or round.enroll_mode not in [db.RoundEnrollMode.register, db.RoundEnrollMode.confirm]
            or round.state != db.RoundState.running):
        flash('Do této kategorie se není možné přihlásit.', 'danger')
        return redirect(url_for('user_join'))

    pion = (sess.query(db.Participation)
            .select_from(db.Participation)
            .filter_by(user=g.user)
            .join(db.Participation.contest)
            .filter(db.Contest.round == round)
            .with_for_update()
            .one_or_none())
    if pion:
        flash('Do této kategorie už jste přihlášen.', 'info')
        return redirect(url_for('user_join'))

    pant = (sess.query(db.Participant)
            .filter_by(user=g.user, year=round.year)
            .with_for_update()
            .one_or_none())

    form = JoinRoundForm()
    if pant:
        # The participant record for this year already exists,
        # so there is no need to ask for personal data again.
        del form.school
        del form.grade
        del form.birth_year

    if form.validate_on_submit():
        if form.submit.data:
            if not pant:
                pant = join_create_pant(form)
                sess.add(pant)
            contest = join_create_contest(round, pant)
            join_create_pion(contest)
            sess.commit()
            join_notify(contest)

            msg = 'Přihláška přijata.'
            if round.enroll_mode == db.RoundEnrollMode.confirm:
                msg += ' Ještě ji musí potvrdit organizátor soutěže.'
            flash(msg, 'success')
            return redirect(url_for('user_index'))
    elif not pant and request.method == 'GET':
        # Try to pre-fill the data from previous years
        prev_pant = (sess.query(db.Participant)
                     .filter_by(user=g.user)
                     .options(joinedload(db.Participant.school_place, db.Place.parent_place))
                     .order_by(db.Participant.year.desc())
                     .limit(1).one_or_none())
        if prev_pant:
            form.school.data = f'#{prev_pant.school}'
            town = prev_pant.school_place.parent_place
            form.town_query.data = town.name
            form.town_list.data = str(town.place_id)
            form.birth_year.data = prev_pant.birth_year

    return render_template(
        'user_join_round.html',
        round=round,
        form=form,
    )


def join_create_pant(form: JoinRoundForm) -> db.Participant:
    """Build (without adding to the session) a participant record for the current year from the form."""
    assert form.school.place is not None
    pant = db.Participant(user=g.user,
                          year=config.CURRENT_YEAR,
                          school_place=form.school.place,
                          grade=form.grade.data,
                          birth_year=form.birth_year.data)

    logger.info(f'Join: Účastník #{g.user.user_id} se přihlásil do {pant.year}. ročníku')
    mo.util.log(
        type=db.LogType.participant,
        what=g.user.user_id,
        details={'action': 'create-participant', 'reason': 'user-join', 'new': db.row2dict(pant)},
    )

    return pant


def join_create_contest(round: db.Round, pant: db.Participant) -> db.Contest:
    """Find the contest of the round for the participant's place, creating it if it does not exist.

    The place is the participant's school, or its single ancestor at the
    level of the round when the school is at a different level.
    """
    sess = db.get_session()

    place = pant.school_place
    if place.level != round.level:
        parents = db.get_place_ancestors(pant.school_place)
        places = [p for p in parents if p.level == round.level]
        assert len(places) == 1
        place = places[0]
    # XXX: The recursive query does not give us a full-fledged db.Place,
    # only a named tuple, so we have to work with the ID.
    place_id = place.place_id

    assert round.part <= 1
    c = (sess.query(db.Contest)
         .filter_by(round=round, place_id=place_id)
         .with_for_update()
         .one_or_none())
    if not c:
        c = db.Contest(
            round=round,
            place_id=place_id,
            state=db.RoundState.running,
        )
        sess.add(c)
        # Flush so that the new contest is inserted before it is referenced as its own master.
        sess.flush()
        c.master = c

        logger.info(f'Join: Automaticky založena soutěž #{c.contest_id} {round.round_code()} pro místo #{place_id}')
        mo.util.log(
            type=db.LogType.contest,
            what=c.contest_id,
            details={'action': 'created', 'reason': 'user-join'},
        )

        mo.web.org_round.create_subcontests(round, c)

    return c


def join_create_pion(c: db.Contest) -> None:
    """Add a participation of the current user in contest `c` to the session.

    The participation is active right away in `register` enroll mode and
    merely registered otherwise.
    """
    sess = db.get_session()

    if c.round.enroll_mode == db.RoundEnrollMode.register:
        state = db.PartState.active
    else:
        state = db.PartState.registered
    p = db.Participation(user=g.user, contest=c, place=c.place, state=state)
    sess.add(p)

    logger.info(f'Join: Účastník #{g.user.user_id} přihlášen do soutěže #{c.contest_id}')
    mo.util.log(
        type=db.LogType.participant,
        what=g.user.user_id,
        details={'action': 'add-to-contest', 'reason': 'user-join', 'new': db.row2dict(p)},
    )


def join_notify(c: db.Contest) -> None:
    """Notify organizers about a new enrollment in contest `c`.

    Walks from the contest's place upwards through parent places. At the
    first place that has at least one applicable garant role, every such
    organizer is logged and those with `email_notify` set are e-mailed; the
    walk then stops. A warning is logged when no organizer is found at all.
    """
    sess = db.get_session()
    r = c.round
    place = c.place
    while place is not None:
        uroles = (sess.query(db.UserRole)
                  .filter(db.UserRole.role.in_((db.RoleType.garant, db.RoleType.garant_kraj, db.RoleType.garant_okres, db.RoleType.garant_skola)))
                  .filter_by(place_id=place.place_id)
                  .options(joinedload(db.UserRole.user))
                  .all())
        notify = {ur.user for ur in uroles if ur.applies_to(at=place, year=r.year, cat=r.category, seq=r.seq)}
        if notify:
            for org in notify:
                logger.info(f'Join: {"Notifikuji" if org.email_notify else "Nenotifikuji"} orga <{org.email}> pro místo {place.get_code()}')
                if org.email_notify:
                    mo.email.send_join_notify_email(org, g.user, c)
            return

        place = place.parent_place

    # Logger.warn is a deprecated alias; use Logger.warning.
    logger.warning('Join: Není komu poslat mail')


def get_contest_pion(id: int, require_reg: bool = True) -> Tuple[db.Contest, db.Participation]:
    """Return the contest with the given ID together with the current user's participation in it.

    Raises NotFound when the contest does not exist and Forbidden when the
    user does not participate in its master contest. With `require_reg`,
    Forbidden is also raised for participations in the `registered` or
    `refused` state.
    """
    contest = (db.get_session().query(db.Contest)
               .options(joinedload(db.Contest.place),
                        joinedload(db.Contest.round))
               .get(id))

    if not contest:
        raise werkzeug.exceptions.NotFound()

    pion = (db.get_session().query(db.Participation)
            .filter_by(user=g.user, contest_id=contest.master_contest_id)
            .one_or_none())
    if not pion:
        raise werkzeug.exceptions.Forbidden()

    if require_reg and pion.state in [db.PartState.registered, db.PartState.refused]:
        raise werkzeug.exceptions.Forbidden()

    return contest, pion


def get_contest(id: int, require_reg: bool = True) -> db.Contest:
    """Like get_contest_pion(), but return only the contest."""
    contest, _ = get_contest_pion(id, require_reg)
    return contest


def get_task(contest: db.Contest, id: int) -> db.Task:
    """Return the task with the given ID; raise NotFound if it is missing or belongs to another round."""
    task = db.get_session().query(db.Task).get(id)

    # Do not forget to check that the task belongs to the contest :)
    if not task or task.round_id != contest.round_id:
        raise werkzeug.exceptions.NotFound()

    return task


@app.route('/user/contest/<int:id>/')
def user_contest(id: int):
    """Show the contest page with the round's messages, tasks and the user's solutions."""
    sess = db.get_session()
    contest, pion = get_contest_pion(id, require_reg=False)

    messages = sess.query(db.Message).filter_by(round_id=contest.round_id).order_by(db.Message.created_at).all()

    # Outer join, so that tasks without a solution by this user are listed as well.
    task_sols = (sess.query(db.Task, db.Solution)
                 .select_from(db.Task)
                 .outerjoin(db.Solution, and_(db.Solution.task_id == db.Task.task_id, db.Solution.user == g.user))
                 .filter(db.Task.round == contest.round)
                 .options(joinedload(db.Solution.final_submit_obj),
                          joinedload(db.Solution.final_feedback_obj))
                 .order_by(db.Task.code)
                 .all())

    return render_template(
        'user_contest.html',
        contest=contest,
        part_state=pion.state,
        task_sols=task_sols,
        messages=messages,
        max_submit_size=config.MAX_CONTENT_LENGTH,
    )


@app.route('/user/contest/<int:id>/news')
def user_contest_news(id: int):
    """Return the round's messages as JSON (title, formatted date, HTML body), oldest first."""
    sess = db.get_session()
    contest = get_contest(id, require_reg=False)

    messages = sess.query(db.Message).filter_by(round_id=contest.round_id).order_by(db.Message.created_at).all()

    out_messages = [{
        'title': msg.title,
        'date_format': time_and_timedelta(msg.created_at),
        'body': msg.html,
    } for msg in messages]

    return jsonify(out_messages)


@app.route('/user/contest/<int:id>/task-statement/zadani.pdf')
def user_task_statement(id: int):
    """Send the task statement of the contest's round; Forbidden while it is not available."""
    contest = get_contest(id, require_reg=False)

    if not contest.ct_task_statement_available():
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning(f'Účastník #{g.user.user_id} chce zadání, na které nemá právo')
        raise werkzeug.exceptions.Forbidden()

    return mo.web.util.send_task_statement(contest.round)


class SubmitForm(FlaskForm):
    """Form for uploading a solution file with an optional note."""

    file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()], render_kw={'autofocus': True})
    note = wtforms.TextAreaField("Poznámka", description="Zde můžete něco vzkázat organizátorům soutěže.")
    submit = wtforms.SubmitField('Odevzdat')


@dataclass
class CMSParams:
    """Parameters of a signed single sign-on request to the CMS."""

    url: str
    username: str
    first_name: str
    last_name: str
    timestamp: str
    # Filled in by get_cms_params() once the other fields are known.
    signature: str = ""
    # Filled in by the caller that renders the link.
    back_url: str = ""


def get_cms_params() -> Optional[CMSParams]:
    """Build signed CMS single sign-on parameters for the current user.

    Returns None when `CMS_ROOT` or `CMS_SSO_SECRET` is not configured.
    The signature is a hex HMAC-SHA256 over the colon-joined username,
    first name, last name and timestamp.
    """
    if not (hasattr(config, 'CMS_ROOT') and hasattr(config, 'CMS_SSO_SECRET')):
        return None

    p = CMSParams(
        url=config.CMS_ROOT + 'sso-login',
        username=f'osmo{g.user.user_id}',
        first_name=g.user.first_name,
        last_name=g.user.last_name,
        timestamp=str(int(mo.now.timestamp())),
    )

    msg = ":".join((p.username, p.first_name, p.last_name, p.timestamp)).encode('utf-8')
    key = config.CMS_SSO_SECRET.encode('us-ascii')
    p.signature = hmac.HMAC(key, msg, digestmod=hashlib.sha256).hexdigest()

    return p


@app.route('/user/contest/<int:contest_id>/task/<int:task_id>/', methods=('GET', 'POST'))
def user_contest_task(contest_id: int, task_id: int):
    """Show a task page and, for regular tasks, accept an uploaded solution.

    Raises Forbidden while the contest is in the `preparing` state. A
    successful upload stores the paper, makes it the final submit of the
    user's solution and redirects to the contest page. For CMS tasks the
    page receives signed CMS parameters instead of an upload form.
    """
    contest = get_contest(contest_id)
    task = get_task(contest, task_id)
    sess = db.get_session()

    if contest.round.has_messages:
        messages = sess.query(db.Message).filter_by(round_id=contest.round_id).order_by(db.Message.created_at).all()
    else:
        messages = None

    state = contest.ct_state()
    if state == db.RoundState.preparing:
        # While the round is being prepared or is waiting for the statement to be
        # published, we do not even show the page, so that the task name leaks nothing.
        raise werkzeug.exceptions.Forbidden()

    form: Optional[SubmitForm] = None
    if task.type == db.TaskType.regular:
        form = SubmitForm()
        # Handled inside the regular-task branch, where `form` is known to exist.
        if contest.ct_can_submit() and form.validate_on_submit():
            # Defensive re-check: only regular tasks accept uploaded papers.
            if task.type != db.TaskType.regular:
                raise werkzeug.exceptions.Forbidden()

            file = form.file.data.stream
            paper = db.Paper(task=task, for_user_obj=g.user, uploaded_by_obj=g.user, type=db.PaperType.solution, note=form.note.data)
            submitter = mo.submit.Submitter()

            try:
                submitter.submit_paper(paper, file.name)
            except mo.submit.SubmitException as e:
                flash(f'Chyba: {e}', 'danger')
                return redirect(url_for('user_contest_task', contest_id=contest_id, task_id=task_id))

            sess.add(paper)

            # FIXME: It would be nice to use INSERT ... ON CONFLICT UPDATE
            # (SQLAlchemy supports it, but only via the core interface, not the ORM)
            sol = (sess.query(db.Solution)
                   .filter_by(task=task, user=g.user)
                   .with_for_update()
                   .one_or_none())
            if sol is None:
                sol = db.Solution(task=task, user=g.user)
                sess.add(sol)
            sol.final_submit_obj = paper

            sess.commit()

            if paper.is_broken():
                flash('Soubor není korektní PDF, ale přesto jsme ho přijali a pokusíme se ho zpracovat. ' +
                      'Zkontrolujte prosím, že se na vašem počítači zobrazuje správně.',
                      'warning')
            else:
                flash('Řešení odevzdáno', 'success')
            return redirect(url_for('user_contest', id=contest_id))

    cms_params: Optional[CMSParams] = None
    if task.type == db.TaskType.cms:
        cms_params = get_cms_params()
        if cms_params:
            cms_params.back_url = url_for('user_contest_task', contest_id=contest_id, task_id=task_id)

    sol = sess.query(db.Solution).filter_by(task=task, user=g.user).one_or_none()

    papers = (sess.query(db.Paper)
              .filter_by(for_user_obj=g.user, task=task, type=db.PaperType.solution)
              .options(joinedload(db.Paper.uploaded_by_obj))
              .order_by(db.Paper.uploaded_at.desc())
              .all())

    return render_template(
        'user_contest_task.html',
        contest=contest,
        task=task,
        sol=sol,
        papers=papers,
        form=form,
        cms_params=cms_params,
        messages=messages,
        max_submit_size=config.MAX_CONTENT_LENGTH,
    )


@app.route('/user/contest/<int:contest_id>/paper/<int:paper_id>/')
def user_paper(contest_id: int, paper_id: int):
    """Send one of the current user's papers.

    Raises NotFound for an unknown paper and Forbidden when the paper
    belongs to another user or another round, or when the contest state
    does not allow viewing this type of paper (solutions while running,
    grading or closed; feedback only when closed).
    """
    sess = db.get_session()
    contest = get_contest(contest_id)

    paper = (sess.query(db.Paper)
             .options(joinedload(db.Paper.task))
             .get(paper_id))

    # XXX: This gives attackers an oracle for probing the validity of IDs,
    # but it does not matter, because IDs are assigned sequentially anyway.
    if paper is None:
        raise werkzeug.exceptions.NotFound()

    if paper.for_user != g.user.user_id:
        logger.warning(f'Účastník #{g.user.user_id} chce cizí papír')
        raise werkzeug.exceptions.Forbidden()

    if paper.type == db.PaperType.solution:
        allowed_states = [db.RoundState.running, db.RoundState.grading, db.RoundState.closed]
    elif paper.type == db.PaperType.feedback:
        allowed_states = [db.RoundState.closed]
    else:
        # Raised explicitly instead of `assert False`: asserts are stripped under
        # `python -O`, which would leave `allowed_states` unbound below.
        raise AssertionError(f'Unexpected paper type {paper.type}')

    if paper.task.round != contest.round:
        logger.warning(f'Účastník #{g.user.user_id} chce papír z jiného kola')
        raise werkzeug.exceptions.Forbidden()

    if contest.ct_state() not in allowed_states:
        raise werkzeug.exceptions.Forbidden()

    return mo.web.util.send_task_paper(paper)


def scoretable_construct(scoretable: db.ScoreTable, is_export: bool = False) -> Tuple[List[Column], List[Row]]:
    """Construct the score table shown to contestants.

    Also used for displaying stored score table snapshots in org_score.py.

    Returns the list of columns and the list of rows. With `is_export`, a
    status column is inserted right after the order column. The data stored
    in `scoretable` is not modified, so the function may be called
    repeatedly on the same object.
    """
    columns = [
        Column(key='order', name='poradi', title='Pořadí'),
        Column(key='name', name='ucastnik', title='Účastník'),
        Column(key='school', name='skola', title='Škola'),
        Column(key='grade', name='rocnik', title='Ročník')
    ]
    if is_export:
        columns.insert(1, Column(key='status', name='stav'))

    tasks: List[Tuple[str, str]] = scoretable.tasks  # type: ignore
    for (code, name) in tasks:
        columns.append(Column(key=f'task_{code}', name=code, title=code))
    columns.append(Column(key='total_points', name='celkove_body', title='Celkové body'))

    table_rows = []
    rows: List[Dict[str, Any]] = scoretable.rows  # type: ignore
    for stored_row in rows:
        # Work on a shallow copy: overwriting 'order' in the stored row would make
        # a second call on the same score table fail when subscripting the OrderCell.
        row = dict(stored_row)
        order = row['order']
        row['order'] = OrderCell(place=int(order['place']), span=int(order['span']), continuation=bool(order['continuation']))

        html_attr = {}
        if row['winner']:
            row['status'] = 'vítěz'
            html_attr = {"class": "winner", "title": "Vítěz"}
        elif row['successful']:
            row['status'] = 'úspěšný'
            html_attr = {"class": "successful", "title": "Úspěšný řešitel"}

        for ((code, _), points) in zip(tasks, row['tasks']):
            # NOTE(review): any falsy value (not only a missing one) is shown as a dash — confirm intended.
            row[f'task_{code}'] = points or '–'

        table_rows.append(Row(keys=row, html_attr=html_attr))

    return (columns, table_rows)


@app.route('/user/contest/<int:id>/score')
def user_contest_score(id: int):
    """Show or export the score table of a closed contest.

    Raises NotFound when the contest has no score table or is not closed.
    Without a `format` query argument the table is rendered as a page;
    otherwise it is sent in the requested format.
    """
    contest = get_contest(id, require_reg=False)
    rnd = contest.round
    fmt = request.args.get('format', "")

    # The score table is shown only once the contest is closed
    state = contest.ct_state()
    if not contest.scoretable or state != db.RoundState.closed:
        raise werkzeug.exceptions.NotFound()

    columns, table_rows = scoretable_construct(contest.scoretable, fmt != "")
    # columns.append(Column(key='order_key', name='order_key', title='Třídící klíč'))

    filename = f"vysledky_{rnd.year}-{rnd.category}-{rnd.level}_oblast_{contest.place.code or contest.place.place_id}"
    table = Table(
        table_class="data full center",
        columns=columns,
        rows=table_rows,
        filename=filename,
    )

    if fmt == "":
        return render_template(
            'user_contest_score.html',
            contest=contest,
            table=table,
        )
    else:
        return table.send_as(fmt)