From c3f8387d577f560d8cd395f019878f3c5f0a83b6 Mon Sep 17 00:00:00 2001
From: Martin Mares <mj@ucw.cz>
Date: Sat, 3 Jul 2021 13:48:42 +0200
Subject: [PATCH] =?UTF-8?q?Jobs:=20Ka=C5=BEd=C3=BD=20job=20m=C3=A1=20sv?=
=?UTF-8?q?=C5=AFj=20adres=C3=A1=C5=99?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Pro každý job založíme data/jobs/<job_id> a všechny soubory, které
k jobu patří, ukládáme tam.
Umožní nám to:
(1) Lépe diagnostikovat joby, které skončily chybou, protože
všechny pracovní soubory zůstanou zachované.
(2) Joby s více výstupními soubory (to bude potřeba pro zpracování scanů).
(3) Hezky pojmenované pracovní (a tím pádem i výstupní) soubory.
---
mo/db.py | 10 ++++++++
mo/jobs/__init__.py | 59 +++++++++++++++++++++------------------------
mo/web/org_jobs.py | 18 +++++++++++---
mo/web/util.py | 2 +-
4 files changed, 54 insertions(+), 35 deletions(-)
diff --git a/mo/db.py b/mo/db.py
index 7d0ef659..afa1ccca 100644
--- a/mo/db.py
+++ b/mo/db.py
@@ -5,6 +5,7 @@ import datetime
import decimal
from enum import Enum as PythonEnum, auto
import locale
+import os
import re
from sqlalchemy import \
Boolean, Column, DateTime, ForeignKey, Integer, String, Text, UniqueConstraint, \
@@ -22,6 +23,7 @@ from sqlalchemy.sql.sqltypes import Numeric
from typing import Optional, List, Tuple
import mo
+import mo.config as config
from mo.place_level import place_levels, PlaceLevel
from mo.util_format import timeformat_short, timedelta, time_and_timedelta
@@ -728,6 +730,14 @@ class Job(Base):
user = relationship('User')
+ def dir_path(self) -> str:
+ """Adresář se soubory příslušejícími k jobu."""
+ # Nepoužíváme mo.util.data_dir, abychom se vyhnuli cyklické závislosti modulů.
+ return os.path.join(config.DATA_DIR, 'jobs', str(self.job_id))
+
+ def file_path(self, name: str) -> str:
+ return os.path.join(self.dir_path(), name)
+
class Message(Base):
__tablename__ = 'messages'
diff --git a/mo/jobs/__init__.py b/mo/jobs/__init__.py
index 31560882..d37becbc 100644
--- a/mo/jobs/__init__.py
+++ b/mo/jobs/__init__.py
@@ -2,6 +2,7 @@
from datetime import timedelta
import os
+import shutil
from sqlalchemy import or_
from typing import Optional, Dict, Callable, List
@@ -19,20 +20,6 @@ def send_notify():
logger.debug('Job: Není komu poslat notifikaci')
-def job_file_path(name: str) -> str:
- return os.path.join(mo.util.data_dir('jobs'), name)
-
-
-def job_file_size(name: Optional[str]) -> Optional[int]:
- if name is None:
- return None
-
- try:
- return os.path.getsize(job_file_path(name))
- except OSError:
- return -1
-
-
class TheJob:
"""Job z pohledu Pythonu."""
@@ -52,35 +39,45 @@ class TheJob:
return self.job
def create(self, type: db.JobType, for_user: db.User) -> db.Job:
- self.job = db.Job(type=type, state=db.JobState.ready, user=for_user)
- return self.job
-
- def attach_file(self, tmp_name: str, suffix: str):
- """Vytvoří hardlink na daný pracovní soubor v adresáři jobů."""
-
- full_name = mo.util.link_to_dir(tmp_name, mo.util.data_dir('jobs'), suffix=suffix)
- name = os.path.basename(full_name)
- logger.debug(f'Job: Příloha {tmp_name} -> {name}')
- return name
+ self.job = db.Job(type=type, state=db.JobState.preparing, user=for_user)
- def submit(self):
+ # Do DB přidáváme nehotový job, protože potřebujeme znát job_id pro založení adresáře
sess = db.get_session()
sess.add(self.job)
sess.flush()
self.job_id = self.job.job_id
+
logger.info(f'Job: Vytvořen job #{self.job_id} pro uživatele #{self.job.user_id}')
- sess.commit()
+
+ job_dir = self.job.dir_path()
+ if os.path.exists(job_dir):
+ # Hypoteticky by se mohlo stát, že se recykluje job_id od jobu, jehož
+ # vytvoření selhalo před commitem. Zkusíme tedy smazat prázdný adresář.
+ os.rmdir(job_dir)
+ os.mkdir(job_dir)
+
+ return self.job
+
+ def attach_file(self, tmp_name: str, attachment_name: str) -> str:
+ """Vytvoří hardlink na daný pracovní soubor v adresáři jobu."""
+
+ full_name = self.job.file_path(attachment_name)
+ os.link(tmp_name, full_name)
+ logger.debug(f'Job: Příloha {tmp_name} -> {full_name}')
+ return attachment_name
+
+ def submit(self):
+ self.job.state = db.JobState.ready
+ db.get_session().commit()
send_notify()
def _finish_remove(self):
sess = db.get_session()
job = self.job
- if job.in_file is not None:
- mo.util.unlink_if_exists(job_file_path(job.in_file))
-
- if job.out_file is not None:
- mo.util.unlink_if_exists(job_file_path(job.out_file))
+ job_dir = self.job.dir_path()
+ if os.path.exists(job_dir):
+ shutil.rmtree(job_dir)
sess.delete(job)
sess.commit()
diff --git a/mo/web/org_jobs.py b/mo/web/org_jobs.py
index 2fedcf41..8cf9786e 100644
--- a/mo/web/org_jobs.py
+++ b/mo/web/org_jobs.py
@@ -1,12 +1,14 @@
from flask import render_template, g, redirect, url_for, flash
from flask_wtf.form import FlaskForm
+import os
from sqlalchemy.orm import joinedload
+from typing import Optional
import werkzeug.exceptions
import wtforms
import mo
import mo.db as db
-from mo.jobs import TheJob, job_file_size
+from mo.jobs import TheJob
from mo.web import app
import mo.web.util
@@ -60,6 +62,16 @@ def get_job(id: int) -> db.Job:
return job
+def job_file_size(job: db.Job, name: Optional[str]) -> Optional[int]:
+ if name is None:
+ return None
+
+ try:
+ return os.path.getsize(job.file_path(name))
+ except OSError:
+ return -1
+
+
@app.route('/org/jobs/<int:id>/')
def org_job(id: int):
job = get_job(id)
@@ -72,8 +84,8 @@ def org_job(id: int):
'org_job.html',
job=job,
has_errors=has_errors,
- in_size=job_file_size(job.in_file),
- out_size=job_file_size(job.out_file),
+ in_size=job_file_size(job, job.in_file),
+ out_size=job_file_size(job, job.out_file),
)
diff --git a/mo/web/util.py b/mo/web/util.py
index 1be8eb80..1340d474 100644
--- a/mo/web/util.py
+++ b/mo/web/util.py
@@ -113,7 +113,7 @@ def send_task_paper(paper: db.Paper, orig: bool = False) -> Response:
def send_job_result(job: db.Job) -> Response:
assert job.out_file is not None
- file = mo.jobs.job_file_path(job.out_file)
+ file = job.file_path(job.out_file)
if file.endswith('.zip'):
type = 'application/zip'
--
GitLab