Skip to content
Snippets Groups Projects
Select Git revision
  • 4d655dee8cba9c232db862cdd37ff4614fa298b9
  • devel default
  • master
  • fo
  • jirka/typing
  • fo-base
  • mj/submit-images
  • jk/issue-96
  • jk/issue-196
  • honza/add-contestant
  • honza/mr7
  • honza/mrf
  • honza/mrd
  • honza/mra
  • honza/mr6
  • honza/submit-images
  • honza/kolo-vs-soutez
  • jh-stress-test-wip
  • shorten-schools
19 results

org_users.py

Blame
  • org_users.py 23.07 KiB
    from typing import Optional, List
    from flask import render_template, g, redirect, url_for, flash, request
    from flask_wtf import FlaskForm
    import werkzeug.exceptions
    import wtforms
    from sqlalchemy import or_
    import flask_sqlalchemy
    from sqlalchemy.orm import joinedload, subqueryload
    
    from wtforms import validators
    from wtforms.fields.simple import SubmitField
    
    from wtforms.validators import Required
    
    import mo
    import mo.db as db
    from mo.rights import Right
    import mo.util
    import mo.users
    from mo.web import app
    import mo.web.fields as mo_fields
    from mo.web.util import PagerForm
    
    
    class UsersFilterForm(PagerForm):
        # user
        search_name = wtforms.TextField("Jméno/příjmení", render_kw={'autofocus': True})
        search_email = wtforms.TextField("E-mail")
    
        # participants
        year = mo_fields.OptionalInt("Ročník")
        school = mo_fields.School()
    
        # rounds->participations
        round_year = mo_fields.OptionalInt("Ročník")
        round_category = wtforms.SelectField("Kategorie")
        round_seq = wtforms.SelectField("Kolo")
        contest_site = mo_fields.Place("Soutěžní oblast")
        participation_state = wtforms.SelectField('Účast', choices=[('*', '*')] + list(db.PartState.choices()))
    
        submit = wtforms.SubmitField("Filtrovat")
    
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self.round_category.choices = ['*'] + sorted(db.get_categories())
            self.round_seq.choices = ['*'] + sorted(db.get_seqs())
    
    
    @app.route('/org/user/', methods=('GET', 'POST'))
    def org_users():
        sess = db.get_session()
        rr = g.gatekeeper.rights_generic()
    
        q = sess.query(db.User).filter_by(is_admin=False, is_org=False).options(
            subqueryload(db.User.participants).joinedload(db.Participant.school_place)
        )
        filter = UsersFilterForm(formdata=request.args)
        if request.args:
            filter.validate()
    
        if filter.search_name.data:
            q = q.filter(or_(
                db.User.first_name.ilike(f"%{filter.search_name.data}%"),
                db.User.last_name .ilike(f"%{filter.search_name.data}%")
            ))
        if filter.search_email.data:
            q = q.filter(db.User.email.ilike(f"%{filter.search_email.data}%"))
    
        if filter.year.data or filter.school.place:
            participant_filter = sess.query(db.Participant.user_id)
            if filter.year.data:
                participant_filter = participant_filter.filter_by(year=filter.year.data)
            if filter.school.place:
                participant_filter = participant_filter.filter_by(school=filter.school.place.place_id)
            q = q.filter(db.User.user_id.in_(participant_filter))
    
        round_filter = sess.query(db.Round.round_id)
        round_filter_apply = False
        if filter.round_year.data:
            round_filter = round_filter.filter_by(year=filter.round_year.data)
            round_filter_apply = True
        if filter.round_category.data and filter.round_category.data != "*":
            round_filter = round_filter.filter_by(category=filter.round_category.data)
            round_filter_apply = True
        if filter.round_seq.data and filter.round_seq.data != "*":
            round_filter = round_filter.filter_by(seq=filter.round_seq.data)
            round_filter_apply = True
    
        contest_filter = sess.query(db.Contest.contest_id)
        contest_filter_apply = False
        if round_filter_apply:
            contest_filter = contest_filter.filter(db.Contest.round_id.in_(round_filter))
            contest_filter_apply = True
        if filter.contest_site.place:
            contest_filter = contest_filter.filter_by(place_id=filter.contest_site.place.place_id)
            contest_filter_apply = True
    
        participation_filter = sess.query(db.Participation.user_id)
        participation_filter_apply = False
        if contest_filter_apply:
            participation_filter = participation_filter.filter(db.Participation.contest_id.in_(contest_filter))
            participation_filter_apply = True
        if filter.participation_state.data and filter.participation_state.data != '*':
            participation_filter = participation_filter.filter_by(state=filter.participation_state.data)
            participation_filter_apply = True
    
        if participation_filter_apply:
            q = q.filter(db.User.user_id.in_(participation_filter))
    
        # print(str(q))
        (count, q) = filter.apply_limits(q, pagesize=50)
        users = q.all()
    
        return render_template(
            'org_users.html', users=users, count=count,
            filter=filter,
            can_edit=rr.have_right(Right.edit_users),
            can_add=rr.have_right(Right.add_users),
        )
    
    
    class OrgsFilterForm(PagerForm):
        # user
        search_name = wtforms.TextField("Jméno/příjmení", render_kw={'autofocus': True})
        search_email = wtforms.TextField("E-mail")
    
        search_role = wtforms.SelectMultipleField('Role', choices=db.RoleType.choices(), coerce=db.RoleType.coerce, validators=[validators.Optional()])
        search_right_for_place = mo_fields.Place('Právo pro oblast', validators=[validators.Optional()])
        search_in_place = mo_fields.Place('V oblasti', validators=[validators.Optional()])
        search_place_level = wtforms.SelectMultipleField("Úroveň oblasti", choices=[(i.level, i.name) for i in db.place_levels], validators=[validators.Optional()], coerce=int)
        search_year = wtforms.StringField('Ročník', validators=[validators.Optional()])
        search_category = wtforms.StringField("Kategorie", validators=[validators.Optional()])
        search_seq = wtforms.StringField("Kolo", validators=[validators.Optional()])
    
        submit = wtforms.SubmitField("Filtrovat")
        show_role_filter = wtforms.SubmitField("Zobrazit filtrování dle rolí")
        hide_role_filter = wtforms.SubmitField("Skrýt filtrování dle rolí")
    
        is_role_filter = wtforms.HiddenField(default="") # "" -> skrýt. "yes" -> zobrazit
    
        # Výstupní hodnoty filtru, None při nepoužitém filtru nebo špatné hodnotě (takové filtry ignorujeme)
        f_search_name: Optional[str] = None
        f_search_email: Optional[str] = None
        f_search_role: Optional[List[db.RoleType]] = None
        f_search_right_for_place: Optional[db.Place] = None
        f_search_in_place: Optional[db.Place] = None
        f_search_year: Optional[List[int]] = None
        f_search_category: Optional[List[str]] = None
        f_search_place_level: Optional[List[int]] = None
        f_search_seq: Optional[List[int]] = None
    
        def validate_search_name(self, field):
            self.f_search_name = f"%{field.data}%"
    
        def validate_search_email(self, field):
            self.f_search_email = f"%{field.data}%"
    
        def validate_search_role(self, field):
            self.f_search_role = field.data
    
        def validate_search_right_for_place(self, field):
            self.f_search_right_for_place = field.place
    
        def validate_search_in_place(self, field):
            self.f_search_in_place = field.place
    
        def validate_search_year(self, field):
            try:
                self.f_search_year = mo.util.parse_int_list(field.data)
            except mo.CheckError as e:
                raise wtforms.ValidationError(str(e))
    
        def validate_search_category(self, field):
            self.f_search_category = field.data.split(",")
    
        def validate_search_place_level(self, field):
            self.f_search_place_level = field.data
    
        def validate_search_seq(self, field):
            try:
                self.f_search_seq = mo.util.parse_int_list(field.data)
            except mo.CheckError as e:
                raise wtforms.ValidationError(str(e))
    
        def prepare_role_filter(self):
            if self.show_role_filter.data:
                self.is_role_filter.data = "yes"
            if self.hide_role_filter.data:
                self.is_role_filter.data = ""
            if self.is_role_filter.data:
                del self.show_role_filter
            else:
                del self.hide_role_filter
                del self.search_role
                del self.search_right_for_place
                del self.search_in_place
                del self.search_place_level
                del self.search_year
                del self.search_category
                del self.search_seq
    
    
    @app.route('/org/org/', methods=('GET', 'POST'))
    def org_orgs():
        sess = db.get_session()
        rr = g.gatekeeper.rights_generic()
        q = sess.query(db.User).filter(or_(db.User.is_admin, db.User.is_org)).options(
            subqueryload(db.User.roles).joinedload(db.UserRole.place)
        )
        filter = OrgsFilterForm(formdata=request.args)
        if request.args:
            filter.validate()
        filter.prepare_role_filter()
    
        if filter.f_search_name:
            q = q.filter(or_(
                db.User.first_name.ilike(filter.f_search_name),
                db.User.last_name.ilike(filter.f_search_name)
            ))
    
        def query_filter_role(qr: flask_sqlalchemy.BaseQuery) -> flask_sqlalchemy.BaseQuery:
            if filter.f_search_role is not None:
                qr = qr.filter(db.UserRole.role.in_(filter.f_search_role))
            if filter.f_search_category is not None:
                qr = qr.filter(or_(db.UserRole.category.in_(filter.f_search_category), db.UserRole.category == None))
            if filter.f_search_seq is not None:
                qr = qr.filter(or_(db.UserRole.seq.in_(filter.f_search_seq), db.UserRole.seq == None))
            if filter.f_search_year is not None:
                qr = qr.filter(or_(db.UserRole.year.in_(filter.f_search_year), db.UserRole.year == None))
            if filter.f_search_in_place is not None:
                qr = qr.filter(db.UserRole.place_id.in_(db.place_descendant_cte(filter.f_search_in_place)))
            if filter.f_search_right_for_place is not None:
                qr = qr.filter(db.UserRole.place_id.in_([x.place_id for x in db.get_place_parents(filter.f_search_right_for_place)]))
                        # Po n>3 hodinách v mo.db jsem dospěl k závěru, že to hezčeji neumím (neumím vyrobit place_parents_cte)
            if filter.f_search_place_level is not None:
                qr = qr.filter(db.UserRole.place_id.in_(
                    sess.query(db.Place.place_id).filter(db.Place.level.in_(filter.f_search_place_level))
                    ))
            return qr
    
        if filter.is_role_filter.data:
            qr = sess.query(db.UserRole.user_id)
            qr = query_filter_role(qr)
            q = q.filter(db.User.user_id.in_(qr))
    
        q = q.order_by(db.User.user_id)
    
        (count, q) = filter.apply_limits(q, pagesize=50)
        users = q.all()
    
        marked_roles_id: Set[int] = set()
    
        if filter.is_role_filter.data:
            qmr = sess.query(db.UserRole.user_role_id).filter(db.UserRole.user_id.in_([i.user_id for i in users]))
            qmr = query_filter_role(qmr)
            marked_roles_id = set([i[0] for i in qmr.all()])
    
        return render_template(
            'org_orgs.html', users=users, count=count,
            filter=filter,
            marked_roles_id=marked_roles_id,
            can_edit=rr.have_right(Right.edit_orgs),
            can_add=rr.have_right(Right.add_orgs),
        )
    
    
    class FormAddRole(FlaskForm):
        role = wtforms.SelectField('Role', choices=db.RoleType.choices(), coerce=db.RoleType.coerce, render_kw={'autofocus': True})
        place = mo_fields.Place()
        year = wtforms.IntegerField('Ročník', validators=[validators.Optional()])
        category = wtforms.StringField("Kategorie", validators=[validators.Length(max=2)], filters=[lambda x: x or None])
        seq = wtforms.IntegerField("Kolo", validators=[validators.Optional()])
    
        submit = wtforms.SubmitField('Přidat roli')
    
    
    class FormRemoveRole(FlaskForm):
        remove_role_id = wtforms.IntegerField()
        remove = wtforms.SubmitField('Odebrat roli')
    
    
    class ResendInviteForm(FlaskForm):
        resend_invite = SubmitField()
    
        def do(self, user: db.User):
            token = mo.users.ask_reset_password(user)
            db.get_session().commit()
            if user.last_login_at is None and mo.util.send_new_account_email(user, token):
                flash('Uvítací e-mail s odkazem pro nastavení hesla odeslán na {}'.format(user.email), 'success')
            elif mo.util.send_password_reset_email(user, token):
                flash('E-mail s odkazem pro resetování hesla odeslán na {}'.format(user.email), 'success')
            else:
                flash('Problém při odesílání e-mailu s odkazem pro nastavení hesla', 'danger')
    
    
    @app.route('/org/org/<int:id>/', methods=('GET', 'POST'))
    def org_org(id: int):
        sess = db.get_session()
        user = (sess.query(db.User)
                .options(subqueryload(db.User.roles).joinedload(db.UserRole.place, db.UserRole.assigned_by_user))
                .get(id))
        if not user or (not user.is_org and not user.is_admin):
            raise werkzeug.exceptions.NotFound()
    
        rr = g.gatekeeper.rights_generic()
        can_assign_rights = rr.have_right(Right.assign_rights)
    
        resend_invite_form: Optional[ResendInviteForm] = None
        if rr.can_edit_user(user):
            resend_invite_form = ResendInviteForm()
            if resend_invite_form.resend_invite.data and resend_invite_form.validate_on_submit():
                resend_invite_form.do(user)
                return redirect(url_for('org_org', id=id))
    
        form_add_role = FormAddRole()
        form_remove_role = FormRemoveRole()
        role_errors = []
        if can_assign_rights:
            if form_add_role.submit.data and form_add_role.validate_on_submit():
                new_role = db.UserRole()
                form_add_role.populate_obj(new_role)
    
                new_role.user_id = id
                new_role.place = db.get_root_place()
                new_role.assigned_by = g.user.user_id
    
                ok = True
                new_role.place = form_add_role.place.place
    
                if not g.gatekeeper.can_set_role(new_role):
                    role_errors.append(f'Roli "{new_role}" nelze přidělit, není podmnožinou žádné vaší role')
                    ok = False
    
                if ok:
                    sess.add(new_role)
                    sess.flush()
                    mo.util.log(
                        type=db.LogType.user_role,
                        what=id,
                        details={'action': 'new', 'role': db.row2dict(new_role)},
                    )
                    sess.commit()
                    app.logger.info(f"New role for user id {id} added: {db.row2dict(new_role)}")
                    flash(f'Role "{new_role}" úspěšně přidána', 'success')
                    return redirect(url_for('org_user', id=id))
    
            if form_remove_role.remove_role_id.data and form_remove_role.validate_on_submit():
                role = sess.query(db.UserRole).get(form_remove_role.remove_role_id.data)
                if not role:
                    raise werkzeug.exceptions.NotFound()
    
                if not g.gatekeeper.can_set_role(role):
                    role_errors.append(f'Roli "{role}" nelze odebrat, není podmnožinou žádné vaší role')
                else:
                    sess.delete(role)
                    mo.util.log(
                        type=db.LogType.user_role,
                        what=id,
                        details={'action': 'delete', 'role': db.row2dict(role)},
                    )
                    sess.commit()
                    app.logger.info(f"Role for user {id} removed: {db.row2dict(role)}")
                    flash(f'Role "{role}" úspěšně odebrána', 'success')
                    return redirect(url_for('org_user', id=id))
    
        return render_template(
            'org_org.html', user=user,
            can_edit=rr.can_edit_user(user), can_assign_rights=can_assign_rights,
            can_incarnate=g.user.is_admin,
            roles_by_type=mo.rights.roles_by_type, role_errors=role_errors,
            form_add_role=form_add_role, form_remove_role=form_remove_role,
            resend_invite_form=resend_invite_form,
        )
    
    
    @app.route('/org/user/<int:id>/', methods=('GET', 'POST'))
    def org_user(id: int):
        sess = db.get_session()
        user = mo.users.user_by_uid(id)
        if not user:
            raise werkzeug.exceptions.NotFound()
        if user.is_org or user.is_admin:
            return redirect(url_for('org_org', id=id))
    
        rr = g.gatekeeper.rights_generic()
    
        resend_invite_form: Optional[ResendInviteForm] = None
        if rr.can_edit_user(user):
            resend_invite_form = ResendInviteForm()
            if resend_invite_form.resend_invite.data and resend_invite_form.validate_on_submit():
                resend_invite_form.do(user)
                return redirect(url_for('org_user', id=id))
    
        participants = sess.query(db.Participant).filter_by(user_id=user.user_id)
        participations = (
            sess.query(db.Participation, db.Contest, db.Round)
            .select_from(db.Participation)
            .join(db.Contest, db.Contest.master_contest_id == db.Participation.contest_id)
            .join(db.Round)
            .filter(db.Participation.user == user)
            .options(joinedload(db.Contest.place))
            .order_by(db.Round.year.desc(), db.Round.category, db.Round.seq, db.Round.part)
            .all()
        )
    
        return render_template(
            'org_user.html', user=user, can_edit=rr.can_edit_user(user),
            can_incarnate=g.user.is_admin,
            participants=participants, participations=participations,
            resend_invite_form=resend_invite_form,
        )
    
    
    class UserEditForm(FlaskForm):
        first_name = mo_fields.FirstName(validators=[Required()], render_kw={'autofocus': True})
        last_name = mo_fields.LastName(validators=[Required()])
        email = mo_fields.Email(validators=[Required()])
        note = wtforms.TextAreaField("Poznámka")
        is_test = wtforms.BooleanField("Testovací účet")
        allow_duplicate_name = wtforms.BooleanField("Přidat účet s duplicitním jménem")
        submit = wtforms.SubmitField("Uložit")
    
    
    @app.route('/org/org/<int:id>/edit', methods=("GET", "POST"), endpoint="org_org_edit")
    @app.route('/org/user/<int:id>/edit', methods=("GET", "POST"))
    def org_user_edit(id: int):
        sess = db.get_session()
        user = mo.users.user_by_uid(id)
        if not user:
            raise werkzeug.exceptions.NotFound()
    
        is_org = request.endpoint == "org_org_edit"
    
        if not is_org and (user.is_admin or user.is_org):
            return redirect(url_for("org_org_edit", id=id))
        if is_org and not (user.is_admin or user.is_org):
            return redirect(url_for("org_user_edit", id=id))
    
        rr = g.gatekeeper.rights_generic()
        if not rr.can_edit_user(user):
            raise werkzeug.exceptions.Forbidden()
    
        form = UserEditForm(obj=user)
        del form.allow_duplicate_name
        if (user.is_org or user.is_admin) and not g.user.is_admin:
            # emaily u organizátorů může editovat jen správce
            del form.email
        if form.validate_on_submit():
            check = True
    
            if hasattr(form, 'email') and form.email is not None:
                other_user = mo.users.user_by_email(form.email.data)
                if other_user is not None and other_user != user:
                    flash('Zadaný e-mail nelze použít, existuje jiný účet s tímto e-mailem', 'danger')
                    check = False
    
            if check:
                form.populate_obj(user)
                if sess.is_modified(user):
                    changes = db.get_object_changes(user)
    
                    app.logger.info(f"User {id} modified, changes: {changes}")
                    mo.util.log(
                        type=db.LogType.user,
                        what=id,
                        details={'action': 'edit', 'changes': changes},
                    )
                    sess.commit()
                    flash('Změny uživatele uloženy', 'success')
                else:
                    flash('Žádné změny k uložení', 'info')
    
                return redirect(url_for('org_user', id=id))
    
        return render_template('org_user_edit.html', user=user, form=form, is_org=is_org)
    
    
    @app.route('/org/org/new/', methods=('GET', 'POST'), endpoint="org_org_new")
    @app.route('/org/user/new/', methods=('GET', 'POST'))
    def org_user_new():
        sess = db.get_session()
        rr = g.gatekeeper.rights_generic()
    
        is_org = request.endpoint == "org_org_new"
    
        if is_org and not rr.have_right(Right.add_orgs):
            raise werkzeug.exceptions.Forbidden()
        elif not rr.have_right(Right.add_users):
            raise werkzeug.exceptions.Forbidden()
    
        form = UserEditForm()
        form.submit.label.text = 'Vytvořit'
        is_duplicate_name = False
        if form.validate_on_submit():
            check = True
    
            if mo.users.user_by_email(form.email.data) is not None:
                flash('Účet s daným e-mailem již existuje', 'danger')
                check = False
    
            if is_org:
                if (mo.db.get_session().query(db.User)
                        .filter_by(first_name=form.first_name.data, last_name=form.last_name.data, is_org=True)
                        .first() is not None):
                    is_duplicate_name = True
                    if not form.allow_duplicate_name.data:
                        flash('Organizátor s daným jménem již existuje. V případě, že se nejedná o chybu, zaškrtněte políčko ve formuláři.', 'danger')
                        check = False
    
            if check:
                new_user = db.User()
                form.populate_obj(new_user)
                new_user.is_org = is_org
                sess.add(new_user)
                sess.flush()
    
                app.logger.info(f"New user created: {db.row2dict(new_user)}")
                mo.util.log(
                    type=db.LogType.user,
                    what=new_user.user_id,
                    details={'action': 'new', 'user': db.row2dict(new_user)},
                )
    
                sess.commit()
                flash('Nový uživatel vytvořen', 'success')
    
                # Send password (re)set link
                token = mo.users.ask_reset_password(new_user)
                db.get_session().commit()
    
                if mo.util.send_new_account_email(new_user, token):
                    flash('E-mail s odkazem pro nastavení hesla odeslán na {}'.format(new_user.email), 'success')
                else:
                    flash('Problém při odesílání e-mailu s odkazem pro nastavení hesla', 'danger')
    
                if is_org:
                    return redirect(url_for('org_org', id=new_user.user_id))
                return redirect(url_for('org_user', id=new_user.user_id))
    
        if not is_duplicate_name:
            del form.allow_duplicate_name
        return render_template('org_user_new.html', form=form, is_org=is_org)
    
    
    class ParticipantEditForm(FlaskForm):
        school = mo_fields.School("Škola", validators=[Required()], render_kw={'autofocus': True})
        grade = mo_fields.Grade("Třída", validators=[Required()])
        birth_year = mo_fields.BirthYear("Rok narození", validators=[Required()])
        submit = wtforms.SubmitField("Uložit")
    
    
    @app.route('/org/user/<int:user_id>/participant/<int:year>/edit', methods=('GET', 'POST'))
    def org_user_participant_edit(user_id: int, year: int):
        sess = db.get_session()
        user = mo.users.user_by_uid(user_id)
        if not user:
            raise werkzeug.exceptions.NotFound()
    
        rr = g.gatekeeper.rights_generic()
        if not rr.can_edit_user(user):
            raise werkzeug.exceptions.Forbidden()
    
        participant = sess.query(db.Participant).filter_by(user_id=user.user_id).filter_by(year=year).one_or_none()
        if participant is None:
            raise werkzeug.exceptions.NotFound()
    
        form = ParticipantEditForm(obj=participant)
        if form.validate_on_submit():
            form.populate_obj(participant)
            if sess.is_modified(participant):
                changes = db.get_object_changes(participant)
    
                app.logger.info(f"Participant id {id} year {year} modified, changes: {changes}")
                mo.util.log(
                    type=db.LogType.participant,
                    what=user_id,
                    details={'action': 'edit-participant', 'year': year, 'changes': changes},
                )
                sess.commit()
                flash('Změny registrace uloženy', 'success')
            else:
                flash('Žádné změny k uložení', 'info')
    
            return redirect(url_for('org_user', id=user_id))
    
        return render_template('org_user_participant_edit.html', user=user, year=year, form=form)