Skip to content
Snippets Groups Projects
Select Git revision
  • master
1 result

README

Blame
    • Petr Baudis's avatar
      a1a8e13f
      Rewrite the communication protocol · a1a8e13f
      Petr Baudis authored
      sendmsg() with only ancilliary message does not work, apparently.
      Therefore, to make things cleaner, pass command/reply directly using
      sendmsg() instead of newline-terminated strings.
      a1a8e13f
      History
      Rewrite the communication protocol
      Petr Baudis authored
      sendmsg() with only ancilliary message does not work, apparently.
      Therefore, to make things cleaner, pass command/reply directly using
      sendmsg() instead of newline-terminated strings.
    util.py 5.75 KiB
    # Různé
    
    from dataclasses import dataclass
    import datetime
    import dateutil.tz
    import email.message
    import email.headerregistry
    import locale
    import logging
    import os
    import re
    import secrets
    import subprocess
    import sys
    from typing import Any, Optional, NoReturn
    import textwrap
    import urllib.parse
    
    import mo
    import mo.db as db
    import mo.config as config
    
    # Uživatel, který se uvádí jako pachatel v databázovém logu
    current_log_user: Optional[db.User] = None
    
    # Logger pro všechny naše moduly
    logger = logging.getLogger('mo')
    logger.setLevel(logging.DEBUG)
    logger.propagate = True
    
    
    def get_now() -> datetime.datetime:
        return datetime.datetime.now(tz=dateutil.tz.UTC)
    
    
    def init_standalone():
        """Společná inicializační funkce pro samostatné programy nezávislé na webu."""
        logging.basicConfig(format='%(asctime)-15s.%(msecs)03d %(levelname)-5.5s (%(name)s) %(message)s')
    
        # sqlalchemy má logger, ke kterému si při prvním vypsání hlášky přidá handler,
        # není-li tam zatím žádný. Tak přidáme dummy handler. Vše se propaguje do root loggeru.
        sqla_logger = logging.getLogger('sqlalchemy.engine.base.Engine')
        sqla_logger.addHandler(logging.NullHandler())
    
        mo.now = get_now()
        locale.setlocale(locale.LC_COLLATE, 'cs_CZ.UTF-8')
    
    
    def log(type: db.LogType, what: int, details: Any):
        """Zapíše záznam do databázového logu."""
    
        entry = db.Log(
            changed_by=current_log_user.user_id if current_log_user else None,
            type=type,
            id=what,
            details=details,
        )
        db.get_session().add(entry)
    
    
    def send_user_email(user: db.User, subject: str, body: str) -> bool:
        logger.info(f'Mail: "{subject}" -> {user.email}')
    
        mail_from = getattr(config, 'MAIL_FROM', None)
        if mail_from is None:
            logger.error('Mail: V configu chybí nastavení MAIL_FROM')
            return False
    
        msg = email.message.EmailMessage()
        msg['From'] = email.headerregistry.Address(
            display_name='Odevzdávací Systém MO',
            addr_spec=mail_from,
        )
        msg['To'] = [
            email.headerregistry.Address(
                display_name=user.full_name(),
                addr_spec=user.email,
            )
        ]
        msg['Subject'] = 'OSMO – ' + subject
        msg['Date'] = datetime.datetime.now()
    
        msg.set_content(body, cte='quoted-printable')
    
        mail_instead = getattr(config, 'MAIL_INSTEAD', None)
        if mail_instead is None:
            send_to = user.email
        else:
            send_to = mail_instead
    
        sm = subprocess.Popen(
            [
                '/usr/sbin/sendmail',
                '-oi',
                '-f',
                mail_from,
                send_to,
            ],
            stdin=subprocess.PIPE,
        )
        sm.communicate(msg.as_bytes())
    
        if sm.returncode != 0:
            logger.error('Mail: Sendmail failed with return code {}'.format(sm.returncode))
            return False
    
        return True
    
    
    def password_reset_url(token: str) -> str:
        return config.WEB_ROOT + 'auth/reset?' + urllib.parse.urlencode({'token': token}, safe=':')
    
    
    def send_new_account_email(user: db.User, token: str) -> bool:
        return send_user_email(user, 'Založen nový účet', textwrap.dedent('''\
            Vítejte!
    
            Právě Vám byl založen účet v Odevzávacím systému Matematické olympiády.
            Nastavte si prosím heslo na následující stránce:
    
                    {}
    
            Váš OSMO
        '''.format(password_reset_url(token))))
    
    
    def send_password_reset_email(user: db.User, token: str) -> bool:
        return send_user_email(user, 'Obnova hesla', textwrap.dedent('''\
            Někdo (pravděpodobně Vy) požádal o obnovení hesla k Vašemu účtu v Odevzávacím
            systému Matematické olympiády. Heslo si můžete nastavit, případně požadavek
            zrušit, na následující stránce:
    
                    {}
    
            Váš OSMO
        '''.format(password_reset_url(token))))
    
    
    def die(msg: str) -> NoReturn:
        print(msg, file=sys.stderr)
        sys.exit(1)
    
    
    @dataclass
    class RoundCode:
        year: int
        cat: str
        seq: int
    
        def __str__(self):
            return f'{self.year}-{self.cat}-{self.seq}'
    
        @staticmethod
        def parse(code: str) -> Optional['RoundCode']:
            m = re.match(r'(\d+)-([A-Z0-9]+)-(\d+)', code)
            if m:
                return RoundCode(year=int(m[1]), cat=m[2], seq=int(m[3]))
            else:
                return None
    
    
    def get_round_by_code(code: RoundCode) -> Optional[db.Round]:
        return db.get_session().query(db.Round).filter_by(year=code.year, category=code.cat, seq=code.seq).one_or_none()
    
    
    def data_dir(name: str) -> str:
        return os.path.join(config.DATA_DIR, name)
    
    
    def link_to_dir(src: str, dest_dir: str, prefix: str = "", suffix: str = "") -> str:
        """Vytvoří hardlink na zdrojový soubor pod unikátním jménem v cílovém adresáři."""
    
        while True:
            dest = os.path.join(dest_dir, prefix + secrets.token_hex(8) + suffix)
            try:
                os.link(src, dest)
                return dest
            except FileExistsError:
                logger.warning('Iteruji link_to_dir: %s už existuje', dest)
    
    
    def unlink_if_exists(name: str):
        try:
            os.unlink(name)
        except FileNotFoundError:
            pass
    
    
    def normalize_grade(grade: str) -> int:
        """Pokusí se převést třídu ve formátu 7 nebo 3/4 na číslo odpovídající
        třídě na základní škole (maturitní ročník gymnázia je tedy 9+4 = 13).
        * Základní škola: nic
        * Gymnázia: /8, /6 nebo /4
        * Nerozpoznané ročníky a chyby při převodu: -1"""
        try:
            parts = grade.split('/')
            if len(parts) == 1:
                return int(parts[0])
            if len(parts) > 2:
                return -1
            year = int(parts[0])
            school_type = int(parts[1])
            if school_type in (8, 6, 4):
                return year + 13 - school_type
            else:
                return -1
        except ValueError:
            return -1