Skip to content
Snippets Groups Projects
Select Git revision
  • 4f4e722f236facbb45141a93ef80dcd3f592f1e4
  • master default
  • zs2021
  • zs1920
4 results

ciferny-soucet.py

Blame
  • __init__.py 5.94 KiB
    # Asynchronní úlohy
    
    from datetime import timedelta
    import os
    from sqlalchemy import or_
    from typing import Optional, Dict, Callable, List
    
    import mo
    import mo.config as config
    import mo.db as db
    import mo.rights
    import mo.util
    from mo.util import logger
    
    
    def send_notify():
        """Pošle notifikaci procesu, který zpracovává joby.
        Běžíme-li jako součást webu, init webu tuto funkci vymění."""
        logger.debug('Job: Není komu poslat notifikaci')
    
    
    def job_file_path(name: str) -> str:
        return os.path.join(mo.util.data_dir('jobs'), name)
    
    
    def job_file_size(name: Optional[str]) -> Optional[int]:
        if name is None:
            return None
    
        try:
            return os.path.getsize(job_file_path(name))
        except OSError:
            return -1
    
    
    class TheJob:
        """Job z pohledu Pythonu."""
    
        job: db.Job
        job_id: Optional[int]
        gatekeeper: Optional[mo.rights.Gatekeeper]
        errors: List[str]
    
        def __init__(self, job_id: Optional[int] = None):
            """Pokud chceme pracovat s existujícím jobem, zadáme jeho ID."""
            self.job_id = job_id
            self.errors = []
    
        def load(self) -> db.Job:
            sess = db.get_session()
            self.job = sess.query(db.Job).with_for_update().get(self.job_id)
            return self.job
    
        def create(self, type: db.JobType, for_user: db.User) -> db.Job:
            self.job = db.Job(type=type, state=db.JobState.ready, user=for_user)
            return self.job
    
        def attach_file(self, tmp_name: str, suffix: str):
            """Vytvoří hardlink na daný pracovní soubor v adresáři jobů."""
    
            full_name = mo.util.link_to_dir(tmp_name, mo.util.data_dir('jobs'), suffix=suffix)
            name = os.path.basename(full_name)
            logger.debug(f'Job: Příloha {tmp_name} -> {name}')
            return name
    
        def submit(self):
            sess = db.get_session()
            sess.add(self.job)
            sess.flush()
            self.job_id = self.job.job_id
            logger.info(f'Job: Vytvořen job #{self.job_id} pro uživatele #{self.job.user_id}')
            sess.commit()
            send_notify()
    
        def _finish_remove(self):
            sess = db.get_session()
            job = self.job
    
            if job.in_file is not None:
                mo.util.unlink_if_exists(job_file_path(job.in_file))
    
            if job.out_file is not None:
                mo.util.unlink_if_exists(job_file_path(job.out_file))
    
            sess.delete(job)
            sess.commit()
    
        def expire(self):
            sess = db.get_session()
            if not self.load():
                # Mezitím úlohu smazal někdo jiný
                logger.info(f'Job: Job #{self.job.job_id} expiroval někdo jiný')
                sess.rollback()
                return
    
            job = self.job
            logger.info(f'Job: Expiruji job #{job.job_id}')
            assert job.state in (db.JobState.done, db.JobState.failed)
            return self._finish_remove()
    
        def remove_loaded(self):
            logger.info(f'Job: Ruším job #{self.job.job_id}')
            return self._finish_remove()
    
        def error(self, msg):
            logger.info(f'Job: >> {msg}')
            self.errors.append(msg)
    
        def run(self):
            sess = db.get_session()
            if not self.load() or self.job.state != db.JobState.ready:
                # Někdo ho mezitím smazal nebo vyřídil
                logger.info(f'Job: Job #{self.job_id} vyřizuje někdo jiný')
                sess.rollback()
                return
    
            job = self.job
            logger.info(f'Job: Spouštím job #{job.job_id} ({job.type}) uživatele #{job.user_id}')
            job.state = db.JobState.running
            sess.commit()
    
            try:
                self.gatekeeper = mo.rights.Gatekeeper(job.user)
                _handler_table[job.type](self)
                if self.errors:
                    logger.info(f'Job: Neúspěšně dokončen job #{job.job_id} ({job.result})')
                    job.state = db.JobState.failed
                    job.out_json = {'errors': self.errors}
                    if job.result == "":
                        job.result = 'Došlo k chybám, viz detail'
                else:
                    logger.info(f'Job: Úspěšně dokončen job #{job.job_id} ({job.result})')
                    job.state = db.JobState.done
            except Exception as e:
                logger.error(f'Job: Chyba při zpracování jobu #{job.job_id}: %s', e, exc_info=e)
                job.state = db.JobState.failed
                job.result = 'Interní chyba, informujte prosím správce systému.'
    
            job.finished_at = mo.util.get_now()
            job.expires_at = job.finished_at + timedelta(minutes=config.JOB_EXPIRATION)
            sess.commit()
    
    
    def process_jobs():
        sess = db.get_session()
        assert hasattr(mo, 'now'), 'mo.now není nastaveno'
    
        logger.debug('Job: Zpracovávám frontu')
    
        # Vyhýbáme se dlouho běžícím transakcím, abychom nekolidovali s ostatními procesy.
        # Proto zpracování každého jobu běží ve své vlastní transakci a počítáme s tím,
        # že mezi nalezením jobu a jeho zpracováním se stav mohl změnit.
    
        # Nejprve se podíváme, jestli máme nějaké expirované joby ke smazání
        expired = (sess.query(db.Job.job_id)
                   .filter(or_(db.Job.state == db.JobState.done, db.Job.state == db.JobState.failed))
                   .filter(db.Job.expires_at < mo.now)
                   .all())
        sess.rollback()
        for job_id, in expired:
            tj = TheJob(job_id)
            tj.expire()
    
        # Probereme joby, které by měly běžet
        ready = (sess.query(db.Job.job_id)
                 .filter_by(state=db.JobState.ready)
                 .order_by(db.Job.created_at)
                 .all())
        sess.rollback()
        for job_id in ready:
            tj = TheJob(job_id)
            tj.run()
    
    
    _handler_table: Dict[db.JobType, Callable[[TheJob], None]] = {}
    
    
    def register_handler(type: db.JobType, handler: Callable[[TheJob], None]):
        assert type not in _handler_table
        _handler_table[type] = handler
    
    
    def job_handler(type: db.JobType):
        def decorate(func):
            register_handler(type, func)
            return func
    
        return decorate
    
    
    # Moduly implementující jednotlivé typy jobů
    import mo.jobs.submit