"""Organizer-facing views for listing, showing, editing and creating users and organizers."""

from typing import Optional

from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import werkzeug.exceptions
import wtforms
from sqlalchemy import or_
from sqlalchemy.orm import joinedload, subqueryload
from wtforms import validators
from wtforms.fields.simple import SubmitField
# NOTE(review): `Required` is no longer used in this section (forms use
# validators.DataRequired); drop this import once confirmed unused elsewhere.
from wtforms.validators import Required

import mo
import mo.db as db
from mo.rights import Right
import mo.util
import mo.users
from mo.web import app
from mo.web.util import PagerForm


class UsersFilterForm(PagerForm):
    """Filter form for the list of regular users (see ``org_users``).

    The raw form fields are translated by ``validate()`` into the ``f_*``
    attributes, which the view then uses to build the query.
    """

    # user
    search_name = wtforms.StringField("Jméno/příjmení", render_kw={'autofocus': True})
    search_email = wtforms.StringField("E-mail")

    # participants
    year = wtforms.IntegerField("Ročník")
    school_code = wtforms.StringField("Škola")

    # rounds->participations
    round_year = wtforms.IntegerField("Ročník")
    round_category = wtforms.SelectField("Kategorie")
    round_seq = wtforms.SelectField("Kolo")
    contest_site_code = wtforms.StringField("Soutěžní oblast")
    participation_state = wtforms.SelectField('Účast', choices=[('*', '*')] + list(db.PartState.choices()))

    submit = wtforms.SubmitField("Filtrovat")

    # Filter output values: None when the filter is not used, an empty db
    # object when the lookup failed (non-existent place and the like).
    f_search_name: Optional[str] = None
    f_search_email: Optional[str] = None
    f_year: Optional[int] = None
    f_school: Optional[db.Place] = None
    f_round_year: Optional[int] = None
    f_round_category: Optional[str] = None
    f_round_seq: Optional[int] = None
    f_contest_site: Optional[db.Place] = None
    f_participation_state: Optional[db.PartState] = None

    def __init__(self, formdata, **kwargs):
        """Bind the form to ``formdata`` and fill in the category/round choices from the database."""
        super().__init__(formdata=formdata, **kwargs)
        self.round_category.choices = ['*'] + sorted(db.get_categories())
        self.round_seq.choices = ['*'] + sorted(db.get_seqs())

    def validate(self):
        """Compute the ``f_*`` filter values from the submitted fields.

        This override does not call the parent ``validate()`` and returns
        None; problems (an unknown place code) are reported via ``flash``
        and the corresponding ``f_*`` value is set to an empty ``db.Place``.
        """
        self.f_search_name = f"%{self.search_name.data}%" if self.search_name.data else None
        self.f_search_email = f"%{self.search_email.data}%" if self.search_email.data else None
        self.f_year = self.year.data
        self.f_round_year = self.round_year.data

        if self.school_code.data:
            self.f_school = db.get_place_by_code(self.school_code.data)
            if not self.f_school:
                flash(f"Zadaná škola '{self.school_code.data}' neexistuje", "danger")
                self.f_school = db.Place()

        if self.contest_site_code.data:
            self.f_contest_site = db.get_place_by_code(self.contest_site_code.data)
            if not self.f_contest_site:
                flash(f"Zadaná soutěžní oblast '{self.contest_site_code.data}' neexistuje", "danger")
                self.f_contest_site = db.Place()

        # '*' is the "any" choice of the select fields.
        self.f_round_category = None if self.round_category.data == '*' else self.round_category.data
        self.f_round_seq = None if self.round_seq.data == '*' else self.round_seq.data
        self.f_participation_state = None if self.participation_state.data == '*' else self.participation_state.data


@app.route('/org/user/')
def org_users():
    """List non-organizer, non-admin users, filtered by the query-string arguments."""
    sess = db.get_session()
    rr = g.gatekeeper.rights_generic()

    q = sess.query(db.User).filter_by(is_admin=False, is_org=False).options(
        subqueryload(db.User.participants).joinedload(db.Participant.school_place)
    )
    filter_form = UsersFilterForm(request.args)
    filter_form.validate()

    if filter_form.f_search_name:
        q = q.filter(or_(
            db.User.first_name.ilike(filter_form.f_search_name),
            db.User.last_name.ilike(filter_form.f_search_name)
        ))
    if filter_form.f_search_email:
        q = q.filter(db.User.email.ilike(filter_form.f_search_email))

    if filter_form.f_year or filter_form.f_school:
        participant_filter = sess.query(db.Participant.user_id)
        if filter_form.f_year:
            participant_filter = participant_filter.filter_by(year=filter_form.f_year)
        if filter_form.f_school:
            participant_filter = participant_filter.filter_by(school=filter_form.f_school.place_id)
        q = q.filter(db.User.user_id.in_(participant_filter))

    # The round, contest and participation filters are nested sub-queries;
    # each level is applied only if it or a level below it is constrained.
    round_filter = sess.query(db.Round.round_id)
    round_filter_apply = False
    if filter_form.f_round_year:
        round_filter = round_filter.filter_by(year=filter_form.f_round_year)
        round_filter_apply = True
    if filter_form.f_round_category:
        round_filter = round_filter.filter_by(category=filter_form.f_round_category)
        round_filter_apply = True
    if filter_form.f_round_seq:
        round_filter = round_filter.filter_by(seq=filter_form.f_round_seq)
        round_filter_apply = True

    contest_filter = sess.query(db.Contest.contest_id)
    contest_filter_apply = False
    if round_filter_apply:
        contest_filter = contest_filter.filter(db.Contest.round_id.in_(round_filter))
        contest_filter_apply = True
    if filter_form.f_contest_site:
        contest_filter = contest_filter.filter_by(place_id=filter_form.f_contest_site.place_id)
        contest_filter_apply = True

    participation_filter = sess.query(db.Participation.user_id)
    participation_filter_apply = False
    if contest_filter_apply:
        participation_filter = participation_filter.filter(db.Participation.contest_id.in_(contest_filter))
        participation_filter_apply = True
    if filter_form.f_participation_state:
        participation_filter = participation_filter.filter_by(state=filter_form.f_participation_state)
        participation_filter_apply = True

    if participation_filter_apply:
        q = q.filter(db.User.user_id.in_(participation_filter))

    (count, q) = filter_form.apply_limits(q, pagesize=50)
    users = q.all()

    return render_template(
        'org_users.html', users=users, count=count,
        filter=filter_form,
        can_edit=rr.have_right(Right.edit_users),
        can_add=rr.have_right(Right.add_users),
    )


class OrgsFilterForm(PagerForm):
    """Filter form for the list of organizers and admins (see ``org_orgs``)."""

    # user
    search_name = wtforms.StringField("Jméno/příjmení", render_kw={'autofocus': True})
    search_email = wtforms.StringField("E-mail")

    # TODO: filtering by roles?

    submit = wtforms.SubmitField("Filtrovat")

    # Filter output values: None when the filter is not used.
    f_search_name: Optional[str] = None
    f_search_email: Optional[str] = None

    def validate(self):
        """Compute the ``f_*`` search patterns; does not call the parent ``validate()`` and returns None."""
        self.f_search_name = f"%{self.search_name.data}%" if self.search_name.data else None
        self.f_search_email = f"%{self.search_email.data}%" if self.search_email.data else None


@app.route('/org/org/')
def org_orgs():
    """List organizers and admins, filtered by the query-string arguments."""
    sess = db.get_session()
    rr = g.gatekeeper.rights_generic()

    q = sess.query(db.User).filter(or_(db.User.is_admin, db.User.is_org)).options(
        subqueryload(db.User.roles).joinedload(db.UserRole.place)
    )
    filter_form = OrgsFilterForm(request.args)
    filter_form.validate()

    if filter_form.f_search_name:
        q = q.filter(or_(
            db.User.first_name.ilike(filter_form.f_search_name),
            db.User.last_name.ilike(filter_form.f_search_name)
        ))
    if filter_form.f_search_email:
        q = q.filter(db.User.email.ilike(filter_form.f_search_email))

    (count, q) = filter_form.apply_limits(q, pagesize=50)
    users = q.all()

    return render_template(
        'org_orgs.html', users=users, count=count,
        filter=filter_form,
        can_edit=rr.have_right(Right.edit_orgs),
        can_add=rr.have_right(Right.add_orgs),
    )


class FormAddRole(FlaskForm):
    """Form for assigning a new role to an organizer."""

    role = wtforms.SelectField('Role', choices=db.RoleType.choices(), coerce=db.RoleType.coerce, render_kw={'autofocus': True})
    place_code = wtforms.StringField('Oblast')
    year = wtforms.IntegerField('Ročník', validators=[validators.Optional()])
    # An empty category is stored as None rather than as an empty string.
    category = wtforms.StringField("Kategorie", validators=[validators.Length(max=2)], filters=[lambda x: x or None])
    seq = wtforms.IntegerField("Kolo", validators=[validators.Optional()])
    submit = wtforms.SubmitField('Přidat roli')


class FormRemoveRole(FlaskForm):
    """Form for removing one role, identified by its id."""

    remove_role_id = wtforms.IntegerField()
    remove = wtforms.SubmitField('Odebrat roli')


class ResendInviteForm(FlaskForm):
    """Single-button form that re-sends the password (re)set e-mail."""

    resend_invite = SubmitField()

    def do(self, user: db.User):
        """Create a new password-reset token for ``user`` and e-mail it.

        Users who have never logged in get the welcome (new account) e-mail,
        everybody else gets the password reset e-mail. Exactly one e-mail is
        attempted; the outcome is reported via ``flash``.
        """
        token = mo.users.ask_reset_password(user)
        db.get_session().commit()

        # Pick the e-mail type first, so that a failed welcome e-mail is
        # reported as a failure instead of silently trying the reset e-mail.
        if user.last_login_at is None:
            sent = mo.util.send_new_account_email(user, token)
            success_msg = 'Uvítací e-mail s odkazem pro nastavení hesla odeslán na {}'
        else:
            sent = mo.util.send_password_reset_email(user, token)
            success_msg = 'E-mail s odkazem pro resetování hesla odeslán na {}'

        if sent:
            flash(success_msg.format(user.email), 'success')
        else:
            flash('Problém při odesílání e-mailu s odkazem pro nastavení hesla', 'danger')


@app.route('/org/org/<int:id>/', methods=('GET', 'POST'))
def org_org(id: int):
    """Show one organizer/admin and handle invite re-sending and role add/remove.

    Raises NotFound when the user does not exist or is neither an organizer
    nor an admin, or when a role to be removed does not exist or does not
    belong to this user.
    """
    sess = db.get_session()
    # NOTE(review): joinedload() receives two attributes here; confirm that
    # both UserRole relationships are eager-loaded as intended with the
    # SQLAlchemy version in use.
    user = (sess.query(db.User)
            .options(subqueryload(db.User.roles).joinedload(db.UserRole.place, db.UserRole.assigned_by_user))
            .get(id))
    if not user or (not user.is_org and not user.is_admin):
        raise werkzeug.exceptions.NotFound()

    rr = g.gatekeeper.rights_generic()
    can_assign_rights = rr.have_right(Right.assign_rights)

    resend_invite_form: Optional[ResendInviteForm] = None
    if rr.can_edit_user(user):
        resend_invite_form = ResendInviteForm()
        if resend_invite_form.resend_invite.data and resend_invite_form.validate_on_submit():
            resend_invite_form.do(user)
            return redirect(url_for('org_org', id=id))

    form_add_role = FormAddRole()
    form_remove_role = FormRemoveRole()
    role_errors = []
    if can_assign_rights:
        if form_add_role.submit.data and form_add_role.validate_on_submit():
            new_role = db.UserRole()
            form_add_role.populate_obj(new_role)
            new_role.user_id = id
            # Default to the root place; overridden below if a code was given.
            new_role.place = db.get_root_place()
            new_role.assigned_by = g.user.user_id

            ok = True
            place_code = form_add_role.place_code.data
            if place_code:
                place = db.get_place_by_code(place_code)
                if not place:
                    role_errors.append("Nepovedlo se nalézt místo podle kódu")
                    ok = False
                else:
                    new_role.place = place

            if not g.gatekeeper.can_set_role(new_role):
                role_errors.append(f'Roli "{new_role}" nelze přidělit, není podmnožinou žádné vaší role')
                ok = False

            if ok:
                sess.add(new_role)
                # Flush before logging so the logged row is the persisted one.
                sess.flush()
                mo.util.log(
                    type=db.LogType.user_role,
                    what=id,
                    details={'action': 'new', 'role': db.row2dict(new_role)},
                )
                sess.commit()
                app.logger.info(f"New role for user id {id} added: {db.row2dict(new_role)}")
                flash(f'Role "{new_role}" úspěšně přidána', 'success')
                # The user is known to be an organizer/admin here, so go
                # straight to this page instead of bouncing via org_user.
                return redirect(url_for('org_org', id=id))

        if form_remove_role.remove_role_id.data and form_remove_role.validate_on_submit():
            role = sess.query(db.UserRole).get(form_remove_role.remove_role_id.data)
            # The role must belong to the user in the URL; otherwise another
            # user's role would be deleted and logged under this user.
            if not role or role.user_id != id:
                raise werkzeug.exceptions.NotFound()

            if not g.gatekeeper.can_set_role(role):
                role_errors.append(f'Roli "{role}" nelze odebrat, není podmnožinou žádné vaší role')
            else:
                sess.delete(role)
                mo.util.log(
                    type=db.LogType.user_role,
                    what=id,
                    details={'action': 'delete', 'role': db.row2dict(role)},
                )
                sess.commit()
                app.logger.info(f"Role for user {id} removed: {db.row2dict(role)}")
                flash(f'Role "{role}" úspěšně odebrána', 'success')
                return redirect(url_for('org_org', id=id))

    return render_template(
        'org_org.html', user=user,
        can_edit=rr.can_edit_user(user),
        can_assign_rights=can_assign_rights,
        can_incarnate=g.user.is_admin,
        roles_by_type=mo.rights.roles_by_type,
        role_errors=role_errors,
        form_add_role=form_add_role,
        form_remove_role=form_remove_role,
        resend_invite_form=resend_invite_form,
    )


@app.route('/org/user/<int:id>/', methods=('GET', 'POST'))
def org_user(id: int):
    """Show one regular user with their participant records and participations.

    Organizers and admins are redirected to ``org_org``. Raises NotFound when
    the user does not exist.
    """
    sess = db.get_session()
    user = mo.users.user_by_uid(id)
    if not user:
        raise werkzeug.exceptions.NotFound()
    if user.is_org or user.is_admin:
        return redirect(url_for('org_org', id=id))

    rr = g.gatekeeper.rights_generic()

    resend_invite_form: Optional[ResendInviteForm] = None
    if rr.can_edit_user(user):
        resend_invite_form = ResendInviteForm()
        if resend_invite_form.resend_invite.data and resend_invite_form.validate_on_submit():
            resend_invite_form.do(user)
            return redirect(url_for('org_user', id=id))

    participants = sess.query(db.Participant).filter_by(user_id=user.user_id)
    participations = (
        sess.query(db.Participation, db.Contest, db.Round)
        .select_from(db.Participation)
        .join(db.Contest, db.Contest.master_contest_id == db.Participation.contest_id)
        .join(db.Round)
        .filter(db.Participation.user == user)
        .options(joinedload(db.Contest.place))
        .order_by(db.Round.year.desc(), db.Round.category, db.Round.seq, db.Round.part)
        .all()
    )

    return render_template(
        'org_user.html', user=user,
        can_edit=rr.can_edit_user(user),
        can_incarnate=g.user.is_admin,
        participants=participants,
        participations=participations,
        resend_invite_form=resend_invite_form,
    )


class UserEditForm(FlaskForm):
    """Form for creating and editing users; views delete the fields they do not need."""

    first_name = wtforms.StringField("Jméno", validators=[validators.DataRequired()], render_kw={'autofocus': True})
    last_name = wtforms.StringField("Příjmení", validators=[validators.DataRequired()])
    email = wtforms.StringField("E-mail", validators=[validators.DataRequired()])
    note = wtforms.TextAreaField("Poznámka")
    is_test = wtforms.BooleanField("Testovací účet")
    allow_duplicate_name = wtforms.BooleanField("Přidat účet s duplicitním jménem")
    submit = wtforms.SubmitField("Uložit")

    def validate_email(form, field):
        """Normalize the e-mail in place; a ``mo.CheckError`` becomes a validation error."""
        try:
            field.data = mo.users.normalize_email(field.data)
        except mo.CheckError as e:
            raise wtforms.ValidationError(str(e))


@app.route('/org/org/<int:id>/edit', methods=("GET", "POST"), endpoint="org_org_edit")
@app.route('/org/user/<int:id>/edit', methods=("GET", "POST"))
def org_user_edit(id: int):
    """Edit a user or an organizer (the latter via the ``org_org_edit`` endpoint).

    Requests made through the endpoint that does not match the kind of the
    account are redirected to the matching one. Raises NotFound when the user
    does not exist and Forbidden when the current user may not edit them.
    """
    sess = db.get_session()
    user = mo.users.user_by_uid(id)
    if not user:
        raise werkzeug.exceptions.NotFound()

    is_org = request.endpoint == "org_org_edit"
    if not is_org and (user.is_admin or user.is_org):
        return redirect(url_for("org_org_edit", id=id))
    if is_org and not (user.is_admin or user.is_org):
        return redirect(url_for("org_user_edit", id=id))

    rr = g.gatekeeper.rights_generic()
    if not rr.can_edit_user(user):
        raise werkzeug.exceptions.Forbidden()

    form = UserEditForm(obj=user)
    del form.allow_duplicate_name
    if (user.is_org or user.is_admin) and not g.user.is_admin:
        # Only an admin may edit organizers' e-mails.
        del form.email

    if form.validate_on_submit():
        check = True
        # The e-mail field may have been removed from the form above.
        if hasattr(form, 'email') and form.email is not None:
            other_user = mo.users.user_by_email(form.email.data)
            if other_user is not None and other_user != user:
                flash('Zadaný e-mail nelze použít, existuje jiný účet s tímto e-mailem', 'danger')
                check = False

        if check:
            form.populate_obj(user)
            if sess.is_modified(user):
                changes = db.get_object_changes(user)

                app.logger.info(f"User {id} modified, changes: {changes}")
                mo.util.log(
                    type=db.LogType.user,
                    what=id,
                    details={'action': 'edit', 'changes': changes},
                )
                sess.commit()
                flash('Změny uživatele uloženy', 'success')
            else:
                flash(u'Žádné změny k uložení', 'info')

            return redirect(url_for('org_user', id=id))

    return render_template('org_user_edit.html', user=user, form=form, is_org=is_org)


@app.route('/org/org/new/', methods=('GET', 'POST'), endpoint="org_org_new")
@app.route('/org/user/new/', methods=('GET', 'POST'))
def org_user_new():
    """Create a new user, or a new organizer via the ``org_org_new`` endpoint.

    Creating an organizer requires the ``add_orgs`` right, creating a regular
    user requires ``add_users``; Forbidden is raised otherwise. After the
    account is created, an e-mail with a password-setting link is sent.
    """
    sess = db.get_session()
    rr = g.gatekeeper.rights_generic()

    is_org = request.endpoint == "org_org_new"
    # Each kind of account needs only its own right: an organizer creator
    # must not additionally be required to hold add_users.
    required_right = Right.add_orgs if is_org else Right.add_users
    if not rr.have_right(required_right):
        raise werkzeug.exceptions.Forbidden()

    form = UserEditForm()
    form.submit.label.text = 'Vytvořit'
    is_duplicate_name = False
    if form.validate_on_submit():
        check = True
        if mo.users.user_by_email(form.email.data) is not None:
            flash('Účet s daným e-mailem již existuje', 'danger')
            check = False

        if is_org:
            if (mo.db.get_session().query(db.User)
                    .filter_by(first_name=form.first_name.data, last_name=form.last_name.data, is_org=True)
                    .first() is not None):
                is_duplicate_name = True
                if not form.allow_duplicate_name.data:
                    flash('Organizátor s daným jménem již existuje. V případě, že se nejedná o chybu, zaškrtněte políčko ve formuláři.', 'danger')
                    check = False

        if check:
            new_user = db.User()
            form.populate_obj(new_user)
            new_user.is_org = is_org
            sess.add(new_user)
            # Flush so that new_user.user_id is available for the log entry.
            sess.flush()

            app.logger.info(f"New user created: {db.row2dict(new_user)}")
            mo.util.log(
                type=db.LogType.user,
                what=new_user.user_id,
                details={'action': 'new', 'user': db.row2dict(new_user)},
            )
            sess.commit()
            flash('Nový uživatel vytvořen', 'success')

            # Send password (re)set link
            token = mo.users.ask_reset_password(new_user)
            db.get_session().commit()
            if mo.util.send_new_account_email(new_user, token):
                flash('E-mail s odkazem pro nastavení hesla odeslán na {}'.format(new_user.email), 'success')
            else:
                flash('Problém při odesílání e-mailu s odkazem pro nastavení hesla', 'danger')

            if is_org:
                return redirect(url_for('org_org', id=new_user.user_id))
            return redirect(url_for('org_user', id=new_user.user_id))

    # The "allow duplicate name" checkbox is shown only after a duplicate was found.
    if not is_duplicate_name:
        del form.allow_duplicate_name

    return render_template('org_user_new.html', form=form, is_org=is_org)