from flask import render_template, g, redirect, url_for, flash, request from flask_wtf import FlaskForm import werkzeug.exceptions import wtforms from sqlalchemy import or_ from typing import Optional from wtforms import validators from wtforms.validators import Required import mo import mo.db as db import mo.rights import mo.util import mo.users from mo.web import app class PagerForm(FlaskForm): limit = wtforms.IntegerField() offset = wtforms.IntegerField() next = wtforms.SubmitField("Další") previous = wtforms.SubmitField("Předchozí") class UsersFilterForm(PagerForm): # participants year = wtforms.IntegerField("Ročník") school_code = wtforms.StringField("Škola") # rounds->participations round_year = wtforms.IntegerField("Ročník") round_category = wtforms.SelectField("Kategorie", choices=['*'] + sorted(db.get_categories())) round_seq = wtforms.SelectField("Kolo", choices=['*'] + sorted(db.get_seqs())) contest_site_code = wtforms.StringField("Soutěžní místo") participation_state = wtforms.SelectField('Účast', choices=[('*', '*')] + list(db.PartState.choices())) submit = wtforms.SubmitField("Filtrovat") @app.route('/org/user/') def org_users(): sess = db.get_session() rr = mo.rights.Rights(g.user) rr.get_generic() q = sess.query(db.User).filter_by(is_admin=False, is_org=False) filter = UsersFilterForm(request.args) filter_errors = [] participant_filter = sess.query(db.Participant.user_id) participant_filter_apply = False if filter.year.data: participant_filter = participant_filter.filter_by(year=filter.year.data) participant_filter_apply = True if filter.school_code.data: place = db.place_by_code(filter.school_code.data) if place: participant_filter = participant_filter.filter_by(school=place.place_id) participant_filter_apply = True else: filter_errors.append("Neexistující kód školy") if participant_filter_apply: q = q.filter(db.User.user_id.in_(participant_filter)) round_filter = sess.query(db.Round.round_id) round_filter_apply = False if filter.round_year.data: round_filter = round_filter.filter_by(year=filter.round_year.data) round_filter_apply = True if filter.round_category.data and filter.round_category.data != "*": round_filter = round_filter.filter_by(category=filter.round_category.data) round_filter_apply = True if filter.round_seq.data and filter.round_seq.data != "*": round_filter = round_filter.filter_by(seq=filter.round_seq.data) round_filter_apply = True contest_filter = sess.query(db.Contest.contest_id) contest_filter_apply = False if round_filter_apply: contest_filter = contest_filter.filter(db.Contest.round_id.in_(round_filter)) contest_filter_apply = True if filter.contest_site_code.data: place = db.place_by_code(filter.contest_site_code.data) if place: contest_filter = contest_filter.filter_by(place_id=place.place_id) contest_filter_apply = True else: filter_errors.append("Neexistující kód soutěžního místa") participation_filter = sess.query(db.Participation.user_id) participation_filter_apply = False if contest_filter_apply: participation_filter = participation_filter.filter(db.Participation.contest_id.in_(contest_filter)) participation_filter_apply = True if filter.participation_state.data and filter.participation_state.data != '*': participation_filter = participation_filter.filter_by(state=filter.participation_state.data) participation_filter_apply = True if participation_filter_apply: q = q.filter(db.User.user_id.in_(participation_filter)) # print(str(q)) count = q.count() if not filter.offset.data: filter.offset.data = 0 if not filter.limit.data: filter.limit.data = 50 if filter.previous.data: filter.offset.data = max(0, filter.offset.data - 50) if filter.next.data: filter.offset.data = min(count // 50 * 50, filter.offset.data + 50) q = q.offset(filter.offset.data) q = q.limit(filter.limit.data) users = q.all() return render_template( 'org_users.html', users=users, count=count, filter=filter, filter_errors=filter_errors, can_edit=rr.have_right(mo.rights.Right.edit_users), can_add=rr.have_right(mo.rights.Right.add_users), ) class OrgsFilterForm(PagerForm): # TODO: filtering by roles? submit = wtforms.SubmitField("Filtrovat") @app.route('/org/org/') def org_orgs(): sess = db.get_session() rr = mo.rights.Rights(g.user) rr.get_generic() q = sess.query(db.User).filter(or_(db.User.is_admin, db.User.is_org)) filter = OrgsFilterForm(request.args) # TODO: filtering by roles? count = q.count() if not filter.offset.data: filter.offset.data = 0 if not filter.limit.data: filter.limit.data = 50 if filter.previous.data: filter.offset.data = max(0, filter.offset.data - 50) if filter.next.data: filter.offset.data = min(count // 50 * 50, filter.offset.data + 50) q = q.offset(filter.offset.data) q = q.limit(filter.limit.data) users = q.all() return render_template( 'org_orgs.html', users=users, count=count, filter=filter, filter_errors=None, can_edit=rr.have_right(mo.rights.Right.edit_orgs), can_add=rr.have_right(mo.rights.Right.add_orgs), ) class FormAddRole(FlaskForm): role = wtforms.SelectField('Role', choices=[(name.name, role.name) for (name, role) in mo.rights.roles_by_type.items()]) place_code = wtforms.StringField('Oblast') year = wtforms.IntegerField('Ročník', validators=[validators.Optional()]) category = wtforms.StringField("Kategorie") seq = wtforms.IntegerField("Kolo", validators=[validators.Optional()]) submit = wtforms.SubmitField('Přidat roli') class FormRemoveRole(FlaskForm): remove_role_id = wtforms.IntegerField() remove = wtforms.SubmitField('Odebrat roli') @app.route('/org/org/<int:id>/', methods=('GET', 'POST')) def org_org(id: int): sess = db.get_session() user = sess.query(db.User).get(id) if not user or (not user.is_org and not user.is_admin): raise werkzeug.exceptions.NotFound() rr = mo.rights.Rights(g.user) rr.get_generic() can_assign_rights = rr.have_right(mo.rights.Right.assign_rights) form_add_role = FormAddRole() form_remove_role = FormRemoveRole() role_errors = [] if can_assign_rights: if form_add_role.submit.data and form_add_role.validate_on_submit(): new_role = db.UserRole() form_add_role.populate_obj(new_role) new_role.user_id = id new_role.place = db.get_root_place() new_role.assigned_by = g.user.user_id ok = True place_code = form_add_role.place_code.data if place_code: place = db.get_place_by_code(place_code) if not place: role_errors.append("Nepovedlo se nalézt místo podle kódu") ok = False else: new_role.place = place if not rr.can_set_role(new_role): role_errors.append(f'Roli "{new_role}" nelze přidělit, není podmnožinou žádné vaší role') ok = False if ok: sess.add(new_role) sess.flush() mo.util.log( type=db.LogType.user_role, what=id, details={'action': 'new', 'role': db.row2dict(new_role)}, ) sess.commit() app.logger.info(f"New role for user id {id} added: {db.row2dict(new_role)}") flash(f'Role "{new_role}" úspěšně přidána', 'success') return redirect(url_for('org_user', id=id)) if form_remove_role.remove_role_id.data and form_remove_role.validate_on_submit(): role = sess.query(db.UserRole).get(form_remove_role.remove_role_id.data) if not role: raise werkzeug.exceptions.NotFound if id == g.user.user_id: role_errors.append('Nelze odebrat vlastní roli') elif not rr.can_set_role(role): role_errors.append(f'Roli "{role}" nelze odebrat, není podmnožinou žádné vaší role') else: sess.delete(role) mo.util.log( type=db.LogType.user_role, what=id, details={'action': 'delete', 'role': db.row2dict(role)}, ) sess.commit() app.logger.info(f"Role for user {id} removed: {db.row2dict(role)}") flash(f'Role "{role}" úspěšně odebrána', 'success') return redirect(url_for('org_user', id=id)) return render_template( 'org_org.html', user=user, can_edit=rr.can_edit_user(user), can_assign_rights=can_assign_rights, roles_by_type=mo.rights.roles_by_type, role_errors=role_errors, form_add_role=form_add_role, form_remove_role=form_remove_role, ) @app.route('/org/user/<int:id>/', methods=('GET', 'POST')) def org_user(id: int): sess = db.get_session() user = sess.query(db.User).get(id) if not user: raise werkzeug.exceptions.NotFound() if user.is_org or user.is_admin: return redirect(url_for('org_org', id=id)) rr = mo.rights.Rights(g.user) rr.get_generic() participants = sess.query(db.Participant).filter_by(user_id=user.user_id) rounds = sess.query(db.Participation).filter_by(user_id=user.user_id) return render_template( 'org_user.html', user=user, can_edit=rr.can_edit_user(user), participants=participants, rounds=rounds ) class UserEditForm(FlaskForm): first_name = wtforms.StringField("Jméno", validators=[Required()]) last_name = wtforms.StringField("Příjmení", validators=[Required()]) note = wtforms.TextAreaField("Poznámka") submit = wtforms.SubmitField("Uložit") class NewUserForm(UserEditForm): email = wtforms.StringField("E-mail", validators=[Required()]) submit = wtforms.SubmitField("Vytvořit") @app.route('/org/org/<int:id>/edit', methods=("GET", "POST"), endpoint="org_org_edit") @app.route('/org/user/<int:id>/edit', methods=("GET", "POST")) def org_user_edit(id: int): sess = db.get_session() user = mo.users.user_by_uid(id) if not user: raise werkzeug.exceptions.NotFound() is_org = request.endpoint == "org_org_edit" if not is_org and (user.is_admin or user.is_org): return redirect(url_for("org_org_edit", id=id)) if is_org and not (user.is_admin or user.is_org): return redirect(url_for("org_user_edit", id=id)) rr = mo.rights.Rights(g.user) rr.get_generic() if not rr.can_edit_user(user): raise werkzeug.exceptions.Forbidden() form = UserEditForm(obj=user) if form.validate_on_submit(): form.populate_obj(user) if sess.is_modified(user): changes = db.get_object_changes(user) app.logger.info(f"User {id} modified, changes: {changes}") mo.util.log( type=db.LogType.user, what=id, details={'action': 'edit', 'changes': changes}, ) sess.commit() flash('Změny uživatele uloženy', 'success') else: flash(u'Žádné změny k uložení', 'info') return redirect(url_for('org_user', id=id)) return render_template('org_user_edit.html', user=user, form=form, is_org=is_org) @app.route('/org/org/new/', methods=('GET', 'POST'), endpoint="org_org_new") @app.route('/org/user/new/', methods=('GET', 'POST')) def org_user_new(): sess = db.get_session() rr = mo.rights.Rights(g.user) rr.get_generic() is_org = request.endpoint == "org_org_new" if is_org and not rr.have_right(mo.rights.Right.add_orgs): raise werkzeug.exceptions.Forbidden() elif not rr.have_right(mo.rights.Right.add_users): raise werkzeug.exceptions.Forbidden() form = NewUserForm() if form.validate_on_submit(): check = True if mo.users.user_by_email(form.email.data) is not None: flash('Účet s daným e-mailem již existuje', 'danger') check = False if check: new_user = db.User() form.populate_obj(new_user) new_user.is_org = is_org sess.add(new_user) sess.flush() app.logger.info(f"New user created: {db.row2dict(new_user)}") mo.util.log( type=db.LogType.user, what=new_user.user_id, details={'action': 'new', 'user': db.row2dict(new_user)}, ) sess.commit() flash('Nový uživatel vytvořen', 'success') # Send password (re)set link token = mo.users.ask_reset_password(new_user) link = url_for('reset', token=token, _external=True) db.get_session().commit() try: mo.util.send_password_reset_email(new_user, link) flash('E-mail s odkazem pro nastavení hesla odeslán na {}'.format(new_user.email), 'success') except RuntimeError as e: app.logger.error('Login: problém při posílání e-mailu: {}'.format(e)) flash('Problém při odesílání e-mailu s odkazem pro nastavení hesla', 'danger') if is_org: return redirect(url_for('org_org', id=new_user.user_id)) return redirect(url_for('org_user', id=new_user.user_id)) return render_template('org_user_new.html', form=form, is_org=is_org)