Select Git revision
tree_successor.h
-
Martin Mareš authoredMartin Mareš authored
org_certs.py 19.25 KiB
# Web: Certifikáty
from flask import render_template, g, redirect, url_for, flash
from flask.helpers import send_file
from flask_wtf import FlaskForm
import flask_wtf.file
import locale
from markupsafe import Markup
import os
import pikepdf
from pikepdf.models.metadata import encode_pdf_date
from sqlalchemy import func, select, and_
from sqlalchemy.orm import joinedload
from tempfile import NamedTemporaryFile
from typing import Tuple, Optional, Dict
import werkzeug.exceptions
import wtforms
from wtforms import validators
import mo
import mo.config as config
import mo.db as db
import mo.email
from mo.jobs.certs import schedule_create_certs, DesignParams, BackgroundType
from mo.rights import Right
import mo.submit
import mo.util
from mo.util import merge_objects
from mo.web import app
from mo.web.org_contest import get_context
import mo.web.fields as mo_fields
from mo.web.user import send_certificate
def validate_background(form: 'CertSetForm', field: wtforms.Field) -> None:
if not field.data:
return
file = field.data.stream
submitter = mo.submit.Submitter()
try:
submitter.check_certificate_background(file.name)
except mo.submit.SubmitException as e:
raise wtforms.ValidationError(str(e))
class SpaceField(wtforms.FloatField):
def __init__(self, label: str, **kwargs):
super().__init__(label,
validators=[validators.InputRequired(),
validators.NumberRange(-10, 200, "Hodnota musí být mezi %(min)s a %(max)s")],
**kwargs)
default_design = DesignParams()
class CertSetForm(FlaskForm):
signer1_name = mo_fields.String("1. podepisující: jméno")
signer1_title = mo_fields.String("1. podepisující: funkce", render_kw={'placeholder': 'např. Předseda okresní komise MO'})
signer2_name = mo_fields.String("2. podepisující: jméno")
signer2_title = mo_fields.String("2. podepisující: funkce", render_kw={'placeholder': 'např. Ředitel školy'})
issue_place = mo_fields.String("Místo vydání")
issue_date = mo_fields.String("Datum vydání", render_kw={'placeholder': 'např. 3. března 2025'})
background_type = wtforms.SelectField(choices=BackgroundType.choices(), coerce=BackgroundType.coerce)
upload_background = flask_wtf.file.FileField(validators=[validate_background],
render_kw={'accept': 'application/pdf'},
description="Zde můžete nahrát obrázek ve formátu PDF, který se použije jako pozadí diplomu.")
space1 = SpaceField("Mezera 1", description=f"Mezera mezi horním okrajem a hlavičkou (default: {default_design.space1}).")
space2 = SpaceField("Mezera 2", description=f"Mezera mezi hlavičkou a jménem (default: {default_design.space2}).")
space3 = SpaceField("Mezera 3", description=f"Mezera mezi jménem a umístěním (default: {default_design.space3}).")
space4 = SpaceField("Mezera 4", description=f"Mezera mezi umístěním a podpisy (default: {default_design.space4}).")
space5 = SpaceField("Mezera 5", description=f"Mezera mezi podpisy a patičkou (default: {default_design.space5}).")
space6 = SpaceField("Mezera 6", description=f"Mezera mezi patičkou a dolním okrajem (default: {default_design.space6}).")
logo_visible = wtforms.BooleanField("Logo MO a JČMF v patičce")
tex_hacks = mo_fields.String("Nastavení sazby") # description se nastavuje později
generate = wtforms.SubmitField("Vytvořit diplomy")
save = wtforms.SubmitField("Pouze uložit nastavení")
delete = wtforms.SubmitField("Smazat diplomy")
def osmo_validate(self, cset) -> bool:
if (delete := getattr(self, 'delete')) is not None and delete.data:
return True
ok = True
if self.background_type.data == BackgroundType.custom and self.upload_background.data is None and not cset.background_file:
self.upload_background.errors.append('Nahrajte obrázek na pozadí.') # FIXME: typing
ok = False
if all(getattr(self, key).data >= 0 for key in DesignParams.SPACE_PARAMS):
self.space1.errors.append('Alespoň jedna z mezer musí být pružná.') # FIXME: typing
ok = False
return ok
class CertApproveForm(FlaskForm):
ctype = wtforms.HiddenField()
approve = wtforms.SubmitField("Schválit")
unapprove = wtforms.SubmitField("Zrušit schválení")
@app.route('/org/contest/c/<int:ct_id>/certificates', methods=('GET', 'POST'))
def org_certificates(ct_id: int):
ctx = get_context(ct_id=ct_id, right_needed=Right.view_contestants)
assert ctx.contest and ctx.master_contest
group_rounds = ctx.round.get_group_rounds(True)
group_rounds.sort(key=lambda r: r.round_code())
contest = ctx.master_contest
ct_id = contest.contest_id
can_change = ctx.rights.have_right(Right.manage_contest) and ctx.master_round.round_type != db.RoundType.other
sess = db.get_session()
cset = (sess.query(db.CertSet)
.with_for_update()
.filter_by(contest=contest)
.one_or_none())
if cset is None:
new_cset = True
cset = db.CertSet(
contest_id=ct_id,
)
dparams = DesignParams(issue_date=mo.now.strftime('%e. %B %Y').strip())
else:
new_cset = False
dparams = DesignParams.from_json(cset.design_params)
if can_change:
form = CertSetForm(obj=merge_objects(cset, dparams))
form.tex_hacks.description = Markup("Speciální nastavení sazby diplomů (viz <a href='" + url_for('doc_admin') + "'>návod</a>)")
if not ctx.rights.have_right(Right.edit_tex_hacks):
del form.tex_hacks
if new_cset:
del form.delete
approve_form = CertApproveForm()
else:
form = None
approve_form = None
if approve_form and approve_form.validate_on_submit() and (approve_form.approve.data or approve_form.unapprove.data):
try:
ctype = db.CertType.coerce(approve_form.ctype.data)
except ValueError:
raise werkzeug.exceptions.UnprocessableEntity()
cfile = sess.query(db.CertFile).filter_by(cert_set_id=ct_id, type=ctype).one_or_none()
if cfile:
if approve_form.approve.data:
cfile.approved = True
mo.util.log(
type=db.LogType.cert_set,
what=contest.contest_id,
details={'action': 'approve', 'reason': 'web'},
)
app.logger.info(f'Schváleny diplomy pro soutěž #{contest.contest_id}')
flash(f'Diplomy ({ctype.friendly_name()}) schváleny.', 'success')
elif approve_form.unapprove.data:
cfile.approved = False
mo.util.log(
type=db.LogType.cert_set,
what=contest.contest_id,
details={'action': 'unapprove', 'reason': 'web'},
)
app.logger.info(f'Zrušeno schválení diplomů pro soutěž #{contest.contest_id}')
flash(f'Odvoláno schválení diplomů ({ctype.friendly_name()}).', 'success')
sess.commit()
return redirect(ctx.url_for('org_certificates'))
else:
flash('Tento typ diplomů nebyl vytvořen.', 'danger')
elif form:
if not form.is_submitted():
pass
elif not form.validate() or not form.osmo_validate(cset):
flash('V nastavení diplomů byly nalezeny chyby.', 'danger')
elif not new_cset and form.delete.data:
sess.delete(cset)
mo.util.log(
type=db.LogType.cert_set,
what=contest.contest_id,
details={'action': 'delete', 'cert_set': db.row2dict(cset), 'reason': 'web'},
)
sess.commit()
app.logger.info(f'Smazány diplomy pro soutěž #{contest.contest_id}')
flash('Diplomy smazány.', 'success')
return redirect(ctx.url_for('org_certificates'))
elif form.generate.data or form.save.data:
form.populate_obj(cset)
form.populate_obj(dparams)
cset.design_params = dparams.to_json()
if new_cset:
# To je potřeba udělat už teď, protože cset.dir_path() níže potřebuje funkční relationships
sess.add(cset)
sess.flush()
if dparams.background_type == BackgroundType.custom:
if form.upload_background.data:
old_background = cset.background_file
out_dir = cset.dir_path()
cset.background_file = mo.util.link_to_dir(form.upload_background.data.stream.name,
out_dir,
base_dir=mo.util.data_dir('certs'),
prefix='background-',
suffix='.pdf',
make_dirs=True)
app.logger.info(f'Nahráno pozadí diplomů {cset.background_file}')
else:
old_background = None
else:
old_background = cset.background_file
cset.background_file = None
changes = None
if new_cset:
mo.util.log(
type=db.LogType.cert_set,
what=contest.contest_id,
details={'action': 'new', 'cert_set': db.row2dict(cset), 'reason': 'web'},
)
app.logger.info(f'Založeny diplomy pro soutěž #{contest.contest_id}')
sess.commit()
elif sess.is_modified(cset):
cset.changed_at = mo.now
changes = db.get_object_changes(cset)
mo.util.log(
type=db.LogType.cert_set,
what=contest.contest_id,
details={'action': 'edit', 'changes': changes, 'reason': 'web'},
)
app.logger.info(f'Upraveno nastavení diplomů pro soutěž #{contest.contest_id}')
sess.commit()
if old_background:
mo.util.unlink_if_exists(os.path.join(mo.util.data_dir('certs'), old_background))
if form.generate.data:
if cset.job is not None and cset.job.is_active():
flash('Počkejte, až doběhne předchozí dávka na tvorbu diplomů.', 'danger')
else:
cset.job_id = schedule_create_certs(contest, g.user)
sess.commit()
return redirect(url_for('org_job_wait', id=cset.job_id, back=ctx.url_for('org_certificates')))
else:
if changes is not None:
flash('Nastavení uloženo.', 'success')
else:
flash('Žádné změny k uložení.', 'info')
return redirect(ctx.url_for('org_certificates'))
pions = sess.query(db.Participation).filter_by(contest_id=ct_id).options(joinedload(db.Participation.user)).all()
pions.sort(key=lambda p: p.user.sort_key())
users_pions = [(p.user, p) for p in pions]
cert_files_by_type: Dict[db.CertType, Optional[db.CertFile]] = {}
for cf in sess.query(db.CertFile).filter_by(cert_set_id=ct_id).all():
cert_files_by_type[cf.type] = cf
cert_file_columns = [(t, cert_files_by_type.get(t)) for t in db.CertType]
certs_by_uid_type: Dict[Tuple[int, db.CertType], db.Certificate] = {}
for c in sess.query(db.Certificate).filter_by(cert_set_id=ct_id).all():
certs_by_uid_type[c.user_id, c.type] = c
return render_template(
'org_certificates.html',
ctx=ctx,
group_rounds=group_rounds,
form=form,
approve_form=approve_form,
cset=cset,
users_pions=users_pions,
cert_file_columns=cert_file_columns,
certs_by_uid_type=certs_by_uid_type,
settings_changed=(cset.changed_at is not None and (cset.certs_issued_at is None or cset.certs_issued_at < cset.changed_at)),
scoretable_changed=(cset.scoretable != contest.scoretable),
form_has_errors=form and form.is_submitted(),
)
@app.route('/org/contest/c/<int:ct_id>/certificate/<cert_type>/all/<filename>')
@app.route('/org/contest/c/<int:ct_id>/certificate/<cert_type>/<int:user_id>/<filename>')
def org_cert_file(ct_id: int, cert_type: str, filename: str, user_id: Optional[int] = None):
ctx = get_context(ct_id=ct_id, right_needed=Right.view_contestants)
assert ctx.contest and ctx.master_contest
return send_certificate(ct_id, cert_type, filename, user_id)
@app.route('/org/contest/c/<int:ct_id>/certificates/standard-bg.pdf')
def org_certificates_standard_bg(ct_id: int):
ctx = get_context(ct_id=ct_id)
bg = BackgroundType.standard.find_default_background(ctx.round)
if bg is not None:
return send_file(bg, mimetype='application/pdf')
else:
raise werkzeug.exceptions.NotFound()
@app.route('/org/contest/school-results/<int:school_id>/c/<int:ct_id>/certificates/<cert_type>/<filename>')
def org_school_results_certs(school_id: int, ct_id: int, cert_type: str, filename: str):
sess = db.get_session()
place = sess.query(db.Place).filter_by(place_id=school_id).one_or_none()
if place is None:
raise werkzeug.exceptions.NotFound()
if not g.gatekeeper.rights_for(place=place).have_right(Right.view_school_contestants):
raise werkzeug.exceptions.Forbidden()
contest = sess.query(db.Contest).options(joinedload(db.Contest.round)).get(ct_id)
if contest is None or contest.round.year != config.CURRENT_YEAR:
raise werkzeug.exceptions.NotFound()
if contest.state != db.RoundState.closed:
raise werkzeug.exceptions.Forbidden()
try:
ctype: db.CertType = db.CertType.coerce(cert_type)
except ValueError:
raise werkzeug.exceptions.NotFound()
if filename != ctype.file_name(plural=True) + '.pdf':
raise werkzeug.exceptions.NotFound()
cfile = sess.query(db.CertFile).filter_by(cert_set_id=ct_id, type=ctype, approved=True).one_or_none()
if cfile is None:
raise werkzeug.exceptions.NotFound()
users_and_certs = (sess.query(db.User, db.Certificate)
.select_from(db.Certificate)
.join(db.Participant, and_(db.Participant.user_id == db.Certificate.user_id,
db.Participant.year == config.CURRENT_YEAR,
db.Participant.school == school_id))
.join(db.Certificate.user)
.filter(db.Certificate.cert_set_id == ct_id,
db.Certificate.type == ctype)
.all())
users_and_certs.sort(key=lambda x: x[0].sort_key())
try:
file = os.path.join(mo.util.data_dir('certs'), cfile.pdf_file)
if not os.path.isfile(file):
app.logger.error(f'Certifikát {file} je v DB, ale soubor neexistuje')
raise werkzeug.exceptions.NotFound()
with pikepdf.open(file, attempt_recovery=False) as src:
with pikepdf.new() as dst:
for _, cert in users_and_certs:
dst.pages.append(src.pages[cert.page_number - 1])
dst.docinfo['/Title'] = f'Matematická Olympiáda – {ctype.friendly_name(plural=True)}'
dst.docinfo['/Creator'] = 'Odevzdávací Systém Matematické Olympiády'
dst.docinfo['/CreationDate'] = encode_pdf_date(mo.now.astimezone())
tmp_file = NamedTemporaryFile(dir=mo.util.data_dir('tmp'), prefix='cert-')
dst.save(tmp_file.name)
except pikepdf.PdfError as e:
app.logger.error(f'Chyba při zpracování PDF certifikátů: {e}')
raise werkzeug.exceptions.InternalServerError()
return send_file(open(tmp_file.name, 'rb'), mimetype='application/pdf')
@app.route('/org/contest/school-results/<int:school_id>/')
def org_school_results(school_id: int):
sess = db.get_session()
place = sess.query(db.Place).filter_by(place_id=school_id).one_or_none()
if place is None:
raise werkzeug.exceptions.NotFound()
# Úmyslně nekontrolujeme práva ke kategorii
if not g.gatekeeper.rights_for(place=place).have_right(Right.view_school_contestants):
raise werkzeug.exceptions.Forbidden()
pants_subq = (sess.query(db.Participant.user_id)
.filter_by(year=config.CURRENT_YEAR,
school=school_id)
.subquery())
contest_counts = (sess.query(db.Contest.contest_id, func.count(db.Participation.user_id))
.select_from(db.Contest)
.join(db.Contest.round)
.join(db.Contest.participations)
.filter(db.Contest.contest_id == db.Contest.master_contest_id,
db.Round.year == config.CURRENT_YEAR,
db.Participation.user_id.in_(select(pants_subq)),
db.Participation.state == db.PartState.active)
.group_by(db.Contest.contest_id)
.all())
contest_ids = [cid for cid, _ in contest_counts]
num_pants_by_cid = {cid: cnt for cid, cnt in contest_counts}
contests = (sess.query(db.Contest)
.filter(db.Contest.contest_id.in_(contest_ids))
.options(joinedload(db.Contest.round),
joinedload(db.Contest.place))
.all())
contests.sort(key=lambda c: (c.round.category, c.round.seq, locale.strxfrm(c.place.name))) # part nepotřebujeme, vše jsou hlavní soutěže
cert_counts = (sess.query(db.CertFile.cert_set_id, db.CertFile.type, func.count(db.Certificate.user_id))
.select_from(db.Certificate)
.join(db.Certificate.cert_file)
.filter(db.Certificate.cert_set_id.in_(contest_ids))
.filter(db.Certificate.user_id.in_(select(pants_subq)))
.filter(db.CertFile.approved)
.group_by(db.CertFile.cert_set_id, db.CertFile.type)
.all())
cert_counts_by_cid_type = {(cid, ctype): cnt for cid, ctype, cnt in cert_counts}
contests_with_cert = {cid for cid, ctype, cnt in cert_counts}
return render_template(
'org_school_results.html',
place=place,
contests=contests,
num_pants_by_cid=num_pants_by_cid,
cert_counts_by_cid_type=cert_counts_by_cid_type,
contests_with_cert=contests_with_cert,
)
# URL je zadrátované do mo.email.send_grading_info_email
@app.route('/org/contest/school-results/')
def org_school_results_all():
school_place_ids = set(ur.place_id for ur in g.gatekeeper.roles if ur.role == db.RoleType.garant_skola)
if len(school_place_ids) == 1:
return redirect(url_for('org_school_results', school_id=list(school_place_ids)[0]))
sess = db.get_session()
schools = sess.query(db.Place).filter(db.Place.place_id.in_(school_place_ids)).all()
schools.sort(key=lambda p: locale.strxfrm(p.name))
return render_template('org_school_results_all.html', schools=schools)