#!/usr/bin/python3
# A mini-app for accepting Zoom webhooks

import configparser
import json
import traceback

import dateutil.parser
import psycopg2
import psycopg2.extras

### Configuration ###

config = configparser.ConfigParser()
config.read('zoom.ini')

### Database connection ###

db_connection = None
db = None


def db_connect():
    """Open a new database connection and cursor and store them in the module globals.

    Connection parameters are read from the ``[db]`` section of the config.
    The cursor uses ``NamedTupleCursor``, so fetched rows expose their
    columns as attributes (e.g. ``row.mid``).
    """
    global db_connection, db
    db_connection = psycopg2.connect(
        host='localhost',
        user=config['db']['user'],
        password=config['db']['passwd'],
        dbname=config['db']['name'],
    )
    db = db_connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)


def db_query(query, args=()):
    """Execute ``query`` with ``args`` on the shared cursor, connecting lazily.

    Results, if any, are read by the caller from the global ``db`` cursor.
    On ``psycopg2.DatabaseError`` the connection is re-established and the
    query is retried once; a second failure propagates to the caller.
    """
    if db is None:
        db_connect()
    try:
        db.execute(query, args)
    except psycopg2.DatabaseError:
        # Reconnect if the connection died (timeout, server restart etc.)
        # NOTE(review): this also retries on non-connection database errors,
        # and statements executed earlier in the same request are not carried
        # over to the fresh connection.
        db_connect()
        db.execute(query, args)


def db_reset():
    """Roll back whatever transaction is still open on the shared connection."""
    # At the end of every request, we have to close the implicitly opened
    # transaction. Otherwise we end up with serving stale data.
    if db_connection is not None:
        try:
            db_connection.rollback()
        except Exception:
            # Best effort only (the connection may already be gone), but do
            # not swallow KeyboardInterrupt/SystemExit like a bare except.
            pass


### Utilities ###

def parse_time(iso_time):
    """Parse an ISO 8601 timestamp string into a ``datetime``."""
    return dateutil.parser.isoparse(iso_time)


### Application ###

class HookApp:
    """Handler for a single webhook request.

    One instance is created per WSGI call; ``run()`` validates the request,
    records the event and dispatches it to the matching handler.
    """

    def __init__(self, env, start_response):
        """Remember the WSGI environment and the ``start_response`` callable."""
        self.env = env
        self.wsgi_start = start_response

    def log(self, msg):
        """Write ``msg`` to the WSGI error stream of this request."""
        print(msg, file=self.env['wsgi.errors'], flush=True)

    def http_error(self, code, msg, extra_headers=()):
        """Start an error response and return its body.

        ``extra_headers`` is an iterable of ``(name, value)`` tuples that are
        sent before the ``Content-Type`` header. The body is the status line
        itself, encoded to bytes as WSGI requires.
        """
        status = "{} {}".format(code, msg)
        headers = list(extra_headers) + [('Content-Type', 'text/plain')]
        self.wsgi_start(status, headers)
        return [status.encode('utf-8')]

    def create_schedule(self, mid, meeting_id, occurrence_id, occ):
        """Insert one row into ``zoom_schedule`` for the given occurrence.

        ``mid`` is the key of the ``zoom_meetings`` row, ``meeting_id`` is
        used for logging only. ``occ`` must contain ``start_time`` and
        ``duration``; an occurrence whose ``status`` is ``'deleted'`` is
        skipped.
        """
        if occ.get('status', "") == 'deleted':
            self.log(f"Meeting {meeting_id}.{occurrence_id}: Skipping deleted occurrence")
            return

        self.log(f"Meeting {meeting_id}.{occurrence_id}: Scheduling")
        db_query("""
            INSERT INTO zoom_schedule
            (mid, occurrence_id, start_time, duration)
            VALUES (%s, %s, %s, %s)
        """, (
            mid,
            occurrence_id,
            parse_time(occ['start_time']),
            occ['duration'],
        ))

    def create_meeting(self, js):
        """Handle ``meeting.created``: store the meeting and its schedule.

        The meeting is ignored (with a log message) when its host is not
        present in ``zoom_users``.
        """
        payload = js["payload"]
        meeting = payload["object"]
        meeting_id = meeting["id"]
        host_user_id = meeting["host_id"]

        db_query("SELECT * FROM zoom_users WHERE user_id=%s", (host_user_id,))
        user = db.fetchone()
        if user is None:
            self.log(f"Meeting {meeting_id}: Host {host_user_id} not found in zoom_users")
            return

        db_query("""
            INSERT INTO zoom_meetings
            (meeting_id, uuid, host_uid, topic, type)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING mid
        """, (
            meeting_id,
            meeting['uuid'],
            user.uid,
            meeting['topic'],
            meeting['type'],
        ))
        meeting_row = db.fetchone()
        mid = meeting_row.mid
        mtype = meeting["type"]
        self.log(f"Meeting {meeting_id}: Creating with mid={mid}, type={mtype}")

        if mtype == 8:
            # Type 8 meetings carry a list of occurrences, each scheduled separately
            for occ in meeting["occurrences"]:
                self.create_schedule(mid, meeting_id, occ["occurrence_id"], occ)
        elif 'start_time' in meeting:
            # Other meetings with a start time get a single occurrence numbered 0
            self.create_schedule(mid, meeting_id, 0, meeting)

    def delete_recurring_schedule(self, mid, meeting_id, meeting):
        """Remove the schedule rows of every occurrence listed in ``meeting``."""
        for occ in meeting["occurrences"]:
            occ_id = occ["occurrence_id"]
            self.log(f"Meeting {meeting_id}.{occ_id}: Descheduling")
            db_query("""
                DELETE FROM zoom_schedule
                WHERE mid=%s
                  AND occurrence_id=%s
            """, (
                mid,
                occ_id,
            ))

    def delete_meeting(self, js):
        """Handle ``meeting.deleted``.

        For a stored type 8 meeting whose payload lists occurrences, only
        those occurrences are descheduled; otherwise the whole meeting and
        its schedule are removed.
        """
        payload = js["payload"]
        meeting = payload["object"]
        meeting_id = meeting["id"]

        db_query("SELECT * FROM zoom_meetings WHERE meeting_id=%s", (meeting_id,))
        meeting_row = db.fetchone()
        if meeting_row is None:
            self.log(f"Meeting {meeting_id}: Unknown on delete")
            return

        mid = meeting_row.mid
        mtype = meeting_row.type

        if mtype == 8 and "occurrences" in meeting:
            # Must be called as a method; a bare name here raised NameError
            self.delete_recurring_schedule(mid, meeting_id, meeting)
        else:
            self.log(f"Meeting {meeting_id}: Deleting")
            db_query("DELETE FROM zoom_schedule WHERE mid=%s", (mid,))
            db_query("DELETE FROM zoom_meetings WHERE mid=%s", (mid,))

    def update_schedule(self, mid, meeting_id, occurrence_id, new):
        """Update start time and/or duration of one occurrence from ``new``.

        Only the keys present in ``new`` (``start_time``, ``duration``) are
        written; other keys are ignored.
        """
        if "start_time" in new:
            self.log(f"Meeting {meeting_id}.{occurrence_id}: Updating start time")
            db_query("UPDATE zoom_schedule SET start_time=%s WHERE mid=%s AND occurrence_id=%s",
                     (parse_time(new['start_time']), mid, occurrence_id))

        if "duration" in new:
            self.log(f"Meeting {meeting_id}.{occurrence_id}: Updating duration")
            db_query("UPDATE zoom_schedule SET duration=%s WHERE mid=%s AND occurrence_id=%s",
                     (new['duration'], mid, occurrence_id))

    def update_meeting_single(self, mid, meeting_id, old, new):
        """Apply an update with scope ``single`` to each occurrence in ``new``.

        Meeting-level values are applied first, then the values of the
        occurrence itself, so the occurrence's own values win.
        """
        self.log(f"Meeting {meeting_id}: Updating single occurrences")
        for occ in new["occurrences"]:
            self.update_schedule(mid, meeting_id, occ["occurrence_id"], new)    # e.g., duration can be set here
            self.update_schedule(mid, meeting_id, occ["occurrence_id"], occ)

    def update_meeting_all(self, mid, meeting_id, old, new):
        """Apply an update with scope ``all`` to the whole schedule of a meeting.

        ``old`` and ``new`` are the ``old_object`` and ``object`` parts of
        the webhook payload.
        """
        if "occurrences" in new:
            # So this will be a recurrent meeting, replace all occurrences
            self.log(f"Meeting {meeting_id}: Replacing all occurrences")
            db_query("DELETE FROM zoom_schedule WHERE mid=%s", (mid,))
            for occ in new["occurrences"]:
                self.create_schedule(mid, meeting_id, occ["occurrence_id"], occ)
        elif "start_time" in new:
            if new["start_time"] == "":
                # Descheduling (this can happen in type 3 meetings)
                self.log(f"Meeting {meeting_id}: Descheduling")
                db_query("DELETE FROM zoom_schedule WHERE mid=%s", (mid,))
            elif "duration" in new:
                # Both start time and duration are set => can safely replace the schedule
                self.log(f"Meeting {meeting_id}.0: Replacing schedule")
                db_query("DELETE FROM zoom_schedule WHERE mid=%s", (mid,))
                self.create_schedule(mid, meeting_id, 0, new)
            else:
                # This is just a schedule change
                self.log(f"Meeting {meeting_id}.0: Rescheduling with new start_time")
                db_query("UPDATE zoom_schedule SET start_time=%s WHERE mid=%s",
                         (parse_time(new["start_time"]), mid))
        elif "occurrences" in old:
            # Descheduling (this can happen when changing type 8 to type 3)
            self.log(f"Meeting {meeting_id}: Descheduling")
            db_query("DELETE FROM zoom_schedule WHERE mid=%s", (mid,))
        elif "duration" in new:
            # This is just a schedule change
            self.log(f"Meeting {meeting_id}.0: Rescheduling with new duration")
            db_query("UPDATE zoom_schedule SET duration=%s WHERE mid=%s", (new["duration"], mid))

    def update_meeting(self, js):
        """Handle ``meeting.updated``: sync type, topic and schedule.

        Updates for meetings not present in ``zoom_meetings`` are ignored
        with a log message. Changes of ``uuid`` and ``host_id`` are only
        logged. The schedule is updated according to the payload's ``scope``
        (``all`` when absent).
        """
        payload = js["payload"]
        new = payload["object"]
        old = payload["old_object"]
        meeting_id = new["id"]

        db_query("SELECT * FROM zoom_meetings WHERE meeting_id=%s", (meeting_id,))
        meeting_row = db.fetchone()
        if meeting_row is None:
            self.log(f"Meeting {meeting_id}: Unknown on update")
            return
        mid = meeting_row.mid

        old_type = old.get("type", -1)
        new_type = new.get("type", -1)
        if old_type != new_type:
            self.log(f"Meeting {meeting_id}: Transmuting from from type {old_type} to {new_type}")
            db_query("UPDATE zoom_meetings SET type=%s WHERE mid=%s", (new_type, mid))

        if "topic" in new:
            self.log(f"Meeting {meeting_id}: Updating topic")
            db_query("UPDATE zoom_meetings SET topic=%s WHERE mid=%s", (new['topic'], mid))

        for a in ['uuid', 'host_id']:
            if a in new:
                self.log(f"Meeting {meeting_id}: Change of {a} not supported")

        scope = payload.get("scope", "all")
        if scope == "single":
            self.update_meeting_single(mid, meeting_id, old, new)
        elif scope == "all":
            self.update_meeting_all(mid, meeting_id, old, new)
        else:
            self.log(f"Meeting {meeting_id}: Unsupported update scope {scope}")

    def run(self):
        """Process the request and return the WSGI response body.

        Only POST requests whose ``Authorization`` header equals the
        configured verification token are accepted. Every accepted event is
        stored in ``zoom_events`` before it is dispatched; all database
        changes are committed together at the end.
        """
        method = self.env['REQUEST_METHOD']
        if method != 'POST':
            return self.http_error(405, 'Method not allowed', [('Allow', 'POST')])

        if self.env.get('HTTP_AUTHORIZATION', '') != config['hooks']['verification_token']:
            self.log('Verification token does not match!')
            return self.http_error(401, 'Authorization failed')

        body = self.env['wsgi.input'].read()
        js = json.loads(body)
        self.log(js)

        event = js["event"]
        db_query("INSERT INTO zoom_events(event,js) VALUES(%s,%s)", (event, json.dumps(js["payload"])))

        if event == "meeting.created":
            self.create_meeting(js)
        elif event == "meeting.deleted":
            self.delete_meeting(js)
        elif event == "meeting.updated":
            self.update_meeting(js)
        else:
            self.log(f"Unknown event: {event}")

        db_connection.commit()

        self.wsgi_start("204 No Content", [])
        # An empty iterable is the proper WSGI body here; a bare bytes object
        # is rejected by validators such as wsgiref.validate.
        return []


def application(env, start_response):
    """WSGI entry point: run a ``HookApp`` for the request.

    Any exception is logged with its traceback and turned into a 500
    response. The database transaction is always rolled back afterwards
    (a no-op after a successful commit).
    """
    app = HookApp(env, start_response)
    try:
        return app.run()
    except Exception:
        # format_exc() returns the traceback text; print_exception() returned
        # None and its etype keyword no longer exists on Python 3.10+.
        app.log(traceback.format_exc().rstrip())
        return app.http_error(500, "Internal server error")
    finally:
        db_reset()