Skip to content
Snippets Groups Projects
Select Git revision
  • f6d5ee560c0e738b3eba2308d9b9d52c002678f2
  • devel default
  • master
  • fo
  • jirka/typing
  • fo-base
  • mj/submit-images
  • jk/issue-96
  • jk/issue-196
  • honza/add-contestant
  • honza/mr7
  • honza/mrf
  • honza/mrd
  • honza/mra
  • honza/mr6
  • honza/submit-images
  • honza/kolo-vs-soutez
  • jh-stress-test-wip
  • shorten-schools
19 results

create-tasks

Blame
  • org_users.py 16.46 KiB
    from typing import Optional
    from flask import render_template, g, redirect, url_for, flash, request
    from flask_wtf import FlaskForm
    import werkzeug.exceptions
    import wtforms
    from sqlalchemy import or_
    from sqlalchemy.orm import joinedload, subqueryload
    
    from wtforms import validators
    
    from wtforms.validators import Required
    
    import mo
    import mo.db as db
    from mo.rights import Right
    import mo.util
    import mo.users
    from mo.web import app
    from mo.web.util import PagerForm
    
    
    class UsersFilterForm(PagerForm):
        """Filter form for the user list rendered by ``org_users``.

        The raw form fields are turned into the ``f_*`` attributes by
        :meth:`validate`.
        """

        # user
        search_name = wtforms.StringField("Jméno/příjmení", render_kw={'autofocus': True})
        search_email = wtforms.StringField("E-mail")

        # participants
        year = wtforms.IntegerField("Ročník")
        school_code = wtforms.StringField("Škola")

        # rounds->participations
        round_year = wtforms.IntegerField("Ročník")
        round_category = wtforms.SelectField("Kategorie")
        round_seq = wtforms.SelectField("Kolo")
        contest_site_code = wtforms.StringField("Soutěžní oblast")
        participation_state = wtforms.SelectField('Účast', choices=[('*', '*')] + list(db.PartState.choices()))

        submit = wtforms.SubmitField("Filtrovat")

        # Output values of the filter: None when the filter is not used, an empty
        # db object when the lookup failed (nonexistent place and the like).
        f_search_name: Optional[str] = None
        f_search_email: Optional[str] = None
        f_year: Optional[int] = None
        f_school: Optional[db.Place] = None
        f_round_year: Optional[int] = None
        f_round_category: Optional[str] = None
        f_round_seq: Optional[int] = None
        f_contest_site: Optional[db.Place] = None
        f_participation_state: Optional[db.PartState] = None

        def __init__(self, formdata, **kwargs):
            """Bind the form to ``formdata`` and fill the category/round choices.

            The choices of ``round_category`` and ``round_seq`` are read from the
            database on every instantiation, with ``'*'`` prepended as the
            "no filter" option.
            """
            super().__init__(formdata=formdata, **kwargs)
            self.round_category.choices = ['*'] + sorted(db.get_categories())
            self.round_seq.choices = ['*'] + sorted(db.get_seqs())

        def validate(self):
            """Translate raw field data into the ``f_*`` filter values.

            This override does not call the parent ``validate()`` and returns
            nothing. An unknown school or contest site code flashes an error
            message and is replaced by an empty ``db.Place`` object.
            """
            # Text searches are wrapped in % so they can be passed straight to ilike().
            self.f_search_name = f"%{self.search_name.data}%" if self.search_name.data else None
            self.f_search_email = f"%{self.search_email.data}%" if self.search_email.data else None
            self.f_year = self.year.data
            self.f_round_year = self.round_year.data

            if self.school_code.data:
                self.f_school = db.get_place_by_code(self.school_code.data)
                if not self.f_school:
                    flash(f"Zadaná škola '{self.school_code.data}' neexistuje", "danger")
                    self.f_school = db.Place()
            if self.contest_site_code.data:
                self.f_contest_site = db.get_place_by_code(self.contest_site_code.data)
                if not self.f_contest_site:
                    flash(f"Zadaná soutěžní oblast '{self.contest_site_code.data}' neexistuje", "danger")
                    self.f_contest_site = db.Place()

            # '*' is the "any" choice of the select fields and means "no filter".
            self.f_round_category = None if self.round_category.data == '*' else self.round_category.data
            self.f_round_seq = None if self.round_seq.data == '*' else self.round_seq.data
            self.f_participation_state = None if self.participation_state.data == '*' else self.participation_state.data
    
    
    @app.route('/org/user/')
    def org_users():
        """List users that are neither admins nor organizers.

        The list is narrowed by the ``UsersFilterForm`` values taken from the
        query string and paged by the form's ``apply_limits``.
        """
        sess = db.get_session()
        rights = g.gatekeeper.rights_generic()

        eager_participants = subqueryload(db.User.participants).joinedload(db.Participant.school_place)
        query = sess.query(db.User).filter_by(is_admin=False, is_org=False).options(eager_participants)

        form = UsersFilterForm(request.args)
        form.validate()

        # Direct attributes of the user.
        if form.f_search_name:
            name_matches = or_(
                db.User.first_name.ilike(form.f_search_name),
                db.User.last_name.ilike(form.f_search_name),
            )
            query = query.filter(name_matches)
        if form.f_search_email:
            query = query.filter(db.User.email.ilike(form.f_search_email))

        # Restriction through the participants table (year and/or school).
        participant_criteria = {}
        if form.f_year:
            participant_criteria['year'] = form.f_year
        if form.f_school:
            participant_criteria['school'] = form.f_school.place_id
        if participant_criteria:
            participant_user_ids = sess.query(db.Participant.user_id).filter_by(**participant_criteria)
            query = query.filter(db.User.user_id.in_(participant_user_ids))

        # Restriction through rounds -> contests -> participations, built
        # from the innermost subquery outwards.
        round_criteria = {}
        if form.f_round_year:
            round_criteria['year'] = form.f_round_year
        if form.f_round_category:
            round_criteria['category'] = form.f_round_category
        if form.round_seq.data and form.round_seq.data != "*":
            round_criteria['seq'] = form.round_seq.data

        contest_ids = sess.query(db.Contest.contest_id)
        restrict_contests = False
        if round_criteria:
            round_ids = sess.query(db.Round.round_id).filter_by(**round_criteria)
            contest_ids = contest_ids.filter(db.Contest.round_id.in_(round_ids))
            restrict_contests = True
        if form.f_contest_site:
            contest_ids = contest_ids.filter_by(place_id=form.f_contest_site.place_id)
            restrict_contests = True

        participation_user_ids = sess.query(db.Participation.user_id)
        restrict_participations = False
        if restrict_contests:
            participation_user_ids = participation_user_ids.filter(db.Participation.contest_id.in_(contest_ids))
            restrict_participations = True
        if form.f_participation_state:
            participation_user_ids = participation_user_ids.filter_by(state=form.f_participation_state)
            restrict_participations = True

        if restrict_participations:
            query = query.filter(db.User.user_id.in_(participation_user_ids))

        count, query = form.apply_limits(query, pagesize=50)
        users = query.all()

        return render_template(
            'org_users.html',
            users=users,
            count=count,
            filter=form,
            can_edit=rights.have_right(Right.edit_users),
            can_add=rights.have_right(Right.add_users),
        )
    
    
    class OrgsFilterForm(PagerForm):
        """Filter form for the organizer list rendered by ``org_orgs``."""

        # user
        search_name = wtforms.StringField("Jméno/příjmení", render_kw={'autofocus': True})
        search_email = wtforms.StringField("E-mail")

        # TODO: filtering by roles?
        submit = wtforms.SubmitField("Filtrovat")

        # Output values of the filter: None when the filter is not used.
        f_search_name: Optional[str] = None
        f_search_email: Optional[str] = None

        def validate(self):
            """Translate raw field data into the ``f_*`` filter values.

            This override does not call the parent ``validate()`` and returns
            nothing. Non-empty search strings are wrapped in ``%`` so they can
            be used as ``ilike`` patterns.
            """
            self.f_search_name = f"%{self.search_name.data}%" if self.search_name.data else None
            self.f_search_email = f"%{self.search_email.data}%" if self.search_email.data else None
    
    
    @app.route('/org/org/')
    def org_orgs():
        """List admin and organizer accounts, optionally filtered by name or e-mail."""
        sess = db.get_session()
        rights = g.gatekeeper.rights_generic()

        eager_roles = subqueryload(db.User.roles).joinedload(db.UserRole.place)
        is_staff = or_(db.User.is_admin, db.User.is_org)
        query = sess.query(db.User).filter(is_staff).options(eager_roles)

        form = OrgsFilterForm(request.args)
        form.validate()

        name_pattern = form.f_search_name
        if name_pattern:
            query = query.filter(or_(
                db.User.first_name.ilike(name_pattern),
                db.User.last_name.ilike(name_pattern),
            ))

        email_pattern = form.f_search_email
        if email_pattern:
            query = query.filter(db.User.email.ilike(email_pattern))

        count, query = form.apply_limits(query, pagesize=50)
        users = query.all()

        return render_template(
            'org_orgs.html',
            users=users,
            count=count,
            filter=form,
            can_edit=rights.have_right(Right.edit_orgs),
            can_add=rights.have_right(Right.add_orgs),
        )
    
    
    class FormAddRole(FlaskForm):
        """Form for granting a new role; processed by the ``org_org`` view."""

        role = wtforms.SelectField(
            'Role',
            choices=db.RoleType.choices(),
            coerce=db.RoleType.coerce,
            render_kw={'autofocus': True},
        )
        place_code = wtforms.StringField('Oblast')
        year = wtforms.IntegerField(
            'Ročník',
            validators=[validators.Optional()],
        )
        category = wtforms.StringField(
            "Kategorie",
            validators=[validators.Length(max=2)],
            # Falsy input (e.g. an empty string) is turned into None.
            filters=[lambda value: value or None],
        )
        seq = wtforms.IntegerField(
            "Kolo",
            validators=[validators.Optional()],
        )

        submit = wtforms.SubmitField('Přidat roli')
    
    
    class FormRemoveRole(FlaskForm):
        """Form for removing a role; processed by the ``org_org`` view."""

        # Id of the db.UserRole row to delete (the view looks it up by this value).
        remove_role_id = wtforms.IntegerField()
        remove = wtforms.SubmitField('Odebrat roli')
    
    
    @app.route('/org/org/<int:id>/', methods=('GET', 'POST'))
    def org_org(id: int):
        """Show one organizer/admin account and process role changes.

        A plain request renders ``org_org.html``. Submitted forms are processed
        only when the current user holds ``Right.assign_rights``: a role is
        either added from ``FormAddRole`` or removed via ``FormRemoveRole``.
        After a successful change the browser is redirected back to this page.

        Args:
            id: ``user_id`` of the displayed account (taken from the URL).

        Raises:
            werkzeug.exceptions.NotFound: the user does not exist, is neither
                organizer nor admin, or the role to remove does not exist or
                does not belong to this user.
        """
        sess = db.get_session()
        # joinedload() accepts a single attribute (its second positional
        # parameter is ``innerjoin``), so each relationship of the role gets
        # its own loader option.
        user = (sess.query(db.User)
                .options(
                    subqueryload(db.User.roles).joinedload(db.UserRole.place),
                    subqueryload(db.User.roles).joinedload(db.UserRole.assigned_by_user),
                )
                .get(id))
        if not user or (not user.is_org and not user.is_admin):
            raise werkzeug.exceptions.NotFound()

        rr = g.gatekeeper.rights_generic()
        can_assign_rights = rr.have_right(Right.assign_rights)

        form_add_role = FormAddRole()
        form_remove_role = FormRemoveRole()
        role_errors = []
        if can_assign_rights:
            if form_add_role.submit.data and form_add_role.validate_on_submit():
                new_role = db.UserRole()
                form_add_role.populate_obj(new_role)

                new_role.user_id = id
                # The root place is the default when no place code is given.
                new_role.place = db.get_root_place()
                new_role.assigned_by = g.user.user_id

                ok = True
                place_code = form_add_role.place_code.data
                if place_code:
                    place = db.get_place_by_code(place_code)
                    if not place:
                        role_errors.append("Nepovedlo se nalézt místo podle kódu")
                        ok = False
                    else:
                        new_role.place = place

                if not g.gatekeeper.can_set_role(new_role):
                    role_errors.append(f'Roli "{new_role}" nelze přidělit, není podmnožinou žádné vaší role')
                    ok = False

                if ok:
                    sess.add(new_role)
                    sess.flush()
                    mo.util.log(
                        type=db.LogType.user_role,
                        what=id,
                        details={'action': 'new', 'role': db.row2dict(new_role)},
                    )
                    sess.commit()
                    app.logger.info(f"New role for user id {id} added: {db.row2dict(new_role)}")
                    flash(f'Role "{new_role}" úspěšně přidána', 'success')
                    # The account is known to be an organizer/admin, so go straight
                    # to this page instead of bouncing through org_user.
                    return redirect(url_for('org_org', id=id))

            if form_remove_role.remove_role_id.data and form_remove_role.validate_on_submit():
                role = sess.query(db.UserRole).get(form_remove_role.remove_role_id.data)
                # The role id comes from the submitted form; refuse roles that
                # belong to a different user than the one in the URL, otherwise
                # the deletion would be logged against the wrong account.
                if not role or role.user_id != id:
                    raise werkzeug.exceptions.NotFound()

                if not g.gatekeeper.can_set_role(role):
                    role_errors.append(f'Roli "{role}" nelze odebrat, není podmnožinou žádné vaší role')
                else:
                    sess.delete(role)
                    mo.util.log(
                        type=db.LogType.user_role,
                        what=id,
                        details={'action': 'delete', 'role': db.row2dict(role)},
                    )
                    sess.commit()
                    app.logger.info(f"Role for user {id} removed: {db.row2dict(role)}")
                    flash(f'Role "{role}" úspěšně odebrána', 'success')
                    return redirect(url_for('org_org', id=id))

        return render_template(
            'org_org.html', user=user,
            can_edit=rr.can_edit_user(user), can_assign_rights=can_assign_rights,
            can_incarnate=g.user.is_admin,
            roles_by_type=mo.rights.roles_by_type, role_errors=role_errors,
            form_add_role=form_add_role, form_remove_role=form_remove_role,
        )
    
    
    @app.route('/org/user/<int:id>/', methods=('GET', 'POST'))
    def org_user(id: int):
        """Show the detail page of a regular (non-organizer) user.

        Requests for an organizer or admin account are redirected to
        ``org_org``; an unknown ``id`` raises ``NotFound``.
        """
        sess = db.get_session()
        user = mo.users.user_by_uid(id)
        if not user:
            raise werkzeug.exceptions.NotFound()
        if user.is_org or user.is_admin:
            return redirect(url_for('org_org', id=id))

        rights = g.gatekeeper.rights_generic()

        # Left as a query object; it is not executed in this function.
        participants = sess.query(db.Participant).filter_by(user_id=user.user_id)

        # (participation, contest, round) triples of this user.
        participation_query = sess.query(db.Participation, db.Contest, db.Round)
        participation_query = participation_query.select_from(db.Participation)
        participation_query = participation_query.join(
            db.Contest,
            db.Contest.master_contest_id == db.Participation.contest_id,
        )
        participation_query = participation_query.join(db.Round)
        participation_query = participation_query.filter(db.Participation.user == user)
        participation_query = participation_query.options(joinedload(db.Contest.place))
        participation_query = participation_query.order_by(
            db.Round.year.desc(),
            db.Round.category,
            db.Round.seq,
            db.Round.part,
        )
        participations = participation_query.all()

        return render_template(
            'org_user.html',
            user=user,
            can_edit=rights.can_edit_user(user),
            can_incarnate=g.user.is_admin,
            participants=participants,
            participations=participations,
        )
    
    
    class UserEditForm(FlaskForm):
        """Form with the editable attributes of a user account.

        Used both for editing an existing account and for creating a new one.
        """

        # DataRequired is the non-deprecated name of the Required validator.
        first_name = wtforms.StringField("Jméno", validators=[validators.DataRequired()], render_kw={'autofocus': True})
        last_name = wtforms.StringField("Příjmení", validators=[validators.DataRequired()])
        email = wtforms.StringField("E-mail", validators=[validators.DataRequired()])
        note = wtforms.TextAreaField("Poznámka")
        submit = wtforms.SubmitField("Uložit")

        def validate_email(form, field):
            """Inline validator: replace the e-mail with its normalized form.

            Raises:
                wtforms.ValidationError: ``mo.users.normalize_email`` rejected
                    the value with ``mo.CheckError``.
            """
            try:
                field.data = mo.users.normalize_email(field.data)
            except mo.CheckError as e:
                raise wtforms.ValidationError(str(e)) from e
    
    
    @app.route('/org/org/<int:id>/edit', methods=("GET", "POST"), endpoint="org_org_edit")
    @app.route('/org/user/<int:id>/edit', methods=("GET", "POST"))
    def org_user_edit(id: int):
        """Edit the basic attributes of a user or organizer account.

        The function serves two endpoints; a request that arrives on the
        endpoint not matching the account type is redirected to the other one.

        Args:
            id: ``user_id`` of the edited account (taken from the URL).

        Raises:
            werkzeug.exceptions.NotFound: no user with this id exists.
            werkzeug.exceptions.Forbidden: the current user may not edit
                this account.
        """
        sess = db.get_session()
        user = mo.users.user_by_uid(id)
        if not user:
            raise werkzeug.exceptions.NotFound()

        is_org = request.endpoint == "org_org_edit"

        if not is_org and (user.is_admin or user.is_org):
            return redirect(url_for("org_org_edit", id=id))
        if is_org and not (user.is_admin or user.is_org):
            return redirect(url_for("org_user_edit", id=id))

        rr = g.gatekeeper.rights_generic()
        if not rr.can_edit_user(user):
            raise werkzeug.exceptions.Forbidden()

        form = UserEditForm(obj=user)
        if (user.is_org or user.is_admin) and not g.user.is_admin:
            # Only an admin may edit e-mail addresses of organizers.
            del form.email
        if form.validate_on_submit():
            check = True

            # The e-mail field may have been removed above, in which case there
            # is nothing to check.
            if 'email' in form:
                other_user = mo.users.user_by_email(form.email.data)
                # Finding the edited account itself is not a conflict; only
                # a different account with the same e-mail is.
                if other_user is not None and other_user.user_id != user.user_id:
                    flash('Zadaný e-mail nelze použít, existuje jiný účet s tímto e-mailem', 'danger')
                    check = False

            if check:
                form.populate_obj(user)
                if sess.is_modified(user):
                    changes = db.get_object_changes(user)

                    app.logger.info(f"User {id} modified, changes: {changes}")
                    mo.util.log(
                        type=db.LogType.user,
                        what=id,
                        details={'action': 'edit', 'changes': changes},
                    )
                    sess.commit()
                    flash('Změny uživatele uloženy', 'success')
                else:
                    flash('Žádné změny k uložení', 'info')

                # is_org matches the account type here (mismatches were
                # redirected above), so pick the detail page directly.
                return redirect(url_for('org_org' if is_org else 'org_user', id=id))

        return render_template('org_user_edit.html', user=user, form=form, is_org=is_org)
    
    
    @app.route('/org/org/new/', methods=('GET', 'POST'), endpoint="org_org_new")
    @app.route('/org/user/new/', methods=('GET', 'POST'))
    def org_user_new():
        """Create a new user or organizer account.

        The function serves two endpoints: ``org_org_new`` creates an organizer
        and requires ``Right.add_orgs``; the other one creates a regular user
        and requires ``Right.add_users``. After the account is stored,
        a password reset token is requested and sent to the new e-mail address.

        Raises:
            werkzeug.exceptions.Forbidden: the current user lacks the right
                required for the requested kind of account.
        """
        sess = db.get_session()
        rr = g.gatekeeper.rights_generic()

        is_org = request.endpoint == "org_org_new"

        # Each endpoint needs exactly one right; creating an organizer must not
        # additionally depend on the right to add regular users.
        required_right = Right.add_orgs if is_org else Right.add_users
        if not rr.have_right(required_right):
            raise werkzeug.exceptions.Forbidden()

        form = UserEditForm()
        form.submit.label.text = 'Vytvořit'
        if form.validate_on_submit():
            check = True

            if mo.users.user_by_email(form.email.data) is not None:
                flash('Účet s daným e-mailem již existuje', 'danger')
                check = False

            if check:
                new_user = db.User()
                form.populate_obj(new_user)
                new_user.is_org = is_org
                sess.add(new_user)
                # Flush before logging; the log entry below uses new_user.user_id.
                sess.flush()

                app.logger.info(f"New user created: {db.row2dict(new_user)}")
                mo.util.log(
                    type=db.LogType.user,
                    what=new_user.user_id,
                    details={'action': 'new', 'user': db.row2dict(new_user)},
                )

                sess.commit()
                flash('Nový uživatel vytvořen', 'success')

                # Send password (re)set link
                token = mo.users.ask_reset_password(new_user)
                db.get_session().commit()

                if mo.util.send_new_account_email(new_user, token):
                    flash(f'E-mail s odkazem pro nastavení hesla odeslán na {new_user.email}', 'success')
                else:
                    flash('Problém při odesílání e-mailu s odkazem pro nastavení hesla', 'danger')

                if is_org:
                    return redirect(url_for('org_org', id=new_user.user_id))
                return redirect(url_for('org_user', id=new_user.user_id))

        return render_template('org_user_new.html', form=form, is_org=is_org)