Protocol generation and scan processing

Merged Martin Mareš requested to merge mj/protokoly into devel
1 file  +354 −0
 
# Implementation of jobs for working with protocols

from PIL import Image
from dataclasses import dataclass
import multiprocessing
import os
import poppler
import pyzbar.pyzbar as pyzbar
import re
from sqlalchemy import delete
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.query import Query
import subprocess
from typing import List, Optional

import mo
import mo.config as config
import mo.db as db
from mo.jobs import TheJob, job_handler
from mo.util import logger, part_path
import mo.util_format
 
 
 
#
# Job create_protocols: generates protocol forms
#
# Input JSON:
#   { 'contest_id': contest ID,
#     'site_id': ID of the contest site, or none,
#     'task_ids': [task_id, ...],
#     'num_universal': number of sheets with a universal header,
#     'num_blank': number of continuation sheets,
#   }
#
# Output JSON:
#   null
#
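# For illustration, a concrete payload might look like this (all IDs below are made up):
#   { 'contest_id': 42, 'site_id': None, 'task_ids': [101, 102, 103],
#     'num_universal': 5, 'num_blank': 20 }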
 
 
 
def schedule_create_protocols(contest: db.Contest, site: Optional[db.Place], for_user: db.User, tasks: List[db.Task], num_universal: int, num_blank: int):
    place = site or contest.place

    the_job = TheJob()
    job = the_job.create(db.JobType.create_protocols, for_user)
    job.description = f'Formuláře protokolů {contest.round.round_code_short()} {place.name}'
    job.in_json = {
        'contest_id': contest.contest_id,
        'site_id': site.place_id if site else None,
        'task_ids': [t.task_id for t in tasks],
        'num_universal': num_universal,
        'num_blank': num_blank,
    }
    the_job.submit()
 
 
 
def tex_arg(s: str) -> str:
    # Primitive escaping for TeX. The goal is not so much a faithful interpretation
    # of all characters as preventing TeX from crashing on strange input.
    s = re.sub(r'[\\{}#$%^~]', '?', s)
    s = re.sub(r'([&_])', r'\\\1', s)
    return '{' + s + '}'
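
# A quick illustration of the escaping above (hypothetical input):
#   tex_arg('50% sleva & pokus_1')  ->  '{50? sleva \& pokus\_1}'
# i.e. characters that could break TeX are replaced by '?', while '&' and '_' are escaped.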
 
 
 
def _get_user_id_query(contest: db.Contest, site_id: Optional[int]) -> Query:
    q = db.get_session().query(db.Participation.user_id).filter_by(contest=contest, state=db.PartState.active)
    if site_id is not None:
        q = q.filter_by(place_id=site_id)
    return q
 
 
 
def _get_pants(contest: db.Contest, site_id: Optional[int]) -> List[db.Participant]:
    user_id_subq = _get_user_id_query(contest, site_id).subquery()

    pants = (db.get_session().query(db.Participant)
             .options(joinedload(db.Participant.user), joinedload(db.Participant.school_place))
             .filter_by(year=config.CURRENT_YEAR)
             .filter(db.Participant.user_id.in_(user_id_subq))
             .all())
    pants.sort(key=lambda p: p.user.sort_key())

    return pants
 
 
 
@job_handler(db.JobType.create_protocols)
def handle_create_protocols(the_job: TheJob):
    job = the_job.job
    assert job.in_json is not None
    contest_id: int = job.in_json['contest_id']  # type: ignore
    site_id: int = job.in_json['site_id']  # type: ignore
    task_ids: List[int] = job.in_json['task_ids']  # type: ignore
    num_universal: int = job.in_json['num_universal']  # type: ignore
    num_blank: int = job.in_json['num_blank']  # type: ignore

    sess = db.get_session()
    contest = sess.query(db.Contest).options(joinedload(db.Contest.round)).get(contest_id)
    assert contest is not None
    round = contest.round

    pants = _get_pants(contest, site_id)
    tasks = sess.query(db.Task).filter_by(round=round).filter(db.Task.task_id.in_(task_ids)).order_by(db.Task.code).all()

    pages = []
    for p in pants:
        for t in tasks:
            args = [
                ':'.join(['MO', round.round_code_short(), t.code, str(p.user_id)]),
                p.user.full_name(),
                p.grade,
                p.school_place.name or '???',
                t.code,
            ]
            pages.append('\\proto' + "".join([tex_arg(x) for x in args]))

    for _ in range(num_universal):
        pages.append('\\universal')

    for _ in range(num_blank):
        pages.append('\\blank')

    if not pages:
        the_job.error("Nebyly vyžádány žádné protokoly")
        return

    temp_dir = job.dir_path()
    logger.debug('Job: Vytvářím protokoly v %s (%s listů)', temp_dir, len(pages))

    tex_src = os.path.join(temp_dir, 'protokoly.tex')
    with open(tex_src, 'w') as f:
        f.write('\\input protokol.tex\n\n')
        kolo = f'{round.name} {round.year}. ročníku Matematické olympiády'
        kat = f'Kategorie {round.category}'
        if round.level > 0:
            kat += ', ' + contest.place.name
        f.write('\\def\\kolo' + tex_arg(kolo) + '\n\n')
        f.write('\\def\\kat' + tex_arg(kat) + '\n\n')

        for p in pages:
            f.write(p + '\n')

        f.write('\n\\bye\n')

    env = dict(os.environ)
    env['TEXINPUTS'] = part_path('tex') + '//:'

    subprocess.run(
        ['luatex', '--interaction=errorstopmode', 'protokoly.tex'],
        check=True,
        cwd=temp_dir,
        env=env,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    job.out_file = 'protokoly.pdf'
    job.result = 'Celkem ' + mo.util_format.inflect_number(len(pages), 'list', 'listy', 'listů')
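
# For orientation, the generated protokoly.tex has roughly this shape (the names and
# codes below are purely illustrative; \proto, \universal and \blank come from protokol.tex):
#
#   \input protokol.tex
#   \def\kolo{Školní kolo 73. ročníku Matematické olympiády}
#   \def\kat{Kategorie Z5}
#   \proto{MO:73-Z5-S:Z5-S-1:1234}{Jana Nováková}{5}{ZŠ Příkladová}{Z5-S-1}
#   \universal
#   \blank
#
#   \bye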
 
 
 
#
# Job process_scans: processes scanned protocols
#
# Input JSON:
#   { 'contest_id': contest ID,
#     'site_id': ID of the contest site, or none,
#     'task_ids': [task_id, ...],
#     'in_files': [names of the input files]
#   }
#
# Output JSON:
#   null
#
# Output files:
#   p-{file_nr:02d}-{page_nr:04d}-(full|small).png
#
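# For illustration, with two uploaded PDFs the input might be (IDs made up):
#   { 'contest_id': 42, 'site_id': None, 'task_ids': [101, 102, 103],
#     'in_files': ['input-001.pdf', 'input-002.pdf'] }
# and the job would then write p-00-0000-full.png, p-00-0000-small.png, ...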
 
 
 
def schedule_process_scans(contest: db.Contest, site: Optional[db.Place], for_user: db.User, tasks: List[db.Task], in_file_names: List[str]):
    place = site or contest.place

    the_job = TheJob()
    job = the_job.create(db.JobType.process_scans, for_user)
    job.description = f'Zpracování scanů {contest.round.round_code_short()} {place.name}'

    in_files = []
    num_files = 0
    for ifn in in_file_names:
        num_files += 1
        in_name = f'input-{num_files:03d}.pdf'
        the_job.attach_file(ifn, in_name)
        in_files.append(in_name)
    assert in_files

    job.in_json = {
        'contest_id': contest.contest_id,
        'site_id': site.place_id if site else None,
        'task_ids': [t.task_id for t in tasks],
        'in_files': in_files,
    }
    the_job.submit()
 
 
 
@dataclass
class ScanJobArgs:
    in_path: str
    out_prefix: str


@dataclass
class ScanJobPage:
    code: Optional[str]
 
 
 
@job_handler(db.JobType.process_scans)
def handle_process_scans(the_job: TheJob):
    job = the_job.job
    assert job.in_json is not None
    contest_id = job.in_json['contest_id']  # type: ignore
    site_id = job.in_json['site_id']  # type: ignore
    task_ids = job.in_json['task_ids']  # type: ignore
    in_files: List[str] = job.in_json['in_files']  # type: ignore

    sess = db.get_session()
    contest = sess.query(db.Contest).options(joinedload(db.Contest.round)).get(contest_id)
    assert contest is not None
    round = contest.round
    round_code = round.round_code_short()

    user_ids = set(u[0] for u in _get_user_id_query(contest, site_id).all())

    tasks = sess.query(db.Task).filter(db.Task.task_id.in_(task_ids)).all()
    tasks_by_code = {t.code: t for t in tasks}

    # Since we are going to spend a long time thinking, close the database session.
    sess.commit()

    with multiprocessing.Pool(1) as pool:
        args = [ScanJobArgs(in_path=job.file_path(fn),
                            out_prefix=job.file_path(f'p-{fi:02d}'))
                for fi, fn in enumerate(in_files)]
        results = pool.map(_process_scan_file, args)

    def _parse_code(pr: ScanJobPage, sp: db.ScanPage) -> Optional[str]:
        if pr.code is None:
            return None

        fields = pr.code.split(':')
        if fields[0] != 'MO':
            return 'Neznámý prefix'

        if len(fields) == 2:
            if fields[1] == '*':
                # Universal task header
                sp.seq_id = db.SCAN_PAGE_FIX
                return None
            if fields[1] == '+':
                # Continuation sheet with a code
                sp.seq_id = db.SCAN_PAGE_CONTINUE
                return None

        elif len(fields) == 4:
            if not fields[3].isnumeric():
                return 'User ID není číslo'
            user_id = int(fields[3])

            if fields[1] != round_code:
                return 'Nesouhlasí kód kola'
            if fields[2] not in tasks_by_code:
                return 'Neznámá úloha'
            if user_id not in user_ids:
                return 'Neznámý účastník'
            sp.user_id = user_id
            sp.task_id = tasks_by_code[fields[2]].task_id
            sp.seq_id = 0
            return None

        return 'Neznámý formát kódu'
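
    # The QR payloads recognized above therefore look like this (the concrete round,
    # task and user codes are illustrative):
    #   'MO:*'  - universal task header
    #   'MO:+'  - continuation sheet
    #   'MO:<round_code>:<task_code>:<user_id>', e.g. 'MO:73-Z5-S:Z5-S-1:1234'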
 
 
    # If the job was started a second time (manual retry), we want to delete all records in scan_pages.
    # Beware: we do not synchronize the ORM, but that does not matter, because at this point we have a fresh session.
    conn = sess.connection()
    conn.execute(delete(db.ScanPage.__table__).where(db.ScanPage.job_id == job.job_id))

    num_pages = 0
    for fi, fn in enumerate(in_files):
        for pi, pr in enumerate(results[fi]):
            sp = db.ScanPage(
                job_id=job.job_id,
                file_nr=fi,
                page_nr=pi,
                seq_id=db.SCAN_PAGE_FIX,
            )

            err = _parse_code(pr, sp)
            if err is not None:
                logger.debug(f'Scan: {fi}/{pi} ({pr.code}): {err}')
                sp.seq_id = db.SCAN_PAGE_UFO

            sess.add(sp)
            num_pages += 1

    job.result = 'Celkem ' + mo.util_format.inflect_number(num_pages, 'strana', 'strany', 'stran')
    the_job.expires_in_minutes = config.JOB_EXPIRATION_LONG
 
 
 
def _process_scan_file(args: ScanJobArgs) -> List[ScanJobPage]:
    # Processes a single file with scans. Runs in a separate process.

    # FIXME: Error handling
    logger.debug(f'Scan: Analyzuji soubor {args.in_path}')
    pdf = poppler.load_from_file(args.in_path)

    renderer = poppler.PageRenderer()
    renderer.set_render_hint(poppler.RenderHint.antialiasing, True)
    renderer.set_render_hint(poppler.RenderHint.text_antialiasing, True)
    dpi = 300

    output = []
    for page_nr in range(pdf.pages):
        page = pdf.create_page(page_nr)
        page_img = renderer.render_page(page, xres=dpi, yres=dpi)

        full_img = Image.frombytes(
            "RGBA",
            (page_img.width, page_img.height),
            page_img.data,
            "raw",
            str(page_img.format),
        )
        del page_img

        full_img = full_img.convert('L')    # Grayscale
        full_size = full_img.size

        codes = pyzbar.decode(full_img, symbols=[pyzbar.ZBarSymbol.QRCODE])
        codes = [c for c in codes if c.type == 'QRCODE' and c.data.startswith(b'MO:')]
        qr = None
        if codes:
            if len(codes) > 1:
                logger.warning(f'Scan: Strana #{page_nr} obsahuje více QR kódů')
            code = codes[0]
            qr = code.data.decode('US-ASCII')
            # FIXME: The page could be rotated here according to the code

        output.append(ScanJobPage(code=qr))

        full_img.save(f'{args.out_prefix}-{page_nr:04d}-full.png')

        # FIXME: Do we need to create a thumbnail?
        small_img = full_img.resize((full_size[0] // 4, full_size[1] // 4))
        small_img.save(f'{args.out_prefix}-{page_nr:04d}-small.png')

        logger.debug(f'Scan: Strana #{page_nr}: {qr}')

    return output