Skip to content
Snippets Groups Projects
Select Git revision
  • 9786fb2ed4f8326c35583f7ea08d83602271bf3d
  • devel default
  • master
  • fo
  • jirka/typing
  • fo-base
  • mj/submit-images
  • jk/issue-96
  • jk/issue-196
  • honza/add-contestant
  • honza/mr7
  • honza/mrf
  • honza/mrd
  • honza/mra
  • honza/mr6
  • honza/submit-images
  • honza/kolo-vs-soutez
  • jh-stress-test-wip
  • shorten-schools
19 results

setup.py

Blame
  • org_round.py 24.04 KiB
    from dataclasses import dataclass, field
    import decimal
    from flask import render_template, g, redirect, flash, request
    import locale
    import flask_wtf.file
    from flask_wtf.form import FlaskForm
    import bleach
    from bleach.sanitizer import ALLOWED_TAGS
    import markdown
    import os
    from sqlalchemy import func
    from sqlalchemy.orm import joinedload, aliased
    from sqlalchemy.sql.functions import coalesce
    from typing import Optional, List, Dict, Tuple, Set
    import werkzeug.exceptions
    import wtforms
    from wtforms import validators, ValidationError
    from wtforms.widgets.html5 import NumberInput
    
    import mo.config as config
    import mo.db as db
    import mo.imports
    from mo.rights import Right
    import mo.util
    from mo.web import app
    import mo.web.fields as mo_fields
    from mo.web.org_contest import get_context
    
    
    @app.route('/org/contest/')
    def org_rounds():
        """Show the organizers' list of rounds of the current year.

        Rounds are restricted to ``config.CURRENT_YEAR`` and ordered by year,
        category, sequence number and part. The template is rendered with
        ``history=False``.
        """
        ordering = (db.Round.year, db.Round.category, db.Round.seq, db.Round.part)
        current = db.get_session().query(db.Round).filter_by(year=config.CURRENT_YEAR)
        return render_template(
            'org_rounds.html',
            rounds=current.order_by(*ordering),
            history=False,
        )
    
    
    @app.route('/org/contest/history')
    def org_rounds_history():
        """Show the organizers' list of rounds of all years.

        Rounds are ordered by year (descending), then by category, sequence
        number and part. The template is rendered with ``history=True``.
        """
        every_round = db.get_session().query(db.Round)
        every_round = every_round.order_by(
            db.Round.year.desc(),
            db.Round.category,
            db.Round.seq,
            db.Round.part,
        )
        return render_template('org_rounds.html', rounds=every_round, history=True)
    
    
    class TaskDeleteForm(FlaskForm):
        """Form submitted to delete a single task of a round (processed by delete_task())."""
        # ID of the task that should be deleted
        delete_task_id = wtforms.IntegerField()
        # Submit button; the label is Czech for "Delete task"
        delete_task = wtforms.SubmitField('Smazat úlohu')
    
    
    def delete_task(round_id: int, form: TaskDeleteForm) -> bool:
        """Process a submitted task-deletion form for the round ``round_id``.

        Nothing happens unless the request is a POST that carries the
        ``delete_task_id`` field and the form validates. The task must belong
        to the round and must not be referenced by any solution, paper or
        points-history row; otherwise an error message is flashed.

        Returns True only when the task was deleted and the change committed.
        """
        is_delete_post = request.method == 'POST' and 'delete_task_id' in request.form
        if not is_delete_post or not form.validate_on_submit():
            return False

        sess = db.get_session()
        task = sess.query(db.Task).filter_by(
            round_id=round_id, task_id=form.delete_task_id.data
        ).first()

        if not task:
            flash('Úloha s daným ID v tomto kole neexistuje', 'danger')
            return False

        # Refuse to delete a task that other records still refer to.
        if sess.query(db.Solution).filter_by(task_id=task.task_id).first() is not None:
            flash(f'Úlohu {task.code} nelze smazat, existují řešení vázající se na ni', 'danger')
            return False
        if sess.query(db.Paper).filter_by(for_task=task.task_id).first() is not None:
            flash(f'Úlohu {task.code} nelze smazat, existují papíry vázající se na ni', 'danger')
            return False
        if sess.query(db.PointsHistory).filter_by(task_id=task.task_id).first() is not None:
            flash(f'Úlohu {task.code} nelze smazat, existují přidělené body vázající se na ni', 'danger')
            return False

        sess.delete(task)
        # Both the audit log and the application log are written before the commit.
        mo.util.log(
            type=db.LogType.task,
            what=task.task_id,
            details={'action': 'delete', 'task': db.row2dict(task)},
        )
        app.logger.info(f"Úloha {task.code} ({task.task_id}) smazána: {db.row2dict(task)}")
        sess.commit()
        flash(f'Úloha {task.code} úspěšně smazána', 'success')
        return True
    
    
    class AddContestForm(FlaskForm):
        """Form for creating a new contest of a round at a chosen place (processed by add_contest())."""
        # Required place of the new contest; the default label is Czech for "New contest in region:"
        place = mo_fields.Place('Nová soutěž v oblasti:', validators=[validators.Required()])
        # Submit button; the label is Czech for "Create"
        create_contest = wtforms.SubmitField('Založit')
    
    
    def add_contest(round: db.Round, form: AddContestForm) -> bool:
        """Process a submitted AddContestForm and create a contest for ``round``.

        Nothing happens unless the request is a POST that carries the
        ``create_contest`` field and the form validates. The chosen place must
        be on the same level as the round, no contest of this round may exist
        at that place yet, and the current user must hold ``Right.add_contest``
        for the new contest. The contest is created in the master round and
        sub-contests are created through create_subcontests().

        Returns True when the contest was created and committed, False
        otherwise (an explanatory message is flashed on rejection).
        """
        submitted = request.method == 'POST' and 'create_contest' in request.form
        if not submitted or not form.validate_on_submit():
            return False

        place: db.Place = form.place.place

        if place.level != round.level:
            flash(f'{place.type_name().title()} {place.name} není {round.get_level().name}', 'danger')
            return False

        sess = db.get_session()
        existing = sess.query(db.Contest).filter_by(round=round, place=place).one_or_none()
        if existing:
            flash(f'Pro {place.type_name()} {place.name} už toto kolo existuje', 'danger')
            return False

        # Initial contest state: inherited from the round, unless the round
        # delegates its state to the contests -- then we start in "preparing".
        state = db.RoundState.preparing if round.state == db.RoundState.delegate else round.state

        # The contest is always created in the master round.
        contest = db.Contest(round=round.master, place=place, state=state)
        contest_rights = g.gatekeeper.rights_for_contest(contest)
        if not contest_rights.have_right(Right.add_contest):
            flash(f'Vaše role nedovoluje vytvořit soutěž {place.name_locative()}', 'danger')
            return False

        # First flush assigns contest_id, which the contest then uses as its
        # own master_contest_id.
        sess.add(contest)
        sess.flush()
        contest.master_contest_id = contest.contest_id
        sess.add(contest)
        sess.flush()

        mo.util.log(
            type=db.LogType.contest,
            what=contest.contest_id,
            details={'action': 'add', 'contest': db.row2dict(contest)},
        )
        app.logger.info(f"Soutěž #{contest.contest_id} založena: {db.row2dict(contest)}")

        create_subcontests(round.master, contest)

        sess.commit()
        flash(f'Založena soutěž {place.name_locative()}', 'success')
        return True
    
    
    # XXX: Also used by participant registration
    def create_subcontests(master_round: db.Round, master_contest: db.Contest):
        """Create one sub-contest per round returned by ``master_round.get_group_rounds()``.

        Does nothing when ``master_round.part`` is 0 (presumably a round that
        is not split into parts -- confirm against the Round model). Each
        sub-contest copies the place and state of ``master_contest`` and points
        to it through ``master_contest_id``. Every new sub-contest is flushed
        and logged; committing is left to the caller.
        """
        if master_round.part == 0:
            return

        sess = db.get_session()
        for subround in master_round.get_group_rounds():
            fields = {
                'round_id': subround.round_id,
                'master_contest_id': master_contest.contest_id,
                'place_id': master_contest.place_id,
                'state': master_contest.state,
            }
            subcontest = db.Contest(**fields)
            sess.add(subcontest)
            # Flush so that the logging below can use the assigned contest_id.
            sess.flush()

            mo.util.log(
                type=db.LogType.contest,
                what=subcontest.contest_id,
                details={'action': 'add', 'contest': db.row2dict(subcontest)},
            )
            app.logger.info(f"Podsoutěž #{subcontest.contest_id} založena: {db.row2dict(subcontest)}")
    
    
    @dataclass
    class ContestStat:
        """One row of per-region statistics of a round, as built by region_stats()."""
        # Place this row describes (the place of a contest, or a sub-region)
        region: db.Place
        # The contest itself when the row stands for a single contest, otherwise None
        contest: Optional[db.Contest] = None
        # Number of contests counted in this row
        num_contests: int = 0
        # States of the contests counted in this row (filled only for sub-region rows)
        contest_states: Set[db.RoundState] = field(default_factory=set)
        # Count reported for participants in state PartState.active
        num_active_pants: int = 0
        # Count reported for participants in state PartState.registered
        num_unconfirmed_pants: int = 0
    
    
    def region_stats(round: db.Round, region: db.Place) -> List[ContestStat]:
        """Collect statistics of ``round`` for the places below ``region``.

        When ``region`` is at most one level above the round's level (or the
        region is on level 2 and the round on level 4), every contest in the
        region gets its own row. Otherwise there is one row per direct
        sub-region, with contest counts and states taken from
        ``RegionContestStat``. Participant counts come from
        ``RegionParticipantStat`` of the master round in both cases.

        Returns the rows sorted by place name in the current locale, or an
        empty list when ``region`` lies deeper than the round's level.
        """
        by_place: Dict[int, ContestStat] = {}
        sess = db.get_session()

        if region.level > round.level:
            return []

        have_contests = (
            region.level >= round.level - 1
            or (region.level == 2 and round.level == 4)
        )

        if have_contests:
            # One row per individual contest.
            contests = sess.query(db.Contest).filter_by(round=round)
            contests = db.filter_place_nth_parent(
                contests, db.Contest.place_id, round.level - region.level, region.place_id
            )
            contests = contests.options(joinedload(db.Contest.place))
            for contest in contests.all():
                by_place[contest.place.place_id] = ContestStat(
                    region=contest.place, contest=contest, num_contests=1
                )
        else:
            # One row per direct sub-region.
            subregions = sess.query(db.Place).filter(db.Place.parent_place == region).all()
            for sub in subregions:
                by_place[sub.place_id] = ContestStat(region=sub)

        region_ids = [row.region.place_id for row in by_place.values()]

        if not have_contests:
            contest_rows = (
                sess.query(db.RegionContestStat)
                .filter_by(round=round)
                .filter(db.RegionContestStat.region.in_(region_ids))
                .all()
            )
            for cr in contest_rows:
                by_place[cr.region].num_contests += cr.count
                by_place[cr.region].contest_states.add(cr.state)

        participant_rows = (
            sess.query(db.RegionParticipantStat)
            .filter_by(round_id=round.master_round_id)
            .filter(db.RegionParticipantStat.region.in_(region_ids))
            .all()
        )
        for pr in participant_rows:
            if pr.state == db.PartState.active:
                by_place[pr.region].num_active_pants = pr.count
            elif pr.state == db.PartState.registered:
                by_place[pr.region].num_unconfirmed_pants = pr.count

        # Places without a name sort as the empty string.
        return sorted(by_place.values(), key=lambda row: locale.strxfrm(row.region.name or ""))
    
    
    def region_totals(region: db.Place, stats: List[ContestStat]) -> ContestStat:
        """Sum the numeric columns of ``stats`` into a single row for ``region``.

        The returned row has no contest and an empty set of contest states.
        """
        totals = ContestStat(region=region)
        for row in stats:
            totals.num_contests += row.num_contests
            totals.num_active_pants += row.num_active_pants
            totals.num_unconfirmed_pants += row.num_unconfirmed_pants
        return totals
    
    
    def task_stats(round: db.Round, region: db.Place) -> List[Tuple[db.Task, int]]:
        """Pair every task of ``round`` with its count from ``RegionTaskStat`` for ``region``.

        Tasks are ordered by their code; a task without a statistics row gets
        the count 0.
        """
        sess = db.get_session()
        tasks = sorted(
            sess.query(db.Task).filter_by(round=round).all(),
            key=lambda task: task.code,
        )

        stat_rows = (
            sess.query(db.RegionTaskStat)
            .filter_by(round=round, region=region.place_id)
            .all()
        )
        counts: Dict[int, int] = {}
        for row in stat_rows:
            counts[row.task_id] = row.count

        result = []
        for task in tasks:
            result.append((task, counts.get(task.task_id, 0)))
        return result
    
    
    @app.route('/org/contest/r/<int:round_id>/', methods=('GET', 'POST'))
    @app.route('/org/contest/r/<int:round_id>/h/<int:hier_id>', methods=('GET', 'POST'))
    def org_round(round_id: int, hier_id: Optional[int] = None):
        """Overview page of a round, optionally narrowed to a place in the hierarchy.

        Besides rendering the statistics, the view processes two forms that
        live on the page: task deletion (only with ``Right.manage_round``) and
        creation of a new contest. A successfully processed form redirects
        back to this page.
        """
        ctx = get_context(round_id=round_id, hier_id=hier_id)
        round, rights = ctx.round, ctx.rights

        form_delete_task = TaskDeleteForm()
        if rights.have_right(Right.manage_round):
            if delete_task(round_id, form_delete_task):
                return redirect(ctx.url_for('org_round'))

        form_add_contest = AddContestForm()
        form_add_contest.place.label.text = "Nová soutěž " + round.get_level().in_name()
        if add_contest(round, form_add_contest):
            return redirect(ctx.url_for('org_round'))

        group_rounds = round.get_group_rounds(True)
        group_rounds.sort(key=lambda r: r.round_code())

        # Without a place selected in the hierarchy, show statistics for the root place.
        region = ctx.hier_place or db.get_root_place()
        reg_stats = region_stats(round, region)
        reg_total = region_totals(region, reg_stats)
        task_info = task_stats(round, region)

        role_names = [role.friendly_name() for role in rights.get_roles()]
        statement_exists = mo.web.util.task_statement_exists(round)

        return render_template(
            'org_round.html',
            ctx=ctx,
            rights=rights,
            round=round,
            group_rounds=group_rounds,
            roles=role_names,
            reg_stats=reg_stats,
            reg_total=reg_total,
            task_info=task_info,
            form_delete_task=form_delete_task,
            form_add_contest=form_add_contest,
            statement_exists=statement_exists,
        )
    
    
    class TaskEditForm(FlaskForm):
        """Form for creating or editing a task of a round.

        The constructor takes the points step as its first argument; it becomes
        the ``step`` of the numeric input used for ``max_points``.
        """
        # Task code: required; the regexp allows only ASCII letters, digits and '-'
        code = wtforms.StringField('Kód úlohy', validators=[
            validators.Required(),
            validators.Regexp(r'^[A-Za-z0-9-]+$', message="Kód úlohy smí obsahovat jen nediakritická písmena, čísla a znak -"),
        ], render_kw={'autofocus': True})
        # Task name (no validators)
        name = wtforms.StringField('Název úlohy')
        # Task type, chosen from db.TaskType
        type = wtforms.SelectField('Typ úlohy', choices=db.TaskType.choices(), coerce=db.TaskType.coerce)
        # Optional non-negative maximum of points
        max_points = mo_fields.Points(
            'Maximum bodů', validators=[validators.Optional(), validators.NumberRange(min=0)],
            description="Při nastavení maxima nelze udělit více bodů, pro zrušení uložte prázdnou hodnotu",
        )
        submit = wtforms.SubmitField('Uložit')
    
        def __init__(self, points_step: decimal.Decimal, *args, **kwargs):
            """Build the form; ``points_step`` sets the step of the ``max_points`` input."""
            super().__init__(*args, **kwargs)
            # Replace the widget per instance so that the step follows the round's setting
            self.max_points.widget = NumberInput(min=0, step=points_step)
    
    
    @app.route('/org/contest/r/<int:round_id>/task/new', methods=('GET', 'POST'))
    def org_round_task_new(round_id: int):
        """Create a new task in a round (requires ``Right.manage_round``).

        On a valid submission the task is stored, logged and the user is
        redirected to the round page, unless a task with the same code already
        exists in the round -- then an error is flashed and the form is shown
        again.
        """
        sess = db.get_session()
        ctx = get_context(round_id=round_id, right_needed=Right.manage_round)

        form = TaskEditForm(ctx.master_round.points_step)

        def show_form():
            return render_template(
                'org_round_task_edit.html',
                ctx=ctx, form=form,
            )

        if not form.validate_on_submit():
            return show_form()

        task = db.Task()
        task.round = ctx.round
        form.populate_obj(task)

        same_code = sess.query(db.Task).filter_by(round_id=round_id, code=task.code).first()
        if same_code:
            flash('Úloha se stejným kódem již v tomto kole existuje', 'danger')
            return show_form()

        sess.add(task)
        # Flush so that task_id is assigned before it is written to the log.
        sess.flush()
        mo.util.log(
            type=db.LogType.task,
            what=task.task_id,
            details={'action': 'add', 'task': db.row2dict(task)},
        )
        sess.commit()
        app.logger.info(f"Úloha {task.code} ({task.task_id}) přidána: {db.row2dict(task)}")
        flash('Nová úloha přidána', 'success')
        return redirect(ctx.url_for('org_round'))
    
    
    @app.route('/org/contest/r/<int:round_id>/task/<int:task_id>/edit', methods=('GET', 'POST'))
    def org_round_task_edit(round_id: int, task_id: int):
        """Edit an existing task of a round (requires ``Right.manage_round``).

        A valid submission is rejected when another task of the round already
        uses the submitted code. Otherwise the task is updated; real changes
        are logged and committed, and the user is redirected to the round page.
        """
        sess = db.get_session()
        ctx = get_context(round_id=round_id, task_id=task_id, right_needed=Right.manage_round)
        task = ctx.task
        assert task

        form = TaskEditForm(ctx.master_round.points_step, obj=task)

        def show_form():
            return render_template(
                'org_round_task_edit.html',
                ctx=ctx, form=form,
            )

        if not form.validate_on_submit():
            return show_form()

        # The code must stay unique among the *other* tasks of the round.
        clash = sess.query(db.Task).filter(
            db.Task.task_id != task_id,
            db.Task.round_id == round_id,
            db.Task.code == form.code.data,
        ).first()
        if clash:
            flash('Úloha se stejným kódem již v tomto kole existuje', 'danger')
            return show_form()

        form.populate_obj(task)
        if not sess.is_modified(task):
            flash('Žádné změny k uložení', 'info')
        else:
            changes = db.get_object_changes(task)

            mo.util.log(
                type=db.LogType.task,
                what=task_id,
                details={'action': 'edit', 'changes': changes},
            )
            sess.commit()
            app.logger.info(f"Úloha {task.code} ({task_id}) modifikována, změny: {changes}")
            flash('Změny úlohy uloženy', 'success')

        return redirect(ctx.url_for('org_round', task_id=None))
    
    
    class RoundEditForm(FlaskForm):
        """Form for editing the settings of a round.

        ``_for_round`` may be set by the caller to the round being edited; when
        it is set, ``validate_state`` also checks that the round has tasks.
        """
        _for_round: Optional[db.Round] = None

        name = wtforms.StringField("Název", render_kw={'autofocus': True})
        code = wtforms.StringField(
            "Kód",
            description="Kód kola používaný v kódech úloh ('1', 'S' apod.). Není-li vyplněn, použije se pořadí kola v kategorii.",
        )
        state = wtforms.SelectField(
            "Stav kola",
            choices=db.RoundState.choices(),
            coerce=db.RoundState.coerce,
            description="Stav soutěží ve všech oblastech kola. Pokud zvolíme 'po oblastech', každá soutěž si svůj stav určuje sama.",
        )
        # Nowadays only desktop Firefox lacks support for the datetime-local
        # field; the other browsers do provide a date and time picker UI :(
        ct_tasks_start = mo_fields.DateTime(
            "Čas zveřejnění úloh pro účastníky", validators=[validators.Optional()],
        )
        pr_tasks_start = mo_fields.DateTime(
            "Čas zveřejnění úloh pro dozor", validators=[validators.Optional()],
        )
        ct_submit_end = mo_fields.DateTime(
            "Konec odevzdávání pro účastníky", validators=[validators.Optional()],
        )
        pr_submit_end = mo_fields.DateTime(
            "Konec odevzdávání pro dozor", validators=[validators.Optional()],
        )
        score_mode = wtforms.SelectField(
            "Výsledková listina",
            choices=db.RoundScoreMode.choices(),
            coerce=db.RoundScoreMode.coerce,
        )
        score_winner_limit = mo_fields.Points(
            "Hranice bodů pro vítěze",
            validators=[validators.Optional(), validators.NumberRange(min=0)],
            description="Řešitelé s alespoň tolika body budou označeni za vítěze, prázdná hodnota = žádné neoznačovat",
        )
        score_successful_limit = mo_fields.Points(
            "Hranice bodů pro úspěšné řešitele",
            validators=[validators.Optional(), validators.NumberRange(min=0)],
            description="Řešitelé s alespoň tolika body budou označeni za úspěšné řešitele, prázdná hodnota = žádné neoznačovat",
        )
        points_step = wtforms.SelectField(
            "Přesnost bodování",
            choices=db.round_points_step_choices,
            coerce=decimal.Decimal,
            description="Ovlivňuje možnost zadávání nových bodů, již uložené body nezmění",
        )
        enroll_mode = wtforms.SelectField(
            "Režim přihlašování",
            choices=db.RoundEnrollMode.choices(),
            coerce=db.RoundEnrollMode.coerce,
        )
        enroll_advert = wtforms.StringField("Popis v přihlášce")
        has_messages = wtforms.BooleanField("Zprávičky pro účastníky (aktivuje možnost vytvářet novinky zobrazované účastníkům)")
        submit = wtforms.SubmitField('Uložit')

        def validate_state(self, field):
            """Allow a state other than "preparing" only for a round that is ready.

            Such a state requires ``ct_tasks_start`` to be set and, when
            ``_for_round`` is known, at least one task defined in that round.
            """
            if field.data == db.RoundState.preparing:
                return

            if self.ct_tasks_start.data is None:
                raise ValidationError('Není-li nastaven času začátku soutěže, stav musí být "připravuje se"')

            if self._for_round is None:
                return

            task_count = db.get_session().query(db.Task).filter_by(round=self._for_round).count()
            if task_count == 0:
                raise ValidationError('Nejsou-li definovány žádné úlohy, stav musí být "připravuje se"')

        def abstract_validate_time_order(self, field):
            """Reject an end time that precedes either of the task start times.

            Unset times (``None``) are ignored on both sides of the comparison.
            """
            end = field.data
            if end is None:
                return

            starts = [self.ct_tasks_start.data, self.pr_tasks_start.data]
            too_late = [start is not None and start > end for start in starts]
            if any(too_late):
                raise ValidationError('Soutěž nesmí skončit dříve než začne.')

        def validate_ct_submit_end(self, field):
            """Check the participants' submission end against the start times."""
            self.abstract_validate_time_order(field)

        def validate_pr_submit_end(self, field):
            """Check the supervisors' submission end against the start times."""
            self.abstract_validate_time_order(field)
    
    
    @app.route('/org/contest/r/<int:round_id>/edit', methods=('GET', 'POST'))
    def org_round_edit(round_id: int):
        """Edit the settings of a round (requires ``Right.manage_round``).

        For a sub-round the score-list, points-step and enrollment-mode fields
        are removed from the form. Changes are logged; a change of state to
        anything but ``RoundState.delegate`` is propagated to every contest of
        the round whose state differs.
        """
        sess = db.get_session()
        ctx = get_context(round_id=round_id, right_needed=Right.manage_round)
        round = ctx.round

        form = RoundEditForm(obj=round)
        form._for_round = round
        if round.is_subround():
            # A sub-round has neither score-list settings nor enrollment settings.
            for field_name in (
                'score_mode',
                'score_winner_limit',
                'score_successful_limit',
                'points_step',
                'enroll_mode',
            ):
                delattr(form, field_name)

        if not form.validate_on_submit():
            return render_template(
                'org_round_edit.html',
                ctx=ctx,
                round=round,
                form=form,
            )

        form.populate_obj(round)

        if not sess.is_modified(round):
            flash('Žádné změny k uložení', 'info')
            return redirect(ctx.url_for('org_round'))

        changes = db.get_object_changes(round)

        app.logger.info(f"Round #{round_id} modified, changes: {changes}")
        mo.util.log(
            type=db.LogType.round,
            what=round_id,
            details={'action': 'edit', 'changes': changes},
        )

        if 'state' in changes and round.state != db.RoundState.delegate:
            # Propagate the new state to all contests that do not have it yet.
            outdated = sess.query(db.Contest).filter_by(round=round).filter(db.Contest.state != round.state)
            for contest in outdated:
                contest.state = round.state
                ct_changes = db.get_object_changes(contest)
                app.logger.info(f"Change propagated to contest #{contest.contest_id}: {ct_changes}")
                mo.util.log(
                    type=db.LogType.contest,
                    what=contest.contest_id,
                    details={'action': 'propagate', 'changes': ct_changes},
                )

        sess.commit()
        flash('Změny kola uloženy', 'success')
        return redirect(ctx.url_for('org_round'))
    
    
    @app.route('/org/contest/r/<int:round_id>/task-statement/zadani.pdf')
    def org_task_statement(round_id: int):
        """Send the task statement of a round to an organizer.

        Raises:
            werkzeug.exceptions.Forbidden: when the organizer's rights do not
                allow viewing the statement (the attempt is logged).
        """
        ctx = get_context(round_id=round_id)

        if not ctx.rights.can_view_statement():
            # Logger.warn() is a deprecated alias (removed in Python 3.13); use warning().
            app.logger.warning(f'Organizátor #{g.user.user_id} chce zadání, na které nemá právo')
            raise werkzeug.exceptions.Forbidden()

        return mo.web.util.send_task_statement(ctx.round)
    
    
    class StatementEditForm(FlaskForm):
        """Form for uploading or deleting the task statement file of a round."""
        # File to upload; the label is Czech for "File"
        file = flask_wtf.file.FileField("Soubor", render_kw={'autofocus': True})
        # Two submit buttons; the view tells them apart by their .data ("Upload" / "Delete")
        upload = wtforms.SubmitField('Nahrát')
        delete = wtforms.SubmitField('Smazat')
    
    
    @app.route('/org/contest/r/<int:round_id>/task-statement/edit', methods=('GET', 'POST'))
    def org_edit_statement(round_id: int):
        """Upload or delete the task statement of a round (requires ``Right.manage_round``).

        An upload is stored under ``<year>-<category>-<seq>`` inside the
        ``statements`` data directory and its relative path is saved in
        ``round.tasks_file``; deleting only clears ``round.tasks_file``.
        Both actions are logged, committed and followed by a redirect to the
        round page.
        """
        sess = db.get_session()
        ctx = get_context(round_id=round_id, right_needed=Right.manage_round)
        round = ctx.round

        def record_round_changes():
            # Write pending modifications of the round to both logs.
            if not sess.is_modified(round):
                return
            changes = db.get_object_changes(round)
            app.logger.info(f"Kolo #{round_id} změněno, změny: {changes}")
            mo.util.log(
                type=db.LogType.round,
                what=round_id,
                details={'action': 'edit', 'changes': changes},
            )

        form = StatementEditForm()
        if form.validate_on_submit():
            if form.upload.data:
                if form.file.data is None:
                    flash('Vyberte si prosím soubor', 'danger')
                else:
                    file = form.file.data.stream
                    # The category becomes part of a directory name, so sanitize it.
                    secure_category = werkzeug.utils.secure_filename(round.category)
                    stmt_dir = f'{round.year}-{secure_category}-{round.seq}'
                    full_dir = os.path.join(mo.util.data_dir('statements'), stmt_dir)
                    os.makedirs(full_dir, exist_ok=True)
                    # NOTE(review): presumably links the uploaded temporary file
                    # into full_dir under a new name -- see mo.util.link_to_dir.
                    full_name = mo.util.link_to_dir(file.name, full_dir, suffix='.pdf')
                    file_name = os.path.join(stmt_dir, os.path.basename(full_name))
                    app.logger.info(f'Nahráno zadání: {file_name}')

                    round.tasks_file = file_name
                    record_round_changes()
                    sess.commit()
                    flash('Zadání nahráno', 'success')
                    return redirect(ctx.url_for('org_round'))

            if form.delete.data:
                round.tasks_file = None
                record_round_changes()
                sess.commit()
                flash('Zadání smazáno', 'success')
                return redirect(ctx.url_for('org_round'))

        return render_template(
            'org_edit_statement.html',
            ctx=ctx,
            round=round,
            form=form,
        )
    
    
    class MessageAddForm(FlaskForm):
        """Form for writing a new message of a round; it can be previewed or inserted."""
        # Required title of the message
        title = wtforms.StringField('Nadpis', validators=[validators.Required()])
        # Required Markdown source of the message body
        markdown = wtforms.TextAreaField(
            'Text zprávičky', description='Zprávičky lze formátovat pomocí Markdownu',
            validators=[validators.Required()],
            render_kw={'rows': 10},
        )
        # Two submit buttons: "Insert message" and "Show preview"
        submit = wtforms.SubmitField(label='Vložit zprávičku')
        preview = wtforms.SubmitField(label='Zobrazit náhled')
    
    
    class MessageRemoveForm(FlaskForm):
        """Form submitted to remove one message of a round."""
        # Required ID of the message to remove
        message_id = wtforms.IntegerField(validators=[validators.Required()])
        # Submit button without an explicit label
        message_remove = wtforms.SubmitField()
    
    
    @app.route('/org/contest/r/<int:round_id>/messages/', methods=('GET', 'POST'))
    def org_round_messages(round_id: int):
        """List the messages of a round and, with ``Right.manage_round``, manage them.

        A round without active messages redirects to the round page with a
        warning. Users holding ``Right.manage_round`` additionally get a form
        for adding a message (with a preview) and a form for removing one.

        Raises:
            werkzeug.exceptions.NotFound: when the message to remove does not
                exist or belongs to a different round.
        """
        sess = db.get_session()
        ctx = get_context(round_id=round_id)
        round = ctx.round

        if not round.has_messages:
            flash('Toto kolo nemá aktivní zprávičky pro účastníky, aktivujte je v nastavení kola', 'warning')
            return redirect(ctx.url_for('org_round'))

        messages = sess.query(db.Message).filter_by(round_id=round_id).order_by(db.Message.created_at).all()

        add_form: Optional[MessageAddForm] = None
        remove_form: Optional[MessageRemoveForm] = None
        preview: Optional[db.Message] = None
        if ctx.rights.have_right(Right.manage_round):
            add_form = MessageAddForm()
            remove_form = MessageRemoveForm()

            if remove_form.validate_on_submit() and remove_form.message_remove.data:
                msg = sess.query(db.Message).get(remove_form.message_id.data)
                # Never remove a message of another round, even if its ID was submitted.
                if not msg or msg.round_id != round_id:
                    raise werkzeug.exceptions.NotFound()
                sess.delete(msg)
                sess.commit()
                app.logger.info(f"Zprávička pro kolo {round_id} odstraněna: {db.row2dict(msg)}")

                flash('Zprávička odstraněna', 'success')
                return redirect(ctx.url_for('org_round_messages'))

            if add_form.validate_on_submit():
                msg = db.Message(
                    round_id=round_id,
                    created_by=g.user.user_id,
                    created_at=mo.now,
                )
                add_form.populate_obj(msg)
                # Render the Markdown and sanitize the resulting HTML.
                # ALLOWED_TAGS is a list in older bleach releases and a frozenset
                # since bleach 6.0, where `ALLOWED_TAGS + ['p']` raises TypeError;
                # converting to a list first works with both.
                msg.html = bleach.clean(
                    markdown.markdown(msg.markdown),
                    tags=list(ALLOWED_TAGS) + ['p']
                )

                if add_form.preview.data:
                    # Preview only: the message is rendered but not stored.
                    preview = msg
                elif add_form.submit.data:
                    sess.add(msg)
                    sess.commit()
                    app.logger.info(f"Vložena nová zprávička pro kolo {round_id}: {db.row2dict(msg)}")

                    flash('Zprávička úspěšně vložena', 'success')
                    return redirect(ctx.url_for('org_round_messages'))

        return render_template(
            'org_round_messages.html',
            ctx=ctx,
            round=round, messages=messages,
            add_form=add_form, remove_form=remove_form,
            preview=preview,
        )