Skip to content
Snippets Groups Projects
Select Git revision
  • 61e22e2ec12ceb9666b9658bc199dbe04558155b
  • devel default
  • master
  • fo
  • jirka/typing
  • fo-base
  • mj/submit-images
  • jk/issue-96
  • jk/issue-196
  • honza/add-contestant
  • honza/mr7
  • honza/mrf
  • honza/mrd
  • honza/mra
  • honza/mr6
  • honza/submit-images
  • honza/kolo-vs-soutez
  • jh-stress-test-wip
  • shorten-schools
19 results

table.py

Blame
  • users.py 15.59 KiB
    # Správa uživatelů
    
    import bcrypt
    import datetime
    import dateutil.tz
    import dns.exception
    import dns.resolver
    import email.errors
    import email.headerregistry
    import re
    import secrets
    from sqlalchemy.dialects.postgresql import insert as pgsql_insert
    from typing import Optional, Tuple
    
    import mo
    import mo.config as config
    import mo.db as db
    import mo.util
    from mo.util import logger
    import mo.tokens
    
    
    def normalize_grade(rocnik: str, school: db.School) -> str:
        """ Aktuálně provádí jen kontrolu formátu. """
    
        if not re.fullmatch(r'\d(/\d)?', rocnik):
            raise mo.CheckError('Ročník má neplatný formát, musí to být buď číslice, nebo číslice/číslice')
    
        if not school.is_zs and re.fullmatch(r'\d', rocnik):
            raise mo.CheckError(f'Ročník pro střední školu ({school.place.name}) zapisujte ve formátu číslice/číslice')
    
        if not school.is_ss and re.fullmatch(r'\d/\d', rocnik):
            raise mo.CheckError(f'Ročník pro základní školu ({school.place.name}) zapisujte jako číslici 1–9')
    
        return rocnik
    
    
    def validate_born_year(r: int) -> None:
        if r < 2000 or r > 2099:
            raise mo.CheckError('Rok narození musí být v intervalu [2000,2099]')
    
    
    def validate_and_find_school(kod: str) -> db.Place:
        if kod == "":
            raise mo.CheckError('Škola je povinná')
    
        place = db.get_place_by_code(kod, fetch_school=True)
        if not place:
            raise mo.CheckError(f'Škola s kódem "{kod}" nenalezena' +
                                ('. Nechybí vám # na začátku?' if re.fullmatch(r'\d+', kod) else ''))
    
        if place.type != db.PlaceType.school:
            raise mo.CheckError(f'Kód školy "{kod}" neodpovídá škole')
    
        return place
    
    
    class CheckErrorOrgIsUser(mo.CheckError):
        """Při požadavku na orga nalezen uživatel nebo opačně."""
        pass
    
    
    def change_user_to_org(user, reason: str):
        sess = db.get_session()
        pcr = (sess.query(db.Participation, db.Contest, db.Round)
               .select_from(db.Participation)
               .join(db.Contest)
               .join(db.Round)
               .filter(db.Participation.user == user)
               .filter(db.Round.year == config.CURRENT_YEAR)
               .all())
    
        for p, c, r in pcr:
            if (sess.query(db.Solution)
                    .join(db.Task)
                    .filter(db.Task.round == r)
                    .filter(db.Solution.user == user)
                    .count()):
                raise mo.CheckError("Převedení účastníka na organizátora se nezdařilo, protože odevzdal úlohy v aktuálním ročníku. Kontaktujte prosím správce.")
    
        for p, c, r in pcr:
            logger.info(f'Automatické mazání prázdné účasti: user=#{user.user_id} contest=#{c.contest_id}')
            mo.util.log(
                type=db.LogType.participant,
                what=user.user_id,
                details={'action': 'participation-removed', 'reason': 'org-upgrade', 'participation': db.row2dict(p)},
            )
            sess.delete(p)
    
        user.is_org = True
        logger.info(f'{reason.title()}: Změna stavu uživatele user=#{user.user_id} na organizátora')
        changes = db.get_object_changes(user)
        mo.util.log(
            type=db.LogType.user,
            what=user.user_id,
            details={'action': 'user-change-is-org', 'reason': reason, 'changes': changes},
        )
    
    
    def find_or_create_user(email: str, krestni: Optional[str], prijmeni: Optional[str], is_org: bool, reason: str, allow_change_user_to_org=False) -> Tuple[db.User, bool, bool]:
        sess = db.get_session()
        user = sess.query(db.User).filter_by(email=email).one_or_none()
        is_new = False
        is_change_user_to_org = False
    
        if user is None:  # HACK: Podmínku je nutné zapsat znovu místo užití is_new, jinak si s tím mypy neporadí
            if not krestni or not prijmeni:
                raise mo.CheckError('Osoba s daným e-mailem zatím neexistuje, je nutné uvést její jméno.')
    
            res = sess.connection().execute(
                pgsql_insert(db.User.__table__)
                .values(
                    email=email,
                    first_name=krestni,
                    last_name=prijmeni,
                    is_org=is_org,
                )
                .on_conflict_do_nothing()
                .returning(db.User.user_id)
            )
    
            user = sess.query(db.User).filter_by(email=email).one()
    
            if res.fetchall():
                is_new = True
                logger.info(f'{reason.title()}: Založen uživatel user=#{user.user_id} email=<{user.email}>')
                mo.util.log(
                    type=db.LogType.user,
                    what=user.user_id,
                    details={'action': 'create-user', 'reason': reason, 'new': db.row2dict(user)},
                )
    
        if (krestni and user.first_name != krestni) or (prijmeni and user.last_name != prijmeni):
            raise mo.CheckError(f'Osoba již registrována s odlišným jménem {user.full_name()}')
        if (user.is_admin or user.is_org) != is_org:
            if is_org:
                if allow_change_user_to_org:
                    change_user_to_org(user, reason)
                    is_change_user_to_org = True
                else:
                    raise CheckErrorOrgIsUser('Nelze předefinovat účastníka na organizátora.')
            else:
                raise mo.CheckError('Nelze předefinovat organizátora na účastníka.')
    
        return user, is_new, is_change_user_to_org
    
    
    def find_or_create_participant(user: db.User, year: int, school_id: Optional[int], birth_year: Optional[int], grade: Optional[str], reason: str) -> Tuple[db.Participant, bool]:
        sess = db.get_session()
        part = sess.query(db.Participant).get((user.user_id, year))
        is_new = False
    
        if part is None:
            prev_part = sess.query(db.Participant).filter_by(user_id=user.user_id).order_by(db.Participant.year.desc()).limit(1).one_or_none()
            if not school_id:
                if prev_part:
                    school_id = prev_part.school
                else:
                    raise mo.CheckError('Osoba s daným e-mailem zatím není zaregistrovaná do ročníku, je nutné uvést školu.')
            if not birth_year:
                if prev_part:
                    birth_year = prev_part.birth_year
                else:
                    raise mo.CheckError('Osoba s daným e-mailem zatím není zaregistrovaná do ročníku, je nutné uvést rok narození.')
            if not grade:
                raise mo.CheckError('Osoba s daným e-mailem zatím není zaregistrovaná do ročníku, je nutné uvést ročník.')
    
            res = sess.connection().execute(
                pgsql_insert(db.Participant.__table__)
                .values(
                    user_id=user.user_id,
                    year=year,
                    school=school_id,
                    birth_year=birth_year,
                    grade=grade,
                )
                .on_conflict_do_nothing()
                .returning(db.Participant.user_id)
            )
    
            part = sess.query(db.Participant).get((user.user_id, year))
            assert part is not None
    
            if res.fetchall():
                is_new = True
                logger.info(f'{reason.title()}: Založen účastník #{user.user_id}')
                mo.util.log(
                    type=db.LogType.participant,
                    what=user.user_id,
                    details={'action': 'create-participant', 'reason': reason, 'new': db.row2dict(part)},
                )
    
        if ((school_id and part.school != school_id)
                or (grade and part.grade != grade)
                or (birth_year and part.birth_year != birth_year)):
            raise mo.CheckError('Účastník již zaregistrován s odlišnou školou/ročníkem/rokem narození')
    
        return part, is_new
    
    
    def find_or_create_participation(user: db.User, contest: db.Contest, place: Optional[db.Place], reason: str) -> Tuple[db.Participation, bool]:
        if place is None:
            place = contest.place
    
        sess = db.get_session()
        pion = None
        is_new = False
        retry = False
    
        while pion is None:
            pions = (sess.query(db.Participation)
                     .filter_by(user=user)
                     .filter(db.Participation.contest.has(db.Contest.round == contest.round))
                     .all())
    
            if len(pions) == 0:
                assert not retry
                retry = True
                res = sess.connection().execute(
                    pgsql_insert(db.Participation.__table__)
                    .values(
                        user_id=user.user_id,
                        contest_id=contest.contest_id,
                        place_id=place.place_id,
                        state=db.PartState.active,
                    )
                    .on_conflict_do_nothing()
                    .returning(db.Participation.user_id)
                )
                if res.fetchall():
                    is_new = True
            elif len(pions) == 1:
                pion = pions[0]
            else:
                raise mo.CheckError('Již se tohoto kola účastní ve více oblastech, což by nemělo být možné')
    
        if pion.place != place:
            raise mo.CheckError(f'Již se tohoto kola účastní v {contest.round.get_level().name_locative("jiném", "jiné", "jiném")} ({pion.place.get_code()})')
    
        if is_new:
            logger.info(f'{reason.title()}: Založena účast user=#{user.user_id} contest=#{contest.contest_id} place=#{place.place_id}')
            mo.util.log(
                type=db.LogType.participant,
                what=user.user_id,
                details={'action': 'add-to-contest', 'reason': reason, 'new': db.row2dict(pion)},
            )
    
        return pion, is_new
    
    
    def email_is_fake(addr: str) -> bool:
        return addr.endswith('@nomail') or addr.endswith('@test')
    
    
    bad_domains = {
        'gmail.cz',
    }
    
    
    def email_check_domain(domain: str):
        # Některé domény rovnou odmítáme
        if domain in bad_domains:
            logger.info(f'DNS: Doména <{domain}> na blacklistu')
            raise mo.CheckError(f'Doména {domain} nepřijímá poštu')
    
        for record in ['MX', 'A', 'AAAA']:
            try:
                answer = dns.resolver.resolve(domain, record, lifetime=2, search=False)
                if (record == 'MX'
                        and len(answer.rrset) == 1
                        and answer.rrset[0].preference == 0
                        and str(answer.rrset[0].exchange) == '.'):
                    # Null MX (RFC 7505) explicitně říká, že doména nepřijímá poštu
                    logger.info(f'DNS: Doména <{domain}> má Null NX')
                    raise mo.CheckError(f'Doména {domain} nepřijímá poštu')
            except dns.exception.Timeout:
                # Kontrola je konzervativní, při timeoutu adresu raději schválíme
                logger.info(f'DNS: Timeout při kontrole domény <{domain}>')
                return
            except dns.resolver.NoAnswer:
                logger.debug(f'DNS: Doména <{domain}> neobsahuje {record}')
            except dns.resolver.NXDOMAIN:
                logger.info(f'DNS: Doména <{domain}> neexistuje')
                raise mo.CheckError('Adresa obsahuje neexistující doménu {domain}')
            except dns.exception.DNSException as e:
                logger.warn(f'DNS: Záznam <{domain}>/{record} nejde resolvovat: {e}')
                return
            logger.debug(f'DNS: Doména <{domain}> OK')
            return
    
        logger.info(f'DNS: Doména <{domain}> nepřijímá poštu')
        raise mo.CheckError(f'Doména {domain} nepřijímá poštu')
    
    
    def normalize_email(addr: str, check_existence: bool = False) -> str:
        if not re.fullmatch(r'.+@.+', addr):
            raise mo.CheckError('V e-mailové adrese chybí zavináč')
    
        if re.search(r'[ \t]', addr):
            raise mo.CheckError('E-mailová adresa obsahuje mezeru')
    
        m = re.search(r'[^!-~]+', addr)
        if m:
            if m[0].isprintable():
                raise mo.CheckError(f'E-mailová adresa obsahuje nepovolené znaky: {m[0]}')
            else:
                raise mo.CheckError('E-mailová adresa obsahuje netisknutelné znaky: ' + repr(m[0]))
    
        try:
            # Tady úmyslně používáme knihovnu jen ke kontrole a ne k normalizaci,
            # protože nechceme riskovat, že se normalizovaný tvar časem změní.
            addr_obj = email.headerregistry.Address(addr_spec=addr)
        except (email.errors.HeaderParseError, ValueError):
            raise mo.CheckError('Chybná syntaxe mailové adresy')
    
        if addr_obj.display_name != "":
            raise mo.CheckError('Chybná syntaxe mailové adresy')
    
        if check_existence and not email_is_fake(addr):
            email_check_domain(addr_obj.domain)
    
        # XXX: Striktně vzato, tohle není korektní, protože některé domény mohou
        # mít case-sensitive levou stranu adresy. Ale i na nich se prakticky nevyskytují
        # levé strany s velkými písmeny, zatímco uživatelé při psaní adres běžně velká
        # a malá písmena zaměňují. Menší zlo tedy je normalizovat na malá písmena.
        return addr.lower()
    
    
    def user_by_email(email: str) -> Optional[db.User]:
        try:
            email = normalize_email(email)
        except mo.CheckError:
            return None
        return db.get_session().query(db.User).filter_by(email=email).first()
    
    
    def user_by_uid(uid: int) -> Optional[db.User]:
        return db.get_session().query(db.User).get(uid)
    
    
    password_help = 'Heslo musí mít alespoň 8 znaků. Doporučujeme kombinovat velká a malá písmena a číslice.'
    
    
    def validate_password(passwd: str) -> bool:
        return len(passwd) >= 8
    
    
    def set_password(user: db.User, passwd: str, reset: bool = False):
        salt = bcrypt.gensalt(rounds=9)
        hashed = bcrypt.hashpw(passwd.encode('utf-8'), salt)
        user.password_hash = hashed.decode('us-ascii')
        if reset:
            user.reset_at = mo.now
            mo.util.log(
                type=db.LogType.user,
                what=user.user_id,
                details={'action': 'do-reset'},
            )
    
    
    def check_password(user: db.User, passwd: str):
        return user.password_hash is not None and \
            bcrypt.checkpw(passwd.encode('utf-8'), user.password_hash.encode('us-ascii'))
    
    
    def login(user: db.User):
        user.last_login_at = mo.now
    
    
    def make_activation_token(user: db.User) -> str:
        user.reset_at = mo.now
        when = int(mo.now.timestamp())
        return mo.tokens.sign_token([str(user.user_id), str(when)], 'activate')
    
    
    def check_activation_token(token: str) -> Optional[db.User]:
        token = mo.util.clean_up_token(token)
        fields = mo.tokens.verify_token(token, 'activate')
        if not fields or len(fields) != 2:
            return None
        user_id = int(fields[0])
        token_time = datetime.datetime.fromtimestamp(int(fields[1]), tz=dateutil.tz.UTC)
    
        user = user_by_uid(user_id)
        if not user:
            return None
        elif token_time < mo.now - datetime.timedelta(days=28):
            return None
        else:
            return user
    
    
    def new_reg_request(type: db.RegReqType, client: str) -> Optional[db.RegRequest]:
        sess = db.get_session()
    
        # Zatím jen jednoduchý rate limit, časem možno vylepšit
        in_last_minute = db.get_count(sess.query(db.RegRequest).filter(db.RegRequest.created_at >= mo.now - datetime.timedelta(minutes=1)))
        if in_last_minute >= config.REG_MAX_PER_MINUTE:
            return None
    
        email_token = mo.tokens.sign_token([str(int(mo.now.timestamp())), secrets.token_hex(16)], 'reg-request')
    
        return db.RegRequest(
            type=type,
            created_at=mo.now,
            expires_at=mo.now + datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY),
            email_token=email_token,
            client=client,
        )
    
    
    def expire_reg_requests():
        sess = db.get_session()
        conn = sess.connection()
        table = db.RegRequest.__table__
        conn.execute(table.delete().where(table.c.expires_at < mo.now))
        sess.commit()
    
    
    def request_reset_password(user: db.User, client: str) -> Optional[db.RegRequest]:
        logger.info('Login: Požadavek na reset hesla pro <%s>', user.email)
        assert not user.is_admin
        rr = new_reg_request(db.RegReqType.reset_passwd, client)
        if rr:
            db.get_session().add(rr)
            rr.user_id = user.user_id
            mo.util.log(
                type=db.LogType.user,
                what=user.user_id,
                details={'action': 'ask-reset'},
            )
        return rr