Select Git revision
-
Martin Mareš authoredMartin Mareš authored
acct.py 24.28 KiB
import datetime
import dateutil.tz
from enum import Enum, auto
from flask import render_template, request, g, redirect, url_for, session
from flask.helpers import flash
from flask_wtf import FlaskForm
import html
from markupsafe import Markup
import random
import secrets
from sqlalchemy.orm import joinedload
from typing import Optional, Dict
import werkzeug.exceptions
import wtforms
from wtforms import validators, ValidationError
import mo.config as config
import mo.db as db
import mo.rights
import mo.tokens
import mo.users
import mo.util
from mo.web import app, NeedLoginError
import mo.web.fields as mo_fields
# POZOR: V podstromu /acct/ není vyžadován login
class LoginForm(FlaskForm):
next = wtforms.HiddenField()
email = mo_fields.Email(validators=[validators.DataRequired()])
passwd = mo_fields.Password('Heslo')
submit = wtforms.SubmitField('Přihlásit se')
reset = wtforms.SubmitField('Zapomenuté heslo')
def login_and_redirect(user: db.User, flash_msg: Optional[str] = None, url: Optional[str] = None):
session.clear()
session['uid'] = user.user_id
if not url:
if user.is_admin or user.is_org:
url = url_for('org_index')
else:
url = url_for('index')
else:
url = request.script_root + url
if flash_msg:
flash(flash_msg, 'success')
return redirect(url)
@app.route('/acct/login', methods=('GET', 'POST'))
def login():
form = LoginForm(email=request.args.get('email'))
if not form.validate_on_submit():
return render_template('login.html', form=form, error=None)
email = form.email.data
user = mo.users.user_by_email(email)
if not user:
app.logger.error('Login: Neznámý uživatel <%s>', email)
flash('Neznámý uživatel', 'danger')
elif form.reset.data:
if user.is_admin:
flash('Obnova hesla účtu správce není možná.', 'danger')
else:
rr = mo.users.request_reset_password(user, request.remote_addr)
if rr:
db.get_session().commit()
mo.email.send_password_reset_email(user, rr.email_token)
flash('Na uvedenou adresu byl odeslán e-mail s odkazem na obnovu hesla.', 'success')
else:
flash('Příliš časté požadavky na obnovu hesla.', 'danger')
elif not form.passwd.data or not mo.users.check_password(user, form.passwd.data):
app.logger.error('Login: Špatné heslo pro uživatele <%s>', email)
flash('Chybné heslo', 'danger')
else:
if user.is_admin:
typ = ' (admin)'
elif user.is_org:
typ = ' (org)'
elif user.is_test:
typ = ' (test)'
else:
typ = ""
app.logger.info('Login: Přihlásil se uživatel #%s <%s>%s', user.user_id, email, typ)
mo.users.login(user)
db.get_session().commit()
return login_and_redirect(user, url=form.next.data)
return render_template('login.html', form=form)
@app.route('/acct/logout', methods=('POST',))
def logout():
session.clear()
return redirect(url_for('index'))
@app.route('/acct/incarnate/<int:id>', methods=('POST',))
def incarnate(id):
if not g.user:
raise NeedLoginError()
if not g.user.is_admin:
raise werkzeug.exceptions.Forbidden()
new_user = db.get_session().query(db.User).get(id)
if not new_user:
raise werkzeug.exceptions.NotFound()
app.logger.info('Login: Uživatel #%s se převtělil na #%s', g.user.user_id, new_user.user_id)
return login_and_redirect(new_user, flash_msg='Převtělení proběhlo')
class AcctSettingsForm(FlaskForm):
email_notify = wtforms.BooleanField('Posílat e-mailové notifikace')
submit = wtforms.SubmitField('Nastavit')
# URL je explicitně uvedeno v mo.email.settings_url
@app.route('/acct/settings', methods=('GET', 'POST'))
def user_settings():
sess = db.get_session()
user = g.user
if not user:
raise NeedLoginError()
form = AcctSettingsForm()
if not form.submit.data:
form.email_notify.data = user.email_notify
if form.validate_on_submit():
user.email_notify = form.email_notify.data
app.logger.info(f'Settings: Změněny preference uživatele #{user.user_id}: {db.get_object_changes(user)}')
# Do databázového logu nezapisujeme, nemá smysl logovat prkotiny.
sess.commit()
flash('Nastavení změněno.', 'success')
return redirect(url_for('user_settings'))
roles = (sess.query(db.UserRole)
.filter_by(user_id=g.user.user_id)
.options(joinedload(db.UserRole.place))
.all())
if g.user.is_org or g.user.is_admin:
pant = None
else:
pant = sess.query(db.Participant).get((g.user.user_id, config.CURRENT_YEAR))
return render_template(
'settings.html',
user=g.user, pant=pant, roles=roles, roles_by_type=mo.rights.roles_by_type,
form=form)
class PersonalSettingsForm(FlaskForm):
email = mo_fields.Email(validators=[validators.DataRequired()])
current_passwd = mo_fields.Password('Aktuální heslo', validators=[validators.DataRequired()])
new_passwd = mo_fields.NewPassword(
description=mo.users.password_help + ' Pokud nechcete heslo měnit, ponechte toto políčko prázdné.',
)
new_passwd2 = mo_fields.RepeatPassword()
submit = wtforms.SubmitField('Nastavit')
def validate_current_passwd(form, field):
if not mo.users.check_password(g.user, field.data):
raise ValidationError('Chybné heslo.')
@app.route('/acct/settings/personal', methods=('GET', 'POST'))
def user_settings_personal():
sess = db.get_session()
user = g.user
if not user:
raise NeedLoginError()
form = PersonalSettingsForm()
if not form.submit.data:
form.email.data = user.email
if form.validate_on_submit():
ok = True
if form.new_passwd.data:
app.logger.info(f'Settings: Změněno heslo uživatele #{user.user_id}')
mo.users.set_password(user, form.new_passwd.data)
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'change-passwd'},
)
sess.commit()
flash('Heslo změněno.', 'success')
if form.email.data != user.email:
rr = mo.users.new_reg_request(db.RegReqType.change_email, request.remote_addr)
if rr:
rr.user_id = user.user_id
rr.email = form.email.data
sess.add(rr)
sess.commit()
app.logger.info(f'Settings: Požadavek na změnu e-mailu uživatele #{user.user_id}')
flash('Odeslán e-mail s odkazem na potvrzení nové adresy.', 'success')
mo.email.send_confirm_change_email(user, rr.email_token)
else:
app.logger.info('Settings: Rate limit')
flash('Příliš mnoho požadavků na změny e-mailu. Počkejte prosím chvíli a zkuste to znovu.', 'danger')
ok = False
if ok:
return redirect(url_for('user_settings'))
return render_template('settings_change.html', form=form)
@app.errorhandler(NeedLoginError)
def handle_need_login(e):
form = LoginForm()
form.next.data = request.path
return render_template('login.html', form=form), e.code
class ResetForm(FlaskForm):
email = wtforms.StringField('E-mail', description='Účet pro který se nastavuje nové heslo', render_kw={"disabled": "disabled"})
new_passwd = mo_fields.NewPassword(validators=[validators.DataRequired()])
new_passwd2 = mo_fields.RepeatPassword(validators=[validators.DataRequired()])
submit = wtforms.SubmitField('Nastavit heslo')
# URL je explicitně uvedeno v mo.email.activate_url
@app.route('/acct/activate', methods=('GET', 'POST'))
def activate():
token = request.args.get('token')
if not token:
flash('Chybí token pro aktivaci účtu', 'danger')
return redirect(url_for('login'))
user = mo.users.check_activation_token(token)
if not user:
flash('Neplatný kód pro aktivaci účtu. Zkontrolujte, že jste odkaz z e-mailu zkopírovali správně.', 'danger')
return redirect(url_for('login'))
if user.last_login_at is not None:
flash('Tento účet už byl aktivován. Pokud neznáte heslo, použijte tlačítko pro obnovu hesla.', 'danger')
return redirect(url_for('login'))
form = ResetForm(email=user.email)
ok = form.validate_on_submit()
if not ok:
return render_template('acct_activate.html', form=form)
app.logger.info('Login: Aktivace účtu uživatele <%s>', user.email)
mo.users.set_password(user, form.new_passwd.data, reset=True)
mo.users.login(user)
db.get_session().commit()
return login_and_redirect(user, flash_msg='Nastavení nového hesla a přihlášení do systému proběhlo úspěšně')
class RegStatus(Enum):
new = auto()
ok = auto()
expired = auto()
already_exists = auto()
# Jen v 1. kroku:
rate_limited = auto()
wrong_captcha = auto()
# Jen v 2. kroku:
already_spent = auto()
class Reg1:
create_time: datetime.datetime
seed: str
status: RegStatus
email_token: str
x: int
y: int
def __init__(self, from_token: Optional[str] = None):
self.status = self._parse_token(from_token)
if self.status == RegStatus.ok:
self._gen_captcha()
else:
self._reset()
def _reset(self):
self.create_time = mo.now
self.seed = secrets.token_hex(16)
app.logger.debug(f'Reg1: Nový token: seed={self.seed}')
self._gen_captcha()
def as_token(self) -> str:
return mo.tokens.sign_token([str(int(self.create_time.timestamp())), self.seed], 'reg1')
def _parse_token(self, token: Optional[str]) -> RegStatus:
if token is None:
return RegStatus.new
fields = mo.tokens.verify_token(token, 'reg1')
if not fields:
app.logger.debug(f'Reg1: Neplatný token: {token}')
return RegStatus.new
self.create_time = datetime.datetime.fromtimestamp(int(fields[0]), tz=dateutil.tz.UTC)
self.seed = fields[1]
app.logger.debug(f'Reg1: Přijat token: {self.create_time} {self.seed}')
token_age = mo.now - self.create_time
if token_age > datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY):
app.logger.debug('Reg1: Token expiroval')
return RegStatus.expired
return RegStatus.ok
def _init_rng(self) -> random.Random:
rng = random.Random()
rng.seed(mo.tokens.hash('rng-init', self.seed))
return rng
def _gen_captcha(self):
rng = self._init_rng()
self.x = rng.randrange(1, 10)
self.y = rng.randrange(1, 10)
app.logger.debug(f'Reg1: Captcha: {self.x}*{self.y}')
def captcha_task(self) -> str:
cisla = ['nula', 'jedna', 'dva', 'tři', 'čtyři', 'pět', 'šest', 'sedm', 'osm', 'devět',
'deset', 'jedenáct', 'dvanáct', 'třináct', 'čtrnáct', 'patnáct', 'šestnáct', 'sedmnáct', 'osmnáct', 'devatenáct']
return f'Napište číslem, kolik je {cisla[self.x]} krát {cisla[self.y]}.'
def captcha_check_answer(self, answer: str) -> bool:
correct = self.x * self.y
if answer == str(correct):
app.logger.debug(f'Reg1: Captcha: {self.x}*{self.y}={answer} správně')
return True
else:
app.logger.debug(f'Reg1: Captcha: {self.x}*{self.y}={answer} špatně')
return False
def create_reg_request(self, email: str) -> bool:
sess = db.get_session()
rr = sess.query(db.RegRequest).with_for_update().filter_by(captcha_token=self.seed).one_or_none()
if rr:
self._reset()
self.status = RegStatus.expired
sess.rollback()
app.logger.info('Reg1: Captcha token použit znovu')
return False
rr = mo.users.new_reg_request(db.RegReqType.register, mo.util.assert_not_none(request.remote_addr))
if not rr:
self._reset()
self.status = RegStatus.rate_limited
sess.rollback()
app.logger.info('Reg1: Rate limit')
return False
self.email_token = rr.email_token
rr.email = email
rr.captcha_token = self.seed
sess.add(rr)
sess.commit()
return True
def process(self, email: str, captcha: str) -> bool:
# XXX: Nejdříve zapisujeme registraci do DB, a teprve pak ověřujeme captchu.
# Tímto způsobem je těžší captchu obejít (protože je rate-limitovaná), ale
# zase je snazší páchat DoS útok na celou registraci (protože je rate-limitovaná).
if not self.create_reg_request(email):
return False
if not self.captcha_check_answer(captcha):
self._reset()
self.status = RegStatus.wrong_captcha
return False
if mo.users.user_by_email(email):
self._reset()
self.status = RegStatus.already_exists
app.logger.info(f'Reg1: Účet s e-mailem {email} už existuje')
return False
return True
class Reg1Form(FlaskForm):
email = mo_fields.Email(validators=[validators.DataRequired()])
token = wtforms.HiddenField()
captcha = mo_fields.String('Kontrolní odpověď', validators=[validators.DataRequired()])
submit = wtforms.SubmitField('Vytvořit účet')
@app.route('/acct/create', methods=('GET', 'POST'))
def create_acct():
form = Reg1Form()
reg1 = Reg1(form.token.data)
if reg1.status == RegStatus.ok and form.validate_on_submit() and reg1.process(form.email.data, form.captcha.data):
app.logger.debug(f'Reg1: E-mailový token {reg1.email_token}')
flash('Odeslán e-mail s odkazem na založení účtu.', 'success')
user = db.User(email=form.email.data, first_name='Nový', last_name='Uživatel')
mo.email.send_confirm_create_email(user, reg1.email_token)
return redirect(url_for('confirm_reg'))
form.captcha.description = reg1.captcha_task()
if reg1.status != RegStatus.ok:
form.token.data = reg1.as_token()
form.captcha.data = ""
if reg1.status == RegStatus.expired:
flash('Vypršela platnost formuláře, vyplňte ho prosím znovu.', 'danger')
elif reg1.status == RegStatus.rate_limited:
flash('Přichází příliš mnoho registrací najednou, zkuste to prosím za chvíli znovu.', 'danger')
elif reg1.status == RegStatus.already_exists:
form.email.errors.append('Účet s touto adresou už existuje. ' + Markup('<a href="' + html.escape(url_for('login', email=form.email.data)) + '">Přihlásit se.</a>'))
elif reg1.status == RegStatus.wrong_captcha:
form.captcha.errors.append('Chybný výsledek. Zkuste to znovu: ' + reg1.captcha_task())
return render_template('acct_reg1.html', form=form)
class Reg2:
reg_type: db.RegReqType
status: RegStatus
rr: db.RegRequest
user: db.User
messages: Dict[db.RegReqType, Dict[RegStatus, str]] = {
db.RegReqType.register: {
RegStatus.new: 'Chybný registrační kód. Zkontrolujte, že jste odkaz z e-mailu zkopírovali správně.',
RegStatus.expired: 'Vypršela platnost registrace, vyplňte ji prosím znovu.',
RegStatus.already_spent: 'Tento odkaz na potvrzení registrace byl již využit.',
RegStatus.already_exists: 'Účet s touto adresou už existuje.',
},
db.RegReqType.change_email: {
RegStatus.new: 'Chybný potvrzovací kód. Zkontrolujte, že jste odkaz z e-mailu zkopírovali správně.',
RegStatus.expired: 'Vypršela platnost potvrzovacího kódu, požádejte prosím o změnu e-mailu znovu.',
RegStatus.already_spent: 'Tento odkaz na potvrzení změny e-mailu byl již využit.',
},
db.RegReqType.reset_passwd: {
RegStatus.new: 'Chybný kód pro obnovení hesla. Zkontrolujte, že jste odkaz z e-mailu zkopírovali správně.',
RegStatus.expired: 'Vypršela platnost kódu pro obnovení hesla, požádejte prosím o obnovu znovu.',
RegStatus.already_spent: 'Tento odkaz na obnovení hesla byl již využit.',
},
}
def __init__(self, token: str, expected_type: db.RegReqType):
self.reg_type = expected_type
self.status = self._parse_token(token)
if self.status == RegStatus.ok:
self.status = self._load_rr(token)
def _parse_token(self, token: Optional[str]) -> RegStatus:
if not token:
return RegStatus.new
token = mo.util.clean_up_token(token)
fields = mo.tokens.verify_token(token, 'reg-request')
if not fields:
app.logger.debug(f'Reg2: Neplatný token: {token}')
return RegStatus.new
create_time = datetime.datetime.fromtimestamp(int(fields[0]), tz=dateutil.tz.UTC)
app.logger.debug(f'Reg2: Přijat token: {token}')
token_age = mo.now - create_time
if token_age > datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY):
app.logger.debug('Reg2: Token expiroval')
return RegStatus.expired
return RegStatus.ok
def _load_rr(self, token: str) -> RegStatus:
sess = db.get_session()
rr = sess.query(db.RegRequest).with_for_update().filter_by(email_token=token).one_or_none()
if not rr:
app.logger.info('Reg2: Registrace nenalezena')
return RegStatus.expired
if rr.expires_at < mo.now:
app.logger.info('Reg2: Registrace expirovala')
return RegStatus.expired
if rr.used_at is not None:
app.logger.info('Reg2: Registrace spotřebována')
return RegStatus.already_spent
if rr.type != self.reg_type:
app.logger.info('Reg2: Token špatného typu')
return RegStatus.new
self.rr = rr
return RegStatus.ok
def create(self, first_name: str, last_name: str, passwd: str) -> bool:
rr = self.rr
assert rr.email is not None
email = mo.users.normalize_email(rr.email) # Pro jistotu
sess = db.get_session()
try:
user, is_new, _ = mo.users.find_or_create_user(email, first_name, last_name, is_org=False, reason='register')
except mo.CheckError as e:
app.logger.info(f'Reg2: Založení účtu {email} selhalo: {e}')
self.status = RegStatus.already_exists
return False
if not is_new:
# Účet mohl začít existovat mezi 1. a 2. krokem registrace
app.logger.info(f'Reg2: Účet s e-mailem {email} začal během registrace existovat')
self.status = RegStatus.already_exists
return False
mo.users.set_password(user, passwd)
mo.users.login(user)
rr.used_at = mo.now
sess.commit()
self.user = user
return True
def change_email(self):
sess = db.get_session()
user = self.rr.user
user.email = self.rr.email
app.logger.info(f'Reg2: Uživatel #{user.user_id} si změnil email na <{user.email}>')
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={
'action': 'change-settings',
'changes': db.get_object_changes(user),
},
)
self.rr.used_at = mo.now
sess.commit()
def change_passwd(self, new_passwd: str):
sess = db.get_session()
user = self.rr.user
app.logger.info(f'Reg2: Uživatel #{user.user_id} si resetoval heslo')
mo.users.set_password(user, new_passwd, reset=True)
mo.users.login(user)
self.rr.used_at = mo.now
sess.commit()
def spend_request(self):
self.rr.used_at = mo.now
db.get_session().commit()
def flash_message(self):
msgs = self.messages[self.reg_type]
if self.status in msgs:
flash(msgs[self.status], 'danger')
class Reg2Form(FlaskForm):
email = wtforms.StringField('E-mail', render_kw={"disabled": "disabled"})
first_name = mo_fields.FirstName(validators=[validators.DataRequired()])
last_name = mo_fields.LastName(validators=[validators.DataRequired()])
new_passwd = mo_fields.NewPassword('Heslo', validators=[validators.DataRequired()])
new_passwd2 = mo_fields.RepeatPassword(validators=[validators.DataRequired()])
submit = wtforms.SubmitField('Vytvořit účet')
# URL je explicitně uvedeno v mo.email.activate_url
@app.route('/acct/confirm/r', methods=('GET', 'POST'))
def confirm_reg():
token = request.args.get('token')
if token is None:
return render_template('acct_reg2.html', form=None)
reg2 = Reg2(token, db.RegReqType.register)
if reg2.status != RegStatus.ok:
reg2.flash_message()
return redirect(url_for('create_acct'))
form = Reg2Form()
if form.validate_on_submit():
if reg2.create(form.first_name.data, form.last_name.data, form.new_passwd.data):
flash('Založení účtu a přihlášení do systému proběhlo úspěšně.', 'success')
app.logger.info(f'Login: Přihlásil se uživatel <{reg2.user.email}> po založení účtu')
return login_and_redirect(reg2.user, flash_msg='Účet úspěšně založen.')
reg2.flash_message()
form.email.data = reg2.rr.email
return render_template('acct_reg2.html', form=form)
class ConfirmEmailForm(FlaskForm):
orig_email = wtforms.StringField('Původní e-mail', render_kw={"disabled": "disabled"})
new_email = wtforms.StringField('Nový e-mail', render_kw={"disabled": "disabled"})
submit = wtforms.SubmitField('Potvrdit změnu')
cancel = wtforms.SubmitField('Zrušit požadavek')
# URL je explicitně uvedeno v mo.email.activate_url
@app.route('/acct/confirm/e', methods=('GET', 'POST'))
def confirm_email():
reg2 = Reg2(request.args.get('token'), db.RegReqType.change_email)
if reg2.status != RegStatus.ok:
reg2.flash_message()
return redirect(url_for('user_settings'))
form = ConfirmEmailForm()
if form.validate_on_submit():
if form.submit.data:
reg2.change_email()
flash('E-mail změněn.', 'success')
elif form.cancel.data:
reg2.spend_request()
flash('Požadavek na změnu e-mailu zrušen.', 'success')
return redirect(url_for('user_settings'))
form.orig_email.data = reg2.rr.user.email
form.new_email.data = reg2.rr.email
return render_template('acct_confirm_email.html', form=form)
class CancelResetForm(FlaskForm):
cancel = wtforms.SubmitField('Zrušit obnovu hesla')
# URL je explicitně uvedeno v mo.email.activate_url
@app.route('/acct/confirm/p', methods=('GET', 'POST'))
def confirm_reset():
reg2 = Reg2(request.args.get('token'), db.RegReqType.reset_passwd)
if reg2.status != RegStatus.ok:
reg2.flash_message()
return redirect(url_for('login'))
form = ResetForm(email=reg2.rr.user.email)
if form.validate_on_submit() and form.submit.data:
reg2.change_passwd(form.new_passwd.data)
return login_and_redirect(reg2.rr.user, flash_msg='Nastavení nového hesla a přihlášení do systému proběhlo úspěšně')
cform = CancelResetForm()
if cform.validate_on_submit() and cform.cancel.data:
reg2.spend_request()
flash('Požadavek na změnu hesla zrušen.', 'success')
return redirect(url_for('user_settings'))
return render_template('acct_reset_passwd.html', form=form, cancel_form=cform)
@app.errorhandler(werkzeug.exceptions.Forbidden)
def handle_forbidden(e):
return render_template('forbidden.html'), 403
if getattr(config, 'INSECURE_TEST_LOGIN', False):
@app.route('/test-login/<email>')
def test_login(email: str):
if not email.endswith('@test'):
app.logger.error('Test login: Uživatel <%s> nekončí na @test', email)
raise werkzeug.exceptions.NotFound()
user = mo.users.user_by_email(email)
if not user:
app.logger.error('Test login: Neznámý uživatel <%s>', email)
raise werkzeug.exceptions.Forbidden()
app.logger.info('Test login: Přihlásil se uživatel #%s <%s>', user.user_id, email)
mo.users.login(user)
db.get_session().commit()
return login_and_redirect(user)