Skip to content
Snippets Groups Projects
Select Git revision
  • 4421785429124d042d5bd97fce4c6b50ec26940e
  • devel default
  • master
  • fo
  • jirka/typing
  • fo-base
  • mj/submit-images
  • jk/issue-96
  • jk/issue-196
  • honza/add-contestant
  • honza/mr7
  • honza/mrf
  • honza/mrd
  • honza/mra
  • honza/mr6
  • honza/submit-images
  • honza/kolo-vs-soutez
  • jh-stress-test-wip
  • shorten-schools
19 results

util.py

Blame
  • util.py 5.19 KiB
    import decimal
    from flask import Response, send_file, url_for
    from flask_wtf import FlaskForm
    import os
    from sqlalchemy.orm.query import Query
    from typing import Tuple, Optional, Union
    import unicodedata
    import werkzeug.exceptions
    import werkzeug.utils
    import wtforms
    from wtforms.fields.html5 import DecimalField
    from markupsafe import Markup
    
    import mo.db as db
    import mo.jobs
    import mo.util
    from mo.util import logger
    
    
    class PagerForm(FlaskForm):
        """Form carrying paging state (offset/limit) plus paging and filter buttons.

        `apply_limits` reads the submitted state, updates it according to the
        button that was pressed, and applies it to a query.
        """
        limit = wtforms.IntegerField()
        offset = wtforms.IntegerField()
        # Button labels are user-facing Czech strings: "Next", "Previous", "Filter".
        next = wtforms.SubmitField("Další")
        previous = wtforms.SubmitField("Předchozí")
        submit = wtforms.SubmitField("Filtrovat")

        def apply_limits(self, query: Query, pagesize: int = 50) -> Tuple[int, Query]:
            """Apply the current page window to *query*.

            Updates ``self.offset.data`` and ``self.limit.data`` in place so the
            form can be re-rendered with the new paging state.

            Args:
                query: Query to be paged.
                pagesize: Page step used by the previous/next buttons and the
                    default limit when none was submitted.

            Returns:
                A tuple ``(count, query)``: the value of ``db.get_count(query)``
                for the unpaged query, and the query with offset and limit
                applied.
            """
            count = db.get_count(query)

            # Missing values, or a press of the filter button, reset paging
            # to the defaults (first page, standard page size).
            if not self.offset.data or self.submit.data:
                self.offset.data = 0
            if not self.limit.data or self.submit.data:
                self.limit.data = pagesize

            if self.previous.data:
                self.offset.data = max(0, self.offset.data - pagesize)
            if self.next.data:
                # Offset of the last page that still contains a row. Using
                # (count - 1) avoids stepping onto an empty page when count is
                # an exact multiple of pagesize; max() covers count == 0.
                last_page_offset = max(0, (count - 1) // pagesize * pagesize)
                self.offset.data = min(last_page_offset, self.offset.data + pagesize)

            query = query.offset(self.offset.data)
            query = query.limit(self.limit.data)

            return (count, query)
    
    
    def task_statement_exists(round: db.Round) -> bool:
        """Tell whether the statement file recorded for *round* is present on disk.

        Returns False when the round has no ``tasks_file`` set, or when the file
        it names is not a regular file in the 'statements' data directory.
        """
        tasks_file = round.tasks_file
        if tasks_file is not None:
            statements_dir = mo.util.data_dir('statements')
            return os.path.isfile(os.path.join(statements_dir, tasks_file))
        return False
    
    
    def send_task_statement(round: db.Round) -> Response:
        """Serve the statement PDF of *round*.

        The caller must make sure ``round.tasks_file`` is set (asserted here).

        Raises:
            werkzeug.exceptions.NotFound: after logging an error, when the file
                named in the database is missing from the 'statements' data
                directory.
        """
        assert round.tasks_file is not None
        file = os.path.join(mo.util.data_dir('statements'), round.tasks_file)

        # Guard clause: the database references a file that is not on disk.
        if not os.path.isfile(file):
            logger.error(f'Zadání {file} je v DB, ale soubor neexistuje')
            raise werkzeug.exceptions.NotFound()

        return send_file(file, mimetype='application/pdf')
    
    
    def _task_paper_filename(user: db.User, paper: db.Paper) -> str:
        """Build the file name under which *paper* of *user* is offered.

        The name has the form
        ``<task code>_<kind>_<user id>_<paper id>_<ascii name>.pdf``, where
        ``<kind>`` is 'reseni' for a solution and 'opravene' for feedback, and
        is finally passed through ``werkzeug.utils.secure_filename``.

        NOTE: these names are parsed by the batch upload, so the format must
        stay stable.

        Raises:
            AssertionError: if ``paper.type`` is neither solution nor feedback.
        """
        # NFD splits accented letters into a base letter plus combining marks;
        # encoding to ASCII with 'ignore' then drops the marks (and any other
        # non-ASCII character).
        full_name = user.full_name()
        ascii_name = (unicodedata.normalize('NFD', full_name)
                      .encode('ascii', 'ignore')
                      .decode('utf-8'))

        if paper.type == db.PaperType.solution:
            typ = 'reseni'
        elif paper.type == db.PaperType.feedback:
            typ = 'opravene'
        else:
            # Raised explicitly instead of `assert False`: asserts are removed
            # under `python -O`, which would leave `typ` unbound and fail later
            # with a confusing UnboundLocalError.
            raise AssertionError(f'Unexpected paper type {paper.type!r}')

        fn = f'{paper.task.code}_{typ}_{user.user_id}_{paper.paper_id}_{ascii_name}.pdf'
        return werkzeug.utils.secure_filename(fn)
    
    
    def org_paper_link(contest_or_id: Union[db.Contest, int],
                       site: Optional[db.Place],
                       user: db.User,
                       paper: db.Paper,
                       orig: bool = False) -> str:
        """Return the organizer URL for downloading *paper*.

        Accepts either a contest object or a bare contest ID. With ``orig`` set,
        the link targets the 'org_submit_paper_orig' endpoint instead of
        'org_submit_paper'.

        Preloading ``paper.task`` is recommended.
        """
        if isinstance(contest_or_id, db.Contest):
            contest_id = contest_or_id.contest_id
        else:
            contest_id = contest_or_id

        endpoint = 'org_submit_paper_orig' if orig else 'org_submit_paper'

        return url_for(endpoint,
                       contest_id=contest_id,
                       paper_id=paper.paper_id,
                       site_id=site.place_id if site else None,
                       filename=_task_paper_filename(user, paper))
    
    
    def send_task_paper(paper: db.Paper, orig: bool = False) -> Response:
        """Serve the PDF stored for *paper* from the 'submits' data directory.

        With ``orig`` set, only ``paper.orig_file_name`` is considered; otherwise
        ``paper.file_name`` is preferred and ``paper.orig_file_name`` is the
        fallback.

        Raises:
            werkzeug.exceptions.NotFound: after logging an error, when the paper
                has no file name or the named file does not exist on disk.
        """
        name = paper.orig_file_name if orig else (paper.file_name or paper.orig_file_name)
        if not name:
            logger.error(f'Paper #{paper.paper_id} nemá žádný soubor')
            raise werkzeug.exceptions.NotFound()

        file = os.path.join(mo.util.data_dir('submits'), name)
        if not os.path.isfile(file):
            logger.error(f'Soubor {file} je v papers, ale ve FS neexistuje')
            raise werkzeug.exceptions.NotFound()

        return send_file(file, mimetype='application/pdf')
    
    
    def send_job_result(job: db.Job) -> Response:
        """Serve the output file of *job* as a download named ``job.out_file``.

        The caller must make sure ``job.out_file`` is set (asserted here). Files
        ending in '.zip' are sent as 'application/zip', anything else as
        'application/binary'.

        Raises:
            werkzeug.exceptions.NotFound: after logging an error, when the output
                file does not exist on disk.
        """
        assert job.out_file is not None
        file = mo.jobs.job_file_path(job.out_file)

        # Local name chosen so that the builtin `type` is not shadowed.
        content_type = 'application/zip' if file.endswith('.zip') else 'application/binary'

        if not os.path.isfile(file):
            logger.error(f'Soubor {file} je výsledkem jobu, ale ve FS neexistuje')
            raise werkzeug.exceptions.NotFound()

        return send_file(file,
                         mimetype=content_type,
                         as_attachment=True,
                         attachment_filename=job.out_file)
    
    
    class MODecimalField(DecimalField):
        """Modified DecimalField that formats the number according to its actual
        number of decimal places and uses the given `places` only as the maximum
        number of decimal places."""
        def _value(self):
            """Return the field value as text, with only as many decimals as needed.

            Temporarily lowers ``self.places`` to the number of decimal places the
            current value actually uses (capped at the configured ``places``) and
            delegates the formatting itself to the parent implementation.
            """
            if self.data is None:
                return super()._value()

            # Count the decimal places; the rest is left to the parent class.
            max_places = self.places
            places = 0
            d = decimal.Decimal(1)
            while self.data % d != 0 and places < max_places:
                places += 1
                d /= 10

            # Override `places` only for this one formatting call and always
            # restore it. Otherwise the configured maximum would be lost after
            # the first render and later values could be truncated.
            self.places = places
            try:
                return super()._value()
            finally:
                self.places = max_places
    
    def user_html_flags(u: db.User) -> Markup:
        """Return HTML markers describing special states of user *u*.

        One ``<span>`` is emitted when ``u.is_test`` is truthy and another when
        ``u.is_inactive()`` is truthy. The result always starts with a single
        space, even when no marker applies.
        """
        flags = [
            (u.is_test,
             "<span class='user-test' title='Testovací uživatel'>*</span>"),
            (u.is_inactive(),
             "<span class='user-inactive' title='Účet dosud nebyl aktivován'>*</span>"),
        ]
        html = "".join(span for active, span in flags if active)
        return Markup(" " + html)