Skip to content
Snippets Groups Projects
Commit d33c3c91 authored by Martin Mareš's avatar Martin Mareš
Browse files

Společná infrastruktura pro asynchronní joby

parent 4a1190fe
No related branches found
No related tags found
2 merge requests!14Asynchronní joby,!9WIP: Zárodek uživatelské části webu a submitování
......@@ -23,7 +23,7 @@ else
fi
echo "Zakládám adresáře"
mkdir -p $DEST/{log,var,data/{imports,submits,tmp}}
mkdir -p $DEST/{log,var,data/{imports,jobs,submits,tmp}}
echo "Instaluji balíček"
pip install -c constraints.txt .
......
......@@ -21,3 +21,6 @@ WEB_ROOT = 'https://mo.mff.cuni.cz/osmo-test/'
# Maximální velikost uploadu. Pozor, je omezena i konfigurací Nginxu.
MAX_CONTENT_LENGTH = 16777216
# Adresář, do kterého ukládáme data (pro vývoj relativní, pro instalaci absolutní)
DATA_DIR = 'data'
# Asynchronní úlohy
from datetime import timedelta
import os
import secrets
from sqlalchemy import or_
from typing import Optional, Dict, Callable
import mo
import mo.db as db
import mo.util
from mo.util import logger
def send_notify():
"""Pošle notifikaci procesu, který zpracovává joby.
Běžíme-li jako součást webu, init webu tuto funkci vymění."""
logger.debug('Job: Není komu poslat notifikaci')
def job_file_path(name: str) -> str:
return os.path.join(mo.util.data_dir('jobs'), name)
class TheJob:
"""Job z pohledu Pythonu."""
job: db.Job
job_id: Optional[int]
def __init__(self, job_id: Optional[int] = None):
"""Pokud chceme pracovat s existujícím jobem, zadáme jeho ID."""
self.job_id = job_id
def load(self) -> db.Job:
sess = db.get_session()
self.job = sess.query(db.Job).with_for_update().get(self.job_id)
return self.job
def create(self, type: db.JobType, for_user: db.User) -> db.Job:
self.job = db.Job(type=type, state=db.JobState.ready, user=for_user)
return self.job
def attach_file(self, tmp_name: str, suffix: str):
"""Vytvoří hardlink na daný pracovní soubor v adresáři jobů."""
while True:
name = secrets.token_hex(16) + suffix
try:
os.link(tmp_name, job_file_path(name))
break
except FileExistsError:
pass
logger.debug(f'Job: Příloha {tmp_name} -> {name}')
return name
def submit(self):
sess = db.get_session()
sess.add(self.job)
sess.flush()
self.job_id = self.job.job_id
logger.info(f'Job: Vytvořen job #{self.job_id}')
sess.commit()
send_notify()
def _finish_remove(self):
sess = db.get_session()
job = self.job
if job.in_file is not None:
os.unlink(job_file_path(job.in_file))
if job.out_file is not None:
os.unlink(job_file_path(job.out_file))
sess.delete(job)
sess.commit()
def expire(self):
sess = db.get_session()
if not self.load():
# Mezitím úlohu smazal někdo jiný
logger.info(f'Job: Job #{self.job.job_id} expiroval někdo jiný')
sess.rollback()
return
job = self.job
logger.info(f'Job: Expiruji job #{job.job_id}')
assert job.state in (db.JobStatedb.JobState.done, db.JobState.failed)
return self._finish_remove()
def remove_loaded(self):
logger.info(f'Job: Ruším job #{self.job.job_id}')
return self._finish_remove()
def run(self):
sess = db.get_session()
if not self.load() or self.job.state != db.JobState.ready:
# Někdo ho mezitím smazal nebo vyřídil
logger.info(f'Job: Job #{self.job.job_id} vyřizuje někdo jiný')
sess.rollback()
return
job = self.job
logger.info(f'Job: Spouštím job #{job.job_id} ({job.type})')
job.state = db.JobState.running
sess.commit()
try:
_handler_table[job.type](self)
logger.info(f'Job: Úspěšně dokončen job #{job.job_id}')
job.state = db.JobState.done
except Exception as e:
logger.error(f'Job: Chyba při zpracování jobu #{job.job_id}: %s', e, exc_info=e)
job.state = db.JobState.failed
return
job.finished_at = mo.util.get_now()
job.expires_at = job.finished_at + timedelta(minutes=5) # FIXME
sess.commit()
def process_jobs():
sess = db.get_session()
assert hasattr(mo, 'now'), 'mo.now není nastaveno'
logger.debug('Job: Zpracovávám frontu')
# Vyhýbáme se dlouho běžícím transakcím, abychom nekolidovali s ostatními procesy.
# Proto zpracování každého jobu běží ve své vlastní transakci a počítáme s tím,
# že mezi nalezením jobu a jeho zpracováním se stav mohl změnit.
# Nejprve se podíváme, jestli máme nějaké expirované joby ke smazání
expired = (sess.query(db.Job.job_id)
.filter(or_(db.Job.state == db.JobState.done, db.Job.state == db.JobState.failed))
.filter(db.Job.expires_at < mo.now)
.all())
sess.rollback()
for job_id, in expired:
tj = TheJob(job_id)
tj.expire()
# Probereme joby, které by měly běžet
ready = (sess.query(db.Job.job_id)
.filter_by(state=db.JobState.ready)
.all())
sess.rollback()
for job_id in ready:
tj = TheJob(job_id)
tj.run()
_handler_table: Dict[db.JobType, Callable[[TheJob], None]] = {}
def register_handler(type: db.JobType, handler: Callable[[TheJob], None]):
assert type not in _handler_table
_handler_table[type] = handler
def job_handler(type: db.JobType):
def decorate(func):
register_handler(type, func)
return func
return decorate
# Moduly implementující jednotlivé typy jobů
import mo.jobs.submit
# Implementace jobů pracujících se submity
from mo.util import logger
import mo.db as db
from mo.jobs import TheJob, job_handler
@job_handler(db.JobType.download_submits)
def handle_download_submits(tj: TheJob):
logger.debug('Entering download_submits')
pass
@job_handler(db.JobType.upload_feedback)
def handle_upload_feedback(tj: TheJob):
logger.debug('Entering upload_feedback')
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment