Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • devel
  • fo
  • fo-base
  • honza/add-contestant
  • honza/kolo-vs-soutez
  • honza/mr6
  • honza/mr7
  • honza/mra
  • honza/mrd
  • honza/mrf
  • honza/submit-images
  • jh-stress-test-wip
  • jirka/typing
  • jk/issue-196
  • jk/issue-96
  • master
  • mj/submit-images
  • shorten-schools
18 results

Target

Select target project
  • mj/mo-submit
1 result
Select Git revision
  • devel
  • fo
  • fo-base
  • honza/add-contestant
  • honza/kolo-vs-soutez
  • honza/mr6
  • honza/mr7
  • honza/mra
  • honza/mrd
  • honza/mrf
  • honza/submit-images
  • jh-stress-test-wip
  • jirka/typing
  • jk/issue-196
  • jk/issue-96
  • master
  • mj/submit-images
  • shorten-schools
18 results
Show changes
Commits on Source (47)
......@@ -4,6 +4,7 @@ import argparse
import sys
import mo.db as db
import mo.email
import mo.users
import mo.util
......@@ -13,8 +14,7 @@ parser.add_argument(dest='first_name', help='křestní jméno (jedno nebo více)
parser.add_argument(dest='last_name', help='příjmení (jedno nebo více)')
parser.add_argument('--org', default=False, action='store_true', help='přidělí uživateli organizátorská práva')
parser.add_argument('--admin', default=False, action='store_true', help='přidělí uživateli správcovská práva')
parser.add_argument('--passwd', type=str, help='nastaví počáteční heslo')
parser.add_argument('--mail', default=False, action='store_true', help='pošle uživateli mail o založení účtu')
parser.add_argument('--passwd', type=str, help='nastaví počáteční heslo (jinak pošle aktivační mail)')
args = parser.parse_args()
email = mo.users.normalize_email(args.email)
......@@ -45,11 +45,9 @@ mo.util.log(db.LogType.user, user.user_id, {
if args.passwd is not None:
mo.users.set_password(user, args.passwd)
if args.mail:
token = mo.users.ask_reset_password(user)
token = mo.users.make_activation_token(user)
session.commit()
if args.mail:
mo.util.send_new_account_email(user, token)
if args.passwd is None:
mo.email.send_new_account_email(user, token)
#!/usr/bin/env python3
import argparse
import sys
import mo.config as config
import mo.config
import mo.email
import mo.db as db
import mo.users
import mo.util
parser = argparse.ArgumentParser(description='Resetuje uživateli heslo a pošle mail')
parser = argparse.ArgumentParser(description='Pošle uživateli nový aktivační mail')
parser.add_argument(dest='email', help='e-mailová adresa')
parser.add_argument('--new', default=False, action='store_true', help='pošle mail o založení účtu')
parser.add_argument('--mail-instead', metavar='EMAIL', default=None, help='pošle mail někomu jinému')
args = parser.parse_args()
......@@ -22,13 +21,10 @@ user = mo.users.user_by_email(args.email)
if user is None:
mo.util.die('Tento uživatel neexistuje')
token = mo.users.ask_reset_password(user)
token = mo.users.make_activation_token(user)
session.commit()
if args.mail_instead:
mo.config.MAIL_INSTEAD = args.mail_instead
if args.new:
mo.util.send_new_account_email(user, token)
else:
mo.util.send_password_reset_email(user, token)
mo.email.send_new_account_email(user, token)
-- CREATE ROLE mo_osmo LOGIN PASSWORD 'pass';
-- CREATE DATABASE mo_osmo WITH OWNER=mo_osmo;
-- GRANT mo_osmo TO some_admin;
-- CREATE EXTENSION unaccent;
SET ROLE mo_osmo;
-- Funkce pro odakcentování textu pomocí extension unaccent.
-- Je immutable, takže se dá používat i v indexech.
-- Zdroj: http://stackoverflow.com/questions/11005036/does-postgresql-support-accent-insensitive-collations
CREATE OR REPLACE FUNCTION f_unaccent(text)
RETURNS text AS
$func$
SELECT unaccent('unaccent', $1)
$func$ LANGUAGE sql IMMUTABLE SET search_path = public, pg_temp;
-- Uživatelský účet
CREATE TABLE users (
user_id serial PRIMARY KEY,
......@@ -15,7 +25,7 @@ CREATE TABLE users (
is_test boolean NOT NULL DEFAULT false, -- testovací účastník, není vidět ve výsledkovkách
created_at timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_login_at timestamp with time zone DEFAULT NULL,
reset_at timestamp with time zone DEFAULT NULL, -- poslední požadavek na reset hesla
reset_at timestamp with time zone DEFAULT NULL, -- poslední reset/aktivace nebo žádost o ně
password_hash varchar(255) DEFAULT NULL, -- heš hesla (je-li nastaveno)
note text NOT NULL DEFAULT '' -- poznámka viditelná pro orgy
);
......@@ -45,6 +55,8 @@ CREATE TABLE places (
);
CREATE INDEX places_parent_index ON places (parent);
-- XXX: Potřebujeme operator class text_pattern_ops, aby index fungoval na prefixové LIKE
CREATE INDEX places_noacc_index ON places ((lower(f_unaccent(name))) text_pattern_ops);
-- Rekurzivní dotaz na nadřazené regiony:
-- WITH RECURSIVE parent_regions(parent, place_id) AS (
......@@ -89,6 +101,12 @@ CREATE TYPE score_mode AS ENUM (
'mo' -- jednoznačné pořadí podle pravidel MO
);
CREATE TYPE enroll_mode AS ENUM ( -- režim přihlašování účastníků
'manual', -- přihlašuje organizátor
'register', -- účastník se registruje
'confirm' -- účastník se registruje, ale org ho musí potvrdit
);
CREATE TABLE rounds (
round_id serial PRIMARY KEY,
master_round_id int DEFAULT NULL REFERENCES rounds(round_id),
......@@ -109,6 +127,8 @@ CREATE TABLE rounds (
score_successful_limit int DEFAULT NULL, -- bodový limit na označení za úspěšného řešitele
points_step numeric(2,1) NOT NULL DEFAULT 1, -- s jakou přesností jsou přidělovány body (celé aneb 1, 0.5, 0.1)
has_messages boolean NOT NULL DEFAULT false, -- má zprávičky
enroll_mode enroll_mode NOT NULL DEFAULT 'manual', -- režim přihlašování (pro vyšší kola vždy 'manual')
enroll_advert varchar(255) NOT NULL DEFAULT '', -- popis v přihlašovacím formuláři
UNIQUE (year, category, seq, part)
);
......@@ -133,17 +153,19 @@ CREATE TABLE participants (
school int NOT NULL REFERENCES places(place_id),
birth_year int NOT NULL,
grade varchar(20) NOT NULL, -- třída ve tvaru "X/Y"
registered_on timestamp with time zone DEFAULT NULL, -- kdy se účastník přihlásil (NULL, pokud ho přihlásil organizátor)
PRIMARY KEY (user_id, year)
);
-- Účast v soutěžním kole
CREATE TYPE part_state AS ENUM (
'registered', -- sám se přihlásil
'invited', -- pozván
'refused', -- odmítl účast
'present', -- soutěžil
'absent', -- bez omluvy nedorazil
'registered', -- sám se přihlásil, čeká na potvrzení organizátorem
-- 'invited', -- pozván (už nepoužíváme)
'active', -- soutěží (přihlášku zadal/potvrdil organizátor)
'refused', -- organizátor odmítl přihlášku
-- 'present', -- soutěžil (už nepoužíváme)
'absent', -- nedorazil
'disqualified' -- diskvalifikovaný
);
......@@ -312,3 +334,24 @@ CREATE TABLE messages (
markdown text NOT NULL,
html text NOT NULL
);
-- Požadavky na registraci a změny vlastností účtu
CREATE TYPE reg_req_type AS ENUM (
'register',
'change_email',
'reset_password'
);
CREATE TABLE reg_requests (
reg_id serial PRIMARY KEY,
type reg_req_type NOT NULL,
created_at timestamp with time zone NOT NULL,
expires_at timestamp with time zone NOT NULL,
used_at timestamp with time zone DEFAULT NULL,
email varchar(255) DEFAULT NULL, -- adresa, kterou potvrzujeme
captcha_token varchar(255) DEFAULT NULL, -- token pro fázi 1 registrace
email_token varchar(255) UNIQUE NOT NULL, -- token pro fázi 2 registrace
user_id int DEFAULT NULL REFERENCES users(user_id) ON DELETE CASCADE,
client varchar(255) NOT NULL -- kdo si registraci vyžádal
);
SET ROLE 'mo_osmo';
CREATE TYPE enroll_mode AS ENUM ( -- režim přihlašování účastníků
'manual', -- přihlašuje organizátor
'register', -- účastník se registruje
'confirm' -- účastník se registruje, ale org ho musí potvrdit
);
ALTER TABLE rounds ADD COLUMN enroll_mode enroll_mode NOT NULL DEFAULT 'manual';
ALTER TABLE rounds ADD COLUMN enroll_advert varchar(255) NOT NULL DEFAULT '';
ALTER TYPE part_state ADD VALUE 'active' AFTER 'invited';
ALTER TABLE participants ADD COLUMN registered_on timestamp with time zone DEFAULT NULL;
UPDATE participations SET state='active' WHERE state IN ('registered', 'invited');
CREATE TYPE reg_req_type AS ENUM (
'register',
'change_email',
'reset_passwd'
);
CREATE TABLE reg_requests (
reg_id serial PRIMARY KEY,
type reg_req_type NOT NULL,
created_at timestamp with time zone NOT NULL,
expires_at timestamp with time zone NOT NULL,
used_at timestamp with time zone DEFAULT NULL,
email varchar(255) DEFAULT NULL,
captcha_token varchar(255) DEFAULT NULL,
email_token varchar(255) UNIQUE NOT NULL,
user_id int DEFAULT NULL REFERENCES users(user_id) ON DELETE CASCADE,
client varchar(255) NOT NULL
);
SET ROLE 'mo_osmo';
-- Musí udělat správce (ale uvnitř naší DB):
-- CREATE EXTENSION unaccent;
CREATE OR REPLACE FUNCTION f_unaccent(text)
RETURNS text AS
$func$
SELECT unaccent('unaccent', $1)
$func$ LANGUAGE sql IMMUTABLE SET search_path = public, pg_temp;
CREATE INDEX places_noacc_index ON places ((lower(f_unaccent(name))) text_pattern_ops);
......@@ -42,12 +42,14 @@ MAX_BATCH_CONTENT_LENGTH = 1000000000
# Adresář, do kterého ukládáme data (pro vývoj relativní, pro instalaci absolutní)
DATA_DIR = 'data'
# Jak často se má provádět periodická kontrola dávek [s]
JOB_GC_PERIOD = 60
# Jak často se má spouštět garbage collector na dávky a tokeny [s]
GC_PERIOD = 60
# Za jak dlouho expiruje dokončená dávka [min]
JOB_EXPIRATION = 5
# Automatické přihlašování účastníků do testovací soutěže
# (kolo aktuální_ročník-T-1, celostátní soutěž)
AUTO_REGISTER_TEST = False
# Kolik nejvýše dovolujeme registrací za minutu
REG_MAX_PER_MINUTE = 10
# Jak dlouho vydrží tokeny používané při registraci a změnách e-mailu [min]
REG_TOKEN_VALIDITY = 10
......@@ -3,7 +3,7 @@
import datetime
# Aktuální ročník
current_year = 70
current_year = 71
# Referenční čas nastavovaný v initu requestu (web) nebo při volání skriptu
now: datetime.datetime
......
......@@ -16,12 +16,13 @@ from sqlalchemy.orm.attributes import get_history
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql.expression import CTE
from sqlalchemy.sql.functions import ReturnTypeFromArgs
from sqlalchemy.sql.sqltypes import Numeric
from typing import Optional, List, Tuple
import mo
from mo.place_level import place_levels, PlaceLevel
from mo.util_format import timedelta, time_and_timedelta
from mo.util_format import timeformat_short, timedelta, time_and_timedelta
# HACK: Work-around for https://github.com/dropbox/sqlalchemy-stubs/issues/114
from typing import TYPE_CHECKING, TypeVar, Type, Any
......@@ -34,6 +35,11 @@ else:
from sqlalchemy import Enum
# Funkce f_unaccent je definovaná v db.ddl, naučme Alchymii volat ji:
class f_unaccent(ReturnTypeFromArgs):
pass
Base = declarative_base()
metadata = Base.metadata
......@@ -99,6 +105,7 @@ class Place(Base):
nuts = Column(String(255), unique=True, server_default=text("NULL::character varying"))
note = Column(Text, nullable=False, server_default=text("''::text"))
parent_place = relationship('Place', primaryjoin='Place.parent == Place.place_id', remote_side='Place.place_id')
children = relationship('Place')
school = relationship('School', uselist=False, back_populates='place')
......@@ -203,6 +210,22 @@ round_score_mode_names = {
}
class RoundEnrollMode(MOEnum):
manual = auto()
register = auto()
confirm = auto()
def friendly_name(self) -> str:
return round_enroll_mode_names[self]
round_enroll_mode_names = {
RoundEnrollMode.manual: "Jen organizátoři",
RoundEnrollMode.register: "Účastníci sami",
RoundEnrollMode.confirm: "Potvrzení organizátorem",
}
# V DB jako numeric(2,1), používá se tak snadněji, než enum
round_points_step_names = {
decimal.Decimal('1'): "Celé body",
......@@ -237,6 +260,8 @@ class Round(Base):
score_successful_limit = Column(Numeric)
points_step = Column(Numeric, nullable=False)
has_messages = Column(Boolean, nullable=False, server_default=text("false"))
enroll_mode = Column(Enum(RoundEnrollMode, name='enroll_mode'), nullable=False, server_default=text("'basic'::enroll_mode"))
enroll_advert = Column(String(255), nullable=False, server_default=text("''::text"))
master = relationship('Round', primaryjoin='Round.master_round_id == Round.round_id', remote_side='Round.round_id', post_update=True)
......@@ -281,6 +306,14 @@ class Round(Base):
return round_points_step_names[self.points_step]
return str(self.points_step)
def format_times(self) -> str:
times = []
if self.ct_tasks_start is not None:
times.append('od ' + timeformat_short(self.ct_tasks_start))
if self.ct_submit_end is not None:
times.append('do ' + timeformat_short(self.ct_submit_end))
return " ".join(times)
class User(Base):
__tablename__ = 'users'
......@@ -403,6 +436,7 @@ class Participant(Base):
school = Column(Integer, ForeignKey('places.place_id'), nullable=False)
birth_year = Column(Integer, nullable=False)
grade = Column(String(20), nullable=False)
registered_on = Column(DateTime(True))
user = relationship('User')
school_place = relationship('Place', primaryjoin='Participant.school == Place.place_id')
......@@ -410,9 +444,8 @@ class Participant(Base):
class PartState(MOEnum):
registered = auto()
invited = auto()
active = auto()
refused = auto()
present = auto()
absent = auto()
disqualified = auto()
......@@ -422,9 +455,8 @@ class PartState(MOEnum):
part_state_names = {
PartState.registered: 'přihlášený',
PartState.invited: 'pozvaný',
PartState.active: 'soutěží',
PartState.refused: 'odmítnutý',
PartState.present: 'přítomný',
PartState.absent: 'nepřítomný',
PartState.disqualified: 'diskvalifikovaný',
}
......@@ -510,6 +542,12 @@ class UserRole(Base):
return " ".join(parts)
def applies_to(self, at: Optional[Place] = None, year: Optional[int] = None, cat: Optional[str] = None, seq: Optional[int] = None) -> bool:
return ((at is None or self.place_id == at.place_id)
and (self.year is None or year is None or self.year == year)
and (self.category is None or cat is None or self.category == cat or (self.category == 'Z' and cat.startswith('Z')))
and (self.seq is None or seq is None or self.seq == seq))
class PaperType(MOEnum):
solution = auto()
......@@ -644,6 +682,29 @@ class Message(Base):
created_by_user = relationship('User')
class RegReqType(MOEnum):
register = auto()
change_email = auto()
reset_passwd = auto()
class RegRequest(Base):
__tablename__ = 'reg_requests'
reg_id = Column(Integer, primary_key=True, server_default=text("nextval('reg_requests_reg_id_seq'::regclass)"))
type = Column(Enum(RegReqType, name='reg_req_type'), nullable=False)
created_at = Column(DateTime(True), nullable=False)
expires_at = Column(DateTime(True), nullable=False)
used_at = Column(DateTime(True))
captcha_token = Column(Text)
email = Column(Text)
email_token = Column(Text, nullable=False, unique=True)
user_id = Column(Integer, ForeignKey('users.user_id'))
client = Column(Text, nullable=False)
user = relationship('User')
_engine: Optional[Engine] = None
_session: Optional[Session] = None
flask_db: Any = None
......
# Rozesílání e-mailových notifikací všeho druhu
import datetime
import email.message
import email.headerregistry
import subprocess
import textwrap
import urllib.parse
import mo.db as db
import mo.config as config
from mo.util import logger
def send_user_email(user: db.User, subject: str, body: str) -> bool:
logger.info(f'Mail: "{subject}" -> {user.email}')
mail_from = getattr(config, 'MAIL_FROM', None)
if mail_from is None:
logger.error('Mail: V configu chybí nastavení MAIL_FROM')
return False
msg = email.message.EmailMessage()
msg['From'] = email.headerregistry.Address(
display_name='Odevzdávací Systém MO',
addr_spec=mail_from,
)
msg['To'] = [
email.headerregistry.Address(
display_name=user.full_name(),
addr_spec=user.email,
)
]
msg['Reply-To'] = email.headerregistry.Address(
display_name='Správce OSMO',
addr_spec=config.MAIL_CONTACT,
)
msg['Subject'] = 'OSMO – ' + subject
msg['Date'] = datetime.datetime.now()
msg.set_content(body, cte='quoted-printable')
mail_instead = getattr(config, 'MAIL_INSTEAD', None)
if mail_instead is None:
send_to = user.email
else:
send_to = mail_instead
sm = subprocess.Popen(
[
'/usr/sbin/sendmail',
'-oi',
'-f',
mail_from,
send_to,
],
stdin=subprocess.PIPE,
)
sm.communicate(msg.as_bytes())
if sm.returncode != 0:
logger.error('Mail: Sendmail failed with return code {}'.format(sm.returncode))
return False
return True
def activate_url(token: str) -> str:
return config.WEB_ROOT + 'acct/activate?' + urllib.parse.urlencode({'token': token}, safe=':')
def confirm_url(type: str, token: str) -> str:
return config.WEB_ROOT + f'acct/confirm/{type}?' + urllib.parse.urlencode({'token': token}, safe=':')
def contestant_list_url(contest: db.Contest, registered_only: bool) -> str:
url = config.WEB_ROOT + f'org/contest/c/{contest.contest_id}/ucastnici'
if registered_only:
url += '?participation_state=registered'
return url
def send_new_account_email(user: db.User, token: str) -> bool:
return send_user_email(user, 'Založen nový účet', textwrap.dedent('''\
Vítejte!
Právě Vám byl založen účet v Odevzdávacím systému Matematické olympiády.
Nastavte si prosím heslo na následující stránce:
{}
Váš OSMO
'''.format(activate_url(token))))
def send_password_reset_email(user: db.User, token: str) -> bool:
return send_user_email(user, 'Obnova hesla', textwrap.dedent('''\
Někdo (pravděpodobně Vy) požádal o obnovení hesla k Vašemu účtu v Odevzdávacím
systému Matematické olympiády. Heslo si můžete nastavit, případně požadavek
zrušit, na následující stránce:
{}
Váš OSMO
'''.format(confirm_url('p', token))))
def send_confirm_create_email(user: db.User, token: str) -> bool:
return send_user_email(user, 'Založení účtu', textwrap.dedent('''\
Někdo (pravděpodobně Vy) požádal o založení účtu s touto e-mailovou adresou
v Odevzdávacím systému Matematické olympiády. Pokud účet chcete založit,
následujte tento odkaz:
{}
Váš OSMO
'''.format(confirm_url('r', token))))
def send_confirm_change_email(user: db.User, token: str) -> bool:
return send_user_email(user, 'Změna e-mailové adresy', textwrap.dedent('''\
Někdo (pravděpodobně Vy) požádal o nastavení e-mailové adresy k účtu
v Odevzdávacím systému Matematické olympiády na tuto adresu.
Pokud změnu chcete provést, následujte tento odkaz:
{}
Váš OSMO
'''.format(confirm_url('e', token))))
def send_join_notify_email(dest: db.User, who: db.User, contest: db.Contest) -> bool:
round = contest.round
place = contest.place
if contest.round.enroll_mode == db.RoundEnrollMode.confirm:
confirm = 'Přihlášku je potřeba potvrdit v seznamu účastníků soutěže.'
url = 'Přihlášky k potvrzení: ' + contestant_list_url(contest, True)
else:
confirm = 'Přihláška byla schválena automaticky.'
url = 'Seznam účastníků: ' + contestant_list_url(contest, False)
return send_user_email(dest, f'Nový účastník kategorie {round.category}', textwrap.dedent(f'''\
Nový účastník se přihlásil do MO v oblasti, kterou garantujete.
Jméno: {who.full_name()}
E-mail: {who.email}
Kategorie: {round.category}
Kolo: {round.name}
Místo: {place.name}
{confirm}
{url}
Váš OSMO
'''))
......@@ -10,6 +10,7 @@ from typing import List, Optional, Any, Dict, Type, Union
import mo.csv
from mo.csv import FileFormat, MissingHeaderError
import mo.db as db
import mo.email
import mo.rights
import mo.users
import mo.util
......@@ -187,7 +188,7 @@ class Import:
return r
def find_or_create_user(self, email: str, krestni: str, prijmeni: str, is_org: bool) -> Optional[db.User]:
def find_or_create_user(self, email: str, krestni: Optional[str], prijmeni: Optional[str], is_org: bool) -> Optional[db.User]:
try:
user, is_new = mo.users.find_or_create_user(email, krestni, prijmeni, is_org, reason='import')
except mo.CheckError as e:
......@@ -211,7 +212,7 @@ class Import:
return pts
def find_or_create_participant(self, user: db.User, year: int, school_id: int, birth_year: int, grade: str) -> Optional[db.Participant]:
def find_or_create_participant(self, user: db.User, year: int, school_id: Optional[int], birth_year: Optional[int], grade: Optional[str]) -> Optional[db.Participant]:
try:
part, is_new = mo.users.find_or_create_participant(user, year, school_id, birth_year, grade, reason='import')
except mo.CheckError as e:
......@@ -372,9 +373,9 @@ class Import:
for uid in self.new_user_ids:
u = sess.query(db.User).get(uid)
if u and not u.password_hash and not u.reset_at:
token = mo.users.ask_reset_password(u)
token = mo.users.make_activation_token(u)
sess.commit()
mo.util.send_new_account_email(u, token)
mo.email.send_new_account_email(u, token)
else:
sess.rollback()
......@@ -432,28 +433,23 @@ class ContestImport(Import):
assert isinstance(r, ContestImportRow)
num_prev_errs = len(self.errors)
email = self.parse_email(r.email)
krestni = self.parse_name(r.krestni)
prijmeni = self.parse_name(r.prijmeni)
school_place = self.parse_school(r.kod_skoly)
rocnik = self.parse_grade(r.rocnik, (school_place.school if school_place else None))
rok_naroz = self.parse_born(r.rok_naroz)
krestni = self.parse_name(r.krestni) if r.krestni else None
prijmeni = self.parse_name(r.prijmeni) if r.prijmeni else None
school_place = self.parse_school(r.kod_skoly) if r.kod_skoly else None
rocnik = self.parse_grade(r.rocnik, (school_place.school if school_place else None)) if r.rocnik else None
rok_naroz = self.parse_born(r.rok_naroz) if r.rok_naroz else None
misto = self.parse_opt_place(r.kod_mista, 'místo')
oblast = self.parse_opt_place(r.kod_oblasti, 'oblast')
if (len(self.errors) > num_prev_errs
or email is None
or krestni is None
or prijmeni is None
or school_place is None
or rocnik is None
or rok_naroz is None):
or email is None):
return
user = self.find_or_create_user(email, krestni, prijmeni, is_org=False)
if user is None:
return
part = self.find_or_create_participant(user, mo.current_year, school_place.place_id, rok_naroz, rocnik)
part = self.find_or_create_participant(user, mo.current_year, school_place.place_id if school_place else None, rok_naroz, rocnik)
if part is None:
return
......
......@@ -167,7 +167,7 @@ class Rights:
return right in self.rights
def get_roles(self) -> List[db.RoleType]:
"""Seznam unikátních rolí, stříděných od nejvýznamnější."""
"""Seznam unikátních rolí, setříděných od nejvýznamnější."""
role_set = set(ur.role for ur in self.user_roles)
return sorted(role_set, key=lambda r: role_order_by_type[r])
......@@ -325,10 +325,7 @@ class Gatekeeper:
rights.rights = set()
def try_role(role: db.UserRole, at: Optional[db.Place]):
if ((at is None or role.place_id == at.place_id)
and (role.year is None or year is None or role.year == year)
and (role.category is None or cat is None or role.category == cat or (role.category == 'Z' and cat.startswith('Z')))
and (role.seq is None or seq is None or role.seq == seq)
if (role.applies_to(at=at, year=year, cat=cat, seq=seq)
and (min_role is None or role_order_by_type[min_role] >= role_order_by_type[role.role])):
rights.user_roles.append(role)
r = roles_by_type[role.role]
......
......@@ -106,7 +106,7 @@ class Score:
def __init__(
self, round: db.Round, contest: Optional[db.Contest] = None,
# Ze kterých stavů chceme výsledkovku počítat
part_states: List[db.PartState] = [db.PartState.registered, db.PartState.invited, db.PartState.present],
part_states: List[db.PartState] = [db.PartState.registered, db.PartState.active],
):
self.round = round
self.contest = contest
......
# Podepsané tokeny
import hashlib
import hmac
import urllib.parse
from typing import Optional, List
......@@ -8,9 +9,7 @@ import mo.config as config
def _sign_token(token: str, use: str) -> str:
key = '-'.join(('sign-token', use, config.SECRET_KEY))
mac = hmac.HMAC(key.encode('us-ascii'), token.encode('us-ascii'), 'sha256')
return mac.hexdigest()[:16]
return sign(token, 'token-' + use)
def sign_token(fields: List[str], use: str) -> str:
......@@ -27,3 +26,18 @@ def verify_token(token: str, use: str) -> Optional[List[str]]:
if _sign_token(':'.join(enc_fields), use) != sign:
return None
return [urllib.parse.unquote(f) for f in enc_fields]
def sign(msg: str, use: str) -> str:
"""Podpis parametrizovaný tajným klíčem a účelem."""
key = use + '#' + config.SECRET_KEY
mac = hmac.HMAC(key.encode('us-ascii'), msg.encode('us-ascii'), 'sha256')
return mac.hexdigest()[:16]
def hash(msg: str, use: str) -> str:
"""Hešovací funkce parametrizovaná tajným klíčem a účelem."""
m = '#'.join((use, config.SECRET_KEY, msg)).encode('us-ascii')
return hashlib.sha256(m).hexdigest()
......@@ -2,12 +2,15 @@
import bcrypt
import datetime
import dateutil.tz
import email.errors
import email.headerregistry
import re
import secrets
from typing import Optional, Tuple
import mo
import mo.config as config
import mo.db as db
import mo.util
from mo.util import logger
......@@ -49,11 +52,13 @@ def validate_and_find_school(kod: str) -> db.Place:
return place
def find_or_create_user(email: str, krestni: str, prijmeni: str, is_org: bool, reason: str) -> Tuple[db.User, bool]:
def find_or_create_user(email: str, krestni: Optional[str], prijmeni: Optional[str], is_org: bool, reason: str) -> Tuple[db.User, bool]:
sess = db.get_session()
user = sess.query(db.User).filter_by(email=email).one_or_none()
is_new = user is None
if user is None: # HACK: Podmínku je nutné zapsat znovu místo užití is_new, jinak si s tím mypy neporadí
if not krestni or not prijmeni:
raise mo.CheckError('Osoba s daným emailem zatím neexistuje, je nutné uvést její jméno.')
user = db.User(email=email, first_name=krestni, last_name=prijmeni, is_org=is_org)
sess.add(user)
sess.flush() # Aby uživatel dostal user_id
......@@ -64,7 +69,7 @@ def find_or_create_user(email: str, krestni: str, prijmeni: str, is_org: bool, r
details={'action': 'create-user', 'reason': reason, 'new': db.row2dict(user)},
)
else:
if user.first_name != krestni or user.last_name != prijmeni:
if (krestni and user.first_name != krestni) or (prijmeni and user.last_name != prijmeni):
raise mo.CheckError(f'Osoba již registrována s odlišným jménem {user.full_name()}')
if (user.is_admin or user.is_org) != is_org:
if is_org:
......@@ -74,11 +79,17 @@ def find_or_create_user(email: str, krestni: str, prijmeni: str, is_org: bool, r
return user, is_new
def find_or_create_participant(user: db.User, year: int, school_id: int, birth_year: int, grade: str, reason: str) -> Tuple[db.Participant, bool]:
def find_or_create_participant(user: db.User, year: int, school_id: Optional[int], birth_year: Optional[int], grade: Optional[str], reason: str) -> Tuple[db.Participant, bool]:
sess = db.get_session()
part = sess.query(db.Participant).get((user.user_id, year))
is_new = part is None
if part is None:
if not school_id:
raise mo.CheckError('Osoba s daným emailem zatím není zaregistrovaná do ročníku, je nutné uvést školu.')
if not birth_year:
raise mo.CheckError('Osoba s daným emailem zatím není zaregistrovaná do ročníku, je nutné uvést rok narození.')
if not grade:
raise mo.CheckError('Osoba s daným emailem zatím není zaregistrovaná do ročníku, je nutné uvést ročník.')
part = db.Participant(user=user, year=year, school=school_id, birth_year=birth_year, grade=grade)
sess.add(part)
logger.info(f'{reason.title()}: Založen účastník #{user.user_id}')
......@@ -88,9 +99,9 @@ def find_or_create_participant(user: db.User, year: int, school_id: int, birth_y
details={'action': 'create-participant', 'reason': reason, 'new': db.row2dict(part)},
)
else:
if (part.school != school_id
or part.grade != grade
or part.birth_year != birth_year):
if ((school_id and part.school != school_id)
or (grade and part.grade != grade)
or (birth_year and part.birth_year != birth_year)):
raise mo.CheckError('Účastník již zaregistrován s odlišnou školou/ročníkem/rokem narození')
return part, is_new
......@@ -107,7 +118,7 @@ def find_or_create_participation(user: db.User, contest: db.Contest, place: Opti
is_new = pions == []
if is_new:
pion = db.Participation(user=user, contest=contest, place_id=place.place_id, state=db.PartState.invited)
pion = db.Participation(user=user, contest=contest, place_id=place.place_id, state=db.PartState.active)
sess.add(pion)
logger.info(f'{reason.title()}: Založena účast user=#{user.user_id} contest=#{contest.contest_id} place=#{place.place_id}')
mo.util.log(
......@@ -165,17 +176,23 @@ def user_by_uid(uid: int) -> db.User:
return db.get_session().query(db.User).get(uid)
def set_password(user: db.User, passwd: str):
password_help = 'Heslo musí mít alespoň 8 znaků. Doporučujeme kombinovat velká a malá písmena a číslice.'
def validate_password(passwd: str) -> bool:
return len(passwd) >= 8
def set_password(user: db.User, passwd: str, reset: bool = False):
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(passwd.encode('utf-8'), salt)
user.password_hash = hashed.decode('us-ascii')
user.reset_at = None
user.last_login_at = datetime.datetime.now()
if reset:
user.reset_at = mo.now
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={"action": "set-passwd"},
details={'action': 'do-reset'},
)
......@@ -185,63 +202,68 @@ def check_password(user: db.User, passwd: str):
def login(user: db.User):
user.last_login_at = datetime.datetime.now()
user.reset_at = None
def ask_reset_password(user: db.User) -> str:
user.reset_at = datetime.datetime.now()
when = int(user.reset_at.timestamp())
token = mo.tokens.sign_token([str(user.user_id), str(when)], 'reset')
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'ask-reset'},
)
user.last_login_at = mo.now
return token
def make_activation_token(user: db.User) -> str:
user.reset_at = mo.now
when = int(mo.now.timestamp())
return mo.tokens.sign_token([str(user.user_id), str(when)], 'activate')
def check_reset_password(token: str) -> Optional[db.User]:
# Někteří klienti při kopírování adresy z mailu do prohlížeče
# přidávají divné Unicodové znaky (přepnutí směru psaní atd., viz issue #58).
token = re.sub(r'[^!-~]', "", token)
fields = mo.tokens.verify_token(token, 'reset')
def check_activation_token(token: str) -> Optional[db.User]:
token = mo.util.clean_up_token(token)
fields = mo.tokens.verify_token(token, 'activate')
if not fields or len(fields) != 2:
return None
user = db.get_session().query(db.User).filter_by(user_id=int(fields[0])).first()
user_id = int(fields[0])
token_time = datetime.datetime.fromtimestamp(int(fields[1]), tz=dateutil.tz.UTC)
if user.password_hash is None:
reset_token_validity_time = datetime.timedelta(days=28)
user = user_by_uid(user_id)
if not user:
return None
elif token_time < mo.now - datetime.timedelta(days=28):
return None
else:
reset_token_validity_time = datetime.timedelta(hours=24)
return user
now = datetime.datetime.now().astimezone()
if (user
and user.reset_at is not None
and fields[1] == str(int(user.reset_at.timestamp()))
and now - user.reset_at < reset_token_validity_time):
return user
else:
def new_reg_request(type: db.RegReqType, client: str) -> Optional[db.RegRequest]:
sess = db.get_session()
# Zatím jen jednoduchý rate limit, časem možno vylepšit
in_last_minute = db.get_count(sess.query(db.RegRequest).filter(db.RegRequest.created_at >= mo.now - datetime.timedelta(minutes=1)))
if in_last_minute >= config.REG_MAX_PER_MINUTE:
return None
email_token = mo.tokens.sign_token([str(int(mo.now.timestamp())), secrets.token_hex(16)], 'reg-request')
def cancel_reset_password(user: db.User):
user.reset_at = None
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'cancel-reset'},
return db.RegRequest(
type=type,
created_at=mo.now,
expires_at=mo.now + datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY),
email_token=email_token,
client=client,
)
def do_reset_password(user: db.User):
user.reset_at = None
def expire_reg_requests():
sess = db.get_session()
conn = sess.connection()
table = db.RegRequest.__table__
conn.execute(table.delete().where(table.c.expires_at < mo.now))
sess.commit()
def request_reset_password(user: db.User, client: str) -> Optional[db.RegRequest]:
logger.info('Login: Požadavek na reset hesla pro <%s>', user.email)
rr = new_reg_request(db.RegReqType.reset_passwd, client)
if rr:
db.get_session().add(rr)
rr.user_id = user.user_id
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'do-reset'},
details={'action': 'ask-reset'},
)
return rr
......@@ -4,18 +4,13 @@ from dataclasses import dataclass
import datetime
import decimal
import dateutil.tz
import email.message
import email.headerregistry
import locale
import logging
import os
import re
import secrets
import subprocess
import sys
from typing import Any, Optional, NoReturn, Tuple, List
import textwrap
import urllib.parse
import mo
import mo.db as db
......@@ -60,88 +55,6 @@ def log(type: db.LogType, what: int, details: Any):
db.get_session().add(entry)
def send_user_email(user: db.User, subject: str, body: str) -> bool:
logger.info(f'Mail: "{subject}" -> {user.email}')
mail_from = getattr(config, 'MAIL_FROM', None)
if mail_from is None:
logger.error('Mail: V configu chybí nastavení MAIL_FROM')
return False
msg = email.message.EmailMessage()
msg['From'] = email.headerregistry.Address(
display_name='Odevzdávací Systém MO',
addr_spec=mail_from,
)
msg['To'] = [
email.headerregistry.Address(
display_name=user.full_name(),
addr_spec=user.email,
)
]
msg['Reply-To'] = email.headerregistry.Address(
display_name='Správce OSMO',
addr_spec=config.MAIL_CONTACT,
)
msg['Subject'] = 'OSMO – ' + subject
msg['Date'] = datetime.datetime.now()
msg.set_content(body, cte='quoted-printable')
mail_instead = getattr(config, 'MAIL_INSTEAD', None)
if mail_instead is None:
send_to = user.email
else:
send_to = mail_instead
sm = subprocess.Popen(
[
'/usr/sbin/sendmail',
'-oi',
'-f',
mail_from,
send_to,
],
stdin=subprocess.PIPE,
)
sm.communicate(msg.as_bytes())
if sm.returncode != 0:
logger.error('Mail: Sendmail failed with return code {}'.format(sm.returncode))
return False
return True
def password_reset_url(token: str) -> str:
return config.WEB_ROOT + 'auth/reset?' + urllib.parse.urlencode({'token': token}, safe=':')
def send_new_account_email(user: db.User, token: str) -> bool:
return send_user_email(user, 'Založen nový účet', textwrap.dedent('''\
Vítejte!
Právě Vám byl založen účet v Odevzdávacím systému Matematické olympiády.
Nastavte si prosím heslo na následující stránce:
{}
Váš OSMO
'''.format(password_reset_url(token))))
def send_password_reset_email(user: db.User, token: str) -> bool:
return send_user_email(user, 'Obnova hesla', textwrap.dedent('''\
Někdo (pravděpodobně Vy) požádal o obnovení hesla k Vašemu účtu v Odevzdávacím
systému Matematické olympiády. Heslo si můžete nastavit, případně požadavek
zrušit, na následující stránce:
{}
Váš OSMO
'''.format(password_reset_url(token))))
def die(msg: str) -> NoReturn:
print(msg, file=sys.stderr)
sys.exit(1)
......@@ -268,3 +181,9 @@ def parse_int_list(a: str, maxim: int = 200) -> List[int]:
raise mo.CheckError("Větší číslo nemůže být před menším")
r += [c[0]] if len(c) == 1 else range(c[0], c[1] + 1)
return r
def clean_up_token(token: str) -> str:
# Někteří klienti při kopírování adresy z mailu do prohlížeče
# přidávají divné Unicodové znaky (přepnutí směru psaní atd., viz issue #58).
return re.sub(r'[^!-~]', "", token)
......@@ -41,6 +41,13 @@ def timeformat(dt: datetime) -> str:
return dt.astimezone().strftime("%Y-%m-%d %H:%M")
def timeformat_short(dt: datetime) -> str:
if dt is None:
return ''
else:
return dt.astimezone().strftime("%Y-%m-%d")
def timedelta(d: datetime, ref: Optional[datetime] = None, descriptive: bool = False) -> str:
"""Vyrábí česky formátované řetězece 'za 3 minuty', 'před 27 dny' a podobně
z rozdílu daného datetime a referenčního času (například now).
......
......@@ -115,6 +115,7 @@ app.assets.add_assets([
'bootstrap.min.css',
'mo.css',
'js/news-reloader.js',
'js/osmo.js',
])
......@@ -141,7 +142,7 @@ def init_request():
if not user:
# Uživatel mezitím přestal existovat
app.logger.error('Zrušena session pro neexistujícího uživatele uid=%s', session['uid'])
return mo.web.auth.logout()
return mo.web.acct.logout()
else:
user = None
......@@ -171,18 +172,28 @@ app.before_request(init_request)
### UWSGI glue ###
# Čas od času se probudíme a spustíme garbage collector:
# - projdeme joby pro případ, že by se ztratil signál
# - expirujeme zastaralé joby
# - expirujeme zastaralé registrační tokeny
@app.cli.command('gc')
def gc():
"""Run garbage collector."""
mo.now = mo.util.get_now()
mo.jobs.process_jobs()
mo.users.expire_reg_requests()
try:
import uwsgi
from uwsgidecorators import timer, signal
# Čas od času se probudíme a projdeme joby pro případ, že by se ztratil signál.
# Také při tom expirujeme zastaralé joby.
@timer(config.JOB_GC_PERIOD, target='mule')
@timer(config.GC_PERIOD, target='mule')
def mule_timer(signum):
# app.logger.debug('Mule: Timer tick')
with app.app_context():
mo.now = mo.util.get_now()
mo.jobs.process_jobs()
garbage_collect()
# Obykle při vložení jobu dostaneme signál.
@signal(42, target='mule')
......@@ -205,7 +216,8 @@ except ImportError:
# Většina webu je v samostatných modulech
import mo.web.auth
import mo.web.api
import mo.web.acct
import mo.web.jinja
import mo.web.menu
import mo.web.misc
......
import datetime
import dateutil.tz
from enum import Enum, auto
from flask import render_template, request, g, redirect, url_for, session
from flask.helpers import flash
from flask_wtf import FlaskForm
import html
from markupsafe import Markup
import random
import secrets
from sqlalchemy.orm import joinedload
from typing import Optional, Dict
import werkzeug.exceptions
import wtforms
from wtforms import validators, ValidationError
from wtforms.fields.html5 import EmailField
import mo.config as config
import mo.db as db
import mo.rights
import mo.tokens
import mo.users
import mo.util
from mo.web import app, NeedLoginError
import mo.web.fields as mo_fields
class LoginForm(FlaskForm):
next = wtforms.HiddenField()
email = mo_fields.Email(validators=[validators.DataRequired()])
passwd = wtforms.PasswordField('Heslo')
submit = wtforms.SubmitField('Přihlásit se')
reset = wtforms.SubmitField('Zapomenuté heslo')
def login_and_redirect(user: db.User, flash_msg: Optional[str] = None, url: Optional[str] = None):
session.clear()
session['uid'] = user.user_id
if not url:
if user.is_admin or user.is_org:
url = url_for('org_index')
else:
url = url_for('index')
else:
url = request.script_root + url
if flash_msg:
flash(flash_msg, 'success')
return redirect(url)
@app.route('/acct/login', methods=('GET', 'POST'))
def login():
form = LoginForm(email=request.args.get('email'))
if not form.validate_on_submit():
return render_template('login.html', form=form, error=None)
email = form.email.data
user = mo.users.user_by_email(email)
if not user:
app.logger.error('Login: Neznámý uživatel <%s>', email)
flash('Neznámý uživatel', 'danger')
elif form.reset.data:
rr = mo.users.request_reset_password(user, request.remote_addr)
if rr:
db.get_session().commit()
mo.email.send_password_reset_email(user, rr.email_token)
flash('Na uvedenou adresu byl odeslán e-mail s odkazem na obnovu hesla.', 'success')
else:
flash('Příliš časté požadavky na obnovu hesla.', 'danger')
elif not form.passwd.data or not mo.users.check_password(user, form.passwd.data):
app.logger.error('Login: Špatné heslo pro uživatele <%s>', email)
flash('Chybné heslo', 'danger')
else:
if user.is_admin:
typ = ' (admin)'
elif user.is_org:
typ = ' (org)'
elif user.is_test:
typ = ' (test)'
else:
typ = ""
app.logger.info('Login: Přihlásil se uživatel #%s <%s>%s', user.user_id, email, typ)
mo.users.login(user)
db.get_session().commit()
return login_and_redirect(user, url=form.next.data)
return render_template('login.html', form=form)
@app.route('/acct/logout', methods=('POST',))
def logout():
session.clear()
return redirect(url_for('index'))
@app.route('/acct/incarnate/<int:id>', methods=('POST',))
def incarnate(id):
if not g.user.is_admin:
raise werkzeug.exceptions.Forbidden()
new_user = db.get_session().query(db.User).get(id)
if not new_user:
raise werkzeug.exceptions.NotFound()
app.logger.info('Login: Uživatel #%s se převtělil na #%s', g.user.user_id, new_user.user_id)
return login_and_redirect(new_user, flash_msg='Převtělení proběhlo')
@app.route('/user/settings')
def user_settings():
sess = db.get_session()
roles = (sess.query(db.UserRole)
.filter_by(user_id=g.user.user_id)
.options(joinedload(db.UserRole.place))
.all())
if g.user.is_org or g.user.is_admin:
pant = None
else:
pant = sess.query(db.Participant).get((g.user.user_id, mo.current_year))
return render_template('settings.html', user=g.user, pant=pant, roles=roles, roles_by_type=mo.rights.roles_by_type)
class SettingsForm(FlaskForm):
email = mo_fields.Email(validators=[validators.DataRequired()])
current_passwd = wtforms.PasswordField('Aktuální heslo', validators=[validators.DataRequired()])
new_passwd = mo_fields.NewPassword(
description=mo.users.password_help + ' Pokud nechcete heslo měnit, ponechte toto políčko prázdné.',
)
new_passwd2 = mo_fields.RepeatPassword()
submit = wtforms.SubmitField('Nastavit')
def validate_current_passwd(form, field):
if not mo.users.check_password(g.user, field.data):
raise ValidationError('Chybné heslo.')
@app.route('/user/settings/change', methods=('GET', 'POST'))
def user_settings_change():
sess = db.get_session()
user = g.user
form = SettingsForm()
if not form.submit.data:
form.email.data = user.email
if form.validate_on_submit():
ok = True
if form.new_passwd.data:
app.logger.info(f'Settings: Změněno heslo uživatele #{user.user_id}')
mo.users.set_password(user, form.new_passwd.data)
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'change-passwd'},
)
sess.commit()
flash('Heslo změněno.', 'success')
if form.email.data != user.email:
rr = mo.users.new_reg_request(db.RegReqType.change_email, request.remote_addr)
if rr:
rr.user_id = user.user_id
rr.email = form.email.data
sess.add(rr)
sess.commit()
app.logger.info(f'Settings: Požadavek na změnu e-mailu uživatele #{user.user_id}')
flash('Odeslán e-mail s odkazem na potvrzení nové adresy.', 'success')
mo.email.send_confirm_change_email(user, rr.email_token)
else:
app.logger.info('Settings: Rate limit')
flash('Příliš mnoho požadavků na změny e-mailu. Počkejte prosím chvíli a zkuste to znovu.', 'danger')
ok = False
if ok:
return redirect(url_for('user_settings'))
return render_template('settings_change.html', form=form)
@app.errorhandler(NeedLoginError)
def handle_need_login(e):
form = LoginForm()
form.next.data = request.path
return render_template('login.html', form=form), e.code
class ResetForm(FlaskForm):
email = wtforms.StringField('E-mail', description='Účet pro který se nastavuje nové heslo', render_kw={"disabled": "disabled"})
new_passwd = mo_fields.NewPassword(validators=[validators.DataRequired()])
new_passwd2 = mo_fields.RepeatPassword(validators=[validators.DataRequired()])
submit = wtforms.SubmitField('Nastavit heslo')
# URL je explicitně uvedeno v mo.email.activate_url
@app.route('/acct/activate', methods=('GET', 'POST'))
def activate():
token = request.args.get('token')
if not token:
flash('Chybí token pro aktivaci účtu', 'danger')
return redirect(url_for('login'))
user = mo.users.check_activation_token(token)
if not user:
flash('Neplatný kód pro aktivaci účtu. Zkontrolujte, že jste odkaz z e-mailu zkopírovali správně.', 'danger')
return redirect(url_for('login'))
if user.last_login_at is not None:
flash('Tento účet už byl aktivován. Pokud neznáte heslo, použijte tlačítko pro obnovu hesla.', 'danger')
return redirect(url_for('login'))
form = ResetForm(email=user.email)
ok = form.validate_on_submit()
if not ok:
return render_template('acct_activate.html', form=form)
app.logger.info('Login: Aktivace účtu uživatele <%s>', user.email)
mo.users.set_password(user, form.new_passwd.data, reset=True)
mo.users.login(user)
db.get_session().commit()
return login_and_redirect(user, flash_msg='Nastavení nového hesla a přihlášení do systému proběhlo úspěšně')
class RegStatus(Enum):
new = auto()
ok = auto()
expired = auto()
already_exists = auto()
# Jen v 1. kroku:
rate_limited = auto()
wrong_captcha = auto()
# Jen v 2. kroku:
already_spent = auto()
class Reg1:
create_time: datetime.datetime
seed: str
status: RegStatus
email_token: str
x: int
y: int
def __init__(self, from_token: Optional[str] = None):
self.status = self._parse_token(from_token)
if self.status == RegStatus.ok:
self._gen_captcha()
else:
self._reset()
def _reset(self):
self.create_time = mo.now
self.seed = secrets.token_hex(16)
app.logger.debug(f'Reg1: Nový token: seed={self.seed}')
self._gen_captcha()
def as_token(self) -> str:
return mo.tokens.sign_token([str(int(self.create_time.timestamp())), self.seed], 'reg1')
def _parse_token(self, token: Optional[str]) -> RegStatus:
if token is None:
return RegStatus.new
fields = mo.tokens.verify_token(token, 'reg1')
if not fields:
app.logger.debug(f'Reg1: Neplatný token: {token}')
return RegStatus.new
self.create_time = datetime.datetime.fromtimestamp(int(fields[0]), tz=dateutil.tz.UTC)
self.seed = fields[1]
app.logger.debug(f'Reg1: Přijat token: {self.create_time} {self.seed}')
token_age = mo.now - self.create_time
if token_age > datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY):
app.logger.debug('Reg1: Token expiroval')
return RegStatus.expired
return RegStatus.ok
def _init_rng(self) -> random.Random:
rng = random.Random()
rng.seed(mo.tokens.hash('rng-init', self.seed))
return rng
def _gen_captcha(self):
rng = self._init_rng()
self.x = rng.randrange(1, 10)
self.y = rng.randrange(1, 10)
app.logger.debug(f'Reg1: Captcha: {self.x}*{self.y}')
def captcha_task(self) -> str:
cisla = ['nula', 'jedna', 'dva', 'tři', 'čtyři', 'pět', 'šest', 'sedm', 'osm', 'devět',
'deset', 'jedenáct', 'dvanáct', 'třináct', 'čtrnáct', 'patnáct', 'šestnáct', 'sedmnáct', 'osmnáct', 'devatenáct']
return f'Napište číslem, kolik je {cisla[self.x]} krát {cisla[self.y]}.'
def captcha_check_answer(self, answer: str) -> bool:
correct = self.x * self.y
if answer == str(correct):
app.logger.debug(f'Reg1: Captcha: {self.x}*{self.y}={answer} správně')
return True
else:
app.logger.debug(f'Reg1: Captcha: {self.x}*{self.y}={answer} špatně')
return False
def create_reg_request(self, email: str) -> bool:
sess = db.get_session()
rr = sess.query(db.RegRequest).with_for_update().filter_by(captcha_token=self.seed).one_or_none()
if rr:
self._reset()
self.status = RegStatus.expired
sess.rollback()
app.logger.info('Reg1: Captcha token použit znovu')
return False
rr = mo.users.new_reg_request(db.RegReqType.register, request.remote_addr)
if not rr:
self._reset()
self.status = RegStatus.rate_limited
sess.rollback()
app.logger.info('Reg1: Rate limit')
return False
self.email_token = rr.email_token
rr.email = email
rr.captcha_token = self.seed
sess.add(rr)
sess.commit()
return True
def process(self, email: str, captcha: str) -> bool:
# XXX: Nejdříve zapisujeme registraci do DB, a teprve pak ověřujeme captchu.
# Tímto způsobem je těžší captchu obejít (protože je rate-limitovaná), ale
# zase je snazší páchat DoS útok na celou registraci (protože je rate-limitovaná).
if not self.create_reg_request(email):
return False
if not self.captcha_check_answer(captcha):
self._reset()
self.status = RegStatus.wrong_captcha
return False
if mo.users.user_by_email(email):
self._reset()
self.status = RegStatus.already_exists
app.logger.info(f'Reg1: Účet s e-mailem {email} už existuje')
return False
return True
class Reg1Form(FlaskForm):
email = mo_fields.Email(validators=[validators.DataRequired()])
token = wtforms.HiddenField()
captcha = wtforms.StringField('Kontrolní odpověď', validators=[validators.DataRequired()])
submit = wtforms.SubmitField('Vytvořit účet')
@app.route('/acct/create', methods=('GET', 'POST'))
def create_acct():
form = Reg1Form()
reg1 = Reg1(form.token.data)
if reg1.status == RegStatus.ok and form.validate_on_submit() and reg1.process(form.email.data, form.captcha.data):
app.logger.debug(f'Reg1: E-mailový token {reg1.email_token}')
flash('Odeslán e-mail s odkazem na založení účtu.', 'success')
user = db.User(email=form.email.data, first_name='Nový', last_name='Uživatel')
mo.email.send_confirm_create_email(user, reg1.email_token)
return redirect(url_for('confirm_reg'))
form.captcha.description = reg1.captcha_task()
if reg1.status != RegStatus.ok:
form.token.data = reg1.as_token()
form.captcha.data = ""
if reg1.status == RegStatus.expired:
flash('Vypršela platnost formuláře, vyplňte ho prosím znovu.', 'danger')
elif reg1.status == RegStatus.rate_limited:
flash('Přichází příliš mnoho registrací najednou, zkuste to prosím za chvíli znovu.', 'danger')
elif reg1.status == RegStatus.already_exists:
form.email.errors.append('Účet s touto adresou už existuje. ' + Markup('<a href="' + html.escape(url_for('login', email=form.email.data)) + '">Přihlásit se.</a>'))
elif reg1.status == RegStatus.wrong_captcha:
form.captcha.errors.append('Chybný výsledek. Zkuste to znovu: ' + reg1.captcha_task())
return render_template('acct_reg1.html', form=form)
class Reg2:
reg_type: db.RegReqType
status: RegStatus
rr: db.RegRequest
user: db.User
messages: Dict[db.RegReqType, Dict[RegStatus, str]] = {
db.RegReqType.register: {
RegStatus.new: 'Chybný registrační kód. Zkontrolujte, že jste odkaz z e-mailu zkopírovali správně.',
RegStatus.expired: 'Vypršela platnost registrace, vyplňte ji prosím znovu.',
RegStatus.already_spent: 'Tento odkaz na potvrzení registrace byl již využit.',
RegStatus.already_exists: 'Účet s touto adresou už existuje.',
},
db.RegReqType.change_email: {
RegStatus.new: 'Chybný potvrzovací kód. Zkontrolujte, že jste odkaz z e-mailu zkopírovali správně.',
RegStatus.expired: 'Vypršela platnost potvrzovacího kódu, požádejte prosím o změnu e-mailu znovu.',
RegStatus.already_spent: 'Tento odkaz na potvrzení změny e-mailu byl již využit.',
},
db.RegReqType.reset_passwd: {
RegStatus.new: 'Chybný kód pro obnovení hesla. Zkontrolujte, že jste odkaz z e-mailu zkopírovali správně.',
RegStatus.expired: 'Vypršela platnost kódu pro obnovení hesla, požádejte prosím o obnovu znovu.',
RegStatus.already_spent: 'Tento odkaz na obnovení hesla byl již využit.',
},
}
def __init__(self, token: str, expected_type: db.RegReqType):
self.reg_type = expected_type
self.status = self._parse_token(token)
if self.status == RegStatus.ok:
self.status = self._load_rr(token)
def _parse_token(self, token: Optional[str]) -> RegStatus:
if not token:
return RegStatus.new
token = mo.util.clean_up_token(token)
fields = mo.tokens.verify_token(token, 'reg-request')
if not fields:
app.logger.debug(f'Reg2: Neplatný token: {token}')
return RegStatus.new
create_time = datetime.datetime.fromtimestamp(int(fields[0]), tz=dateutil.tz.UTC)
app.logger.debug(f'Reg2: Přijat token: {token}')
token_age = mo.now - create_time
if token_age > datetime.timedelta(minutes=config.REG_TOKEN_VALIDITY):
app.logger.debug('Reg2: Token expiroval')
return RegStatus.expired
return RegStatus.ok
def _load_rr(self, token: str) -> RegStatus:
sess = db.get_session()
rr = sess.query(db.RegRequest).with_for_update().filter_by(email_token=token).one_or_none()
if not rr:
app.logger.info('Reg2: Registrace nenalezena')
return RegStatus.expired
if rr.expires_at < mo.now:
app.logger.info('Reg2: Registrace expirovala')
return RegStatus.expired
if rr.used_at is not None:
app.logger.info('Reg2: Registrace spotřebována')
return RegStatus.already_spent
if rr.type != self.reg_type:
app.logger.info('Reg2: Token špatného typu')
return RegStatus.new
self.rr = rr
return RegStatus.ok
def create(self, first_name: str, last_name: str, passwd: str) -> bool:
rr = self.rr
assert rr.email is not None
email = mo.users.normalize_email(rr.email) # Pro jistotu
sess = db.get_session()
if db.get_session().query(db.User).with_for_update().filter_by(email=email).one_or_none():
# Účet mohl začít existovat mezi 1. a 2. krokem registrace
app.logger.info(f'Reg2: Účet s e-mailem {email} začal během registrace existovat')
self.status = RegStatus.already_exists
return False
user = db.User(
email=email,
first_name=first_name,
last_name=last_name,
)
mo.users.set_password(user, passwd)
rr.used_at = mo.now
sess.add(user)
sess.flush()
app.logger.info(f'Reg2: Založen uživatel user=#{user.user_id} email=<{user.email}>')
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'register', 'new': db.row2dict(user)},
)
sess.commit()
self.user = user
return True
def change_email(self):
sess = db.get_session()
user = g.user
user.email = self.rr.email
app.logger.info(f'Reg2: Uživatel #{user.user_id} si změnil email na <{user.email}>')
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={
'action': 'change-settings',
'changes': db.get_object_changes(user),
},
)
self.rr.used_at = mo.now
sess.commit()
def change_passwd(self, new_passwd: str):
sess = db.get_session()
user = self.rr.user
app.logger.info(f'Reg2: Uživatel #{user.user_id} si resetoval heslo')
mo.users.set_password(user, new_passwd, reset=True)
mo.users.login(user)
self.rr.used_at = mo.now
sess.commit()
def spend_request(self):
self.rr.used_at = mo.now
db.get_session().commit()
def flash_message(self):
msgs = self.messages[self.reg_type]
if self.status in msgs:
flash(msgs[self.status], 'danger')
class Reg2Form(FlaskForm):
email = wtforms.StringField('E-mail', render_kw={"disabled": "disabled"})
first_name = mo_fields.FirstName(validators=[validators.DataRequired()])
last_name = mo_fields.LastName(validators=[validators.DataRequired()])
new_passwd = mo_fields.NewPassword('Heslo', validators=[validators.DataRequired()])
new_passwd2 = mo_fields.RepeatPassword(validators=[validators.DataRequired()])
submit = wtforms.SubmitField('Vytvořit účet')
# URL je explicitně uvedeno v mo.email.activate_url
@app.route('/acct/confirm/r', methods=('GET', 'POST'))
def confirm_reg():
token = request.args.get('token')
if token is None:
return render_template('acct_reg2.html', form=None)
reg2 = Reg2(token, db.RegReqType.register)
if reg2.status != RegStatus.ok:
reg2.flash_message()
return redirect(url_for('create_acct'))
form = Reg2Form()
if form.validate_on_submit():
if reg2.create(form.first_name.data, form.last_name.data, form.new_passwd.data):
flash('Založení účtu a přihlášení do systému proběhlo úspěšně.', 'success')
app.logger.info(f'Login: Přihlásil se uživatel <{reg2.user.email}> po založení účtu')
return login_and_redirect(reg2.user, flash_msg='Účet úspěšně založen.')
reg2.flash_message()
form.email.data = reg2.rr.email
return render_template('acct_reg2.html', form=form)
class ConfirmEmailForm(FlaskForm):
orig_email = wtforms.StringField('Původní e-mail', render_kw={"disabled": "disabled"})
new_email = wtforms.StringField('Nový e-mail', render_kw={"disabled": "disabled"})
submit = wtforms.SubmitField('Potvrdit změnu')
cancel = wtforms.SubmitField('Zrušit požadavek')
# URL je explicitně uvedeno v mo.email.activate_url
@app.route('/acct/confirm/e', methods=('GET', 'POST'))
def confirm_email():
reg2 = Reg2(request.args.get('token'), db.RegReqType.change_email)
if reg2.status != RegStatus.ok:
reg2.flash_message()
return redirect(url_for('user_settings'))
form = ConfirmEmailForm()
if form.validate_on_submit():
if form.submit.data:
reg2.change_email()
flash('E-mail změněn.', 'success')
elif form.cancel.data:
reg2.spend_request()
flash('Požadavek na změnu e-mailu zrušen.', 'success')
return redirect(url_for('user_settings'))
form.orig_email.data = g.user.email
form.new_email.data = reg2.rr.email
return render_template('acct_confirm_email.html', form=form)
class CancelResetForm(FlaskForm):
cancel = wtforms.SubmitField('Zrušit obnovu hesla')
# URL je explicitně uvedeno v mo.email.activate_url
@app.route('/acct/confirm/p', methods=('GET', 'POST'))
def confirm_reset():
reg2 = Reg2(request.args.get('token'), db.RegReqType.reset_passwd)
if reg2.status != RegStatus.ok:
reg2.flash_message()
return redirect(url_for('login'))
form = ResetForm(email=reg2.rr.user.email)
if form.validate_on_submit() and form.submit.data:
reg2.change_passwd(form.new_passwd.data)
return login_and_redirect(reg2.rr.user, flash_msg='Nastavení nového hesla a přihlášení do systému proběhlo úspěšně')
cform = CancelResetForm()
if cform.validate_on_submit() and cform.cancel.data:
reg2.spend_request()
flash('Požadavek na změnu hesla zrušen.', 'success')
return redirect(url_for('user_settings'))
return render_template('acct_reset_passwd.html', form=form, cancel_form=cform)
@app.errorhandler(werkzeug.exceptions.Forbidden)
def handle_forbidden(e):
return render_template('forbidden.html')
from flask import request
from flask.json import jsonify
from sqlalchemy import func
from sqlalchemy.orm import joinedload
import werkzeug.exceptions
import mo.db as db
from mo.util_format import inflect_with_number
from mo.web import app
@app.route('/api/')
def api_root():
"""Slouží jako prefix pro konstrukci URL v JavaScriptu."""
raise werkzeug.exceptions.NotFound()
@app.route('/api/find-town')
def api_find_town():
query = request.args.get('q')
if query is None or len(query) < 2:
return jsonify(error='Zadejte alespoň 2 znaky jména obce.')
elif '%' in query:
return jsonify(error='Nepovolené znaky ve jménu obce.')
else:
max_places = 50
places = (db.get_session().query(db.Place)
.filter_by(level=3)
.filter(func.lower(db.f_unaccent(db.Place.name)).like(func.lower(db.f_unaccent(query + '%'))))
.options(joinedload(db.Place.parent_place))
.order_by(db.Place.name, db.Place.place_id)
.limit(max_places)
.all())
if not places:
return jsonify(error='Nenalezena žádná obec.')
# XXX: Nemůže se stát, že nastane přesná shoda a k tomu příliš mnoho nepřesných?
if len(places) >= max_places:
return jsonify(error='Nalezeno příliš mnoho obcí. Zadejte prosím více znaků jména.')
res = []
for p in places:
name = p.name
if p.name != p.parent_place.name:
name += f' (okres {p.parent_place.name})'
res.append([p.place_id, name])
msg = inflect_with_number(len(res), 'Nalezena %s obec.', 'Nalezeny %s obce.', 'Nalezeno %s obcí.')
return jsonify(found=res, msg=msg)
@app.route('/api/get-schools')
def api_get_schools():
town = request.args.get('town')
if town is None or not town.isnumeric():
raise werkzeug.exceptions.BadRequest()
town_id = int(town)
places = (db.get_session().query(db.Place)
.filter_by(level=4, type=db.PlaceType.school, parent=town_id)
.options(joinedload(db.Place.school))
.order_by(db.Place.name)
.all())
zs = []
ss = []
for p in places:
s = {
'id': p.place_id,
'name': p.name,
}
if p.school.is_zs:
zs.append(s)
if p.school.is_ss:
ss.append(s)
return jsonify(zs=zs, ss=ss)
import datetime
from flask import render_template, request, g, redirect, url_for, session
from flask.helpers import flash
from flask_wtf import FlaskForm
import werkzeug.exceptions
import wtforms
from wtforms.fields.html5 import EmailField
import wtforms.validators as validators
from sqlalchemy.orm import joinedload
from typing import Optional
import mo.util
import mo.db as db
import mo.rights
import mo.users
from mo.web import app, NeedLoginError
class LoginForm(FlaskForm):
next = wtforms.HiddenField()
email = EmailField('E-mail', validators=[validators.DataRequired()])
passwd = wtforms.PasswordField('Heslo')
submit = wtforms.SubmitField('Přihlásit se')
reset = wtforms.SubmitField('Zapomenuté heslo')
def login_and_redirect(user: db.User, url: Optional[str] = None):
session.clear()
session['uid'] = user.user_id
if not url:
if user.is_admin or user.is_org:
url = url_for('org_index')
else:
url = url_for('index')
else:
url = request.script_root + url
return redirect(url)
@app.route('/auth/login', methods=('GET', 'POST'))
def login():
form = LoginForm(email=request.args.get('email'))
if not form.validate_on_submit():
return render_template('login.html', form=form, error=None)
email = form.email.data
user = mo.users.user_by_email(email)
if not user:
app.logger.error('Login: Neznámý uživatel <%s>', email)
flash('Neznámý uživatel', 'danger')
elif form.reset.data:
app.logger.info('Login: Požadavek na reset hesla pro <%s>', email)
min_time_between_resets = datetime.timedelta(minutes=1)
now = datetime.datetime.now().astimezone()
if (user.reset_at is not None
and now - user.reset_at < min_time_between_resets):
flash('Poslední požadavek na obnovení hesla byl odeslán příliš nedávno', 'danger')
else:
token = mo.users.ask_reset_password(user)
db.get_session().commit()
mo.util.send_password_reset_email(user, token)
flash('Na uvedenou adresu byl odeslán e-mail s odkazem na obnovu hesla', 'success')
elif not form.passwd.data or not mo.users.check_password(user, form.passwd.data):
app.logger.error('Login: Špatné heslo pro uživatele <%s>', email)
flash('Chybné heslo', 'danger')
else:
if user.is_admin:
typ = ' (admin)'
elif user.is_org:
typ = ' (org)'
elif user.is_test:
typ = ' (test)'
else:
typ = ""
app.logger.info('Login: Přihlásil se uživatel #%s <%s>%s', user.user_id, email, typ)
mo.users.login(user)
db.get_session().commit()
return login_and_redirect(user, url=form.next.data)
return render_template('login.html', form=form)
@app.route('/auth/logout', methods=('POST',))
def logout():
session.clear()
return redirect(url_for('index'))
@app.route('/auth/incarnate/<int:id>', methods=('POST',))
def incarnate(id):
if not g.user.is_admin:
raise werkzeug.exceptions.Forbidden()
new_user = db.get_session().query(db.User).get(id)
if not new_user:
raise werkzeug.exceptions.NotFound()
app.logger.info('Login: Uživatel #%s se převtělil na #%s', g.user.user_id, new_user.user_id)
return login_and_redirect(new_user)
@app.route('/user/settings')
def user_settings():
sess = db.get_session()
roles = []
if g.user:
roles = (sess.query(db.UserRole)
.filter_by(user_id=g.user.user_id)
.options(joinedload(db.UserRole.place))
.all())
return render_template('settings.html', roles=roles, roles_by_type=mo.rights.roles_by_type)
@app.errorhandler(NeedLoginError)
def handle_need_login(e):
form = LoginForm()
form.next.data = request.path
return render_template('login.html', form=form), e.code
class ResetForm(FlaskForm):
email = EmailField('E-mail', description='Účet pro který se nastavuje nové heslo', render_kw={"disabled": "disabled"})
token = wtforms.HiddenField()
passwd = wtforms.PasswordField('Nové heslo', description='Heslo musí mít alespoň 8 znaků. Doporučujeme kombinovat velká a malá písmena a číslice.')
submit = wtforms.SubmitField('Nastavit heslo')
cancel = wtforms.SubmitField('Zrušit obnovu hesla')
@app.route('/auth/reset', methods=('GET', 'POST'))
def reset():
token = request.args.get('token')
if not token:
flash('Žádný token pro resetování hesla', 'danger')
return redirect(url_for('login'))
user = mo.users.check_reset_password(token)
if not user:
flash('Neplatný požadavek na obnovu hesla', 'danger')
return redirect(url_for('login'))
form = ResetForm(token=token, email=user.email)
ok = form.validate_on_submit()
if not ok:
return render_template('reset.html', form=form)
if form.cancel.data:
mo.users.cancel_reset_password(user)
app.logger.info('Login: Zrušen reset hesla pro uživatele <%s>', user.email)
db.get_session().commit()
flash('Obnova hesla zrušena', 'warning')
return redirect(url_for('login'))
elif len(form.passwd.data) < 8:
flash('Heslo musí být aspoň 8 znaků dlouhé', 'danger')
return render_template('reset.html', form=form)
else:
mo.users.do_reset_password(user)
mo.users.set_password(user, form.passwd.data)
app.logger.info('Login: Reset hesla pro uživatele <%s>', user.email)
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'reset-passwd'},
)
mo.users.login(user)
app.logger.info('Login: Přihlásil se uživatel <%s> po resetování hesla', user.email)
db.get_session().commit()
flash('Nastavení nového hesla a přihlášení do systému proběhlo úspěšně', 'success')
return login_and_redirect(user)
@app.errorhandler(werkzeug.exceptions.Forbidden)
def handle_forbidden(e):
return render_template('forbidden.html')