Skip to content
Snippets Groups Projects
Select Git revision
  • 3298e112d890cb4eea42b1e1dca6f2812c4b0368
  • devel default
  • master
  • fo
  • jirka/typing
  • fo-base
  • mj/submit-images
  • jk/issue-96
  • jk/issue-196
  • honza/add-contestant
  • honza/mr7
  • honza/mrf
  • honza/mrd
  • honza/mra
  • honza/mr6
  • honza/submit-images
  • honza/kolo-vs-soutez
  • jh-stress-test-wip
  • shorten-schools
19 results

user.py

Blame
  • user.py 15.45 KiB
    from flask import render_template, jsonify, g, redirect, url_for, flash, request
    from flask_wtf import FlaskForm
    import flask_wtf.file
    from sqlalchemy import and_
    from sqlalchemy.orm import joinedload
    from typing import List, Tuple
    import werkzeug.exceptions
    import wtforms
    from wtforms.validators import Required
    
    import mo
    import mo.config as config
    import mo.email
    import mo.db as db
    import mo.submit
    import mo.util
    from mo.util import logger
    from mo.util_format import time_and_timedelta
    from mo.web import app
    import mo.web.fields as mo_fields
    import mo.web.org_round
    import mo.web.util
    
    
    @app.route('/user/')
    def user_index():
        """Render the overview of the current user's participations.

        When load_pcrs() returns nothing, the user is redirected to the
        enrollment list instead of being shown an empty page.
        """
        participations = load_pcrs()
        if participations:
            return render_template('user_index.html', pions=participations)

        return redirect(url_for('user_join'))
    
    
    def load_pcrs() -> List[Tuple[db.Participation, db.Contest, db.Round]]:
        """Load (participation, contest, round) triples for ``g.user``.

        A participation is paired with every contest whose master contest is
        the contest the participation refers to. Only rounds of
        ``mo.current_year`` are returned, ordered by round category, seq and part.
        """
        query = db.get_session().query(db.Participation, db.Contest, db.Round)
        query = query.select_from(db.Participation)
        query = query.join(db.Contest, db.Contest.master_contest_id == db.Participation.contest_id)
        query = query.join(db.Round)
        query = query.filter(db.Participation.user == g.user)
        query = query.filter(db.Round.year == mo.current_year)
        # The contest's place is loaded eagerly in the same query.
        query = query.options(joinedload(db.Contest.place))
        query = query.order_by(db.Round.category, db.Round.seq, db.Round.part)
        return query.all()
    
    
    @app.route('/user/join/')
    def user_join():
        """List the rounds of the current year that are open for enrollment.

        The template also receives the user's existing participations keyed by
        round ID.
        """
        open_modes = [db.RoundEnrollMode.register, db.RoundEnrollMode.confirm]
        round_query = (
            db.get_session().query(db.Round)
            .select_from(db.Round)
            .filter_by(year=mo.current_year)
            .filter(db.Round.enroll_mode.in_(open_modes))
            .filter_by(state=db.RoundState.running)
            .order_by(db.Round.category, db.Round.seq)
        )
        # Sub-rounds are dropped in Python, using Round.is_subround().
        available_rounds: List[db.Round] = [
            candidate for candidate in round_query.all() if not candidate.is_subround()
        ]

        # Element 1 of each triple returned by load_pcrs() is the contest.
        pcrs_by_round_id = {pcr[1].round_id: pcr for pcr in load_pcrs()}

        return render_template(
            'user_join_list.html',
            available_rounds=available_rounds,
            pcrs_by_round_id=pcrs_by_round_id,
        )
    
    
    class JoinRoundForm(FlaskForm):
        """Form used by user_join_round() when a user enrolls in a round."""

        # School entry is a JS hack implemented in the template. The fields only define the interface.
        school = mo_fields.School("Škola", validators=[Required()])
        town_query = wtforms.HiddenField()
        town_list = wtforms.HiddenField()

        # Personal data; user_join_round() removes school, grade and birth_year
        # from the form when a Participant record already exists.
        grade = mo_fields.Grade("Třída", validators=[Required()])
        birth_year = mo_fields.BirthYear("Rok narození", validators=[Required()])
        submit = wtforms.SubmitField('Přihlásit se')
    
    
    @app.route('/user/join/<int:round_id>/', methods=('GET', 'POST'))
    def user_join_round(round_id):
        """Enroll the current user in the given round.

        Responds with 404 when the round does not exist. The request is rejected
        with a flash message and a redirect to the enrollment list when the round
        is not of the current year, is a later part (``part > 1``), does not allow
        enrollment (mode other than ``register``/``confirm``), is not running, or
        when the user already participates in it.

        On a valid submission a Participant record is created if the user has
        none for the round's year, the contest for the user's place is looked up
        or created, the participation is stored and the organizers are notified.
        On a GET request without a Participant record the form is pre-filled from
        the user's most recent Participant record, if any.
        """
        sess = db.get_session()
        round = sess.query(db.Round).get(round_id)
        if not round:
            raise werkzeug.exceptions.NotFound()

        if (round.year != mo.current_year
                or round.part > 1
                or round.enroll_mode not in [db.RoundEnrollMode.register, db.RoundEnrollMode.confirm]
                or round.state != db.RoundState.running):
            flash('Do této kategorie se není možné přihlásit.', 'danger')
            # Go back to the enrollment list, like the "already enrolled" branch below.
            return redirect(url_for('user_join'))

        # Selected FOR UPDATE, as is the participant row below.
        pion = (sess.query(db.Participation)
                .select_from(db.Participation)
                .filter_by(user=g.user)
                .join(db.Participation.contest)
                .filter(db.Contest.round == round)
                .with_for_update()
                .one_or_none())
        if pion:
            flash('Do této kategorie už jste přihlášen.', 'info')
            return redirect(url_for('user_join'))

        pant = (sess.query(db.Participant)
                .filter_by(user=g.user, year=round.year)
                .with_for_update()
                .one_or_none())

        form = JoinRoundForm()
        if pant:
            # A participant record for this year already exists, so the personal
            # data fields are removed from the form.
            del form.school
            del form.grade
            del form.birth_year

        if form.validate_on_submit():
            if form.submit.data:
                if not pant:
                    pant = join_create_pant(form)
                    sess.add(pant)
                contest = join_create_contest(round, pant)
                join_create_pion(contest)
                sess.commit()
                # Notifications are sent only after the enrollment is committed.
                join_notify(contest)

                msg = 'Přihláška přijata.'
                if round.enroll_mode == db.RoundEnrollMode.confirm:
                    msg += ' Ještě ji musí potvrdit organizátor soutěže.'
                flash(msg, 'success')
                return redirect(url_for('user_index'))
        elif not pant and request.method == 'GET':
            # Try to pre-fill the data from previous years
            prev_pant = (sess.query(db.Participant)
                         .filter_by(user=g.user)
                         .options(joinedload(db.Participant.school_place, db.Place.parent_place))
                         .order_by(db.Participant.year.desc())
                         .limit(1).one_or_none())
            if prev_pant:
                form.school.data = f'#{prev_pant.school}'
                town = prev_pant.school_place.parent_place
                form.town_query.data = town.name
                form.town_list.data = str(town.place_id)
                form.birth_year.data = prev_pant.birth_year

        return render_template(
            'user_join_round.html',
            round=round,
            form=form,
        )
    
    
    def join_create_pant(form: JoinRoundForm) -> db.Participant:
        """Build a Participant for ``g.user`` in the current year from the form.

        The new object is returned without being added to the session; its
        creation is written to the application log and to the audit log.
        """
        school_place = form.school.place
        assert school_place is not None

        pant = db.Participant(
            user=g.user,
            year=mo.current_year,
            school_place=school_place,
            grade=form.grade.data,
            birth_year=form.birth_year.data,
        )

        logger.info(f'Join: Účastník #{g.user.user_id} se přihlásil do {pant.year}. ročníku')
        log_details = {'action': 'create-participant', 'reason': 'user-join', 'new': db.row2dict(pant)}
        mo.util.log(type=db.LogType.participant, what=g.user.user_id, details=log_details)

        return pant
    
    
    def join_create_contest(round: db.Round, pant: db.Participant) -> db.Contest:
        """Return the contest of ``round`` for the participant's place, creating it if needed.

        The place is the participant's school place when its level equals the
        round's level; otherwise it is the single ancestor of the school place
        at the round's level. A newly created contest is flushed, made its own
        master, logged, and its sub-contests are created.
        """
        sess = db.get_session()

        school = pant.school_place
        # XXX: The recursive query does not give us a full-fledged db.Place, only a named tuple,
        # so we have to work with IDs.
        if school.level == round.level:
            place_id = school.place_id
        else:
            ancestors = db.get_place_parents(school)
            matching = [p for p in ancestors if p.level == round.level]
            assert len(matching) == 1
            place_id = matching[0].place_id

        assert round.part <= 1
        existing = (sess.query(db.Contest)
                    .filter_by(round=round, place_id=place_id)
                    .with_for_update()
                    .one_or_none())
        if existing:
            return existing

        c = db.Contest(round=round, place_id=place_id, state=db.RoundState.running)
        sess.add(c)
        # Flush before pointing the contest's master at itself.
        sess.flush()
        c.master = c

        logger.info(f'Join: Automaticky založena soutěž #{c.contest_id} {round.round_code()} pro místo #{place_id}')
        mo.util.log(
            type=db.LogType.contest,
            what=c.contest_id,
            details={'action': 'created', 'reason': 'user-join'},
        )

        mo.web.org_round.create_subcontests(round, c)
        return c
    
    
    def join_create_pion(c: db.Contest) -> None:
        """Add a Participation of ``g.user`` in contest ``c`` to the session.

        The participation is ``active`` when the round's enroll mode is
        ``register`` and ``registered`` otherwise. The change is logged.
        """
        immediate = c.round.enroll_mode == db.RoundEnrollMode.register
        state = db.PartState.active if immediate else db.PartState.registered

        p = db.Participation(user=g.user, contest=c, place=c.place, state=state)
        db.get_session().add(p)

        logger.info(f'Join: Účastník #{g.user.user_id} přihlášen do soutěže #{c.contest_id}')
        log_details = {'action': 'add-to-contest', 'reason': 'user-join', 'new': db.row2dict(p)}
        mo.util.log(type=db.LogType.participant, what=g.user.user_id, details=log_details)
    
    
    def join_notify(c: db.Contest) -> None:
        """E-mail the organizers responsible for contest ``c`` about a new enrollment.

        Starting at the contest's place and walking up through ``parent_place``,
        the first place that has at least one garant whose role applies to the
        round and who has ``email_notify`` set receives the notifications; the
        walk then stops. A warning is logged when no such organizer is found.
        """
        sess = db.get_session()
        r = c.round
        # Loop-invariant: the roles considered at every level of the hierarchy.
        garant_roles = (
            db.RoleType.garant,
            db.RoleType.garant_kraj,
            db.RoleType.garant_okres,
            db.RoleType.garant_skola,
        )

        place = c.place
        while place is not None:
            uroles = (sess.query(db.UserRole)
                      .filter(db.UserRole.role.in_(garant_roles))
                      .filter_by(place_id=place.place_id)
                      .options(joinedload(db.UserRole.user))
                      .all())
            # A set, so a user holding several matching roles is mailed only once.
            notify = {
                ur.user for ur in uroles
                if ur.applies_to(at=place, year=r.year, cat=r.category, seq=r.seq) and ur.user.email_notify
            }
            if notify:
                for org in notify:
                    logger.info(f'Join: Notifikuji orga <{org.email}> pro místo {place.get_code()}')
                    mo.email.send_join_notify_email(org, g.user, c)
                return
            place = place.parent_place

        # Logger.warn() is a deprecated alias that newer Python versions no longer provide.
        logger.warning('Join: Není komu poslat mail')
    
    
    def get_contest_pion(id: int, require_reg: bool = True) -> Tuple[db.Contest, db.Participation]:
        """Fetch a contest together with the current user's participation in it.

        The participation is looked up on the contest's master contest.

        Raises:
            NotFound: no contest with this ID exists.
            Forbidden: the user does not participate, or ``require_reg`` is set
                and the participation is in state ``registered`` or ``refused``.
        """
        sess = db.get_session()

        contest = (sess.query(db.Contest)
                   .options(joinedload(db.Contest.place), joinedload(db.Contest.round))
                   .get(id))
        if not contest:
            raise werkzeug.exceptions.NotFound()

        pion = (sess.query(db.Participation)
                .filter_by(user=g.user, contest_id=contest.master_contest_id)
                .one_or_none())

        blocked_states = (db.PartState.registered, db.PartState.refused)
        if not pion or (require_reg and pion.state in blocked_states):
            raise werkzeug.exceptions.Forbidden()

        return contest, pion
    
    
    def get_contest(id: int, require_reg: bool = True) -> db.Contest:
        """Return only the contest from get_contest_pion(), with the same checks."""
        return get_contest_pion(id, require_reg)[0]
    
    
    def get_task(contest: db.Contest, id: int) -> db.Task:
        """Return the task with the given ID, provided it is in the contest's round.

        Raises:
            NotFound: the task does not exist or belongs to a different round.
        """
        task = db.get_session().query(db.Task).get(id)

        # Let's not forget to check that the task belongs to the contest :)
        if task and task.round_id == contest.round_id:
            return task

        raise werkzeug.exceptions.NotFound()
    
    
    @app.route('/user/contest/<int:id>/')
    def user_contest(id: int):
        """Render a contest page with its tasks, the user's solutions and round messages.

        Participations that are only registered or refused may view the page
        too; the participation state is passed to the template.
        """
        sess = db.get_session()
        contest, pion = get_contest_pion(id, require_reg=False)

        message_query = sess.query(db.Message).filter_by(round_id=contest.round_id)
        messages = message_query.order_by(db.Message.created_at).all()

        # Outer join: tasks without a solution by this user are still listed.
        own_solution = and_(db.Solution.task_id == db.Task.task_id, db.Solution.user == g.user)
        task_sols = (sess.query(db.Task, db.Solution)
                     .select_from(db.Task)
                     .outerjoin(db.Solution, own_solution)
                     .filter(db.Task.round == contest.round)
                     .options(joinedload(db.Solution.final_submit_obj),
                              joinedload(db.Solution.final_feedback_obj))
                     .order_by(db.Task.code)
                     .all())

        context = {
            'contest': contest,
            'part_state': pion.state,
            'task_sols': task_sols,
            'messages': messages,
            'max_submit_size': config.MAX_CONTENT_LENGTH,
        }
        return render_template('user_contest.html', **context)
    
    
    @app.route('/user/contest/<int:id>/news')
    def user_contest_news(id: int):
        """Return the messages of the contest's round as JSON, ordered by creation time.

        Each item carries the message title, its formatted creation time and
        its HTML body.
        """
        contest = get_contest(id, require_reg=False)

        round_messages = (db.get_session().query(db.Message)
                          .filter_by(round_id=contest.round_id)
                          .order_by(db.Message.created_at)
                          .all())

        return jsonify([
            dict(
                title=message.title,
                date_format=time_and_timedelta(message.created_at),
                body=message.html,
            )
            for message in round_messages
        ])
    
    
    @app.route('/user/contest/<int:id>/task-statement/zadani.pdf')
    def user_task_statement(id: int):
        """Send the task statement of the contest's round.

        Raises:
            Forbidden: the contest reports that the statement is not available;
                the attempt is logged as a warning.
        """
        contest = get_contest(id, require_reg=False)

        if not contest.ct_task_statement_available():
            # Logger.warn() is a deprecated alias that newer Python versions no longer provide.
            logger.warning(f'Účastník #{g.user.user_id} chce zadání, na které nemá právo')
            raise werkzeug.exceptions.Forbidden()

        return mo.web.util.send_task_statement(contest.round)
    
    
    class SubmitForm(FlaskForm):
        """Upload form for a solution, used by user_contest_task()."""

        # The file is mandatory; the note is stored on the created paper.
        file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()], render_kw={'autofocus': True})
        note = wtforms.TextAreaField("Poznámka", description="Zde můžete něco vzkázat organizátorům soutěže.")
        submit = wtforms.SubmitField('Odevzdat')
    
    
    @app.route('/user/contest/<int:contest_id>/task/<int:task_id>/', methods=('GET', 'POST'))
    def user_contest_task(contest_id: int, task_id: int):
        """Show one task of a contest and accept a solution upload for it.

        The page is forbidden while the contest is in the ``preparing`` state.
        When the contest accepts submissions and the form validates, the
        uploaded file is stored as a new solution paper and becomes the final
        submit of the user's solution; the user is then redirected to the
        contest page. Otherwise the task page with the user's solution and
        previously uploaded papers is rendered.
        """
        contest = get_contest(contest_id)
        task = get_task(contest, task_id)
        sess = db.get_session()

        messages = None
        if contest.round.has_messages:
            messages = (sess.query(db.Message)
                        .filter_by(round_id=contest.round_id)
                        .order_by(db.Message.created_at)
                        .all())

        # While the round is being prepared or waiting for the statement to be published,
        # we do not even show the page, so that the task name does not give anything away.
        if contest.ct_state() == db.RoundState.preparing:
            raise werkzeug.exceptions.Forbidden()

        form = SubmitForm()
        if contest.ct_can_submit() and form.validate_on_submit():
            upload = form.file.data.stream
            paper = db.Paper(
                task=task,
                for_user_obj=g.user,
                uploaded_by_obj=g.user,
                type=db.PaperType.solution,
                note=form.note.data,
            )
            submitter = mo.submit.Submitter()

            try:
                submitter.submit_paper(paper, upload.name)
            except mo.submit.SubmitException as err:
                flash(f'Chyba: {err}', 'danger')
                return redirect(url_for('user_contest_task', contest_id=contest_id, task_id=task_id))

            sess.add(paper)

            # FIXME: It would be nice to use INSERT ... ON CONFLICT UPDATE
            # (SQLAlchemy supports it, but not through the ORM, only via the core interface)
            solution = (sess.query(db.Solution)
                        .filter_by(task=task, user=g.user)
                        .with_for_update()
                        .one_or_none())
            if solution is None:
                solution = db.Solution(task=task, user=g.user)
                sess.add(solution)
            solution.final_submit_obj = paper

            sess.commit()

            if not paper.is_broken():
                flash('Řešení odevzdáno', 'success')
            else:
                flash('Soubor není korektní PDF, ale přesto jsme ho přijali a pokusíme se ho zpracovat. '
                      'Zkontrolujte prosím, že se na vašem počítači zobrazuje správně.',
                      'warning')
            return redirect(url_for('user_contest', id=contest_id))

        sol = sess.query(db.Solution).filter_by(task=task, user=g.user).one_or_none()

        # All solution papers uploaded for this user and task, newest first.
        papers = (sess.query(db.Paper)
                  .filter_by(for_user_obj=g.user, task=task, type=db.PaperType.solution)
                  .options(joinedload(db.Paper.uploaded_by_obj))
                  .order_by(db.Paper.uploaded_at.desc())
                  .all())

        context = {
            'contest': contest,
            'task': task,
            'sol': sol,
            'papers': papers,
            'form': form,
            'messages': messages,
        }
        return render_template('user_contest_task.html', **context)
    
    
    @app.route('/user/contest/<int:contest_id>/paper/<int:paper_id>/')
    def user_paper(contest_id: int, paper_id: int):
        """Send a paper (solution or feedback) belonging to the current user.

        Solutions may be downloaded while the contest is running, grading or
        closed; feedback only once it is closed.

        Raises:
            NotFound: no paper with this ID exists.
            Forbidden: the paper belongs to another user, to a task of a
                different round than the contest's, or the contest state does
                not allow access to this paper type.
            AssertionError: the paper has a type other than solution/feedback.
        """
        sess = db.get_session()

        contest = get_contest(contest_id)

        paper = (sess.query(db.Paper)
                 .options(joinedload(db.Paper.task))
                 .get(paper_id))

        # XXX: This gives attackers an oracle for probing the validity of IDs,
        # but that does not matter, because we assign IDs sequentially anyway.
        if paper is None:
            raise werkzeug.exceptions.NotFound()

        if paper.for_user != g.user.user_id:
            logger.warning(f'Účastník #{g.user.user_id} chce cizí papír')
            raise werkzeug.exceptions.Forbidden()

        if paper.type == db.PaperType.solution:
            allowed_states = [db.RoundState.running, db.RoundState.grading, db.RoundState.closed]
        elif paper.type == db.PaperType.feedback:
            allowed_states = [db.RoundState.closed]
        else:
            # Raised explicitly rather than via `assert False`, which is stripped
            # under `python -O` and would leave allowed_states unbound.
            raise AssertionError(f'Unexpected paper type {paper.type!r}')

        if paper.task.round != contest.round:
            logger.warning(f'Účastník #{g.user.user_id} chce papír z jiného kola')
            raise werkzeug.exceptions.Forbidden()

        if contest.ct_state() not in allowed_states:
            raise werkzeug.exceptions.Forbidden()

        return mo.web.util.send_task_paper(paper)