Ticket #15042: ticket15042.diff
File ticket15042.diff, 38.7 KB (added by , 14 years ago) |
---|
-
django/core/mail/backends/smtp.py
diff --git a/django/core/mail/backends/smtp.py b/django/core/mail/backends/smtp.py index 3b2962f..bb184ab 100644
a b 1 1 """SMTP email backend class.""" 2 3 2 import smtplib 4 3 import socket 5 4 import threading … … import threading 7 6 from django.conf import settings 8 7 from django.core.mail.backends.base import BaseEmailBackend 9 8 from django.core.mail.utils import DNS_NAME 9 from django.core.mail.message import sanitize_address 10 10 11 11 12 class EmailBackend(BaseEmailBackend): 12 13 """ … … class EmailBackend(BaseEmailBackend): 91 92 self._lock.release() 92 93 return num_sent 93 94 94 def _sanitize(self, email):95 name, domain = email.split('@', 1)96 email = '@'.join([name, domain.encode('idna')])97 return email98 99 95 def _send(self, email_message): 100 96 """A helper method that does the actual sending.""" 101 97 if not email_message.recipients(): 102 98 return False 103 from_email = self._sanitize(email_message.from_email) 104 recipients = map(self._sanitize, email_message.recipients()) 99 from_email = sanitize_address(email_message.from_email, email_message.encoding) 100 recipients = [sanitize_address(addr, email_message.encoding) 101 for addr in email_message.recipients()] 105 102 try: 106 103 self.connection.sendmail(from_email, recipients, 107 104 email_message.message().as_string()) -
django/core/mail/message.py
diff --git a/django/core/mail/message.py b/django/core/mail/message.py index 2311102..96ff689 100644
a b from email.Utils import formatdate, getaddresses, formataddr 12 12 from django.conf import settings 13 13 from django.core.mail.utils import DNS_NAME 14 14 from django.utils.encoding import smart_str, force_unicode 15 from email.Utils import parseaddr 15 16 16 17 # Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from 17 18 # some spam filters. … … def make_msgid(idstring=None): 54 55 return msgid 55 56 56 57 58 # Header names that contain structured address data (RFC #5322) 59 ADDRESS_HEADERS = set([ 60 'from', 61 'sender', 62 'reply-to', 63 'to', 64 'cc', 65 'bcc', 66 'resent-from', 67 'resent-sender', 68 'resent-to', 69 'resent-cc', 70 'resent-bcc', 71 ]) 72 73 57 74 def forbid_multi_line_headers(name, val, encoding): 58 75 """Forbids multi-line headers, to prevent header injection.""" 59 76 encoding = encoding or settings.DEFAULT_CHARSET … … def forbid_multi_line_headers(name, val, encoding): 63 80 try: 64 81 val = val.encode('ascii') 65 82 except UnicodeEncodeError: 66 if name.lower() in ('to', 'from', 'cc'): 67 result = [] 68 for nm, addr in getaddresses((val,)): 69 nm = str(Header(nm.encode(encoding), encoding)) 70 try: 71 addr = addr.encode('ascii') 72 except UnicodeEncodeError: # IDN 73 addr = str(Header(addr.encode(encoding), encoding)) 74 result.append(formataddr((nm, addr))) 75 val = ', '.join(result) 83 if name.lower() in ADDRESS_HEADERS: 84 val = ', '.join(sanitize_address(addr, encoding) 85 for addr in getaddresses((val,))) 76 86 else: 77 val = Header(val.encode(encoding), encoding)87 val = str(Header(val, encoding)) 78 88 else: 79 89 if name.lower() == 'subject': 80 90 val = Header(val) 81 91 return name, val 82 92 93 94 def sanitize_address(addr, encoding): 95 if isinstance(addr, basestring): 96 addr = parseaddr(force_unicode(addr)) 97 nm, addr = addr 98 nm = str(Header(nm, encoding)) 99 try: 100 addr = addr.encode('ascii') 101 except UnicodeEncodeError: # IDN 102 if u'@' in addr: 103 localpart, domain = addr.split(u'@', 1) 104 localpart = str(Header(localpart, encoding)) 105 domain = domain.encode('idna') 106 addr = '@'.join([localpart, domain]) 107 else: 108 addr = str(Header(addr, encoding)) 109 return formataddr((nm, addr)) 110 111 83 112 class SafeMIMEText(MIMEText): 84 113 85 114 def __init__(self, text, subtype, charset): 86 115 self.encoding = charset 87 116 MIMEText.__init__(self, text, subtype, charset) 88 89 def __setitem__(self, name, val): 117 118 def __setitem__(self, name, val): 90 119 name, val = forbid_multi_line_headers(name, val, self.encoding) 91 120 MIMEText.__setitem__(self, name, val) 92 121 122 93 123 class SafeMIMEMultipart(MIMEMultipart): 94 124 95 125 def __init__(self, _subtype='mixed', boundary=None, _subparts=None, encoding=None, **_params): 96 126 self.encoding = encoding 97 127 MIMEMultipart.__init__(self, _subtype, boundary, _subparts, **_params) 98 128 99 129 def __setitem__(self, name, val): 100 130 name, val = forbid_multi_line_headers(name, val, self.encoding) 101 131 MIMEMultipart.__setitem__(self, name, val) 102 132 133 103 134 class EmailMessage(object): 104 135 """ 105 136 A container for email information. … … class EmailMultiAlternatives(EmailMessage): 274 305 conversions. 275 306 """ 276 307 super(EmailMultiAlternatives, self).__init__(subject, body, from_email, to, bcc, connection, attachments, headers, cc) 277 self.alternatives =alternatives or []308 self.alternatives = alternatives or [] 278 309 279 310 def attach_alternative(self, content, mimetype): 280 311 """Attach an alternative content representation.""" -
tests/regressiontests/mail/tests.py
diff --git a/tests/regressiontests/mail/tests.py b/tests/regressiontests/mail/tests.py index a6cd60e..18c4c62 100644
a b 1 1 # coding: utf-8 2 import asyncore 2 3 import email 3 4 import os 4 5 import shutil 6 import smtpd 5 7 import sys 6 8 import tempfile 9 import threading 7 10 from StringIO import StringIO 8 11 from django.conf import settings 9 12 from django.core import mail 10 13 from django.core.mail import EmailMessage, mail_admins, mail_managers, EmailMultiAlternatives 11 14 from django.core.mail import send_mail, send_mass_mail 12 from django.core.mail.backends.base import BaseEmailBackend13 15 from django.core.mail.backends import console, dummy, locmem, filebased, smtp 14 16 from django.core.mail.message import BadHeaderError 15 17 from django.test import TestCase 16 18 from django.utils.translation import ugettext_lazy 19 from django.utils.functional import wraps 20 from email.Utils import parseaddr 21 22 23 def alter_global_settings(**kwargs): 24 oldvalues = {} 25 nonexistant = [] 26 for setting, newvalue in kwargs.iteritems(): 27 try: 28 oldvalues[setting] = getattr(settings, setting) 29 except AttributeError: 30 nonexistant.append(setting) 31 setattr(settings, setting, newvalue) 32 return oldvalues, nonexistant 33 34 35 def restore_global_settings(state): 36 oldvalues, nonexistant = state 37 for setting, oldvalue in oldvalues.iteritems(): 38 setattr(settings, setting, oldvalue) 39 for setting in nonexistant: 40 delattr(settings, setting) 41 42 43 def with_global_setting(**kwargs): 44 def decorator(test): 45 @wraps(test) 46 def decorated_test(self): 47 state = alter_global_settings(**kwargs) 48 try: 49 return test(self) 50 finally: 51 restore_global_settings(state) 52 return decorated_test 53 return decorator 54 17 55 18 56 class MailTests(TestCase): 57 """ 58 Non-backend specific tests. 59 """ 19 60 20 61 def test_ascii(self): 21 62 email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com']) … … class MailTests(TestCase): 26 67 self.assertEqual(message['To'], 'to@example.com') 27 68 28 69 def test_multiple_recipients(self): 29 email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'])70 email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com']) 30 71 message = email.message() 31 72 self.assertEqual(message['Subject'].encode(), 'Subject') 32 73 self.assertEqual(message.get_payload(), 'Content') … … class MailTests(TestCase): 40 81 self.assertEqual(message['Cc'], 'cc@example.com') 41 82 self.assertEqual(email.recipients(), ['to@example.com', 'cc@example.com']) 42 83 43 # Verify headers44 old_stdout = sys.stdout45 sys.stdout = StringIO()46 connection = console.EmailBackend()47 connection.send_messages([email])48 self.assertTrue(sys.stdout.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: '))49 sys.stdout = old_stdout50 51 84 # Test multiple CC with multiple To 52 85 email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com']) 53 86 message = email.message() … … class MailTests(TestCase): 83 116 email = EmailMessage('subject', 'content', 'from@example.com', ['to@example.com'], headers=headers) 84 117 self.assertEqual(email.message().as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: subject\nFrom: from@example.com\nTo: to@example.com\ndate: Fri, 09 Nov 2001 01:08:47 -0000\nMessage-ID: foo\n\ncontent') 85 118 86 def test_empty_admins(self):87 """88 Test that mail_admins/mail_managers doesn't connect to the mail server89 if there are no recipients (#9383)90 """91 old_admins = settings.ADMINS92 old_managers = settings.MANAGERS93 94 settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')]95 mail.outbox = []96 mail_admins('hi', 'there')97 self.assertEqual(len(mail.outbox), 1)98 mail.outbox = []99 mail_managers('hi', 'there')100 self.assertEqual(len(mail.outbox), 1)101 102 settings.ADMINS = settings.MANAGERS = []103 mail.outbox = []104 mail_admins('hi', 'there')105 self.assertEqual(len(mail.outbox), 0)106 mail.outbox = []107 mail_managers('hi', 'there')108 self.assertEqual(len(mail.outbox), 0)109 110 settings.ADMINS = old_admins111 settings.MANAGERS = old_managers112 113 119 def test_from_header(self): 114 120 """ 115 121 Make sure we can manually set the From header (#9214) … … class MailTests(TestCase): 129 135 message = email.message() 130 136 self.assertEqual(message['From'], 'from@example.com') 131 137 132 def test_unicode_ header(self):138 def test_unicode_address_header(self): 133 139 """ 134 140 Regression for #11144 - When a to/from/cc header contains unicode, 135 141 make sure the email addresses are parsed correctly (especially with 136 142 regards to commas) 137 143 """ 138 email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" <to@example.com>', 'other@example.com'])144 email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" <to@example.com>', 'other@example.com']) 139 145 self.assertEqual(email.message()['To'], '=?utf-8?q?Firstname_S=C3=BCrname?= <to@example.com>, other@example.com') 140 email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" <to@example.com>', 'other@example.com'])146 email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" <to@example.com>', 'other@example.com']) 141 147 self.assertEqual(email.message()['To'], '=?utf-8?q?S=C3=BCrname=2C_Firstname?= <to@example.com>, other@example.com') 142 148 149 def test_unicode_headers(self): 150 email = EmailMessage(u"Gżegżółka", "Content", "from@example.com", ["to@example.com"], 151 headers={"Sender": '"Firstname Sürname" <sender@example.com>', 152 "Comments": 'My Sürname is non-ASCII'}) 153 message = email.message() 154 self.assertEqual(message['Subject'], '=?utf-8?b?R8W8ZWfFvMOzxYJrYQ==?=') 155 self.assertEqual(message['Sender'], '=?utf-8?q?Firstname_S=C3=BCrname?= <sender@example.com>') 156 self.assertEqual(message['Comments'], '=?utf-8?q?My_S=C3=BCrname_is_non-ASCII?=') 157 143 158 def test_safe_mime_multipart(self): 144 159 """ 145 160 Make sure headers can be set with a different encoding than utf-8 in … … class MailTests(TestCase): 193 208 self.assertEqual(payload[0].get_content_type(), 'multipart/alternative') 194 209 self.assertEqual(payload[1].get_content_type(), 'application/pdf') 195 210 196 def test_arbitrary_stream(self): 197 """ 198 Test that the console backend can be pointed at an arbitrary stream. 199 """ 200 s = StringIO() 201 connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s) 202 send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) 203 self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: ')) 204 205 def test_stdout(self): 206 """Make sure that the console backend writes to stdout by default""" 207 old_stdout = sys.stdout 208 sys.stdout = StringIO() 209 connection = console.EmailBackend() 210 email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) 211 connection.send_messages([email]) 212 self.assertTrue(sys.stdout.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: ')) 213 sys.stdout = old_stdout 214 215 def test_dummy(self): 211 def test_dummy_backend(self): 216 212 """ 217 213 Make sure that dummy backends returns correct number of sent messages 218 214 """ … … class MailTests(TestCase): 220 216 email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) 221 217 self.assertEqual(connection.send_messages([email, email, email]), 3) 222 218 223 def test_locmem(self):224 """225 Make sure that the locmen backend populates the outbox.226 """227 mail.outbox = []228 connection = locmem.EmailBackend()229 email1 = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})230 email2 = EmailMessage('Subject 2', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})231 connection.send_messages([email1, email2])232 self.assertEqual(len(mail.outbox), 2)233 self.assertEqual(mail.outbox[0].subject, 'Subject')234 self.assertEqual(mail.outbox[1].subject, 'Subject 2')235 236 # Make sure that multiple locmem connections share mail.outbox237 mail.outbox = []238 connection2 = locmem.EmailBackend()239 email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})240 connection.send_messages([email])241 connection2.send_messages([email])242 self.assertEqual(len(mail.outbox), 2)243 244 def test_file_backend(self):245 tmp_dir = tempfile.mkdtemp()246 connection = filebased.EmailBackend(file_path=tmp_dir)247 email1 = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})248 connection.send_messages([email1])249 self.assertEqual(len(os.listdir(tmp_dir)), 1)250 message = email.message_from_file(open(os.path.join(tmp_dir, os.listdir(tmp_dir)[0])))251 self.assertEqual(message.get_content_type(), 'text/plain')252 self.assertEqual(message.get('subject'), 'Subject')253 self.assertEqual(message.get('from'), 'from@example.com')254 self.assertEqual(message.get('to'), 'to@example.com')255 connection2 = filebased.EmailBackend(file_path=tmp_dir)256 connection2.send_messages([email1])257 self.assertEqual(len(os.listdir(tmp_dir)), 2)258 connection.send_messages([email1])259 self.assertEqual(len(os.listdir(tmp_dir)), 2)260 email1.connection = filebased.EmailBackend(file_path=tmp_dir)261 connection_created = connection.open()262 email1.send()263 self.assertEqual(len(os.listdir(tmp_dir)), 3)264 email1.send()265 self.assertEqual(len(os.listdir(tmp_dir)), 3)266 connection.close()267 shutil.rmtree(tmp_dir)268 269 219 def test_arbitrary_keyword(self): 270 220 """ 271 221 Make sure that get_connection() accepts arbitrary keyword that might be … … class MailTests(TestCase): 289 239 self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.dummy.EmailBackend'), dummy.EmailBackend)) 290 240 self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.console.EmailBackend'), console.EmailBackend)) 291 241 tmp_dir = tempfile.mkdtemp() 292 self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend)) 293 shutil.rmtree(tmp_dir) 242 try: 243 self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend)) 244 finally: 245 shutil.rmtree(tmp_dir) 294 246 self.assertTrue(isinstance(mail.get_connection(), locmem.EmailBackend)) 295 247 248 @with_global_setting( 249 EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend', 250 ADMINS=[('nobody', 'nobody@example.com')], 251 MANAGERS=[('nobody', 'nobody@example.com')]) 296 252 def test_connection_arg(self): 297 253 """Test connection argument to send_mail(), et. al.""" 298 connection = mail.get_connection('django.core.mail.backends.locmem.EmailBackend')299 300 254 mail.outbox = [] 255 256 # Send using non-default connection 257 connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') 301 258 send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) 302 self.assertEqual(len(mail.outbox), 1) 303 message = mail.outbox[0] 304 self.assertEqual(message.subject, 'Subject') 305 self.assertEqual(message.from_email, 'from@example.com') 306 self.assertEqual(message.to, ['to@example.com']) 259 self.assertEqual(mail.outbox, []) 260 self.assertEqual(len(connection.test_outbox), 1) 261 self.assertEqual(connection.test_outbox[0].subject, 'Subject') 307 262 308 mail.outbox = []263 connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') 309 264 send_mass_mail([ 310 265 ('Subject1', 'Content1', 'from1@example.com', ['to1@example.com']), 311 ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']) 266 ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']), 312 267 ], connection=connection) 313 self.assertEqual(len(mail.outbox), 2) 314 message = mail.outbox[0] 315 self.assertEqual(message.subject, 'Subject1') 316 self.assertEqual(message.from_email, 'from1@example.com') 317 self.assertEqual(message.to, ['to1@example.com']) 318 message = mail.outbox[1] 319 self.assertEqual(message.subject, 'Subject2') 320 self.assertEqual(message.from_email, 'from2@example.com') 321 self.assertEqual(message.to, ['to2@example.com']) 322 323 old_admins = settings.ADMINS 324 old_managers = settings.MANAGERS 325 settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')] 268 self.assertEqual(mail.outbox, []) 269 self.assertEqual(len(connection.test_outbox), 2) 270 self.assertEqual(connection.test_outbox[0].subject, 'Subject1') 271 self.assertEqual(connection.test_outbox[1].subject, 'Subject2') 326 272 327 mail.outbox = [] 328 mail_admins('Subject', 'Content', connection=connection) 329 self.assertEqual(len(mail.outbox), 1) 330 message = mail.outbox[0] 331 self.assertEqual(message.subject, '[Django] Subject') 332 self.assertEqual(message.from_email, 'root@localhost') 333 self.assertEqual(message.to, ['nobody@example.com']) 273 connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') 274 mail_admins('Admin message', 'Content', connection=connection) 275 self.assertEqual(mail.outbox, []) 276 self.assertEqual(len(connection.test_outbox), 1) 277 self.assertEqual(connection.test_outbox[0].subject, '[Django] Admin message') 334 278 335 mail.outbox = [] 336 mail_managers('Subject', 'Content', connection=connection) 337 self.assertEqual(len(mail.outbox), 1) 338 message = mail.outbox[0] 339 self.assertEqual(message.subject, '[Django] Subject') 340 self.assertEqual(message.from_email, 'root@localhost') 341 self.assertEqual(message.to, ['nobody@example.com']) 342 343 settings.ADMINS = old_admins 344 settings.MANAGERS = old_managers 345 346 def test_mail_prefix(self): 347 """Test prefix argument in manager/admin mail.""" 348 # Regression for #13494. 349 old_admins = settings.ADMINS 350 old_managers = settings.MANAGERS 351 settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')] 279 connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') 280 mail_managers('Manager message', 'Content', connection=connection) 281 self.assertEqual(mail.outbox, []) 282 self.assertEqual(len(connection.test_outbox), 1) 283 self.assertEqual(connection.test_outbox[0].subject, '[Django] Manager message') 284 285 286 class BaseEmailBackendTests(object): 287 email_backend = None 288 289 def setUp(self): 290 self.__settings_state = alter_global_settings(EMAIL_BACKEND=self.email_backend) 291 292 def tearDown(self): 293 restore_global_settings(self.__settings_state) 294 295 def assertStartsWith(self, first, second): 296 if not first.startswith(second): 297 self.longMessage = True 298 self.assertEqual(first[:len(second)], second, "First string doesn't start with the second.") 299 300 def get_mailbox_content(self): 301 raise NotImplementedError 302 303 def flush_mailbox(self): 304 raise NotImplementedError 305 306 def get_the_message(self): 307 mailbox = self.get_mailbox_content() 308 self.assertEqual(len(mailbox), 1, 309 "Expected exactly one message, got %d.\n%r" % (len(mailbox), [ 310 m.as_string() for m in mailbox])) 311 return mailbox[0] 312 313 def test_send(self): 314 email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com']) 315 num_sent = mail.get_connection().send_messages([email]) 316 self.assertEqual(num_sent, 1) 317 message = self.get_the_message() 318 self.assertEqual(message["subject"], "Subject") 319 self.assertEqual(message.get_payload(), "Content") 320 self.assertEqual(message["from"], "from@example.com") 321 self.assertEqual(message.get_all("to"), ["to@example.com"]) 322 323 def test_send_many(self): 324 email1 = EmailMessage('Subject', 'Content1', 'from@example.com', ['to@example.com']) 325 email2 = EmailMessage('Subject', 'Content2', 'from@example.com', ['to@example.com']) 326 num_sent = mail.get_connection().send_messages([email1, email2]) 327 self.assertEqual(num_sent, 2) 328 messages = self.get_mailbox_content() 329 self.assertEquals(len(messages), 2) 330 self.assertEqual(messages[0].get_payload(), "Content1") 331 self.assertEqual(messages[1].get_payload(), "Content2") 332 333 def test_send_verbose_name(self): 334 email = EmailMessage("Subject", "Content", '"Firstname Sürname" <from@example.com>', 335 ["to@example.com"]) 336 email.send() 337 message = self.get_the_message() 338 self.assertEqual(message["subject"], "Subject") 339 self.assertEqual(message.get_payload(), "Content") 340 self.assertEqual(message["from"], "=?utf-8?q?Firstname_S=C3=BCrname?= <from@example.com>") 341 342 @with_global_setting(MANAGERS=[('nobody', 'nobody@example.com')]) 343 def test_html_mail_managers(self): 344 """Test html_message argument to mail_managers""" 345 mail_managers('Subject', 'Content', html_message='HTML Content') 346 message = self.get_the_message() 347 348 self.assertEqual(message.get('subject'), '[Django] Subject') 349 self.assertEqual(message.get_all('to'), ['nobody@example.com']) 350 self.assertTrue(message.is_multipart()) 351 self.assertEqual(len(message.get_payload()), 2) 352 self.assertEqual(message.get_payload(0).get_payload(), 'Content') 353 self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain') 354 self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content') 355 self.assertEqual(message.get_payload(1).get_content_type(), 'text/html') 352 356 357 @with_global_setting(ADMINS=[('nobody', 'nobody@example.com')]) 358 def test_html_mail_admins(self): 359 """Test html_message argument to mail_admins """ 360 mail_admins('Subject', 'Content', html_message='HTML Content') 361 message = self.get_the_message() 362 363 self.assertEqual(message.get('subject'), '[Django] Subject') 364 self.assertEqual(message.get_all('to'), ['nobody@example.com']) 365 self.assertTrue(message.is_multipart()) 366 self.assertEqual(len(message.get_payload()), 2) 367 self.assertEqual(message.get_payload(0).get_payload(), 'Content') 368 self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain') 369 self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content') 370 self.assertEqual(message.get_payload(1).get_content_type(), 'text/html') 371 372 @with_global_setting(ADMINS=[('nobody', 'nobody+admin@example.com')], 373 MANAGERS=[('nobody', 'nobody+manager@example.com')]) 374 def test_manager_and_admin_mail_prefix(self): 375 """ 376 Regression for #13494: 377 string prefix + lazy translated subject = bad output 378 """ 353 379 mail_managers(ugettext_lazy('Subject'), 'Content') 354 self.assertEqual(len(mail.outbox), 1) 355 message = mail.outbox[0] 356 self.assertEqual(message.subject, '[Django] Subject') 380 message = self.get_the_message() 381 self.assertEqual(message.get('subject'), '[Django] Subject') 357 382 358 mail.outbox = []383 self.flush_mailbox() 359 384 mail_admins(ugettext_lazy('Subject'), 'Content') 360 self.assertEqual(len(mail.outbox), 1) 361 message = mail.outbox[0] 362 self.assertEqual(message.subject, '[Django] Subject') 385 message = self.get_the_message() 386 self.assertEqual(message.get('subject'), '[Django] Subject') 363 387 364 settings.ADMINS = old_admins 365 settings.MANAGERS = old_managers 388 @with_global_setting(ADMINS=(), MANAGERS=()) 389 def test_empty_admins(self): 390 """ 391 Test that mail_admins/mail_managers doesn't connect to the mail server 392 if there are no recipients (#9383) 393 """ 394 mail_admins('hi', 'there') 395 self.assertEqual(self.get_mailbox_content(), []) 396 mail_managers('hi', 'there') 397 self.assertEqual(self.get_mailbox_content(), []) 366 398 367 def test_html_mail_admins(self): 368 """Test html_message argument to mail_admins and mail_managers""" 369 old_admins = settings.ADMINS 370 settings.ADMINS = [('nobody','nobody@example.com')] 399 def test_message_cc_header(self): 400 """ 401 Regression test for #7722 402 """ 403 email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'], cc=['cc@example.com']) 404 mail.get_connection().send_messages([email]) 405 message = self.get_the_message() 406 self.assertStartsWith(message.as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: ') 371 407 372 mail.outbox = [] 373 mail_admins('Subject', 'Content', html_message='HTML Content') 374 self.assertEqual(len(mail.outbox), 1) 375 message = mail.outbox[0] 376 self.assertEqual(message.subject, '[Django] Subject') 377 self.assertEqual(message.body, 'Content') 378 self.assertEqual(message.alternatives, [('HTML Content', 'text/html')]) 408 def test_idn_send(self): 409 """ 410 Regression test for #14301 411 """ 412 self.assertTrue(send_mail('Subject', 'Content', 'from@öäü.com', [u'to@öäü.com'])) 413 message = self.get_the_message() 414 self.assertEqual(message.get('subject'), 'Subject') 415 self.assertEqual(message.get('from'), 'from@xn--4ca9at.com') 416 self.assertEqual(message.get('to'), 'to@xn--4ca9at.com') 417 418 self.flush_mailbox() 419 m = EmailMessage('Subject', 'Content', 'from@öäü.com', 420 [u'to@öäü.com'], cc=[u'cc@öäü.com']) 421 m.send() 422 message = self.get_the_message() 423 self.assertEqual(message.get('subject'), 'Subject') 424 self.assertEqual(message.get('from'), 'from@xn--4ca9at.com') 425 self.assertEqual(message.get('to'), 'to@xn--4ca9at.com') 426 self.assertEqual(message.get('cc'), 'cc@xn--4ca9at.com') 379 427 380 settings.ADMINS = old_admins 428 def test_receiptient_without_domain(self): 429 """ 430 Regression test for #15042 431 """ 432 self.assertTrue(send_mail("Subject", "Content", "tester", ["django"])) 433 message = self.get_the_message() 434 self.assertEqual(message.get('subject'), 'Subject') 435 self.assertEqual(message.get('from'), "tester") 436 self.assertEqual(message.get('to'), "django") 381 437 382 def test_html_mail_managers(self):383 """Test html_message argument to mail_admins and mail_managers"""384 old_managers = settings.MANAGERS385 settings.MANAGERS = [('nobody','nobody@example.com')]386 438 387 mail.outbox = [] 388 mail_managers('Subject', 'Content', html_message='HTML Content') 389 self.assertEqual(len(mail.outbox), 1) 390 message = mail.outbox[0] 391 self.assertEqual(message.subject, '[Django] Subject') 392 self.assertEqual(message.body, 'Content') 393 self.assertEqual(message.alternatives, [('HTML Content', 'text/html')]) 439 class LocmemBackendTests(BaseEmailBackendTests, TestCase): 440 email_backend = 'django.core.mail.backends.locmem.EmailBackend' 394 441 395 settings.MANAGERS = old_managers 442 def get_mailbox_content(self): 443 return [m.message() for m in mail.outbox] 396 444 397 def test_idn_validation(self): 398 """Test internationalized email adresses""" 399 # Regression for #14301. 445 def flush_mailbox(self): 400 446 mail.outbox = [] 401 from_email = u'fröm@öäü.com' 402 to_email = u'tö@öäü.com' 403 connection = mail.get_connection('django.core.mail.backends.locmem.EmailBackend') 404 send_mail('Subject', 'Content', from_email, [to_email], connection=connection) 405 self.assertEqual(len(mail.outbox), 1) 406 message = mail.outbox[0] 407 self.assertEqual(message.subject, 'Subject') 408 self.assertEqual(message.from_email, from_email) 409 self.assertEqual(message.to, [to_email]) 410 self.assertTrue(message.message().as_string().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: =?utf-8?b?ZnLDtm1Aw7bDpMO8LmNvbQ==?=\nTo: =?utf-8?b?dMO2QMO2w6TDvC5jb20=?=')) 411 412 def test_idn_smtp_send(self): 413 import smtplib 414 smtplib.SMTP = MockSMTP 415 from_email = u'fröm@öäü.com' 416 to_email = u'tö@öäü.com' 417 connection = mail.get_connection('django.core.mail.backends.smtp.EmailBackend') 418 self.assertTrue(send_mail('Subject', 'Content', from_email, [to_email], connection=connection)) 419 420 class MockSMTP(object): 421 def __init__(self, host='', port=0, local_hostname=None, 422 timeout=1): 423 pass 424 425 def sendmail(self, from_addr, to_addrs, msg, mail_options=[], 426 rcpt_options=[]): 427 for addr in to_addrs: 428 str(addr.split('@', 1)[-1]) 429 return {} 430 431 def quit(self): 432 return 0 447 448 def tearDown(self): 449 super(LocmemBackendTests, self).tearDown() 450 mail.outbox = [] 451 452 def test_locmem_shared_messages(self): 453 """ 454 Make sure that the locmen backend populates the outbox. 455 """ 456 connection = locmem.EmailBackend() 457 connection2 = locmem.EmailBackend() 458 email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) 459 connection.send_messages([email]) 460 connection2.send_messages([email]) 461 self.assertEqual(len(mail.outbox), 2) 462 463 464 class FileBackendTests(BaseEmailBackendTests, TestCase): 465 email_backend = 'django.core.mail.backends.filebased.EmailBackend' 466 467 def setUp(self): 468 super(FileBackendTests, self).setUp() 469 self.tmp_dir = tempfile.mkdtemp() 470 self.__settings_state = alter_global_settings(EMAIL_FILE_PATH=self.tmp_dir) 471 472 def tearDown(self): 473 restore_global_settings(self.__settings_state) 474 shutil.rmtree(self.tmp_dir) 475 super(FileBackendTests, self).tearDown() 476 477 def flush_mailbox(self): 478 for filename in os.listdir(self.tmp_dir): 479 os.unlink(os.path.join(self.tmp_dir, filename)) 480 481 def get_mailbox_content(self): 482 messages = [] 483 for filename in os.listdir(self.tmp_dir): 484 session = open(os.path.join(self.tmp_dir, filename)).read().split('\n' + ('-' * 79) + '\n') 485 messages.extend(email.message_from_string(m) for m in session if m) 486 return messages 487 488 def test_file_sessions(self): 489 """Make sure opening a connection creates a new file""" 490 msg = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) 491 connection = mail.get_connection() 492 connection.send_messages([msg]) 493 494 self.assertEqual(len(os.listdir(self.tmp_dir)), 1) 495 message = email.message_from_file(open(os.path.join(self.tmp_dir, os.listdir(self.tmp_dir)[0]))) 496 self.assertEqual(message.get_content_type(), 'text/plain') 497 self.assertEqual(message.get('subject'), 'Subject') 498 self.assertEqual(message.get('from'), 'from@example.com') 499 self.assertEqual(message.get('to'), 'to@example.com') 500 501 connection2 = mail.get_connection() 502 connection2.send_messages([msg]) 503 self.assertEqual(len(os.listdir(self.tmp_dir)), 2) 504 505 connection.send_messages([msg]) 506 self.assertEqual(len(os.listdir(self.tmp_dir)), 2) 507 508 msg.connection = mail.get_connection() 509 self.assertTrue(connection.open()) 510 msg.send() 511 self.assertEqual(len(os.listdir(self.tmp_dir)), 3) 512 msg.send() 513 self.assertEqual(len(os.listdir(self.tmp_dir)), 3) 514 515 516 class ConsoleBackendTests(BaseEmailBackendTests, TestCase): 517 email_backend = 'django.core.mail.backends.console.EmailBackend' 518 519 def setUp(self): 520 super(ConsoleBackendTests, self).setUp() 521 self.__stdout = sys.stdout 522 self.stream = sys.stdout = StringIO() 523 524 def tearDown(self): 525 del self.stream 526 sys.stdout = self.__stdout 527 del self.__stdout 528 super(ConsoleBackendTests, self).tearDown() 529 530 def flush_mailbox(self): 531 self.stream = sys.stdout = StringIO() 532 533 def get_mailbox_content(self): 534 messages = self.stream.getvalue().split('\n' + ('-' * 79) + '\n') 535 return [email.message_from_string(m) for m in messages if m] 536 537 def test_console_stream_kwarg(self): 538 """ 539 Test that the console backend can be pointed at an arbitrary stream. 540 """ 541 s = StringIO() 542 connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s) 543 send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) 544 self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: ')) 545 546 547 class FakeSMTPServer(smtpd.SMTPServer, threading.Thread): 548 """ 549 Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from: 550 http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup 551 """ 552 553 def __init__(self, *args, **kwargs): 554 threading.Thread.__init__(self) 555 smtpd.SMTPServer.__init__(self, *args, **kwargs) 556 self._sink = [] 557 self.active = False 558 self.active_lock = threading.Lock() 559 self.sink_lock = threading.Lock() 560 561 def process_message(self, peer, mailfrom, rcpttos, data): 562 m = email.message_from_string(data) 563 maddr = parseaddr(m.get('from'))[1] 564 if mailfrom != maddr: 565 return "553 '%s' != '%s'" % (mailfrom, maddr) 566 self.sink_lock.acquire() 567 self._sink.append(m) 568 self.sink_lock.release() 569 570 def get_sink(self): 571 self.sink_lock.acquire() 572 try: 573 return self._sink[:] 574 finally: 575 self.sink_lock.release() 576 577 def flush_sink(self): 578 self.sink_lock.acquire() 579 self._sink[:] = [] 580 self.sink_lock.release() 581 582 def start(self): 583 assert not self.active 584 self.__flag = threading.Event() 585 threading.Thread.start(self) 586 self.__flag.wait() 587 588 def run(self): 589 self.active = True 590 self.__flag.set() 591 while self.active and asyncore.socket_map: 592 self.active_lock.acquire() 593 asyncore.loop(timeout=0.1, count=1) 594 self.active_lock.release() 595 asyncore.close_all() 596 597 def stop(self): 598 assert self.active 599 self.active = False 600 self.join() 601 602 603 class SMTPBackendTests(BaseEmailBackendTests, TestCase): 604 email_backend = 'django.core.mail.backends.smtp.EmailBackend' 605 606 @classmethod 607 def setUpClass(cls): 608 cls.__server = FakeSMTPServer(('127.0.0.1', 0), None) 609 cls.__settings = alter_global_settings( 610 EMAIL_HOST="127.0.0.1", 611 EMAIL_PORT=cls.__server.socket.getsockname()[1]) 612 cls.__server.start() 613 614 @classmethod 615 def tearDownClass(cls): 616 cls.__server.stop() 617 618 def setUp(self): 619 super(SMTPBackendTests, self).setUp() 620 self.__server.flush_sink() 621 622 def tearDown(self): 623 self.__server.flush_sink() 624 super(SMTPBackendTests, self).tearDown() 625 626 def flush_mailbox(self): 627 self.__server.flush_sink() 628 629 def get_mailbox_content(self): 630 return self.__server.get_sink()