# Source code for email_normalize

"""
email-normalize
===============

Library for returning a normalized email-address stripping mailbox provider
specific behaviors such as "Plus addressing" (foo+bar@gmail.com).

"""
import asyncio
import copy
import dataclasses
import logging
import operator
import time
import typing
from email import utils

import aiodns
from aiodns import error

from email_normalize import providers

# Module-level logger, named after this module
LOGGER = logging.getLogger(__name__)

# Type alias for resolved MX records: a list of ``(priority, host)`` tuples
MXRecords = typing.List[typing.Tuple[int, str]]

# Module-level cache of MX lookups used by Normalizer, keyed by the domain
# part of an address; each value is a CachedItem
cache: typing.Dict[str, 'CachedItem'] = {}


class CachedItem:
    """A single cached MX lookup, used to implement the LFRU cache.

    Alongside the MX records themselves, an entry remembers when it was
    stored (``cached_at``, from the monotonic clock), how many seconds it
    may be kept (``ttl``), how often it has been read (``hits``) and when
    it was last read (``last_access``).

    :param mx_records: The MX records to store
    :param ttl: Number of seconds the entry stays valid

    """
    __slots__ = ['cached_at', 'hits', 'last_access', 'mx_records', 'ttl']

    def __init__(self, mx_records: MXRecords, ttl: int):
        self.mx_records = mx_records
        self.ttl = ttl
        # Usage counters start at zero for a freshly stored entry
        self.hits = 0
        self.last_access: float = 0.0
        self.cached_at = time.monotonic()

    @property
    def expired(self) -> bool:
        """``True`` once more than ``ttl`` seconds have passed since the
        entry was stored."""
        age = time.monotonic() - self.cached_at
        return age > self.ttl


@dataclasses.dataclass(frozen=True)
class Result:
    """Instances of the :class:`~email_normalize.Result` class contain data
    from the email normalization process.

    :param address: The address that was normalized
    :type address: str
    :param normalized_address: The normalized version of the address
    :type normalized_address: str
    :param mx_records: A list of tuples representing the priority and host of
        the MX records found for the email address. If empty, indicates a
        failure to lookup the domain part of the email address.
    :type mx_records: :data:`~email_normalize.MXRecords`
    :param mailbox_provider: String that represents the mailbox provider name
        - is `None` if the mailbox provider could not be detected or
        was unsupported.
    :type mailbox_provider: str or None

    .. note:: If during the normalization process the MX records could not be
        resolved, the ``mx_records`` attribute will be an empty
        :class:`list` and the ``mailbox_provider`` attribute will be
        :data:`None`.

    **Example**

    .. code-block:: python

        @dataclasses.dataclass(frozen=True)
        class Result:
            address = 'Gavin.M.Roy+ignore-spam@gmail.com'
            normalized_address = 'gavinmroy@gmail.com'
            mx_records = [
                (5, 'gmail-smtp-in.l.google.com'),
                (10, 'alt1.gmail-smtp-in.l.google.com'),
                (20, 'alt2.gmail-smtp-in.l.google.com'),
                (30, 'alt3.gmail-smtp-in.l.google.com'),
                (40, 'alt4.gmail-smtp-in.l.google.com')
            ]
            mailbox_provider = 'Gmail'

    """
    address: str
    normalized_address: str
    mx_records: MXRecords
    mailbox_provider: typing.Optional[str] = None
class Normalizer:
    """Class for normalizing an email address and resolving MX records.

    Normalization is processed by splitting the local and domain parts of the
    email address and then performing DNS resolution for the MX records
    associated with the domain part of the address. The MX records are
    processed against a set of mailbox provider specific rules. If a match
    is found for the MX record hosts, the rules are applied to the email
    address.

    This class implements a least frequent recently used cache that respects
    the DNS TTL returned when performing MX lookups. Data is cached at the
    **module** level.

    **Usage Example**

    .. code-block:: python

        async def normalize(email_address: str) -> email_normalize.Result:
            normalizer = email_normalize.Normalizer()
            return await normalizer.normalize('foo@bar.io')

    :param name_servers: Optional list of hostnames to use for DNS resolution
    :type name_servers: list(str) or None
    :param int cache_limit: The maximum number of domain results that are
        cached. Defaults to `1024`.
    :param bool cache_failures: Toggle the behavior of caching DNS resolution
        failures for a given domain. When enabled, failures will be cached
        for `failure_ttl` seconds. Defaults to `True`.
    :param int failure_ttl: Duration in seconds to cache DNS failures. Only
        works when `cache_failures` is set to `True`. Defaults to `300`
        seconds.

    """

    def __init__(self,
                 name_servers: typing.Optional[typing.List[str]] = None,
                 cache_limit: int = 1024,
                 cache_failures: bool = True,
                 failure_ttl: int = 300) -> None:
        self._resolver = aiodns.DNSResolver(name_servers)
        self.cache_failures = cache_failures
        self.cache_limit = cache_limit
        self.failure_ttl = failure_ttl

    async def mx_records(self, domain_part: str) -> MXRecords:
        """Resolve MX records for a domain returning a list of tuples with the
        MX priority and value.

        Results are served from the module-level cache when a non-expired
        entry exists; otherwise the domain is resolved and the result is
        stored. A failed lookup yields an empty list, which is cached for
        ``failure_ttl`` seconds when ``cache_failures`` is enabled.

        :param domain_part: The domain to resolve MX records for
        :type domain_part: str
        :rtype: :data:`~email_normalize.MXRecords`

        """
        if self._skip_cache(domain_part):
            try:
                records = await self._resolver.query(domain_part, 'MX')
            except error.DNSError as err:
                LOGGER.debug('Failed to resolve %r: %s', domain_part, err)
                if not self.cache_failures:
                    return []
                mx_records, ttl = [], self.failure_ttl
            else:
                mx_records = [(r.priority, r.host) for r in records]
                # An empty answer carries no TTL, fall back to failure_ttl
                ttl = min(r.ttl for r in records) \
                    if records else self.failure_ttl

            # Prune the cache if over the limit, finding least used, oldest.
            # The emptiness guard keeps a non-positive cache_limit from
            # attempting to evict out of an empty cache.
            if cache and len(cache) >= self.cache_limit:
                key_to_prune = min(
                    cache,
                    key=lambda key: (cache[key].hits,
                                     cache[key].last_access))
                LOGGER.debug('Pruning cache of %s', key_to_prune)
                del cache[key_to_prune]

            cache[domain_part] = CachedItem(
                sorted(mx_records, key=operator.itemgetter(0, 1)), ttl)

        entry = cache[domain_part]
        entry.hits += 1
        entry.last_access = time.monotonic()
        # Return a copy so callers can not mutate the cached records
        return copy.deepcopy(entry.mx_records)

    async def normalize(self, email_address: str) -> Result:
        """Return a :class:`~email_normalize.Result` instance containing the
        original address, the normalized address, the MX records found, and
        the detected mailbox provider.

        .. note:: If the MX records could not be resolved, the
            ``mx_records`` attribute of the result will be an empty
            :class:`list` and the ``mailbox_provider`` will be :data:`None`.

        :param email_address: The address to normalize
        :rtype: :class:`~email_normalize.Result`
        :raises ValueError: If the parsed address does not consist of
            exactly one local part and one domain part separated by ``@``

        """
        parts = utils.parseaddr(email_address)[1].lower().split('@')
        if len(parts) != 2:
            raise ValueError(f'Invalid email address: {email_address!r}')
        local_part, domain_part = parts
        mx_records = await self.mx_records(domain_part)
        provider = self._lookup_provider(mx_records)
        if provider:
            # Apply each rule that is enabled in the provider's flags
            if provider.Flags & providers.Rules.LOCAL_PART_AS_HOSTNAME:
                local_part, domain_part = self._local_part_as_hostname(
                    local_part, domain_part)
            if provider.Flags & providers.Rules.STRIP_PERIODS:
                local_part = local_part.replace('.', '')
            if provider.Flags & providers.Rules.PLUS_ADDRESSING:
                local_part = local_part.split('+')[0]
            if provider.Flags & providers.Rules.DASH_ADDRESSING:
                local_part = local_part.split('-')[0]
        return Result(email_address, '@'.join([local_part, domain_part]),
                      mx_records, provider.__name__ if provider else None)

    @staticmethod
    def _local_part_as_hostname(local_part: str,
                                domain_part: str) -> typing.Tuple[str, str]:
        """When the domain has more than two dot-separated segments, use the
        first segment as the local part and the remaining segments as the
        domain; otherwise return both values unchanged.

        """
        domain_segments = domain_part.split('.')
        if len(domain_segments) > 2:
            local_part = domain_segments[0]
            domain_part = '.'.join(domain_segments[1:])
        return local_part, domain_part

    @staticmethod
    def _lookup_provider(mx_records: typing.List[typing.Tuple[int, str]]) \
            -> typing.Optional[providers.MailboxProvider]:
        """Return the first provider that has an MX domain which one of the
        MX record hosts ends with, or :data:`None` if nothing matches.

        Records are checked in the order given, and for each record the
        providers are checked in the order of ``providers.Providers``.

        """
        for _, host in mx_records:
            for provider in providers.Providers:
                if any(host.endswith(domain)
                       for domain in provider.MXDomains):
                    return provider
        return None

    def _skip_cache(self, domain: str) -> bool:
        """Return ``True`` when the domain has to be resolved because it is
        not cached or its entry has expired. Expired entries are removed
        from the cache as a side effect.

        """
        if domain not in cache:
            return True
        elif cache[domain].expired:
            del cache[domain]
            return True
        return False
def normalize(email_address: str) -> Result:
    """Normalize an email address

    This method abstracts the :mod:`asyncio` base for this library and
    provides a blocking function. If you intend to use this library as part of
    an :mod:`asyncio` based application, it is recommended that you use
    the :meth:`~email_normalize.Normalizer.normalize` instead.

    .. note:: If the MX records could not be resolved, the ``mx_records``
        attribute of the result will be an empty :class:`list` and the
        ``mailbox_provider`` attribute will be :data:`None`.

    **Usage Example**

    .. code-block:: python

        import email_normalize

        result = email_normalize.normalize('foo@bar.io')

    :param email_address: The address to normalize
    :rtype: :class:`~email_normalize.Result`

    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # The calling thread has no current event loop (for example a worker
        # thread); create one and install it as the current loop so that
        # later calls from the same thread find and reuse it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    normalizer = Normalizer()
    return loop.run_until_complete(normalizer.normalize(email_address))