Skip to content
Snippets Groups Projects
Select Git revision
  • 7dd20e3084817a520ece177f6fa3fe6377cc9ecd
  • devel default
  • master
  • fo
  • jirka/typing
  • fo-base
  • mj/submit-images
  • jk/issue-96
  • jk/issue-196
  • honza/add-contestant
  • honza/mr7
  • honza/mrf
  • honza/mrd
  • honza/mra
  • honza/mr6
  • honza/submit-images
  • honza/kolo-vs-soutez
  • jh-stress-test-wip
  • shorten-schools
19 results

db.py

Blame
  • org_round.py 16.06 KiB
    from flask import render_template, g, redirect, url_for, flash, request
    import locale
    from flask_wtf.form import FlaskForm
    from sqlalchemy import func
    from sqlalchemy.orm import joinedload
    from sqlalchemy.sql.functions import coalesce
    from typing import Optional, Tuple
    import werkzeug.exceptions
    import wtforms
    from wtforms import validators
    from wtforms.fields.html5 import IntegerField
    
    import mo
    import mo.db as db
    import mo.imports
    from mo.rights import Right, Rights
    import mo.util
    from mo.web import app
    from mo.web.org_contest import ParticipantsActionForm, ParticipantsFilterForm, get_contestants_query, make_contestant_table, \
        generic_import, generic_batch_download, generic_batch_upload, generic_batch_points
    
    
    def get_round(id: int) -> db.Round:
        round = db.get_session().query(db.Round).get(id)
        if not round:
            raise werkzeug.exceptions.NotFound()
        return round
    
    
    def get_round_rr(id: int, right_needed: Optional[Right], any_place: bool) -> Tuple[db.Round, Rights]:
        round = get_round(id)
    
        rr = g.gatekeeper.rights_for_round(round, any_place)
    
        if not (right_needed is None or rr.have_right(right_needed)):
            raise werkzeug.exceptions.Forbidden()
    
        return round, rr
    
    
    def get_task(round: db.Round, task_id: int) -> db.Task:
        task = db.get_session().query(db.Task).get(task_id)
        if not task or task.round_id != round.round_id:
            raise werkzeug.exceptions.NotFound()
        return task
    
    
    @app.route('/org/contest/')
    def org_rounds():
        sess = db.get_session()
    
        rounds = sess.query(db.Round).filter_by(year=mo.current_year).order_by(db.Round.year, db.Round.category, db.Round.seq)
        return render_template('org_rounds.html', rounds=rounds, level_names=mo.db.place_level_names)
    
    
    class TaskDeleteForm(FlaskForm):
        delete_task_id = wtforms.IntegerField()
        delete_task = wtforms.SubmitField('Smazat úlohu')
    
    
    def delete_task(round_id: int, form: TaskDeleteForm) -> bool:
        if not (request.method == 'POST' and 'delete_task_id' in request.form and form.validate_on_submit()):
            return False
    
        sess = db.get_session()
        delete_task = sess.query(db.Task).filter_by(
            round_id=round_id, task_id=form.delete_task_id.data
        ).first()
    
        if not delete_task:
            flash('Úloha s daným ID v tomto kole neexistuje', 'danger')
        elif sess.query(db.Solution).filter_by(task_id=delete_task.task_id).first() is not None:
            flash(f'Úlohu {delete_task.code} nelze smazat, existují řešení vázající se na ni', 'danger')
        elif sess.query(db.Paper).filter_by(for_task=delete_task.task_id).first() is not None:
            flash(f'Úlohu {delete_task.code} nelze smazat, existují papíry vázající se na ni', 'danger')
        elif sess.query(db.PointsHistory).filter_by(task_id=delete_task.task_id).first() is not None:
            flash(f'Úlohu {delete_task.code} nelze smazat, existují přidělené body vázající se na ni', 'danger')
        else:
            sess.delete(delete_task)
            mo.util.log(
                type=db.LogType.task,
                what=delete_task.task_id,
                details={'action': 'delete', 'task': db.row2dict(delete_task)},
            )
            app.logger.info(f"Úloha {delete_task.code} ({delete_task.task_id}) smazána: {db.row2dict(delete_task)}")
            sess.commit()
            flash(f'Úloha {delete_task.code} úspěšně smazána', 'success')
            return True
    
        return False
    
    
    class AddContestForm(FlaskForm):
        place_code = wtforms.StringField('Nová soutěž v oblasti:', validators=[validators.Required()])
        create_contest = wtforms.SubmitField('Založit')
    
    
    def add_contest(round: db.Round, form: AddContestForm) -> bool:
        if not (request.method == 'POST' and 'create_contest' in request.form and form.validate_on_submit()):
            return False
    
        place = db.get_place_by_code(form.place_code.data)
        if place is None:
            flash(f'Místo s kódem {form.place_code.data} neexistuje', 'danger')
            return False
    
        if place.level != round.level:
            flash(f'{place.type_name().title()} {place.name} není {db.place_level_names[round.level]}', 'danger')
            return False
    
        sess = db.get_session()
        if sess.query(db.Contest).filter_by(round=round, place=place).one_or_none():
            flash(f'Pro {place.type_name()} {place.name} už toto kolo existuje', 'danger')
            return False
    
        contest = db.Contest(round=round, place=place)
        rr = g.gatekeeper.rights_for_contest(contest)
        if not rr.have_right(Right.add_contest):
            flash('Vaše role nedovoluje vytvořit soutěž v oblasti {place.type_name()} {place.name}', 'danger')
            return False
    
        sess.add(contest)
        sess.flush()
    
        mo.util.log(
            type=db.LogType.contest,
            what=contest.contest_id,
            details={'action': 'add', 'contest': db.row2dict(contest)},
        )
        app.logger.info(f"Soutěž #{contest.contest_id} založena: {db.row2dict(contest)}")
        sess.commit()
    
        flash(f'Soutěž v oblasti {place.type_name()} {place.name} založena', 'success')
        return True
    
    
    @app.route('/org/contest/r/<int:id>/', methods=('GET', 'POST'))
    def org_round(id: int):
        sess = db.get_session()
        round, rr = get_round_rr(id, None, True)
    
        can_manage_round = rr.have_right(Right.manage_round)
        can_manage_contestants = rr.have_right(Right.manage_contest)
    
        participants_count = sess.query(
            db.Participation.contest_id,
            func.count(db.Participation.user_id).label('count')
        ).group_by(db.Participation.contest_id).subquery()
    
        contests_counts = (sess.query(
                        db.Contest,
                        coalesce(participants_count.c.count, 0)
                    ).outerjoin(participants_count)
                    .filter(db.Contest.round == round)
                    .options(joinedload(db.Contest.place))
                    .all())
    
        contests_counts.sort(key=lambda c: locale.strxfrm(c[0].place.name))
    
        sol_counts_q = (
            sess.query(db.Solution.task_id, func.count(db.Solution.task_id))
            .filter(db.Solution.task_id.in_(
                sess.query(db.Task.task_id).filter_by(round=round)
            ))
        )
    
        sol_counts = {}
        for task_id, count in sol_counts_q.group_by(db.Solution.task_id).all():
            sol_counts[task_id] = count
    
        tasks = sess.query(db.Task).filter_by(round=round).all()
        tasks.sort(key=lambda t: t.code)
        for task in tasks:
            task.sol_count = sol_counts[task.task_id] if task.task_id in sol_counts else 0
    
        form_delete_task = TaskDeleteForm()
        if can_manage_round and delete_task(id, form_delete_task):
            return redirect(url_for('org_round', id=id))
    
        form_add_contest = AddContestForm()
        if add_contest(round, form_add_contest):
            return redirect(url_for('org_round', id=id))
    
        return render_template(
            'org_round.html',
            round=round,
            roles=[r.friendly_name() for r in rr.get_roles()],
            contests_counts=contests_counts,
            tasks=tasks, form_delete_task=form_delete_task,
            form_add_contest=form_add_contest,
            level_names=mo.db.place_level_names,
            can_manage_round=can_manage_round,
            can_manage_contestants=can_manage_contestants,
            can_handle_submits=rr.have_right(Right.view_submits),
            can_upload=rr.can_upload_feedback(round),
            can_view_statement=rr.can_view_statement(round),
            can_add_contest=g.gatekeeper.rights_generic().have_right(Right.add_contest),
        )
    
    
    class TaskEditForm(FlaskForm):
        code = wtforms.StringField('Kód úlohy', validators=[
            validators.Required(),
            # trik: nelze použít \w protože obsahuje _, \W je negace \w, takže [^\W-] je \w bez _
            validators.Regexp(r'^([^\W_]|-)+$', message="Kód úlohy smí obsahovat jen písmena, čísla a znak -"),
        ])
        name = wtforms.StringField('Název úlohy')
        submit = wtforms.SubmitField('Uložit')
    
    
    @app.route('/org/contest/r/<int:id>/task/new', methods=('GET', 'POST'))
    def org_round_task_new(id: int):
        sess = db.get_session()
        round, rr = get_round_rr(id, Right.manage_round, True)
    
        form = TaskEditForm()
        if form.validate_on_submit():
            task = db.Task()
            task.round = round
            form.populate_obj(task)
    
            if sess.query(db.Task).filter_by(round_id=id, code=task.code).first():
                flash('Úloha se stejným kódem již v tomto kole existuje', 'danger')
            else:
                sess.add(task)
                sess.flush()
                mo.util.log(
                    type=db.LogType.task,
                    what=task.task_id,
                    details={'action': 'add', 'task': db.row2dict(task)},
                )
                sess.commit()
                app.logger.info(f"Úloha {task.code} ({task.task_id}) přidána: {db.row2dict(task)}")
                flash('Nová úloha přidána', 'success')
                return redirect(url_for('org_round', id=id))
    
        return render_template(
            'org_round_task_edit.html',
            round=round, task=None, form=form,
        )
    
    
    @app.route('/org/contest/r/<int:id>/task/<int:task_id>/edit', methods=('GET', 'POST'))
    def org_round_task_edit(id: int, task_id: int):
        sess = db.get_session()
        round, rr = get_round_rr(id, Right.manage_round, True)
    
        task = sess.query(db.Task).get(task_id)
        # FIXME: Check contest!
        if not task:
            raise werkzeug.exceptions.NotFound()
    
        form = TaskEditForm(obj=task)
        if form.validate_on_submit():
            if sess.query(db.Task).filter(
                db.Task.task_id != task_id, db.Task.round_id == id, db.Task.code == form.code.data
            ).first():
                flash('Úloha se stejným kódem již v tomto kole existuje', 'danger')
            else:
                form.populate_obj(task)
                if sess.is_modified(task):
                    changes = db.get_object_changes(task)
    
                    mo.util.log(
                        type=db.LogType.task,
                        what=task_id,
                        details={'action': 'edit', 'changes': changes},
                    )
                    sess.commit()
                    app.logger.info(f"Úloha {task.code} ({task_id}) modifikována, změny: {changes}")
                    flash('Změny úlohy uloženy', 'success')
                else:
                    flash(u'Žádné změny k uložení', 'info')
    
                return redirect(url_for('org_round', id=id))
    
        return render_template(
            'org_round_task_edit.html',
            round=round, task=task, form=form,
        )
    
    
    @app.route('/org/contest/r/<int:round_id>/task/<int:task_id>/download', methods=('GET', 'POST'))
    def org_round_task_download(round_id: int, task_id: int):
        round, rr = get_round_rr(round_id, Right.view_submits, False)
        task = get_task(round, task_id)
        return generic_batch_download(round=round, contest=None, site=None, task=task)
    
    
    @app.route('/org/contest/r/<int:round_id>/task/<int:task_id>/upload', methods=('GET', 'POST'))
    def org_round_task_upload(round_id: int, task_id: int):
        round, rr = get_round_rr(round_id, Right.view_submits, False)
        task = get_task(round, task_id)
        return generic_batch_upload(round=round, contest=None, site=None, task=task,
                                    can_upload_solutions=rr.can_upload_solutions(round),
                                    can_upload_feedback=rr.can_upload_feedback(round))
    
    
    @app.route('/org/contest/r/<int:round_id>/task/<int:task_id>/batch-points', methods=('GET', 'POST'))
    def org_round_task_batch_points(round_id: int, task_id: int):
        round, rr = get_round_rr(round_id, Right.edit_points, True)
        task = get_task(round, task_id)
        return generic_batch_points(round=round, contest=None, task=task)
    
    
    @app.route('/org/contest/r/<int:id>/list', methods=('GET', 'POST'))
    def org_round_list(id: int):
        round, rr = get_round_rr(id, Right.manage_contest, True)
        format = request.args.get('format', "")
    
        filter = ParticipantsFilterForm(request.args)
        filter.validate()
        query = get_contestants_query(
            round=round,
            school=filter.f_school,
            contest_place=filter.f_contest_place,
            participation_place=filter.f_participation_place,
            participation_state=filter.f_participation_state,
        )
    
        action_form = ParticipantsActionForm()
        if action_form.do_action(round=round, query=query):
            # Action happened, redirect
            return redirect(request.url)
    
        if format == "":
            (count, query) = filter.apply_limits(query, pagesize=50)
            # count = db.get_count(query)
    
            table = make_contestant_table(query, add_contest_column=True, add_checkbox=True)
            return render_template(
                'org_round_list.html',
                round=round,
                table=table,
                filter=filter, count=count, action_form=action_form,
            )
        else:
            table = make_contestant_table(query)
            return table.send_as(format)
    
    
    @app.route('/org/contest/r/<int:id>/import', methods=('GET', 'POST'))
    def org_round_import(id: int):
        round, rr = get_round_rr(id, Right.manage_contest, True)
        return generic_import(round, None)
    
    
    class MODateTimeField(wtforms.DateTimeField):
    
        def __init__(self, label, format='%Y-%m-%d %H:%M', description='Ve formátu 2000-01-01 12:34', **kwargs):
            super().__init__(label, format=format, description=description, **kwargs)
    
        def process_data(self, valuelist):
            super().process_data(valuelist)
            if self.data is not None:
                self.data = self.data.astimezone()
    
        def process_formdata(self, valuelist):
            super().process_formdata(valuelist)
            if self.data is not None:
                self.data = self.data.astimezone()
    
    
    class RoundEditForm(FlaskForm):
        state = wtforms.SelectField("Stav kola", choices=db.RoundState.choices(), coerce=db.RoundState.coerce)
        # Only the desktop Firefox does not support datetime-local field nowadays,
        # other browsers does provide date and time picker UI :(
        tasks_file = wtforms.StringField("Soubor se zadáním", description="Cesta k ručně uploadovanému souboru", filters=[lambda x: x or None])
        ct_tasks_start = MODateTimeField("Čas zveřejnění úloh pro účastníky", validators=[validators.Optional()])
        pr_tasks_start = MODateTimeField("Čas zveřejnění úloh pro dozor", validators=[validators.Optional()])
        ct_submit_end = MODateTimeField("Konec odevzdávání pro účastníky", validators=[validators.Optional()])
        pr_submit_end = MODateTimeField("Konec odevzdávání pro dozor", validators=[validators.Optional()])
        score_mode = wtforms.SelectField("Výsledková listina", choices=db.RoundScoreMode.choices(), coerce=db.RoundScoreMode.coerce)
        score_winner_limit = IntegerField(
            "Hranice bodů pro vítěze", validators=[validators.Optional()],
            description="Řešitelé s alespoň tolika body budou označeni za vítěze, prázdná hodnota = žádné neoznačovat",
        )
        score_successful_limit = IntegerField(
            "Hranice bodů pro úspěšné řešitele", validators=[validators.Optional()],
            description="Řešitelé s alespoň tolika body budou označeni za úspěšné řešitele, prázdná hodnota = žádné neoznačovat",
        )
        submit = wtforms.SubmitField('Uložit')
    
    
    @app.route('/org/contest/r/<int:id>/edit', methods=('GET', 'POST'))
    def org_round_edit(id: int):
        sess = db.get_session()
        round, rr = get_round_rr(id, Right.manage_round, True)
    
        form = RoundEditForm(obj=round)
        if form.validate_on_submit():
            form.populate_obj(round)
    
            if sess.is_modified(round):
                changes = db.get_object_changes(round)
    
                app.logger.info(f"Round {id} modified, changes: {changes}")
                mo.util.log(
                    type=db.LogType.round,
                    what=id,
                    details={'action': 'edit', 'changes': changes},
                )
                sess.commit()
                flash('Změny kola uloženy', 'success')
            else:
                flash(u'Žádné změny k uložení', 'info')
    
            return redirect(url_for('org_round', id=id))
    
        return render_template(
            'org_round_edit.html',
            round=round,
            form=form,
        )
    
    
    @app.route('/org/contest/r/<int:id>/task-statement/zadani.pdf')
    def org_task_statement(id: int):
        round, rr = get_round_rr(id, None, True)
    
        if not rr.can_view_statement(round):
            app.logger.warn(f'Organizátor #{g.user.user_id} chce zadání, na které nemá právo')
            raise werkzeug.exceptions.Forbidden()
    
        return mo.web.util.send_task_statement(round)