187 lines
5.3 KiB
Python
187 lines
5.3 KiB
Python
"""
|
|
Email message and address formatting/parsing functions.
|
|
"""
|
|
import copy
|
|
import email.charset
|
|
import email.generator
|
|
import email.header
|
|
import email.headerregistry
|
|
import email.message
|
|
import email.policy
|
|
import email.utils
|
|
import io
|
|
import re
|
|
from typing import List, Optional, Tuple, Union
|
|
|
|
|
|
__all__ = (
|
|
"extract_recipients",
|
|
"extract_sender",
|
|
"flatten_message",
|
|
"parse_address",
|
|
"quote_address",
|
|
)
|
|
|
|
|
|
LINE_SEP = "\r\n"
|
|
SPECIALS_REGEX = re.compile(r'[][\\()<>@,:;".]')
|
|
ESCAPES_REGEX = re.compile(r'[\\"]')
|
|
UTF8_CHARSET = email.charset.Charset("utf-8")
|
|
|
|
|
|
def parse_address(address: str) -> str:
|
|
"""
|
|
Parse an email address, falling back to the raw string given.
|
|
"""
|
|
display_name, parsed_address = email.utils.parseaddr(address)
|
|
|
|
return parsed_address or address.strip()
|
|
|
|
|
|
def quote_address(address: str) -> str:
|
|
"""
|
|
Quote a subset of the email addresses defined by RFC 821.
|
|
"""
|
|
parsed_address = parse_address(address)
|
|
return f"<{parsed_address}>"
|
|
|
|
|
|
def formataddr(pair: Tuple[str, str]) -> str:
|
|
"""
|
|
Copied from the standard library, and modified to handle international (UTF-8)
|
|
email addresses.
|
|
|
|
The inverse of parseaddr(), this takes a 2-tuple of the form
|
|
(realname, email_address) and returns the string value suitable
|
|
for an RFC 2822 From, To or Cc header.
|
|
If the first element of pair is false, then the second element is
|
|
returned unmodified.
|
|
"""
|
|
name, address = pair
|
|
if name:
|
|
encoded_name = UTF8_CHARSET.header_encode(name)
|
|
return f"{encoded_name} <{address}>"
|
|
else:
|
|
quotes = ""
|
|
if SPECIALS_REGEX.search(name):
|
|
quotes = '"'
|
|
name = ESCAPES_REGEX.sub(r"\\\g<0>", name)
|
|
return f"{quotes}{name}{quotes} <{address}>"
|
|
|
|
return address
|
|
|
|
|
|
def flatten_message(
|
|
message: Union[email.message.EmailMessage, email.message.Message],
|
|
/,
|
|
*,
|
|
utf8: bool = False,
|
|
cte_type: str = "8bit",
|
|
) -> bytes:
|
|
# Make a local copy so we can delete the bcc headers.
|
|
message_copy = copy.copy(message)
|
|
del message_copy["Bcc"]
|
|
del message_copy["Resent-Bcc"]
|
|
|
|
if isinstance(message, email.message.EmailMessage):
|
|
# New message class, default policy
|
|
policy = email.policy.default.clone(
|
|
linesep=LINE_SEP,
|
|
utf8=utf8,
|
|
cte_type=cte_type,
|
|
)
|
|
else:
|
|
# Old message class, Compat32 policy.
|
|
# Compat32 cannot use UTF8
|
|
policy = email.policy.compat32.clone(linesep=LINE_SEP, cte_type=cte_type)
|
|
|
|
with io.BytesIO() as messageio:
|
|
generator = email.generator.BytesGenerator(messageio, policy=policy)
|
|
generator.flatten(message_copy)
|
|
flat_message = messageio.getvalue()
|
|
|
|
return flat_message
|
|
|
|
|
|
def extract_addresses(
|
|
header: Union[str, email.headerregistry.AddressHeader, email.header.Header],
|
|
/,
|
|
) -> List[str]:
|
|
"""
|
|
Convert address headers into raw email addresses, suitable for use in
|
|
low level SMTP commands.
|
|
"""
|
|
addresses = []
|
|
if isinstance(header, email.headerregistry.AddressHeader):
|
|
for address in header.addresses:
|
|
# If the object has been assigned an iterable, it's possible to get
|
|
# a string here
|
|
if isinstance(address, email.headerregistry.Address):
|
|
addresses.append(address.addr_spec)
|
|
else:
|
|
addresses.append(parse_address(address))
|
|
elif isinstance(header, email.header.Header):
|
|
for address_bytes, charset in email.header.decode_header(header):
|
|
if charset is None:
|
|
charset = "ascii"
|
|
addresses.append(parse_address(str(address_bytes, encoding=charset)))
|
|
else:
|
|
addresses.extend(addr for _, addr in email.utils.getaddresses([header]))
|
|
|
|
return addresses
|
|
|
|
|
|
def extract_sender(
|
|
message: Union[email.message.EmailMessage, email.message.Message],
|
|
/,
|
|
) -> Optional[str]:
|
|
"""
|
|
Extract the sender from the message object given.
|
|
"""
|
|
resent_dates = message.get_all("Resent-Date")
|
|
|
|
if resent_dates is not None and len(resent_dates) > 1:
|
|
raise ValueError("Message has more than one 'Resent-' header block")
|
|
elif resent_dates:
|
|
sender_header_name = "Resent-Sender"
|
|
from_header_name = "Resent-From"
|
|
else:
|
|
sender_header_name = "Sender"
|
|
from_header_name = "From"
|
|
|
|
# Prefer the sender field per RFC 2822:3.6.2.
|
|
if sender_header_name in message:
|
|
sender_header = message[sender_header_name]
|
|
else:
|
|
sender_header = message[from_header_name]
|
|
|
|
if sender_header is None:
|
|
return None
|
|
|
|
return extract_addresses(sender_header)[0]
|
|
|
|
|
|
def extract_recipients(
|
|
message: Union[email.message.EmailMessage, email.message.Message],
|
|
/,
|
|
) -> List[str]:
|
|
"""
|
|
Extract the recipients from the message object given.
|
|
"""
|
|
recipients: List[str] = []
|
|
|
|
resent_dates = message.get_all("Resent-Date")
|
|
|
|
if resent_dates is not None and len(resent_dates) > 1:
|
|
raise ValueError("Message has more than one 'Resent-' header block")
|
|
elif resent_dates:
|
|
recipient_headers = ("Resent-To", "Resent-Cc", "Resent-Bcc")
|
|
else:
|
|
recipient_headers = ("To", "Cc", "Bcc")
|
|
|
|
for header in recipient_headers:
|
|
for recipient in message.get_all(header, failobj=[]):
|
|
recipients.extend(extract_addresses(recipient))
|
|
|
|
return recipients
|