Select Git revision
data_lib.py
-
Jiří Kalvoda authoredJiří Kalvoda authored
users.py 10.15 KiB
# Správa uživatelů
import bcrypt
import datetime
import dateutil.tz
import email.errors
import email.headerregistry
import re
import secrets
from typing import Optional, Tuple
import mo
import mo.config as config
import mo.db as db
import mo.util
from mo.util import logger
import mo.tokens
def normalize_grade(rocnik: str, school: db.School) -> str:
""" Aktuálně provádí jen kontrolu formátu. """
if not re.fullmatch(r'\d(/\d)?', rocnik):
raise mo.CheckError('Ročník má neplatný formát, musí to být buď číslice, nebo číslice/číslice')
if not school.is_zs and re.fullmatch(r'\d', rocnik):
raise mo.CheckError(f'Ročník pro střední školu ({school.place.name}) zapisujte ve formátu číslice/číslice')
if not school.is_ss and re.fullmatch(r'\d/\d', rocnik):
raise mo.CheckError(f'Ročník pro základní školu ({school.place.name}) zapisujte jako číslici 1–9')
return rocnik
def validate_born_year(r: int) -> None:
if r < 2000 or r > 2099:
raise mo.CheckError('Rok narození musí být v intervalu [2000,2099]')
def validate_and_find_school(kod: str) -> db.Place:
if kod == "":
raise mo.CheckError('Škola je povinná')
place = db.get_place_by_code(kod, fetch_school=True)
if not place:
raise mo.CheckError(f'Škola s kódem "{kod}" nenalezena' +
('. Nechybí vám # na začátku?' if re.fullmatch(r'\d+', kod) else ''))
if place.type != db.PlaceType.school:
raise mo.CheckError(f'Kód školy "{kod}" neodpovídá škole')
return place
def find_or_create_user(email: str, krestni: Optional[str], prijmeni: Optional[str], is_org: bool, reason: str) -> Tuple[db.User, bool]:
sess = db.get_session()
user = sess.query(db.User).filter_by(email=email).one_or_none()
is_new = user is None
if user is None: # HACK: Podmínku je nutné zapsat znovu místo užití is_new, jinak si s tím mypy neporadí
if not krestni or not prijmeni:
raise mo.CheckError('Osoba s daným emailem zatím neexistuje, je nutné uvést její jméno.')
user = db.User(email=email, first_name=krestni, last_name=prijmeni, is_org=is_org)
sess.add(user)
sess.flush() # Aby uživatel dostal user_id
logger.info(f'{reason.title()}: Založen uživatel user=#{user.user_id} email=<{user.email}>')
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'create-user', 'reason': reason, 'new': db.row2dict(user)},
)
else:
if (krestni and user.first_name != krestni) or (prijmeni and user.last_name != prijmeni):
raise mo.CheckError(f'Osoba již registrována s odlišným jménem {user.full_name()}')
if (user.is_admin or user.is_org) != is_org:
if is_org:
raise mo.CheckError('Nelze předefinovat účastníka na organizátora')
else:
raise mo.CheckError('Nelze předefinovat organizátora na účastníka')
return user, is_new
def find_or_create_participant(user: db.User, year: int, school_id: Optional[int], birth_year: Optional[int], grade: Optional[str], reason: str) -> Tuple[db.Participant, bool]:
sess = db.get_session()
part = sess.query(db.Participant).get((user.user_id, year))
is_new = part is None
if part is None:
if not school_id:
raise mo.CheckError('Osoba s daným emailem zatím není zaregistrovaná do ročníku, je nutné uvést školu.')
if not birth_year:
raise mo.CheckError('Osoba s daným emailem zatím není zaregistrovaná do ročníku, je nutné uvést rok narození.')
if not grade:
raise mo.CheckError('Osoba s daným emailem zatím není zaregistrovaná do ročníku, je nutné uvést ročník.')
part = db.Participant(user=user, year=year, school=school_id, birth_year=birth_year, grade=grade)
sess.add(part)
logger.info(f'{reason.title()}: Založen účastník #{user.user_id}')
mo.util.log(
type=db.LogType.participant,
what=user.user_id,
details={'action': 'create-participant', 'reason': reason, 'new': db.row2dict(part)},
)
else:
if ((school_id and part.school != school_id)
or (grade and part.grade != grade)
or (birth_year and part.birth_year != birth_year)):
raise mo.CheckError('Účastník již zaregistrován s odlišnou školou/ročníkem/rokem narození')
return part, is_new
def find_or_create_participation(user: db.User, contest: db.Contest, place: Optional[db.Place], reason: str) -> Tuple[db.Participation, bool]:
if place is None:
place = contest.place
sess = db.get_session()
pions = (sess.query(db.Participation)
.filter_by(user=user)
.filter(db.Participation.contest.has(db.Contest.round == contest.round))
.all())
is_new = pions == []
if is_new:
pion = db.Participation(user=user, contest=contest, place_id=place.place_id, state=db.PartState.active)
sess.add(pion)
logger.info(f'{reason.title()}: Založena účast user=#{user.user_id} contest=#{contest.contest_id} place=#{place.place_id}')
mo.util.log(
type=db.LogType.participant,
what=user.user_id,
details={'action': 'add-to-contest', 'reason': reason, 'new': db.row2dict(pion)},
)
elif len(pions) == 1:
pion = pions[0]
if pion.place != place:
raise mo.CheckError(f'Již se tohoto kola účastní v {contest.round.get_level().name_locative("jiném", "jiné", "jiném")} ({pion.place.get_code()})')
else:
raise mo.CheckError('Již se tohoto kola účastní ve více oblastech, což by nemělo být možné')
return pion, is_new
def normalize_email(addr: str) -> str:
if not re.fullmatch(r'.+@.+', addr):
raise mo.CheckError('V e-mailové adrese chybí zavináč')
if re.search(r'[ \t]', addr):
raise mo.CheckError('E-mailová adresa obsahuje mezeru')
m = re.search(r'[^!-~]+', addr)
if m:
if m[0].isprintable():
raise mo.CheckError(f'E-mailová adresa obsahuje nepovolené znaky: {m[0]}')
else:
raise mo.CheckError('E-mailová adresa obsahuje netisknutelné znaky: ' + repr(m[0]))
try:
# Tady úmyslně používáme knihovnu jen ke kontrole a ne k normalizaci,
# protože nechceme riskovat, že se normalizovaný tvar časem změní.
email.headerregistry.Address(addr_spec=addr)
except (email.errors.HeaderParseError, ValueError):
raise mo.CheckError('Chybná syntaxe mailové adresy')
# XXX: Striktně vzato, tohle není korektní, protože některé domény mohou
# mít case-sensitive levou stranu adresy. Ale i na nich se prakticky nevyskytují
# levé strany s velkými písmeny, zatímco uživatelé při psaní adres běžně velká
# a malá písmena zaměňují. Menší zlo tedy je normalizovat na malá písmena.
return addr.lower()
def user_by_email(email: str) -> Optional[db.User]:
try:
email = normalize_email(email)
except mo.CheckError:
return None
return db.get_session().query(db.User).filter_by(email=email).first()
def user_by_uid(uid: int) -> db.User:
return db.get_session().query(db.User).get(uid)
password_help = 'Heslo musí mít alespoň 8 znaků. Doporučujeme kombinovat velká a malá písmena a číslice.'
def validate_password(passwd: str) -> bool:
return len(passwd) >= 8
def set_password(user: db.User, passwd: str, reset: bool = False):
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(passwd.encode('utf-8'), salt)
user.password_hash = hashed.decode('us-ascii')
if reset:
user.reset_at = mo.now
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'do-reset'},
)
def check_password(user: db.User, passwd: str):
return user.password_hash is not None and \
bcrypt.checkpw(passwd.encode('utf-8'), user.password_hash.encode('us-ascii'))
def login(user: db.User):
user.last_login_at = mo.now
def make_activation_token(user: db.User) -> str:
user.reset_at = mo.now
when = int(mo.now.timestamp())
return mo.tokens.sign_token([str(user.user_id), str(when)], 'activate')
def check_activation_token(token: str) -> Optional[db.User]:
token = mo.util.clean_up_token(token)
fields = mo.tokens.verify_token(token, 'activate')
if not fields or len(fields) != 2:
return None
user_id = int(fields[0])
token_time = datetime.datetime.fromtimestamp(int(fields[1]), tz=dateutil.tz.UTC)
user = user_by_uid(user_id)
if not user:
return None
elif token_time < mo.now - datetime.timedelta(days=28):
return None
else:
return user
def new_reg_request(type: db.RegReqType, client: str) -> Optional[db.RegRequest]:
sess = db.get_session()
# Zatím jen jednoduchý rate limit, časem možno vylepšit
in_last_minute = db.get_count(sess.query(db.RegRequest).filter(db.RegRequest.created_at >= mo.now - datetime.timedelta(minutes=1)))
if in_last_minute >= config.REG_MAX_PER_MINUTE:
return None
email_token = mo.tokens.sign_token([str(int(mo.now.timestamp())), secrets.token_hex(16)], 'reg-request')
return db.RegRequest(
type=type,
created_at=mo.now,
expires_at=mo.now + datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY),
email_token=email_token,
client=client,
)
def expire_reg_requests():
sess = db.get_session()
conn = sess.connection()
table = db.RegRequest.__table__
conn.execute(table.delete().where(table.c.expires_at < mo.now))
sess.commit()
def request_reset_password(user: db.User, client: str) -> Optional[db.RegRequest]:
logger.info('Login: Požadavek na reset hesla pro <%s>', user.email)
rr = new_reg_request(db.RegReqType.reset_passwd, client)
if rr:
db.get_session().add(rr)
rr.user_id = user.user_id
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'ask-reset'},
)
return rr