Skip to content
Snippets Groups Projects

Asynchronní joby

1 file
+ 22
13
Compare changes
  • Side-by-side
  • Inline
+ 22
13
@@ -3,10 +3,10 @@
from datetime import timedelta
import os
import secrets
from sqlalchemy import or_
from typing import Optional, Dict, Callable
import mo
import mo.config as config
import mo.db as db
import mo.util
from mo.util import logger
@@ -19,7 +19,7 @@ def send_notify():
def job_file_path(name: str) -> str:
return os.path.join(config.DATA_DIR, 'jobs', name)
return os.path.join(mo.util.data_dir('jobs'), name)
class TheJob:
@@ -64,17 +64,9 @@ class TheJob:
sess.commit()
send_notify()
def expire(self, job_id: int):
def _finish_remove(self):
sess = db.get_session()
if not self.load():
# Mezitím úlohu smazal někdo jiný
logger.info(f'Job: Job #{self.job.job_id} expiroval někdo jiný')
sess.rollback()
return
job = self.job
logger.info(f'Job: Expiruji job #{job.job_id}')
assert job.state in (db.JobState.done, db.JobState.failed)
if job.in_file is not None:
os.unlink(job_file_path(job.in_file))
@@ -85,6 +77,23 @@ class TheJob:
sess.delete(job)
sess.commit()
def expire(self):
sess = db.get_session()
if not self.load():
# Mezitím úlohu smazal někdo jiný
logger.info(f'Job: Job #{self.job.job_id} expiroval někdo jiný')
sess.rollback()
return
job = self.job
logger.info(f'Job: Expiruji job #{job.job_id}')
assert job.state in (db.JobStatedb.JobState.done, db.JobState.failed)
return self._finish_remove()
def remove_loaded(self):
logger.info(f'Job: Ruším job #{self.job.job_id}')
return self._finish_remove()
def run(self):
sess = db.get_session()
if not self.load() or self.job.state != db.JobState.ready:
@@ -124,11 +133,11 @@ def process_jobs():
# Nejprve se podíváme, jestli máme nějaké expirované joby ke smazání
expired = (sess.query(db.Job.job_id)
.filter(db.Job.state == db.JobState.done or db.Job.state == db.JobState.failed)
.filter(or_(db.Job.state == db.JobState.done, db.Job.state == db.JobState.failed))
.filter(db.Job.expires_at < mo.now)
.all())
sess.rollback()
for job_id in expired:
for job_id, in expired:
tj = TheJob(job_id)
tj.expire()
Loading