Skip to content
Snippets Groups Projects
Select Git revision
  • 9577f8eff4771c0e5cad09c2fce35ca80610bce9
  • master default protected
2 results

matrix_tests.py

Blame
  • org_contest.py 21.15 KiB
    from dataclasses import dataclass
    from flask import render_template, g, redirect, url_for, flash, request
    from flask_wtf import FlaskForm
    import flask_wtf.file
    import locale
    import os
    import secrets
    from sqlalchemy.orm import joinedload
    from sqlalchemy.orm.query import Query
    from typing import List, Tuple, Optional, Sequence, Dict
    import werkzeug.exceptions
    import wtforms
    
    import mo
    import mo.csv
    import mo.db as db
    import mo.imports
    import mo.rights
    from mo.rights import Right, Rights
    import mo.util
    from mo.web import app
    from mo.web.util import PagerForm
    from mo.web.table import CellCheckbox, Table, Column, cell_place_link, cell_user_link, cell_email_link
    import wtforms.validators as validators
    
    
    class ImportForm(FlaskForm):
        file = flask_wtf.file.FileField("Soubor", validators=[flask_wtf.file.FileRequired()])
        submit = wtforms.SubmitField('Importovat')
    
    
    class ParticipantsFilterForm(PagerForm):
        school = wtforms.StringField("Škola")
        participation_place = wtforms.StringField("Soutěžní místo")
        contest_place = wtforms.StringField("Soutěžní oblast")
        participation_state = wtforms.SelectField('Stav účasti', choices=[('*', '*')] + list(db.PartState.choices()), default='*')
    
        # format = wtforms.RadioField(choices=[('', 'Zobrazit'), ('csv', 'Stáhnout vše v CSV'), ('tsv', 'Stáhnout vše v TSV')])
        submit = wtforms.SubmitField("Zobrazit")
        download_csv = wtforms.SubmitField("↓ CSV")
        download_tsv = wtforms.SubmitField("↓ TSV")
    
    
    class ParticipantsActionForm(FlaskForm):
        action_on = wtforms.RadioField(
            "Provést akci na", validators=[validators.DataRequired()],
            choices=[('all', 'všech vyfiltrovaných účastnících'), ('checked', 'označených účastnících')]
            # checkboxes are handled not through FlaskForm, see below
        )
    
        participation_state = wtforms.SelectField('Stav účasti', choices=list(db.PartState.choices()))
        set_participation_state = wtforms.SubmitField("Nastavit stav účasti")
    
        participation_place = wtforms.StringField(
            'Soutěžní místo', description='Zadejte kód nebo ID ve tvaru <code>#123</code>'
        )
        set_participation_place = wtforms.SubmitField("Nastavit soutěžní místo")
    
        contest_place = wtforms.StringField(
            'Soutěžní oblast',
            description='Musí existovat soutěž v dané oblasti pro stejné kolo. Oblast zadejte pomocí kódu nebo ID ve tvaru <code>#123</code>.'
        )
        set_contest = wtforms.SubmitField("Přesunout do jiné soutěžní oblasti")
    
        remove_participation = wtforms.SubmitField("Smazat záznam o účasti")
    
        def do_action(self, round: db.Round, rights: Rights, query: Query) -> bool:
            """Do participation modification on partipations from given query
            (possibly filtered by checkboxes). `rights` param is used to check rights
            for contest of each modified participation or for contest in which
            participation is moved to."""
    
            if not self.validate_on_submit():
                return False
    
            sess = db.get_session()
    
            # Check that operation is valid
            if self.set_participation_state.data:
                pass
            elif self.set_participation_place.data:
                participation_place = db.get_place_by_code(self.participation_place.data)
                if not participation_place:
                    flash('Nenalezeno místo s daným kódem', 'danger')
                    return False
            elif self.set_contest.data:
                contest_place = db.get_place_by_code(self.contest_place.data)
                if not contest_place:
                    flash("Nepovedlo se najít zadanou soutěžní oblast", 'danger')
                    return False
                contest = sess.query(db.Contest).filter_by(round_id=round.round_id, place_id=contest_place.place_id).one_or_none()
                if not contest:
                    flash(f"Nepovedlo se najít soutěž v kole {round.round_code()} v oblasti {contest_place.name}", 'danger')
                    return False
                rights.get_for_contest(contest)
                if not rights.have_right(Right.manage_contest):
                    flash(f"Nemáte právo ke správě soutěže v kole {round.round_code()} v oblasti {contest_place.name}, nelze do ní přesunout účastníky", 'danger')
                    return False
            elif self.remove_participation.data:
                pass
            else:
                flash('Neznámá operace', 'danger')
                return False
    
            try:
                user_ids = list(map(int, request.form.getlist('checked')))
            except ValueError:
                flash('Data v checkboxech nelze převést na čísla, kontaktujte správce', 'danger')
                return False
    
            # Check all participations if we can edit them
            ctants = query.all()
            rights_cache = set()
            for pion, _, _ in ctants:
                u = pion.user
                if self.action_on.data == 'checked' and u.user_id not in user_ids:
                    continue
                if pion.contest_id in rights_cache:
                    continue
                rights.get_for_contest(pion.contest)
                if rights.have_right(Right.manage_contest):
                    rights_cache.add(pion.contest_id)
                    continue
                flash(
                    f"Nemáte právo ke správě soutěže v kole {round.round_code()} v oblasti {pion.contest.place.name} "
                    + f"(účastník {u.full_name()}). Žádná akce nebyla provedena.", 'danger'
                )
                return False
    
            count = 0
            for pion, _, _ in ctants:
                u = pion.user
                if self.action_on.data == 'checked' and u.user_id not in user_ids:
                    continue
    
                if self.remove_participation.data:
                    sess.delete(pion)
                    app.logger.info(f"Participation of user {u.user_id} in contest {pion.contest} removed")
                    mo.util.log(
                        type=db.LogType.participant,
                        what=u.user_id,
                        details={'action': 'participation-removed', 'participation': db.row2dict(pion)},
                    )
                else:
                    if self.set_participation_state.data:
                        pion.state = self.participation_state.data
                    elif self.set_participation_place.data:
                        pion.place = participation_place
                    elif self.set_contest.data:
                        pion.contest = contest
    
                    changes = db.get_object_changes(pion)
                    app.logger.info(f"Participation of user {u.user_id} modified, changes: {changes}")
                    mo.util.log(
                        type=db.LogType.participant,
                        what=u.user_id,
                        details={'action': 'participation-changed', 'changes': changes},
                    )
                    sess.flush()
                count += 1
    
            sess.commit()
            if count == 0:
                flash('Žádní vybraní účastníci', 'warning')
            elif self.set_participation_state.data:
                flash(f'Nastaven stav {db.part_state_names[self.participation_state.data]} {count} účastníkům', 'success')
            elif self.set_participation_place.data:
                flash(f'Nastaveno soutěžní místo {participation_place.name} {count} účastníkům', 'success')
            elif self.set_contest.data:
                flash(f'{count} účastníků přesunuto do soutěže v oblasti {contest_place.name}', 'success')
            elif self.remove_participation.data:
                flash(f'Odstraněno {count} účastníků z této soutěže', 'success')
    
            return True
    
    
    def get_contest(id: int) -> db.Contest:
        contest = (db.get_session().query(db.Contest)
                   .options(joinedload(db.Contest.place),
                            joinedload(db.Contest.round))
                   .get(id))
        if not contest:
            raise werkzeug.exceptions.NotFound()
        return contest
    
    
    def get_contest_rr(id: int, right_needed: Optional[Right] = None) -> Tuple[db.Contest, Rights]:
        contest = get_contest(id)
    
        rr = Rights(g.user)
        rr.get_for_contest(contest)
    
        if not (right_needed is None or rr.have_right(right_needed)):
            raise werkzeug.exceptions.Forbidden()
    
        return contest, rr
    
    
    @app.route('/org/contest/c/<int:id>')
    def org_contest(id: int):
        contest, rr = get_contest_rr(id, None)
    
        return render_template(
            'org_contest.html',
            contest=contest,
            rights=sorted(rr.current_rights, key=lambda r: r. name),
            can_manage=rr.have_right(Right.manage_contest),
        )
    
    
    @app.route('/org/contest/c/<int:id>/import', methods=('GET', 'POST'))
    def org_contest_import(id: int):
        contest, rr = get_contest_rr(id, Right.manage_contest)
    
        form = ImportForm()
        errs = []
        if form.validate_on_submit():
            tmp_name = secrets.token_hex(16) + '.csv'
            tmp_path = os.path.join(app.instance_path, 'imports', tmp_name)
            form.file.data.save(tmp_path)
    
            imp = mo.imports.Import(g.user)
            if imp.import_contest(contest.round, contest, tmp_path):
                flash(f'Účastníci importováni ({imp.cnt_rows} řádků, založeno {imp.cnt_new_users} uživatelů, {imp.cnt_new_participations} účastí)', 'success')
                return redirect(url_for('org_contest', id=contest.contest_id))
            else:
                flash('Došlo k chybě při importu (detaily níže)', 'danger')
                errs = imp.errors
    
        return render_template(
            'org_contest_import.html',
            contest=contest,
            form=form,
            errs=errs,
        )
    
    
    @app.route('/doc/import_contest')
    def doc_import_contest():
        return render_template('doc_import_contest.html')
    
    
    @app.route('/org/contest/import/sablona.csv')
    def org_contest_import_template():
        out = mo.imports.contest_template()
        resp = app.make_response(out)
        resp.content_type = 'text/csv; charset=utf=8'
        return resp
    
    
    @app.route('/org/contest/c/<int:id>/ucastnici', methods=('GET', 'POST'))
    def org_contest_list(id: int):
        contest, rr = get_contest_rr(id)
        can_edit = rr.have_right(Right.manage_contest)
        format = request.args.get('format', "")
    
        filter = ParticipantsFilterForm(request.args)
        filter.validate()
        query = get_contestants_query(
            round=contest.round, contest=contest,
            school=db.get_place_by_code(filter.school.data),
            # contest_place=db.get_place_by_code(filter.contest_place.data),
            participation_place=db.get_place_by_code(filter.participation_place.data),
            participation_state=None if filter.participation_state.data == '*' else filter.participation_state.data
        )
    
        action_form = None
        if can_edit:
            action_form = ParticipantsActionForm()
            if action_form.do_action(round=contest.round, rights=rr, query=query):
                # Action happened, redirect
                return redirect(request.url)
    
        # (count, query) = filter.apply_limits(query, pagesize=50)
        count = query.count()
    
        if format == "":
            table = make_contestant_table(query, add_checkbox=can_edit)
            return render_template(
                'org_contest_list.html',
                contest=contest,
                table=table,
                filter=filter, count=count, action_form=action_form,
            )
        else:
            table = make_contestant_table(query)
            return table.send_as(format)
    
    
    contest_list_columns = (
        Column(key='first_name',    name='krestni',     title='Křestní jméno'),
        Column(key='last_name',     name='prijmeni',    title='Příjmení'),
        Column(key='email',         name='email',       title='E-mail'),
        Column(key='school',        name='skola',       title='Škola'),
        Column(key='school_code',   name='kod_skoly',   title='Kód školy'),
        Column(key='grade',         name='rocnik',      title='Ročník'),
        Column(key='born_year',     name='rok_naroz',   title='Rok naroz.'),
        Column(key='place_code',    name='kod_soutez_mista',  title='Sout. místo'),
        Column(key='status',        name='stav',        title='Stav'),
    )
    
    
    def get_contestants_query(
            round: db.Round, contest: Optional[db.Contest] = None,
            contest_place: Optional[db.Place] = None,
            participation_place: Optional[db.Place] = None,
            participation_state: Optional[db.PartState] = None,
            school: Optional[db.Place] = None) -> Query:
        query = (db.get_session()
                 .query(db.Participation, db.Participant, db.Contest)
                 .select_from(db.Participation)
                 .join(db.Participant, db.Participant.user_id == db.Participation.user_id)
                 .filter(db.Participant.year == round.year))
        if contest:
            query = query.join(db.Contest, db.Contest.contest_id == contest.contest_id)
        else:
            query = query.filter(db.Contest.round == round)
            query = query.options(joinedload(db.Contest.place))
        query = query.filter(db.Participation.contest_id == db.Contest.contest_id)
        if contest_place:
            query = query.filter(db.Contest.place_id == contest_place.place_id)
        if participation_place:
            query = query.filter(db.Participation.place_id == participation_place.place_id)
        if school:
            query = query.filter(db.Participant.school == school.place_id)
        if participation_state:
            query = query.filter(db.Participation.state == participation_state)
        query = query.options(joinedload(db.Participation.user),
                              joinedload(db.Participation.place),
                              joinedload(db.Participant.school_place))
    
        return query
    
    
    def make_contestant_table(query: Query, add_checkbox: bool = False, add_contest_column: bool = False):
        ctants = query.all()
    
        rows: List[dict] = []
        for pion, pant, ct in ctants:
            u = pion.user
            rows.append({
                'sort_key': (locale.strxfrm(u.last_name), locale.strxfrm(u.first_name), u.user_id),
                'first_name': cell_user_link(u, u.first_name),
                'last_name': cell_user_link(u, u.last_name),
                'email': cell_email_link(u),
                'school': pant.school_place.name,
                'school_code': cell_place_link(pant.school_place, pant.school_place.get_code()),
                'grade': pant.grade,
                'born_year': pant.birth_year,
                'region_code': cell_place_link(ct.place, ct.place.get_code()),
                'place_code': cell_place_link(pion.place, pion.place.get_code()),
                'status': pion.state.friendly_name(),
                'checkbox': CellCheckbox('checked', u.user_id, False),
            })
    
        rows.sort(key=lambda r: r['sort_key'])
    
        cols: Sequence[Column] = contest_list_columns
        if add_checkbox:
            cols = [Column(key='checkbox', name=' ', title=' ')] + list(cols)
        if add_contest_column:
            cols = list(cols) + [Column(key='region_code', name='kod_oblasti', title='Oblast')]
    
        return Table(
            columns=cols,
            rows=rows,
            filename='ucastnici',
            show_downlink=False,  # downlinks are in filter
        )
    
    
    @app.route('/org/contest/c/<int:id>/reseni')
    def org_contest_solutions(id: int):
        # FIXME: Práva?
        # FIXME: Hlavička stránky podle Jirkova předělání
        contest, rr = get_contest_rr(id, Right.manage_contest)
        format = request.args.get('format', "")
    
        sess = db.get_session()
    
        pions = (sess.query(db.Participation)
                 .filter_by(contest=contest)
                 .options(joinedload(db.Participation.user))
                 .all())
    
        tasks = (sess.query(db.Task)
                 .filter_by(round=contest.round)
                 .order_by(db.Task.code)
                 .all())
    
        pions_subq = (sess.query(db.Participation.user_id)
                      .filter_by(contest=contest)
                      .subquery())
    
        sols = (sess.query(db.Solution)
                .filter(db.Solution.user_id.in_(pions_subq))
                .options(joinedload(db.Solution.last_submit_obj),
                         joinedload(db.Solution.last_feedback_obj))
                .all())
    
        print('XXX pions:', pions)  # FIXME
        print('XXX tasks:', tasks)
        print('XXX sols:', sols)
    
        cols = [ Column(key='name', name='jmeno', title='Jméno') ]
    
        task_sols: Dict[int, Dict[int, db.Solution]] = {}
        for t in tasks:
            cols.append(Column(key=f't-{t.task_id}', name=t.code))
            task_sols[t.task_id] = {}
    
        for s in sols:
            task_sols[s.task_id][s.user_id] = s
    
        rows = []
        for pion in pions:
            user = pion.user
            r = {
                'name': user.full_name(),
            }
            for t in tasks:
                s = task_sols[t.task_id].get(user.user_id, None)
                if s is not None:
                    cell = '*'
                else:
                    cell = ""
                r[f't-{t.task_id}'] = cell
            rows.append(r)
    
        print('XXX cols:', cols)    # FIXME
        print('XXX rows:', rows)
    
        table = Table(
            columns=cols,
            rows=rows,
            filename='reseni',
        )
    
        if format == "":
            return render_template(
                'org_contest_solutions.html',
                contest=contest,
                table=table,
            )
        else:
            return table.send_as(format)
    
    
    @dataclass
    class SolutionContext:
        contest: db.Contest
        round: db.Round
        pion: db.Participation
        task: db.Task
        solution: Optional[db.Solution]
        allow_view: bool
        allow_upload_solutions: bool
        allow_upload_feedback: bool
    
    
    def get_solution_context(contest_id: int, site_id: Optional[int], user_id: int, task_id: int) -> SolutionContext:
        sess = db.get_session()
    
        # Nejprve zjistíme, zda existuje soutěž
        contest = get_contest(contest_id)
        round = contest.round
    
        # Zkontrolujeme, zda se účastník opravdu účastní soutěže
        pion = (sess.query(db.Participation)
                .filter_by(user_id=user_id, contest_id=contest_id)
                .options(joinedload(db.Participation.place))
                .one_or_none())
        if not pion:
            raise werkzeug.exceptions.NotFound()
    
        # A zda soutěží na zadaném soutěžním místě, je-li určeno
        if site_id is not None and site_id != pion.site_id:
            raise werkzeug.exceptions.NotFound()
    
        # Najdeme úlohu a ověříme, že je součástí soutěže
        task = sess.query(db.Task).get(task_id)
        if not task or task.round_id != round:
            raise werkzeug.exceptions.NotFound()
    
        # Najdeme řešení úlohy (nemusí existovat)
        sol = (sess.query(db.Solution)
               .filter_by(user_id=user_id, task_id=task_id)
               .one_or_none())
    
        # Pokud je uvedeno soutěžní místo, hledáme práva k němu, jinak k soutěži
        if site_id is not None:
            site = pion.place
        else:
            site = contest.place
        rr = Rights(g.user)
        rr.get_for_contest_site(contest, site)
    
        # Kdo má právo na jaké operace
        allow_upload_solutions = (rr.have_right(Right.manage_contest)
                                  or (rr.have_right(Right.upload_solutions) and round.state == db.RoundState.running))
        allow_upload_feedback = (rr.have_right(Right.manage_contest)
                                 or (rr.have_right(Right.upload_feedback) and round.state == db.RoundState.grading))
        allow_view = (rr.have_right(Right.manage_contest)
                      or (rr.have_right(Right.upload_solutions) and round.state in (db.RoundState.running, db.RoundState.grading, db.RoundState.closed))
                      or (rr.have_right(Right.upload_feedback) and round.state in (db.RoundState.grading, db.RoundState.closed)))
        if not allow_view:
            raise werkzeug.exceptions.Forbidden()
    
        return SolutionContext(
            contest=contest,
            round=round,
            pion=pion,
            task=task,
            solution=sol,
            allow_view=allow_view,
            allow_upload_solutions=allow_upload_solutions,
            allow_upload_feedback=allow_upload_feedback,
        )
    
    
    @app.route('/org/contest/c/<int:contest_id>/submit/<int:user_id>/<int:task_id>/')
    def org_submit_list(contest_id, user_id, task_id):
        sc = get_solution_context(contest_id, None, user_id, task_id)
    
        # FIXME
    
        return render_template('not_implemented.html')
    
    
    @app.route('/org/contest/c/<int:contest_id>/site/<int:site_id>/submit/<int:user_id>/<int:task_id>/')
    def org_submit_site_list(contest_id, site_id, user_id, task_id):
        sc = get_solution_context(contest_id, site_id, user_id, task_id)
    
        # FIXME
    
        return render_template('not_implemented.html')
    
    
    @app.route('/org/contest/c/<int:id>/proctor-import', methods=('GET', 'POST'))
    def org_proctor_import(id: int):
        contest, rr = get_contest_rr(id, Right.manage_contest)
    
        form = ImportForm()
        errs = []
        if form.validate_on_submit():
            tmp_name = secrets.token_hex(16) + '.csv'
            tmp_path = os.path.join(app.instance_path, 'imports', tmp_name)
            form.file.data.save(tmp_path)
    
            imp = mo.imports.Import(g.user)
            if imp.import_proctors(contest.round, tmp_path):
                flash(f'Dozor importován ({imp.cnt_rows} řádků, založeno {imp.cnt_new_users} uživatelů, {imp.cnt_new_roles} rolí)', 'success')
                return redirect(url_for('org_contest', id=contest.contest_id))
            else:
                flash('Došlo k chybě při importu (detaily níže)', 'danger')
                errs = imp.errors
    
        return render_template(
            'org_proctor_import.html',
            contest=contest,
            form=form,
            errs=errs,
        )
    
    
    @app.route('/org/contest/import/sablona-dozor.csv')
    def org_proctor_import_template():
        out = mo.imports.proctor_template()
        resp = app.make_response(out)
        resp.content_type = 'text/csv; charset=utf=8'
        return resp
    
    
    @app.route('/doc/import-dozor')
    def org_proctor_import_help():
        return render_template('doc_import_proctor.html')