Skip to content
Snippets Groups Projects
Commit e5387cfa authored by Martin Mareš's avatar Martin Mareš
Browse files

Registrace: Zakládání účtu

parent b2b5863b
No related branches found
No related tags found
1 merge request!86Registrace
......@@ -47,3 +47,9 @@ JOB_GC_PERIOD = 60
# Za jak dlouho expiruje dokončená dávka [min]
JOB_EXPIRATION = 5
# Kolik nejvýše dovolujeme registrací za minutu
REG_MAX_PER_MINUTE = 10
# Jak dlouho vydrží tokeny používané při registraci a změnách e-mailu [min]
REG_TOKEN_VALIDITY = 10
......@@ -5,9 +5,11 @@ import datetime
import email.errors
import email.headerregistry
import re
import secrets
from typing import Optional, Tuple
import mo
import mo.config as config
import mo.db as db
import mo.util
from mo.util import logger
......@@ -243,3 +245,22 @@ def do_reset_password(user: db.User):
what=user.user_id,
details={'action': 'do-reset'},
)
def new_reg_request(type: db.RegReqType, client: str) -> Optional[db.RegRequest]:
sess = db.get_session()
# Zatím jen jednoduchý rate limit, časem možno vylepšit
in_last_minute = db.get_count(sess.query(db.RegRequest).filter(db.RegRequest.created_at >= mo.now - datetime.timedelta(minutes=1)))
if in_last_minute >= config.REG_MAX_PER_MINUTE:
return None
email_token = mo.tokens.sign_token([str(int(mo.now.timestamp())), secrets.token_hex(16)], 'reg-request')
return db.RegRequest(
type=type,
created_at=mo.now,
expires_at=mo.now + datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY),
email_token=email_token,
client=client,
)
......@@ -117,6 +117,10 @@ def password_reset_url(token: str) -> str:
return config.WEB_ROOT + 'auth/reset?' + urllib.parse.urlencode({'token': token}, safe=':')
def confirm_create_url(token: str) -> str:
return config.WEB_ROOT + 'auth/confirm?' + urllib.parse.urlencode({'token': token}, safe=':')
def send_new_account_email(user: db.User, token: str) -> bool:
return send_user_email(user, 'Založen nový účet', textwrap.dedent('''\
Vítejte!
......@@ -142,6 +146,18 @@ def send_password_reset_email(user: db.User, token: str) -> bool:
'''.format(password_reset_url(token))))
def send_confirm_create_email(user: db.User, token: str) -> bool:
return send_user_email(user, 'Založení účtu', textwrap.dedent('''\
Někdo (pravděpodobně Vy) požádal o založení účtu s touto e-mailovou adresou
v Odevzdávacím systému Matematické olympiády. Pokud účet chcete založit,
následujte tento odkaz:
{}
Váš OSMO
'''.format(confirm_create_url(token))))
def die(msg: str) -> NoReturn:
print(msg, file=sys.stderr)
sys.exit(1)
......
import datetime
import dateutil.tz
from enum import Enum, auto
from flask import render_template, request, g, redirect, url_for, session
from flask.helpers import flash
from flask_wtf import FlaskForm
import html
from markupsafe import Markup
import random
import secrets
from sqlalchemy.orm import joinedload
from typing import Optional, Dict
import werkzeug.exceptions
import wtforms
from wtforms import validators, ValidationError
from wtforms.fields.html5 import EmailField
import wtforms.validators as validators
from sqlalchemy.orm import joinedload
from typing import Optional
import mo.util
import mo.config as config
import mo.db as db
import mo.rights
import mo.tokens
import mo.users
import mo.util
from mo.web import app, NeedLoginError
import mo.web.fields as mo_fields
......@@ -175,6 +182,306 @@ def reset():
return login_and_redirect(user)
class RegStatus(Enum):
new = auto()
ok = auto()
expired = auto()
already_exists = auto()
# Jen v 1. kroku:
rate_limited = auto()
wrong_captcha = auto()
# Jen v 2. kroku:
already_spent = auto()
class Reg1:
create_time: datetime.datetime
seed: str
status: RegStatus
email_token: str
x: int
y: int
def __init__(self, from_token: Optional[str] = None):
self.status = self._parse_token(from_token)
if self.status == RegStatus.ok:
self._gen_captcha()
else:
self._reset()
def _reset(self):
self.create_time = mo.now
self.seed = secrets.token_hex(16)
app.logger.debug(f'Reg1: Nový token: seed={self.seed}')
self._gen_captcha()
def as_token(self) -> str:
return mo.tokens.sign_token([str(int(self.create_time.timestamp())), self.seed], 'reg1')
def _parse_token(self, token: Optional[str]) -> RegStatus:
if token is None:
return RegStatus.new
fields = mo.tokens.verify_token(token, 'reg1')
if not fields:
app.logger.debug(f'Reg1: Neplatný token: {token}')
return RegStatus.new
self.create_time = datetime.datetime.fromtimestamp(int(fields[0]), tz=dateutil.tz.UTC)
self.seed = fields[1]
app.logger.debug(f'Reg1: Přijat token: {self.create_time} {self.seed}')
token_age = mo.now - self.create_time
if token_age > datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY):
app.logger.debug('Reg1: Token expiroval')
return RegStatus.expired
return RegStatus.ok
def _init_rng(self) -> random.Random:
rng = random.Random()
rng.seed(mo.tokens.hash('rng-init', self.seed))
return rng
def _gen_captcha(self):
rng = self._init_rng()
self.x = rng.randrange(1, 10)
self.y = rng.randrange(1, 10)
app.logger.debug(f'Reg1: Captcha: {self.x}*{self.y}')
def captcha_task(self) -> str:
cisla = ['nula', 'jedna', 'dva', 'tři', 'čtyři', 'pět', 'šest', 'sedm', 'osm', 'devět',
'deset', 'jedenáct', 'dvanáct', 'třináct', 'čtrnáct', 'patnáct', 'šestnáct', 'sedmnáct', 'osmnáct', 'devatenáct']
return f'Napište číslem, kolik je {cisla[self.x]} krát {cisla[self.y]}.'
def captcha_check_answer(self, answer: str) -> bool:
correct = self.x * self.y
if answer == str(correct):
app.logger.debug(f'Reg1: Captcha: {self.x}*{self.y}={answer} správně')
return True
else:
app.logger.debug(f'Reg1: Captcha: {self.x}*{self.y}={answer} špatně')
return False
def create_reg_request(self, email: str) -> bool:
sess = db.get_session()
rr = sess.query(db.RegRequest).with_for_update().filter_by(captcha_token=self.seed).one_or_none()
if rr:
self._reset()
self.status = RegStatus.expired
sess.rollback()
app.logger.info('Reg1: Captcha token použit znovu')
return False
rr = mo.users.new_reg_request(db.RegReqType.register, request.remote_addr)
if not rr:
self._reset()
self.status = RegStatus.rate_limited
sess.rollback()
app.logger.info('Reg1: Rate limit')
return False
self.email_token = rr.email_token
rr.email = email
rr.captcha_token = self.seed
sess.add(rr)
sess.commit()
return True
def process(self, email: str, captcha: str) -> bool:
# XXX: Nejdříve zapisujeme registraci do DB, a teprve pak ověřujeme captchu.
# Tímto způsobem je těžší captchu obejít (protože je rate-limitovaná), ale
# zase je snazší páchat DoS útok na celou registraci (protože je rate-limitovaná).
if not self.create_reg_request(email):
return False
if not self.captcha_check_answer(captcha):
self._reset()
self.status = RegStatus.wrong_captcha
return False
if mo.users.user_by_email(email):
self._reset()
self.status = RegStatus.already_exists
app.logger.info(f'Reg1: Účet s e-mailem {email} už existuje')
return False
return True
class Reg1Form(FlaskForm):
email = mo_fields.Email(validators=[validators.DataRequired()])
token = wtforms.HiddenField()
captcha = wtforms.StringField('Kontrolní odpověď', validators=[validators.DataRequired()])
submit = wtforms.SubmitField('Vytvořit účet')
@app.route('/auth/create', methods=('GET', 'POST'))
def create_acct():
form = Reg1Form()
reg1 = Reg1(form.token.data)
if reg1.status == RegStatus.ok and form.validate_on_submit() and reg1.process(form.email.data, form.captcha.data):
app.logger.debug(f'Reg1: E-mailový token {reg1.email_token}')
flash('Odeslán e-mail s odkazem na založení účtu.', 'success')
user = db.User(email=form.email.data, first_name='Nový', last_name='Uživatel')
mo.util.send_confirm_create_email(user, reg1.email_token)
return redirect(url_for('confirm_reg'))
form.captcha.description = reg1.captcha_task()
if reg1.status != RegStatus.ok:
form.token.data = reg1.as_token()
form.captcha.data = ""
if reg1.status == RegStatus.expired:
flash('Vypršela platnost formuláře, vyplňte ho prosím znovu.', 'danger')
elif reg1.status == RegStatus.rate_limited:
flash('Přichází příliš mnoho registrací najednou, zkuste to prosím za chvíli znovu.', 'danger')
elif reg1.status == RegStatus.already_exists:
form.email.errors.append('Účet s touto adresou už existuje. ' + Markup('<a href="' + html.escape(url_for('login', email=form.email.data)) + '">Přihlásit se.</a>'))
elif reg1.status == RegStatus.wrong_captcha:
form.captcha.errors.append('Chybný výsledek. Zkuste to znovu: ' + reg1.captcha_task())
return render_template('acct_reg1.html', form=form)
class Reg2:
reg_type: db.RegReqType
status: RegStatus
rr: db.RegRequest
user: db.User
messages: Dict[db.RegReqType, Dict[RegStatus, str]] = {
db.RegReqType.register: {
RegStatus.new: 'Chybný registrační kód. Zkontrolujte, že jste odkaz z e-mailu zkopírovali správně.',
RegStatus.expired: 'Vypršela platnost registrace, vyplňte ji prosím znovu.',
RegStatus.already_spent: 'Tento odkaz na potvrzení registrace byl již využit.',
RegStatus.already_exists: 'Účet s touto adresou už existuje.',
},
}
def __init__(self, token: str, expected_type: db.RegReqType):
self.reg_type = expected_type
self.status = self._parse_token(token)
if self.status == RegStatus.ok:
self.status = self._load_rr(token)
def _parse_token(self, token: Optional[str]) -> RegStatus:
if not token:
return RegStatus.new
token = mo.util.clean_up_token(token)
fields = mo.tokens.verify_token(token, 'reg-request')
if not fields:
app.logger.debug(f'Reg2: Neplatný token: {token}')
return RegStatus.new
create_time = datetime.datetime.fromtimestamp(int(fields[0]), tz=dateutil.tz.UTC)
app.logger.debug(f'Reg2: Přijat token: {token}')
token_age = mo.now - create_time
if token_age > datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY):
app.logger.debug('Reg2: Token expiroval')
return RegStatus.expired
return RegStatus.ok
def _load_rr(self, token: str) -> RegStatus:
sess = db.get_session()
rr = sess.query(db.RegRequest).with_for_update().filter_by(email_token=token).one_or_none()
if not rr:
app.logger.info('Reg2: Registrace nenalezena')
return RegStatus.expired
if rr.expires_at < mo.now:
app.logger.info('Reg2: Registrace expirovala')
return RegStatus.expired
if rr.used_at is not None:
app.logger.info('Reg2: Registrace spotřebována')
return RegStatus.already_spent
if rr.type != self.reg_type:
app.logger.info('Reg2: Token špatného typu')
return RegStatus.new
self.rr = rr
return RegStatus.ok
def create(self, first_name: str, last_name: str, passwd: str) -> bool:
rr = self.rr
assert rr.email is not None
email = mo.users.normalize_email(rr.email) # Pro jistotu
sess = db.get_session()
if db.get_session().query(db.User).with_for_update().filter_by(email=email).one_or_none():
# Účet mohl začít existovat mezi 1. a 2. krokem registrace
app.logger.info(f'Reg2: Účet s e-mailem {email} začal během registrace existovat')
self.status = RegStatus.already_exists
return False
user = db.User(
email=email,
first_name=first_name,
last_name=last_name,
)
mo.users.set_password(user, passwd)
rr.used_at = mo.now
sess.add(user)
sess.flush()
app.logger.info(f'Reg2: Založen uživatel user=#{user.user_id} email=<{user.email}>')
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'register', 'new': db.row2dict(user)},
)
sess.commit()
self.user = user
return True
def flash_message(self):
msgs = self.messages[self.reg_type]
if self.status in msgs:
flash(msgs[self.status], 'danger')
class Reg2Form(FlaskForm):
email = wtforms.StringField('E-mail', render_kw={"disabled": "disabled"})
first_name = mo_fields.FirstName(validators=[validators.DataRequired()])
last_name = mo_fields.LastName(validators=[validators.DataRequired()])
new_passwd = mo_fields.NewPassword('Heslo', validators=[validators.DataRequired()])
new_passwd2 = mo_fields.RepeatPassword(validators=[validators.DataRequired()])
submit = wtforms.SubmitField('Vytvořit účet')
@app.route('/auth/confirm/r', methods=('GET', 'POST'))
def confirm_reg():
token = request.args.get('token')
if token is None:
return render_template('acct_reg2.html', form=None)
reg2 = Reg2(token, db.RegReqType.register)
if reg2.status != RegStatus.ok:
reg2.flash_message()
return redirect(url_for('create_acct'))
form = Reg2Form()
if form.validate_on_submit():
if reg2.create(form.first_name.data, form.last_name.data, form.new_passwd.data):
flash('Založení účtu a přihlášení do systému proběhlo úspěšně.', 'success')
app.logger.info(f'Login: Přihlásil se uživatel <{reg2.user.email}> po založení účtu')
return login_and_redirect(reg2.user, flash_msg='Účet úspěšně založen.')
reg2.flash_message()
form.email.data = reg2.rr.email
return render_template('acct_reg2.html', form=form)
@app.errorhandler(werkzeug.exceptions.Forbidden)
def handle_forbidden(e):
return render_template('forbidden.html')
{% extends "base.html" %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Založení účtu{% endblock %}
{% block body %}
<p>Nejprve vyplňte svou e-mailovou adresu, která také bude sloužit jako přihlašovací jméno.
Na ni vám pošleme ověřovací e-mail.
{{ wtf.quick_form(form, form_type='horizontal', button_map={'submit': 'primary'}) }}
{% endblock %}
{% extends "base.html" %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Založení účtu{% endblock %}
{% block body %}
{% if form %}
<p>S údaji o účtu budeme zacházet v souladu se <a href='{{ url_for('doc_gdpr') }}'>zásadami
zpracování osobních údajů</a>.
{{ wtf.quick_form(form, form_type='horizontal', button_map={'submit': 'primary'}) }}
{% else %}
<p>Počkejte prosím, až vám přijde e-mail a klikněte na odkaz v něm uvedený.
{% endif %}
{% endblock %}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment