Skip to content
Snippets Groups Projects
Select Git revision
  • 1cfe7fb0ce8279c6cb1025f08fee371cb7531c9e
  • master default protected
2 results

random.h

Blame
  • util.py 7.71 KiB
    # Různé
    
    from dataclasses import dataclass
    import datetime
    import decimal
    import dateutil.tz
    import email.message
    import email.headerregistry
    import locale
    import logging
    import os
    import re
    import secrets
    import subprocess
    import sys
    from typing import Any, Optional, NoReturn, Tuple
    import textwrap
    import urllib.parse
    
    import mo
    import mo.db as db
    import mo.config as config
    from mo.util_format import format_decimal
    
    # Uživatel, který se uvádí jako pachatel v databázovém logu
    current_log_user: Optional[db.User] = None
    
    # Logger pro všechny naše moduly
    logger = logging.getLogger('mo')
    logger.setLevel(logging.DEBUG)
    logger.propagate = True
    
    
    def get_now() -> datetime.datetime:
        return datetime.datetime.now(tz=dateutil.tz.UTC)
    
    
    def init_standalone():
        """Společná inicializační funkce pro samostatné programy nezávislé na webu."""
        logging.basicConfig(format='%(asctime)-15s.%(msecs)03d %(levelname)-5.5s (%(name)s) %(message)s')
    
        # sqlalchemy má logger, ke kterému si při prvním vypsání hlášky přidá handler,
        # není-li tam zatím žádný. Tak přidáme dummy handler. Vše se propaguje do root loggeru.
        sqla_logger = logging.getLogger('sqlalchemy.engine.base.Engine')
        sqla_logger.addHandler(logging.NullHandler())
    
        mo.now = get_now()
        locale.setlocale(locale.LC_COLLATE, 'cs_CZ.UTF-8')
    
    
    def log(type: db.LogType, what: int, details: Any):
        """Zapíše záznam do databázového logu."""
    
        entry = db.Log(
            changed_by=current_log_user.user_id if current_log_user else None,
            type=type,
            id=what,
            details=details,
        )
        db.get_session().add(entry)
    
    
    def send_user_email(user: db.User, subject: str, body: str) -> bool:
        logger.info(f'Mail: "{subject}" -> {user.email}')
    
        mail_from = getattr(config, 'MAIL_FROM', None)
        if mail_from is None:
            logger.error('Mail: V configu chybí nastavení MAIL_FROM')
            return False
    
        msg = email.message.EmailMessage()
        msg['From'] = email.headerregistry.Address(
            display_name='Odevzdávací Systém MO',
            addr_spec=mail_from,
        )
        msg['To'] = [
            email.headerregistry.Address(
                display_name=user.full_name(),
                addr_spec=user.email,
            )
        ]
        msg['Subject'] = 'OSMO – ' + subject
        msg['Date'] = datetime.datetime.now()
    
        msg.set_content(body, cte='quoted-printable')
    
        mail_instead = getattr(config, 'MAIL_INSTEAD', None)
        if mail_instead is None:
            send_to = user.email
        else:
            send_to = mail_instead
    
        sm = subprocess.Popen(
            [
                '/usr/sbin/sendmail',
                '-oi',
                '-f',
                mail_from,
                send_to,
            ],
            stdin=subprocess.PIPE,
        )
        sm.communicate(msg.as_bytes())
    
        if sm.returncode != 0:
            logger.error('Mail: Sendmail failed with return code {}'.format(sm.returncode))
            return False
    
        return True
    
    
    def password_reset_url(token: str) -> str:
        return config.WEB_ROOT + 'auth/reset?' + urllib.parse.urlencode({'token': token}, safe=':')
    
    
    def send_new_account_email(user: db.User, token: str) -> bool:
        return send_user_email(user, 'Založen nový účet', textwrap.dedent('''\
            Vítejte!
    
            Právě Vám byl založen účet v Odevzdávacím systému Matematické olympiády.
            Nastavte si prosím heslo na následující stránce:
    
                    {}
    
            Váš OSMO
        '''.format(password_reset_url(token))))
    
    
    def send_password_reset_email(user: db.User, token: str) -> bool:
        return send_user_email(user, 'Obnova hesla', textwrap.dedent('''\
            Někdo (pravděpodobně Vy) požádal o obnovení hesla k Vašemu účtu v Odevzdávacím
            systému Matematické olympiády. Heslo si můžete nastavit, případně požadavek
            zrušit, na následující stránce:
    
                    {}
    
            Váš OSMO
        '''.format(password_reset_url(token))))
    
    
    def die(msg: str) -> NoReturn:
        print(msg, file=sys.stderr)
        sys.exit(1)
    
    
    @dataclass
    class RoundCode:
        year: int
        cat: str
        seq: int
        part: int
    
        def __str__(self):
            part_code = chr(ord('a') + self.part - 1) if self.part > 0 else ""
            return f'{self.year}-{self.cat}-{self.seq}{part_code}'
    
        @staticmethod
        def parse(code: str) -> Optional['RoundCode']:
            m = re.match(r'(\d+)-([A-Z0-9]+)-(\d+)([a-z]?)', code)
            if m:
                part = ord(m[4]) - ord('a') + 1 if m[4] else 0
                return RoundCode(year=int(m[1]), cat=m[2], seq=int(m[3]), part=part)
            else:
                return None
    
    
    def get_round_by_code(code: RoundCode) -> Optional[db.Round]:
        return db.get_session().query(db.Round).filter_by(year=code.year, category=code.cat, seq=code.seq, part=code.part).one_or_none()
    
    
    def data_dir(name: str) -> str:
        return os.path.join(config.DATA_DIR, name)
    
    
    def link_to_dir(src: str, dest_dir: str, prefix: str = "", suffix: str = "") -> str:
        """Vytvoří hardlink na zdrojový soubor pod unikátním jménem v cílovém adresáři."""
    
        while True:
            dest = os.path.join(dest_dir, prefix + secrets.token_hex(8) + suffix)
            try:
                os.link(src, dest)
                return dest
            except FileExistsError:
                logger.warning('Iteruji link_to_dir: %s už existuje', dest)
    
    
    def unlink_if_exists(name: str):
        try:
            os.unlink(name)
        except FileNotFoundError:
            pass
    
    
    def normalize_grade(grade: str) -> int:
        """Pokusí se převést třídu ve formátu 7 nebo 3/4 na číslo odpovídající
        třídě na základní škole (maturitní ročník gymnázia je tedy 9+4 = 13).
        * Základní škola: nic
        * Gymnázia: /8, /6 nebo /4
        * Nerozpoznané ročníky a chyby při převodu: -1"""
        try:
            parts = grade.split('/')
            if len(parts) == 1:
                return int(parts[0])
            if len(parts) > 2:
                return -1
            year = int(parts[0])
            school_type = int(parts[1])
            if school_type in (8, 6, 4):
                return year + 13 - school_type
            else:
                return -1
        except ValueError:
            return -1
    
    
    def parse_points(
        raw_points: str, for_task: Optional[db.Task] = None, for_round: Optional[db.Round] = None,
    ) -> Tuple[Optional[decimal.Decimal], Optional[str]]:
        """Naparsuje a zkontroluje body. Vrátí body (jako decimal.Decimal nebo None
        při prázdných bodech) a případný error (None pokud nenastal, jinak text chyby)."""
        if raw_points == "":
            return None, None
        try:
            points = decimal.Decimal(raw_points.replace(',', '.'))
        except decimal.InvalidOperation:
            return 0, f"Hodnota '{raw_points}' není číslo"
    
        return points, check_points(points, for_task, for_round)
    
    
    def check_points(points: decimal.Decimal, for_task: Optional[db.Task] = None, for_round: Optional[db.Round] = None) -> Optional[str]:
        """Zkontroluje body. Pokud je vše ok, tak vrátí None, jinak vrátí text chyby."""
        if points < 0:
            return f'Nelze zadat záporné body (zadáno {format_decimal(points)})'
        if for_task and for_task.max_points is not None and points > for_task.max_points:
            return f'Maximální počet bodů za úlohu je {format_decimal(for_task.max_points)}, nelze zadat více (zadáno {format_decimal(points)})'
        if for_round and (points % for_round.master.points_step) != 0:
            points_step = for_round.master.points_step
            if points_step == 1:
                return f'Podle nastavení kola lze zadat pouze celé body (hodnota {points} je neplatná)'
            elif points_step == 0.5:
                return f'Podle nastavení kola nelze zadat body s libovolnými desetinami, pouze půlbody (hodnota {points} je neplatná)'
            else:
                return f'Podle nastavení kola zadat body jen s krokem {points_step} (hodnota {points} je neplatná)'
        return None