Skip to content
Snippets Groups Projects
Select Git revision
  • 1b1eada3f6231b37bb5e76b68916c1263c9bff7a
  • master default protected
2 results

kgrams_test.py

Blame
  • auth.py 6.01 KiB
    import datetime
    
    from flask import render_template, request, g, redirect, url_for, session
    from flask.helpers import flash
    from flask_wtf import FlaskForm
    import werkzeug.exceptions
    import wtforms
    from wtforms.fields.html5 import EmailField
    import wtforms.validators as validators
    from sqlalchemy.orm import joinedload
    from typing import Optional
    
    import mo.util
    import mo.db as db
    import mo.rights
    import mo.users
    from mo.web import app, NeedLoginError
    import mo.web.fields as mo_fields
    
    
    class LoginForm(FlaskForm):
        next = wtforms.HiddenField()
        email = mo_fields.Email(validators=[validators.DataRequired()])
        passwd = wtforms.PasswordField('Heslo')
        submit = wtforms.SubmitField('Přihlásit se')
        reset = wtforms.SubmitField('Zapomenuté heslo')
    
    
    def login_and_redirect(user: db.User, url: Optional[str] = None):
        session.clear()
        session['uid'] = user.user_id
        if not url:
            if user.is_admin or user.is_org:
                url = url_for('org_index')
            else:
                url = url_for('index')
        else:
            url = request.script_root + url
        return redirect(url)
    
    
    @app.route('/auth/login', methods=('GET', 'POST'))
    def login():
        form = LoginForm(email=request.args.get('email'))
    
        if not form.validate_on_submit():
            return render_template('login.html', form=form, error=None)
    
        email = form.email.data
        user = mo.users.user_by_email(email)
    
        if not user:
            app.logger.error('Login: Neznámý uživatel <%s>', email)
            flash('Neznámý uživatel', 'danger')
        elif form.reset.data:
            app.logger.info('Login: Požadavek na reset hesla pro <%s>', email)
    
            min_time_between_resets = datetime.timedelta(minutes=1)
            now = datetime.datetime.now().astimezone()
            if (user.reset_at is not None
                    and now - user.reset_at < min_time_between_resets):
                flash('Poslední požadavek na obnovení hesla byl odeslán příliš nedávno', 'danger')
            else:
                token = mo.users.ask_reset_password(user)
                db.get_session().commit()
    
                mo.util.send_password_reset_email(user, token)
                flash('Na uvedenou adresu byl odeslán e-mail s odkazem na obnovu hesla', 'success')
    
        elif not form.passwd.data or not mo.users.check_password(user, form.passwd.data):
            app.logger.error('Login: Špatné heslo pro uživatele <%s>', email)
            flash('Chybné heslo', 'danger')
        else:
            if user.is_admin:
                typ = ' (admin)'
            elif user.is_org:
                typ = ' (org)'
            elif user.is_test:
                typ = ' (test)'
            else:
                typ = ""
            app.logger.info('Login: Přihlásil se uživatel #%s <%s>%s', user.user_id, email, typ)
            mo.users.login(user)
            db.get_session().commit()
            return login_and_redirect(user, url=form.next.data)
    
        return render_template('login.html', form=form)
    
    
    @app.route('/auth/logout', methods=('POST',))
    def logout():
        session.clear()
        return redirect(url_for('index'))
    
    
    @app.route('/auth/incarnate/<int:id>', methods=('POST',))
    def incarnate(id):
        if not g.user.is_admin:
            raise werkzeug.exceptions.Forbidden()
    
        new_user = db.get_session().query(db.User).get(id)
        if not new_user:
            raise werkzeug.exceptions.NotFound()
    
        app.logger.info('Login: Uživatel #%s se převtělil na #%s', g.user.user_id, new_user.user_id)
        return login_and_redirect(new_user)
    
    
    @app.route('/user/settings')
    def user_settings():
        sess = db.get_session()
        roles = []
        if g.user:
            roles = (sess.query(db.UserRole)
                     .filter_by(user_id=g.user.user_id)
                     .options(joinedload(db.UserRole.place))
                     .all())
        return render_template('settings.html', roles=roles, roles_by_type=mo.rights.roles_by_type)
    
    
    @app.errorhandler(NeedLoginError)
    def handle_need_login(e):
        form = LoginForm()
        form.next.data = request.path
        return render_template('login.html', form=form), e.code
    
    
    class ResetForm(FlaskForm):
        email = EmailField('E-mail', description='Účet pro který se nastavuje nové heslo', render_kw={"disabled": "disabled"})
        token = wtforms.HiddenField()
        passwd = wtforms.PasswordField('Nové heslo', description=mo.users.password_help)
        submit = wtforms.SubmitField('Nastavit heslo')
        cancel = wtforms.SubmitField('Zrušit obnovu hesla')
    
    
    @app.route('/auth/reset', methods=('GET', 'POST'))
    def reset():
        token = request.args.get('token')
        if not token:
            flash('Žádný token pro resetování hesla', 'danger')
            return redirect(url_for('login'))
    
        user = mo.users.check_reset_password(token)
        if not user:
            flash('Neplatný požadavek na obnovu hesla', 'danger')
            return redirect(url_for('login'))
    
        form = ResetForm(token=token, email=user.email)
        ok = form.validate_on_submit()
        if not ok:
            return render_template('reset.html', form=form)
    
        if form.cancel.data:
            mo.users.cancel_reset_password(user)
            app.logger.info('Login: Zrušen reset hesla pro uživatele <%s>', user.email)
            db.get_session().commit()
            flash('Obnova hesla zrušena', 'warning')
            return redirect(url_for('login'))
        elif not mo.users.validate_password(form.passwd.data):
            flash(mo.users.password_help, 'danger')
            return render_template('reset.html', form=form)
        else:
            mo.users.do_reset_password(user)
            mo.users.set_password(user, form.passwd.data)
            app.logger.info('Login: Reset hesla pro uživatele <%s>', user.email)
            mo.util.log(
                type=db.LogType.user,
                what=user.user_id,
                details={'action': 'reset-passwd'},
            )
            mo.users.login(user)
            app.logger.info('Login: Přihlásil se uživatel <%s> po resetování hesla', user.email)
            db.get_session().commit()
            flash('Nastavení nového hesla a přihlášení do systému proběhlo úspěšně', 'success')
            return login_and_redirect(user)
    
    
    @app.errorhandler(werkzeug.exceptions.Forbidden)
    def handle_forbidden(e):
        return render_template('forbidden.html')