Select Git revision
kgrams_test.py
-
Pavel Veselý authoredPavel Veselý authored
auth.py 6.01 KiB
import datetime
from flask import render_template, request, g, redirect, url_for, session
from flask.helpers import flash
from flask_wtf import FlaskForm
import werkzeug.exceptions
import wtforms
from wtforms.fields.html5 import EmailField
import wtforms.validators as validators
from sqlalchemy.orm import joinedload
from typing import Optional
import mo.util
import mo.db as db
import mo.rights
import mo.users
from mo.web import app, NeedLoginError
import mo.web.fields as mo_fields
class LoginForm(FlaskForm):
next = wtforms.HiddenField()
email = mo_fields.Email(validators=[validators.DataRequired()])
passwd = wtforms.PasswordField('Heslo')
submit = wtforms.SubmitField('Přihlásit se')
reset = wtforms.SubmitField('Zapomenuté heslo')
def login_and_redirect(user: db.User, url: Optional[str] = None):
session.clear()
session['uid'] = user.user_id
if not url:
if user.is_admin or user.is_org:
url = url_for('org_index')
else:
url = url_for('index')
else:
url = request.script_root + url
return redirect(url)
@app.route('/auth/login', methods=('GET', 'POST'))
def login():
form = LoginForm(email=request.args.get('email'))
if not form.validate_on_submit():
return render_template('login.html', form=form, error=None)
email = form.email.data
user = mo.users.user_by_email(email)
if not user:
app.logger.error('Login: Neznámý uživatel <%s>', email)
flash('Neznámý uživatel', 'danger')
elif form.reset.data:
app.logger.info('Login: Požadavek na reset hesla pro <%s>', email)
min_time_between_resets = datetime.timedelta(minutes=1)
now = datetime.datetime.now().astimezone()
if (user.reset_at is not None
and now - user.reset_at < min_time_between_resets):
flash('Poslední požadavek na obnovení hesla byl odeslán příliš nedávno', 'danger')
else:
token = mo.users.ask_reset_password(user)
db.get_session().commit()
mo.util.send_password_reset_email(user, token)
flash('Na uvedenou adresu byl odeslán e-mail s odkazem na obnovu hesla', 'success')
elif not form.passwd.data or not mo.users.check_password(user, form.passwd.data):
app.logger.error('Login: Špatné heslo pro uživatele <%s>', email)
flash('Chybné heslo', 'danger')
else:
if user.is_admin:
typ = ' (admin)'
elif user.is_org:
typ = ' (org)'
elif user.is_test:
typ = ' (test)'
else:
typ = ""
app.logger.info('Login: Přihlásil se uživatel #%s <%s>%s', user.user_id, email, typ)
mo.users.login(user)
db.get_session().commit()
return login_and_redirect(user, url=form.next.data)
return render_template('login.html', form=form)
@app.route('/auth/logout', methods=('POST',))
def logout():
session.clear()
return redirect(url_for('index'))
@app.route('/auth/incarnate/<int:id>', methods=('POST',))
def incarnate(id):
if not g.user.is_admin:
raise werkzeug.exceptions.Forbidden()
new_user = db.get_session().query(db.User).get(id)
if not new_user:
raise werkzeug.exceptions.NotFound()
app.logger.info('Login: Uživatel #%s se převtělil na #%s', g.user.user_id, new_user.user_id)
return login_and_redirect(new_user)
@app.route('/user/settings')
def user_settings():
sess = db.get_session()
roles = []
if g.user:
roles = (sess.query(db.UserRole)
.filter_by(user_id=g.user.user_id)
.options(joinedload(db.UserRole.place))
.all())
return render_template('settings.html', roles=roles, roles_by_type=mo.rights.roles_by_type)
@app.errorhandler(NeedLoginError)
def handle_need_login(e):
form = LoginForm()
form.next.data = request.path
return render_template('login.html', form=form), e.code
class ResetForm(FlaskForm):
email = EmailField('E-mail', description='Účet pro který se nastavuje nové heslo', render_kw={"disabled": "disabled"})
token = wtforms.HiddenField()
passwd = wtforms.PasswordField('Nové heslo', description=mo.users.password_help)
submit = wtforms.SubmitField('Nastavit heslo')
cancel = wtforms.SubmitField('Zrušit obnovu hesla')
@app.route('/auth/reset', methods=('GET', 'POST'))
def reset():
token = request.args.get('token')
if not token:
flash('Žádný token pro resetování hesla', 'danger')
return redirect(url_for('login'))
user = mo.users.check_reset_password(token)
if not user:
flash('Neplatný požadavek na obnovu hesla', 'danger')
return redirect(url_for('login'))
form = ResetForm(token=token, email=user.email)
ok = form.validate_on_submit()
if not ok:
return render_template('reset.html', form=form)
if form.cancel.data:
mo.users.cancel_reset_password(user)
app.logger.info('Login: Zrušen reset hesla pro uživatele <%s>', user.email)
db.get_session().commit()
flash('Obnova hesla zrušena', 'warning')
return redirect(url_for('login'))
elif not mo.users.validate_password(form.passwd.data):
flash(mo.users.password_help, 'danger')
return render_template('reset.html', form=form)
else:
mo.users.do_reset_password(user)
mo.users.set_password(user, form.passwd.data)
app.logger.info('Login: Reset hesla pro uživatele <%s>', user.email)
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'reset-passwd'},
)
mo.users.login(user)
app.logger.info('Login: Přihlásil se uživatel <%s> po resetování hesla', user.email)
db.get_session().commit()
flash('Nastavení nového hesla a přihlášení do systému proběhlo úspěšně', 'success')
return login_and_redirect(user)
@app.errorhandler(werkzeug.exceptions.Forbidden)
def handle_forbidden(e):
return render_template('forbidden.html')