Skip to content
Snippets Groups Projects
Select Git revision
  • 4f4e722f236facbb45141a93ef80dcd3f592f1e4
  • master default
  • zs2021
  • zs1920
4 results

prvocisla-else.py

Blame
  • protocols.py 11.37 KiB
    # Implementace jobů na práci s protokoly
    
    from PIL import Image
    from dataclasses import dataclass
    import multiprocessing
    import os
    import poppler
    import pyzbar.pyzbar as pyzbar
    import re
    from sqlalchemy import delete
    from sqlalchemy.orm import joinedload
    from sqlalchemy.orm.query import Query
    import subprocess
    from typing import List, Optional
    
    import mo
    import mo.config as config
    import mo.db as db
    from mo.jobs import TheJob, job_handler
    from mo.util import logger, part_path
    import mo.util_format
    
    
    #
    # Job create_protocols: Vygeneruje formuláře protokolů
    #
    # Vstupní JSON:
    #        { 'contest_id': ID contestu,
    #          'site_id': ID soutěžního místa nebo none,
    #          'task_ids': [task_id, ...],
    #          'num_universal': počet papírů s univerzální hlavičkou,
    #          'num_blank': počet pokračovacích papírů,
    #        }
    #
    # Výstupní JSON:
    #        null
    #
    
    
    def schedule_create_protocols(contest: db.Contest, site: Optional[db.Place], for_user: db.User, tasks: List[db.Task], num_universal: int, num_blank: int):
        """Create and submit a create_protocols job.

        The job description names the round and the site; when no site is
        given, the contest's own place is used for the description and the
        job's 'site_id' is None.
        """
        place = site if site else contest.place

        the_job = TheJob()
        job = the_job.create(db.JobType.create_protocols, for_user)

        round_code = contest.round.round_code_short()
        job.description = f'Formuláře protokolů {round_code} {place.name}'

        # Input JSON consumed by the create_protocols job handler.
        site_id = None
        if site:
            site_id = site.place_id
        task_ids = [task.task_id for task in tasks]
        job.in_json = {
            'contest_id': contest.contest_id,
            'site_id': site_id,
            'task_ids': task_ids,
            'num_universal': num_universal,
            'num_blank': num_blank,
        }

        the_job.submit()
    
    
    def tex_arg(s: str) -> str:
        """Wrap *s* in braces as a TeX macro argument, with crude escaping.

        This does not try to render every character faithfully; the goal is
        only to keep TeX from crashing on unusual input. The characters
        backslash, braces, '#', '$', '%', '^' and '~' become '?', while '&'
        and '_' are prefixed with a backslash.
        """
        replacements = dict.fromkeys('\\{}#$%^~', '?')
        replacements['&'] = '\\&'
        replacements['_'] = '\\_'
        escaped = s.translate(str.maketrans(replacements))
        return '{' + escaped + '}'
    
    
    def _get_user_id_query(contest: db.Contest, site_id: Optional[int]) -> Query:
        """Build a query selecting user IDs of active participations in *contest*.

        When *site_id* is not None, the query is further restricted to
        participations with that place_id.
        """
        query = db.get_session().query(db.Participation.user_id)
        query = query.filter_by(contest=contest, state=db.PartState.active)
        if site_id is None:
            return query
        return query.filter_by(place_id=site_id)
    
    
    def _get_pants(contest: db.Contest, site_id: Optional[int]) -> List[db.Participant]:
        """Return current-year participants of *contest*, sorted by user sort key.

        The set of users comes from _get_user_id_query(contest, site_id);
        each participant is loaded together with its user and school place.
        """
        wanted_user_ids = _get_user_id_query(contest, site_id).subquery()

        query = db.get_session().query(db.Participant)
        query = query.options(joinedload(db.Participant.user), joinedload(db.Participant.school_place))
        query = query.filter_by(year=config.CURRENT_YEAR)
        query = query.filter(db.Participant.user_id.in_(wanted_user_ids))

        def by_user(pant):
            return pant.user.sort_key()

        return sorted(query.all(), key=by_user)
    
    
    @job_handler(db.JobType.create_protocols)
    def handle_create_protocols(the_job: TheJob):
        """Handle a create_protocols job: typeset protocol forms with LuaTeX.

        Reads 'contest_id', 'site_id', 'task_ids', 'num_universal' and
        'num_blank' from the job's input JSON. One \\proto page is emitted per
        (participant, task) pair, followed by the requested number of
        \\universal and \\blank pages. If there are no pages at all, the job
        is failed via the_job.error() and nothing is typeset.

        On success, 'protokoly.pdf' is recorded as the job's output file and
        the job result holds the total sheet count.

        Raises:
            subprocess.CalledProcessError: if luatex exits with a non-zero
                status (the subprocess is run with check=True).
        """
        job = the_job.job
        assert job.in_json is not None
        contest_id: int = job.in_json['contest_id']  # type: ignore
        # The scheduler stores None here when no specific site was selected.
        site_id: Optional[int] = job.in_json['site_id']        # type: ignore
        task_ids: List[int] = job.in_json['task_ids']      # type: ignore
        num_universal: int = job.in_json['num_universal']    # type: ignore
        num_blank: int = job.in_json['num_blank']    # type: ignore

        sess = db.get_session()
        contest = sess.query(db.Contest).options(joinedload(db.Contest.round)).get(contest_id)
        assert contest is not None
        round = contest.round

        pants = _get_pants(contest, site_id)
        tasks = sess.query(db.Task).filter_by(round=round).filter(db.Task.task_id.in_(task_ids)).order_by(db.Task.code).all()

        pages = []
        for p in pants:
            for t in tasks:
                # The first argument is the code string 'MO:<round>:<task>:<user_id>';
                # the scan-processing job parses codes of exactly this shape.
                args = [
                    ':'.join(['MO', round.round_code_short(), t.code, str(p.user_id)]),
                    p.user.full_name(),
                    p.grade,
                    p.school_place.name or '???',
                    t.code,
                ]
                pages.append('\\proto' + "".join([tex_arg(x) for x in args]))

        for _ in range(num_universal):
            pages.append('\\universal')

        for _ in range(num_blank):
            pages.append('\\blank')

        if not pages:
            the_job.error("Nebyly vyžádány žádné protokoly")
            return

        temp_dir = job.dir_path()
        logger.debug('Job: Vytvářím protokoly v %s (%s listů)', temp_dir, len(pages))

        tex_src = os.path.join(temp_dir, 'protokoly.tex')
        # Write explicitly as UTF-8: the text contains non-ASCII characters and
        # LuaTeX reads UTF-8 input, so the encoding must not depend on the
        # locale the job runner happens to be started with.
        with open(tex_src, 'w', encoding='utf-8') as f:
            f.write('\\input protokol.tex\n\n')
            kolo = f'{round.name} {round.year}. ročníku Matematické olympiády'
            kat = f'Kategorie {round.category}'
            if round.level > 0:
                kat += ', ' + contest.place.name
            f.write('\\def\\kolo' + tex_arg(kolo) + '\n\n')
            f.write('\\def\\kat' + tex_arg(kat) + '\n\n')

            for p in pages:
                f.write(p + '\n')

            f.write('\n\\bye\n')

        # Let TeX find its inputs under the project's 'tex' directory in
        # addition to the default search path (trailing ':').
        env = dict(os.environ)
        env['TEXINPUTS'] = part_path('tex') + '//:'

        subprocess.run(
            ['luatex', '--interaction=errorstopmode', 'protokoly.tex'],
            check=True,
            cwd=temp_dir,
            env=env,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        job.out_file = 'protokoly.pdf'
        job.result = 'Celkem ' + mo.util_format.inflect_number(len(pages), 'list', 'listy', 'listů')
    
    
    #
    # Job process_scans: Zpracuje nascanované protokoly
    #
    # Vstupní JSON:
    #        { 'contest_id': ID contestu,
    #          'site_id': ID soutěžního místa nebo none,
    #          'task_ids': [task_id, ...],
    #          'in_files': [názvy vstupních souborů]
    #        }
    #
    # Výstupní JSON:
    #        null
    #
    # Výstupní soubory:
    #       p-{file_nr:02d}-{page_nr:04d}-(full|small).png
    #
    
    
    def schedule_process_scans(contest: db.Contest, site: Optional[db.Place], for_user: db.User, tasks: List[db.Task], in_file_names: List[str]) -> int:
        """Create and submit a process_scans job; return the job's ID.

        Every file in *in_file_names* is attached to the job under the name
        'input-NNN.pdf', numbered from 001 in the given order. At least one
        input file is required (checked with an assertion).
        """
        place = site if site else contest.place

        the_job = TheJob()
        job = the_job.create(db.JobType.process_scans, for_user)
        round_code = contest.round.round_code_short()
        job.description = f'Zpracování scanů {round_code} {place.name}'

        attached = []
        for seq, source_name in enumerate(in_file_names, start=1):
            stored_name = f'input-{seq:03d}.pdf'
            the_job.attach_file(source_name, stored_name)
            attached.append(stored_name)
        assert attached

        # Input JSON consumed by the process_scans job handler.
        site_id = None
        if site:
            site_id = site.place_id
        job.in_json = {
            'contest_id': contest.contest_id,
            'site_id': site_id,
            'task_ids': [task.task_id for task in tasks],
            'in_files': attached,
        }

        the_job.submit()
        return the_job.job_id
    
    
    @dataclass
    class ScanJobArgs:
        # Arguments for processing one input file of scans in a worker process.
        # Path of the input file; it is loaded with poppler.
        in_path: str
        # Path prefix of the output images; '-{page_nr:04d}-full.png' and
        # '-{page_nr:04d}-small.png' are appended for each page.
        out_prefix: str
    
    
    @dataclass
    class ScanJobPage:
        # Result of analysing a single scanned page.
        # Text of the QR code starting with 'MO:' found on the page,
        # or None when the page carries no such code.
        code: Optional[str]
    
    
    @job_handler(db.JobType.process_scans)
    def handle_process_scans(the_job: TheJob):
        """Handle a process_scans job: render scanned files and classify pages.

        Each input file is processed by _process_scan_file in a worker
        process. For every resulting page a ScanPage row is added; the QR code
        found on the page (if any) decides its user, task and sequence ID.
        Pages whose code cannot be interpreted are marked SCAN_PAGE_UFO.
        """
        job = the_job.job
        assert job.in_json is not None
        params = job.in_json
        contest_id = params['contest_id']  # type: ignore
        site_id = params['site_id']        # type: ignore
        task_ids = params['task_ids']      # type: ignore
        in_files: List[str] = params['in_files']  # type: ignore

        sess = db.get_session()
        contest = sess.query(db.Contest).options(joinedload(db.Contest.round)).get(contest_id)
        assert contest is not None
        round_code = contest.round.round_code_short()

        user_ids = {row[0] for row in _get_user_id_query(contest, site_id).all()}

        tasks_by_code = {}
        for task in sess.query(db.Task).filter(db.Task.task_id.in_(task_ids)).all():
            tasks_by_code[task.code] = task

        # We are about to be busy for a long time, so close the database session.
        sess.commit()

        with multiprocessing.Pool(1) as pool:
            scan_args = [
                ScanJobArgs(in_path=job.file_path(file_name),
                            out_prefix=job.file_path(f'p-{file_nr:02d}'))
                for file_nr, file_name in enumerate(in_files)
            ]
            results = pool.map(_process_scan_file, scan_args)

        def _parse_code(pr: ScanJobPage, sp: db.ScanPage) -> Optional[str]:
            # Interpret the page's code and fill in *sp* accordingly.
            # Returns an error message, or None if the page is acceptable.
            if pr.code is None:
                return None

            parts = pr.code.split(':')
            if parts[0] != 'MO':
                return 'Neznámý prefix'

            if len(parts) == 2:
                marker = parts[1]
                if marker == '*':
                    # Universal task header
                    sp.seq_id = db.SCAN_PAGE_FIX
                    return None
                if marker == '+':
                    # Continuation sheet with a code
                    sp.seq_id = db.SCAN_PAGE_CONTINUE
                    return None

            if len(parts) != 4:
                return 'Neznamý formát kódu'

            _, code_round, task_code, user_id_text = parts
            if not user_id_text.isnumeric():
                return 'User ID není číslo'
            user_id = int(user_id_text)

            if code_round != round_code:
                return 'Nesouhlasí kód kola'
            if task_code not in tasks_by_code:
                return 'Neznámá úloha'
            if user_id not in user_ids:
                return 'Neznámý účastník'

            sp.user_id = user_id
            sp.task_id = tasks_by_code[task_code].task_id
            sp.seq_id = 1
            return None

        # If the job is being run a second time (manual retry), delete all of its
        # existing records in scan_pages first. Beware: the ORM is not synchronized
        # with this delete, but that is fine because the session is fresh here.
        conn = sess.connection()
        purge = delete(db.ScanPage.__table__).where(db.ScanPage.job_id == job.job_id)
        conn.execute(purge)

        num_pages = 0
        for file_nr, file_pages in enumerate(results):
            for page_nr, page in enumerate(file_pages):
                # Pages default to SCAN_PAGE_FIX unless the code says otherwise.
                sp = db.ScanPage(
                    job_id=job.job_id,
                    file_nr=file_nr,
                    page_nr=page_nr,
                    seq_id=db.SCAN_PAGE_FIX,
                )

                problem = _parse_code(page, sp)
                if problem is not None:
                    logger.debug(f'Scan: {file_nr}/{page_nr} ({page.code}): {problem}')
                    sp.seq_id = db.SCAN_PAGE_UFO

                sess.add(sp)
                num_pages += 1

        job.result = 'Celkem ' + mo.util_format.inflect_number(num_pages, 'strana', 'strany', 'stran')
        the_job.expires_in_minutes = config.JOB_EXPIRATION_LONG
    
    
    def _process_scan_file(args: ScanJobArgs) -> List[ScanJobPage]:
        """Process one file of scans; runs in a separate process.

        Every page of the file at args.in_path is rendered at 300 dpi,
        converted to grayscale and searched for QR codes whose data starts
        with 'MO:'. The page is saved as '<out_prefix>-<page>-full.png' and a
        quarter-size copy as '<out_prefix>-<page>-small.png'.

        Returns:
            One ScanJobPage per page, in page order; its code is the text of
            the first matching QR code, or None if the page has none.
        """
        # FIXME: Error handling
        logger.debug(f'Scan: Analyzuji soubor {args.in_path}')
        pdf = poppler.load_from_file(args.in_path)

        renderer = poppler.PageRenderer()
        renderer.set_render_hint(poppler.RenderHint.antialiasing, True)
        renderer.set_render_hint(poppler.RenderHint.text_antialiasing, True)
        dpi = 300

        output = []
        for page_nr in range(pdf.pages):
            page = pdf.create_page(page_nr)
            page_img = renderer.render_page(page, xres=dpi, yres=dpi)

            full_img = Image.frombytes(
                "RGBA",
                (page_img.width, page_img.height),
                page_img.data,
                "raw",
                str(page_img.format),
            )
            # Drop the rendered page as soon as it has been copied into the image.
            del page_img

            full_img = full_img.convert('L')               # Grayscale
            full_size = full_img.size

            codes = pyzbar.decode(full_img, symbols=[pyzbar.ZBarSymbol.QRCODE])
            codes = [c for c in codes if c.type == 'QRCODE' and c.data.startswith(b'MO:')]
            qr = None
            if codes:
                if len(codes) > 1:
                    logger.warning(f'Scan: Strana #{page_nr} obsahuje více QR kódů')
                code = codes[0]
                # A damaged or foreign code may contain non-ASCII bytes. Escape
                # them instead of raising, so that one bad page cannot abort the
                # whole job; the escaped text never forms a valid code and the
                # caller therefore treats the page as unrecognized.
                qr = code.data.decode('US-ASCII', errors='backslashreplace')
                # FIXME: The page could be rotated here according to the code

            output.append(ScanJobPage(code=qr))

            full_img.save(f'{args.out_prefix}-{page_nr:04d}-full.png')

            # FIXME: Do we need to create a thumbnail?
            small_img = full_img.resize((full_size[0] // 4, full_size[1] // 4))
            small_img.save(f'{args.out_prefix}-{page_nr:04d}-small.png')

            logger.debug(f'Scan: Strana #{page_nr}: {qr}')

        return output