from flask import render_template, jsonify, g, redirect, url_for, flash from flask_wtf import FlaskForm import flask_wtf.file from sqlalchemy import and_ from sqlalchemy.orm import joinedload from typing import List, Tuple import werkzeug.exceptions import wtforms import mo import mo.config as config import mo.db as db import mo.submit import mo.util from mo.util import logger from mo.util_format import time_and_timedelta from mo.web import app import mo.web.util @app.route('/user/') def user_index(): pcrs = load_pcrs() if getattr(config, 'AUTO_REGISTER_TEST', False) and not any(round.category == 'T' for pion, contest, round in pcrs): if register_to_test(): pcrs = load_pcrs() return render_template( 'user_index.html', pions=pcrs, ) def load_pcrs() -> List[Tuple[db.Participation, db.Contest, db.Round]]: return (db.get_session().query(db.Participation, db.Contest, db.Round) .select_from(db.Participation) .join(db.Contest, db.Contest.master_contest_id == db.Participation.contest_id) .join(db.Round) .filter(db.Participation.user == g.user) .options(joinedload(db.Contest.place)) .order_by(db.Round.year.desc(), db.Round.category, db.Round.seq, db.Round.part) .all()) def register_to_test() -> bool: sess = db.get_session() round = sess.query(db.Round).filter_by(year=mo.current_year, category='T', seq=1, part=0).one_or_none() if not round: app.logger.error(f'Nemohu najít kolo {mo.current_year}-T-1') return False if round.level != 0: app.logger.error(f'Kolo {round.round_code_short()} není na celostátní úrovni') return False contest = sess.query(db.Contest).filter_by(round=round).limit(1).one_or_none() if not contest: app.logger.error(f'Kolo {round.round_code_short()} nemá soutěž') return False pion = db.Participation(user=g.user, contest=contest, place=contest.place, state=db.PartState.registered) sess.add(pion) sess.flush() mo.util.log( type=db.LogType.participant, what=g.user.user_id, details={'action': 'add-to-contest', 'new': db.row2dict(pion)}, ) sess.commit() app.logger.info(f'Účastník #{g.user.user_id} automaticky registrován do soutěže #{contest.contest_id}') return True def get_contest(id: int) -> db.Contest: contest = (db.get_session().query(db.Contest) .options(joinedload(db.Contest.place), joinedload(db.Contest.round)) .get(id)) if not contest: raise werkzeug.exceptions.NotFound() # FIXME: Kontrolovat nějak pion.state? pion = (db.get_session().query(db.Participation) .filter_by(user=g.user, contest_id=contest.master_contest_id) .one_or_none()) if not pion: raise werkzeug.exceptions.Forbidden() return contest def get_task(contest: db.Contest, id: int) -> db.Task: task = db.get_session().query(db.Task).get(id) # Nezapomeňme zkontrolovat, že úloha patří do soutěže :) if not task or task.round_id != contest.round_id: raise werkzeug.exceptions.NotFound() return task @app.route('/user/contest/<int:id>/') def user_contest(id: int): sess = db.get_session() contest = get_contest(id) messages = sess.query(db.Message).filter_by(round_id=contest.round_id).order_by(db.Message.created_at).all() task_sols = (sess.query(db.Task, db.Solution) .select_from(db.Task) .outerjoin(db.Solution, and_(db.Solution.task_id == db.Task.task_id, db.Solution.user == g.user)) .filter(db.Task.round == contest.round) .options(joinedload(db.Solution.final_submit_obj), joinedload(db.Solution.final_feedback_obj)) .order_by(db.Task.code) .all()) return render_template( 'user_contest.html', contest=contest, task_sols=task_sols, messages=messages, max_submit_size=config.MAX_CONTENT_LENGTH, ) @app.route('/user/contest/<int:id>/news') def user_contest_news(id: int): sess = db.get_session() contest = get_contest(id) messages = sess.query(db.Message).filter_by(round_id=contest.round_id).order_by(db.Message.created_at).all() out_messages = [{ 'title': msg.title, 'date_format': time_and_timedelta(msg.created_at), 'body': msg.html, } for msg in messages] return jsonify(out_messages) @app.route('/user/contest/<int:id>/task-statement/zadani.pdf') def user_task_statement(id: int): contest = get_contest(id) if not contest.ct_task_statement_available(): logger.warn(f'Účastník #{g.user.user_id} chce zadání, na které nemá právo') raise werkzeug.exceptions.Forbidden() return mo.web.util.send_task_statement(contest.round) class SubmitForm(FlaskForm): file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()], render_kw={'autofocus': True}) note = wtforms.TextAreaField("Poznámka", description="Zde můžete něco vzkázat organizátorům soutěže.") submit = wtforms.SubmitField('Odevzdat') @app.route('/user/contest/<int:contest_id>/task/<int:task_id>/', methods=('GET', 'POST')) def user_contest_task(contest_id: int, task_id: int): contest = get_contest(contest_id) task = get_task(contest, task_id) sess = db.get_session() messages = sess.query(db.Message).filter_by(round_id=contest.round_id).order_by(db.Message.created_at).all() state = contest.ct_state() if state == db.RoundState.preparing: # Dokud se kolo připravuje nebo čeká na zveřejnění zadání, tak ani nezobrazujeme # stránku, abychom něco neprozradili jménem úlohy raise werkzeug.exceptions.Forbidden() form = SubmitForm() if contest.ct_can_submit() and form.validate_on_submit(): file = form.file.data.stream paper = db.Paper(task=task, for_user_obj=g.user, uploaded_by_obj=g.user, type=db.PaperType.solution, note=form.note.data) submitter = mo.submit.Submitter() try: submitter.submit_paper(paper, file.name) except mo.submit.SubmitException as e: flash(f'Chyba: {e}', 'danger') return redirect(url_for('user_contest_task', contest_id=contest_id, task_id=task_id)) sess.add(paper) # FIXME: Bylo by hezké použít INSERT ... ON CONFLICT UPDATE # (SQLAlchemy to umí, ale ne přes ORM, jen core rozhraním) sol = (sess.query(db.Solution) .filter_by(task=task, user=g.user) .with_for_update() .one_or_none()) if sol is None: sol = db.Solution(task=task, user=g.user) sess.add(sol) sol.final_submit_obj = paper sess.commit() if paper.is_broken(): flash('Soubor není korektní PDF, ale přesto jsme ho přijali a pokusíme se ho zpracovat. ' + 'Zkontrolujte prosím, že se na vašem počítači zobrazuje správně.', 'warning') else: flash('Řešení odevzdáno', 'success') return redirect(url_for('user_contest', id=contest_id)) sol = sess.query(db.Solution).filter_by(task=task, user=g.user).one_or_none() papers = (sess.query(db.Paper) .filter_by(for_user_obj=g.user, task=task, type=db.PaperType.solution) .options(joinedload(db.Paper.uploaded_by_obj)) .order_by(db.Paper.uploaded_at.desc()) .all()) return render_template( 'user_contest_task.html', contest=contest, task=task, sol=sol, papers=papers, form=form, messages=messages, ) @app.route('/user/contest/<int:contest_id>/paper/<int:paper_id>/') def user_paper(contest_id: int, paper_id: int): sess = db.get_session() contest = get_contest(contest_id) paper = (sess.query(db.Paper) .options(joinedload(db.Paper.task)) .get(paper_id)) # XXX: Tímhle dáváme útočníkům orákulum na zjišťování validity IDček, # ale to nevadí, protože IDčka stejně přidělujeme sekvenčně. if paper is None: raise werkzeug.exceptions.NotFound() if paper.for_user != g.user.user_id: logger.warn(f'Účastník #{g.user.user_id} chce cizí papír') raise werkzeug.exceptions.Forbidden() if paper.type == db.PaperType.solution: allowed_states = [db.RoundState.running, db.RoundState.grading, db.RoundState.closed] elif paper.type == db.PaperType.feedback: allowed_states = [db.RoundState.closed] else: assert False if paper.task.round != contest.round: logger.warn(f'Účastník #{g.user.user_id} chce papír z jiného kola') raise werkzeug.exceptions.Forbidden() if contest.ct_state() not in allowed_states: raise werkzeug.exceptions.Forbidden() return mo.web.util.send_task_paper(paper)