Select Git revision
ciferny-soucet.py
-
Martin Mareš authoredMartin Mareš authored
__init__.py 5.94 KiB
# Asynchronní úlohy
from datetime import timedelta
import os
from sqlalchemy import or_
from typing import Optional, Dict, Callable, List
import mo
import mo.config as config
import mo.db as db
import mo.rights
import mo.util
from mo.util import logger
def send_notify():
"""Pošle notifikaci procesu, který zpracovává joby.
Běžíme-li jako součást webu, init webu tuto funkci vymění."""
logger.debug('Job: Není komu poslat notifikaci')
def job_file_path(name: str) -> str:
return os.path.join(mo.util.data_dir('jobs'), name)
def job_file_size(name: Optional[str]) -> Optional[int]:
if name is None:
return None
try:
return os.path.getsize(job_file_path(name))
except OSError:
return -1
class TheJob:
"""Job z pohledu Pythonu."""
job: db.Job
job_id: Optional[int]
gatekeeper: Optional[mo.rights.Gatekeeper]
errors: List[str]
def __init__(self, job_id: Optional[int] = None):
"""Pokud chceme pracovat s existujícím jobem, zadáme jeho ID."""
self.job_id = job_id
self.errors = []
def load(self) -> db.Job:
sess = db.get_session()
self.job = sess.query(db.Job).with_for_update().get(self.job_id)
return self.job
def create(self, type: db.JobType, for_user: db.User) -> db.Job:
self.job = db.Job(type=type, state=db.JobState.ready, user=for_user)
return self.job
def attach_file(self, tmp_name: str, suffix: str):
"""Vytvoří hardlink na daný pracovní soubor v adresáři jobů."""
full_name = mo.util.link_to_dir(tmp_name, mo.util.data_dir('jobs'), suffix=suffix)
name = os.path.basename(full_name)
logger.debug(f'Job: Příloha {tmp_name} -> {name}')
return name
def submit(self):
sess = db.get_session()
sess.add(self.job)
sess.flush()
self.job_id = self.job.job_id
logger.info(f'Job: Vytvořen job #{self.job_id} pro uživatele #{self.job.user_id}')
sess.commit()
send_notify()
def _finish_remove(self):
sess = db.get_session()
job = self.job
if job.in_file is not None:
mo.util.unlink_if_exists(job_file_path(job.in_file))
if job.out_file is not None:
mo.util.unlink_if_exists(job_file_path(job.out_file))
sess.delete(job)
sess.commit()
def expire(self):
sess = db.get_session()
if not self.load():
# Mezitím úlohu smazal někdo jiný
logger.info(f'Job: Job #{self.job.job_id} expiroval někdo jiný')
sess.rollback()
return
job = self.job
logger.info(f'Job: Expiruji job #{job.job_id}')
assert job.state in (db.JobState.done, db.JobState.failed)
return self._finish_remove()
def remove_loaded(self):
logger.info(f'Job: Ruším job #{self.job.job_id}')
return self._finish_remove()
def error(self, msg):
logger.info(f'Job: >> {msg}')
self.errors.append(msg)
def run(self):
sess = db.get_session()
if not self.load() or self.job.state != db.JobState.ready:
# Někdo ho mezitím smazal nebo vyřídil
logger.info(f'Job: Job #{self.job_id} vyřizuje někdo jiný')
sess.rollback()
return
job = self.job
logger.info(f'Job: Spouštím job #{job.job_id} ({job.type}) uživatele #{job.user_id}')
job.state = db.JobState.running
sess.commit()
try:
self.gatekeeper = mo.rights.Gatekeeper(job.user)
_handler_table[job.type](self)
if self.errors:
logger.info(f'Job: Neúspěšně dokončen job #{job.job_id} ({job.result})')
job.state = db.JobState.failed
job.out_json = {'errors': self.errors}
if job.result == "":
job.result = 'Došlo k chybám, viz detail'
else:
logger.info(f'Job: Úspěšně dokončen job #{job.job_id} ({job.result})')
job.state = db.JobState.done
except Exception as e:
logger.error(f'Job: Chyba při zpracování jobu #{job.job_id}: %s', e, exc_info=e)
job.state = db.JobState.failed
job.result = 'Interní chyba, informujte prosím správce systému.'
job.finished_at = mo.util.get_now()
job.expires_at = job.finished_at + timedelta(minutes=config.JOB_EXPIRATION)
sess.commit()
def process_jobs():
sess = db.get_session()
assert hasattr(mo, 'now'), 'mo.now není nastaveno'
logger.debug('Job: Zpracovávám frontu')
# Vyhýbáme se dlouho běžícím transakcím, abychom nekolidovali s ostatními procesy.
# Proto zpracování každého jobu běží ve své vlastní transakci a počítáme s tím,
# že mezi nalezením jobu a jeho zpracováním se stav mohl změnit.
# Nejprve se podíváme, jestli máme nějaké expirované joby ke smazání
expired = (sess.query(db.Job.job_id)
.filter(or_(db.Job.state == db.JobState.done, db.Job.state == db.JobState.failed))
.filter(db.Job.expires_at < mo.now)
.all())
sess.rollback()
for job_id, in expired:
tj = TheJob(job_id)
tj.expire()
# Probereme joby, které by měly běžet
ready = (sess.query(db.Job.job_id)
.filter_by(state=db.JobState.ready)
.order_by(db.Job.created_at)
.all())
sess.rollback()
for job_id in ready:
tj = TheJob(job_id)
tj.run()
_handler_table: Dict[db.JobType, Callable[[TheJob], None]] = {}
def register_handler(type: db.JobType, handler: Callable[[TheJob], None]):
assert type not in _handler_table
_handler_table[type] = handler
def job_handler(type: db.JobType):
def decorate(func):
register_handler(type, func)
return func
return decorate
# Moduly implementující jednotlivé typy jobů
import mo.jobs.submit