| 1 |
import sys, time, os |
|---|
| 2 |
from django.conf import settings |
|---|
| 3 |
from django.db import connection, get_creation_module |
|---|
| 4 |
from django.core import mail |
|---|
| 5 |
from django.core.management import call_command |
|---|
| 6 |
from django.dispatch import dispatcher |
|---|
| 7 |
from django.test import signals |
|---|
| 8 |
from django.template import Template |
|---|
| 9 |
from django.utils.translation import deactivate |
|---|
| 10 |
|
|---|
| 11 |
# Prefix that create_test_db() puts in front of settings.DATABASE_NAME to
# build the test database name when settings.TEST_DATABASE_NAME is not set
# (non-SQLite backends).
TEST_DATABASE_PREFIX = 'test_'
|---|
| 14 |
|
|---|
| 15 |
def instrumented_test_render(self, context):
    """
    Render a Template while announcing the render to the test system.

    Sends the ``template_rendered`` test signal, passing this template as
    both ``sender`` and ``template`` together with ``context``, so that the
    test system Client can intercept it. Then returns the result of
    rendering the template's node list with that same context.
    """
    dispatcher.send(
        signal=signals.template_rendered,
        sender=self,
        template=self,
        context=context
    )
    rendered = self.nodelist.render(context)
    return rendered
|---|
| 22 |
|
|---|
| 23 |
class TestSMTPConnection(object):
    """A substitute SMTP connection for use during test sessions.

    The test connection stores email messages in a dummy outbox
    (``mail.outbox``), rather than sending them out on the wire.
    """

    def __init__(self, *args, **kwargs):
        """Accept and ignore any constructor arguments.

        Nothing is stored: the class only needs to be constructible with
        whatever arguments callers pass when it stands in for
        ``mail.SMTPConnection``.
        """
        pass

    def open(self):
        "Mock the SMTPConnection open() interface"
        pass

    def close(self):
        "Mock the SMTPConnection close() interface"
        pass

    def send_messages(self, messages):
        """Redirect messages to the dummy outbox.

        Appends every item of ``messages`` to ``mail.outbox`` and returns
        ``len(messages)``. ``mail.outbox`` must already exist; in this
        module it is created by setup_test_environment().
        """
        mail.outbox.extend(messages)
        return len(messages)
|---|
| 41 |
|
|---|
| 42 |
def setup_test_environment():
    """Prepare global state before a test run.

    Three things are changed:

        - ``Template.render`` is replaced by ``instrumented_test_render``;
          the previous method is kept on ``Template.original_render``.
        - ``mail.SMTPConnection`` is replaced by ``TestSMTPConnection``; the
          previous class is kept on ``mail.original_SMTPConnection`` and a
          fresh, empty ``mail.outbox`` list is created as the test buffer.
        - The active translation is deactivated, so that the active locale
          matches the LANGUAGE_CODE setting.
    """
    # The right-hand side of a tuple assignment is evaluated first, so each
    # original is captured before its name is rebound to the replacement.
    Template.original_render, Template.render = Template.render, instrumented_test_render
    mail.original_SMTPConnection, mail.SMTPConnection = mail.SMTPConnection, TestSMTPConnection

    mail.outbox = []

    deactivate()
|---|
| 58 |
|
|---|
| 59 |
def teardown_test_environment():
    """Undo the global changes made for a test run.

        - Puts back the template render method that was saved on
          ``Template.original_render`` and removes that saved attribute.
        - Puts back the SMTP connection class that was saved on
          ``mail.original_SMTPConnection`` and removes that saved attribute.
        - Removes the ``mail.outbox`` test buffer.
    """
    Template.render = Template.original_render
    delattr(Template, 'original_render')

    mail.SMTPConnection = mail.original_SMTPConnection
    delattr(mail, 'original_SMTPConnection')

    delattr(mail, 'outbox')
|---|
| 73 |
|
|---|
| 74 |
def _set_autocommit(connection):
    """Make sure a connection is in autocommit mode.

    ``connection`` is a wrapper whose ``connection`` attribute is the
    underlying driver connection. Depending on what that object offers:

        - a callable ``autocommit`` is called with ``True``;
        - a non-callable ``autocommit`` attribute is set to ``True``;
        - otherwise, if ``set_isolation_level`` exists, it is called with
          ``0`` (taken here to be the driver's autocommit level);
        - if none of these exist, nothing is done.
    """
    raw_connection = connection.connection

    if hasattr(raw_connection, "autocommit"):
        if callable(raw_connection.autocommit):
            raw_connection.autocommit(True)
        else:
            raw_connection.autocommit = True
        return

    if hasattr(raw_connection, "set_isolation_level"):
        raw_connection.set_isolation_level(0)
|---|
| 83 |
|
|---|
| 84 |
def get_mysql_create_suffix():
    """Build the trailing clauses of CREATE DATABASE for MySQL.

    Adds ``CHARACTER SET <charset>`` when settings.TEST_DATABASE_CHARSET is
    set and ``COLLATE <collation>`` when settings.TEST_DATABASE_COLLATION is
    set, in that order, joined by a single space. Returns an empty string
    when neither setting is set.
    """
    clauses = (
        ('CHARACTER SET %s', settings.TEST_DATABASE_CHARSET),
        ('COLLATE %s', settings.TEST_DATABASE_COLLATION),
    )
    return ' '.join([template % value for template, value in clauses if value])
|---|
| 91 |
|
|---|
| 92 |
def get_postgresql_create_suffix():
    """Build the trailing clause of CREATE DATABASE for PostgreSQL.

    Returns ``WITH ENCODING '<charset>'`` when
    settings.TEST_DATABASE_CHARSET is set, otherwise an empty string.

    Raises AssertionError when settings.TEST_DATABASE_COLLATION is anything
    other than None, because a collation cannot be applied here.
    """
    assert settings.TEST_DATABASE_COLLATION is None, "PostgreSQL does not support collation setting at database creation time."
    charset = settings.TEST_DATABASE_CHARSET
    if not charset:
        return ''
    return "WITH ENCODING '%s'" % charset
|---|
| 97 |
|
|---|
| 98 |
def create_test_db(verbosity=1, autoclobber=False):
    """
    Creates a test database, prompting the user for confirmation if the
    database already exists. Returns the name of the test database created.

    Arguments:
        verbosity -- progress messages are printed when this is >= 1.
        autoclobber -- when true, an existing test database is destroyed
            without asking; otherwise the user is prompted and must type
            'yes'.

    If the backend's creation module defines ``create_test_db``, the whole
    job is delegated to it and its return value is passed back unchanged.

    Otherwise, as side effects, the current connection is closed,
    settings.DATABASE_NAME is pointed at the test database, ``syncdb`` is
    run, and ``createcachetable`` is run when settings.CACHE_BACKEND starts
    with ``db://``.

    Exits the process with status 1 when the user declines to destroy an
    existing test database, and with status 2 when deleting or recreating
    it fails.
    """
    # If the database backend wants to create the test DB itself, let it,
    # and hand back whatever it reports.
    creation_module = get_creation_module()
    if hasattr(creation_module, "create_test_db"):
        return creation_module.create_test_db(settings, connection, verbosity, autoclobber)

    if verbosity >= 1:
        print("Creating test database...")

    # If we're using SQLite, it's more convenient to test against an
    # in-memory database. Using the TEST_DATABASE_NAME setting you can still
    # choose to run on a physical database.
    if settings.DATABASE_ENGINE == "sqlite3":
        if settings.TEST_DATABASE_NAME and settings.TEST_DATABASE_NAME != ":memory:":
            test_database_name = settings.TEST_DATABASE_NAME
            # Erase the old test database file, but only if there is one.
            if os.access(test_database_name, os.F_OK):
                if not autoclobber:
                    confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % test_database_name)
                if autoclobber or confirm == 'yes':
                    try:
                        if verbosity >= 1:
                            print("Destroying old test database...")
                        os.remove(test_database_name)
                    except Exception:
                        # sys.exc_info() instead of "except X, e" keeps the
                        # syntax valid on every Python version.
                        e = sys.exc_info()[1]
                        sys.stderr.write("Got an error deleting the old test database: %s\n" % e)
                        sys.exit(2)
                    # Same message sequence as the other backends: announce
                    # the re-creation only after something was destroyed.
                    if verbosity >= 1:
                        print("Creating test database...")
                else:
                    print("Tests cancelled.")
                    sys.exit(1)
        else:
            test_database_name = ":memory:"
    else:
        # Backend-specific tail of the CREATE DATABASE statement; backends
        # without an entry get an empty suffix.
        suffix = {
            'postgresql': get_postgresql_create_suffix,
            'postgresql_psycopg2': get_postgresql_create_suffix,
            'mysql': get_mysql_create_suffix,
        }.get(settings.DATABASE_ENGINE, lambda: '')()
        if settings.TEST_DATABASE_NAME:
            test_database_name = settings.TEST_DATABASE_NAME
        else:
            test_database_name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME

        qn = connection.ops.quote_name

        # Create the test database and connect to it. We need to autocommit
        # if the database supports it because PostgreSQL doesn't allow
        # CREATE/DROP DATABASE statements within transactions.
        cursor = connection.cursor()
        _set_autocommit(connection)
        try:
            cursor.execute("CREATE DATABASE %s %s" % (qn(test_database_name), suffix))
        except Exception:
            e = sys.exc_info()[1]
            sys.stderr.write("Got an error creating the test database: %s\n" % e)
            if not autoclobber:
                confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % test_database_name)
            if autoclobber or confirm == 'yes':
                try:
                    if verbosity >= 1:
                        print("Destroying old test database...")
                    cursor.execute("DROP DATABASE %s" % qn(test_database_name))
                    if verbosity >= 1:
                        print("Creating test database...")
                    cursor.execute("CREATE DATABASE %s %s" % (qn(test_database_name), suffix))
                except Exception:
                    e = sys.exc_info()[1]
                    sys.stderr.write("Got an error recreating the test database: %s\n" % e)
                    sys.exit(2)
            else:
                print("Tests cancelled.")
                sys.exit(1)

    # From here on the connection must target the test database.
    connection.close()
    settings.DATABASE_NAME = test_database_name

    call_command('syncdb', verbosity=verbosity, interactive=False)

    if settings.CACHE_BACKEND.startswith('db://'):
        cache_name = settings.CACHE_BACKEND[len('db://'):]
        call_command('createcachetable', cache_name)

    # Get a cursor (even though we don't need one yet). This has
    # the side effect of initializing the test database.
    connection.cursor()

    return test_database_name
|---|
| 191 |
|
|---|
| 192 |
def destroy_test_db(old_database_name, verbosity=1):
    """
    Destroys the test database and points settings.DATABASE_NAME back at
    ``old_database_name``.

    Arguments:
        old_database_name -- the value to restore into
            settings.DATABASE_NAME.
        verbosity -- a progress message is printed when this is >= 1.

    If the backend's creation module defines ``destroy_test_db``, the whole
    job is delegated to it and nothing else is done here. Otherwise the
    name currently in settings.DATABASE_NAME is treated as the test
    database: for SQLite its file is removed (unless the name is empty or
    ``:memory:``); for other engines a DROP DATABASE is issued.
    """
    # If the database wants to drop the test DB itself, let it
    backend_creation = get_creation_module()
    if hasattr(backend_creation, "destroy_test_db"):
        backend_creation.destroy_test_db(settings, connection, old_database_name, verbosity)
        return

    if verbosity >= 1:
        print("Destroying test database...")
    connection.close()

    # Remember which database to destroy, then restore the original name so
    # that any new connection goes to the previous database.
    doomed_name = settings.DATABASE_NAME
    settings.DATABASE_NAME = old_database_name

    if settings.DATABASE_ENGINE != "sqlite3":
        # Remove the test database to clean up after ourselves. Connect to
        # the previous database (not the test database) to do so, because
        # it's not allowed to delete a database while being connected to it.
        cursor = connection.cursor()
        _set_autocommit(connection)
        time.sleep(1) # To avoid "database is being accessed by other users" errors.
        drop_statement = "DROP DATABASE %s" % connection.ops.quote_name(doomed_name)
        cursor.execute(drop_statement)
        connection.close()
    elif doomed_name and doomed_name != ":memory:":
        # Remove the SQLite database file
        os.remove(doomed_name)
|---|