Ticket #15042: ticket15042.diff

File ticket15042.diff, 38.7 KB (added by Łukasz Rekucki, 14 years ago)

The right patch against trunk.

  • django/core/mail/backends/smtp.py

    diff --git a/django/core/mail/backends/smtp.py b/django/core/mail/backends/smtp.py
    index 3b2962f..bb184ab 100644
    a b  
    11"""SMTP email backend class."""
    2 
    32import smtplib
    43import socket
    54import threading
    import threading  
    76from django.conf import settings
    87from django.core.mail.backends.base import BaseEmailBackend
    98from django.core.mail.utils import DNS_NAME
     9from django.core.mail.message import sanitize_address
     10
    1011
    1112class EmailBackend(BaseEmailBackend):
    1213    """
    class EmailBackend(BaseEmailBackend):  
    9192            self._lock.release()
    9293        return num_sent
    9394
    94     def _sanitize(self, email):
    95         name, domain = email.split('@', 1)
    96         email = '@'.join([name, domain.encode('idna')])
    97         return email
    98 
    9995    def _send(self, email_message):
    10096        """A helper method that does the actual sending."""
    10197        if not email_message.recipients():
    10298            return False
    103         from_email = self._sanitize(email_message.from_email)
    104         recipients = map(self._sanitize, email_message.recipients())
     99        from_email = sanitize_address(email_message.from_email, email_message.encoding)
     100        recipients = [sanitize_address(addr, email_message.encoding)
     101                      for addr in email_message.recipients()]
    105102        try:
    106103            self.connection.sendmail(from_email, recipients,
    107104                    email_message.message().as_string())
  • django/core/mail/message.py

    diff --git a/django/core/mail/message.py b/django/core/mail/message.py
    index 2311102..96ff689 100644
    a b from email.Utils import formatdate, getaddresses, formataddr  
    1212from django.conf import settings
    1313from django.core.mail.utils import DNS_NAME
    1414from django.utils.encoding import smart_str, force_unicode
     15from email.Utils import parseaddr
    1516
    1617# Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from
    1718# some spam filters.
    def make_msgid(idstring=None):  
    5455    return msgid
    5556
    5657
     58# Header names that contain structured address data (RFC #5322)
     59ADDRESS_HEADERS = set([
     60    'from',
     61    'sender',
     62    'reply-to',
     63    'to',
     64    'cc',
     65    'bcc',
     66    'resent-from',
     67    'resent-sender',
     68    'resent-to',
     69    'resent-cc',
     70    'resent-bcc',
     71])
     72
     73
    5774def forbid_multi_line_headers(name, val, encoding):
    5875    """Forbids multi-line headers, to prevent header injection."""
    5976    encoding = encoding or settings.DEFAULT_CHARSET
    def forbid_multi_line_headers(name, val, encoding):  
    6380    try:
    6481        val = val.encode('ascii')
    6582    except UnicodeEncodeError:
    66         if name.lower() in ('to', 'from', 'cc'):
    67             result = []
    68             for nm, addr in getaddresses((val,)):
    69                 nm = str(Header(nm.encode(encoding), encoding))
    70                 try:
    71                     addr = addr.encode('ascii')
    72                 except UnicodeEncodeError:  # IDN
    73                     addr = str(Header(addr.encode(encoding), encoding))
    74                 result.append(formataddr((nm, addr)))
    75             val = ', '.join(result)
     83        if name.lower() in ADDRESS_HEADERS:
     84            val = ', '.join(sanitize_address(addr, encoding)
     85                for addr in getaddresses((val,)))
    7686        else:
    77             val = Header(val.encode(encoding), encoding)
     87            val = str(Header(val, encoding))
    7888    else:
    7989        if name.lower() == 'subject':
    8090            val = Header(val)
    8191    return name, val
    8292
     93
     94def sanitize_address(addr, encoding):
     95    if isinstance(addr, basestring):
     96        addr = parseaddr(force_unicode(addr))
     97    nm, addr = addr
     98    nm = str(Header(nm, encoding))
     99    try:
     100        addr = addr.encode('ascii')
     101    except UnicodeEncodeError:  # IDN
     102        if u'@' in addr:
     103            localpart, domain = addr.split(u'@', 1)
     104            localpart = str(Header(localpart, encoding))
     105            domain = domain.encode('idna')
     106            addr = '@'.join([localpart, domain])
     107        else:
     108            addr = str(Header(addr, encoding))
     109    return formataddr((nm, addr))
     110
     111
    83112class SafeMIMEText(MIMEText):
    84    
     113
    85114    def __init__(self, text, subtype, charset):
    86115        self.encoding = charset
    87116        MIMEText.__init__(self, text, subtype, charset)
    88    
    89     def __setitem__(self, name, val):   
     117
     118    def __setitem__(self, name, val):
    90119        name, val = forbid_multi_line_headers(name, val, self.encoding)
    91120        MIMEText.__setitem__(self, name, val)
    92121
     122
    93123class SafeMIMEMultipart(MIMEMultipart):
    94    
     124
    95125    def __init__(self, _subtype='mixed', boundary=None, _subparts=None, encoding=None, **_params):
    96126        self.encoding = encoding
    97127        MIMEMultipart.__init__(self, _subtype, boundary, _subparts, **_params)
    98        
     128
    99129    def __setitem__(self, name, val):
    100130        name, val = forbid_multi_line_headers(name, val, self.encoding)
    101131        MIMEMultipart.__setitem__(self, name, val)
    102132
     133
    103134class EmailMessage(object):
    104135    """
    105136    A container for email information.
    class EmailMultiAlternatives(EmailMessage):  
    274305        conversions.
    275306        """
    276307        super(EmailMultiAlternatives, self).__init__(subject, body, from_email, to, bcc, connection, attachments, headers, cc)
    277         self.alternatives=alternatives or []
     308        self.alternatives = alternatives or []
    278309
    279310    def attach_alternative(self, content, mimetype):
    280311        """Attach an alternative content representation."""
  • tests/regressiontests/mail/tests.py

    diff --git a/tests/regressiontests/mail/tests.py b/tests/regressiontests/mail/tests.py
    index a6cd60e..18c4c62 100644
    a b  
    11# coding: utf-8
     2import asyncore
    23import email
    34import os
    45import shutil
     6import smtpd
    57import sys
    68import tempfile
     9import threading
    710from StringIO import StringIO
    811from django.conf import settings
    912from django.core import mail
    1013from django.core.mail import EmailMessage, mail_admins, mail_managers, EmailMultiAlternatives
    1114from django.core.mail import send_mail, send_mass_mail
    12 from django.core.mail.backends.base import BaseEmailBackend
    1315from django.core.mail.backends import console, dummy, locmem, filebased, smtp
    1416from django.core.mail.message import BadHeaderError
    1517from django.test import TestCase
    1618from django.utils.translation import ugettext_lazy
     19from django.utils.functional import wraps
     20from email.Utils import parseaddr
     21
     22
     23def alter_global_settings(**kwargs):
     24    oldvalues = {}
     25    nonexistant = []
     26    for setting, newvalue in kwargs.iteritems():
     27        try:
     28            oldvalues[setting] = getattr(settings, setting)
     29        except AttributeError:
     30            nonexistant.append(setting)
     31        setattr(settings, setting, newvalue)
     32    return oldvalues, nonexistant
     33
     34
     35def restore_global_settings(state):
     36    oldvalues, nonexistant = state
     37    for setting, oldvalue in oldvalues.iteritems():
     38        setattr(settings, setting, oldvalue)
     39    for setting in nonexistant:
     40        delattr(settings, setting)
     41
     42
     43def with_global_setting(**kwargs):
     44    def decorator(test):
     45        @wraps(test)
     46        def decorated_test(self):
     47            state = alter_global_settings(**kwargs)
     48            try:
     49                return test(self)
     50            finally:
     51                restore_global_settings(state)
     52        return decorated_test
     53    return decorator
     54
    1755
    1856class MailTests(TestCase):
     57    """
     58    Non-backend specific tests.
     59    """
    1960
    2061    def test_ascii(self):
    2162        email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'])
    class MailTests(TestCase):  
    2667        self.assertEqual(message['To'], 'to@example.com')
    2768
    2869    def test_multiple_recipients(self):
    29         email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com','other@example.com'])
     70        email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'])
    3071        message = email.message()
    3172        self.assertEqual(message['Subject'].encode(), 'Subject')
    3273        self.assertEqual(message.get_payload(), 'Content')
    class MailTests(TestCase):  
    4081        self.assertEqual(message['Cc'], 'cc@example.com')
    4182        self.assertEqual(email.recipients(), ['to@example.com', 'cc@example.com'])
    4283
    43         # Verify headers
    44         old_stdout = sys.stdout
    45         sys.stdout = StringIO()
    46         connection = console.EmailBackend()
    47         connection.send_messages([email])
    48         self.assertTrue(sys.stdout.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: '))
    49         sys.stdout = old_stdout
    50 
    5184        # Test multiple CC with multiple To
    5285        email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com'])
    5386        message = email.message()
    class MailTests(TestCase):  
    83116        email = EmailMessage('subject', 'content', 'from@example.com', ['to@example.com'], headers=headers)
    84117        self.assertEqual(email.message().as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: subject\nFrom: from@example.com\nTo: to@example.com\ndate: Fri, 09 Nov 2001 01:08:47 -0000\nMessage-ID: foo\n\ncontent')
    85118
    86     def test_empty_admins(self):
    87         """
    88         Test that mail_admins/mail_managers doesn't connect to the mail server
    89         if there are no recipients (#9383)
    90         """
    91         old_admins = settings.ADMINS
    92         old_managers = settings.MANAGERS
    93 
    94         settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')]
    95         mail.outbox = []
    96         mail_admins('hi', 'there')
    97         self.assertEqual(len(mail.outbox), 1)
    98         mail.outbox = []
    99         mail_managers('hi', 'there')
    100         self.assertEqual(len(mail.outbox), 1)
    101 
    102         settings.ADMINS = settings.MANAGERS = []
    103         mail.outbox = []
    104         mail_admins('hi', 'there')
    105         self.assertEqual(len(mail.outbox), 0)
    106         mail.outbox = []
    107         mail_managers('hi', 'there')
    108         self.assertEqual(len(mail.outbox), 0)
    109 
    110         settings.ADMINS = old_admins
    111         settings.MANAGERS = old_managers
    112 
    113119    def test_from_header(self):
    114120        """
    115121        Make sure we can manually set the From header (#9214)
    class MailTests(TestCase):  
    129135        message = email.message()
    130136        self.assertEqual(message['From'], 'from@example.com')
    131137
    132     def test_unicode_header(self):
     138    def test_unicode_address_header(self):
    133139        """
    134140        Regression for #11144 - When a to/from/cc header contains unicode,
    135141        make sure the email addresses are parsed correctly (especially with
    136142        regards to commas)
    137143        """
    138         email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" <to@example.com>','other@example.com'])
     144        email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" <to@example.com>', 'other@example.com'])
    139145        self.assertEqual(email.message()['To'], '=?utf-8?q?Firstname_S=C3=BCrname?= <to@example.com>, other@example.com')
    140         email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" <to@example.com>','other@example.com'])
     146        email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" <to@example.com>', 'other@example.com'])
    141147        self.assertEqual(email.message()['To'], '=?utf-8?q?S=C3=BCrname=2C_Firstname?= <to@example.com>, other@example.com')
    142148
     149    def test_unicode_headers(self):
     150        email = EmailMessage(u"Gżegżółka", "Content", "from@example.com", ["to@example.com"],
     151                             headers={"Sender": '"Firstname Sürname" <sender@example.com>',
     152                                      "Comments": 'My Sürname is non-ASCII'})
     153        message = email.message()
     154        self.assertEqual(message['Subject'], '=?utf-8?b?R8W8ZWfFvMOzxYJrYQ==?=')
     155        self.assertEqual(message['Sender'], '=?utf-8?q?Firstname_S=C3=BCrname?= <sender@example.com>')
     156        self.assertEqual(message['Comments'], '=?utf-8?q?My_S=C3=BCrname_is_non-ASCII?=')
     157
    143158    def test_safe_mime_multipart(self):
    144159        """
    145160        Make sure headers can be set with a different encoding than utf-8 in
    class MailTests(TestCase):  
    193208        self.assertEqual(payload[0].get_content_type(), 'multipart/alternative')
    194209        self.assertEqual(payload[1].get_content_type(), 'application/pdf')
    195210
    196     def test_arbitrary_stream(self):
    197         """
    198         Test that the console backend can be pointed at an arbitrary stream.
    199         """
    200         s = StringIO()
    201         connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s)
    202         send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection)
    203         self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: '))
    204 
    205     def test_stdout(self):
    206         """Make sure that the console backend writes to stdout by default"""
    207         old_stdout = sys.stdout
    208         sys.stdout = StringIO()
    209         connection = console.EmailBackend()
    210         email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
    211         connection.send_messages([email])
    212         self.assertTrue(sys.stdout.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: '))
    213         sys.stdout = old_stdout
    214 
    215     def test_dummy(self):
     211    def test_dummy_backend(self):
    216212        """
    217213        Make sure that dummy backends returns correct number of sent messages
    218214        """
    class MailTests(TestCase):  
    220216        email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
    221217        self.assertEqual(connection.send_messages([email, email, email]), 3)
    222218
    223     def test_locmem(self):
    224         """
    225         Make sure that the locmen backend populates the outbox.
    226         """
    227         mail.outbox = []
    228         connection = locmem.EmailBackend()
    229         email1 = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
    230         email2 = EmailMessage('Subject 2', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
    231         connection.send_messages([email1, email2])
    232         self.assertEqual(len(mail.outbox), 2)
    233         self.assertEqual(mail.outbox[0].subject, 'Subject')
    234         self.assertEqual(mail.outbox[1].subject, 'Subject 2')
    235 
    236         # Make sure that multiple locmem connections share mail.outbox
    237         mail.outbox = []
    238         connection2 = locmem.EmailBackend()
    239         email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
    240         connection.send_messages([email])
    241         connection2.send_messages([email])
    242         self.assertEqual(len(mail.outbox), 2)
    243 
    244     def test_file_backend(self):
    245         tmp_dir = tempfile.mkdtemp()
    246         connection = filebased.EmailBackend(file_path=tmp_dir)
    247         email1 = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
    248         connection.send_messages([email1])
    249         self.assertEqual(len(os.listdir(tmp_dir)), 1)
    250         message = email.message_from_file(open(os.path.join(tmp_dir, os.listdir(tmp_dir)[0])))
    251         self.assertEqual(message.get_content_type(), 'text/plain')
    252         self.assertEqual(message.get('subject'), 'Subject')
    253         self.assertEqual(message.get('from'), 'from@example.com')
    254         self.assertEqual(message.get('to'), 'to@example.com')
    255         connection2 = filebased.EmailBackend(file_path=tmp_dir)
    256         connection2.send_messages([email1])
    257         self.assertEqual(len(os.listdir(tmp_dir)), 2)
    258         connection.send_messages([email1])
    259         self.assertEqual(len(os.listdir(tmp_dir)), 2)
    260         email1.connection = filebased.EmailBackend(file_path=tmp_dir)
    261         connection_created = connection.open()
    262         email1.send()
    263         self.assertEqual(len(os.listdir(tmp_dir)), 3)
    264         email1.send()
    265         self.assertEqual(len(os.listdir(tmp_dir)), 3)
    266         connection.close()
    267         shutil.rmtree(tmp_dir)
    268 
    269219    def test_arbitrary_keyword(self):
    270220        """
    271221        Make sure that get_connection() accepts arbitrary keyword that might be
    class MailTests(TestCase):  
    289239        self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.dummy.EmailBackend'), dummy.EmailBackend))
    290240        self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.console.EmailBackend'), console.EmailBackend))
    291241        tmp_dir = tempfile.mkdtemp()
    292         self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend))
    293         shutil.rmtree(tmp_dir)
     242        try:
     243            self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend))
     244        finally:
     245            shutil.rmtree(tmp_dir)
    294246        self.assertTrue(isinstance(mail.get_connection(), locmem.EmailBackend))
    295247
     248    @with_global_setting(
     249        EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend',
     250        ADMINS=[('nobody', 'nobody@example.com')],
     251        MANAGERS=[('nobody', 'nobody@example.com')])
    296252    def test_connection_arg(self):
    297253        """Test connection argument to send_mail(), et. al."""
    298         connection = mail.get_connection('django.core.mail.backends.locmem.EmailBackend')
    299 
    300254        mail.outbox = []
     255
     256        # Send using non-default connection
     257        connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
    301258        send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection)
    302         self.assertEqual(len(mail.outbox), 1)
    303         message = mail.outbox[0]
    304         self.assertEqual(message.subject, 'Subject')
    305         self.assertEqual(message.from_email, 'from@example.com')
    306         self.assertEqual(message.to, ['to@example.com'])
     259        self.assertEqual(mail.outbox, [])
     260        self.assertEqual(len(connection.test_outbox), 1)
     261        self.assertEqual(connection.test_outbox[0].subject, 'Subject')
    307262
    308         mail.outbox = []
     263        connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
    309264        send_mass_mail([
    310265                ('Subject1', 'Content1', 'from1@example.com', ['to1@example.com']),
    311                 ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com'])
     266                ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']),
    312267            ], connection=connection)
    313         self.assertEqual(len(mail.outbox), 2)
    314         message = mail.outbox[0]
    315         self.assertEqual(message.subject, 'Subject1')
    316         self.assertEqual(message.from_email, 'from1@example.com')
    317         self.assertEqual(message.to, ['to1@example.com'])
    318         message = mail.outbox[1]
    319         self.assertEqual(message.subject, 'Subject2')
    320         self.assertEqual(message.from_email, 'from2@example.com')
    321         self.assertEqual(message.to, ['to2@example.com'])
    322 
    323         old_admins = settings.ADMINS
    324         old_managers = settings.MANAGERS
    325         settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')]
     268        self.assertEqual(mail.outbox, [])
     269        self.assertEqual(len(connection.test_outbox), 2)
     270        self.assertEqual(connection.test_outbox[0].subject, 'Subject1')
     271        self.assertEqual(connection.test_outbox[1].subject, 'Subject2')
    326272
    327         mail.outbox = []
    328         mail_admins('Subject', 'Content', connection=connection)
    329         self.assertEqual(len(mail.outbox), 1)
    330         message = mail.outbox[0]
    331         self.assertEqual(message.subject, '[Django] Subject')
    332         self.assertEqual(message.from_email, 'root@localhost')
    333         self.assertEqual(message.to, ['nobody@example.com'])
     273        connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
     274        mail_admins('Admin message', 'Content', connection=connection)
     275        self.assertEqual(mail.outbox, [])
     276        self.assertEqual(len(connection.test_outbox), 1)
     277        self.assertEqual(connection.test_outbox[0].subject, '[Django] Admin message')
    334278
    335         mail.outbox = []
    336         mail_managers('Subject', 'Content', connection=connection)
    337         self.assertEqual(len(mail.outbox), 1)
    338         message = mail.outbox[0]
    339         self.assertEqual(message.subject, '[Django] Subject')
    340         self.assertEqual(message.from_email, 'root@localhost')
    341         self.assertEqual(message.to, ['nobody@example.com'])
    342 
    343         settings.ADMINS = old_admins
    344         settings.MANAGERS = old_managers
    345 
    346     def test_mail_prefix(self):
    347         """Test prefix argument in manager/admin mail."""
    348         # Regression for #13494.
    349         old_admins = settings.ADMINS
    350         old_managers = settings.MANAGERS
    351         settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')]
     279        connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
     280        mail_managers('Manager message', 'Content', connection=connection)
     281        self.assertEqual(mail.outbox, [])
     282        self.assertEqual(len(connection.test_outbox), 1)
     283        self.assertEqual(connection.test_outbox[0].subject, '[Django] Manager message')
     284
     285
     286class BaseEmailBackendTests(object):
     287    email_backend = None
     288
     289    def setUp(self):
     290        self.__settings_state = alter_global_settings(EMAIL_BACKEND=self.email_backend)
     291
     292    def tearDown(self):
     293        restore_global_settings(self.__settings_state)
     294
     295    def assertStartsWith(self, first, second):
     296        if not first.startswith(second):
     297            self.longMessage = True
     298            self.assertEqual(first[:len(second)], second, "First string doesn't start with the second.")
     299
     300    def get_mailbox_content(self):
     301        raise NotImplementedError
     302
     303    def flush_mailbox(self):
     304        raise NotImplementedError
     305
     306    def get_the_message(self):
     307        mailbox = self.get_mailbox_content()
     308        self.assertEqual(len(mailbox), 1,
     309            "Expected exactly one message, got %d.\n%r" % (len(mailbox), [
     310                m.as_string() for m in mailbox]))
     311        return mailbox[0]
     312
     313    def test_send(self):
     314        email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'])
     315        num_sent = mail.get_connection().send_messages([email])
     316        self.assertEqual(num_sent, 1)
     317        message = self.get_the_message()
     318        self.assertEqual(message["subject"], "Subject")
     319        self.assertEqual(message.get_payload(), "Content")
     320        self.assertEqual(message["from"], "from@example.com")
     321        self.assertEqual(message.get_all("to"), ["to@example.com"])
     322
     323    def test_send_many(self):
     324        email1 = EmailMessage('Subject', 'Content1', 'from@example.com', ['to@example.com'])
     325        email2 = EmailMessage('Subject', 'Content2', 'from@example.com', ['to@example.com'])
     326        num_sent = mail.get_connection().send_messages([email1, email2])
     327        self.assertEqual(num_sent, 2)
     328        messages = self.get_mailbox_content()
     329        self.assertEquals(len(messages), 2)
     330        self.assertEqual(messages[0].get_payload(), "Content1")
     331        self.assertEqual(messages[1].get_payload(), "Content2")
     332
     333    def test_send_verbose_name(self):
     334        email = EmailMessage("Subject", "Content", '"Firstname Sürname" <from@example.com>',
     335                             ["to@example.com"])
     336        email.send()
     337        message = self.get_the_message()
     338        self.assertEqual(message["subject"], "Subject")
     339        self.assertEqual(message.get_payload(), "Content")
     340        self.assertEqual(message["from"], "=?utf-8?q?Firstname_S=C3=BCrname?= <from@example.com>")
     341
     342    @with_global_setting(MANAGERS=[('nobody', 'nobody@example.com')])
     343    def test_html_mail_managers(self):
     344        """Test html_message argument to mail_managers"""
     345        mail_managers('Subject', 'Content', html_message='HTML Content')
     346        message = self.get_the_message()
     347
     348        self.assertEqual(message.get('subject'), '[Django] Subject')
     349        self.assertEqual(message.get_all('to'), ['nobody@example.com'])
     350        self.assertTrue(message.is_multipart())
     351        self.assertEqual(len(message.get_payload()), 2)
     352        self.assertEqual(message.get_payload(0).get_payload(), 'Content')
     353        self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain')
     354        self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content')
     355        self.assertEqual(message.get_payload(1).get_content_type(), 'text/html')
    352356
     357    @with_global_setting(ADMINS=[('nobody', 'nobody@example.com')])
     358    def test_html_mail_admins(self):
     359        """Test html_message argument to mail_admins """
     360        mail_admins('Subject', 'Content', html_message='HTML Content')
     361        message = self.get_the_message()
     362
     363        self.assertEqual(message.get('subject'), '[Django] Subject')
     364        self.assertEqual(message.get_all('to'), ['nobody@example.com'])
     365        self.assertTrue(message.is_multipart())
     366        self.assertEqual(len(message.get_payload()), 2)
     367        self.assertEqual(message.get_payload(0).get_payload(), 'Content')
     368        self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain')
     369        self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content')
     370        self.assertEqual(message.get_payload(1).get_content_type(), 'text/html')
     371
     372    @with_global_setting(ADMINS=[('nobody', 'nobody+admin@example.com')],
     373                         MANAGERS=[('nobody', 'nobody+manager@example.com')])
     374    def test_manager_and_admin_mail_prefix(self):
     375        """
     376            Regression for #13494:
     377             string prefix + lazy translated subject = bad output
     378        """
    353379        mail_managers(ugettext_lazy('Subject'), 'Content')
    354         self.assertEqual(len(mail.outbox), 1)
    355         message = mail.outbox[0]
    356         self.assertEqual(message.subject, '[Django] Subject')
     380        message = self.get_the_message()
     381        self.assertEqual(message.get('subject'), '[Django] Subject')
    357382
    358         mail.outbox = []
     383        self.flush_mailbox()
    359384        mail_admins(ugettext_lazy('Subject'), 'Content')
    360         self.assertEqual(len(mail.outbox), 1)
    361         message = mail.outbox[0]
    362         self.assertEqual(message.subject, '[Django] Subject')
     385        message = self.get_the_message()
     386        self.assertEqual(message.get('subject'), '[Django] Subject')
    363387
    364         settings.ADMINS = old_admins
    365         settings.MANAGERS = old_managers
     388    @with_global_setting(ADMINS=(), MANAGERS=())
     389    def test_empty_admins(self):
     390        """
     391        Test that mail_admins/mail_managers doesn't connect to the mail server
     392        if there are no recipients (#9383)
     393        """
     394        mail_admins('hi', 'there')
     395        self.assertEqual(self.get_mailbox_content(), [])
     396        mail_managers('hi', 'there')
     397        self.assertEqual(self.get_mailbox_content(), [])
    366398
    367     def test_html_mail_admins(self):
    368         """Test html_message argument to mail_admins and mail_managers"""
    369         old_admins = settings.ADMINS
    370         settings.ADMINS = [('nobody','nobody@example.com')]
     399    def test_message_cc_header(self):
     400        """
     401            Regression test for #7722
     402        """
     403        email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'], cc=['cc@example.com'])
     404        mail.get_connection().send_messages([email])
     405        message = self.get_the_message()
     406        self.assertStartsWith(message.as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: ')
    371407
    372         mail.outbox = []
    373         mail_admins('Subject', 'Content', html_message='HTML Content')
    374         self.assertEqual(len(mail.outbox), 1)
    375         message = mail.outbox[0]
    376         self.assertEqual(message.subject, '[Django] Subject')
    377         self.assertEqual(message.body, 'Content')
    378         self.assertEqual(message.alternatives, [('HTML Content', 'text/html')])
     408    def test_idn_send(self):
     409        """
     410            Regression test for #14301
     411        """
     412        self.assertTrue(send_mail('Subject', 'Content', 'from@öäü.com', [u'to@öäü.com']))
     413        message = self.get_the_message()
     414        self.assertEqual(message.get('subject'), 'Subject')
     415        self.assertEqual(message.get('from'), 'from@xn--4ca9at.com')
     416        self.assertEqual(message.get('to'), 'to@xn--4ca9at.com')
     417
     418        self.flush_mailbox()
     419        m = EmailMessage('Subject', 'Content', 'from@öäü.com',
     420                     [u'to@öäü.com'], cc=[u'cc@öäü.com'])
     421        m.send()
     422        message = self.get_the_message()
     423        self.assertEqual(message.get('subject'), 'Subject')
     424        self.assertEqual(message.get('from'), 'from@xn--4ca9at.com')
     425        self.assertEqual(message.get('to'), 'to@xn--4ca9at.com')
     426        self.assertEqual(message.get('cc'), 'cc@xn--4ca9at.com')
    379427
    380         settings.ADMINS = old_admins
     428    def test_receiptient_without_domain(self):
     429        """
     430            Regression test for #15042
     431        """
     432        self.assertTrue(send_mail("Subject", "Content", "tester", ["django"]))
     433        message = self.get_the_message()
     434        self.assertEqual(message.get('subject'), 'Subject')
     435        self.assertEqual(message.get('from'), "tester")
     436        self.assertEqual(message.get('to'), "django")
    381437
    382     def test_html_mail_managers(self):
    383         """Test html_message argument to mail_admins and mail_managers"""
    384         old_managers = settings.MANAGERS
    385         settings.MANAGERS = [('nobody','nobody@example.com')]
    386438
    387         mail.outbox = []
    388         mail_managers('Subject', 'Content', html_message='HTML Content')
    389         self.assertEqual(len(mail.outbox), 1)
    390         message = mail.outbox[0]
    391         self.assertEqual(message.subject, '[Django] Subject')
    392         self.assertEqual(message.body, 'Content')
    393         self.assertEqual(message.alternatives, [('HTML Content', 'text/html')])
     439class LocmemBackendTests(BaseEmailBackendTests, TestCase):
     440    email_backend = 'django.core.mail.backends.locmem.EmailBackend'
    394441
    395         settings.MANAGERS = old_managers
     442    def get_mailbox_content(self):
     443        return [m.message() for m in mail.outbox]
    396444
    397     def test_idn_validation(self):
    398         """Test internationalized email adresses"""
    399         # Regression for #14301.
     445    def flush_mailbox(self):
    400446        mail.outbox = []
    401         from_email = u'fröm@öäü.com'
    402         to_email = u'tö@öäü.com'
    403         connection = mail.get_connection('django.core.mail.backends.locmem.EmailBackend')
    404         send_mail('Subject', 'Content', from_email, [to_email], connection=connection)
    405         self.assertEqual(len(mail.outbox), 1)
    406         message = mail.outbox[0]
    407         self.assertEqual(message.subject, 'Subject')
    408         self.assertEqual(message.from_email, from_email)
    409         self.assertEqual(message.to, [to_email])
    410         self.assertTrue(message.message().as_string().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: =?utf-8?b?ZnLDtm1Aw7bDpMO8LmNvbQ==?=\nTo: =?utf-8?b?dMO2QMO2w6TDvC5jb20=?='))
    411 
    412     def test_idn_smtp_send(self):
    413         import smtplib
    414         smtplib.SMTP = MockSMTP
    415         from_email = u'fröm@öäü.com'
    416         to_email = u'tö@öäü.com'
    417         connection = mail.get_connection('django.core.mail.backends.smtp.EmailBackend')
    418         self.assertTrue(send_mail('Subject', 'Content', from_email, [to_email], connection=connection))
    419 
    420 class MockSMTP(object):
    421     def __init__(self, host='', port=0, local_hostname=None,
    422                  timeout=1):
    423         pass
    424 
    425     def sendmail(self, from_addr, to_addrs, msg, mail_options=[],
    426                  rcpt_options=[]):
    427         for addr in to_addrs:
    428             str(addr.split('@', 1)[-1])
    429         return {}
    430 
    431     def quit(self):
    432         return 0
     447
     448    def tearDown(self):
     449        super(LocmemBackendTests, self).tearDown()
     450        mail.outbox = []
     451
     452    def test_locmem_shared_messages(self):
     453        """
     454        Make sure that the locmen backend populates the outbox.
     455        """
     456        connection = locmem.EmailBackend()
     457        connection2 = locmem.EmailBackend()
     458        email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
     459        connection.send_messages([email])
     460        connection2.send_messages([email])
     461        self.assertEqual(len(mail.outbox), 2)
     462
     463
     464class FileBackendTests(BaseEmailBackendTests, TestCase):
     465    email_backend = 'django.core.mail.backends.filebased.EmailBackend'
     466
     467    def setUp(self):
     468        super(FileBackendTests, self).setUp()
     469        self.tmp_dir = tempfile.mkdtemp()
     470        self.__settings_state = alter_global_settings(EMAIL_FILE_PATH=self.tmp_dir)
     471
     472    def tearDown(self):
     473        restore_global_settings(self.__settings_state)
     474        shutil.rmtree(self.tmp_dir)
     475        super(FileBackendTests, self).tearDown()
     476
     477    def flush_mailbox(self):
     478        for filename in os.listdir(self.tmp_dir):
     479            os.unlink(os.path.join(self.tmp_dir, filename))
     480
     481    def get_mailbox_content(self):
     482        messages = []
     483        for filename in os.listdir(self.tmp_dir):
     484            session = open(os.path.join(self.tmp_dir, filename)).read().split('\n' + ('-' * 79) + '\n')
     485            messages.extend(email.message_from_string(m) for m in session if m)
     486        return messages
     487
     488    def test_file_sessions(self):
     489        """Make sure opening a connection creates a new file"""
     490        msg = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
     491        connection = mail.get_connection()
     492        connection.send_messages([msg])
     493
     494        self.assertEqual(len(os.listdir(self.tmp_dir)), 1)
     495        message = email.message_from_file(open(os.path.join(self.tmp_dir, os.listdir(self.tmp_dir)[0])))
     496        self.assertEqual(message.get_content_type(), 'text/plain')
     497        self.assertEqual(message.get('subject'), 'Subject')
     498        self.assertEqual(message.get('from'), 'from@example.com')
     499        self.assertEqual(message.get('to'), 'to@example.com')
     500
     501        connection2 = mail.get_connection()
     502        connection2.send_messages([msg])
     503        self.assertEqual(len(os.listdir(self.tmp_dir)), 2)
     504
     505        connection.send_messages([msg])
     506        self.assertEqual(len(os.listdir(self.tmp_dir)), 2)
     507
     508        msg.connection = mail.get_connection()
     509        self.assertTrue(connection.open())
     510        msg.send()
     511        self.assertEqual(len(os.listdir(self.tmp_dir)), 3)
     512        msg.send()
     513        self.assertEqual(len(os.listdir(self.tmp_dir)), 3)
     514
     515
     516class ConsoleBackendTests(BaseEmailBackendTests, TestCase):
     517    email_backend = 'django.core.mail.backends.console.EmailBackend'
     518
     519    def setUp(self):
     520        super(ConsoleBackendTests, self).setUp()
     521        self.__stdout = sys.stdout
     522        self.stream = sys.stdout = StringIO()
     523
     524    def tearDown(self):
     525        del self.stream
     526        sys.stdout = self.__stdout
     527        del self.__stdout
     528        super(ConsoleBackendTests, self).tearDown()
     529
     530    def flush_mailbox(self):
     531        self.stream = sys.stdout = StringIO()
     532
     533    def get_mailbox_content(self):
     534        messages = self.stream.getvalue().split('\n' + ('-' * 79) + '\n')
     535        return [email.message_from_string(m) for m in messages if m]
     536
     537    def test_console_stream_kwarg(self):
     538        """
     539        Test that the console backend can be pointed at an arbitrary stream.
     540        """
     541        s = StringIO()
     542        connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s)
     543        send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection)
     544        self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: '))
     545
     546
     547class FakeSMTPServer(smtpd.SMTPServer, threading.Thread):
     548    """
     549    Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from:
     550    http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup
     551    """
     552
     553    def __init__(self, *args, **kwargs):
     554        threading.Thread.__init__(self)
     555        smtpd.SMTPServer.__init__(self, *args, **kwargs)
     556        self._sink = []
     557        self.active = False
     558        self.active_lock = threading.Lock()
     559        self.sink_lock = threading.Lock()
     560
     561    def process_message(self, peer, mailfrom, rcpttos, data):
     562        m = email.message_from_string(data)
     563        maddr = parseaddr(m.get('from'))[1]
     564        if mailfrom != maddr:
     565            return "553 '%s' != '%s'" % (mailfrom, maddr)
     566        self.sink_lock.acquire()
     567        self._sink.append(m)
     568        self.sink_lock.release()
     569
     570    def get_sink(self):
     571        self.sink_lock.acquire()
     572        try:
     573            return self._sink[:]
     574        finally:
     575            self.sink_lock.release()
     576
     577    def flush_sink(self):
     578        self.sink_lock.acquire()
     579        self._sink[:] = []
     580        self.sink_lock.release()
     581
     582    def start(self):
     583        assert not self.active
     584        self.__flag = threading.Event()
     585        threading.Thread.start(self)
     586        self.__flag.wait()
     587
     588    def run(self):
     589        self.active = True
     590        self.__flag.set()
     591        while self.active and asyncore.socket_map:
     592            self.active_lock.acquire()
     593            asyncore.loop(timeout=0.1, count=1)
     594            self.active_lock.release()
     595        asyncore.close_all()
     596
     597    def stop(self):
     598        assert self.active
     599        self.active = False
     600        self.join()
     601
     602
     603class SMTPBackendTests(BaseEmailBackendTests, TestCase):
     604    email_backend = 'django.core.mail.backends.smtp.EmailBackend'
     605
     606    @classmethod
     607    def setUpClass(cls):
     608        cls.__server = FakeSMTPServer(('127.0.0.1', 0), None)
     609        cls.__settings = alter_global_settings(
     610            EMAIL_HOST="127.0.0.1",
     611            EMAIL_PORT=cls.__server.socket.getsockname()[1])
     612        cls.__server.start()
     613
     614    @classmethod
     615    def tearDownClass(cls):
     616        cls.__server.stop()
     617
     618    def setUp(self):
     619        super(SMTPBackendTests, self).setUp()
     620        self.__server.flush_sink()
     621
     622    def tearDown(self):
     623        self.__server.flush_sink()
     624        super(SMTPBackendTests, self).tearDown()
     625
     626    def flush_mailbox(self):
     627        self.__server.flush_sink()
     628
     629    def get_mailbox_content(self):
     630        return self.__server.get_sink()
Back to Top