diff --git a/bin/deploy b/bin/deploy index 5b91c1982f51fb4f1050f4733e99e92945e40f01..1f0c2d0451e669c2cf9cdd30ef0650182dee9901 100755 --- a/bin/deploy +++ b/bin/deploy @@ -23,7 +23,7 @@ else fi echo "Zakládám adresáře" -mkdir -p $DEST/{log,var,data/{imports,submits,tmp}} +mkdir -p $DEST/{log,var,data/{imports,jobs,submits,tmp}} echo "Instaluji balíček" pip install -c constraints.txt . diff --git a/etc/config.py.example b/etc/config.py.example index 9b7af82e8d267238b255db90abddee810ee839fb..f73c340ff9ff51211387f400732a8d5a086f64cc 100644 --- a/etc/config.py.example +++ b/etc/config.py.example @@ -21,3 +21,6 @@ WEB_ROOT = 'https://mo.mff.cuni.cz/osmo-test/' # Maximální velikost uploadu. Pozor, je omezena i konfigurací Nginxu. MAX_CONTENT_LENGTH = 16777216 + +# Adresář, do kterého ukládáme data (pro vývoj relativní, pro instalaci absolutní) +DATA_DIR = 'data' diff --git a/mo/jobs/__init__.py b/mo/jobs/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..8110ce5c02c46ec78320e01ffdcc00c0eb3c24af --- /dev/null +++ b/mo/jobs/__init__.py @@ -0,0 +1,171 @@ +# Asynchronní úlohy + +from datetime import timedelta +import os +import secrets +from sqlalchemy import or_ +from typing import Optional, Dict, Callable + +import mo +import mo.db as db +import mo.util +from mo.util import logger + + +def send_notify(): + """Pošle notifikaci procesu, který zpracovává joby. + Běžíme-li jako součást webu, init webu tuto funkci vymění.""" + logger.debug('Job: Není komu poslat notifikaci') + + +def job_file_path(name: str) -> str: + return os.path.join(mo.util.data_dir('jobs'), name) + + +class TheJob: + """Job z pohledu Pythonu.""" + + job: db.Job + job_id: Optional[int] + + def __init__(self, job_id: Optional[int] = None): + """Pokud chceme pracovat s existujícím jobem, zadáme jeho ID.""" + self.job_id = job_id + + def load(self) -> db.Job: + sess = db.get_session() + self.job = sess.query(db.Job).with_for_update().get(self.job_id) + return self.job + + def create(self, type: db.JobType, for_user: db.User) -> db.Job: + self.job = db.Job(type=type, state=db.JobState.ready, user=for_user) + return self.job + + def attach_file(self, tmp_name: str, suffix: str): + """Vytvoří hardlink na daný pracovní soubor v adresáři jobů.""" + + while True: + name = secrets.token_hex(16) + suffix + try: + os.link(tmp_name, job_file_path(name)) + break + except FileExistsError: + pass + + logger.debug(f'Job: Příloha {tmp_name} -> {name}') + return name + + def submit(self): + sess = db.get_session() + sess.add(self.job) + sess.flush() + self.job_id = self.job.job_id + logger.info(f'Job: Vytvořen job #{self.job_id}') + sess.commit() + send_notify() + + def _finish_remove(self): + sess = db.get_session() + job = self.job + + if job.in_file is not None: + os.unlink(job_file_path(job.in_file)) + + if job.out_file is not None: + os.unlink(job_file_path(job.out_file)) + + sess.delete(job) + sess.commit() + + def expire(self): + sess = db.get_session() + if not self.load(): + # Mezitím úlohu smazal někdo jiný + logger.info(f'Job: Job #{self.job.job_id} expiroval někdo jiný') + sess.rollback() + return + + job = self.job + logger.info(f'Job: Expiruji job #{job.job_id}') + assert job.state in (db.JobStatedb.JobState.done, db.JobState.failed) + return self._finish_remove() + + def remove_loaded(self): + logger.info(f'Job: Ruším job #{self.job.job_id}') + return self._finish_remove() + + def run(self): + sess = db.get_session() + if not self.load() or self.job.state != db.JobState.ready: + # Někdo ho mezitím smazal nebo vyřídil + logger.info(f'Job: Job #{self.job.job_id} vyřizuje někdo jiný') + sess.rollback() + return + + job = self.job + logger.info(f'Job: Spouštím job #{job.job_id} ({job.type})') + job.state = db.JobState.running + sess.commit() + + try: + _handler_table[job.type](self) + logger.info(f'Job: Úspěšně dokončen job #{job.job_id}') + job.state = db.JobState.done + except Exception as e: + logger.error(f'Job: Chyba při zpracování jobu #{job.job_id}: %s', e, exc_info=e) + job.state = db.JobState.failed + return + + job.finished_at = mo.util.get_now() + job.expires_at = job.finished_at + timedelta(minutes=5) # FIXME + sess.commit() + + +def process_jobs(): + sess = db.get_session() + assert hasattr(mo, 'now'), 'mo.now není nastaveno' + + logger.debug('Job: Zpracovávám frontu') + + # Vyhýbáme se dlouho běžícím transakcím, abychom nekolidovali s ostatními procesy. + # Proto zpracování každého jobu běží ve své vlastní transakci a počítáme s tím, + # že mezi nalezením jobu a jeho zpracováním se stav mohl změnit. + + # Nejprve se podíváme, jestli máme nějaké expirované joby ke smazání + expired = (sess.query(db.Job.job_id) + .filter(or_(db.Job.state == db.JobState.done, db.Job.state == db.JobState.failed)) + .filter(db.Job.expires_at < mo.now) + .all()) + sess.rollback() + for job_id, in expired: + tj = TheJob(job_id) + tj.expire() + + # Probereme joby, které by měly běžet + ready = (sess.query(db.Job.job_id) + .filter_by(state=db.JobState.ready) + .all()) + sess.rollback() + for job_id in ready: + tj = TheJob(job_id) + tj.run() + + +_handler_table: Dict[db.JobType, Callable[[TheJob], None]] = {} + + +def register_handler(type: db.JobType, handler: Callable[[TheJob], None]): + assert type not in _handler_table + _handler_table[type] = handler + + +def job_handler(type: db.JobType): + def decorate(func): + register_handler(type, func) + return func + + return decorate + + +# Moduly implementující jednotlivé typy jobů +import mo.jobs.submit diff --git a/mo/jobs/submit.py b/mo/jobs/submit.py new file mode 100644 index 0000000000000000000000000000000000000000..9ae525e5d208af7e3fba27c9b46279c00cf12dd8 --- /dev/null +++ b/mo/jobs/submit.py @@ -0,0 +1,17 @@ +# Implementace jobů pracujících se submity + +from mo.util import logger +import mo.db as db +from mo.jobs import TheJob, job_handler + + +@job_handler(db.JobType.download_submits) +def handle_download_submits(tj: TheJob): + logger.debug('Entering download_submits') + pass + + +@job_handler(db.JobType.upload_feedback) +def handle_upload_feedback(tj: TheJob): + logger.debug('Entering upload_feedback') + pass