Select Git revision
-
Petr Baudis authored
sendmsg() with only ancilliary message does not work, apparently. Therefore, to make things cleaner, pass command/reply directly using sendmsg() instead of newline-terminated strings.
Petr Baudis authoredsendmsg() with only ancilliary message does not work, apparently. Therefore, to make things cleaner, pass command/reply directly using sendmsg() instead of newline-terminated strings.
util.py 5.75 KiB
# Různé
from dataclasses import dataclass
import datetime
import dateutil.tz
import email.message
import email.headerregistry
import locale
import logging
import os
import re
import secrets
import subprocess
import sys
from typing import Any, Optional, NoReturn
import textwrap
import urllib.parse
import mo
import mo.db as db
import mo.config as config
# Uživatel, který se uvádí jako pachatel v databázovém logu
current_log_user: Optional[db.User] = None
# Logger pro všechny naše moduly
logger = logging.getLogger('mo')
logger.setLevel(logging.DEBUG)
logger.propagate = True
def get_now() -> datetime.datetime:
return datetime.datetime.now(tz=dateutil.tz.UTC)
def init_standalone():
"""Společná inicializační funkce pro samostatné programy nezávislé na webu."""
logging.basicConfig(format='%(asctime)-15s.%(msecs)03d %(levelname)-5.5s (%(name)s) %(message)s')
# sqlalchemy má logger, ke kterému si při prvním vypsání hlášky přidá handler,
# není-li tam zatím žádný. Tak přidáme dummy handler. Vše se propaguje do root loggeru.
sqla_logger = logging.getLogger('sqlalchemy.engine.base.Engine')
sqla_logger.addHandler(logging.NullHandler())
mo.now = get_now()
locale.setlocale(locale.LC_COLLATE, 'cs_CZ.UTF-8')
def log(type: db.LogType, what: int, details: Any):
"""Zapíše záznam do databázového logu."""
entry = db.Log(
changed_by=current_log_user.user_id if current_log_user else None,
type=type,
id=what,
details=details,
)
db.get_session().add(entry)
def send_user_email(user: db.User, subject: str, body: str) -> bool:
logger.info(f'Mail: "{subject}" -> {user.email}')
mail_from = getattr(config, 'MAIL_FROM', None)
if mail_from is None:
logger.error('Mail: V configu chybí nastavení MAIL_FROM')
return False
msg = email.message.EmailMessage()
msg['From'] = email.headerregistry.Address(
display_name='Odevzdávací Systém MO',
addr_spec=mail_from,
)
msg['To'] = [
email.headerregistry.Address(
display_name=user.full_name(),
addr_spec=user.email,
)
]
msg['Subject'] = 'OSMO – ' + subject
msg['Date'] = datetime.datetime.now()
msg.set_content(body, cte='quoted-printable')
mail_instead = getattr(config, 'MAIL_INSTEAD', None)
if mail_instead is None:
send_to = user.email
else:
send_to = mail_instead
sm = subprocess.Popen(
[
'/usr/sbin/sendmail',
'-oi',
'-f',
mail_from,
send_to,
],
stdin=subprocess.PIPE,
)
sm.communicate(msg.as_bytes())
if sm.returncode != 0:
logger.error('Mail: Sendmail failed with return code {}'.format(sm.returncode))
return False
return True
def password_reset_url(token: str) -> str:
return config.WEB_ROOT + 'auth/reset?' + urllib.parse.urlencode({'token': token}, safe=':')
def send_new_account_email(user: db.User, token: str) -> bool:
return send_user_email(user, 'Založen nový účet', textwrap.dedent('''\
Vítejte!
Právě Vám byl založen účet v Odevzávacím systému Matematické olympiády.
Nastavte si prosím heslo na následující stránce:
{}
Váš OSMO
'''.format(password_reset_url(token))))
def send_password_reset_email(user: db.User, token: str) -> bool:
return send_user_email(user, 'Obnova hesla', textwrap.dedent('''\
Někdo (pravděpodobně Vy) požádal o obnovení hesla k Vašemu účtu v Odevzávacím
systému Matematické olympiády. Heslo si můžete nastavit, případně požadavek
zrušit, na následující stránce:
{}
Váš OSMO
'''.format(password_reset_url(token))))
def die(msg: str) -> NoReturn:
print(msg, file=sys.stderr)
sys.exit(1)
@dataclass
class RoundCode:
year: int
cat: str
seq: int
def __str__(self):
return f'{self.year}-{self.cat}-{self.seq}'
@staticmethod
def parse(code: str) -> Optional['RoundCode']:
m = re.match(r'(\d+)-([A-Z0-9]+)-(\d+)', code)
if m:
return RoundCode(year=int(m[1]), cat=m[2], seq=int(m[3]))
else:
return None
def get_round_by_code(code: RoundCode) -> Optional[db.Round]:
return db.get_session().query(db.Round).filter_by(year=code.year, category=code.cat, seq=code.seq).one_or_none()
def data_dir(name: str) -> str:
return os.path.join(config.DATA_DIR, name)
def link_to_dir(src: str, dest_dir: str, prefix: str = "", suffix: str = "") -> str:
"""Vytvoří hardlink na zdrojový soubor pod unikátním jménem v cílovém adresáři."""
while True:
dest = os.path.join(dest_dir, prefix + secrets.token_hex(8) + suffix)
try:
os.link(src, dest)
return dest
except FileExistsError:
logger.warning('Iteruji link_to_dir: %s už existuje', dest)
def unlink_if_exists(name: str):
try:
os.unlink(name)
except FileNotFoundError:
pass
def normalize_grade(grade: str) -> int:
"""Pokusí se převést třídu ve formátu 7 nebo 3/4 na číslo odpovídající
třídě na základní škole (maturitní ročník gymnázia je tedy 9+4 = 13).
* Základní škola: nic
* Gymnázia: /8, /6 nebo /4
* Nerozpoznané ročníky a chyby při převodu: -1"""
try:
parts = grade.split('/')
if len(parts) == 1:
return int(parts[0])
if len(parts) > 2:
return -1
year = int(parts[0])
school_type = int(parts[1])
if school_type in (8, 6, 4):
return year + 13 - school_type
else:
return -1
except ValueError:
return -1