Select Git revision
-
Martin Mareš authored
Zatím se používá pouze pro pomocné skripty.
Martin Mareš authored: Zatím se používá pouze pro pomocné skripty.
org_users.py 13.77 KiB
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import werkzeug.exceptions
import wtforms
from sqlalchemy import or_
from typing import Optional
from wtforms import validators
from wtforms.validators import Required
import mo
import mo.db as db
import mo.rights
import mo.util
import mo.users
from mo.web import app
class PagerForm(FlaskForm):
    """Base form carrying the paging state shared by the list views.

    ``limit`` is the page size and ``offset`` the index of the first row
    shown; ``next`` / ``previous`` are the buttons used to move between
    pages.
    """

    # Paging window. The list views in this module substitute defaults
    # when these arrive empty.
    limit = wtforms.IntegerField()
    offset = wtforms.IntegerField()

    # Navigation buttons ("Další" = next, "Předchozí" = previous).
    next = wtforms.SubmitField(label="Další")
    previous = wtforms.SubmitField(label="Předchozí")
class UsersFilterForm(PagerForm):
    """Filter form for the participant list.

    In the select fields the value ``'*'`` stands for "no restriction";
    the view skips the corresponding condition when it is chosen.
    """

    # Criteria matched against participant records.
    year = wtforms.IntegerField(label="Ročník")
    school_code = wtforms.StringField(label="Škola")

    # Criteria matched against rounds -> contests -> participations.
    round_year = wtforms.IntegerField(label="Ročník")
    round_category = wtforms.SelectField(
        label="Kategorie",
        choices=['*', *sorted(db.get_categories())],
    )
    round_seq = wtforms.SelectField(
        label="Kolo",
        choices=['*', *sorted(db.get_seqs())],
    )
    contest_site_code = wtforms.StringField(label="Soutěžní místo")
    participation_state = wtforms.SelectField(
        label='Účast',
        choices=[('*', '*'), *db.PartState.choices()],
    )

    submit = wtforms.SubmitField(label="Filtrovat")
@app.route('/org/user/')
def org_users():
    """List participant accounts (neither admin nor organizer), filtered and paged.

    Filter and paging values are read from the query string through
    ``UsersFilterForm``.  Users can be narrowed down by their participant
    record (year, school) and by their participations (round year, category
    and sequence number, contest site, participation state).  Unknown place
    codes do not abort the request; they are reported to the template via
    ``filter_errors`` and the corresponding condition is simply not applied.

    Returns the rendered ``org_users.html`` template.
    """
    sess = db.get_session()
    rr = mo.rights.Rights(g.user)
    rr.get_generic()

    q = sess.query(db.User).filter_by(is_admin=False, is_org=False)
    # Named ``form`` locally so the ``filter`` builtin is not shadowed; the
    # template still receives it under the name ``filter``.
    form = UsersFilterForm(request.args)
    filter_errors = []

    # --- Conditions on the participant record -----------------------------
    participant_filter = sess.query(db.Participant.user_id)
    participant_filter_apply = False
    if form.year.data:
        participant_filter = participant_filter.filter_by(year=form.year.data)
        participant_filter_apply = True
    if form.school_code.data:
        place = db.place_by_code(form.school_code.data)
        if place:
            participant_filter = participant_filter.filter_by(school=place.place_id)
            participant_filter_apply = True
        else:
            filter_errors.append("Neexistující kód školy")

    if participant_filter_apply:
        q = q.filter(db.User.user_id.in_(participant_filter))

    # --- Conditions on rounds ('*' means "any") ---------------------------
    round_filter = sess.query(db.Round.round_id)
    round_filter_apply = False
    if form.round_year.data:
        round_filter = round_filter.filter_by(year=form.round_year.data)
        round_filter_apply = True
    if form.round_category.data and form.round_category.data != "*":
        round_filter = round_filter.filter_by(category=form.round_category.data)
        round_filter_apply = True
    if form.round_seq.data and form.round_seq.data != "*":
        round_filter = round_filter.filter_by(seq=form.round_seq.data)
        round_filter_apply = True

    # --- Conditions on contests (selected rounds and/or contest site) -----
    contest_filter = sess.query(db.Contest.contest_id)
    contest_filter_apply = False
    if round_filter_apply:
        contest_filter = contest_filter.filter(db.Contest.round_id.in_(round_filter))
        contest_filter_apply = True
    if form.contest_site_code.data:
        place = db.place_by_code(form.contest_site_code.data)
        if place:
            contest_filter = contest_filter.filter_by(place_id=place.place_id)
            contest_filter_apply = True
        else:
            filter_errors.append("Neexistující kód soutěžního místa")

    # --- Conditions on participations (selected contests and/or state) ----
    participation_filter = sess.query(db.Participation.user_id)
    participation_filter_apply = False
    if contest_filter_apply:
        participation_filter = participation_filter.filter(db.Participation.contest_id.in_(contest_filter))
        participation_filter_apply = True
    if form.participation_state.data and form.participation_state.data != '*':
        participation_filter = participation_filter.filter_by(state=form.participation_state.data)
        participation_filter_apply = True

    if participation_filter_apply:
        q = q.filter(db.User.user_id.in_(participation_filter))

    count = q.count()

    # --- Paging -----------------------------------------------------------
    # Missing, zero or negative values from the query string fall back to
    # safe defaults so they never reach OFFSET/LIMIT.
    limit = form.limit.data
    if not limit or limit < 0:
        limit = 50
    offset = max(0, form.offset.data or 0)

    # Step by the actual page size, not by a fixed 50.
    if form.previous.data:
        offset = max(0, offset - limit)
    if form.next.data:
        # Start of the last non-empty page; (count - 1) keeps "next" from
        # landing on an empty page when count is a multiple of the limit.
        last_page_start = max(0, (count - 1) // limit * limit)
        offset = min(last_page_start, offset + limit)

    # Write the effective values back so the template renders them.
    form.limit.data = limit
    form.offset.data = offset

    users = q.offset(offset).limit(limit).all()

    return render_template(
        'org_users.html', users=users, count=count,
        filter=form, filter_errors=filter_errors,
        can_edit=rr.have_right(mo.rights.Right.edit_users),
        can_add=rr.have_right(mo.rights.Right.add_users),
    )
class OrgsFilterForm(PagerForm):
    """Filter form for the organizer list: paging fields plus a submit button."""

    # TODO: filtering by roles?
    submit = wtforms.SubmitField(label="Filtrovat")
@app.route('/org/org/')
def org_orgs():
    """List organizer and admin accounts, one page at a time.

    Paging values are read from the query string through
    ``OrgsFilterForm``.  Returns the rendered ``org_orgs.html`` template.
    """
    sess = db.get_session()
    rr = mo.rights.Rights(g.user)
    rr.get_generic()

    q = sess.query(db.User).filter(or_(db.User.is_admin, db.User.is_org))
    # Named ``form`` locally so the ``filter`` builtin is not shadowed; the
    # template still receives it under the name ``filter``.
    form = OrgsFilterForm(request.args)
    # TODO: filtering by roles?

    count = q.count()

    # Missing, zero or negative values from the query string fall back to
    # safe defaults so they never reach OFFSET/LIMIT.
    limit = form.limit.data
    if not limit or limit < 0:
        limit = 50
    offset = max(0, form.offset.data or 0)

    # Step by the actual page size, not by a fixed 50.
    if form.previous.data:
        offset = max(0, offset - limit)
    if form.next.data:
        # Start of the last non-empty page; (count - 1) keeps "next" from
        # landing on an empty page when count is a multiple of the limit.
        last_page_start = max(0, (count - 1) // limit * limit)
        offset = min(last_page_start, offset + limit)

    # Write the effective values back so the template renders them.
    form.limit.data = limit
    form.offset.data = offset

    users = q.offset(offset).limit(limit).all()

    return render_template(
        'org_orgs.html', users=users, count=count,
        filter=form, filter_errors=None,
        can_edit=rr.have_right(mo.rights.Right.edit_orgs),
        can_add=rr.have_right(mo.rights.Right.add_orgs),
    )
class FormAddRole(FlaskForm):
    """Form for granting a new role to an organizer.

    Every field except ``role`` may be left empty; the view substitutes
    the root place when no place code is given.
    """

    # One choice per entry of mo.rights.roles_by_type: the key's ``name``
    # is the submitted value, the role's ``name`` is the displayed label.
    role = wtforms.SelectField(
        label='Role',
        choices=[
            (role_type.name, role_def.name)
            for role_type, role_def in mo.rights.roles_by_type.items()
        ],
    )
    place_code = wtforms.StringField(label='Oblast')
    year = wtforms.IntegerField(label='Ročník', validators=[validators.Optional()])
    category = wtforms.StringField(label="Kategorie")
    seq = wtforms.IntegerField(label="Kolo", validators=[validators.Optional()])
    submit = wtforms.SubmitField(label='Přidat roli')
class FormRemoveRole(FlaskForm):
    """Form for revoking one role, identified by its database id."""

    # Id of the db.UserRole row to delete.
    remove_role_id = wtforms.IntegerField()
    remove = wtforms.SubmitField(label='Odebrat roli')
@app.route('/org/org/<int:id>/', methods=('GET', 'POST'))
def org_org(id: int):
    """Show an organizer's detail page and process the role add/remove forms.

    Args:
        id: ``user_id`` of the organizer or admin being displayed.

    Raises:
        werkzeug.exceptions.NotFound: the user does not exist or is neither
            organizer nor admin, or the role to be removed does not exist
            or does not belong to this user.

    Role changes are processed only when the current user holds the
    ``assign_rights`` right.  After a successful change the browser is
    redirected back to this page; otherwise the page is rendered with the
    collected ``role_errors``.
    """
    sess = db.get_session()
    user = sess.query(db.User).get(id)
    if not user or (not user.is_org and not user.is_admin):
        raise werkzeug.exceptions.NotFound()

    rr = mo.rights.Rights(g.user)
    rr.get_generic()
    can_assign_rights = rr.have_right(mo.rights.Right.assign_rights)

    form_add_role = FormAddRole()
    form_remove_role = FormRemoveRole()
    role_errors = []

    if can_assign_rights:
        # --- Add a role ---------------------------------------------------
        if form_add_role.submit.data and form_add_role.validate_on_submit():
            new_role = db.UserRole()
            form_add_role.populate_obj(new_role)

            new_role.user_id = id
            # Default to the root place; overridden below if a code was given.
            new_role.place = db.get_root_place()
            new_role.assigned_by = g.user.user_id

            ok = True
            place_code = form_add_role.place_code.data
            if place_code:
                # NOTE(review): other views in this file call
                # db.place_by_code(); confirm which helper name is correct.
                place = db.get_place_by_code(place_code)
                if not place:
                    role_errors.append("Nepovedlo se nalézt místo podle kódu")
                    ok = False
                else:
                    new_role.place = place

            if not rr.can_set_role(new_role):
                role_errors.append(f'Roli "{new_role}" nelze přidělit, není podmnožinou žádné vaší role')
                ok = False

            if ok:
                sess.add(new_role)
                # Flush before logging so the logged row is the one the
                # database has accepted.
                sess.flush()
                mo.util.log(
                    type=db.LogType.user_role,
                    what=id,
                    details={'action': 'new', 'role': db.row2dict(new_role)},
                )
                sess.commit()
                app.logger.info(f"New role for user id {id} added: {db.row2dict(new_role)}")
                flash(f'Role "{new_role}" úspěšně přidána', 'success')
                # The user is known to be an organizer/admin here, so go
                # straight to this view instead of bouncing via org_user.
                return redirect(url_for('org_org', id=id))

        # --- Remove a role ------------------------------------------------
        if form_remove_role.remove_role_id.data and form_remove_role.validate_on_submit():
            role = sess.query(db.UserRole).get(form_remove_role.remove_role_id.data)
            # The role must belong to the user this page is about.  Without
            # this check any role id could be deleted through any
            # organizer's page, which would also bypass the "own role"
            # guard below.
            if not role or role.user_id != id:
                raise werkzeug.exceptions.NotFound()

            if id == g.user.user_id:
                role_errors.append('Nelze odebrat vlastní roli')
            elif not rr.can_set_role(role):
                role_errors.append(f'Roli "{role}" nelze odebrat, není podmnožinou žádné vaší role')
            else:
                sess.delete(role)
                mo.util.log(
                    type=db.LogType.user_role,
                    what=id,
                    details={'action': 'delete', 'role': db.row2dict(role)},
                )
                sess.commit()
                app.logger.info(f"Role for user {id} removed: {db.row2dict(role)}")
                flash(f'Role "{role}" úspěšně odebrána', 'success')
                return redirect(url_for('org_org', id=id))

    return render_template(
        'org_org.html', user=user,
        can_edit=rr.can_edit_user(user), can_assign_rights=can_assign_rights,
        roles_by_type=mo.rights.roles_by_type, role_errors=role_errors,
        form_add_role=form_add_role, form_remove_role=form_remove_role,
    )
@app.route('/org/user/<int:id>/', methods=('GET', 'POST'))
def org_user(id: int):
    """Show the detail page of a participant account.

    Args:
        id: ``user_id`` of the account to display.

    Raises:
        werkzeug.exceptions.NotFound: no user with this id exists.

    Organizer and admin accounts are not shown here; the request is
    redirected to the ``org_org`` view instead.
    """
    session = db.get_session()
    account = session.query(db.User).get(id)

    if not account:
        raise werkzeug.exceptions.NotFound()
    if account.is_org or account.is_admin:
        return redirect(url_for('org_org', id=id))

    rights = mo.rights.Rights(g.user)
    rights.get_generic()

    # Both queries are handed to the template unevaluated.
    participant_rows = session.query(db.Participant).filter_by(user_id=account.user_id)
    participation_rows = session.query(db.Participation).filter_by(user_id=account.user_id)

    context = {
        'user': account,
        'can_edit': rights.can_edit_user(account),
        'participants': participant_rows,
        'rounds': participation_rows,
    }
    return render_template('org_user.html', **context)
class UserEditForm(FlaskForm):
    """Form for editing the basic data of an existing account."""

    # validators.DataRequired is the current name of the deprecated
    # ``Required`` alias; the validation behavior is the same.
    first_name = wtforms.StringField("Jméno", validators=[validators.DataRequired()])
    last_name = wtforms.StringField("Příjmení", validators=[validators.DataRequired()])
    note = wtforms.TextAreaField("Poznámka")
    submit = wtforms.SubmitField("Uložit")
class NewUserForm(UserEditForm):
    """Form for creating an account: the edit fields plus a mandatory e-mail."""

    # validators.DataRequired is the current name of the deprecated
    # ``Required`` alias; the validation behavior is the same.
    email = wtforms.StringField("E-mail", validators=[validators.DataRequired()])
    # Overrides the inherited button to change its label.
    submit = wtforms.SubmitField("Vytvořit")
@app.route('/org/org/<int:id>/edit', methods=("GET", "POST"), endpoint="org_org_edit")
@app.route('/org/user/<int:id>/edit', methods=("GET", "POST"))
def org_user_edit(id: int):
    """Edit the basic data of a participant or organizer account.

    The view is registered under two endpoints.  If the account kind does
    not match the endpoint used (organizer/admin vs. participant), the
    request is redirected to the matching one.

    Args:
        id: ``user_id`` of the account to edit.

    Raises:
        werkzeug.exceptions.NotFound: no such user exists.
        werkzeug.exceptions.Forbidden: the current user may not edit it.
    """
    sess = db.get_session()
    account = mo.users.user_by_uid(id)
    if not account:
        raise werkzeug.exceptions.NotFound()

    is_org = request.endpoint == "org_org_edit"
    account_is_staff = bool(account.is_admin or account.is_org)
    if is_org != account_is_staff:
        # Wrong endpoint for this kind of account: send the client to the right one.
        matching_endpoint = "org_org_edit" if account_is_staff else "org_user_edit"
        return redirect(url_for(matching_endpoint, id=id))

    rights = mo.rights.Rights(g.user)
    rights.get_generic()
    if not rights.can_edit_user(account):
        raise werkzeug.exceptions.Forbidden()

    form = UserEditForm(obj=account)
    if not form.validate_on_submit():
        # First display, or a submission that failed validation.
        return render_template('org_user_edit.html', user=account, form=form, is_org=is_org)

    form.populate_obj(account)
    if not sess.is_modified(account):
        flash(u'Žádné změny k uložení', 'info')
    else:
        changes = db.get_object_changes(account)

        app.logger.info(f"User {id} modified, changes: {changes}")
        mo.util.log(
            type=db.LogType.user,
            what=id,
            details={'action': 'edit', 'changes': changes},
        )
        sess.commit()
        flash('Změny uživatele uloženy', 'success')

    return redirect(url_for('org_user', id=id))
@app.route('/org/org/new/', methods=('GET', 'POST'), endpoint="org_org_new")
@app.route('/org/user/new/', methods=('GET', 'POST'))
def org_user_new():
    """Create a new participant or organizer account.

    The view is registered under two endpoints; ``org_org_new`` creates an
    organizer, the default endpoint a participant.  After the account is
    stored, a password reset token is generated and a link to set the
    password is e-mailed to the new user.  A failure to send the e-mail is
    logged and reported via ``flash`` but does not undo the creation.

    Raises:
        werkzeug.exceptions.Forbidden: the current user lacks the right
            needed for the requested kind of account.
    """
    sess = db.get_session()
    rr = mo.rights.Rights(g.user)
    rr.get_generic()

    is_org = request.endpoint == "org_org_new"

    # Exactly one right is needed, chosen by the kind of account.  The
    # previous if/elif chain fell through to the add_users check even for
    # organizers, so creating an organizer demanded both rights although
    # the organizer list offers the action on add_orgs alone.
    required_right = mo.rights.Right.add_orgs if is_org else mo.rights.Right.add_users
    if not rr.have_right(required_right):
        raise werkzeug.exceptions.Forbidden()

    form = NewUserForm()
    if form.validate_on_submit():
        if mo.users.user_by_email(form.email.data) is not None:
            # Duplicate address: fall through and re-render the form.
            flash('Účet s daným e-mailem již existuje', 'danger')
        else:
            new_user = db.User()
            form.populate_obj(new_user)
            new_user.is_org = is_org
            sess.add(new_user)
            # Flush so that new_user.user_id is assigned before it is logged.
            sess.flush()

            app.logger.info(f"New user created: {db.row2dict(new_user)}")
            mo.util.log(
                type=db.LogType.user,
                what=new_user.user_id,
                details={'action': 'new', 'user': db.row2dict(new_user)},
            )
            sess.commit()
            flash('Nový uživatel vytvořen', 'success')

            # Send password (re)set link
            token = mo.users.ask_reset_password(new_user)
            link = url_for('reset', token=token, _external=True)
            # Commit again after the token has been generated.
            db.get_session().commit()

            try:
                mo.util.send_password_reset_email(new_user, link)
                flash('E-mail s odkazem pro nastavení hesla odeslán na {}'.format(new_user.email), 'success')
            except RuntimeError as e:
                app.logger.error('Login: problém při posílání e-mailu: {}'.format(e))
                flash('Problém při odesílání e-mailu s odkazem pro nastavení hesla', 'danger')

            detail_endpoint = 'org_org' if is_org else 'org_user'
            return redirect(url_for(detail_endpoint, id=new_user.user_id))

    return render_template('org_user_new.html', form=form, is_org=is_org)