Skip to content
Snippets Groups Projects

Zjednodušení práce s joby

Merged Jiří Setnička requested to merge jirka/jobs into devel
1 file
+ 1
1
Compare changes
  • Side-by-side
  • Inline
+ 7
6
@@ -16,7 +16,7 @@ import mo.util
from mo.util import logger, ExceptionInfo
def send_notify():
def send_notify(priority: int):
"""Pošle notifikaci procesu, který zpracovává joby.
Běžíme-li jako součást webu, init webu tuto funkci vymění."""
logger.debug('Job: Není komu poslat notifikaci')
@@ -42,8 +42,8 @@ class TheJob:
self.job = sess.query(db.Job).with_for_update().get(self.job_id)
return self.job
def create(self, type: db.JobType, for_user: db.User) -> db.Job:
self.job = db.Job(type=type, state=db.JobState.preparing, user=for_user)
def create(self, type: db.JobType, for_user: db.User, priority: int = 0) -> db.Job:
self.job = db.Job(type=type, state=db.JobState.preparing, user=for_user, priority=priority)
# Do DB přidáváme nehotový job, protože potřebujeme znát job_id pro založení adresáře
sess = db.get_session()
@@ -73,7 +73,7 @@ class TheJob:
def submit(self):
self.job.state = db.JobState.ready
db.get_session().commit()
send_notify()
send_notify(self.job.priority)
def _finish_remove(self):
sess = db.get_session()
@@ -184,7 +184,7 @@ class TheJob:
mo.email.send_internal_error_email(f'Job #{job.job_id}', err_attrs, exc_info)
def process_jobs():
def process_jobs(min_priority: int = 0):
sess = db.get_session()
assert hasattr(mo, 'now'), 'mo.now není nastaveno'
@@ -207,10 +207,11 @@ def process_jobs():
# Probereme joby, které by měly běžet
ready = (sess.query(db.Job.job_id)
.filter_by(state=db.JobState.ready)
.filter(db.Job.priority >= min_priority)
.order_by(db.Job.created_at)
.all())
sess.rollback()
for job_id in ready:
for job_id, in ready:
tj = TheJob(job_id)
tj.run()
Loading