Skip to content
Snippets Groups Projects
Select Git revision
  • e5df1a02ac3a460052e0bfcca49e19e1d25dd78f
  • devel default
  • master
  • fo
  • jirka/typing
  • fo-base
  • mj/submit-images
  • jk/issue-96
  • jk/issue-196
  • honza/add-contestant
  • honza/mr7
  • honza/mrf
  • honza/mrd
  • honza/mra
  • honza/mr6
  • honza/submit-images
  • honza/kolo-vs-soutez
  • jh-stress-test-wip
  • shorten-schools
19 results

org_score.py

Blame
  • org_certs.py 12.61 KiB
    # Web: Certifikáty
    
    from datetime import datetime
    from flask import render_template, g, redirect, url_for
    from flask.helpers import send_file, flash
    from flask_wtf import FlaskForm
    import flask_wtf.file
    from markupsafe import Markup
    import os
    import pikepdf
    from pikepdf.models.metadata import encode_pdf_date
    from sqlalchemy.orm import joinedload
    from tempfile import NamedTemporaryFile
    from typing import Tuple, Optional, Dict
    import werkzeug.exceptions
    import wtforms
    from wtforms import validators
    
    import mo
    import mo.db as db
    import mo.email
    from mo.jobs.certs import schedule_create_certs, DesignParams, BackgroundType
    from mo.rights import Right
    import mo.submit
    import mo.util
    from mo.util import logger, merge_objects
    from mo.web import app
    from mo.web.org_contest import get_context
    import mo.web.fields as mo_fields
    
    
    # Využívá se i z účastnické části webu
    def send_certificate(ct_id: int, cert_type: str, user_filename: str, user_id: Optional[int] = None):
        try:
            typ = db.CertType.coerce(cert_type)
        except ValueError:
            raise werkzeug.exceptions.NotFound()
    
        if user_filename != typ.file_name() + '.pdf':
            raise werkzeug.exceptions.NotFound()
    
        sess = db.get_session()
        cfile = sess.query(db.CertFile).get((ct_id, typ))
        if cfile is None:
            raise werkzeug.exceptions.NotFound()
    
        file = os.path.join(mo.util.data_dir('certs'), cfile.pdf_file)
        try:
            stat = os.stat(file)
        except FileNotFoundError:
            logger.error(f'Certifikát {file} je v DB, ale soubor neexistuje')
            raise werkzeug.exceptions.NotFound()
    
        if user_id is None:
            return send_file(file, mimetype='application/pdf')
    
        cert = sess.query(db.Certificate).get((ct_id, user_id, typ))
        if cert is None:
            raise werkzeug.exceptions.NotFound()
    
        try:
            with pikepdf.open(file, attempt_recovery=False) as src:
                dst = pikepdf.new()
                dst.pages.append(src.pages[cert.page_number - 1])
                dst.docinfo['/Title'] = f'Matematická Olympiáda – {typ.friendly_name()}'
                dst.docinfo['/Creator'] = 'Odevzdávací Systém Matematické Olympiády'
                dst.docinfo['/CreationDate'] = encode_pdf_date(datetime.fromtimestamp(stat.st_mtime).astimezone())
                tmp_file = NamedTemporaryFile(dir=mo.util.data_dir('tmp'), prefix='cert-')
                dst.save(tmp_file.name)
        except pikepdf.PdfError as e:
            logger.error(f'Chyba při zpracování certifikátu {file}: {e}')
            raise werkzeug.exceptions.InternalServerError()
    
        return send_file(open(tmp_file.name, 'rb'), mimetype='application/pdf')
    
    
    def validate_background(form: 'CertSetForm', field: wtforms.Field) -> None:
        if not field.data:
            return
    
        file = field.data.stream
        submitter = mo.submit.Submitter()
    
        try:
            submitter.check_certificate_background(file.name)
        except mo.submit.SubmitException as e:
            raise wtforms.ValidationError(str(e))
    
    
    class SpaceField(wtforms.FloatField):
    
        def __init__(self, label: str, **kwargs):
            super().__init__(label,
                             validators=[validators.InputRequired(),
                                         validators.NumberRange(-10, 200, "Hodnota musí být mezi %(min)s a %(max)s")],
                             **kwargs)
    
    default_design = DesignParams()
    
    
    class CertSetForm(FlaskForm):
        signer1_name = mo_fields.String("1. podepisující: jméno")
        signer1_title = mo_fields.String("1. podepisující: funkce", render_kw={'placeholder': 'např. Předseda okresní komise MO'})
        signer2_name = mo_fields.String("2. podepisující: jméno")
        signer2_title = mo_fields.String("2. podepisující: funkce", render_kw={'placeholder': 'např. Ředitel školy'})
        issue_place = mo_fields.String("Místo vydání")
        issue_date = mo_fields.String("Datum vydání", render_kw={'placeholder': 'např. 3. března 2025'})
        background_type = wtforms.SelectField(choices=BackgroundType.choices(), coerce=BackgroundType.coerce)
        upload_background = flask_wtf.file.FileField(validators=[validate_background],
                                                     render_kw={'accept': 'application/pdf'},
                                                     description="Zde můžete nahrát obrázek ve formátu PDF, který se použije jako pozadí diplomu.")
        space1 = SpaceField("Mezera 1", description=f"Mezera mezi horním okrajem a hlavičkou (default: {default_design.space1}).")
        space2 = SpaceField("Mezera 2", description=f"Mezera mezi hlavičkou a jménem (default: {default_design.space2}).")
        space3 = SpaceField("Mezera 3", description=f"Mezera mezi jménem a umístěním (default: {default_design.space3}).")
        space4 = SpaceField("Mezera 4", description=f"Mezera mezi umístěním a podpisy (default: {default_design.space4}).")
        space5 = SpaceField("Mezera 5", description=f"Mezera mezi podpisy a patičkou (default: {default_design.space5}).")
        space6 = SpaceField("Mezera 6", description=f"Mezera mezi patičkou a dolním okrajem (default: {default_design.space6}).")
        logo_visible = wtforms.BooleanField("Logo MO a JČMF v patičce")
        tex_hacks = mo_fields.String("Nastavení sazby")  # description se nastavuje později
        generate = wtforms.SubmitField("Vytvořit diplomy")
        save = wtforms.SubmitField("Pouze uložit nastavení")
        delete = wtforms.SubmitField("Smazat diplomy")
    
        def osmo_validate(self, cset) -> bool:
            ok = True
    
            if self.background_type.data == BackgroundType.custom and self.upload_background.data is None and not cset.background_file:
                self.upload_background.errors.append('Nahrajte obrázek na pozadí.')     # FIXME: typing
                ok = False
    
            if all(getattr(self, key).data >= 0 for key in DesignParams.SPACE_PARAMS):
                self.space1.errors.append('Alespoň jedna z mezer musí být pružná.')     # FIXME: typing
                ok = False
    
            return ok
    
    
    @app.route('/org/contest/c/<int:ct_id>/certificates', methods=('GET', 'POST'))
    def org_certificates(ct_id: int):
        ctx = get_context(ct_id=ct_id, right_needed=Right.view_contestants)
        assert ctx.contest and ctx.master_contest
    
        group_rounds = ctx.round.get_group_rounds(True)
        group_rounds.sort(key=lambda r: r.round_code())
    
        contest = ctx.master_contest
        ct_id = contest.contest_id
        can_change = ctx.rights.have_right(Right.manage_contest) and ctx.master_round.round_type != db.RoundType.other
    
        sess = db.get_session()
        cset = (sess.query(db.CertSet)
                .with_for_update()
                .filter_by(contest=contest)
                .one_or_none())
        if cset is None:
            new_cset = True
            cset = db.CertSet(
                contest_id=ct_id,
            )
            dparams = DesignParams(issue_date=mo.now.strftime('%d. %B %Y'))
        else:
            new_cset = False
            dparams = DesignParams.from_json(cset.design_params)
    
        if can_change:
            form = CertSetForm(obj=merge_objects(cset, dparams))
            form.tex_hacks.description = Markup("Speciální nastavení sazby diplomů (viz <a href='" + url_for('doc_admin') + "'>návod</a>)")
            if not ctx.rights.have_right(Right.edit_tex_hacks):
                del form.tex_hacks
            if new_cset:
                del form.delete
        else:
            form = None
    
        if form:
            if not form.is_submitted():
                pass
            elif not form.validate() or not form.osmo_validate(cset):
                flash('V nastavení diplomů byly nalezeny chyby.', 'danger')
            elif not new_cset and form.delete.data:
                sess.delete(cset)
                mo.util.log(
                    type=db.LogType.cert_set,
                    what=contest.contest_id,
                    details={'action': 'delete', 'cert_set': db.row2dict(cset), 'reason': 'web'},
                )
                sess.commit()
                app.logger.info(f'Smazány diplomy pro soutěž #{contest.contest_id}')
                flash('Diplomy smazány.', 'success')
                return redirect(ctx.url_for('org_certificates'))
            elif form.generate.data or form.save.data:
                form.populate_obj(cset)
                form.populate_obj(dparams)
                cset.design_params = dparams.to_json()
                if dparams.background_type == BackgroundType.custom:
                    if form.upload_background.data:
                        old_background = cset.background_file
                        out_dir = cset.dir_path()
                        cset.background_file = mo.util.link_to_dir(form.upload_background.data.stream.name,
                                                                   out_dir,
                                                                   base_dir=mo.util.data_dir('certs'),
                                                                   prefix='background-',
                                                                   suffix='.pdf',
                                                                   make_dirs=True)
                        app.logger.info(f'Nahráno pozadí diplomů {cset.background_file}')
                    else:
                        old_background = None
                else:
                    old_background = cset.background_file
                    cset.background_file = None
                changes = None
                if new_cset:
                    sess.add(cset)
                    mo.util.log(
                        type=db.LogType.cert_set,
                        what=contest.contest_id,
                        details={'action': 'new', 'cert_set': db.row2dict(cset), 'reason': 'web'},
                    )
                    app.logger.info(f'Založeny diplomy pro soutěž #{contest.contest_id}')
                    sess.commit()
                elif sess.is_modified(cset):
                    cset.changed_at = mo.now
                    changes = db.get_object_changes(cset)
                    mo.util.log(
                        type=db.LogType.cert_set,
                        what=contest.contest_id,
                        details={'action': 'edit', 'changes': changes, 'reason': 'web'},
                    )
                    app.logger.info(f'Upraveno nastavení diplomů pro soutěž #{contest.contest_id}')
                    sess.commit()
                    if old_background:
                        mo.util.unlink_if_exists(os.path.join(mo.util.data_dir('certs'), old_background))
                if form.generate.data:
                    if cset.job is not None and cset.job.is_active():
                        flash('Počkejte, až doběhne předchozí dávka na tvorbu diplomů.', 'danger')
                    else:
                        cset.job_id = schedule_create_certs(contest, g.user)
                        sess.commit()
                        return redirect(url_for('org_job_wait', id=cset.job_id, back=ctx.url_for('org_certificates')))
                else:
                    if changes is not None:
                        flash('Nastavení uloženo.', 'success')
                    else:
                        flash('Žádné změny k uložení.', 'info')
                    return redirect(ctx.url_for('org_certificates'))
    
        pions = sess.query(db.Participation).filter_by(contest_id=ct_id).options(joinedload(db.Participation.user)).all()
        pions.sort(key=lambda p: p.user.sort_key())
        users_pions = [(p.user, p) for p in pions]
    
        cert_files_by_type: Dict[db.CertType, Optional[db.CertFile]] = {}
        for cf in sess.query(db.CertFile).filter_by(cert_set_id=ct_id).all():
            cert_files_by_type[cf.type] = cf
    
        certs_by_uid_type: Dict[Tuple[int, db.CertType], db.Certificate] = {}
        for c in sess.query(db.Certificate).filter_by(cert_set_id=ct_id).all():
            certs_by_uid_type[c.user_id, c.type] = c
    
        return render_template(
            'org_certificates.html',
            ctx=ctx,
            group_rounds=group_rounds,
            form=form,
            cset=cset,
            users_pions=users_pions,
            cert_files_by_type=cert_files_by_type,
            certs_by_uid_type=certs_by_uid_type,
            settings_changed=(cset.changed_at is not None and (cset.certs_issued_at is None or cset.certs_issued_at < cset.changed_at)),
            scoretable_changed=(cset.scoretable != contest.scoretable),
            form_has_errors=form and form.is_submitted(),
        )
    
    
    @app.route('/org/contest/c/<int:ct_id>/certificate/<cert_type>/all/<filename>')
    @app.route('/org/contest/c/<int:ct_id>/certificate/<cert_type>/<int:user_id>/<filename>')
    def org_cert_file(ct_id: int, cert_type: str, filename: str, user_id: Optional[int] = None):
        ctx = get_context(ct_id=ct_id, right_needed=Right.view_contestants)
        assert ctx.contest and ctx.master_contest
    
        return send_certificate(ct_id, cert_type, filename, user_id)
    
    
    # URL je explicitně uvedeno v mo.jobs.certs.Cert._make_qr_url
    @app.route('/cc/<int:year>/<cat>/<round_type>/<place>/<cert_type_short>/<int:user_id>/<time_code>')
    def cert_check(year: int, cat: str, round_type: str, place: str, cert_type_short: str, user_id: int, time_code: str):
        return "Not implemented yet."