Select Git revision
splay_experiment.py
-
Ondřej Mička authoredOndřej Mička authored
org_users.py 30.09 KiB
from typing import Optional, Set
from flask import render_template, g, redirect, url_for, flash, request
from flask_wtf import FlaskForm
import werkzeug.exceptions
import wtforms
from sqlalchemy import or_, select
import flask_sqlalchemy
from sqlalchemy.orm import joinedload, subqueryload
from wtforms import validators
from wtforms.fields.simple import SubmitField
from wtforms.validators import Required
import mo
import mo.db as db
import mo.email
from mo.imports import GlobalOrgsImport
from mo.rights import Right
import mo.util
import mo.users
from mo.web import app
import mo.web.fields as mo_fields
from mo.web.imports import ImportForm, generic_import_page
from mo.web.util import PagerForm
class UsersFilterForm(PagerForm):
# user
search_name = mo_fields.String("Jméno/příjmení", render_kw={'autofocus': True})
search_email = mo_fields.String("E-mail")
# participants
year = mo_fields.OptionalInt("Ročník")
school = mo_fields.School()
# rounds->participations
round_year = mo_fields.OptionalInt("Ročník")
round_category = wtforms.SelectField("Kategorie")
round_seq = wtforms.SelectField("Kolo")
contest_site = mo_fields.Place("Soutěžní oblast")
participation_state = wtforms.SelectField('Účast', choices=[('*', '*')] + list(db.PartState.choices()))
submit = wtforms.SubmitField("Filtrovat")
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.round_category.choices = ['*'] + sorted(db.get_categories())
self.round_seq.choices = ['*'] + sorted(db.get_seqs())
@app.route('/org/user/', methods=('GET', 'POST'))
def org_users():
sess = db.get_session()
rr = g.gatekeeper.rights_generic()
if rr.have_right(Right.view_all_users):
schools = None
else:
schools = rr.get_user_schools(Right.view_school_users)
q = sess.query(db.User).filter_by(is_admin=False, is_org=False).options(
subqueryload(db.User.participants).joinedload(db.Participant.school_place)
)
filter = UsersFilterForm(formdata=request.args)
if request.args:
filter.validate()
if filter.search_name.data:
q = q.filter(or_(
db.User.first_name.ilike(f"%{filter.search_name.data}%"),
db.User.last_name .ilike(f"%{filter.search_name.data}%")
))
if filter.search_email.data:
q = q.filter(db.User.email.ilike(f"%{filter.search_email.data}%"))
if filter.year.data or filter.school.place:
participant_filter = sess.query(db.Participant.user_id)
if filter.year.data:
participant_filter = participant_filter.filter_by(year=filter.year.data)
if filter.school.place:
participant_filter = participant_filter.filter_by(school=filter.school.place.place_id)
q = q.filter(db.User.user_id.in_(participant_filter))
round_filter = sess.query(db.Round.round_id)
round_filter_apply = False
if filter.round_year.data:
round_filter = round_filter.filter_by(year=filter.round_year.data)
round_filter_apply = True
if filter.round_category.data and filter.round_category.data != "*":
round_filter = round_filter.filter_by(category=filter.round_category.data)
round_filter_apply = True
if filter.round_seq.data and filter.round_seq.data != "*":
round_filter = round_filter.filter_by(seq=filter.round_seq.data)
round_filter_apply = True
contest_filter = sess.query(db.Contest.contest_id)
contest_filter_apply = False
if round_filter_apply:
contest_filter = contest_filter.filter(db.Contest.round_id.in_(round_filter))
contest_filter_apply = True
if filter.contest_site.place:
contest_filter = contest_filter.filter_by(place_id=filter.contest_site.place.place_id)
contest_filter_apply = True
participation_filter = sess.query(db.Participation.user_id)
participation_filter_apply = False
if contest_filter_apply:
participation_filter = participation_filter.filter(db.Participation.contest_id.in_(contest_filter))
participation_filter_apply = True
if filter.participation_state.data and filter.participation_state.data != '*':
participation_filter = participation_filter.filter_by(state=filter.participation_state.data)
participation_filter_apply = True
if participation_filter_apply:
q = q.filter(db.User.user_id.in_(participation_filter))
if schools is not None:
q = rr.restrict_user_query(q, schools)
(count, q) = filter.apply_limits(q, pagesize=50)
users = q.all()
return render_template(
'org_users.html', users=users, count=count,
filter=filter,
can_edit=rr.have_right(Right.edit_all_users) or rr.have_right(Right.edit_school_users),
can_add=rr.have_right(Right.add_users),
is_restricted=(schools is not None),
)
class OrgsFilterForm(PagerForm):
# user
search_name = mo_fields.String("Jméno/příjmení", render_kw={'autofocus': True})
search_email = mo_fields.String("E-mail")
search_role = wtforms.SelectMultipleField('Role', choices=db.RoleType.choices(), coerce=db.RoleType.coerce, validators=[validators.Optional()])
search_right_for_place = mo_fields.Place('Právo pro oblast', validators=[validators.Optional()])
search_in_place = mo_fields.Place('V oblasti', validators=[validators.Optional()])
search_place_level = wtforms.SelectMultipleField("Úroveň oblasti", choices=[(i.level, i.name) for i in db.place_levels], validators=[validators.Optional()], coerce=int)
search_year = mo_fields.IntList('Ročník', validators=[validators.Optional()])
search_category = mo_fields.String("Kategorie", validators=[validators.Optional()])
search_seq = mo_fields.IntList("Kolo", validators=[validators.Optional()])
submit = wtforms.SubmitField("Filtrovat")
show_role_filter = wtforms.SubmitField("Zobrazit filtrování dle rolí")
hide_role_filter = wtforms.SubmitField("Skrýt filtrování dle rolí")
is_role_filter = wtforms.HiddenField(default="") # "" -> skrýt. "yes" -> zobrazit
def prepare_role_filter(self):
if self.show_role_filter.data:
self.is_role_filter.data = "yes"
if self.hide_role_filter.data:
self.is_role_filter.data = ""
if self.is_role_filter.data:
del self.show_role_filter
else:
del self.hide_role_filter
del self.search_role
del self.search_right_for_place
del self.search_in_place
del self.search_place_level
del self.search_year
del self.search_category
del self.search_seq
@app.route('/org/org/', methods=('GET', 'POST'))
def org_orgs():
sess = db.get_session()
rr = g.gatekeeper.rights_generic()
q = sess.query(db.User).filter(or_(db.User.is_admin, db.User.is_org)).options(
subqueryload(db.User.roles).joinedload(db.UserRole.place)
)
filter = OrgsFilterForm(formdata=request.args)
if request.args:
filter.validate()
filter.prepare_role_filter()
if filter.search_name.data:
q = q.filter(or_(
db.User.first_name.ilike(f"%{filter.search_name.data}%"),
db.User.last_name .ilike(f"%{filter.search_name.data}%")
))
if filter.search_email.data:
q = q.filter(db.User.email.ilike(f"%{filter.search_email.data}%"))
def query_filter_role(qr: flask_sqlalchemy.BaseQuery) -> flask_sqlalchemy.BaseQuery:
if filter.search_role.data:
qr = qr.filter(db.UserRole.role.in_(filter.search_role.data))
if filter.search_category.data:
qr = qr.filter(or_(db.UserRole.category.in_(filter.search_category.data.split(",")), db.UserRole.category == None))
if filter.search_seq.list:
qr = qr.filter(or_(db.UserRole.seq.in_(filter.search_seq.list), db.UserRole.seq == None))
if filter.search_year.list:
qr = qr.filter(or_(db.UserRole.year.in_(filter.search_year.list), db.UserRole.year == None))
pass
if filter.search_in_place.place is not None:
qr = qr.filter(db.UserRole.place_id.in_(select([db.place_descendant_cte(filter.search_in_place.place)])))
if filter.search_right_for_place.place is not None:
qr = qr.filter(db.UserRole.place_id.in_([x.place_id for x in db.get_place_ancestors(filter.search_right_for_place.place)]))
# Po n>3 hodinách v mo.db jsem dospěl k závěru, že to hezčeji neumím (neumím vyrobit place_parents_cte)
if filter.search_place_level.data:
qr = qr.filter(db.UserRole.place_id.in_(
sess.query(db.Place.place_id).filter(db.Place.level.in_(filter.search_place_level.data))
))
return qr
if filter.is_role_filter.data:
qr = sess.query(db.UserRole.user_id)
qr = query_filter_role(qr)
q = q.filter(db.User.user_id.in_(qr))
q = q.order_by(db.User.user_id)
(count, q) = filter.apply_limits(q, pagesize=50)
users = q.all()
marked_roles_id: Set[int] = set()
if filter.is_role_filter.data:
qmr = sess.query(db.UserRole.user_role_id).filter(db.UserRole.user_id.in_([i.user_id for i in users]))
qmr = query_filter_role(qmr)
marked_roles_id = set([i[0] for i in qmr.all()])
return render_template(
'org_orgs.html', users=users, count=count,
filter=filter,
marked_roles_id=marked_roles_id,
can_edit=rr.have_right(Right.edit_orgs),
can_add=rr.have_right(Right.add_orgs),
)
class FormAddRole(FlaskForm):
role = wtforms.SelectField('Role', choices=db.RoleType.choices(), coerce=db.RoleType.coerce, render_kw={'autofocus': True})
place = mo_fields.Place()
year = wtforms.IntegerField('Ročník', validators=[validators.Optional()])
category = mo_fields.String("Kategorie", validators=[validators.Length(max=2)], filters=[lambda x: x or None])
seq = wtforms.IntegerField("Kolo", render_kw={"placeholder": "Pořadí kola v kategorii"}, validators=[validators.Optional()])
submit = wtforms.SubmitField('Přidat roli')
class FormRemoveRole(FlaskForm):
remove_role_id = wtforms.IntegerField()
remove = wtforms.SubmitField('Odebrat roli')
class ResendInviteForm(FlaskForm):
resend_invite = SubmitField()
def do(self, user: db.User):
if user.last_login_at is None:
token = mo.users.make_activation_token(user)
db.get_session().commit()
if mo.email.send_new_account_email(user, token):
flash('Uvítací e-mail s odkazem na aktivaci účtu odeslán na {}.'.format(user.email), 'success')
else:
flash('Problém při odesílání e-mailu s odkazem na aktivaci účtu.', 'danger')
else:
flash('Tento uživatel už má účet aktivovaný.', 'danger')
class UpgradeToOrgForm(FlaskForm):
upgrade = SubmitField()
@app.route('/org/org/<int:id>/', methods=('GET', 'POST'))
def org_org(id: int):
sess = db.get_session()
user = (sess.query(db.User)
.options(subqueryload(db.User.roles).joinedload(db.UserRole.place, db.UserRole.assigned_by_user))
.get(id))
if not user or (not user.is_org and not user.is_admin):
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_generic()
if not rr.can_view_user(user):
raise werkzeug.exceptions.Forbidden()
can_assign_rights = rr.have_right(Right.assign_rights)
resend_invite_form: Optional[ResendInviteForm] = None
if user.last_login_at is None and rr.can_edit_user(user):
resend_invite_form = ResendInviteForm()
if resend_invite_form.resend_invite.data and resend_invite_form.validate_on_submit():
resend_invite_form.do(user)
return redirect(url_for('org_org', id=id))
form_add_role = FormAddRole()
form_remove_role = FormRemoveRole()
role_errors = []
if can_assign_rights:
if form_add_role.submit.data and form_add_role.validate_on_submit():
new_role = db.UserRole()
form_add_role.populate_obj(new_role)
new_role.user_id = id
assert form_add_role.place
new_role.place = form_add_role.place.place or db.get_root_place()
new_role.assigned_by = g.user.user_id
ok = True
if not new_role.is_legal():
role_errors.append('Tato kombinace role a místa není povolena')
ok = False
elif not g.gatekeeper.can_set_role(new_role):
role_errors.append(f'Roli "{new_role}" nelze přidělit, není podmnožinou žádné vaší role')
ok = False
if ok:
sess.add(new_role)
sess.flush()
mo.util.log(
type=db.LogType.user,
what=id,
details={'action': 'new-role', 'role': db.row2dict(new_role)},
)
sess.commit()
app.logger.info(f"New role for user #{id} added: {db.row2dict(new_role)}")
flash(f'Role "{new_role}" úspěšně přidána.', 'success')
return redirect(url_for('org_user', id=id))
if form_remove_role.remove_role_id.data and form_remove_role.validate_on_submit():
role = sess.query(db.UserRole).get(form_remove_role.remove_role_id.data)
if not role:
raise werkzeug.exceptions.NotFound()
if not g.gatekeeper.can_set_role(role):
role_errors.append(f'Roli "{role}" nelze odebrat, není podmnožinou žádné vaší role')
else:
sess.delete(role)
mo.util.log(
type=db.LogType.user,
what=id,
details={'action': 'delete-role', 'role': db.row2dict(role)},
)
sess.commit()
app.logger.info(f"Role for user #{id} removed: {db.row2dict(role)}")
flash(f'Role "{role}" úspěšně odebrána.', 'success')
return redirect(url_for('org_user', id=id))
return render_template(
'org_org.html', user=user,
can_edit=rr.can_edit_user(user), can_assign_rights=can_assign_rights,
can_incarnate=g.user.is_admin,
roles_by_type=mo.rights.roles_by_type, role_errors=role_errors,
form_add_role=form_add_role, form_remove_role=form_remove_role,
resend_invite_form=resend_invite_form,
)
@app.route('/org/user/<int:id>/', methods=('GET', 'POST'))
def org_user(id: int):
sess = db.get_session()
user = mo.users.user_by_uid(id)
if not user:
raise werkzeug.exceptions.NotFound()
if user.is_org or user.is_admin:
return redirect(url_for('org_org', id=id))
rr = g.gatekeeper.rights_generic()
can_edit = rr.can_edit_user(user)
can_view = can_edit or rr.can_view_user(user) # Zkratka, abychom se vyhnuli drahému dotazu
if not can_view:
raise werkzeug.exceptions.Forbidden()
resend_invite_form: Optional[ResendInviteForm] = None
if user.last_login_at is None and rr.can_edit_user(user):
resend_invite_form = ResendInviteForm()
if resend_invite_form.resend_invite.data and resend_invite_form.validate_on_submit():
resend_invite_form.do(user)
return redirect(url_for('org_user', id=id))
upgrade_form: Optional[UpgradeToOrgForm] = None
if rr.can_edit_user:
upgrade_form = UpgradeToOrgForm()
if upgrade_form.upgrade.data and upgrade_form.validate_on_submit():
try:
mo.users.change_user_to_org(user, reason='web')
sess.commit()
flash('Účet změněn na organizátorský.', 'success')
return redirect(url_for('org_org', id=user.user_id))
except mo.CheckError as e:
flash(str(e), 'danger')
return redirect(url_for('org_user', id=user.user_id))
participants = sess.query(db.Participant).filter_by(user_id=user.user_id)
participations = (
sess.query(db.Participation, db.Contest, db.Round)
.select_from(db.Participation)
.join(db.Contest, db.Contest.master_contest_id == db.Participation.contest_id)
.join(db.Round)
.filter(db.Participation.user == user)
.options(joinedload(db.Contest.place))
.order_by(db.Round.year.desc(), db.Round.category, db.Round.seq, db.Round.part)
.all()
)
return render_template(
'org_user.html', user=user, can_edit=can_edit,
can_incarnate=g.user.is_admin,
participants=participants, participations=participations,
resend_invite_form=resend_invite_form,
upgrade_form=upgrade_form,
)
class UserEditForm(FlaskForm):
first_name = mo_fields.FirstName(validators=[Required()], render_kw={'autofocus': True})
last_name = mo_fields.LastName(validators=[Required()])
email = mo_fields.Email(validators=[Required()])
note = wtforms.TextAreaField("Poznámka")
is_test = wtforms.BooleanField("Testovací účet")
allow_duplicate_name = wtforms.BooleanField("Přidat účet s duplicitním jménem")
allow_change_user_to_org = wtforms.BooleanField("Povolit převedení účastníka na organizátora")
submit = wtforms.SubmitField("Uložit")
@app.route('/org/org/<int:id>/edit', methods=("GET", "POST"), endpoint="org_org_edit")
@app.route('/org/user/<int:id>/edit', methods=("GET", "POST"))
def org_user_edit(id: int):
sess = db.get_session()
user = mo.users.user_by_uid(id)
if not user:
raise werkzeug.exceptions.NotFound()
is_org = request.endpoint == "org_org_edit"
if not is_org and (user.is_admin or user.is_org):
return redirect(url_for("org_org_edit", id=id))
if is_org and not (user.is_admin or user.is_org):
return redirect(url_for("org_user_edit", id=id))
rr = g.gatekeeper.rights_generic()
if not rr.can_edit_user(user):
raise werkzeug.exceptions.Forbidden()
form = UserEditForm(obj=user)
del form.allow_duplicate_name
del form.allow_change_user_to_org
if (user.is_org or user.is_admin) and not g.user.is_admin:
# emaily u organizátorů může editovat jen správce
del form.email
if form.validate_on_submit():
check = True
if hasattr(form, 'email') and form.email is not None:
other_user = mo.users.user_by_email(form.email.data)
if other_user is not None and other_user != user:
flash('Zadaný e-mail nelze použít, existuje jiný účet s tímto e-mailem.', 'danger')
check = False
if check:
form.populate_obj(user)
if sess.is_modified(user):
changes = db.get_object_changes(user)
app.logger.info(f"User {id} modified, changes: {changes}")
mo.util.log(
type=db.LogType.user,
what=id,
details={'action': 'edit', 'changes': changes},
)
sess.commit()
flash('Změny uživatele uloženy.', 'success')
else:
flash('Žádné změny k uložení.', 'info')
return redirect(url_for('org_user', id=id))
return render_template('org_user_edit.html', user=user, form=form, is_org=is_org)
@app.route('/org/org/new/', methods=('GET', 'POST'), endpoint="org_org_new")
@app.route('/org/user/new/', methods=('GET', 'POST'))
def org_user_new():
sess = db.get_session()
rr = g.gatekeeper.rights_generic()
is_org = request.endpoint == "org_org_new"
if is_org and not rr.have_right(Right.add_orgs):
raise werkzeug.exceptions.Forbidden()
elif not rr.have_right(Right.add_users):
raise werkzeug.exceptions.Forbidden()
form = UserEditForm()
form.submit.label.text = 'Vytvořit'
is_duplicate_name = False
allow_change_user_to_org_show_field = False
if form.validate_on_submit():
check = True
old_user = mo.users.user_by_email(form.email.data)
if old_user is not None:
if is_org and not old_user.is_org:
allow_change_user_to_org_show_field = True
if form.allow_change_user_to_org.data:
try:
mo.users.find_or_create_user(
email=form.email.data,
krestni=form.first_name.data,
prijmeni=form.last_name.data,
is_org=True,
reason="web",
allow_change_user_to_org=True)
except mo.CheckError as e:
flash(str(e), 'danger')
check = False
if check:
mo.db.get_session().commit()
return redirect(url_for('org_org', id=old_user.user_id))
if check:
flash('Účet s daným e-mailem již existuje. Převedení účastníka na organizátora můžete povolit ve formuláři.', 'danger')
check = False
if check:
flash('Účet s daným e-mailem již existuje.', 'danger')
check = False
if is_org:
if (mo.db.get_session().query(db.User)
.filter_by(first_name=form.first_name.data, last_name=form.last_name.data, is_org=True)
.first() is not None):
is_duplicate_name = True
if not form.allow_duplicate_name.data:
flash('Organizátor s daným jménem již existuje. V případě, že se nejedná o chybu, zaškrtněte políčko ve formuláři.', 'danger')
check = False
if check:
new_user = db.User()
form.populate_obj(new_user)
new_user.is_org = is_org
sess.add(new_user)
sess.flush()
app.logger.info(f"New user created: {db.row2dict(new_user)}")
mo.util.log(
type=db.LogType.user,
what=new_user.user_id,
details={'action': 'new', 'user': db.row2dict(new_user)},
)
token = mo.users.make_activation_token(new_user)
sess.commit()
flash('Nový uživatel vytvořen.', 'success')
if mo.email.send_new_account_email(new_user, token):
flash('E-mail s odkazem na aktivaci účtu odeslán na {}.'.format(new_user.email), 'success')
else:
flash('Problém při odesílání e-mailu s odkazem na aktivaci účtu.', 'danger')
if is_org:
return redirect(url_for('org_org', id=new_user.user_id))
return redirect(url_for('org_user', id=new_user.user_id))
if not is_duplicate_name and not form.allow_duplicate_name.data:
del form.allow_duplicate_name
if not (is_org and allow_change_user_to_org_show_field):
del form.allow_change_user_to_org
return render_template('org_user_new.html', form=form, is_org=is_org)
class ParticipantEditForm(FlaskForm):
school = mo_fields.School("Škola", validators=[Required()], render_kw={'autofocus': True})
grade = mo_fields.Grade("Třída", validators=[Required()])
birth_year = mo_fields.BirthYear("Rok narození", validators=[Required()])
submit = wtforms.SubmitField("Uložit")
@app.route('/org/user/<int:user_id>/participant/<int:year>/delete', methods=('POST',))
def org_user_participant_delete(user_id: int, year: int):
sess = db.get_session()
user = mo.users.user_by_uid(user_id)
if not user:
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_generic()
if not rr.can_edit_user(user):
raise werkzeug.exceptions.Forbidden()
participant = sess.query(db.Participant).filter_by(user_id=user.user_id).filter_by(year=year).one_or_none()
if participant is None:
raise werkzeug.exceptions.NotFound()
if sess.query(db.Participation).filter_by(user_id=user.user_id).filter(db.Participation.contest.has(db.Contest.round.has(year=year))).count() != 0:
flash('Registraci není možná smazat, soutěžící se v daném ročníku účastní nějakého kola.', 'danger')
else:
sess.delete(participant)
app.logger.info(f"Participant id {user_id} year {year} deleted")
mo.util.log(
type=db.LogType.participant,
what=user_id,
details={'action': 'delete-participant', 'year': year, 'participant': db.row2dict(participant)},
)
sess.commit()
flash('Registrace smazána.', 'success')
return redirect(url_for('org_user', id=user_id))
@app.route('/org/user/<int:user_id>/participant/<int:year>/edit', methods=('GET', 'POST'))
def org_user_participant_edit(user_id: int, year: int):
sess = db.get_session()
user = mo.users.user_by_uid(user_id)
if not user:
raise werkzeug.exceptions.NotFound()
rr = g.gatekeeper.rights_generic()
if not rr.can_edit_user(user):
raise werkzeug.exceptions.Forbidden()
participant = sess.query(db.Participant).filter_by(user_id=user.user_id).filter_by(year=year).one_or_none()
if participant is None:
raise werkzeug.exceptions.NotFound()
form = ParticipantEditForm(obj=participant)
if form.validate_on_submit():
form.populate_obj(participant)
if sess.is_modified(participant):
changes = db.get_object_changes(participant)
app.logger.info(f"Participant id {user_id} year {year} modified, changes: {changes}")
mo.util.log(
type=db.LogType.participant,
what=user_id,
details={'action': 'edit-participant', 'year': year, 'changes': changes},
)
sess.commit()
flash('Změny registrace uloženy.', 'success')
else:
flash('Žádné změny k uložení.', 'info')
return redirect(url_for('org_user', id=user_id))
return render_template('org_user_participant_edit.html', user=user, year=year, form=form)
class GlobalOrgsImportForm(ImportForm):
allow_change_user_to_org = wtforms.BooleanField("Povolit převádění účastníků na organizátory")
default_place = mo_fields.Place("Výchozí oblast (není-li v souboru uvedena)")
default_cat = mo_fields.String("Výchozí kategorie (není-li v souboru uvedena)")
default_code = mo_fields.String("Výchozí kód kola (není-li v souboru uveden)")
only_this_year = wtforms.BooleanField("Omezit práva na aktuální ročník", default=True)
@app.route('/org/org/import', methods=('GET', 'POST'))
def org_orgs_import():
form = GlobalOrgsImportForm()
imp = None
if form.validate_on_submit():
imp = GlobalOrgsImport(
g.user,
default_place=form.default_place.place,
default_cat=form.default_cat.data,
default_code=form.default_code.data,
year=mo.config.CURRENT_YEAR if form.only_this_year.data else None
)
return generic_import_page(form, imp, url_for('org_orgs_import'), template='org_global_orgs_import.html')
class ConfirmDeleteForm(FlaskForm):
delete = SubmitField('Potvrdit smazání')
@app.route('/org/user/<int:user_id>/delete', methods=('GET', 'POST'))
def org_user_delete(user_id: int):
if not g.user.is_admin:
raise werkzeug.exceptions.Forbidden()
sess = db.get_session()
user = sess.query(db.User).get(user_id)
if not user:
raise werkzeug.exceptions.NotFound()
warnings = []
errors = []
pants = sess.query(db.Participant).filter_by(user=user).all()
for pant in pants:
warnings.append(f'Účastní se {pant.year}. ročníku')
pions = (sess.query(db.Participation)
.filter_by(user=user)
.options(joinedload(db.Participation.contest).joinedload(db.Contest.round))
.all())
for pion in pions:
warnings.append(f'Účastní se kola {pion.contest.round.round_code()}')
num_roles = sess.query(db.UserRole).filter_by(user=user).count()
if num_roles > 0:
warnings.append(f'Má přidělené role ({num_roles})')
sols = (sess.query(db.Solution)
.filter_by(user=user)
.options(joinedload(db.Solution.task).joinedload(db.Task.round))
.all())
for sol in sols:
errors.append(f'Odevzdal úlohu {sol.task.code} v kole {sol.task.round.round_code()}')
num_papers = sess.query(db.Paper).filter_by(for_user_obj=user).count()
if num_papers:
errors.append(f'Patří mu řešení/opravy ({num_papers})')
num_uploads = sess.query(db.Paper).filter_by(uploaded_by_obj=user).count()
if num_uploads:
errors.append(f'Nahrál řešení/opravy ({num_uploads})')
logs = sess.query(db.Log).filter_by(user=user).all()
num_good_logs = 0
num_bad_logs = 0
for log in logs:
if log.details.get('reason', "") == 'user-join':
num_good_logs += 1
else:
num_bad_logs += 1
if num_good_logs > 0:
warnings.append(f'Vlastní záznamy v logu z registrace ({num_good_logs})')
if num_bad_logs > 0:
errors.append(f'Vlastní záznamy v logu ({num_bad_logs})')
form = ConfirmDeleteForm()
if form.validate_on_submit() and not errors:
sess.rollback()
conn = sess.connection()
pant_table = db.Participation.__table__
pion_table = db.Participant.__table__
role_table = db.UserRole.__table__
log_table = db.Log.__table__
conn.execute(pant_table.delete().where(pant_table.c.user_id == user.user_id))
conn.execute(pion_table.delete().where(pion_table.c.user_id == user.user_id))
conn.execute(role_table.delete().where(role_table.c.user_id == user.user_id))
conn.execute(log_table.delete().where(log_table.c.changed_by == user.user_id))
sess.commit()
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'delete', 'user': db.row2dict(user)},
)
sess.delete(user)
sess.commit()
app.logger.info(f"Uživatel #{user.user_id} smazán")
flash('Uživatel smazán.', 'danger')
return redirect(url_for('org_orgs') if user.is_org else url_for('org_users'))
return render_template('org_user_delete.html', user=user, form=form, warnings=warnings, errors=errors)