Select Git revision
org_contest_list.html
__init__.py 6.86 KiB
# Asynchronní úlohy
from datetime import timedelta
import os
import shutil
from sqlalchemy import or_
from typing import Optional, Dict, Callable, List
import mo
import mo.config as config
import mo.db as db
import mo.rights
import mo.util
from mo.util import logger
def send_notify():
"""Pošle notifikaci procesu, který zpracovává joby.
Běžíme-li jako součást webu, init webu tuto funkci vymění."""
logger.debug('Job: Není komu poslat notifikaci')
class TheJob:
"""Job z pohledu Pythonu."""
job: db.Job
job_id: Optional[int]
gatekeeper: Optional[mo.rights.Gatekeeper]
errors: List[str]
expires_in_minutes: int
def __init__(self, job_id: Optional[int] = None):
"""Pokud chceme pracovat s existujícím jobem, zadáme jeho ID."""
self.job_id = job_id
self.errors = []
def load(self) -> db.Job:
if getattr(self, 'job', None) is None:
sess = db.get_session()
self.job = sess.query(db.Job).with_for_update().get(self.job_id)
return self.job
def create(self, type: db.JobType, for_user: db.User) -> db.Job:
self.job = db.Job(type=type, state=db.JobState.preparing, user=for_user)
# Do DB přidáváme nehotový job, protože potřebujeme znát job_id pro založení adresáře
sess = db.get_session()
sess.add(self.job)
sess.flush()
self.job_id = self.job.job_id
logger.info(f'Job: Vytvořen job #{self.job_id} pro uživatele #{self.job.user_id}')
job_dir = self.job.dir_path()
if os.path.exists(job_dir):
# Hypoteticky by se mohlo stát, že se recykluje job_id od jobu, jehož
# vytvoření selhalo před commitem. Zkusíme tedy smazat prázdný adresář.
os.rmdir(job_dir)
os.mkdir(job_dir)
return self.job
def attach_file(self, tmp_name: str, attachment_name: str) -> str:
"""Vytvoří hardlink na daný pracovní soubor v adresáři jobu."""
full_name = self.job.file_path(attachment_name)
os.link(tmp_name, full_name)
logger.debug(f'Job: Příloha {tmp_name} -> {full_name}')
return attachment_name
def submit(self):
self.job.state = db.JobState.ready
db.get_session().commit()
send_notify()
def _finish_remove(self):
sess = db.get_session()
job = self.job
job_dir = self.job.dir_path()
if os.path.exists(job_dir):
shutil.rmtree(job_dir)
sess.delete(job)
sess.commit()
def expire(self):
sess = db.get_session()
if not self.load():
# Mezitím úlohu smazal někdo jiný
logger.info(f'Job: Job #{self.job_id} expiroval někdo jiný')
sess.rollback()
return
job = self.job
logger.info(f'Job: Expiruji job #{job.job_id}')
assert job.state in (db.JobState.done, db.JobState.failed)
return self._finish_remove()
def remove_loaded(self):
logger.info(f'Job: Ruším job #{self.job.job_id}')
return self._finish_remove()
def error(self, msg):
logger.info(f'Job: >> {msg}')
self.errors.append(msg)
def _check_runnable(self, retry: bool) -> Optional[str]:
s = self.job.state
if s == db.JobState.ready:
return None
elif s == db.JobState.running:
# Může se stát, že ho mezitím začal vyřizovat jiný proces
return 'právě běží'
elif s in (db.JobState.done, db.JobState.failed):
return None if retry else 'je už hotový'
else:
return 'je v neznámém stavu'
def run(self, retry: bool = False):
sess = db.get_session()
if not self.load():
# Někdo ho mezitím smazal
logger.info(f'Job: Job #{self.job_id} neexistuje')
sess.rollback()
return
reject_reason = self._check_runnable(retry)
if reject_reason is not None:
logger.info(f'Job: Job #{self.job_id} {reject_reason}')
sess.rollback()
return
job = self.job
logger.info(f'Job: Spouštím job #{job.job_id} ({job.type}) uživatele #{job.user_id}')
job.state = db.JobState.running
job.finished_at = None
job.expires_at = None
sess.commit()
try:
self.gatekeeper = mo.rights.Gatekeeper(job.user)
self.expires_in_minutes = config.JOB_EXPIRATION
_handler_table[job.type](self)
if self.errors:
logger.info(f'Job: Neúspěšně dokončen job #{job.job_id} ({job.result})')
job.state = db.JobState.failed
job.out_json = {'errors': self.errors}
if job.result == "":
job.result = 'Došlo k chybám, viz detail'
else:
logger.info(f'Job: Úspěšně dokončen job #{job.job_id} ({job.result})')
job.state = db.JobState.done
except Exception as e:
logger.error(f'Job: Chyba při zpracování jobu #{job.job_id}: %s', e, exc_info=e)
job.state = db.JobState.failed
job.result = 'Interní chyba, informujte prosím správce systému.'
job.finished_at = mo.util.get_now()
job.expires_at = job.finished_at + timedelta(minutes=self.expires_in_minutes)
sess.commit()
def process_jobs():
sess = db.get_session()
assert hasattr(mo, 'now'), 'mo.now není nastaveno'
logger.debug('Job: Zpracovávám frontu')
# Vyhýbáme se dlouho běžícím transakcím, abychom nekolidovali s ostatními procesy.
# Proto zpracování každého jobu běží ve své vlastní transakci a počítáme s tím,
# že mezi nalezením jobu a jeho zpracováním se stav mohl změnit.
# Nejprve se podíváme, jestli máme nějaké expirované joby ke smazání
expired = (sess.query(db.Job.job_id)
.filter(or_(db.Job.state == db.JobState.done, db.Job.state == db.JobState.failed))
.filter(db.Job.expires_at < mo.now)
.all())
sess.rollback()
for job_id, in expired:
tj = TheJob(job_id)
tj.expire()
# Probereme joby, které by měly běžet
ready = (sess.query(db.Job.job_id)
.filter_by(state=db.JobState.ready)
.order_by(db.Job.created_at)
.all())
sess.rollback()
for job_id in ready:
tj = TheJob(job_id)
tj.run()
_handler_table: Dict[db.JobType, Callable[[TheJob], None]] = {}
def register_handler(type: db.JobType, handler: Callable[[TheJob], None]):
assert type not in _handler_table
_handler_table[type] = handler
def job_handler(type: db.JobType):
def decorate(func):
register_handler(type, func)
return func
return decorate
# Moduly implementující jednotlivé typy jobů
import mo.jobs.protocols
import mo.jobs.submit