Select Git revision
-
Martin Mareš authoredMartin Mareš authored
owl.py 55.02 KiB
from flask import Flask, render_template, request, make_response, g, request_tearing_down, redirect, url_for, session, send_file
from flask.helpers import flash
import psycopg2
import psycopg2.extras
import sys
import os
from cas import CASClient
import logging
from flask_wtf import FlaskForm
import flask_wtf.file
import wtforms
import wtforms.validators as validators
import wtforms.fields.html5 as wtforms_html5
import secrets
import re
from dataclasses import dataclass
from decimal import Decimal
import datetime
import dateutil.tz
from http import HTTPStatus
import click
import email.message
import email.headerregistry
import textwrap
import subprocess
from json.encoder import JSONEncoder
from collections import defaultdict
import urllib.parse
from html import escape
from markupsafe import Markup
import csv
import io
### Flask app object ###
app = Flask(__name__)
app.config.from_pyfile('config.py')
app.jinja_options['extensions'].append('jinja2.ext.do')
app.jinja_env.lstrip_blocks = True
app.jinja_env.trim_blocks = True
app.logger.setLevel(logging.DEBUG)
def set_g_now():
if not hasattr(g, 'now'):
g.now = datetime.datetime.now(tz=dateutil.tz.UTC)
def filter_timeformat(dt):
return dt.astimezone().strftime("%Y-%m-%d %H:%M")
def filter_reltimeformat(dt):
if dt is None:
return ""
dt = dt.astimezone()
absolute = dt.strftime("%Y-%m-%d %H:%M")
set_g_now()
def unit(x, u):
if x > 1:
return f"{x} {u}s"
else:
return f"{x} {u}"
rel = abs(dt - g.now)
sec = int(rel.total_seconds())
if sec < 5:
return absolute + " (about now)"
if sec >= 2*86400:
out = unit(sec // 86400, 'day')
elif sec >= 2*3600:
out = unit(sec // 3600, 'hour')
elif sec >= 2*60:
out = unit(sec // 60, 'minute')
else:
out = unit(sec, 'second')
if dt > g.now:
out = "in " + out
else:
out = out + " ago"
return f"{absolute} ({out})"
def filter_mailto(email, text=None):
safe_email = urllib.parse.quote(email, safe='@')
return Markup(f'<a href="mailto:{escape(safe_email)}">{escape(text if text is not None else email)}</a>')
app.jinja_env.filters['timeformat'] = filter_timeformat
app.jinja_env.filters['reltimeformat'] = filter_reltimeformat
app.jinja_env.filters['mailto'] = filter_mailto
### Database connection ###
db_connection = None
db = None
def db_connect():
global db_connection, db
db_connection = psycopg2.connect(
host = app.config['DB_HOST'],
user = app.config['DB_USER'],
password = app.config['DB_PASSWD'],
dbname = app.config['DB_NAME'],
)
db = db_connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
def db_query(query, args=()):
if db is None:
db_connect()
try:
db.execute(query, args)
except psycopg2.DatabaseError:
# Reconnect if the connection died (timeout, server restart etc.)
db_connect()
db.execute(query, args)
def db_reset_signal(sender, **extra):
# At the end of every request, we have to close the implicitly opened
# transaction. Otherwise we end up with serving stale data.
if db_connection is not None:
try:
db_connection.rollback()
except:
pass
request_tearing_down.connect(db_reset_signal, app)
### CAS ###
cas_client = CASClient(
version = 'CAS_2_SAML_1_0',
server_url = 'https://cas.cuni.cz/cas/'
)
def have_session_p():
if 'uid' not in session:
return False
g.uid = session['uid']
g.name = session['name']
g.is_admin = ('admin' in session)
g.inline_att = session.get('inline', False)
return True
def login_redirect(next=None, current=False):
if current:
next = request.script_root + request.path
return redirect(url_for('login', next=next or url_for('index')))
### Main ###
@app.route('/')
def index():
if not have_session_p():
return login_redirect()
db_query("""
SELECT *
FROM owl_semesters
ORDER BY rank DESC
""")
semesters = db.fetchall()
db_query("""
SELECT *
FROM owl_courses
JOIN owl_enroll USING(cid)
WHERE uid=%s
ORDER BY name
""", (g.uid,))
course_list = db.fetchall()
courses = {s.semid: [] for s in semesters}
sem_teacher = {s.semid: False for s in semesters}
for c in course_list:
courses[c.semid].append(c)
if c.is_teacher:
sem_teacher[c.semid] = True
return render_template('main.html',
semesters=semesters,
courses=courses,
sem_teacher=sem_teacher,
)
@app.route('/doc/manual')
def manual():
return render_template('manual.html')
class EnrollTokenForm(FlaskForm):
token = wtforms.StringField("Token", validators=[validators.DataRequired()])
@app.route('/join/', methods=('GET', 'POST'))
def enroll():
if not have_session_p():
return login_redirect(current=True)
form = TokenLoginForm()
if not form.validate_on_submit():
return render_template('join.html', form=form, error=None)
token = form.token.data
db_query("SELECT c.*, s.ident AS sident FROM owl_courses c JOIN owl_semesters s USING(semid) WHERE enroll_token=%s", (token,))
course = db.fetchone()
if not course:
app.logger.info('Invalid enroll token for uid=%d', g.uid)
return render_template('join.html', form=form, error='Unrecognized token')
app.logger.info('Enrolling uid=%d to cid=%d', g.uid, course.cid)
db_query("""
INSERT INTO owl_enroll(uid, cid)
VALUES (%s, %s)
ON CONFLICT DO NOTHING
""",
(g.uid, course.cid))
db_connection.commit()
return redirect(url_for('course_index', sident=course.sident, cident=course.ident))
def course_init(sident, cident):
if not have_session_p():
return login_redirect(current=True)
db_query("""
SELECT c.*, s.ident AS sident
FROM owl_courses c
JOIN owl_semesters s USING(semid)
WHERE s.ident=%s AND c.ident=%s
""",
(sident, cident))
g.course = db.fetchone()
if not g.course:
return "No such course", HTTPStatus.NOT_FOUND
if g.is_admin:
g.is_teacher = True
return None
db_query("SELECT * FROM owl_enroll WHERE uid=%s AND cid=%s", (g.uid, g.course.cid))
enroll = db.fetchone()
if not enroll:
return "Not enrolled in this course", HTTPStatus.FORBIDDEN
g.is_teacher = enroll.is_teacher
return None
def topic_init(sident, cident, tident):
err = course_init(sident, cident)
if err:
return err
db_query("SELECT * FROM owl_topics WHERE cid=%s AND ident=%s", (g.course.cid, tident))
g.topic = db.fetchone()
if not g.topic:
return "No such topic", HTTPStatus.NOT_FOUND
if g.topic.type not in "DTA":
return "Bad topic type", HTTPStatus.NOT_FOUND
if not g.topic.public and not g.is_teacher and not g.is_admin:
return "This is a private topic", HTTPStatus.FORBIDDEN
if g.course.student_grading:
db_query("SELECT * FROM owl_student_graders WHERE tid=%s AND uid=%s", (g.topic.tid, g.uid));
g.is_grader = g.is_teacher or bool(db.fetchone())
else:
g.is_grader = g.is_teacher
return None
def must_be_teacher():
if not g.is_teacher and not g.is_admin:
return "You must be a teacher of this course", HTTPStatus.FORBIDDEN
return None
def must_be_admin():
if not g.is_admin:
return "You must be an administrator", HTTPStatus.FORBIDDEN
return None
@app.route('/c/<sident>/<cident>/')
def course_index(sident, cident):
err = course_init(sident, cident)
if err:
return err
if g.is_teacher:
target_uid = -1
else:
target_uid = g.uid
if g.course.student_grading:
sql_is_grader = "EXISTS (SELECT * FROM owl_student_graders x WHERE x.tid=t.tid AND x.uid=%s)"
sql_is_grader_params = [g.uid]
else:
sql_is_grader = "FALSE"
sql_is_grader_params = []
db_query(f"""
SELECT t.cid, t.tid, t.ident, t.title, t.type, t.deadline, t.max_points,
s.seen AS last_seen,
(SELECT MAX(x.created)
FROM owl_posts x
WHERE x.tid = t.tid
AND (x.target_uid=-1 OR x.target_uid = %s)
) AS last_posted,
(SELECT x.points
FROM owl_posts x
WHERE x.tid = t.tid
AND (x.target_uid=-1 OR x.target_uid = %s)
AND x.points IS NOT NULL
ORDER BY x.created DESC
LIMIT 1
) AS points,
{sql_is_grader} AS is_grader
FROM owl_topics t
LEFT JOIN owl_seen s ON s.tid = t.tid
AND s.observer_uid = %s
AND (s.target_uid=%s OR t.type='D' AND s.target_uid=-1)
WHERE t.cid = %s
AND t.public = true
ORDER by rank, created
""", [g.uid, g.uid] + sql_is_grader_params + [g.uid, target_uid, g.course.cid])
topics = db.fetchall()
course_total = Decimal('0.00')
course_max = Decimal('0.00')
is_grader = False
for t in topics:
if t.points is not None:
course_total += t.points
if t.max_points is not None:
course_max += t.max_points
if t.is_grader:
is_grader = True
return render_template(
'course.html',
topics=topics,
course_total=course_total,
course_max=course_max,
is_grader=is_grader)
### Posts ###
def validate_attachment(form, field):
f = field.data
if not f:
return
if f.content_length > 4*2**20:
raise wtforms.ValidationError("Attachment too large (4MB maximum)")
if is_pdf(f):
g.att_type = 'pdf'
elif is_utf8(f):
g.att_type = 'txt'
else:
raise wtforms.ValidationError("Must be a PDF or plain text file")
f.seek(0)
def is_pdf(f):
f.seek(0)
header = f.read(9)
return len(header) == 9 and header[:7] == b'%PDF-1.'
def is_utf8(f):
# FIXME: This is inefficient
f.seek(0)
x = f.read()
try:
x.decode(encoding='utf-8', errors='strict')
except:
return False
return True
class TopicPostForm(FlaskForm):
comment = wtforms.TextAreaField("Comment:", validators=[validators.Length(max=65536)])
attachment = flask_wtf.file.FileField("Attachment (PDF or UTF-8 text):", validators=[validate_attachment])
points = wtforms.DecimalField("Points:", validators=[validators.Optional()])
submit = wtforms.SubmitField("Submit")
ack_time = wtforms.HiddenField()
@app.route('/c/<sident>/<cident>/<tident>/', methods=('GET', 'POST'))
@app.route('/c/<sident>/<cident>/<tident>/<int:student_uid>/', methods=('GET', 'POST'))
def topic_index(sident, cident, tident, student_uid=None):
err = topic_init(sident, cident, tident)
if err:
return err
if student_uid is not None:
if not g.is_grader:
return "Only graders are allowed to do that", HTTPStatus.FORBIDDEN
else:
if g.topic.type == 'D':
# In discussion threads, all posts are global
student_uid = -1
elif g.is_teacher:
# Teachers view and post only global items at their page
student_uid = -1
else:
student_uid = g.uid
form = TopicPostForm()
if request.method == 'POST':
if form.validate_on_submit():
return topic_post(form, student_uid)
else:
flash('Post not submitted, see below for errors.', 'error')
db_query("""
SELECT p.pid, p.target_uid, p.author_uid, u.full_name AS author_name, p.created, p.modified, p.comment, p.attachment, p.points
FROM owl_posts p
LEFT JOIN owl_users u ON u.uid = p.author_uid
WHERE tid=%s
AND (target_uid=%s OR target_uid=-1)
ORDER by created
""", (g.topic.tid, student_uid))
posts = db.fetchall()
seen = None
db_query("""
SELECT seen
FROM owl_seen
WHERE observer_uid=%s
AND target_uid IS NOT DISTINCT FROM %s
AND tid=%s
""", (g.uid, student_uid, g.topic.tid))
seen_row = db.fetchone()
if seen_row:
seen = seen_row.seen
if posts and (seen == None or seen < posts[-1].created):
form.ack_time.data = posts[-1].created
return render_template('topic.html', posts=posts, form=form, student_uid=student_uid, seen=seen)
def strip_comment(c):
c = c.replace("\r\n", "\n")
c = c.strip()
if c != "":
return c
else:
return None
def topic_post(form, student_uid):
comment = None
if form.comment.data:
comment = strip_comment(form.comment.data)
attach = None
if form.attachment.data:
f = form.attachment.data
attach = secrets.token_hex(16) + "." + g.att_type # Type filled in by form field validator
f.save(os.path.join(app.instance_path, "files", attach))
app.logger.info("Uploaded file: attach=%s by_uid=%d", attach, g.uid)
points = None
if g.is_grader and form.points.data is not None:
points = form.points.data
if comment or attach or points is not None:
db_query("""
INSERT INTO owl_posts
(tid, target_uid, author_uid, comment, attachment, points)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING *
""",
(g.topic.tid, student_uid, g.uid, comment, attach, points))
new_post = db.fetchone()
assert new_post
app.logger.info(f"New post: pid={new_post.pid} tid={new_post.tid} author_uid={new_post.author_uid} target_uid={new_post.target_uid} points={points} attach={attach}")
db_query("INSERT INTO owl_notify(pid) VALUES(%s) ON CONFLICT DO NOTHING", (new_post.pid,))
else:
new_post = None
if new_post:
ack_time = new_post.created
elif form.ack_time.data:
ack_time = form.ack_time.data
else:
ack_time = None
if ack_time:
db_query("""
INSERT INTO owl_seen
(observer_uid, target_uid, tid)
VALUES (%s, %s, %s)
ON CONFLICT (observer_uid, target_uid, tid) DO UPDATE SET seen = %s
""", (g.uid, student_uid, g.topic.tid, ack_time))
db_connection.commit()
if new_post:
wake_up_mule()
return redirect(url_for('topic_index',
sident=g.course.sident,
cident=g.course.ident,
tident=g.topic.ident,
student_uid=(student_uid if student_uid != g.uid and student_uid >= 0 else None)
) + "#p" + str(new_post.pid))
elif g.is_teacher:
return redirect(url_for('teacher', sident=g.course.sident, cident=g.course.ident))
elif g.is_grader:
return redirect(url_for('topic_student_grade', sident=g.course.sident, cident=g.course.ident, tident=g.topic.ident))
else:
return redirect(url_for('course_index', sident=g.course.sident, cident=g.course.ident))
class EditPostForm(FlaskForm):
comment = wtforms.TextAreaField("Comment:", validators=[validators.Length(max=65536)])
points = wtforms.DecimalField("Points:", validators=[validators.Optional()])
submit = wtforms.SubmitField("Submit")
silent_submit = wtforms.SubmitField("Silent submit")
delete = wtforms.SubmitField("☠ Delete post ☠")
def edit_allowed_p(post):
# We must check that the post belongs to the selected course (for which g.is_teacher is valid)
if g.topic.cid != g.course.cid:
return False
return (g.is_admin or
g.is_teacher or
post.author_uid == g.uid and g.topic.public)
@app.route('/c/<sident>/<cident>/<tident>/grade')
def topic_student_grade(sident=None, cident=None, tident=None):
err = topic_init(sident, cident, tident)
if err:
return err
if not g.is_grader:
return "Only graders can grade", HTTPStatus.FORBIDDEN
db_query("""
SELECT s.uid, s.full_name
FROM owl_enroll e
JOIN owl_users s ON s.uid = e.uid
WHERE e.cid = %s
AND e.is_teacher = false
ORDER BY s.full_name
""", (g.course.cid,))
students = {}
for s in db.fetchall():
students[s.uid] = s
solutions = {}
for uid, s in students.items():
solutions[uid] = TSolution(
points=None,
last_activity=None,
last_seen_by_me=None,
last_seen_by_teacher=None,
)
db_query("""
SELECT p.target_uid AS target_uid, p.created AS post_created, p.points
FROM owl_posts p
JOIN owl_users s ON s.uid = p.target_uid
WHERE p.tid = %s
AND p.target_uid >= 0
ORDER BY p.target_uid, p.created, p.pid
""", (g.topic.tid,))
for p in db.fetchall():
s = solutions[p.target_uid]
if p.points is not None:
s.points = p.points
s.last_activity = p.post_created
db_query("""
SELECT s.target_uid, s.seen
FROM owl_seen s
WHERE s.tid = %s
AND s.observer_uid = %s
""", (g.topic.tid, g.uid))
for t in db.fetchall():
if t.target_uid in solutions:
s = solutions[t.target_uid]
s.last_seen_by_me = t.seen
return render_template('topic-grade.html', students=students, solutions=solutions)
@app.route('/post/<sident>/<cident>/<int:pid>/', methods=('GET', 'POST'))
def edit_post(sident, cident, pid):
err = course_init(sident, cident)
if err:
return err
db_query("SELECT * FROM owl_posts WHERE pid=%s FOR UPDATE", (pid,))
post = db.fetchone()
if not post:
return "Not allowed to edit this post", HTTPStatus.FORBIDDEN
db_query("SELECT * FROM owl_topics WHERE tid=%s", (post.tid,))
g.topic = db.fetchone()
assert g.topic
if g.course.student_grading:
db_query("SELECT * FROM owl_student_graders WHERE tid=%s AND uid=%s", (g.topic.tid, g.uid));
g.is_grader = g.is_teacher or bool(db.fetchone())
else:
g.is_grader = g.is_teacher
if not edit_allowed_p(post):
return "Not allowed to edit this post", HTTPStatus.FORBIDDEN
if request.method == 'POST':
form = EditPostForm()
if not form.validate_on_submit():
flash('Post not submitted, see below for errors.', 'error')
return render_template('edit-post.html', form=form)
if g.is_grader and post.target_uid >= 0:
student_uid = post.target_uid
else:
student_uid = None
return_to = url_for('topic_index', sident=g.course.sident, cident=g.course.ident, tident=g.topic.ident, student_uid=student_uid)
if form.delete.data:
if g.is_teacher:
app.logger.info(f"Deleted post: pid={pid} tid={post.tid} author_uid={post.author_uid} target_uid={post.target_uid} by_uid={g.uid} public={g.topic.public}")
db_query("DELETE FROM owl_posts WHERE pid=%s", (pid,))
db_connection.commit()
flash('Post deleted.', 'info')
return redirect(return_to)
cmt = "*Post deleted by its author.*"
att = None
points = None
else:
cmt = strip_comment(form.comment.data)
att = post.attachment
points = form.points.data
changed = False
if cmt != post.comment or att != post.attachment:
db_query("UPDATE owl_posts SET comment=%s, attachment=%s WHERE pid=%s", (cmt, att, pid))
changed = True
if g.is_grader and points != post.points:
db_query("UPDATE owl_posts SET points=%s WHERE pid=%s", (points, pid))
changed = True
if changed:
app.logger.info(f"Edited post: pid={pid} tid={post.tid} author_uid={post.author_uid} target_uid={post.target_uid} by_uid={g.uid} public={g.topic.public}")
if g.topic.public:
db_query("UPDATE owl_posts SET modified=CURRENT_TIMESTAMP WHERE pid=%s", (pid,))
else:
db_query("UPDATE owl_posts SET created=CURRENT_TIMESTAMP, modified=NULL WHERE pid=%s", (pid,))
if not form.silent_submit.data:
db_query("INSERT INTO owl_notify(pid) VALUES(%s) ON CONFLICT DO NOTHING", (pid,))
flash('Post changed.', 'info')
db_connection.commit()
if changed:
wake_up_mule()
return redirect(return_to)
form = EditPostForm(obj=post)
return render_template('edit-post.html', form=form)
### Teacher's overview ###
@dataclass
class TSolution:
points: Decimal
last_activity: datetime.datetime
last_seen_by_me: datetime.datetime
last_seen_by_teacher: datetime.datetime
@app.route('/teacher/c/<sident>/<cident>/')
@app.route('/teacher/s/<sident>/')
def teacher(sident, cident=None):
if not have_session_p():
return login_redirect(current=True)
db_query("CREATE TEMPORARY TABLE tmp_cids (cid int) ON COMMIT DROP");
if cident is not None:
err = course_init(sident, cident)
if err:
return err
db_query("INSERT INTO tmp_cids VALUES(%s)", (g.course.cid,))
else:
db_query("SELECT * FROM owl_semesters WHERE ident=%s", (sident,))
semester = db.fetchone()
if not semester:
return "No such semester", HTTPStatus.NOT_FOUND
db_query("""
INSERT INTO tmp_cids
SELECT cid
FROM owl_enroll
JOIN owl_courses c USING(cid)
WHERE uid=%s
AND is_teacher=true
AND c.semid=%s
""", (g.uid, semester.semid))
db_query("""
SELECT c.*, s.ident AS sident
FROM owl_courses c
JOIN owl_semesters s USING(semid)
WHERE cid IN (SELECT cid FROM tmp_cids)
ORDER BY c.ident
""")
courses = {}
course_totals = {}
topics = {}
topics_followed_by_header = {}
last_tid = {}
for c in db.fetchall():
courses[c.cid] = c
course_totals[c.cid] = 0
topics[c.cid] = {}
topics_followed_by_header[c.cid] = {}
last_tid[c.cid] = None
db_query("""
SELECT *
FROM owl_topics
WHERE cid IN (SELECT cid FROM tmp_cids)
AND type IN ('H', 'T', 'A')
ORDER BY cid, rank
""")
for t in db.fetchall():
if t.type in "TA":
topics[t.cid][t.tid] = t
if t.max_points is not None:
course_totals[t.cid] += t.max_points
last_tid[t.cid] = t.tid
elif t.type == 'H':
if last_tid[t.cid] is not None:
topics_followed_by_header[t.cid][last_tid[t.cid]] = True
for cid, tid in last_tid.items():
topics_followed_by_header[cid][tid] = True
db_query("""
SELECT c.cid, s.uid, s.full_name, s.email
FROM owl_courses c
JOIN owl_enroll e ON e.cid = c.cid
JOIN owl_users s ON s.uid = e.uid
WHERE c.cid IN (SELECT cid FROM tmp_cids)
AND e.is_teacher = false
ORDER BY c.ident, s.full_name
""")
students = { cid: {} for cid in courses.keys() }
for s in db.fetchall():
students[s.cid][s.uid] = s
solutions = {}
for c in courses.values():
cid = c.cid
solutions[cid] = {}
for s in students[cid].values():
uid = s.uid
solutions[cid][uid] = {}
for t in topics[cid].values():
solutions[cid][uid][t.tid] = TSolution(
points=None,
last_activity=None,
last_seen_by_me=None,
last_seen_by_teacher=None,
)
db_query("""
SELECT c.cid, t.tid, p.author_uid, p.target_uid AS target_uid, p.created AS post_created, p.points
FROM owl_courses c
JOIN owl_topics t ON t.cid = c.cid
JOIN owl_posts p ON p.tid = t.tid
JOIN owl_users s ON s.uid = p.target_uid
WHERE c.cid IN (SELECT cid FROM tmp_cids)
AND p.target_uid >= 0
ORDER BY c.ident, t.ident, p.target_uid, p.created, p.pid
""")
for p in db.fetchall():
s = solutions[p.cid][p.target_uid][p.tid]
if p.points is not None:
s.points = p.points
s.last_activity = p.post_created
totals = {}
for c in solutions.keys():
totals[c] = {}
for u in solutions[c].keys():
totals[c][u] = 0
for s in solutions[c][u].values():
if s.points is not None:
totals[c][u] += s.points
db_query("""
SELECT c.cid, t.tid, s.target_uid, s.seen
FROM owl_courses c
JOIN owl_topics t ON t.cid = c.cid
JOIN owl_seen s ON s.tid = t.tid
WHERE c.cid IN (SELECT cid FROM tmp_cids)
AND s.observer_uid = %s
""", (g.uid,))
for t in db.fetchall():
try:
s = solutions[t.cid][t.target_uid][t.tid]
s.last_seen_by_me = t.seen
except KeyError:
pass
db_query("""
SELECT c.cid, t.tid, s.target_uid, s.seen
FROM owl_courses c
JOIN owl_topics t ON t.cid = c.cid
JOIN owl_seen s ON s.tid = t.tid
WHERE c.cid IN (SELECT cid FROM tmp_cids)
AND s.observer_uid IN (SELECT uid FROM owl_enroll WHERE cid=c.cid AND is_teacher=true)
AND s.observer_uid <> %s
""", (g.uid,))
for t in db.fetchall():
try:
s = solutions[t.cid][t.target_uid][t.tid]
s.last_seen_by_teacher = t.seen
except KeyError:
pass
# app.logger.debug(courses)
# app.logger.debug(topics)
# app.logger.debug(students)
# app.logger.debug(solutions)
# app.logger.debug(totals)
set_g_now()
return render_template(
'teacher.html',
courses=courses,
topics=topics,
topics_followed_by_header=topics_followed_by_header,
students=students,
solutions=solutions,
course_totals=course_totals,
totals=totals)
### Serving files ###
ext_to_mime_type = {
'pdf': 'application/pdf',
'txt': 'text/plain; charset=utf-8',
}
@app.route('/files/<name>')
def serve_file(name):
m = re.fullmatch(r'([0-9a-zA-Z]+)\.([0-9a-zA-Z]+)', name)
if m:
return send_file(os.path.join(app.instance_path, "files", name),
mimetype = ext_to_mime_type[m.group(2)],
attachment_filename = name)
else:
return "No such file", HTTPStatus.NOT_FOUND
### Exports ###
@app.route('/teacher/c/<sident>/<cident>/results.json')
def results_json(sident, cident):
err = course_init(sident, cident) or must_be_teacher()
if err:
return err
return export_points('json')
@app.route('/teacher/c/<sident>/<cident>/results.csv')
def results_csv(sident, cident):
err = course_init(sident, cident) or must_be_teacher()
if err:
return err
return export_points('csv')
def export_points(format):
db_query("""
SELECT u.uid, u.ukco, u.full_name
FROM owl_users u
JOIN owl_enroll e USING(uid)
WHERE e.cid = %s
AND e.is_teacher = false
ORDER BY uid
""", (g.course.cid,))
students = db.fetchall()
db_query("""
SELECT *
FROM owl_topics
WHERE cid=%s
AND type IN ('A', 'T')
ORDER BY rank, tid
""", (g.course.cid,))
topics = db.fetchall()
db_query("""
SELECT t.ident AS tident, p.target_uid, p.points
FROM owl_topics t
JOIN owl_posts p USING(tid)
WHERE t.cid = %s
AND p.points IS NOT NULL
ORDER BY p.created
""", (g.course.cid,))
awards = db.fetchall()
points = { s.uid: {} for s in students }
for a in awards:
if a.target_uid == -1:
for uid in points.keys():
points[uid][a.tident] = float(a.points)
elif a.target_uid in points:
points[a.target_uid][a.tident] = float(a.points)
if format == "json":
out = []
for s in students:
out.append({
"uid": s.uid,
"ukco": s.ukco,
"name": s.full_name,
"tasks": points[s.uid],
"points": sum(points[s.uid].values()),
})
enc = JSONEncoder(ensure_ascii=False, sort_keys=True, indent=4)
resp = make_response(enc.encode(out))
resp.mimetype = 'application/json'
return resp
elif format == "csv":
out = io.StringIO()
writer = csv.writer(out, delimiter=',', quoting=csv.QUOTE_MINIMAL)
writer.writerow(['uid', 'ukco', 'name', 'points'] + [t.ident for t in topics])
for s in students:
writer.writerow([
s.uid,
s.ukco,
s.full_name,
sum(points[s.uid].values()),
] + [points[s.uid].get(t.ident, "") for t in topics])
resp = make_response(out.getvalue())
resp.mimetype = 'text/csv'
resp.mimetype_params['charset'] = 'utf-8'
return resp
else:
raise NotImplementedError()
### User settings ###
class SettingsForm(FlaskForm):
notify = wtforms.fields.BooleanField()
notify_self = wtforms.fields.BooleanField()
inline_att = wtforms.fields.BooleanField()
submit = wtforms.SubmitField("Submit")
@app.route('/settings/', methods=('GET', 'POST'))
def user_settings():
if not have_session_p():
return login_redirect(current=True)
if request.method == 'POST':
form = SettingsForm()
if not form.validate_on_submit():
return render_template('settings.html', form=form)
app.logger.info(f"Setting profile for uid={g.uid}")
db_query("UPDATE owl_users SET notify=%s, notify_self=%s, inline_att=%s WHERE uid=%s", (form.notify.data, form.notify_self.data, form.inline_att.data, g.uid))
db_connection.commit()
# Remember to update data cached in the session
session['inline'] = form.inline_att.data
flash('Settings saved.', 'info')
return redirect(url_for('index'))
else:
form = SettingsForm()
db_query('SELECT * FROM owl_users where uid=%s', (g.uid,))
user = db.fetchone()
form.notify.data = user.notify
form.notify_self.data = user.notify_self
form.inline_att.data = user.inline_att
return render_template('settings.html', form=form, user=user)
### Administration ###
@app.route('/admin/topics/<sident>/<cident>/')
def admin_topics(sident, cident):
err = course_init(sident, cident) or must_be_teacher()
if err:
return err
db_query("""
SELECT *
FROM owl_topics
WHERE cid = %s
ORDER by rank, created
""", (g.course.cid,))
topics = db.fetchall()
return render_template('admin-topics.html', topics=topics)
class AssignGraderForm(FlaskForm):
uid = wtforms.HiddenField()
assign = wtforms.SubmitField()
unassign = wtforms.SubmitField()
def validate(self):
if self.assign.data and self.unassign.data:
return False
return True
@app.route('/admin/topics/<sident>/<cident>/graders/<tid>', methods=('GET', 'POST'))
def admin_topic_graders(sident, cident, tid):
err = course_init(sident, cident) or must_be_teacher()
if err:
return err
if not g.course.student_grading:
return "Course does not allow student grading", HTTPStatus.FORBIDDEN
if tid is None:
return "Topic needed", HTTPStatus.NOT_FOUND
db_query('SELECT * FROM owl_topics WHERE cid=%s AND tid=%s', (g.course.cid, tid))
topic = db.fetchone()
if not topic:
return "No such topic", HTTPStatus.NOT_FOUND
if request.method == 'POST':
form = AssignGraderForm()
db_query('SELECT * FROM owl_enroll WHERE uid=%s AND cid=%s AND is_teacher=FALSE', (form.uid.data, g.course.cid))
if not db.fetchone():
return "Given uid is not a student of this course", HTTPStatus.BAD_REQUEST
db_query('SELECT * FROM owl_student_graders WHERE tid=%s AND uid=%s', (topic.tid, form.uid.data))
is_grader = bool(db.fetchone())
if form.assign.data:
if is_grader:
return "This student is already assigned", HTTPStatus.BAD_REQUEST
db_query('INSERT INTO owl_student_graders (tid, uid) VALUES (%s, %s)', (topic.tid, form.uid.data))
else:
if not is_grader:
return "This student is already not assigned", HTTPStatus.BAD_REQUEST
db_query('DELETE FROM owl_student_graders WHERE tid=%s AND uid=%s', (topic.tid, form.uid.data))
db_connection.commit()
return redirect(url_for('admin_topic_graders', sident=sident, cident=cident, tid=tid))
else:
db_query("""
SELECT s.uid, s.full_name,
EXISTS (SELECT * FROM owl_student_graders x WHERE x.tid=%s AND x.uid=s.uid) AS is_grader
FROM owl_enroll e
JOIN owl_users s ON s.uid = e.uid
WHERE e.cid = %s
AND e.is_teacher = false
ORDER BY s.full_name
""", (topic.tid, g.course.cid))
students = [(s, AssignGraderForm(uid=s.uid)) for s in db.fetchall()]
s_unassigned = [s for s in students if not s[0].is_grader]
s_assigned = [s for s in students if s[0].is_grader]
return render_template('admin-topic-graders.html', s_unassigned=s_unassigned, s_assigned=s_assigned, topic=topic)
class EditTopicForm(FlaskForm):
rank = wtforms_html5.IntegerField("Rank:", validators=[validators.required()])
ident = wtforms.StringField("Name:")
title = wtforms.StringField("Title:", validators=[validators.required()])
type = wtforms.SelectField("Type:", choices=[('H', 'heading'), ('T', 'task'), ('A', 'award'), ('D', 'discussion')], validators=[validators.required()])
deadline = wtforms_html5.DateTimeLocalField("Deadline:", format='%Y-%m-%dT%H:%M', validators=[validators.Optional()])
max_points = wtforms.DecimalField("Points:", validators=[validators.Optional()])
public = wtforms.BooleanField("Public:")
notify = wtforms.BooleanField("Notify now:")
submit = wtforms.SubmitField("Save")
delete = wtforms.SubmitField("☠ Delete topic ☠")
def validate(self):
ret = FlaskForm.validate(self)
if not ret:
return False
if self.type.data != 'H' and self.ident.data == "":
self.ident.errors.append('This type requires a name')
return False
if self.type.data == 'H' and self.ident.data != "":
self.ident.errors.append('Headings do not have a name')
return False
return True
@app.route('/admin/topics/<sident>/<cident>/edit/<int:tid>', methods=('GET', 'POST'))
@app.route('/admin/topics/<sident>/<cident>/new', methods=('GET', 'POST'))
@app.route('/admin/topics/<sident>/<cident>/copy/<int:copy_tid>', methods=('GET', 'POST'))
def admin_edit_topic(sident, cident, tid=None, copy_tid=None):
err = course_init(sident, cident) or must_be_teacher()
if err:
return err
# XXX: We have not checked that the tid belongs to the specified course
# where the current user has edit rights. So we must check it in DB operations.
if request.method == 'POST':
form = EditTopicForm()
if not form.validate_on_submit():
return render_template('admin-edit-topic.html', form=form, tid=None, copy_tid=None, post_count=None)
if form.delete.data and tid is not None:
app.logger.info(f"Deleted topic: tid={tid} by_uid={g.uid}")
db_query("DELETE FROM owl_topics WHERE tid=%s", (tid,))
db_connection.commit()
flash('Topic deleted.', 'info')
return redirect(url_for('admin_topics', sident=sident, cident=cident))
if tid is None:
db_query("INSERT INTO owl_topics(cid, type) VALUES(%s, 'H') RETURNING tid", (g.course.cid,))
row = db.fetchone()
assert row
tid = row.tid
app.logger.info(f"Created topic: tid={tid} by_uid={g.uid}")
deadline = form.deadline.data
if deadline is not None:
deadline = deadline.astimezone(tz=dateutil.tz.UTC)
app.logger.info(f"Edited topic: tid={tid} by_uid={g.uid}")
db_query("""
UPDATE owl_topics
SET ident=%s,
rank=%s,
public=%s,
title=%s,
deadline=%s,
type=%s,
max_points=%s
WHERE cid=%s AND tid=%s
""", (
form.ident.data if form.ident.data != "" else None,
form.rank.data,
form.public.data,
form.title.data,
deadline,
form.type.data,
form.max_points.data,
g.course.cid,
tid,
)
)
db_connection.commit()
if form.notify.data:
db_query("""
INSERT INTO owl_notify
(SELECT pid FROM owl_posts WHERE tid=%s AND target_uid=-1)
ON CONFLICT DO NOTHING
""", (tid,))
db_connection.commit()
wake_up_mule()
flash('Topic saved.', 'info')
return redirect(url_for('admin_topics', sident=sident, cident=cident))
else:
post_count = 0
if tid is None and copy_tid is None:
form = EditTopicForm()
form.rank.data = 999999;
form.type.data = 'T'
else:
db_query('SELECT * FROM owl_topics WHERE cid=%s AND tid=%s', (g.course.cid, tid or copy_tid))
topic = db.fetchone()
if not topic:
return "No such topic", HTTPStatus.NOT_FOUND
if tid is not None:
db_query('SELECT COUNT(*) AS cnt FROM owl_posts WHERE tid=%s', (tid,))
count_row = db.fetchone()
assert count_row
post_count = count_row.cnt
form = EditTopicForm(obj=topic)
if topic.deadline:
form.deadline.data = topic.deadline.replace(tzinfo=dateutil.tz.UTC).astimezone()
return render_template('admin-edit-topic.html', form=form, tid=tid, copy_tid=copy_tid, post_count=post_count)
@app.route('/admin/course/<sident>/<cident>/')
def admin_course(sident, cident):
err = course_init(sident, cident) or must_be_teacher()
if err:
return err
db_query("""
SELECT u.full_name
FROM owl_enroll e
JOIN owl_users u USING(uid)
WHERE e.cid=%s
AND is_teacher = true
""", (g.course.cid,))
teachers = db.fetchall()
return render_template('admin-course.html', teachers=[t.full_name for t in teachers])
@app.route('/admin/courses')
def admin_courses():
if not have_session_p():
return login_redirect(current=True)
err = must_be_admin()
if err:
return err
db_query("SELECT c.*, s.ident AS sident FROM owl_courses c JOIN owl_semesters s USING(semid) ORDER BY s.rank, c.ident");
courses = db.fetchall()
db_query("""
SELECT e.cid, u.full_name
FROM owl_enroll e
JOIN owl_users u USING(uid)
WHERE is_teacher = true
""")
teachers = defaultdict(list)
for e in db.fetchall():
teachers[e.cid].append(e.full_name)
return render_template('admin-courses.html',
courses=courses,
teachers=teachers,
)
### Authentication and sessions ###
class TokenLoginForm(FlaskForm):
next = wtforms.HiddenField()
key = wtforms.PasswordField("Token", validators=[validators.DataRequired()])
@app.route('/login/', methods=('GET', 'POST'))
def login():
form = TokenLoginForm(next=request.args.get('next') or url_for('index'))
if not form.validate_on_submit():
return render_template('login.html', form=form, error=None)
db_query("SELECT * FROM owl_users WHERE auth_token=%s", (form.key.data,))
row = db.fetchone()
if row:
app.logger.info('Logged in user: uid=%s, cn=%s, admin=%s by key', row.uid, row.full_name, row.is_admin)
session_from_db(row)
return redirect(form.next.data or url_for('index'))
app.logger.info('Invalid login key')
return render_template('login.html', form=form, error='Invalid access key')
@app.route('/login/cas', methods=('GET', 'POST'))
def cas_login():
next = request.args.get('next') or url_for('index')
if have_session_p():
return redirect(next)
ticket = request.args.get('ticket')
if not ticket:
# No ticket, the request come from end user, send to CAS login
cas_client.service_url = url_for('cas_login', next=next, _external=True)
cas_login_url = cas_client.get_login_url()
app.logger.debug('CAS login URL: %s', cas_login_url)
return redirect(cas_login_url)
# There is a ticket, the request come from CAS as callback.
# need call `verify_ticket()` to validate ticket and get user profile.
app.logger.debug('CAS ticket: %s', ticket)
user, attributes, pgtiou = cas_client.verify_ticket(ticket)
app.logger.debug(
'CAS verify ticket response: user: %s, attributes: %s, pgtiou: %s', user, attributes, pgtiou)
if not user:
return "Failed to verify ticket!", HTTPStatus.FORBIDDEN
else:
session_from_cas(user, attributes)
return redirect(next)
def parse_cas_emails(raw_email):
if raw_email[0] == '{' and raw_email[-1] == '}':
# Strange CAS encoding of multiple e-mail addresses
return raw_email[1:-1].split(',')
else:
return [raw_email]
def primary_cas_email(raw_email):
emails = parse_cas_emails(raw_email)
if not emails:
return None
uk = [e for e in emails if e.endswith('.cuni.cz')]
if uk:
return uk[0]
else:
return emails[0]
def session_from_cas(user, attrs):
db_query("SELECT * FROM owl_users WHERE ukco=%s", (user,))
row = db.fetchone()
if row:
app.logger.info('Logged in user: uid=%s, ukco=%s, cn=%s, mail=%s, admin=%s', row.uid, row.ukco, row.full_name, row.email, row.is_admin)
else:
email = primary_cas_email(attrs['mail'])
db_query("""
INSERT INTO owl_users(ukco, full_name, email)
VALUES (%s, %s, %s)
RETURNING uid, ukco, full_name, email, is_admin, inline_att
""", (user, attrs['cn'], email))
row = db.fetchone()
app.logger.info('Created new user: uid=%s, ukco=%s, cn=%s, mail=%s', row.uid, row.ukco, row.full_name, row.email)
db_connection.commit()
session_from_db(row)
session['cas'] = 1
def session_from_db(row):
session['uid'] = row.uid
session['name'] = row.full_name
if row.is_admin:
session['admin'] = 1
else:
session.pop('admin', None)
session['inline'] = row.inline_att
@app.route('/logout', methods=('GET', 'POST'))
def logout():
if not have_session_p():
return redirect(url_for('index'))
app.logger.info('Logged out uid=%d', g.uid)
session.pop('uid', None)
session.pop('name', None)
session.pop('admin', None)
redirect_url = url_for('logout', _external=True)
if 'cas' in session:
session.pop('cas', None)
cas_logout_url = cas_client.get_logout_url(redirect_url)
app.logger.debug('CAS logout URL: %s', cas_logout_url)
return redirect(cas_logout_url)
else:
return redirect(redirect_url)
@app.route('/mutate')
def mutate():
if not have_session_p() or not g.is_admin:
return "Must be admin", HTTPStatus.FORBIDDEN
uid = request.args.get('uid')
if uid is not None and uid.isnumeric:
db_query("SELECT * FROM owl_users WHERE uid=%s", (uid,))
row = db.fetchone()
if row:
session_from_db(row)
return redirect(url_for('index'))
else:
return "No such UID", HTTPStatus.NOT_FOUND
else:
return "Bad UID", HTTPStatus.NOT_FOUND
### API ###
def api_auth():
auth = request.headers.get('Authorization')
if not auth:
app.logger.info('API: Missing Authorization')
return False
afields = auth.split()
if len(afields) != 2 or afields[0].lower() != "bearer":
app.logger.info('API: Authorization type must be Bearer')
return False
token = re.fullmatch('(\d{1,6})-(\d{1,6})-(.*)', afields[1])
if not token:
app.logger.info('API: Malformed authorization token')
return False
uid = int(token[1])
cid = int(token[2])
tok = token[3]
db_query("SELECT * FROM owl_api_keys WHERE uid=%s AND cid=%s AND key=%s", (uid, cid, tok))
key = db.fetchone()
if not key:
app.logger.info('API: Unknown key')
return False
app.logger.info('API: Access using key %s (uid=%s, cid=%s)', key.key_id, uid, cid)
# Simulate session setup from have_session_p()
g.uid = uid
g.name = 'API User'
g.is_admin = False
# Simulate course setup from course_init()
db_query("SELECT * FROM owl_courses WHERE cid=%s", (cid,))
g.course = db.fetchone() # Guaranteed to exist
g.is_teacher = True # At this point, we support only teacher tokens
return True
@app.route('/api/points')
def api_points():
if not api_auth():
return "Unauthorized\n", HTTPStatus.UNAUTHORIZED
return export_points('json')
### Administrative commands ###
def cli_die(msg):
print(msg, file=sys.stderr)
sys.exit(1)
def cli_find_user(name):
# XXX: This doesn't support CAS multiple-email syntax
db_query("SELECT * FROM owl_users WHERE email=%s", (name,))
users = db.fetchall()
if len(users) == 1:
return users[0]
if len(users) > 1:
cli_die(f'Multiple users with e-mail {name} found')
db_query("SELECT * FROM owl_users WHERE full_name=%s", (name,))
users = db.fetchall()
if not users:
cli_die(f'No user with e-mail or full name {name} found')
if len(users) != 1:
cli_die(f'Multiple users with full name {name} found')
return users[0]
@app.cli.command("create-course")
@click.argument("ident")
@click.argument("name")
@click.argument("teacher")
@click.option("--token", help="Enrolment token (default: auto-generate)")
@click.option("--semester", help="Semester to create the course in (default: newest)")
def cli_create_course(ident, name, teacher, token=None, semester=None):
"""Create a new course. TEACHER is the teacher's full name or email."""
teacher_user = cli_find_user(teacher)
if not token:
token = secrets.token_hex(6)
if semester is None:
db_query("SELECT * FROM owl_semesters ORDER BY rank DESC LIMIT 1")
semester_row = db.fetchone()
if not semester_row:
cli_die("No semester found")
else:
db_query("SELECT * FROM owl_semesters WHERE ident=%s", (semester,))
semester_row = db.fetchone()
if not semester_row:
cli_die("No such semester")
db_query("""
INSERT INTO owl_courses(ident, name, enroll_token, semid)
VALUES (%s, %s, %s, %s)
RETURNING *
""",
(ident, name, token, semester_row.semid))
course = db.fetchone()
assert course is not None
db_query("""
INSERT INTO owl_enroll(uid, cid, is_teacher)
VALUES (%s, %s, true)
""",
(teacher_user.uid, course.cid))
db_connection.commit()
print(f'Created new course with enrollment token {token}')
@app.cli.command("add-teacher")
@click.argument("course_ident")
@click.argument("teacher")
def cli_add_teacher(course_ident, teacher):
"""Add a teacher to an existing course. TEACHER is the teacher's full name or e-mail."""
teacher_user = cli_find_user(teacher)
db_query("""
SELECT *
FROM owl_courses
WHERE ident=%s
""", (course_ident,))
course = db.fetchone()
if not course:
cli_die(f'No course with identifier {course_ident} found')
db_query("""
INSERT INTO owl_enroll(uid, cid, is_teacher)
VALUES (%s, %s, true)
""",
(teacher_user.uid, course.cid))
db_connection.commit()
@app.cli.command("convert-emails")
def cli_convert_emails():
"""Convert up e-mail addresses in the database from old SIS format."""
db_query("""
SELECT *
FROM owl_users
WHERE email LIKE '%%{%%'
""")
users = db.fetchall()
for u in users:
primary = primary_cas_email(u.email)
print(u.uid, u.full_name, u.email, '->', primary)
db_query("UPDATE owl_users SET email=%s WHERE uid=%s", (primary, u.uid))
db_connection.commit()
### Sending notifications ###
def send_notify():
while True:
db_query("""
SELECT *
FROM owl_notify
JOIN owl_posts USING(pid)
ORDER BY created
LIMIT 1
FOR UPDATE
""")
posts = db.fetchall()
if not posts:
db_connection.commit()
break
for post in posts:
send_notify_post(post)
db_query("DELETE FROM owl_notify WHERE pid=%s", (post.pid,))
db_connection.commit()
def send_notify_post(post):
db_query("SELECT * FROM owl_topics WHERE tid=%s", (post.tid,))
topic = db.fetchone()
if not topic.public:
return
db_query("SELECT c.*, s.ident AS sident FROM owl_courses c JOIN owl_semesters s USING(semid) WHERE cid=%s", (topic.cid,))
course = db.fetchone()
db_query("SELECT * FROM owl_users WHERE uid=%s", (post.author_uid,))
author = db.fetchone()
if post.target_uid >= 0:
db_query("SELECT * FROM owl_users WHERE uid=%s", (post.target_uid,))
target = db.fetchone()
else:
target = None
app.logger.info(f"Notify: pid={post.pid} cid={course.cid} tid={topic.tid} author_uid={author.uid} target_uid={post.target_uid}")
db_query("""
SELECT u.uid, u.email, u.full_name, e.is_teacher, u.notify_self
FROM owl_enroll e
JOIN owl_users u ON u.uid = e.uid
WHERE e.cid = %s
AND (%s < 0 OR e.uid = %s OR e.is_teacher = true OR e.uid IN (SELECT uid FROM owl_student_graders WHERE tid=%s))
AND u.notify = true
AND u.email IS NOT NULL
""", (topic.cid, post.target_uid, post.target_uid, topic.tid))
dests = db.fetchall()
for dest in dests:
if dest.uid != author.uid or dest.notify_self:
send_notify_to_dest(post, topic, course, author, target, dest)
else:
app.logger.info(f"E-mail: not sending self-notify to uid={dest.uid}")
def send_notify_to_dest(post, topic, course, author, target, dest):
if 'MAIL_FROM' not in app.config:
app.logger.info('Not sending: MAIL_FROM not defined')
return
if not dest.email:
app.logger.info(f"E-mail: uid={dest.uid} has no address")
return
app.logger.info(f"E-mail: uid={dest.uid} email={dest.email}")
msg = email.message.EmailMessage()
msg['From'] = email.headerregistry.Address(display_name=author.full_name + " via Owl", addr_spec=app.config['MAIL_FROM'])
msg['To'] = email.headerregistry.Address(display_name=dest.full_name, addr_spec=dest.email)
msg['Subject'] = f'Owl: {topic.title}'
msg['Date'] = (post.modified or post.created).astimezone()
if 'MAIL_REFERENCES' in app.config:
# Everybody gets his own view of the thread, so the target is the observer's uid
msg['References'] = '<' + app.config['MAIL_REFERENCES'].format(tid=post.tid, target=dest.uid) + '>'
if dest.is_teacher and target:
target_uid = target.uid
title = f"{topic.title} / {target.full_name}"
else:
target_uid = None
title = topic.title
h = [
('Topic', title),
('Post by', author.full_name),
('Posted', post.created.replace(microsecond=0).astimezone().strftime("%Y-%m-%d %H:%M:%S")),
]
if post.modified:
h.append(('Updated', post.modified.replace(microsecond=0).astimezone().strftime("%Y-%m-%d %H:%M:%S")))
h.append(('URL', url_for('topic_index', sident=course.ident, cident=course.ident, tident=topic.ident, student_uid=target_uid, _external=True, _scheme='https')))
h.append(('Points', post.points if post.points is not None else '-'))
head = "\n".join(["{:12} {}".format(p[0] + ':', p[1]) for p in h]) + "\n\n"
body = post.comment or ""
tail = textwrap.dedent(f"""
============================================================================
Yours sincerely, The Postal Owl running at {url_for('index')}.
You can configure your e-mail notifications in settings of your account.
""")
msg.set_content(head + body + tail, cte='quoted-printable')
if post.attachment is not None:
m = re.fullmatch(r'([0-9a-zA-Z]+)\.([0-9a-zA-Z]+)', post.attachment)
if m:
maintype, subtype = ext_to_mime_type[m.group(2)].split('/')
with open(os.path.join(app.instance_path, "files", post.attachment), 'rb') as file:
filebody = file.read()
msg.add_attachment(filebody, maintype=maintype, subtype=subtype, filename=post.attachment)
sm = subprocess.Popen([
"/usr/sbin/sendmail",
"-oi",
"-f", app.config['MAIL_FROM'],
dest.email,
], stdin=subprocess.PIPE)
sm.communicate(msg.as_bytes())
if sm.returncode != 0:
app.logger.error("Sendmail failed with return code %d", sm.returncode)
@app.cli.command("notify")
def cli_notify():
"""Manually send all e-mail notifications."""
send_notify()
@app.cli.command("test-notify")
@click.argument("pid", type=click.INT)
def cli_test_notify(pid):
"""Send e-mail notification for a selected post."""
db_query("SELECT * FROM owl_posts WHERE pid=%s", (pid,))
post = db.fetchone()
if post:
send_notify_post(post)
else:
app.logger.error("No such post")
### UWSGI glue ###
try:
import uwsgi
from uwsgidecorators import timer, signal
# Periodically scan for notifications (just in case a signal was lost)
@timer(300, target='mule')
def mule_timer(signum):
app.logger.debug('Mule: Received timer tick')
with app.app_context():
send_notify()
# Usually, we receive a signal when a notification is pending
@signal(42, target='mule')
def mule_signal(signum):
app.logger.debug('Mule: Received signal')
with app.app_context():
send_notify()
def wake_up_mule():
app.logger.debug('Mule: Sending wakeup signal')
uwsgi.signal(42)
except ImportError:
app.logger.debug('UWSGI not found, automatic notifications will not run')
def wake_up_mule():
app.logger.debug('Mule: No mule to wake up')
pass