Select Git revision
cuckoo_hash.py
util.py 5.19 KiB
import decimal
from flask import Response, send_file, url_for
from flask_wtf import FlaskForm
import os
from sqlalchemy.orm.query import Query
from typing import Tuple, Optional, Union
import unicodedata
import werkzeug.exceptions
import werkzeug.utils
import wtforms
from wtforms.fields.html5 import DecimalField
from markupsafe import Markup
import mo.db as db
import mo.jobs
import mo.util
from mo.util import logger
class PagerForm(FlaskForm):
limit = wtforms.IntegerField()
offset = wtforms.IntegerField()
next = wtforms.SubmitField("Další")
previous = wtforms.SubmitField("Předchozí")
submit = wtforms.SubmitField("Filtrovat")
def apply_limits(self, query: Query, pagesize: int = 50) -> Tuple[int, Query]:
count = db.get_count(query)
# Default settings
if not self.offset.data or self.submit.data:
self.offset.data = 0
if not self.limit.data or self.submit.data:
self.limit.data = pagesize
if self.previous.data:
self.offset.data = max(0, self.offset.data - pagesize)
if self.next.data:
self.offset.data = min(count // pagesize * pagesize, self.offset.data + pagesize)
query = query.offset(self.offset.data)
query = query.limit(self.limit.data)
return (count, query)
def task_statement_exists(round: db.Round) -> bool:
if round.tasks_file is None:
return False
file = os.path.join(mo.util.data_dir('statements'), round.tasks_file)
return os.path.isfile(file)
def send_task_statement(round: db.Round) -> Response:
assert round.tasks_file is not None
file = os.path.join(mo.util.data_dir('statements'), round.tasks_file)
if os.path.isfile(file):
return send_file(file, mimetype='application/pdf')
else:
logger.error(f'Zadání {file} je v DB, ale soubor neexistuje')
raise werkzeug.exceptions.NotFound()
def _task_paper_filename(user: db.User, paper: db.Paper) -> str:
# Tato jména parsuje dávkový upload
full_name = user.full_name()
ascii_name = (unicodedata.normalize('NFD', full_name)
.encode('ascii', 'ignore')
.decode('utf-8'))
if paper.type == db.PaperType.solution:
typ = 'reseni'
elif paper.type == db.PaperType.feedback:
typ = 'opravene'
else:
assert False
fn = f'{paper.task.code}_{typ}_{user.user_id}_{paper.paper_id}_{ascii_name}.pdf'
return werkzeug.utils.secure_filename(fn)
def org_paper_link(contest_or_id: Union[db.Contest, int],
site: Optional[db.Place],
user: db.User,
paper: db.Paper,
orig: bool = False) -> str:
"""Doporučujeme preloadovat paper.task."""
if isinstance(contest_or_id, db.Contest):
contest_or_id = contest_or_id.contest_id
return url_for('org_submit_paper' if not orig else 'org_submit_paper_orig',
contest_id=contest_or_id,
paper_id=paper.paper_id,
site_id=site.place_id if site else None,
filename=_task_paper_filename(user, paper))
def send_task_paper(paper: db.Paper, orig: bool = False) -> Response:
if orig:
name = paper.orig_file_name
else:
name = paper.file_name or paper.orig_file_name
if not name:
logger.error(f'Paper #{paper.paper_id} nemá žádný soubor')
raise werkzeug.exceptions.NotFound()
file = os.path.join(mo.util.data_dir('submits'), name)
if os.path.isfile(file):
return send_file(file, mimetype='application/pdf')
else:
logger.error(f'Soubor {file} je v papers, ale ve FS neexistuje')
raise werkzeug.exceptions.NotFound()
def send_job_result(job: db.Job) -> Response:
assert job.out_file is not None
file = mo.jobs.job_file_path(job.out_file)
if file.endswith('.zip'):
type = 'application/zip'
else:
type = 'application/binary'
if os.path.isfile(file):
return send_file(file,
mimetype=type,
as_attachment=True,
attachment_filename=job.out_file)
else:
logger.error(f'Soubor {file} je výsledkem jobu, ale ve FS neexistuje')
raise werkzeug.exceptions.NotFound()
class MODecimalField(DecimalField):
"""Upravený DecimalField, který formátuje číslo podle jeho skutečného počtu
desetinných míst a zadané `places` používá jen jako maximální počet desetinných míst."""
def _value(self):
if self.data is not None:
# Spočítání počtu desetinných míst, zbytek necháme na původní implementaci
max_places = self.places
self.places = 0
d = decimal.Decimal(1)
while self.data % d != 0 and self.places < max_places:
self.places += 1
d /= 10
return super(MODecimalField, self)._value()
def user_html_flags(u: db.User) -> Markup:
r = []
if u.is_test:
r.append("<span class='user-test' title='Testovací uživatel'>*</span>")
if u.is_inactive():
r.append("<span class='user-inactive' title='Účet dosud nebyl aktivován'>*</span>")
return Markup(" " + "".join(r))