Select Git revision
jednosmerne-na-konec.py
-
Martin Mareš authoredMartin Mareš authored
__init__.py 4.99 KiB
# Asynchronní úlohy
from datetime import timedelta
import os
import secrets
from sqlalchemy import or_
from typing import Optional, Dict, Callable
import mo
import mo.db as db
import mo.util
from mo.util import logger
def send_notify():
"""Pošle notifikaci procesu, který zpracovává joby.
Běžíme-li jako součást webu, init webu tuto funkci vymění."""
logger.debug('Job: Není komu poslat notifikaci')
def job_file_path(name: str) -> str:
return os.path.join(mo.util.data_dir('jobs'), name)
class TheJob:
"""Job z pohledu Pythonu."""
job: db.Job
job_id: Optional[int]
def __init__(self, job_id: Optional[int] = None):
"""Pokud chceme pracovat s existujícím jobem, zadáme jeho ID."""
self.job_id = job_id
def load(self) -> db.Job:
sess = db.get_session()
self.job = sess.query(db.Job).with_for_update().get(self.job_id)
return self.job
def create(self, type: db.JobType, for_user: db.User) -> db.Job:
self.job = db.Job(type=type, state=db.JobState.ready, user=for_user)
return self.job
def attach_file(self, tmp_name: str, suffix: str):
"""Vytvoří hardlink na daný pracovní soubor v adresáři jobů."""
while True:
name = secrets.token_hex(16) + suffix
try:
os.link(tmp_name, job_file_path(name))
break
except FileExistsError:
pass
logger.debug(f'Job: Příloha {tmp_name} -> {name}')
return name
def submit(self):
sess = db.get_session()
sess.add(self.job)
sess.flush()
self.job_id = self.job.job_id
logger.info(f'Job: Vytvořen job #{self.job_id}')
sess.commit()
send_notify()
def _finish_remove(self):
sess = db.get_session()
job = self.job
if job.in_file is not None:
os.unlink(job_file_path(job.in_file))
if job.out_file is not None:
os.unlink(job_file_path(job.out_file))
sess.delete(job)
sess.commit()
def expire(self):
sess = db.get_session()
if not self.load():
# Mezitím úlohu smazal někdo jiný
logger.info(f'Job: Job #{self.job.job_id} expiroval někdo jiný')
sess.rollback()
return
job = self.job
logger.info(f'Job: Expiruji job #{job.job_id}')
assert job.state in (db.JobState.done, db.JobState.failed)
return self._finish_remove()
def remove_loaded(self):
logger.info(f'Job: Ruším job #{self.job.job_id}')
return self._finish_remove()
def run(self):
sess = db.get_session()
if not self.load() or self.job.state != db.JobState.ready:
# Někdo ho mezitím smazal nebo vyřídil
logger.info(f'Job: Job #{self.job.job_id} vyřizuje někdo jiný')
sess.rollback()
return
job = self.job
logger.info(f'Job: Spouštím job #{job.job_id} ({job.type})')
job.state = db.JobState.running
sess.commit()
try:
_handler_table[job.type](self)
logger.info(f'Job: Úspěšně dokončen job #{job.job_id}')
job.state = db.JobState.done
except Exception as e:
logger.error(f'Job: Chyba při zpracování jobu #{job.job_id}: %s', e, exc_info=e)
job.state = db.JobState.failed
return
job.finished_at = mo.util.get_now()
job.expires_at = job.finished_at + timedelta(minutes=5) # FIXME
sess.commit()
def process_jobs():
sess = db.get_session()
assert hasattr(mo, 'now'), 'mo.now není nastaveno'
logger.debug('Job: Zpracovávám frontu')
# Vyhýbáme se dlouho běžícím transakcím, abychom nekolidovali s ostatními procesy.
# Proto zpracování každého jobu běží ve své vlastní transakci a počítáme s tím,
# že mezi nalezením jobu a jeho zpracováním se stav mohl změnit.
# Nejprve se podíváme, jestli máme nějaké expirované joby ke smazání
expired = (sess.query(db.Job.job_id)
.filter(or_(db.Job.state == db.JobState.done, db.Job.state == db.JobState.failed))
.filter(db.Job.expires_at < mo.now)
.all())
sess.rollback()
for job_id, in expired:
tj = TheJob(job_id)
tj.expire()
# Probereme joby, které by měly běžet
ready = (sess.query(db.Job.job_id)
.filter_by(state=db.JobState.ready)
.all())
sess.rollback()
for job_id in ready:
tj = TheJob(job_id)
tj.run()
_handler_table: Dict[db.JobType, Callable[[TheJob], None]] = {}
def register_handler(type: db.JobType, handler: Callable[[TheJob], None]):
assert type not in _handler_table
_handler_table[type] = handler
def job_handler(type: db.JobType):
def decorate(func):
register_handler(type, func)
return func
return decorate
# Moduly implementující jednotlivé typy jobů
import mo.jobs.submit