Skip to content
Snippets Groups Projects
Select Git revision
  • a8b775d65bb92647e01e041050de4b4b880b89a5
  • devel default
  • master
  • fo
  • jirka/typing
  • fo-base
  • mj/submit-images
  • jk/issue-96
  • jk/issue-196
  • honza/add-contestant
  • honza/mr7
  • honza/mrf
  • honza/mrd
  • honza/mra
  • honza/mr6
  • honza/submit-images
  • honza/kolo-vs-soutez
  • jh-stress-test-wip
  • shorten-schools
19 results

org_place.py

Blame
  • org_users.py 13.77 KiB
    from flask import render_template, g, redirect, url_for, flash, request
    from flask_wtf import FlaskForm
    import werkzeug.exceptions
    import wtforms
    from sqlalchemy import or_
    
    from typing import Optional
    from wtforms import validators
    
    from wtforms.validators import Required
    
    import mo
    import mo.db as db
    import mo.rights
    import mo.util
    import mo.users
    from mo.web import app
    
    
    class PagerForm(FlaskForm):
        """Base form carrying paging state shared by the list views.

        ``limit`` and ``offset`` hold the page size and the index of the first
        row; ``next`` and ``previous`` are the submit buttons used to move
        between pages.
        """
        # Page size; the list views fall back to 50 when it is missing.
        limit = wtforms.IntegerField()
        # Index of the first row to show; the list views fall back to 0.
        offset = wtforms.IntegerField()
        next = wtforms.SubmitField("Další")
        previous = wtforms.SubmitField("Předchozí")
    
    
    class UsersFilterForm(PagerForm):
        """Filter form for the contestant list (``org_users`` view).

        Note that the choices of the select fields are computed when this class
        body is executed, i.e. ``db.get_categories()``, ``db.get_seqs()`` and
        ``db.PartState.choices()`` are called once at import time.
        """
        # Criteria matched against participant records (db.Participant).
        year = wtforms.IntegerField("Ročník")
        school_code = wtforms.StringField("Škola")
    
        # Criteria matched against rounds, then contests, then participations.
        # '*' in a select field stands for "no restriction".
        round_year = wtforms.IntegerField("Ročník")
        round_category = wtforms.SelectField("Kategorie", choices=['*'] + sorted(db.get_categories()))
        round_seq = wtforms.SelectField("Kolo", choices=['*'] + sorted(db.get_seqs()))
        contest_site_code = wtforms.StringField("Soutěžní místo")
        participation_state = wtforms.SelectField('Účast', choices=[('*', '*')] + list(db.PartState.choices()))
    
        submit = wtforms.SubmitField("Filtrovat")
    
    
    @app.route('/org/user/')
    def org_users():
        """Render the filtered, paged list of contestant accounts.

        Only users that are neither admins nor organizers are listed. Filter
        and paging values are read from the query string through
        ``UsersFilterForm``. An unknown place code does not abort the request:
        a message is added to ``filter_errors`` (passed to the template) and
        that particular criterion is skipped.
        """
        sess = db.get_session()
        rr = mo.rights.Rights(g.user)
        rr.get_generic()
    
        q = sess.query(db.User).filter_by(is_admin=False, is_org=False)
        # Bound to `form` locally so the builtin `filter` is not shadowed;
        # the template still receives it under the name `filter`.
        form = UsersFilterForm(request.args)
    
        filter_errors = []
    
        # --- criteria on participant records ---
        participant_filter = sess.query(db.Participant.user_id)
        participant_filter_apply = False
        if form.year.data:
            participant_filter = participant_filter.filter_by(year=form.year.data)
            participant_filter_apply = True
        if form.school_code.data:
            place = db.place_by_code(form.school_code.data)
            if place:
                participant_filter = participant_filter.filter_by(school=place.place_id)
                participant_filter_apply = True
            else:
                filter_errors.append("Neexistující kód školy")
    
        if participant_filter_apply:
            q = q.filter(db.User.user_id.in_(participant_filter))
    
        # --- criteria on rounds ('*' means "any") ---
        round_filter = sess.query(db.Round.round_id)
        round_filter_apply = False
        if form.round_year.data:
            round_filter = round_filter.filter_by(year=form.round_year.data)
            round_filter_apply = True
        if form.round_category.data and form.round_category.data != "*":
            round_filter = round_filter.filter_by(category=form.round_category.data)
            round_filter_apply = True
        if form.round_seq.data and form.round_seq.data != "*":
            round_filter = round_filter.filter_by(seq=form.round_seq.data)
            round_filter_apply = True
    
        # --- criteria on contests (restricted by the selected rounds) ---
        contest_filter = sess.query(db.Contest.contest_id)
        contest_filter_apply = False
        if round_filter_apply:
            contest_filter = contest_filter.filter(db.Contest.round_id.in_(round_filter))
            contest_filter_apply = True
        if form.contest_site_code.data:
            place = db.place_by_code(form.contest_site_code.data)
            if place:
                contest_filter = contest_filter.filter_by(place_id=place.place_id)
                contest_filter_apply = True
            else:
                filter_errors.append("Neexistující kód soutěžního místa")
    
        # --- criteria on participations (restricted by the selected contests) ---
        participation_filter = sess.query(db.Participation.user_id)
        participation_filter_apply = False
        if contest_filter_apply:
            participation_filter = participation_filter.filter(db.Participation.contest_id.in_(contest_filter))
            participation_filter_apply = True
        if form.participation_state.data and form.participation_state.data != '*':
            participation_filter = participation_filter.filter_by(state=form.participation_state.data)
            participation_filter_apply = True
    
        if participation_filter_apply:
            q = q.filter(db.User.user_id.in_(participation_filter))
    
        count = q.count()
    
        # Normalise paging input: missing, zero or negative values must not
        # reach SQL's LIMIT/OFFSET.
        limit = form.limit.data
        if not limit or limit < 0:
            limit = 50
        offset = max(0, form.offset.data or 0)
    
        # Step by the actual page size rather than a hard-coded 50.
        if form.previous.data:
            offset = max(0, offset - limit)
        if form.next.data:
            # Start of the last non-empty page; (count - 1) keeps "next" from
            # landing on an empty page when count is a multiple of the limit.
            last_page_start = max(0, (count - 1) // limit * limit)
            offset = min(last_page_start, offset + limit)
    
        form.limit.data = limit
        form.offset.data = offset
    
        # A fixed ordering is needed for OFFSET/LIMIT to page consistently.
        users = q.order_by(db.User.user_id).offset(offset).limit(limit).all()
    
        return render_template(
            'org_users.html', users=users, count=count,
            filter=form, filter_errors=filter_errors,
            can_edit=rr.have_right(mo.rights.Right.edit_users),
            can_add=rr.have_right(mo.rights.Right.add_users),
        )
    
    
    class OrgsFilterForm(PagerForm):
        """Filter form for the organizer list; currently it only adds a submit button to the pager."""
        # TODO: filtering by roles?
        submit = wtforms.SubmitField("Filtrovat")
    
    
    @app.route('/org/org/')
    def org_orgs():
        """Render the paged list of organizer and admin accounts.

        Paging values are read from the query string through
        ``OrgsFilterForm``; no other filtering is implemented yet.
        """
        sess = db.get_session()
        rr = mo.rights.Rights(g.user)
        rr.get_generic()
    
        q = sess.query(db.User).filter(or_(db.User.is_admin, db.User.is_org))
        # Bound to `form` locally so the builtin `filter` is not shadowed;
        # the template still receives it under the name `filter`.
        form = OrgsFilterForm(request.args)
        # TODO: filtering by roles?
    
        count = q.count()
    
        # Normalise paging input: missing, zero or negative values must not
        # reach SQL's LIMIT/OFFSET.
        limit = form.limit.data
        if not limit or limit < 0:
            limit = 50
        offset = max(0, form.offset.data or 0)
    
        # Step by the actual page size rather than a hard-coded 50.
        if form.previous.data:
            offset = max(0, offset - limit)
        if form.next.data:
            # Start of the last non-empty page; (count - 1) keeps "next" from
            # landing on an empty page when count is a multiple of the limit.
            last_page_start = max(0, (count - 1) // limit * limit)
            offset = min(last_page_start, offset + limit)
    
        form.limit.data = limit
        form.offset.data = offset
    
        # A fixed ordering is needed for OFFSET/LIMIT to page consistently.
        users = q.order_by(db.User.user_id).offset(offset).limit(limit).all()
    
        return render_template(
            'org_orgs.html', users=users, count=count,
            filter=form, filter_errors=None,
            can_edit=rr.have_right(mo.rights.Right.edit_orgs),
            can_add=rr.have_right(mo.rights.Right.add_orgs),
        )
    
    
    class FormAddRole(FlaskForm):
        """Form for assigning a new role to an organizer.

        The ``org_org`` view copies the submitted values onto a new
        ``db.UserRole`` via ``populate_obj`` and resolves ``place_code``
        to a place separately.
        """
        # Choices are built once, when the class body runs: the submitted value is
        # the `.name` of each key of mo.rights.roles_by_type, the label is the
        # `.name` of the corresponding value.
        role = wtforms.SelectField('Role', choices=[(name.name, role.name) for (name, role) in mo.rights.roles_by_type.items()])
        # Code of the place the role is limited to; the view uses the root place when empty.
        place_code = wtforms.StringField('Oblast')
        year = wtforms.IntegerField('Ročník', validators=[validators.Optional()])
        category = wtforms.StringField("Kategorie")
        seq = wtforms.IntegerField("Kolo", validators=[validators.Optional()])
    
        submit = wtforms.SubmitField('Přidat roli')
    
    
    class FormRemoveRole(FlaskForm):
        """Form for removing one role; ``remove_role_id`` is the id looked up in ``db.UserRole``."""
        remove_role_id = wtforms.IntegerField()
        remove = wtforms.SubmitField('Odebrat roli')
    
    
    @app.route('/org/org/<int:id>/', methods=('GET', 'POST'))
    def org_org(id: int):
        """Show an organizer/admin account and handle adding or removing its roles.

        Role changes are processed only when the current user has the
        ``assign_rights`` right. Problems with a submitted change are collected
        in ``role_errors`` and shown on the re-rendered page; a successful
        change is logged, committed and followed by a redirect back here.

        Raises:
            werkzeug.exceptions.NotFound: if the user does not exist or is not
                an organizer/admin, or if the role to remove does not exist or
                does not belong to this user.
        """
        sess = db.get_session()
        user = sess.query(db.User).get(id)
        if not user or (not user.is_org and not user.is_admin):
            raise werkzeug.exceptions.NotFound()
    
        rr = mo.rights.Rights(g.user)
        rr.get_generic()
        can_assign_rights = rr.have_right(mo.rights.Right.assign_rights)
    
        form_add_role = FormAddRole()
        form_remove_role = FormRemoveRole()
        role_errors = []
        if can_assign_rights:
            if form_add_role.submit.data and form_add_role.validate_on_submit():
                new_role = db.UserRole()
                form_add_role.populate_obj(new_role)
    
                new_role.user_id = id
                # Default to the root place; replaced below when a code is given.
                new_role.place = db.get_root_place()
                new_role.assigned_by = g.user.user_id
    
                ok = True
                place_code = form_add_role.place_code.data
                if place_code:
                    # NOTE(review): the user list view in this file calls
                    # db.place_by_code instead -- confirm both helpers exist.
                    place = db.get_place_by_code(place_code)
                    if not place:
                        role_errors.append("Nepovedlo se nalézt místo podle kódu")
                        ok = False
                    else:
                        new_role.place = place
    
                if not rr.can_set_role(new_role):
                    role_errors.append(f'Roli "{new_role}" nelze přidělit, není podmnožinou žádné vaší role')
                    ok = False
    
                if ok:
                    sess.add(new_role)
                    # Flush before logging so the row is written before it is serialised.
                    sess.flush()
                    mo.util.log(
                        type=db.LogType.user_role,
                        what=id,
                        details={'action': 'new', 'role': db.row2dict(new_role)},
                    )
                    sess.commit()
                    app.logger.info(f"New role for user id {id} added: {db.row2dict(new_role)}")
                    flash(f'Role "{new_role}" úspěšně přidána', 'success')
                    # Redirect straight to this view; 'org_user' would only
                    # bounce organizers back here.
                    return redirect(url_for('org_org', id=id))
    
            if form_remove_role.remove_role_id.data and form_remove_role.validate_on_submit():
                role = sess.query(db.UserRole).get(form_remove_role.remove_role_id.data)
                # The role must belong to the user shown on this page. Without
                # this check any role id could be removed through any
                # organizer's page, bypassing the own-role guard below and
                # logging the removal against the wrong user.
                if not role or role.user_id != id:
                    raise werkzeug.exceptions.NotFound()
    
                if id == g.user.user_id:
                    role_errors.append('Nelze odebrat vlastní roli')
                elif not rr.can_set_role(role):
                    role_errors.append(f'Roli "{role}" nelze odebrat, není podmnožinou žádné vaší role')
                else:
                    sess.delete(role)
                    mo.util.log(
                        type=db.LogType.user_role,
                        what=id,
                        details={'action': 'delete', 'role': db.row2dict(role)},
                    )
                    sess.commit()
                    app.logger.info(f"Role for user {id} removed: {db.row2dict(role)}")
                    flash(f'Role "{role}" úspěšně odebrána', 'success')
                    return redirect(url_for('org_org', id=id))
    
        return render_template(
            'org_org.html', user=user,
            can_edit=rr.can_edit_user(user), can_assign_rights=can_assign_rights,
            roles_by_type=mo.rights.roles_by_type, role_errors=role_errors,
            form_add_role=form_add_role, form_remove_role=form_remove_role,
        )
    
    
    @app.route('/org/user/<int:id>/', methods=('GET', 'POST'))
    def org_user(id: int):
        """Show the detail page of a contestant account.

        Organizer and admin accounts are redirected to the ``org_org`` view.
        Raises ``werkzeug.exceptions.NotFound`` when no user has the given id.
        """
        session = db.get_session()
        user = session.query(db.User).get(id)
        if not user:
            raise werkzeug.exceptions.NotFound()
        if user.is_admin or user.is_org:
            return redirect(url_for('org_org', id=id))
    
        rights = mo.rights.Rights(g.user)
        rights.get_generic()
    
        # Both are passed to the template as queries over this user's rows.
        uid = user.user_id
        context = {
            'user': user,
            'participants': session.query(db.Participant).filter_by(user_id=uid),
            'rounds': session.query(db.Participation).filter_by(user_id=uid),
            'can_edit': rights.can_edit_user(user),
        }
        return render_template('org_user.html', **context)
    
    
    class UserEditForm(FlaskForm):
        """Form for editing the basic attributes of a user account."""
        # DataRequired replaces the deprecated `Required` alias, which WTForms 3
        # no longer provides; validation behaviour is the same.
        first_name = wtforms.StringField("Jméno", validators=[validators.DataRequired()])
        last_name = wtforms.StringField("Příjmení", validators=[validators.DataRequired()])
        note = wtforms.TextAreaField("Poznámka")
        submit = wtforms.SubmitField("Uložit")
    
    
    class NewUserForm(UserEditForm):
        """Form for creating a new account: the edit form plus a mandatory e-mail."""
        # DataRequired replaces the deprecated `Required` alias, which WTForms 3
        # no longer provides; validation behaviour is the same.
        email = wtforms.StringField("E-mail", validators=[validators.DataRequired()])
        submit = wtforms.SubmitField("Vytvořit")
    
    
    @app.route('/org/org/<int:id>/edit', methods=("GET", "POST"), endpoint="org_org_edit")
    @app.route('/org/user/<int:id>/edit', methods=("GET", "POST"))
    def org_user_edit(id: int):
        """Edit a user account; serves both the contestant and the organizer URL.

        A request that arrives on the URL not matching the account type is
        redirected to the matching one. Raises ``NotFound`` for an unknown id
        and ``Forbidden`` when the current user may not edit the account.
        """
        session = db.get_session()
        user = mo.users.user_by_uid(id)
        if not user:
            raise werkzeug.exceptions.NotFound()
    
        is_org = request.endpoint == "org_org_edit"
    
        # Send the request to the endpoint that matches the account type.
        account_is_staff = bool(user.is_admin or user.is_org)
        if is_org != account_is_staff:
            target = "org_org_edit" if account_is_staff else "org_user_edit"
            return redirect(url_for(target, id=id))
    
        rights = mo.rights.Rights(g.user)
        rights.get_generic()
        if not rights.can_edit_user(user):
            raise werkzeug.exceptions.Forbidden()
    
        form = UserEditForm(obj=user)
        if not form.validate_on_submit():
            # First display, or a submission that failed validation.
            return render_template('org_user_edit.html', user=user, form=form, is_org=is_org)
    
        form.populate_obj(user)
    
        if not session.is_modified(user):
            flash('Žádné změny k uložení', 'info')
        else:
            changes = db.get_object_changes(user)
    
            app.logger.info(f"User {id} modified, changes: {changes}")
            mo.util.log(
                type=db.LogType.user,
                what=id,
                details={'action': 'edit', 'changes': changes},
            )
            session.commit()
            flash('Změny uživatele uloženy', 'success')
    
        return redirect(url_for('org_user', id=id))
    
    
    @app.route('/org/org/new/', methods=('GET', 'POST'), endpoint="org_org_new")
    @app.route('/org/user/new/', methods=('GET', 'POST'))
    def org_user_new():
        """Create a new account and e-mail its owner a password-setting link.

        The ``org_org_new`` endpoint creates an organizer and requires the
        ``add_orgs`` right; the plain endpoint creates a contestant and
        requires ``add_users``. When the e-mail address is already taken the
        form is shown again with an error message.

        Raises:
            werkzeug.exceptions.Forbidden: if the current user lacks the right
                needed for the requested account type.
        """
        sess = db.get_session()
        rr = mo.rights.Rights(g.user)
        rr.get_generic()
    
        is_org = request.endpoint == "org_org_new"
    
        # Check exactly one right, chosen by account type. The former
        # `if is_org and ... / elif ...` chain fell through to the add_users
        # check for organizers as well, so creating one needed both rights.
        required_right = mo.rights.Right.add_orgs if is_org else mo.rights.Right.add_users
        if not rr.have_right(required_right):
            raise werkzeug.exceptions.Forbidden()
    
        form = NewUserForm()
        if form.validate_on_submit():
            check = True
    
            if mo.users.user_by_email(form.email.data) is not None:
                flash('Účet s daným e-mailem již existuje', 'danger')
                check = False
    
            if check:
                new_user = db.User()
                form.populate_obj(new_user)
                new_user.is_org = is_org
                sess.add(new_user)
                # Flush so that new_user.user_id is available for the log entry.
                sess.flush()
    
                app.logger.info(f"New user created: {db.row2dict(new_user)}")
                mo.util.log(
                    type=db.LogType.user,
                    what=new_user.user_id,
                    details={'action': 'new', 'user': db.row2dict(new_user)},
                )
    
                sess.commit()
                flash('Nový uživatel vytvořen', 'success')
    
                # Send password (re)set link
                token = mo.users.ask_reset_password(new_user)
                link = url_for('reset', token=token, _external=True)
                db.get_session().commit()
    
                # A mail failure is reported but does not undo the new account.
                try:
                    mo.util.send_password_reset_email(new_user, link)
                    flash('E-mail s odkazem pro nastavení hesla odeslán na {}'.format(new_user.email), 'success')
                except RuntimeError as e:
                    app.logger.error('Login: problém při posílání e-mailu: {}'.format(e))
                    flash('Problém při odesílání e-mailu s odkazem pro nastavení hesla', 'danger')
    
                if is_org:
                    return redirect(url_for('org_org', id=new_user.user_id))
                return redirect(url_for('org_user', id=new_user.user_id))
    
        return render_template('org_user_new.html', form=form, is_org=is_org)