Skip to content
Snippets Groups Projects
Select Git revision
  • 56c37de1933cad73123d0ce0228ddd5199a6594a
  • devel default
  • master
  • fo
  • jirka/typing
  • fo-base
  • mj/submit-images
  • jk/issue-96
  • jk/issue-196
  • honza/add-contestant
  • honza/mr7
  • honza/mrf
  • honza/mrd
  • honza/mra
  • honza/mr6
  • honza/submit-images
  • honza/kolo-vs-soutez
  • jh-stress-test-wip
  • shorten-schools
19 results

setup.py

Blame
  • org_contest.py 15.80 KiB
    from flask import render_template, g, redirect, url_for, flash, request
    from flask_wtf import FlaskForm
    import flask_wtf.file
    import locale
    import os
    import secrets
    from sqlalchemy.orm import joinedload
    from sqlalchemy.orm.query import Query
    from typing import List, Tuple, Optional, Sequence
    import werkzeug.exceptions
    import wtforms
    
    import mo
    import mo.csv
    import mo.db as db
    import mo.imports
    import mo.rights
    import mo.util
    from mo.web import app
    from mo.web.util import PagerForm
    from mo.web.table import CellCheckbox, Table, Column, cell_place_link, cell_user_link, cell_email_link
    import wtforms.validators as validators
    
    
    class ImportForm(FlaskForm):
        """Upload form with one required file field and a submit button."""

        file = flask_wtf.file.FileField(
            "Soubor",
            validators=[flask_wtf.file.FileRequired()],
        )
        submit = wtforms.SubmitField('Importovat')
    
    
    class ParticipantsFilterForm(PagerForm):
        """Filter form for participant lists.

        Offers text filters for school, participation place and contest place,
        a participation-state selector whose '*' entry is the default, and
        three submit buttons (show, CSV download, TSV download).
        """

        school = wtforms.StringField("Škola")
        participation_place = wtforms.StringField("Soutěžní místo")
        contest_place = wtforms.StringField("Soutěžní oblast")
        participation_state = wtforms.SelectField(
            'Stav účasti',
            choices=[('*', '*'), *db.PartState.choices()],
            default='*',
        )

        # format = wtforms.RadioField(choices=[('', 'Zobrazit'), ('csv', 'Stáhnout vše v CSV'), ('tsv', 'Stáhnout vše v TSV')])
        submit = wtforms.SubmitField("Zobrazit")
        download_csv = wtforms.SubmitField("↓ CSV")
        download_tsv = wtforms.SubmitField("↓ TSV")
    
    
    class ParticipantsActionForm(FlaskForm):
        """Bulk-action form for participations returned by a participant query.

        The submit button that was pressed selects the operation: set the
        participation state, set the participation place, move participants to
        the contest of another place in the same round, or delete the
        participation. `action_on` chooses between every row of the query and
        only the rows whose checkbox was ticked.
        """

        action_on = wtforms.RadioField(
            "Provést akci na", validators=[validators.DataRequired()],
            choices=[('all', 'všech vyfiltrovaných účastnících'), ('checked', 'označených účastnících')]
            # checkboxes are handled not through FlaskForm, see below
        )

        participation_state = wtforms.SelectField('Stav účasti', choices=list(db.PartState.choices()))
        set_participation_state = wtforms.SubmitField("Nastavit stav účasti")

        participation_place = wtforms.StringField(
            'Soutěžní místo', description='Zadejte kód nebo ID ve tvaru <code>#123</code>'
        )
        set_participation_place = wtforms.SubmitField("Nastavit soutěžní místo")

        contest_place = wtforms.StringField(
            'Soutěžní oblast',
            description='Musí existovat soutěž v dané oblasti pro stejné kolo. Oblast zadejte pomocí kódu nebo ID ve tvaru <code>#123</code>.'
        )
        set_contest = wtforms.SubmitField("Přesunout do jiné soutěžní oblasti")

        remove_participation = wtforms.SubmitField("Smazat záznam o účasti")

        def do_action(self, round: db.Round, rights: mo.rights.Rights, query: Query) -> bool:
            """Apply the submitted bulk action to participations from `query`.

            The rows of `query` are unpacked as 3-tuples whose first element is
            the participation; when `action_on` is 'checked', only rows whose
            user id appears among the 'checked' request form values are used.

            `rights` is used to check the manage_contest right for the contest
            of every affected participation and, when moving participants, for
            the target contest. If any check fails, nothing is modified.

            Returns False when the form was not submitted or did not validate,
            when the requested operation cannot be resolved, or when a rights
            check fails (a message is flashed in the latter cases). Returns
            True after committing, even if no participation was selected.
            """

            if not self.validate_on_submit():
                return False

            sess = db.get_session()

            # Check that operation is valid
            if self.set_participation_state.data:
                pass
            elif self.set_participation_place.data:
                participation_place = db.get_place_by_code(self.participation_place.data)
                if not participation_place:
                    flash('Nenalezeno místo s daným kódem', 'danger')
                    return False
            elif self.set_contest.data:
                contest_place = db.get_place_by_code(self.contest_place.data)
                if not contest_place:
                    flash("Nepovedlo se najít zadanou soutěžní oblast", 'danger')
                    return False
                contest = sess.query(db.Contest).filter_by(round_id=round.round_id, place_id=contest_place.place_id).one_or_none()
                if not contest:
                    flash(f"Nepovedlo se najít soutěž v kole {round.round_code()} v oblasti {contest_place.name}", 'danger')
                    return False
                rights.get_for_contest(contest)
                if not rights.have_right(mo.rights.Right.manage_contest):
                    flash(f"Nemáte právo ke správě soutěže v kole {round.round_code()} v oblasti {contest_place.name}, nelze do ní přesunout účastníky", 'danger')
                    return False
            elif self.remove_participation.data:
                pass
            else:
                flash('Neznámá operace', 'danger')
                return False

            try:
                # A set makes the per-row membership test below O(1) instead of
                # scanning a list once for every participation.
                user_ids = set(map(int, request.form.getlist('checked')))
            except ValueError:
                flash('Data v checkboxech nelze převést na čísla, kontaktujte správce', 'danger')
                return False

            # Resolve the affected participations once so that the rights check
            # and the modification pass operate on exactly the same rows.
            only_checked = self.action_on.data == 'checked'
            selected = [
                pion for pion, _, _ in query.all()
                if not only_checked or pion.user.user_id in user_ids
            ]

            # Check all participations if we can edit them
            rights_cache = set()  # contest ids already known to be manageable
            for pion in selected:
                if pion.contest_id in rights_cache:
                    continue
                rights.get_for_contest(pion.contest)
                if rights.have_right(mo.rights.Right.manage_contest):
                    rights_cache.add(pion.contest_id)
                    continue
                u = pion.user
                flash(
                    f"Nemáte právo ke správě soutěže v kole {round.round_code()} v oblasti {pion.contest.place.name} "
                    + f"(účastník {u.first_name} {u.last_name}). Žádná akce nebyla provedena.", 'danger'
                )
                return False

            for pion in selected:
                u = pion.user

                if self.remove_participation.data:
                    sess.delete(pion)
                    app.logger.info(f"Participation of user {u.user_id} in contest {pion.contest} removed")
                    mo.util.log(
                        type=db.LogType.participant,
                        what=u.user_id,
                        details={'action': 'participation-removed', 'participation': db.row2dict(pion)},
                    )
                else:
                    if self.set_participation_state.data:
                        pion.state = self.participation_state.data
                    elif self.set_participation_place.data:
                        pion.place = participation_place
                    elif self.set_contest.data:
                        pion.contest = contest

                    # Changes are collected before the flush so that the log
                    # entry describes the pending modification.
                    changes = db.get_object_changes(pion)
                    app.logger.info(f"Participation of user {u.user_id} modified, changes: {changes}")
                    mo.util.log(
                        type=db.LogType.participant,
                        what=u.user_id,
                        details={'action': 'participation-changed', 'changes': changes},
                    )
                    sess.flush()

            count = len(selected)

            sess.commit()
            if count == 0:
                flash('Žádní vybraní účastníci', 'warning')
            elif self.set_participation_state.data:
                flash(f'Nastaven stav {db.part_state_names[self.participation_state.data]} {count} účastníkům', 'success')
            elif self.set_participation_place.data:
                flash(f'Nastaveno soutěžní místo {participation_place.name} {count} účastníkům', 'success')
            elif self.set_contest.data:
                flash(f'{count} účastníků přesunuto do soutěže v oblasti {contest_place.name}', 'success')
            elif self.remove_participation.data:
                flash(f'Odstraněno {count} účastníků z této soutěže', 'success')

            return True
    
    
    def get_contest(id: int) -> db.Contest:
        """Fetch the contest with the given id, eagerly joining its place and round.

        Raises werkzeug.exceptions.NotFound when no contest is found.
        """
        base_query = db.get_session().query(db.Contest)
        eager_query = base_query.options(
            joinedload(db.Contest.place),
            joinedload(db.Contest.round),
        )
        found = eager_query.get(id)
        if found:
            return found
        raise werkzeug.exceptions.NotFound()
    
    
    def get_contest_rr(id: int, right_needed: Optional[mo.rights.Right]) -> Tuple[db.Contest, mo.rights.Rights]:
        """Return the contest together with the current user's rights for it.

        When `right_needed` is not None and the user lacks that right,
        werkzeug.exceptions.Forbidden is raised. A missing contest is reported
        by get_contest().
        """
        contest = get_contest(id)

        user_rights = mo.rights.Rights(g.user)
        user_rights.get_for_contest(contest)

        if right_needed is not None and not user_rights.have_right(right_needed):
            raise werkzeug.exceptions.Forbidden()

        return contest, user_rights
    
    
    @app.route('/org/contest/c/<int:id>')
    def org_contest(id: int):
        """Render the contest overview page.

        No particular right is required to view the page; the template gets
        the user's current rights sorted by name and a flag telling whether
        the user may manage the contest.
        """
        contest, user_rights = get_contest_rr(id, None)

        rights_by_name = sorted(user_rights.current_rights, key=lambda right: right.name)
        may_manage = user_rights.have_right(mo.rights.Right.manage_contest)

        return render_template(
            'org_contest.html',
            contest=contest,
            rights=rights_by_name,
            can_manage=may_manage,
        )
    
    
    @app.route('/org/contest/c/<int:id>/import', methods=('GET', 'POST'))
    def org_contest_import(id: int):
        """Show the participant import form and process an uploaded file.

        Requires the manage_contest right. The upload is stored under a random
        name in the instance 'imports' directory and handed to
        mo.imports.Import.import_contest(). On success the user is redirected
        to the contest page; on failure the import errors are rendered
        together with the form.
        """
        contest, _ = get_contest_rr(id, mo.rights.Right.manage_contest)

        form = ImportForm()
        import_errors = []
        if form.validate_on_submit():
            stored_name = secrets.token_hex(16) + '.csv'
            stored_path = os.path.join(app.instance_path, 'imports', stored_name)
            form.file.data.save(stored_path)

            importer = mo.imports.Import(g.user)
            if not importer.import_contest(contest.round, contest, stored_path):
                flash('Došlo k chybě při importu (detaily níže)', 'danger')
                import_errors = importer.errors
            else:
                flash(f'Účastníci importováni (založeno {importer.cnt_new_users} uživatelů, {importer.cnt_new_participations} účastí)', 'success')
                return redirect(url_for('org_contest', id=contest.contest_id))

        return render_template(
            'org_contest_import.html',
            contest=contest,
            form=form,
            errs=import_errors,
        )
    
    
    @app.route('/doc/import_contest')
    def doc_import_contest():
        """Render the documentation page for the contest import."""
        page = render_template('doc_import_contest.html')
        return page
    
    
    @app.route('/org/contest/import/sablona.csv')
    def org_contest_import_template():
        """Serve the CSV template returned by mo.imports.contest_template().

        The response is labelled as text/csv with an explicit UTF-8 charset.
        """
        out = mo.imports.contest_template()
        resp = app.make_response(out)
        # The parameter must read "charset=utf-8"; the previous "utf=8" was a
        # malformed Content-Type parameter that names no character set.
        resp.content_type = 'text/csv; charset=utf-8'
        return resp
    
    
    @app.route('/org/contest/c/<int:id>/ucastnici', methods=('GET', 'POST'))
    def org_contest_list(id: int):
        """List the participants of a contest, run bulk actions, or export them.

        Requires the manage_contest right. Filters are read from the query
        string. A successfully performed bulk action redirects back to the same
        URL. With an empty 'format' argument the HTML page is rendered with a
        checkbox column; otherwise the table is sent in the requested format.
        """
        contest, user_rights = get_contest_rr(id, mo.rights.Right.manage_contest)
        out_format = request.args.get('format', "")

        filter_form = ParticipantsFilterForm(request.args)
        filter_form.validate()

        school = db.get_place_by_code(filter_form.school.data)
        # contest_place=db.get_place_by_code(filter_form.contest_place.data),
        participation_place = db.get_place_by_code(filter_form.participation_place.data)
        if filter_form.participation_state.data == '*':
            # '*' stands for "any state", i.e. no state filter
            participation_state = None
        else:
            participation_state = filter_form.participation_state.data

        query = get_contestants_query(
            round=contest.round, contest=contest,
            school=school,
            participation_place=participation_place,
            participation_state=participation_state,
        )

        action_form = ParticipantsActionForm()
        if action_form.do_action(round=contest.round, rights=user_rights, query=query):
            # Action happened, redirect
            return redirect(request.url)

        # (count, query) = filter_form.apply_limits(query, pagesize=50)
        count = query.count()

        if out_format != "":
            return make_contestant_table(query).send_as(out_format)

        table = make_contestant_table(query, add_checkbox=True)
        return render_template(
            'org_contest_list.html',
            contest=contest,
            table=table,
            filter=filter_form, count=count, action_form=action_form,
        )
    
    
    # Default columns of the contestant table, given as (key, name, title)
    # triples that are passed to Column in this order.
    contest_list_columns = tuple(
        Column(key=key, name=name, title=title)
        for key, name, title in (
            ('first_name', 'krestni', 'Křestní jméno'),
            ('last_name', 'prijmeni', 'Příjmení'),
            ('email', 'email', 'E-mail'),
            ('school', 'skola', 'Škola'),
            ('school_code', 'kod_skoly', 'Kód školy'),
            ('grade', 'rocnik', 'Ročník'),
            ('born_year', 'rok_naroz', 'Rok naroz.'),
            ('place_code', 'kod_soutez_mista', 'Sout. místo'),
            ('status', 'stav', 'Stav'),
        )
    )
    
    
    def get_contestants_query(
            round: db.Round, contest: Optional[db.Contest] = None,
            contest_place: Optional[db.Place] = None,
            participation_place: Optional[db.Place] = None,
            participation_state: Optional[db.PartState] = None,
            school: Optional[db.Place] = None) -> Query:
        """Build a query over (Participation, Participant, Contest) rows.

        Participants are restricted to the year of `round`. When `contest` is
        given, the query is joined to that contest; otherwise it is restricted
        to contests of `round` and the contest place is loaded eagerly. Each
        of the remaining arguments adds a filter when it is truthy. The user,
        the participation place and the school place are loaded eagerly.
        """
        q = db.get_session().query(db.Participation, db.Participant, db.Contest)
        q = q.select_from(db.Participation)
        q = q.join(db.Participant, db.Participant.user_id == db.Participation.user_id)
        q = q.filter(db.Participant.year == round.year)

        if not contest:
            q = q.filter(db.Contest.round == round)
            q = q.options(joinedload(db.Contest.place))
        else:
            q = q.join(db.Contest, db.Contest.contest_id == contest.contest_id)
        q = q.filter(db.Participation.contest_id == db.Contest.contest_id)

        # Optional place filters: each compares a column with the place's id.
        place_filters = (
            (contest_place, db.Contest.place_id),
            (participation_place, db.Participation.place_id),
            (school, db.Participant.school),
        )
        for place, column in place_filters:
            if place:
                q = q.filter(column == place.place_id)

        if participation_state:
            q = q.filter(db.Participation.state == participation_state)

        return q.options(
            joinedload(db.Participation.user),
            joinedload(db.Participation.place),
            joinedload(db.Participant.school_place),
        )
    
    
    def make_contestant_table(query: Query, add_checkbox: bool = False, add_contest_column: bool = False):
        """Build a Table of contestants from the rows of `query`.

        Each row of the query is unpacked as (participation, participant,
        contest). Rows are ordered by locale-aware last name, then first name,
        then user id. `add_checkbox` prepends a checkbox column and
        `add_contest_column` appends the contest place column to
        contest_list_columns.
        """

        def build_row(pion, pant, ct) -> dict:
            # One table row; 'sort_key' is only used for ordering below.
            user = pion.user
            school = pant.school_place
            return {
                'sort_key': (locale.strxfrm(user.last_name), locale.strxfrm(user.first_name), user.user_id),
                'first_name': cell_user_link(user, user.first_name),
                'last_name': cell_user_link(user, user.last_name),
                'email': cell_email_link(user),
                'school': school.name,
                'school_code': cell_place_link(school, school.get_code()),
                'grade': pant.grade,
                'born_year': pant.birth_year,
                'region_code': cell_place_link(ct.place, ct.place.get_code()),
                'place_code': cell_place_link(pion.place, pion.place.get_code()),
                'status': pion.state.friendly_name(),
                'checkbox': CellCheckbox('checked', user.user_id, False),
            }

        rows: List[dict] = [build_row(*entry) for entry in query.all()]
        rows.sort(key=lambda row: row['sort_key'])

        columns: Sequence[Column] = contest_list_columns
        if add_checkbox:
            columns = [Column(key='checkbox', name=' ', title=' '), *columns]
        if add_contest_column:
            columns = [*columns, Column(key='region_code', name='kod_oblasti', title='Oblast')]

        return Table(
            columns=columns,
            rows=rows,
            filename='ucastnici',
            show_downlink=False,  # downlinks are in filter
        )
    
    
    @app.route('/org/contest/c/<int:id>/proctor-import', methods=('GET', 'POST'))
    def org_proctor_import(id: int):
        """Show the proctor import form and process an uploaded file.

        Requires the manage_contest right. The upload is stored under a random
        name in the instance 'imports' directory and handed to
        mo.imports.Import.import_proctors() for the contest's round. On success
        the user is redirected to the contest page; on failure the import
        errors are rendered together with the form.
        """
        contest, _ = get_contest_rr(id, mo.rights.Right.manage_contest)

        form = ImportForm()
        import_errors = []
        if form.validate_on_submit():
            stored_name = secrets.token_hex(16) + '.csv'
            stored_path = os.path.join(app.instance_path, 'imports', stored_name)
            form.file.data.save(stored_path)

            importer = mo.imports.Import(g.user)
            if not importer.import_proctors(contest.round, stored_path):
                flash('Došlo k chybě při importu (detaily níže)', 'danger')
                import_errors = importer.errors
            else:
                flash(f'Dozor importován (založeno {importer.cnt_new_users} uživatelů, {importer.cnt_new_roles} rolí)', 'success')
                return redirect(url_for('org_contest', id=contest.contest_id))

        return render_template(
            'org_proctor_import.html',
            contest=contest,
            form=form,
            errs=import_errors,
        )
    
    
    @app.route('/org/contest/import/sablona-dozor.csv')
    def org_proctor_import_template():
        """Serve the CSV template returned by mo.imports.proctor_template().

        The response is labelled as text/csv with an explicit UTF-8 charset.
        """
        out = mo.imports.proctor_template()
        resp = app.make_response(out)
        # The parameter must read "charset=utf-8"; the previous "utf=8" was a
        # malformed Content-Type parameter that names no character set.
        resp.content_type = 'text/csv; charset=utf-8'
        return resp
    
    
    @app.route('/doc/import-dozor')
    def org_proctor_import_help():
        """Render the documentation page for the proctor import."""
        page = render_template('doc_import_proctor.html')
        return page