Skip to content
Snippets Groups Projects

Generování protokolů a zpracování scanů

Merged Martin Mareš requested to merge mj/protokoly into devel
2 files
+ 121
0
Compare changes
  • Side-by-side
  • Inline

Files

+ 59
38
@@ -2,6 +2,7 @@
from datetime import timedelta
import os
import shutil
from sqlalchemy import or_
from typing import Optional, Dict, Callable, List
@@ -19,20 +20,6 @@ def send_notify():
logger.debug('Job: Není komu poslat notifikaci')
def job_file_path(name: str) -> str:
return os.path.join(mo.util.data_dir('jobs'), name)
def job_file_size(name: Optional[str]) -> Optional[int]:
if name is None:
return None
try:
return os.path.getsize(job_file_path(name))
except OSError:
return -1
class TheJob:
"""Job z pohledu Pythonu."""
@@ -40,6 +27,7 @@ class TheJob:
job_id: Optional[int]
gatekeeper: Optional[mo.rights.Gatekeeper]
errors: List[str]
expires_in_minutes: int
def __init__(self, job_id: Optional[int] = None):
"""Pokud chceme pracovat s existujícím jobem, zadáme jeho ID."""
@@ -47,40 +35,51 @@ class TheJob:
self.errors = []
def load(self) -> db.Job:
if getattr(self, 'job', None) is None:
sess = db.get_session()
self.job = sess.query(db.Job).with_for_update().get(self.job_id)
return self.job
def create(self, type: db.JobType, for_user: db.User) -> db.Job:
self.job = db.Job(type=type, state=db.JobState.ready, user=for_user)
return self.job
def attach_file(self, tmp_name: str, suffix: str):
"""Vytvoří hardlink na daný pracovní soubor v adresáři jobů."""
self.job = db.Job(type=type, state=db.JobState.preparing, user=for_user)
full_name = mo.util.link_to_dir(tmp_name, mo.util.data_dir('jobs'), suffix=suffix)
name = os.path.basename(full_name)
logger.debug(f'Job: Příloha {tmp_name} -> {name}')
return name
def submit(self):
# Do DB přidáváme nehotový job, protože potřebujeme znát job_id pro založení adresáře
sess = db.get_session()
sess.add(self.job)
sess.flush()
self.job_id = self.job.job_id
logger.info(f'Job: Vytvořen job #{self.job_id} pro uživatele #{self.job.user_id}')
sess.commit()
job_dir = self.job.dir_path()
if os.path.exists(job_dir):
# Hypoteticky by se mohlo stát, že se recykluje job_id od jobu, jehož
# vytvoření selhalo před commitem. Zkusíme tedy smazat prázdný adresář.
os.rmdir(job_dir)
os.mkdir(job_dir)
return self.job
def attach_file(self, tmp_name: str, attachment_name: str) -> str:
"""Vytvoří hardlink na daný pracovní soubor v adresáři jobu."""
full_name = self.job.file_path(attachment_name)
os.link(tmp_name, full_name)
logger.debug(f'Job: Příloha {tmp_name} -> {full_name}')
return attachment_name
def submit(self):
self.job.state = db.JobState.ready
db.get_session().commit()
send_notify()
def _finish_remove(self):
sess = db.get_session()
job = self.job
if job.in_file is not None:
mo.util.unlink_if_exists(job_file_path(job.in_file))
if job.out_file is not None:
mo.util.unlink_if_exists(job_file_path(job.out_file))
job_dir = self.job.dir_path()
if os.path.exists(job_dir):
shutil.rmtree(job_dir)
sess.delete(job)
sess.commit()
@@ -106,21 +105,42 @@ class TheJob:
logger.info(f'Job: >> {msg}')
self.errors.append(msg)
def run(self):
def _check_runnable(self, retry: bool) -> Optional[str]:
s = self.job.state
if s == db.JobState.ready:
return None
elif s == db.JobState.running:
# Může se stát, že ho mezitím začal vyřizovat jiný proces
return 'právě běží'
elif s in (db.JobState.done, db.JobState.failed):
return None if retry else 'je už hotový'
else:
return 'je v neznámém stavu'
def run(self, retry: bool = False):
sess = db.get_session()
if not self.load() or self.job.state != db.JobState.ready:
# Někdo ho mezitím smazal nebo vyřídil
logger.info(f'Job: Job #{self.job_id} vyřizuje někdo jiný')
if not self.load():
# Někdo ho mezitím smazal
logger.info(f'Job: Job #{self.job_id} neexistuje')
sess.rollback()
return
reject_reason = self._check_runnable(retry)
if reject_reason is not None:
logger.info(f'Job: Job #{self.job_id} {reject_reason}')
sess.rollback()
return
job = self.job
logger.info(f'Job: Spouštím job #{job.job_id} ({job.type}) uživatele #{job.user_id}')
job.state = db.JobState.running
job.finished_at = None
job.expires_at = None
sess.commit()
try:
self.gatekeeper = mo.rights.Gatekeeper(job.user)
self.expires_in_minutes = config.JOB_EXPIRATION
_handler_table[job.type](self)
if self.errors:
logger.info(f'Job: Neúspěšně dokončen job #{job.job_id} ({job.result})')
@@ -137,7 +157,7 @@ class TheJob:
job.result = 'Interní chyba, informujte prosím správce systému.'
job.finished_at = mo.util.get_now()
job.expires_at = job.finished_at + timedelta(minutes=config.JOB_EXPIRATION)
job.expires_at = job.finished_at + timedelta(minutes=self.expires_in_minutes)
sess.commit()
@@ -189,4 +209,5 @@ def job_handler(type: db.JobType):
# Moduly implementující jednotlivé typy jobů
import mo.jobs.protocols
import mo.jobs.submit
Loading