Skip to content
Snippets Groups Projects
Select Git revision
  • 282392b258cdc6852311d0f3d60aba57fe8a6672
  • master default protected
2 results

tree_successor.h

Blame
  • org_certs.py 19.25 KiB
    # Web: Certifikáty
    
    from flask import render_template, g, redirect, url_for, flash
    from flask.helpers import send_file
    from flask_wtf import FlaskForm
    import flask_wtf.file
    import locale
    from markupsafe import Markup
    import os
    import pikepdf
    from pikepdf.models.metadata import encode_pdf_date
    from sqlalchemy import func, select, and_
    from sqlalchemy.orm import joinedload
    from tempfile import NamedTemporaryFile
    from typing import Tuple, Optional, Dict
    import werkzeug.exceptions
    import wtforms
    from wtforms import validators
    
    import mo
    import mo.config as config
    import mo.db as db
    import mo.email
    from mo.jobs.certs import schedule_create_certs, DesignParams, BackgroundType
    from mo.rights import Right
    import mo.submit
    import mo.util
    from mo.util import merge_objects
    from mo.web import app
    from mo.web.org_contest import get_context
    import mo.web.fields as mo_fields
    from mo.web.user import send_certificate
    
    
    def validate_background(form: 'CertSetForm', field: wtforms.Field) -> None:
        if not field.data:
            return
    
        file = field.data.stream
        submitter = mo.submit.Submitter()
    
        try:
            submitter.check_certificate_background(file.name)
        except mo.submit.SubmitException as e:
            raise wtforms.ValidationError(str(e))
    
    
    class SpaceField(wtforms.FloatField):
    
        def __init__(self, label: str, **kwargs):
            super().__init__(label,
                             validators=[validators.InputRequired(),
                                         validators.NumberRange(-10, 200, "Hodnota musí být mezi %(min)s a %(max)s")],
                             **kwargs)
    
    default_design = DesignParams()
    
    
    class CertSetForm(FlaskForm):
        signer1_name = mo_fields.String("1. podepisující: jméno")
        signer1_title = mo_fields.String("1. podepisující: funkce", render_kw={'placeholder': 'např. Předseda okresní komise MO'})
        signer2_name = mo_fields.String("2. podepisující: jméno")
        signer2_title = mo_fields.String("2. podepisující: funkce", render_kw={'placeholder': 'např. Ředitel školy'})
        issue_place = mo_fields.String("Místo vydání")
        issue_date = mo_fields.String("Datum vydání", render_kw={'placeholder': 'např. 3. března 2025'})
        background_type = wtforms.SelectField(choices=BackgroundType.choices(), coerce=BackgroundType.coerce)
        upload_background = flask_wtf.file.FileField(validators=[validate_background],
                                                     render_kw={'accept': 'application/pdf'},
                                                     description="Zde můžete nahrát obrázek ve formátu PDF, který se použije jako pozadí diplomu.")
        space1 = SpaceField("Mezera 1", description=f"Mezera mezi horním okrajem a hlavičkou (default: {default_design.space1}).")
        space2 = SpaceField("Mezera 2", description=f"Mezera mezi hlavičkou a jménem (default: {default_design.space2}).")
        space3 = SpaceField("Mezera 3", description=f"Mezera mezi jménem a umístěním (default: {default_design.space3}).")
        space4 = SpaceField("Mezera 4", description=f"Mezera mezi umístěním a podpisy (default: {default_design.space4}).")
        space5 = SpaceField("Mezera 5", description=f"Mezera mezi podpisy a patičkou (default: {default_design.space5}).")
        space6 = SpaceField("Mezera 6", description=f"Mezera mezi patičkou a dolním okrajem (default: {default_design.space6}).")
        logo_visible = wtforms.BooleanField("Logo MO a JČMF v patičce")
        tex_hacks = mo_fields.String("Nastavení sazby")  # description se nastavuje později
        generate = wtforms.SubmitField("Vytvořit diplomy")
        save = wtforms.SubmitField("Pouze uložit nastavení")
        delete = wtforms.SubmitField("Smazat diplomy")
    
        def osmo_validate(self, cset) -> bool:
            if (delete := getattr(self, 'delete')) is not None and delete.data:
                return True
    
            ok = True
    
            if self.background_type.data == BackgroundType.custom and self.upload_background.data is None and not cset.background_file:
                self.upload_background.errors.append('Nahrajte obrázek na pozadí.')     # FIXME: typing
                ok = False
    
            if all(getattr(self, key).data >= 0 for key in DesignParams.SPACE_PARAMS):
                self.space1.errors.append('Alespoň jedna z mezer musí být pružná.')     # FIXME: typing
                ok = False
    
            return ok
    
    
    class CertApproveForm(FlaskForm):
        ctype = wtforms.HiddenField()
        approve = wtforms.SubmitField("Schválit")
        unapprove = wtforms.SubmitField("Zrušit schválení")
    
    
    @app.route('/org/contest/c/<int:ct_id>/certificates', methods=('GET', 'POST'))
    def org_certificates(ct_id: int):
        ctx = get_context(ct_id=ct_id, right_needed=Right.view_contestants)
        assert ctx.contest and ctx.master_contest
    
        group_rounds = ctx.round.get_group_rounds(True)
        group_rounds.sort(key=lambda r: r.round_code())
    
        contest = ctx.master_contest
        ct_id = contest.contest_id
        can_change = ctx.rights.have_right(Right.manage_contest) and ctx.master_round.round_type != db.RoundType.other
    
        sess = db.get_session()
        cset = (sess.query(db.CertSet)
                .with_for_update()
                .filter_by(contest=contest)
                .one_or_none())
        if cset is None:
            new_cset = True
            cset = db.CertSet(
                contest_id=ct_id,
            )
            dparams = DesignParams(issue_date=mo.now.strftime('%e. %B %Y').strip())
        else:
            new_cset = False
            dparams = DesignParams.from_json(cset.design_params)
    
        if can_change:
            form = CertSetForm(obj=merge_objects(cset, dparams))
            form.tex_hacks.description = Markup("Speciální nastavení sazby diplomů (viz <a href='" + url_for('doc_admin') + "'>návod</a>)")
            if not ctx.rights.have_right(Right.edit_tex_hacks):
                del form.tex_hacks
            if new_cset:
                del form.delete
            approve_form = CertApproveForm()
        else:
            form = None
            approve_form = None
    
        if approve_form and approve_form.validate_on_submit() and (approve_form.approve.data or approve_form.unapprove.data):
            try:
                ctype = db.CertType.coerce(approve_form.ctype.data)
            except ValueError:
                raise werkzeug.exceptions.UnprocessableEntity()
            cfile = sess.query(db.CertFile).filter_by(cert_set_id=ct_id, type=ctype).one_or_none()
            if cfile:
                if approve_form.approve.data:
                    cfile.approved = True
                    mo.util.log(
                        type=db.LogType.cert_set,
                        what=contest.contest_id,
                        details={'action': 'approve', 'reason': 'web'},
                    )
                    app.logger.info(f'Schváleny diplomy pro soutěž #{contest.contest_id}')
                    flash(f'Diplomy ({ctype.friendly_name()}) schváleny.', 'success')
                elif approve_form.unapprove.data:
                    cfile.approved = False
                    mo.util.log(
                        type=db.LogType.cert_set,
                        what=contest.contest_id,
                        details={'action': 'unapprove', 'reason': 'web'},
                    )
                    app.logger.info(f'Zrušeno schválení diplomů pro soutěž #{contest.contest_id}')
                    flash(f'Odvoláno schválení diplomů ({ctype.friendly_name()}).', 'success')
                sess.commit()
                return redirect(ctx.url_for('org_certificates'))
            else:
                flash('Tento typ diplomů nebyl vytvořen.', 'danger')
    
        elif form:
            if not form.is_submitted():
                pass
            elif not form.validate() or not form.osmo_validate(cset):
                flash('V nastavení diplomů byly nalezeny chyby.', 'danger')
            elif not new_cset and form.delete.data:
                sess.delete(cset)
                mo.util.log(
                    type=db.LogType.cert_set,
                    what=contest.contest_id,
                    details={'action': 'delete', 'cert_set': db.row2dict(cset), 'reason': 'web'},
                )
                sess.commit()
                app.logger.info(f'Smazány diplomy pro soutěž #{contest.contest_id}')
                flash('Diplomy smazány.', 'success')
                return redirect(ctx.url_for('org_certificates'))
            elif form.generate.data or form.save.data:
                form.populate_obj(cset)
                form.populate_obj(dparams)
                cset.design_params = dparams.to_json()
                if new_cset:
                    # To je potřeba udělat už teď, protože cset.dir_path() níže potřebuje funkční relationships
                    sess.add(cset)
                    sess.flush()
                if dparams.background_type == BackgroundType.custom:
                    if form.upload_background.data:
                        old_background = cset.background_file
                        out_dir = cset.dir_path()
                        cset.background_file = mo.util.link_to_dir(form.upload_background.data.stream.name,
                                                                   out_dir,
                                                                   base_dir=mo.util.data_dir('certs'),
                                                                   prefix='background-',
                                                                   suffix='.pdf',
                                                                   make_dirs=True)
                        app.logger.info(f'Nahráno pozadí diplomů {cset.background_file}')
                    else:
                        old_background = None
                else:
                    old_background = cset.background_file
                    cset.background_file = None
                changes = None
                if new_cset:
                    mo.util.log(
                        type=db.LogType.cert_set,
                        what=contest.contest_id,
                        details={'action': 'new', 'cert_set': db.row2dict(cset), 'reason': 'web'},
                    )
                    app.logger.info(f'Založeny diplomy pro soutěž #{contest.contest_id}')
                    sess.commit()
                elif sess.is_modified(cset):
                    cset.changed_at = mo.now
                    changes = db.get_object_changes(cset)
                    mo.util.log(
                        type=db.LogType.cert_set,
                        what=contest.contest_id,
                        details={'action': 'edit', 'changes': changes, 'reason': 'web'},
                    )
                    app.logger.info(f'Upraveno nastavení diplomů pro soutěž #{contest.contest_id}')
                    sess.commit()
                    if old_background:
                        mo.util.unlink_if_exists(os.path.join(mo.util.data_dir('certs'), old_background))
                if form.generate.data:
                    if cset.job is not None and cset.job.is_active():
                        flash('Počkejte, až doběhne předchozí dávka na tvorbu diplomů.', 'danger')
                    else:
                        cset.job_id = schedule_create_certs(contest, g.user)
                        sess.commit()
                        return redirect(url_for('org_job_wait', id=cset.job_id, back=ctx.url_for('org_certificates')))
                else:
                    if changes is not None:
                        flash('Nastavení uloženo.', 'success')
                    else:
                        flash('Žádné změny k uložení.', 'info')
                    return redirect(ctx.url_for('org_certificates'))
    
        pions = sess.query(db.Participation).filter_by(contest_id=ct_id).options(joinedload(db.Participation.user)).all()
        pions.sort(key=lambda p: p.user.sort_key())
        users_pions = [(p.user, p) for p in pions]
    
        cert_files_by_type: Dict[db.CertType, Optional[db.CertFile]] = {}
        for cf in sess.query(db.CertFile).filter_by(cert_set_id=ct_id).all():
            cert_files_by_type[cf.type] = cf
        cert_file_columns = [(t, cert_files_by_type.get(t)) for t in db.CertType]
    
        certs_by_uid_type: Dict[Tuple[int, db.CertType], db.Certificate] = {}
        for c in sess.query(db.Certificate).filter_by(cert_set_id=ct_id).all():
            certs_by_uid_type[c.user_id, c.type] = c
    
        return render_template(
            'org_certificates.html',
            ctx=ctx,
            group_rounds=group_rounds,
            form=form,
            approve_form=approve_form,
            cset=cset,
            users_pions=users_pions,
            cert_file_columns=cert_file_columns,
            certs_by_uid_type=certs_by_uid_type,
            settings_changed=(cset.changed_at is not None and (cset.certs_issued_at is None or cset.certs_issued_at < cset.changed_at)),
            scoretable_changed=(cset.scoretable != contest.scoretable),
            form_has_errors=form and form.is_submitted(),
        )
    
    
    @app.route('/org/contest/c/<int:ct_id>/certificate/<cert_type>/all/<filename>')
    @app.route('/org/contest/c/<int:ct_id>/certificate/<cert_type>/<int:user_id>/<filename>')
    def org_cert_file(ct_id: int, cert_type: str, filename: str, user_id: Optional[int] = None):
        ctx = get_context(ct_id=ct_id, right_needed=Right.view_contestants)
        assert ctx.contest and ctx.master_contest
    
        return send_certificate(ct_id, cert_type, filename, user_id)
    
    
    @app.route('/org/contest/c/<int:ct_id>/certificates/standard-bg.pdf')
    def org_certificates_standard_bg(ct_id: int):
        ctx = get_context(ct_id=ct_id)
        bg = BackgroundType.standard.find_default_background(ctx.round)
        if bg is not None:
            return send_file(bg, mimetype='application/pdf')
        else:
            raise werkzeug.exceptions.NotFound()
    
    
    @app.route('/org/contest/school-results/<int:school_id>/c/<int:ct_id>/certificates/<cert_type>/<filename>')
    def org_school_results_certs(school_id: int, ct_id: int, cert_type: str, filename: str):
        sess = db.get_session()
        place = sess.query(db.Place).filter_by(place_id=school_id).one_or_none()
        if place is None:
            raise werkzeug.exceptions.NotFound()
    
        if not g.gatekeeper.rights_for(place=place).have_right(Right.view_school_contestants):
            raise werkzeug.exceptions.Forbidden()
    
        contest = sess.query(db.Contest).options(joinedload(db.Contest.round)).get(ct_id)
        if contest is None or contest.round.year != config.CURRENT_YEAR:
            raise werkzeug.exceptions.NotFound()
    
        if contest.state != db.RoundState.closed:
            raise werkzeug.exceptions.Forbidden()
    
        try:
            ctype: db.CertType = db.CertType.coerce(cert_type)
        except ValueError:
            raise werkzeug.exceptions.NotFound()
    
        if filename != ctype.file_name(plural=True) + '.pdf':
            raise werkzeug.exceptions.NotFound()
    
        cfile = sess.query(db.CertFile).filter_by(cert_set_id=ct_id, type=ctype, approved=True).one_or_none()
        if cfile is None:
            raise werkzeug.exceptions.NotFound()
    
        users_and_certs = (sess.query(db.User, db.Certificate)
                           .select_from(db.Certificate)
                           .join(db.Participant, and_(db.Participant.user_id == db.Certificate.user_id,
                                                      db.Participant.year == config.CURRENT_YEAR,
                                                      db.Participant.school == school_id))
                           .join(db.Certificate.user)
                           .filter(db.Certificate.cert_set_id == ct_id,
                                   db.Certificate.type == ctype)
                           .all())
        users_and_certs.sort(key=lambda x: x[0].sort_key())
    
        try:
            file = os.path.join(mo.util.data_dir('certs'), cfile.pdf_file)
            if not os.path.isfile(file):
                app.logger.error(f'Certifikát {file} je v DB, ale soubor neexistuje')
                raise werkzeug.exceptions.NotFound()
            with pikepdf.open(file, attempt_recovery=False) as src:
                with pikepdf.new() as dst:
                    for _, cert in users_and_certs:
                        dst.pages.append(src.pages[cert.page_number - 1])
                    dst.docinfo['/Title'] = f'Matematická Olympiáda – {ctype.friendly_name(plural=True)}'
                    dst.docinfo['/Creator'] = 'Odevzdávací Systém Matematické Olympiády'
                    dst.docinfo['/CreationDate'] = encode_pdf_date(mo.now.astimezone())
                    tmp_file = NamedTemporaryFile(dir=mo.util.data_dir('tmp'), prefix='cert-')
                    dst.save(tmp_file.name)
        except pikepdf.PdfError as e:
            app.logger.error(f'Chyba při zpracování PDF certifikátů: {e}')
            raise werkzeug.exceptions.InternalServerError()
    
        return send_file(open(tmp_file.name, 'rb'), mimetype='application/pdf')
    
    
    @app.route('/org/contest/school-results/<int:school_id>/')
    def org_school_results(school_id: int):
        sess = db.get_session()
        place = sess.query(db.Place).filter_by(place_id=school_id).one_or_none()
        if place is None:
            raise werkzeug.exceptions.NotFound()
    
        # Úmyslně nekontrolujeme práva ke kategorii
        if not g.gatekeeper.rights_for(place=place).have_right(Right.view_school_contestants):
            raise werkzeug.exceptions.Forbidden()
    
        pants_subq = (sess.query(db.Participant.user_id)
                      .filter_by(year=config.CURRENT_YEAR,
                                 school=school_id)
                      .subquery())
    
        contest_counts = (sess.query(db.Contest.contest_id, func.count(db.Participation.user_id))
                          .select_from(db.Contest)
                          .join(db.Contest.round)
                          .join(db.Contest.participations)
                          .filter(db.Contest.contest_id == db.Contest.master_contest_id,
                                  db.Round.year == config.CURRENT_YEAR,
                                  db.Participation.user_id.in_(select(pants_subq)),
                                  db.Participation.state == db.PartState.active)
                          .group_by(db.Contest.contest_id)
                          .all())
        contest_ids = [cid for cid, _ in contest_counts]
        num_pants_by_cid = {cid: cnt for cid, cnt in contest_counts}
    
        contests = (sess.query(db.Contest)
                    .filter(db.Contest.contest_id.in_(contest_ids))
                    .options(joinedload(db.Contest.round),
                             joinedload(db.Contest.place))
                    .all())
        contests.sort(key=lambda c: (c.round.category, c.round.seq, locale.strxfrm(c.place.name)))    # part nepotřebujeme, vše jsou hlavní soutěže
    
        cert_counts = (sess.query(db.CertFile.cert_set_id, db.CertFile.type, func.count(db.Certificate.user_id))
                       .select_from(db.Certificate)
                       .join(db.Certificate.cert_file)
                       .filter(db.Certificate.cert_set_id.in_(contest_ids))
                       .filter(db.Certificate.user_id.in_(select(pants_subq)))
                       .filter(db.CertFile.approved)
                       .group_by(db.CertFile.cert_set_id, db.CertFile.type)
                       .all())
        cert_counts_by_cid_type = {(cid, ctype): cnt for cid, ctype, cnt in cert_counts}
        contests_with_cert = {cid for cid, ctype, cnt in cert_counts}
    
        return render_template(
            'org_school_results.html',
            place=place,
            contests=contests,
            num_pants_by_cid=num_pants_by_cid,
            cert_counts_by_cid_type=cert_counts_by_cid_type,
            contests_with_cert=contests_with_cert,
        )
    
    
    # URL je zadrátované do mo.email.send_grading_info_email
    @app.route('/org/contest/school-results/')
    def org_school_results_all():
        school_place_ids = set(ur.place_id for ur in g.gatekeeper.roles if ur.role == db.RoleType.garant_skola)
        if len(school_place_ids) == 1:
            return redirect(url_for('org_school_results', school_id=list(school_place_ids)[0]))
    
        sess = db.get_session()
        schools = sess.query(db.Place).filter(db.Place.place_id.in_(school_place_ids)).all()
        schools.sort(key=lambda p: locale.strxfrm(p.name))
    
        return render_template('org_school_results_all.html', schools=schools)