Project 'mj/mo-submit' was moved to 'mo-p/osmo'. Please update any links and bookmarks that may still have the old path.
Select Git revision
org_users.py 16.46 KiB
from typing import Optional
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import werkzeug.exceptions
import wtforms
from sqlalchemy import or_
from sqlalchemy.orm import joinedload, subqueryload
from wtforms import validators
from wtforms.validators import Required
import mo
import mo.db as db
from mo.rights import Right
import mo.util
import mo.users
from mo.web import app
from mo.web.util import PagerForm
class UsersFilterForm(PagerForm):
# user
search_name = wtforms.TextField("Jméno/příjmení", render_kw={'autofocus': True})
search_email = wtforms.TextField("E-mail")
# participants
year = wtforms.IntegerField("Ročník")
school_code = wtforms.StringField("Škola")
# rounds->participations
round_year = wtforms.IntegerField("Ročník")
round_category = wtforms.SelectField("Kategorie")
round_seq = wtforms.SelectField("Kolo")
contest_site_code = wtforms.StringField("Soutěžní oblast")
participation_state = wtforms.SelectField('Účast', choices=[('*', '*')] + list(db.PartState.choices()))
submit = wtforms.SubmitField("Filtrovat")
# Výstupní hodnoty filtru, None při nepoužitém filtru, prázdná db hodnota při
# nepovedené filtraci (neexistující místo a podobně)
f_search_name: Optional[str] = None
f_search_email: Optional[str] = None
f_year: Optional[int] = None
f_school: Optional[db.Place] = None
f_round_category: Optional[str] = None
f_round_seq: Optional[int] = None
f_contest_site: Optional[db.Place] = None
f_participation_state: Optional[db.PartState] = None
def __init__(self, formdata, **kwargs):
super().__init__(formdata=formdata, **kwargs)
self.round_category.choices = ['*'] + sorted(db.get_categories())
self.round_seq.choices = ['*'] + sorted(db.get_seqs())
def validate(self):
self.f_search_name = f"%{self.search_name.data}%" if self.search_name.data else None
self.f_search_email = f"%{self.search_email.data}%" if self.search_email.data else None
self.f_year = self.year.data
self.f_round_year = self.round_year.data
if self.school_code.data:
self.f_school = db.get_place_by_code(self.school_code.data)
if not self.f_school:
flash(f"Zadaná škola '{self.school_code.data}' neexistuje", "danger")
self.f_school = db.Place()
if self.contest_site_code.data:
self.f_contest_site = db.get_place_by_code(self.contest_site_code.data)
if not self.f_contest_site:
flash(f"Zadaná soutěžní oblast '{self.contest_site_code.data}' neexistuje", "danger")
self.f_contest_site = db.Place()
self.f_round_category = None if self.round_category.data == '*' else self.round_category.data
self.f_round_seq = None if self.round_seq.data == '*' else self.round_seq.data
self.f_participation_state = None if self.participation_state.data == '*' else self.participation_state.data
@app.route('/org/user/')
def org_users():
sess = db.get_session()
rr = g.gatekeeper.rights_generic()
q = sess.query(db.User).filter_by(is_admin=False, is_org=False).options(
subqueryload(db.User.participants).joinedload(db.Participant.school_place)
)
filter = UsersFilterForm(request.args)
filter.validate()
if filter.f_search_name:
q = q.filter(or_(
db.User.first_name.ilike(filter.f_search_name),
db.User.last_name.ilike(filter.f_search_name)
))
if filter.f_search_email:
q = q.filter(db.User.email.ilike(filter.f_search_email))
if filter.f_year or filter.f_school:
participant_filter = sess.query(db.Participant.user_id)
if filter.f_year:
participant_filter = participant_filter.filter_by(year=filter.f_year)
if filter.f_school:
participant_filter = participant_filter.filter_by(school=filter.f_school.place_id)
q = q.filter(db.User.user_id.in_(participant_filter))
round_filter = sess.query(db.Round.round_id)
round_filter_apply = False
if filter.f_round_year:
round_filter = round_filter.filter_by(year=filter.f_round_year)
round_filter_apply = True
if filter.f_round_category:
round_filter = round_filter.filter_by(category=filter.f_round_category)
round_filter_apply = True
if filter.round_seq.data and filter.round_seq.data != "*":
round_filter = round_filter.filter_by(seq=filter.round_seq.data)
round_filter_apply = True
contest_filter = sess.query(db.Contest.contest_id)
contest_filter_apply = False
if round_filter_apply:
contest_filter = contest_filter.filter(db.Contest.round_id.in_(round_filter))
contest_filter_apply = True
if filter.f_contest_site:
contest_filter = contest_filter.filter_by(place_id=filter.f_contest_site.place_id)
contest_filter_apply = True
participation_filter = sess.query(db.Participation.user_id)
participation_filter_apply = False
if contest_filter_apply:
participation_filter = participation_filter.filter(db.Participation.contest_id.in_(contest_filter))
participation_filter_apply = True
if filter.f_participation_state:
participation_filter = participation_filter.filter_by(state=filter.f_participation_state)
participation_filter_apply = True
if participation_filter_apply:
q = q.filter(db.User.user_id.in_(participation_filter))
# print(str(q))
(count, q) = filter.apply_limits(q, pagesize=50)
users = q.all()
return render_template(
'org_users.html', users=users, count=count,
filter=filter,
can_edit=rr.have_right(Right.edit_users),
can_add=rr.have_right(Right.add_users),
)
class OrgsFilterForm(PagerForm):
# user
search_name = wtforms.TextField("Jméno/příjmení", render_kw={'autofocus': True})
search_email = wtforms.TextField("E-mail")
# TODO: filtering by roles?
submit = wtforms.SubmitField("Filtrovat")
# Výstupní hodnoty filtru, None při nepoužitém filtru, prázdná db hodnota při
# nepovedené filtraci (neexistující místo a podobně)
f_search_name: Optional[str] = None
f_search_email: Optional[str] = None
def validate(self):
self.f_search_name = f"%{self.search_name.data}%" if self.search_name.data else None
self.f_search_email = f"%{self.search_email.data}%" if self.search_email.data else None
@app.route('/org/org/')
def org_orgs():
sess = db.get_session()
rr = g.gatekeeper.rights_generic()
q = sess.query(db.User).filter(or_(db.User.is_admin, db.User.is_org)).options(
subqueryload(db.User.roles).joinedload(db.UserRole.place)
)
filter = OrgsFilterForm(request.args)
filter.validate()
if filter.f_search_name:
q = q.filter(or_(
db.User.first_name.ilike(filter.f_search_name),
db.User.last_name.ilike(filter.f_search_name)
))
if filter.f_search_email:
q = q.filter(db.User.email.ilike(filter.f_search_email))
(count, q) = filter.apply_limits(q, pagesize=50)
users = q.all()
return render_template(
'org_orgs.html', users=users, count=count,
filter=filter,
can_edit=rr.have_right(Right.edit_orgs),
can_add=rr.have_right(Right.add_orgs),
)
class FormAddRole(FlaskForm):
role = wtforms.SelectField('Role', choices=db.RoleType.choices(), coerce=db.RoleType.coerce, render_kw={'autofocus': True})
place_code = wtforms.StringField('Oblast')
year = wtforms.IntegerField('Ročník', validators=[validators.Optional()])
category = wtforms.StringField("Kategorie", validators=[validators.Length(max=2)], filters=[lambda x: x or None])
seq = wtforms.IntegerField("Kolo", validators=[validators.Optional()])
submit = wtforms.SubmitField('Přidat roli')
class FormRemoveRole(FlaskForm):
remove_role_id = wtforms.IntegerField()
remove = wtforms.SubmitField('Odebrat roli')
@app.route('/org/org/<int:id>/', methods=('GET', 'POST'))
def org_org(id: int):
sess = db.get_session()
user = (sess.query(db.User)
.options(subqueryload(db.User.roles).joinedload(db.UserRole.place, db.UserRole.assigned_by_user))
.get(id))
if not user or (not user.is_org and not user.is_admin):
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_generic()
can_assign_rights = rr.have_right(Right.assign_rights)
form_add_role = FormAddRole()
form_remove_role = FormRemoveRole()
role_errors = []
if can_assign_rights:
if form_add_role.submit.data and form_add_role.validate_on_submit():
new_role = db.UserRole()
form_add_role.populate_obj(new_role)
new_role.user_id = id
new_role.place = db.get_root_place()
new_role.assigned_by = g.user.user_id
ok = True
place_code = form_add_role.place_code.data
if place_code:
place = db.get_place_by_code(place_code)
if not place:
role_errors.append("Nepovedlo se nalézt místo podle kódu")
ok = False
else:
new_role.place = place
if not g.gatekeeper.can_set_role(new_role):
role_errors.append(f'Roli "{new_role}" nelze přidělit, není podmnožinou žádné vaší role')
ok = False
if ok:
sess.add(new_role)
sess.flush()
mo.util.log(
type=db.LogType.user_role,
what=id,
details={'action': 'new', 'role': db.row2dict(new_role)},
)
sess.commit()
app.logger.info(f"New role for user id {id} added: {db.row2dict(new_role)}")
flash(f'Role "{new_role}" úspěšně přidána', 'success')
return redirect(url_for('org_user', id=id))
if form_remove_role.remove_role_id.data and form_remove_role.validate_on_submit():
role = sess.query(db.UserRole).get(form_remove_role.remove_role_id.data)
if not role:
raise werkzeug.exceptions.NotFound()
if not g.gatekeeper.can_set_role(role):
role_errors.append(f'Roli "{role}" nelze odebrat, není podmnožinou žádné vaší role')
else:
sess.delete(role)
mo.util.log(
type=db.LogType.user_role,
what=id,
details={'action': 'delete', 'role': db.row2dict(role)},
)
sess.commit()
app.logger.info(f"Role for user {id} removed: {db.row2dict(role)}")
flash(f'Role "{role}" úspěšně odebrána', 'success')
return redirect(url_for('org_user', id=id))
return render_template(
'org_org.html', user=user,
can_edit=rr.can_edit_user(user), can_assign_rights=can_assign_rights,
can_incarnate=g.user.is_admin,
roles_by_type=mo.rights.roles_by_type, role_errors=role_errors,
form_add_role=form_add_role, form_remove_role=form_remove_role,
)
@app.route('/org/user/<int:id>/', methods=('GET', 'POST'))
def org_user(id: int):
sess = db.get_session()
user = mo.users.user_by_uid(id)
if not user:
raise werkzeug.exceptions.NotFound()
if user.is_org or user.is_admin:
return redirect(url_for('org_org', id=id))
rr = g.gatekeeper.rights_generic()
participants = sess.query(db.Participant).filter_by(user_id=user.user_id)
participations = (
sess.query(db.Participation, db.Contest, db.Round)
.select_from(db.Participation)
.join(db.Contest, db.Contest.master_contest_id == db.Participation.contest_id)
.join(db.Round)
.filter(db.Participation.user == user)
.options(joinedload(db.Contest.place))
.order_by(db.Round.year.desc(), db.Round.category, db.Round.seq, db.Round.part)
.all()
)
return render_template(
'org_user.html', user=user, can_edit=rr.can_edit_user(user),
can_incarnate=g.user.is_admin,
participants=participants, participations=participations,
)
class UserEditForm(FlaskForm):
first_name = wtforms.StringField("Jméno", validators=[Required()], render_kw={'autofocus': True})
last_name = wtforms.StringField("Příjmení", validators=[Required()])
email = wtforms.StringField("E-mail", validators=[Required()])
note = wtforms.TextAreaField("Poznámka")
submit = wtforms.SubmitField("Uložit")
def validate_email(form, field):
try:
field.data = mo.users.normalize_email(field.data)
except mo.CheckError as e:
raise wtforms.ValidationError(str(e))
@app.route('/org/org/<int:id>/edit', methods=("GET", "POST"), endpoint="org_org_edit")
@app.route('/org/user/<int:id>/edit', methods=("GET", "POST"))
def org_user_edit(id: int):
sess = db.get_session()
user = mo.users.user_by_uid(id)
if not user:
raise werkzeug.exceptions.NotFound()
is_org = request.endpoint == "org_org_edit"
if not is_org and (user.is_admin or user.is_org):
return redirect(url_for("org_org_edit", id=id))
if is_org and not (user.is_admin or user.is_org):
return redirect(url_for("org_user_edit", id=id))
rr = g.gatekeeper.rights_generic()
if not rr.can_edit_user(user):
raise werkzeug.exceptions.Forbidden()
form = UserEditForm(obj=user)
if (user.is_org or user.is_admin) and not g.user.is_admin:
# emaily u organizátorů může editovat jen správce
del form.email
if form.validate_on_submit():
check = True
if mo.users.user_by_email(form.email.data) is not None:
flash('Zadaný e-mail nelze použít, existuje jiný účet s tímto e-mailem', 'danger')
check = False
if check:
form.populate_obj(user)
if sess.is_modified(user):
changes = db.get_object_changes(user)
app.logger.info(f"User {id} modified, changes: {changes}")
mo.util.log(
type=db.LogType.user,
what=id,
details={'action': 'edit', 'changes': changes},
)
sess.commit()
flash('Změny uživatele uloženy', 'success')
else:
flash(u'Žádné změny k uložení', 'info')
return redirect(url_for('org_user', id=id))
return render_template('org_user_edit.html', user=user, form=form, is_org=is_org)
@app.route('/org/org/new/', methods=('GET', 'POST'), endpoint="org_org_new")
@app.route('/org/user/new/', methods=('GET', 'POST'))
def org_user_new():
sess = db.get_session()
rr = g.gatekeeper.rights_generic()
is_org = request.endpoint == "org_org_new"
if is_org and not rr.have_right(Right.add_orgs):
raise werkzeug.exceptions.Forbidden()
elif not rr.have_right(Right.add_users):
raise werkzeug.exceptions.Forbidden()
form = UserEditForm()
form.submit.label.text = 'Vytvořit'
if form.validate_on_submit():
check = True
if mo.users.user_by_email(form.email.data) is not None:
flash('Účet s daným e-mailem již existuje', 'danger')
check = False
if check:
new_user = db.User()
form.populate_obj(new_user)
new_user.is_org = is_org
sess.add(new_user)
sess.flush()
app.logger.info(f"New user created: {db.row2dict(new_user)}")
mo.util.log(
type=db.LogType.user,
what=new_user.user_id,
details={'action': 'new', 'user': db.row2dict(new_user)},
)
sess.commit()
flash('Nový uživatel vytvořen', 'success')
# Send password (re)set link
token = mo.users.ask_reset_password(new_user)
db.get_session().commit()
if mo.util.send_new_account_email(new_user, token):
flash('E-mail s odkazem pro nastavení hesla odeslán na {}'.format(new_user.email), 'success')
else:
flash('Problém při odesílání e-mailu s odkazem pro nastavení hesla', 'danger')
if is_org:
return redirect(url_for('org_org', id=new_user.user_id))
return redirect(url_for('org_user', id=new_user.user_id))
return render_template('org_user_new.html', form=form, is_org=is_org)