Skip to content
Snippets Groups Projects
Commit cc7bb550 authored by Martin Mareš's avatar Martin Mareš
Browse files

DSN: Prvotní verze API na parsování DSN

parent 1c6ac0f4
Branches
No related tags found
No related merge requests found
...@@ -261,6 +261,7 @@ except ImportError: ...@@ -261,6 +261,7 @@ except ImportError:
# Většina webu je v samostatných modulech # Většina webu je v samostatných modulech
import mo.web.api import mo.web.api
import mo.web.api_dsn
import mo.web.acct import mo.web.acct
import mo.web.doc import mo.web.doc
import mo.web.jinja import mo.web.jinja
... ...
......
# Web: API pro zpracování mailových Delivery Status Notifications
import email
from email.errors import MessageError
import email.headerregistry
import email.message
import email.parser
import email.policy
from flask import request, Response
from flask.json import jsonify
import re
from typing import Optional
import werkzeug.exceptions
import mo.config as config
import mo.email
import mo.util_format
from mo.web import app
class DSN:
msg: email.message.EmailMessage
message_id: str
verdict: str
dsn_action: str
dsn_status: str
def __init__(self, body: bytes):
self.msg = email.message_from_bytes(body, policy=email.policy.default) # FIXME: Types
self.message_id = (self.msg['Message-Id'] or '?').strip()
self.dsn_action = '?'
self.dsn_status = '?'
self.verdict = self.parse_dsn()
def parse_dsn(self) -> str:
if self.msg.get_content_type() != 'multipart/report':
return 'unexpected content-type'
if not self.msg.is_multipart():
return 'not multipart'
report_type = self.msg['Content-type'].params.get('report-type', '?')
if report_type != 'delivery-status':
return 'unknown report-type'
if len(self.msg.get_payload()) < 2:
return 'too few parts'
part2: email.message.EmailMessage = self.msg.get_payload(1)
if part2.get_content_type() != 'message/delivery-status':
return 'unexpected part 2 type'
dsn = part2.get_payload()
if len(dsn) < 2:
return 'DSN too short'
# main = dsn[0]
per_addr = dsn[1]
self.dsn_action = per_addr.get('Action', '?')
self.dsn_status = per_addr.get('Status', '?')
if self.dsn_action != 'failed':
return 'not failed'
return 'ok'
def find_token(self) -> Optional[str]:
to = self.msg.get('To')
if to is None:
return None
std_from = email.headerregistry.Address(addr_spec=config.MAIL_CONTACT)
for addr in to.addresses:
if m := re.fullmatch(r'([^+]+)\+(.*)', addr.username):
if m[1] == std_from.username and addr.domain == std_from.domain:
return m[2]
return None
@app.route('/api/email-dsn', methods=('POST',))
def api_email_dsn() -> Response:
# FIXME: Authorization?
# FIXME: Add in Flask 3.1: request.max_content_length = 1048576
body = request.get_data(cache=False)
try:
dsn = DSN(body)
except MessageError as e:
app.logger.info(f'DSN: Nemohu naparsovat zprávu: {e}')
raise werkzeug.exceptions.UnprocessableEntity()
app.logger.info(f'DSN: Message-ID: {dsn.message_id}')
app.logger.info(f'DSN: Parse: action={dsn.dsn_action} status={dsn.dsn_status} -> {dsn.verdict}')
if dsn.verdict == 'not failed':
return jsonify({})
elif dsn.verdict != 'ok':
raise werkzeug.exceptions.UnprocessableEntity()
if not (token := dsn.find_token()):
app.logger.info('DSN: Token not found')
raise werkzeug.exceptions.UnprocessableEntity()
app.logger.info(f'DSN: Token: {token}')
try:
user, rr, when = mo.email.validate_dsn_token(token)
user_info = f'#{user.user_id}' if user is not None else '-'
rr_info = f'#{rr.reg_id}' if rr is not None else '-'
age = mo.util_format.time_duration_numeric(mo.now - when)
app.logger.info(f'DSN: user={user_info} registration={rr_info} age={age}')
except ValueError as e:
app.logger.info(f'DSN: {e}')
pass
return jsonify({})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment