Skip to content
Snippets Groups Projects
Select Git revision
  • 3d4fe8677fd9115fa44ff9295a7db287788101a7
  • master default protected
2 results

data_lib.py

Blame
  • users.py 10.15 KiB
    # Správa uživatelů
    
    import bcrypt
    import datetime
    import dateutil.tz
    import email.errors
    import email.headerregistry
    import re
    import secrets
    from typing import Optional, Tuple
    
    import mo
    import mo.config as config
    import mo.db as db
    import mo.util
    from mo.util import logger
    import mo.tokens
    
    
    def normalize_grade(rocnik: str, school: db.School) -> str:
        """ Aktuálně provádí jen kontrolu formátu. """
    
        if not re.fullmatch(r'\d(/\d)?', rocnik):
            raise mo.CheckError('Ročník má neplatný formát, musí to být buď číslice, nebo číslice/číslice')
    
        if not school.is_zs and re.fullmatch(r'\d', rocnik):
            raise mo.CheckError(f'Ročník pro střední školu ({school.place.name}) zapisujte ve formátu číslice/číslice')
    
        if not school.is_ss and re.fullmatch(r'\d/\d', rocnik):
            raise mo.CheckError(f'Ročník pro základní školu ({school.place.name}) zapisujte jako číslici 1–9')
    
        return rocnik
    
    
    def validate_born_year(r: int) -> None:
        if r < 2000 or r > 2099:
            raise mo.CheckError('Rok narození musí být v intervalu [2000,2099]')
    
    
    def validate_and_find_school(kod: str) -> db.Place:
        if kod == "":
            raise mo.CheckError('Škola je povinná')
    
        place = db.get_place_by_code(kod, fetch_school=True)
        if not place:
            raise mo.CheckError(f'Škola s kódem "{kod}" nenalezena' +
                                ('. Nechybí vám # na začátku?' if re.fullmatch(r'\d+', kod) else ''))
    
        if place.type != db.PlaceType.school:
            raise mo.CheckError(f'Kód školy "{kod}" neodpovídá škole')
    
        return place
    
    
    def find_or_create_user(email: str, krestni: Optional[str], prijmeni: Optional[str], is_org: bool, reason: str) -> Tuple[db.User, bool]:
        sess = db.get_session()
        user = sess.query(db.User).filter_by(email=email).one_or_none()
        is_new = user is None
        if user is None:  # HACK: Podmínku je nutné zapsat znovu místo užití is_new, jinak si s tím mypy neporadí
            if not krestni or not prijmeni:
                raise mo.CheckError('Osoba s daným emailem zatím neexistuje, je nutné uvést její jméno.')
            user = db.User(email=email, first_name=krestni, last_name=prijmeni, is_org=is_org)
            sess.add(user)
            sess.flush()    # Aby uživatel dostal user_id
            logger.info(f'{reason.title()}: Založen uživatel user=#{user.user_id} email=<{user.email}>')
            mo.util.log(
                type=db.LogType.user,
                what=user.user_id,
                details={'action': 'create-user', 'reason': reason, 'new': db.row2dict(user)},
            )
        else:
            if (krestni and user.first_name != krestni) or (prijmeni and user.last_name != prijmeni):
                raise mo.CheckError(f'Osoba již registrována s odlišným jménem {user.full_name()}')
            if (user.is_admin or user.is_org) != is_org:
                if is_org:
                    raise mo.CheckError('Nelze předefinovat účastníka na organizátora')
                else:
                    raise mo.CheckError('Nelze předefinovat organizátora na účastníka')
        return user, is_new
    
    
    def find_or_create_participant(user: db.User, year: int, school_id: Optional[int], birth_year: Optional[int], grade: Optional[str], reason: str) -> Tuple[db.Participant, bool]:
        sess = db.get_session()
        part = sess.query(db.Participant).get((user.user_id, year))
        is_new = part is None
        if part is None:
            if not school_id:
                raise mo.CheckError('Osoba s daným emailem zatím není zaregistrovaná do ročníku, je nutné uvést školu.')
            if not birth_year:
                raise mo.CheckError('Osoba s daným emailem zatím není zaregistrovaná do ročníku, je nutné uvést rok narození.')
            if not grade:
                raise mo.CheckError('Osoba s daným emailem zatím není zaregistrovaná do ročníku, je nutné uvést ročník.')
            part = db.Participant(user=user, year=year, school=school_id, birth_year=birth_year, grade=grade)
            sess.add(part)
            logger.info(f'{reason.title()}: Založen účastník #{user.user_id}')
            mo.util.log(
                type=db.LogType.participant,
                what=user.user_id,
                details={'action': 'create-participant', 'reason': reason, 'new': db.row2dict(part)},
            )
        else:
            if ((school_id and part.school != school_id)
                    or (grade and part.grade != grade)
                    or (birth_year and part.birth_year != birth_year)):
                raise mo.CheckError('Účastník již zaregistrován s odlišnou školou/ročníkem/rokem narození')
        return part, is_new
    
    
    def find_or_create_participation(user: db.User, contest: db.Contest, place: Optional[db.Place], reason: str) -> Tuple[db.Participation, bool]:
        if place is None:
            place = contest.place
    
        sess = db.get_session()
        pions = (sess.query(db.Participation)
                 .filter_by(user=user)
                 .filter(db.Participation.contest.has(db.Contest.round == contest.round))
                 .all())
    
        is_new = pions == []
        if is_new:
            pion = db.Participation(user=user, contest=contest, place_id=place.place_id, state=db.PartState.active)
            sess.add(pion)
            logger.info(f'{reason.title()}: Založena účast user=#{user.user_id} contest=#{contest.contest_id} place=#{place.place_id}')
            mo.util.log(
                type=db.LogType.participant,
                what=user.user_id,
                details={'action': 'add-to-contest', 'reason': reason, 'new': db.row2dict(pion)},
            )
        elif len(pions) == 1:
            pion = pions[0]
            if pion.place != place:
                raise mo.CheckError(f'Již se tohoto kola účastní v {contest.round.get_level().name_locative("jiném", "jiné", "jiném")} ({pion.place.get_code()})')
        else:
            raise mo.CheckError('Již se tohoto kola účastní ve více oblastech, což by nemělo být možné')
    
        return pion, is_new
    
    
    def normalize_email(addr: str) -> str:
        if not re.fullmatch(r'.+@.+', addr):
            raise mo.CheckError('V e-mailové adrese chybí zavináč')
    
        if re.search(r'[ \t]', addr):
            raise mo.CheckError('E-mailová adresa obsahuje mezeru')
    
        m = re.search(r'[^!-~]+', addr)
        if m:
            if m[0].isprintable():
                raise mo.CheckError(f'E-mailová adresa obsahuje nepovolené znaky: {m[0]}')
            else:
                raise mo.CheckError('E-mailová adresa obsahuje netisknutelné znaky: ' + repr(m[0]))
    
        try:
            # Tady úmyslně používáme knihovnu jen ke kontrole a ne k normalizaci,
            # protože nechceme riskovat, že se normalizovaný tvar časem změní.
            email.headerregistry.Address(addr_spec=addr)
        except (email.errors.HeaderParseError, ValueError):
            raise mo.CheckError('Chybná syntaxe mailové adresy')
    
        # XXX: Striktně vzato, tohle není korektní, protože některé domény mohou
        # mít case-sensitive levou stranu adresy. Ale i na nich se prakticky nevyskytují
        # levé strany s velkými písmeny, zatímco uživatelé při psaní adres běžně velká
        # a malá písmena zaměňují. Menší zlo tedy je normalizovat na malá písmena.
        return addr.lower()
    
    
    def user_by_email(email: str) -> Optional[db.User]:
        try:
            email = normalize_email(email)
        except mo.CheckError:
            return None
        return db.get_session().query(db.User).filter_by(email=email).first()
    
    
    def user_by_uid(uid: int) -> db.User:
        return db.get_session().query(db.User).get(uid)
    
    
    password_help = 'Heslo musí mít alespoň 8 znaků. Doporučujeme kombinovat velká a malá písmena a číslice.'
    
    
    def validate_password(passwd: str) -> bool:
        return len(passwd) >= 8
    
    
    def set_password(user: db.User, passwd: str, reset: bool = False):
        salt = bcrypt.gensalt()
        hashed = bcrypt.hashpw(passwd.encode('utf-8'), salt)
        user.password_hash = hashed.decode('us-ascii')
        if reset:
            user.reset_at = mo.now
            mo.util.log(
                type=db.LogType.user,
                what=user.user_id,
                details={'action': 'do-reset'},
            )
    
    
    def check_password(user: db.User, passwd: str):
        return user.password_hash is not None and \
            bcrypt.checkpw(passwd.encode('utf-8'), user.password_hash.encode('us-ascii'))
    
    
    def login(user: db.User):
        user.last_login_at = mo.now
    
    
    def make_activation_token(user: db.User) -> str:
        user.reset_at = mo.now
        when = int(mo.now.timestamp())
        return mo.tokens.sign_token([str(user.user_id), str(when)], 'activate')
    
    
    def check_activation_token(token: str) -> Optional[db.User]:
        token = mo.util.clean_up_token(token)
        fields = mo.tokens.verify_token(token, 'activate')
        if not fields or len(fields) != 2:
            return None
        user_id = int(fields[0])
        token_time = datetime.datetime.fromtimestamp(int(fields[1]), tz=dateutil.tz.UTC)
    
        user = user_by_uid(user_id)
        if not user:
            return None
        elif token_time < mo.now - datetime.timedelta(days=28):
            return None
        else:
            return user
    
    
    def new_reg_request(type: db.RegReqType, client: str) -> Optional[db.RegRequest]:
        sess = db.get_session()
    
        # Zatím jen jednoduchý rate limit, časem možno vylepšit
        in_last_minute = db.get_count(sess.query(db.RegRequest).filter(db.RegRequest.created_at >= mo.now - datetime.timedelta(minutes=1)))
        if in_last_minute >= config.REG_MAX_PER_MINUTE:
            return None
    
        email_token = mo.tokens.sign_token([str(int(mo.now.timestamp())), secrets.token_hex(16)], 'reg-request')
    
        return db.RegRequest(
            type=type,
            created_at=mo.now,
            expires_at=mo.now + datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY),
            email_token=email_token,
            client=client,
        )
    
    
    def expire_reg_requests():
        sess = db.get_session()
        conn = sess.connection()
        table = db.RegRequest.__table__
        conn.execute(table.delete().where(table.c.expires_at < mo.now))
        sess.commit()
    
    
    def request_reset_password(user: db.User, client: str) -> Optional[db.RegRequest]:
        logger.info('Login: Požadavek na reset hesla pro <%s>', user.email)
        rr = new_reg_request(db.RegReqType.reset_passwd, client)
        if rr:
            db.get_session().add(rr)
            rr.user_id = user.user_id
            mo.util.log(
                type=db.LogType.user,
                what=user.user_id,
                details={'action': 'ask-reset'},
            )
        return rr