Select Git revision
tree_successor_test.py
-
Martin Mareš authoredMartin Mareš authored
users.py 15.59 KiB
# Správa uživatelů
import bcrypt
import datetime
import dateutil.tz
import dns.exception
import dns.resolver
import email.errors
import email.headerregistry
import re
import secrets
from sqlalchemy.dialects.postgresql import insert as pgsql_insert
from typing import Optional, Tuple
import mo
import mo.config as config
import mo.db as db
import mo.util
from mo.util import logger
import mo.tokens
def normalize_grade(rocnik: str, school: db.School) -> str:
""" Aktuálně provádí jen kontrolu formátu. """
if not re.fullmatch(r'\d(/\d)?', rocnik):
raise mo.CheckError('Ročník má neplatný formát, musí to být buď číslice, nebo číslice/číslice')
if not school.is_zs and re.fullmatch(r'\d', rocnik):
raise mo.CheckError(f'Ročník pro střední školu ({school.place.name}) zapisujte ve formátu číslice/číslice')
if not school.is_ss and re.fullmatch(r'\d/\d', rocnik):
raise mo.CheckError(f'Ročník pro základní školu ({school.place.name}) zapisujte jako číslici 1–9')
return rocnik
def validate_born_year(r: int) -> None:
if r < 2000 or r > 2099:
raise mo.CheckError('Rok narození musí být v intervalu [2000,2099]')
def validate_and_find_school(kod: str) -> db.Place:
if kod == "":
raise mo.CheckError('Škola je povinná')
place = db.get_place_by_code(kod, fetch_school=True)
if not place:
raise mo.CheckError(f'Škola s kódem "{kod}" nenalezena' +
('. Nechybí vám # na začátku?' if re.fullmatch(r'\d+', kod) else ''))
if place.type != db.PlaceType.school:
raise mo.CheckError(f'Kód školy "{kod}" neodpovídá škole')
return place
class CheckErrorOrgIsUser(mo.CheckError):
"""Při požadavku na orga nalezen uživatel nebo opačně."""
pass
def change_user_to_org(user, reason: str):
sess = db.get_session()
pcr = (sess.query(db.Participation, db.Contest, db.Round)
.select_from(db.Participation)
.join(db.Contest)
.join(db.Round)
.filter(db.Participation.user == user)
.filter(db.Round.year == config.CURRENT_YEAR)
.all())
for p, c, r in pcr:
if (sess.query(db.Solution)
.join(db.Task)
.filter(db.Task.round == r)
.filter(db.Solution.user == user)
.count()):
raise mo.CheckError("Převedení účastníka na organizátora se nezdařilo, protože odevzdal úlohy v aktuálním ročníku. Kontaktujte prosím správce.")
for p, c, r in pcr:
logger.info(f'Automatické mazání prázdné účasti: user=#{user.user_id} contest=#{c.contest_id}')
mo.util.log(
type=db.LogType.participant,
what=user.user_id,
details={'action': 'participation-removed', 'reason': 'org-upgrade', 'participation': db.row2dict(p)},
)
sess.delete(p)
user.is_org = True
logger.info(f'{reason.title()}: Změna stavu uživatele user=#{user.user_id} na organizátora')
changes = db.get_object_changes(user)
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'user-change-is-org', 'reason': reason, 'changes': changes},
)
def find_or_create_user(email: str, krestni: Optional[str], prijmeni: Optional[str], is_org: bool, reason: str, allow_change_user_to_org=False) -> Tuple[db.User, bool, bool]:
sess = db.get_session()
user = sess.query(db.User).filter_by(email=email).one_or_none()
is_new = False
is_change_user_to_org = False
if user is None: # HACK: Podmínku je nutné zapsat znovu místo užití is_new, jinak si s tím mypy neporadí
if not krestni or not prijmeni:
raise mo.CheckError('Osoba s daným e-mailem zatím neexistuje, je nutné uvést její jméno.')
res = sess.connection().execute(
pgsql_insert(db.User.__table__)
.values(
email=email,
first_name=krestni,
last_name=prijmeni,
is_org=is_org,
)
.on_conflict_do_nothing()
.returning(db.User.user_id)
)
user = sess.query(db.User).filter_by(email=email).one()
if res.fetchall():
is_new = True
logger.info(f'{reason.title()}: Založen uživatel user=#{user.user_id} email=<{user.email}>')
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'create-user', 'reason': reason, 'new': db.row2dict(user)},
)
if (krestni and user.first_name != krestni) or (prijmeni and user.last_name != prijmeni):
raise mo.CheckError(f'Osoba již registrována s odlišným jménem {user.full_name()}')
if (user.is_admin or user.is_org) != is_org:
if is_org:
if allow_change_user_to_org:
change_user_to_org(user, reason)
is_change_user_to_org = True
else:
raise CheckErrorOrgIsUser('Nelze předefinovat účastníka na organizátora.')
else:
raise mo.CheckError('Nelze předefinovat organizátora na účastníka.')
return user, is_new, is_change_user_to_org
def find_or_create_participant(user: db.User, year: int, school_id: Optional[int], birth_year: Optional[int], grade: Optional[str], reason: str) -> Tuple[db.Participant, bool]:
sess = db.get_session()
part = sess.query(db.Participant).get((user.user_id, year))
is_new = False
if part is None:
prev_part = sess.query(db.Participant).filter_by(user_id=user.user_id).order_by(db.Participant.year.desc()).limit(1).one_or_none()
if not school_id:
if prev_part:
school_id = prev_part.school
else:
raise mo.CheckError('Osoba s daným e-mailem zatím není zaregistrovaná do ročníku, je nutné uvést školu.')
if not birth_year:
if prev_part:
birth_year = prev_part.birth_year
else:
raise mo.CheckError('Osoba s daným e-mailem zatím není zaregistrovaná do ročníku, je nutné uvést rok narození.')
if not grade:
raise mo.CheckError('Osoba s daným e-mailem zatím není zaregistrovaná do ročníku, je nutné uvést ročník.')
res = sess.connection().execute(
pgsql_insert(db.Participant.__table__)
.values(
user_id=user.user_id,
year=year,
school=school_id,
birth_year=birth_year,
grade=grade,
)
.on_conflict_do_nothing()
.returning(db.Participant.user_id)
)
part = sess.query(db.Participant).get((user.user_id, year))
assert part is not None
if res.fetchall():
is_new = True
logger.info(f'{reason.title()}: Založen účastník #{user.user_id}')
mo.util.log(
type=db.LogType.participant,
what=user.user_id,
details={'action': 'create-participant', 'reason': reason, 'new': db.row2dict(part)},
)
if ((school_id and part.school != school_id)
or (grade and part.grade != grade)
or (birth_year and part.birth_year != birth_year)):
raise mo.CheckError('Účastník již zaregistrován s odlišnou školou/ročníkem/rokem narození')
return part, is_new
def find_or_create_participation(user: db.User, contest: db.Contest, place: Optional[db.Place], reason: str) -> Tuple[db.Participation, bool]:
if place is None:
place = contest.place
sess = db.get_session()
pion = None
is_new = False
retry = False
while pion is None:
pions = (sess.query(db.Participation)
.filter_by(user=user)
.filter(db.Participation.contest.has(db.Contest.round == contest.round))
.all())
if len(pions) == 0:
assert not retry
retry = True
res = sess.connection().execute(
pgsql_insert(db.Participation.__table__)
.values(
user_id=user.user_id,
contest_id=contest.contest_id,
place_id=place.place_id,
state=db.PartState.active,
)
.on_conflict_do_nothing()
.returning(db.Participation.user_id)
)
if res.fetchall():
is_new = True
elif len(pions) == 1:
pion = pions[0]
else:
raise mo.CheckError('Již se tohoto kola účastní ve více oblastech, což by nemělo být možné')
if pion.place != place:
raise mo.CheckError(f'Již se tohoto kola účastní v {contest.round.get_level().name_locative("jiném", "jiné", "jiném")} ({pion.place.get_code()})')
if is_new:
logger.info(f'{reason.title()}: Založena účast user=#{user.user_id} contest=#{contest.contest_id} place=#{place.place_id}')
mo.util.log(
type=db.LogType.participant,
what=user.user_id,
details={'action': 'add-to-contest', 'reason': reason, 'new': db.row2dict(pion)},
)
return pion, is_new
def email_is_fake(addr: str) -> bool:
return addr.endswith('@nomail') or addr.endswith('@test')
bad_domains = {
'gmail.cz',
}
def email_check_domain(domain: str):
# Některé domény rovnou odmítáme
if domain in bad_domains:
logger.info(f'DNS: Doména <{domain}> na blacklistu')
raise mo.CheckError(f'Doména {domain} nepřijímá poštu')
for record in ['MX', 'A', 'AAAA']:
try:
answer = dns.resolver.resolve(domain, record, lifetime=2, search=False)
if (record == 'MX'
and len(answer.rrset) == 1
and answer.rrset[0].preference == 0
and str(answer.rrset[0].exchange) == '.'):
# Null MX (RFC 7505) explicitně říká, že doména nepřijímá poštu
logger.info(f'DNS: Doména <{domain}> má Null NX')
raise mo.CheckError(f'Doména {domain} nepřijímá poštu')
except dns.exception.Timeout:
# Kontrola je konzervativní, při timeoutu adresu raději schválíme
logger.info(f'DNS: Timeout při kontrole domény <{domain}>')
return
except dns.resolver.NoAnswer:
logger.debug(f'DNS: Doména <{domain}> neobsahuje {record}')
except dns.resolver.NXDOMAIN:
logger.info(f'DNS: Doména <{domain}> neexistuje')
raise mo.CheckError('Adresa obsahuje neexistující doménu {domain}')
except dns.exception.DNSException as e:
logger.warn(f'DNS: Záznam <{domain}>/{record} nejde resolvovat: {e}')
return
logger.debug(f'DNS: Doména <{domain}> OK')
return
logger.info(f'DNS: Doména <{domain}> nepřijímá poštu')
raise mo.CheckError(f'Doména {domain} nepřijímá poštu')
def normalize_email(addr: str, check_existence: bool = False) -> str:
if not re.fullmatch(r'.+@.+', addr):
raise mo.CheckError('V e-mailové adrese chybí zavináč')
if re.search(r'[ \t]', addr):
raise mo.CheckError('E-mailová adresa obsahuje mezeru')
m = re.search(r'[^!-~]+', addr)
if m:
if m[0].isprintable():
raise mo.CheckError(f'E-mailová adresa obsahuje nepovolené znaky: {m[0]}')
else:
raise mo.CheckError('E-mailová adresa obsahuje netisknutelné znaky: ' + repr(m[0]))
try:
# Tady úmyslně používáme knihovnu jen ke kontrole a ne k normalizaci,
# protože nechceme riskovat, že se normalizovaný tvar časem změní.
addr_obj = email.headerregistry.Address(addr_spec=addr)
except (email.errors.HeaderParseError, ValueError):
raise mo.CheckError('Chybná syntaxe mailové adresy')
if addr_obj.display_name != "":
raise mo.CheckError('Chybná syntaxe mailové adresy')
if check_existence and not email_is_fake(addr):
email_check_domain(addr_obj.domain)
# XXX: Striktně vzato, tohle není korektní, protože některé domény mohou
# mít case-sensitive levou stranu adresy. Ale i na nich se prakticky nevyskytují
# levé strany s velkými písmeny, zatímco uživatelé při psaní adres běžně velká
# a malá písmena zaměňují. Menší zlo tedy je normalizovat na malá písmena.
return addr.lower()
def user_by_email(email: str) -> Optional[db.User]:
try:
email = normalize_email(email)
except mo.CheckError:
return None
return db.get_session().query(db.User).filter_by(email=email).first()
def user_by_uid(uid: int) -> Optional[db.User]:
return db.get_session().query(db.User).get(uid)
password_help = 'Heslo musí mít alespoň 8 znaků. Doporučujeme kombinovat velká a malá písmena a číslice.'
def validate_password(passwd: str) -> bool:
return len(passwd) >= 8
def set_password(user: db.User, passwd: str, reset: bool = False):
salt = bcrypt.gensalt(rounds=9)
hashed = bcrypt.hashpw(passwd.encode('utf-8'), salt)
user.password_hash = hashed.decode('us-ascii')
if reset:
user.reset_at = mo.now
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'do-reset'},
)
def check_password(user: db.User, passwd: str):
return user.password_hash is not None and \
bcrypt.checkpw(passwd.encode('utf-8'), user.password_hash.encode('us-ascii'))
def login(user: db.User):
user.last_login_at = mo.now
def make_activation_token(user: db.User) -> str:
user.reset_at = mo.now
when = int(mo.now.timestamp())
return mo.tokens.sign_token([str(user.user_id), str(when)], 'activate')
def check_activation_token(token: str) -> Optional[db.User]:
token = mo.util.clean_up_token(token)
fields = mo.tokens.verify_token(token, 'activate')
if not fields or len(fields) != 2:
return None
user_id = int(fields[0])
token_time = datetime.datetime.fromtimestamp(int(fields[1]), tz=dateutil.tz.UTC)
user = user_by_uid(user_id)
if not user:
return None
elif token_time < mo.now - datetime.timedelta(days=28):
return None
else:
return user
def new_reg_request(type: db.RegReqType, client: str) -> Optional[db.RegRequest]:
sess = db.get_session()
# Zatím jen jednoduchý rate limit, časem možno vylepšit
in_last_minute = db.get_count(sess.query(db.RegRequest).filter(db.RegRequest.created_at >= mo.now - datetime.timedelta(minutes=1)))
if in_last_minute >= config.REG_MAX_PER_MINUTE:
return None
email_token = mo.tokens.sign_token([str(int(mo.now.timestamp())), secrets.token_hex(16)], 'reg-request')
return db.RegRequest(
type=type,
created_at=mo.now,
expires_at=mo.now + datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY),
email_token=email_token,
client=client,
)
def expire_reg_requests():
sess = db.get_session()
conn = sess.connection()
table = db.RegRequest.__table__
conn.execute(table.delete().where(table.c.expires_at < mo.now))
sess.commit()
def request_reset_password(user: db.User, client: str) -> Optional[db.RegRequest]:
logger.info('Login: Požadavek na reset hesla pro <%s>', user.email)
assert not user.is_admin
rr = new_reg_request(db.RegReqType.reset_passwd, client)
if rr:
db.get_session().add(rr)
rr.user_id = user.user_id
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'ask-reset'},
)
return rr