# Web: Certificates

from flask import render_template, g, redirect, url_for, flash
from flask.helpers import send_file
from flask_wtf import FlaskForm
import flask_wtf.file
import locale
from markupsafe import Markup
import os
import pikepdf
from pikepdf.models.metadata import encode_pdf_date
from sqlalchemy import func, select, and_
from sqlalchemy.orm import joinedload
from tempfile import NamedTemporaryFile
from typing import Tuple, Optional, Dict
import werkzeug.exceptions
import wtforms
from wtforms import validators

import mo
import mo.config as config
import mo.db as db
import mo.email
from mo.jobs.certs import schedule_create_certs, DesignParams, BackgroundType
from mo.rights import Right
import mo.submit
import mo.util
from mo.util import merge_objects
from mo.web import app
from mo.web.org_contest import get_context
import mo.web.fields as mo_fields
from mo.web.user import send_certificate


def validate_background(form: 'CertSetForm', field: wtforms.Field) -> None:
    """WTForms validator for the uploaded certificate background.

    Does nothing when no file was uploaded.  Otherwise the name of the
    upload's underlying stream is handed to
    ``Submitter.check_certificate_background`` and any ``SubmitException``
    is re-raised as a ``wtforms.ValidationError`` carrying the same message.
    """
    if not field.data:
        return

    # NOTE(review): relies on the upload stream having a ``name``, i.e. being
    # backed by a named file -- confirm against the application's upload setup.
    file = field.data.stream
    submitter = mo.submit.Submitter()

    try:
        submitter.check_certificate_background(file.name)
    except mo.submit.SubmitException as e:
        raise wtforms.ValidationError(str(e)) from e


class SpaceField(wtforms.FloatField):
    """Required float field for one of the vertical spaces, limited to -10..200."""

    def __init__(self, label: str, **kwargs):
        super().__init__(label, validators=[validators.InputRequired(), validators.NumberRange(-10, 200, "Hodnota musí být mezi %(min)s a %(max)s")], **kwargs)


# Default design parameters; used only to show default values in field descriptions.
default_design = DesignParams()


class CertSetForm(FlaskForm):
    """Form with the settings of a certificate set and the actions on it."""

    signer1_name = mo_fields.String("1. podepisující: jméno")
    signer1_title = mo_fields.String("1. podepisující: funkce", render_kw={'placeholder': 'např. Předseda okresní komise MO'})
    signer2_name = mo_fields.String("2. podepisující: jméno")
    signer2_title = mo_fields.String("2. podepisující: funkce", render_kw={'placeholder': 'např. Ředitel školy'})
    issue_place = mo_fields.String("Místo vydání")
    issue_date = mo_fields.String("Datum vydání", render_kw={'placeholder': 'např. 3. března 2025'})
    background_type = wtforms.SelectField(choices=BackgroundType.choices(), coerce=BackgroundType.coerce)
    upload_background = flask_wtf.file.FileField(validators=[validate_background], render_kw={'accept': 'application/pdf'},
                                                 description="Zde můžete nahrát obrázek ve formátu PDF, který se použije jako pozadí diplomu.")
    space1 = SpaceField("Mezera 1", description=f"Mezera mezi horním okrajem a hlavičkou (default: {default_design.space1}).")
    space2 = SpaceField("Mezera 2", description=f"Mezera mezi hlavičkou a jménem (default: {default_design.space2}).")
    space3 = SpaceField("Mezera 3", description=f"Mezera mezi jménem a umístěním (default: {default_design.space3}).")
    space4 = SpaceField("Mezera 4", description=f"Mezera mezi umístěním a podpisy (default: {default_design.space4}).")
    space5 = SpaceField("Mezera 5", description=f"Mezera mezi podpisy a patičkou (default: {default_design.space5}).")
    space6 = SpaceField("Mezera 6", description=f"Mezera mezi patičkou a dolním okrajem (default: {default_design.space6}).")
    logo_visible = wtforms.BooleanField("Logo MO a JČMF v patičce")
    tex_hacks = mo_fields.String("Nastavení sazby")  # description is set later

    generate = wtforms.SubmitField("Vytvořit diplomy")
    save = wtforms.SubmitField("Pouze uložit nastavení")
    delete = wtforms.SubmitField("Smazat diplomy")

    def osmo_validate(self, cset) -> bool:
        """Run cross-field checks that the per-field validators cannot express.

        Args:
            cset: the certificate set being edited; only its
                ``background_file`` attribute is read here.

        Returns:
            True if the form is acceptable.  Otherwise error messages are
            appended to the offending fields and False is returned.
        """
        # The view removes the ``delete`` field for a not-yet-existing set, in
        # which case the attribute is expected to be None -- hence the getattr.
        # Deleting needs no further validation.
        if (delete := getattr(self, 'delete')) is not None and delete.data:
            return True

        ok = True

        # A custom background needs either a fresh upload or a previously stored file.
        if self.background_type.data == BackgroundType.custom and self.upload_background.data is None and not cset.background_file:
            self.upload_background.errors.append('Nahrajte obrázek na pozadí.')  # FIXME: typing
            ok = False

        # All spaces being non-negative is rejected; judging by the error
        # message, a negative value marks a space as flexible.
        if all(getattr(self, key).data >= 0 for key in DesignParams.SPACE_PARAMS):
            self.space1.errors.append('Alespoň jedna z mezer musí být pružná.')  # FIXME: typing
            ok = False

        return ok


class CertApproveForm(FlaskForm):
    """Small form for approving / un-approving one type of certificates."""

    ctype = wtforms.HiddenField()
    approve = wtforms.SubmitField("Schválit")
    unapprove = wtforms.SubmitField("Zrušit schválení")


@app.route('/org/contest/c/<int:ct_id>/certificates', methods=('GET', 'POST'))
def org_certificates(ct_id: int):
    """Show and manage the certificates of a contest.

    Handles, in this order: approving/un-approving a certificate file,
    deleting the certificate set, and saving its settings (optionally
    scheduling a job that generates the certificates).  Whenever no redirect
    is issued, the overview page is rendered.

    Viewing requires ``Right.view_contestants``; the forms are offered only
    with ``Right.manage_contest`` and when the master round is not of type
    ``other``.
    """
    ctx = get_context(ct_id=ct_id, right_needed=Right.view_contestants)
    assert ctx.contest and ctx.master_contest
    group_rounds = ctx.round.get_group_rounds(True)
    group_rounds.sort(key=lambda r: r.round_code())

    # From here on we work with the master contest only.
    contest = ctx.master_contest
    ct_id = contest.contest_id
    can_change = ctx.rights.have_right(Right.manage_contest) and ctx.master_round.round_type != db.RoundType.other

    sess = db.get_session()
    cset = (sess.query(db.CertSet)
            .with_for_update()
            .filter_by(contest=contest)
            .one_or_none())
    if cset is None:
        new_cset = True
        cset = db.CertSet(
            contest_id=ct_id,
        )
        dparams = DesignParams(issue_date=mo.now.strftime('%e. %B %Y').strip())
    else:
        new_cset = False
        dparams = DesignParams.from_json(cset.design_params)

    if can_change:
        form = CertSetForm(obj=merge_objects(cset, dparams))
        form.tex_hacks.description = Markup("Speciální nastavení sazby diplomů (viz <a href='" + url_for('doc_admin') + "'>návod</a>)")
        if not ctx.rights.have_right(Right.edit_tex_hacks):
            del form.tex_hacks
        if new_cset:
            # There is nothing to delete yet.
            del form.delete
        approve_form = CertApproveForm()
    else:
        form = None
        approve_form = None

    if approve_form and approve_form.validate_on_submit() and (approve_form.approve.data or approve_form.unapprove.data):
        try:
            ctype = db.CertType.coerce(approve_form.ctype.data)
        except ValueError:
            raise werkzeug.exceptions.UnprocessableEntity()

        cfile = sess.query(db.CertFile).filter_by(cert_set_id=ct_id, type=ctype).one_or_none()
        if cfile:
            if approve_form.approve.data:
                cfile.approved = True
                mo.util.log(
                    type=db.LogType.cert_set,
                    what=contest.contest_id,
                    details={'action': 'approve', 'reason': 'web'},
                )
                app.logger.info(f'Schváleny diplomy pro soutěž #{contest.contest_id}')
                flash(f'Diplomy ({ctype.friendly_name()}) schváleny.', 'success')
            elif approve_form.unapprove.data:
                cfile.approved = False
                mo.util.log(
                    type=db.LogType.cert_set,
                    what=contest.contest_id,
                    details={'action': 'unapprove', 'reason': 'web'},
                )
                app.logger.info(f'Zrušeno schválení diplomů pro soutěž #{contest.contest_id}')
                flash(f'Odvoláno schválení diplomů ({ctype.friendly_name()}).', 'success')
            sess.commit()
            return redirect(ctx.url_for('org_certificates'))
        else:
            flash('Tento typ diplomů nebyl vytvořen.', 'danger')
    elif form:
        if not form.is_submitted():
            pass
        elif not form.validate() or not form.osmo_validate(cset):
            flash('V nastavení diplomů byly nalezeny chyby.', 'danger')
        elif not new_cset and form.delete.data:
            sess.delete(cset)
            mo.util.log(
                type=db.LogType.cert_set,
                what=contest.contest_id,
                details={'action': 'delete', 'cert_set': db.row2dict(cset), 'reason': 'web'},
            )
            sess.commit()
            app.logger.info(f'Smazány diplomy pro soutěž #{contest.contest_id}')
            flash('Diplomy smazány.', 'success')
            return redirect(ctx.url_for('org_certificates'))
        elif form.generate.data or form.save.data:
            form.populate_obj(cset)
            form.populate_obj(dparams)
            cset.design_params = dparams.to_json()

            if new_cset:
                # This has to happen right now, because cset.dir_path() below
                # needs working relationships.
                sess.add(cset)
                sess.flush()

            # Decide which background file (if any) becomes obsolete.
            if dparams.background_type == BackgroundType.custom:
                if form.upload_background.data:
                    old_background = cset.background_file
                    out_dir = cset.dir_path()
                    cset.background_file = mo.util.link_to_dir(form.upload_background.data.stream.name, out_dir,
                                                               base_dir=mo.util.data_dir('certs'),
                                                               prefix='background-', suffix='.pdf', make_dirs=True)
                    app.logger.info(f'Nahráno pozadí diplomů {cset.background_file}')
                else:
                    old_background = None
            else:
                old_background = cset.background_file
                cset.background_file = None

            changes = None
            if new_cset:
                mo.util.log(
                    type=db.LogType.cert_set,
                    what=contest.contest_id,
                    details={'action': 'new', 'cert_set': db.row2dict(cset), 'reason': 'web'},
                )
                app.logger.info(f'Založeny diplomy pro soutěž #{contest.contest_id}')
                sess.commit()
            elif sess.is_modified(cset):
                cset.changed_at = mo.now
                changes = db.get_object_changes(cset)
                mo.util.log(
                    type=db.LogType.cert_set,
                    what=contest.contest_id,
                    details={'action': 'edit', 'changes': changes, 'reason': 'web'},
                )
                app.logger.info(f'Upraveno nastavení diplomů pro soutěž #{contest.contest_id}')
                sess.commit()

            # Remove the replaced background only after the new state was stored.
            if old_background:
                mo.util.unlink_if_exists(os.path.join(mo.util.data_dir('certs'), old_background))

            if form.generate.data:
                if cset.job is not None and cset.job.is_active():
                    # Falls through to rendering the page with the message.
                    flash('Počkejte, až doběhne předchozí dávka na tvorbu diplomů.', 'danger')
                else:
                    cset.job_id = schedule_create_certs(contest, g.user)
                    sess.commit()
                    return redirect(url_for('org_job_wait', id=cset.job_id, back=ctx.url_for('org_certificates')))
            else:
                # A freshly created set has been stored as well, even though
                # no change list is computed for it.
                if new_cset or changes is not None:
                    flash('Nastavení uloženo.', 'success')
                else:
                    flash('Žádné změny k uložení.', 'info')
                return redirect(ctx.url_for('org_certificates'))

    pions = sess.query(db.Participation).filter_by(contest_id=ct_id).options(joinedload(db.Participation.user)).all()
    pions.sort(key=lambda p: p.user.sort_key())
    users_pions = [(p.user, p) for p in pions]

    cert_files_by_type: Dict[db.CertType, Optional[db.CertFile]] = {
        cf.type: cf
        for cf in sess.query(db.CertFile).filter_by(cert_set_id=ct_id).all()
    }
    cert_file_columns = [(t, cert_files_by_type.get(t)) for t in db.CertType]

    certs_by_uid_type: Dict[Tuple[int, db.CertType], db.Certificate] = {
        (c.user_id, c.type): c
        for c in sess.query(db.Certificate).filter_by(cert_set_id=ct_id).all()
    }

    return render_template(
        'org_certificates.html',
        ctx=ctx,
        group_rounds=group_rounds,
        form=form,
        approve_form=approve_form,
        cset=cset,
        users_pions=users_pions,
        cert_file_columns=cert_file_columns,
        certs_by_uid_type=certs_by_uid_type,
        settings_changed=(cset.changed_at is not None and (cset.certs_issued_at is None or cset.certs_issued_at < cset.changed_at)),
        scoretable_changed=(cset.scoretable != contest.scoretable),
        form_has_errors=form and form.is_submitted(),
    )


@app.route('/org/contest/c/<int:ct_id>/certificate/<cert_type>/all/<filename>')
@app.route('/org/contest/c/<int:ct_id>/certificate/<cert_type>/<int:user_id>/<filename>')
def org_cert_file(ct_id: int, cert_type: str, filename: str, user_id: Optional[int] = None):
    """Send a certificate PDF: all certificates of a type, or one user's.

    The context lookup enforces ``Right.view_contestants``; the actual work
    is delegated to ``send_certificate``.
    """
    ctx = get_context(ct_id=ct_id, right_needed=Right.view_contestants)
    assert ctx.contest and ctx.master_contest

    return send_certificate(ct_id, cert_type, filename, user_id)


@app.route('/org/contest/c/<int:ct_id>/certificates/standard-bg.pdf')
def org_certificates_standard_bg(ct_id: int):
    """Send the default standard background for the contest's round, or 404."""
    ctx = get_context(ct_id=ct_id)

    bg = BackgroundType.standard.find_default_background(ctx.round)
    if bg is not None:
        return send_file(bg, mimetype='application/pdf')
    else:
        raise werkzeug.exceptions.NotFound()


@app.route('/org/contest/school-results/<int:school_id>/c/<int:ct_id>/certificates/<cert_type>/<filename>')
def org_school_results_certs(school_id: int, ct_id: int, cert_type: str, filename: str):
    """Send a PDF with the approved certificates of one school's participants.

    The pages belonging to participants of the given school are picked from
    the contest's certificate file of the requested type and assembled into
    a new temporary PDF.

    Raises:
        NotFound: unknown school, contest outside the current year, unknown
            certificate type, unexpected file name, no approved certificate
            file, or the PDF missing on disk.
        Forbidden: missing ``Right.view_school_contestants`` for the school,
            or the contest is not closed.
        InternalServerError: pikepdf failed to process the PDF.
    """
    sess = db.get_session()

    place = sess.query(db.Place).filter_by(place_id=school_id).one_or_none()
    if place is None:
        raise werkzeug.exceptions.NotFound()

    if not g.gatekeeper.rights_for(place=place).have_right(Right.view_school_contestants):
        raise werkzeug.exceptions.Forbidden()

    contest = sess.query(db.Contest).options(joinedload(db.Contest.round)).get(ct_id)
    if contest is None or contest.round.year != config.CURRENT_YEAR:
        raise werkzeug.exceptions.NotFound()

    if contest.state != db.RoundState.closed:
        raise werkzeug.exceptions.Forbidden()

    try:
        ctype: db.CertType = db.CertType.coerce(cert_type)
    except ValueError:
        raise werkzeug.exceptions.NotFound()

    if filename != ctype.file_name(plural=True) + '.pdf':
        raise werkzeug.exceptions.NotFound()

    cfile = sess.query(db.CertFile).filter_by(cert_set_id=ct_id, type=ctype, approved=True).one_or_none()
    if cfile is None:
        raise werkzeug.exceptions.NotFound()

    users_and_certs = (sess.query(db.User, db.Certificate)
                       .select_from(db.Certificate)
                       .join(db.Participant, and_(db.Participant.user_id == db.Certificate.user_id,
                                                  db.Participant.year == config.CURRENT_YEAR,
                                                  db.Participant.school == school_id))
                       .join(db.Certificate.user)
                       .filter(db.Certificate.cert_set_id == ct_id,
                               db.Certificate.type == ctype)
                       .all())
    users_and_certs.sort(key=lambda x: x[0].sort_key())

    try:
        file = os.path.join(mo.util.data_dir('certs'), cfile.pdf_file)
        if not os.path.isfile(file):
            app.logger.error(f'Certifikát {file} je v DB, ale soubor neexistuje')
            raise werkzeug.exceptions.NotFound()

        with pikepdf.open(file, attempt_recovery=False) as src:
            with pikepdf.new() as dst:
                # page_number is 1-based, PDF page list is 0-based.
                for _, cert in users_and_certs:
                    dst.pages.append(src.pages[cert.page_number - 1])

                dst.docinfo['/Title'] = f'Matematická Olympiáda – {ctype.friendly_name(plural=True)}'
                dst.docinfo['/Creator'] = 'Odevzdávací Systém Matematické Olympiády'
                dst.docinfo['/CreationDate'] = encode_pdf_date(mo.now.astimezone())

                # The temporary name is removed deterministically when the
                # with-block exits instead of whenever the object happens to be
                # garbage-collected.  The handle opened inside stays readable
                # after the name is unlinked (as the original code also relied on).
                with NamedTemporaryFile(dir=mo.util.data_dir('tmp'), prefix='cert-') as tmp_file:
                    dst.save(tmp_file.name)
                    pdf_stream = open(tmp_file.name, 'rb')
    except pikepdf.PdfError as e:
        app.logger.error(f'Chyba při zpracování PDF certifikátů: {e}')
        raise werkzeug.exceptions.InternalServerError() from e

    return send_file(pdf_stream, mimetype='application/pdf')


@app.route('/org/contest/school-results/<int:school_id>/')
def org_school_results(school_id: int):
    """Show this year's main contests with participants of the given school.

    For every such contest the page gets the number of active participants
    from the school and the numbers of their approved certificates by type.
    """
    sess = db.get_session()

    place = sess.query(db.Place).filter_by(place_id=school_id).one_or_none()
    if place is None:
        raise werkzeug.exceptions.NotFound()

    # We deliberately do not check rights to the category
    if not g.gatekeeper.rights_for(place=place).have_right(Right.view_school_contestants):
        raise werkzeug.exceptions.Forbidden()

    # IDs of users registered this year as participants from this school.
    pants_subq = (sess.query(db.Participant.user_id)
                  .filter_by(year=config.CURRENT_YEAR, school=school_id)
                  .subquery())

    contest_counts = (sess.query(db.Contest.contest_id, func.count(db.Participation.user_id))
                      .select_from(db.Contest)
                      .join(db.Contest.round)
                      .join(db.Contest.participations)
                      .filter(db.Contest.contest_id == db.Contest.master_contest_id,
                              db.Round.year == config.CURRENT_YEAR,
                              db.Participation.user_id.in_(select(pants_subq)),
                              db.Participation.state == db.PartState.active)
                      .group_by(db.Contest.contest_id)
                      .all())
    contest_ids = [cid for cid, _ in contest_counts]
    num_pants_by_cid = {cid: cnt for cid, cnt in contest_counts}

    contests = (sess.query(db.Contest)
                .filter(db.Contest.contest_id.in_(contest_ids))
                .options(joinedload(db.Contest.round), joinedload(db.Contest.place))
                .all())
    contests.sort(key=lambda c: (c.round.category, c.round.seq, locale.strxfrm(c.place.name)))  # part is not needed, all of these are main contests

    cert_counts = (sess.query(db.CertFile.cert_set_id, db.CertFile.type, func.count(db.Certificate.user_id))
                   .select_from(db.Certificate)
                   .join(db.Certificate.cert_file)
                   .filter(db.Certificate.cert_set_id.in_(contest_ids))
                   .filter(db.Certificate.user_id.in_(select(pants_subq)))
                   .filter(db.CertFile.approved)
                   .group_by(db.CertFile.cert_set_id, db.CertFile.type)
                   .all())
    cert_counts_by_cid_type = {(cid, ctype): cnt for cid, ctype, cnt in cert_counts}
    contests_with_cert = {cid for cid, ctype, cnt in cert_counts}

    return render_template(
        'org_school_results.html',
        place=place,
        contests=contests,
        num_pants_by_cid=num_pants_by_cid,
        cert_counts_by_cid_type=cert_counts_by_cid_type,
        contests_with_cert=contests_with_cert,
    )


# The URL is hard-wired into mo.email.send_grading_info_email
@app.route('/org/contest/school-results/')
def org_school_results_all():
    """List the schools for which the current user holds the school-guarantor role.

    With exactly one such school the user is redirected straight to its
    results page.
    """
    school_place_ids = {ur.place_id for ur in g.gatekeeper.roles if ur.role == db.RoleType.garant_skola}
    if len(school_place_ids) == 1:
        return redirect(url_for('org_school_results', school_id=next(iter(school_place_ids))))

    sess = db.get_session()
    schools = sess.query(db.Place).filter(db.Place.place_id.in_(school_place_ids)).all()
    schools.sort(key=lambda p: locale.strxfrm(p.name))

    return render_template('org_school_results_all.html', schools=schools)