Django

Code

root/django/branches/newforms-admin/django/test/utils.py

Revision 7951, 8.6 kB (checked in by brosner, 5 months ago)

newforms-admin: Merged from trunk up to [7950].

  • Property svn:eol-style set to native
Line 
1 import sys, time, os
2 from django.conf import settings
3 from django.db import connection, get_creation_module
4 from django.core import mail
5 from django.core.management import call_command
6 from django.dispatch import dispatcher
7 from django.test import signals
8 from django.template import Template
9 from django.utils.translation import deactivate
10
# Prepended to the default database name to form the name of the test
# database when no explicit test database name has been configured.
TEST_DATABASE_PREFIX = "test_"
14
def instrumented_test_render(self, context):
    """
    Render a template and announce the rendering via a signal.

    Sends ``signals.template_rendered`` through the dispatcher, passing this
    template as both the sender and the ``template`` argument along with the
    rendering ``context``, and then returns whatever the template's nodelist
    renders for that context. The signal is meant to be picked up by the
    test client.
    """
    dispatcher.send(
        signal=signals.template_rendered,
        sender=self,
        template=self,
        context=context
    )
    rendered = self.nodelist.render(context)
    return rendered
22
class TestSMTPConnection(object):
    """
    Stand-in for an SMTP connection while tests are running.

    Nothing is sent over the network: every message handed to
    send_messages() is appended to the ``mail.outbox`` list instead.
    """

    def __init__(*args, **kwargs):
        # Accept and ignore whatever arguments a real connection would take.
        return None

    def open(self):
        "Mock the SMTPConnection open() interface; does nothing."

    def close(self):
        "Mock the SMTPConnection close() interface; does nothing."

    def send_messages(self, messages):
        "Append the messages to the dummy outbox and return how many there were."
        outbox = mail.outbox
        outbox.extend(messages)
        return len(messages)
41
def setup_test_environment():
    """Perform the global setup needed before tests run.

    Specifically:

        - The current ``Template.render`` is saved as
          ``Template.original_render`` and replaced with the instrumented
          test renderer.
        - The current ``mail.SMTPConnection`` is saved as
          ``mail.original_SMTPConnection`` and replaced with
          ``TestSMTPConnection``; ``mail.outbox`` is reset to an empty list.
        - The active translation is deactivated.
    """
    # Swap in the instrumented renderer, remembering the real one.
    Template.original_render, Template.render = Template.render, instrumented_test_render

    # Divert outgoing email to an in-memory outbox.
    mail.original_SMTPConnection, mail.SMTPConnection = mail.SMTPConnection, TestSMTPConnection
    mail.outbox = []

    deactivate()
58
def teardown_test_environment():
    """Undo the global changes made for the test run.

    Specifically:

        - ``Template.render`` is restored from ``Template.original_render``.
        - ``mail.SMTPConnection`` is restored from
          ``mail.original_SMTPConnection``.
        - The saved attributes and ``mail.outbox`` are removed.
    """
    # Put the real template renderer back and drop the saved reference.
    Template.render = Template.original_render
    delattr(Template, 'original_render')

    # Put the real SMTP connection class back and drop the saved reference.
    mail.SMTPConnection = mail.original_SMTPConnection
    delattr(mail, 'original_SMTPConnection')

    # The dummy outbox only exists while the test environment is active.
    delattr(mail, 'outbox')
73
74 def _set_autocommit(connection):
75     "Make sure a connection is in autocommit mode."
76     if hasattr(connection.connection, "autocommit"):
77         if callable(connection.connection.autocommit):
78             connection.connection.autocommit(True)
79         else:
80             connection.connection.autocommit = True
81     elif hasattr(connection.connection, "set_isolation_level"):
82         connection.connection.set_isolation_level(0)
83
def get_mysql_create_suffix():
    """
    Build the trailing clauses for MySQL's CREATE DATABASE statement.

    Adds a ``CHARACTER SET`` clause when ``settings.TEST_DATABASE_CHARSET``
    is set and a ``COLLATE`` clause when ``settings.TEST_DATABASE_COLLATION``
    is set, joined by a single space. Returns an empty string when neither
    setting is set.
    """
    clauses = []
    charset = settings.TEST_DATABASE_CHARSET
    if charset:
        clauses.append('CHARACTER SET %s' % charset)
    collation = settings.TEST_DATABASE_COLLATION
    if collation:
        clauses.append('COLLATE %s' % collation)
    return ' '.join(clauses)
91
def get_postgresql_create_suffix():
    """
    Build the trailing clause for PostgreSQL's CREATE DATABASE statement.

    Returns ``WITH ENCODING '<charset>'`` when
    ``settings.TEST_DATABASE_CHARSET`` is set, otherwise an empty string.

    Raises AssertionError if ``settings.TEST_DATABASE_COLLATION`` is anything
    other than None.
    """
    # Raised explicitly rather than via an ``assert`` statement so that the
    # configuration check is still performed when Python runs with -O.
    if settings.TEST_DATABASE_COLLATION is not None:
        raise AssertionError("PostgreSQL does not support collation setting at database creation time.")
    if settings.TEST_DATABASE_CHARSET:
        return "WITH ENCODING '%s'" % settings.TEST_DATABASE_CHARSET
    return ''
97
def _confirm_test_db_deletion(test_database_name, autoclobber):
    """
    Decide whether an existing test database may be deleted.

    Returns True without prompting when ``autoclobber`` is true. Otherwise
    the user is asked on the console and True is returned only if the answer
    is exactly 'yes'.
    """
    if autoclobber:
        return True
    confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % test_database_name)
    return confirm == 'yes'

def create_test_db(verbosity=1, autoclobber=False):
    """
    Creates a test database, prompting the user for confirmation if the
    database already exists, and points ``settings.DATABASE_NAME`` at it.

    If the backend's creation module defines ``create_test_db``, the work is
    handed to it entirely and this function returns None.

    Otherwise:

        - For SQLite an in-memory database is used unless
          ``settings.TEST_DATABASE_NAME`` names a file; an existing file is
          deleted first.
        - For other engines a ``CREATE DATABASE`` statement is issued; if it
          fails, the database is dropped and created again.

    After that, ``syncdb`` is run against the new database and, when
    ``settings.CACHE_BACKEND`` starts with ``db://``, the cache table is
    created as well.

    Arguments:
        verbosity -- progress messages are printed when this is >= 1.
        autoclobber -- when true, an existing test database is deleted
            without asking for confirmation.

    Returns the name of the test database created. Exits the process with
    status 1 if the user declines to delete an existing database, and with
    status 2 if deleting or recreating it fails.
    """
    # If the database backend wants to create the test DB itself, let it
    creation_module = get_creation_module()
    if hasattr(creation_module, "create_test_db"):
        creation_module.create_test_db(settings, connection, verbosity, autoclobber)
        return

    if verbosity >= 1:
        print("Creating test database...")
    # If we're using SQLite, it's more convenient to test against an
    # in-memory database. Using the TEST_DATABASE_NAME setting you can still choose
    # to run on a physical database.
    if settings.DATABASE_ENGINE == "sqlite3":
        if settings.TEST_DATABASE_NAME and settings.TEST_DATABASE_NAME != ":memory:":
            TEST_DATABASE_NAME = settings.TEST_DATABASE_NAME
            # Erase the old test database, but only if one is actually there.
            # The "Destroying..." message is printed only when a file is
            # really being removed, and "Creating..." is repeated only after
            # such a removal, mirroring the non-SQLite branch below.
            if os.access(TEST_DATABASE_NAME, os.F_OK):
                if not _confirm_test_db_deletion(TEST_DATABASE_NAME, autoclobber):
                    print("Tests cancelled.")
                    sys.exit(1)
                try:
                    if verbosity >= 1:
                        print("Destroying old test database...")
                    os.remove(TEST_DATABASE_NAME)
                except Exception:
                    sys.stderr.write("Got an error deleting the old test database: %s\n" % sys.exc_info()[1])
                    sys.exit(2)
                if verbosity >= 1:
                    print("Creating test database...")
        else:
            TEST_DATABASE_NAME = ":memory:"
    else:
        # Engine-specific trailing clauses for CREATE DATABASE (may be empty).
        suffix = {
            'postgresql': get_postgresql_create_suffix,
            'postgresql_psycopg2': get_postgresql_create_suffix,
            'mysql': get_mysql_create_suffix,
        }.get(settings.DATABASE_ENGINE, lambda: '')()
        if settings.TEST_DATABASE_NAME:
            TEST_DATABASE_NAME = settings.TEST_DATABASE_NAME
        else:
            TEST_DATABASE_NAME = TEST_DATABASE_PREFIX + settings.DATABASE_NAME

        qn = connection.ops.quote_name

        # Create the test database and connect to it. We need to autocommit
        # if the database supports it because PostgreSQL doesn't allow
        # CREATE/DROP DATABASE statements within transactions.
        cursor = connection.cursor()
        _set_autocommit(connection)
        try:
            cursor.execute("CREATE DATABASE %s %s" % (qn(TEST_DATABASE_NAME), suffix))
        except Exception:
            # Any failure here is treated as "the database already exists":
            # offer to drop it and try again.
            sys.stderr.write("Got an error creating the test database: %s\n" % sys.exc_info()[1])
            if not _confirm_test_db_deletion(TEST_DATABASE_NAME, autoclobber):
                print("Tests cancelled.")
                sys.exit(1)
            try:
                if verbosity >= 1:
                    print("Destroying old test database...")
                cursor.execute("DROP DATABASE %s" % qn(TEST_DATABASE_NAME))
                if verbosity >= 1:
                    print("Creating test database...")
                cursor.execute("CREATE DATABASE %s %s" % (qn(TEST_DATABASE_NAME), suffix))
            except Exception:
                sys.stderr.write("Got an error recreating the test database: %s\n" % sys.exc_info()[1])
                sys.exit(2)

    # Drop the current connection and switch the settings over to the test
    # database before anything else touches it.
    connection.close()
    settings.DATABASE_NAME = TEST_DATABASE_NAME

    call_command('syncdb', verbosity=verbosity, interactive=False)

    if settings.CACHE_BACKEND.startswith('db://'):
        cache_name = settings.CACHE_BACKEND[len('db://'):]
        call_command('createcachetable', cache_name)

    # Get a cursor (even though we don't need one yet). This has
    # the side effect of initializing the test database.
    cursor = connection.cursor()

    return TEST_DATABASE_NAME
191
def destroy_test_db(old_database_name, verbosity=1):
    """
    Remove the test database and restore the original database name.

    If the backend's creation module defines ``destroy_test_db``, the work is
    handed to it entirely. Otherwise the current connection is closed,
    ``settings.DATABASE_NAME`` is set back to ``old_database_name`` and the
    test database is removed: the file is deleted for a file-based SQLite
    database, and a ``DROP DATABASE`` statement is issued for other engines.

    Arguments:
        old_database_name -- the database name to put back into the settings.
        verbosity -- a progress message is printed when this is >= 1.
    """
    # If the database wants to drop the test DB itself, let it
    backend_creation = get_creation_module()
    if hasattr(backend_creation, "destroy_test_db"):
        backend_creation.destroy_test_db(settings, connection, old_database_name, verbosity)
        return

    if verbosity >= 1:
        print("Destroying test database...")
    connection.close()

    # Remember which database to remove, then point the settings back at
    # the original one.
    doomed_name = settings.DATABASE_NAME
    settings.DATABASE_NAME = old_database_name

    if settings.DATABASE_ENGINE == "sqlite3":
        # Only a file-based SQLite database leaves something to delete.
        if doomed_name and doomed_name != ":memory:":
            os.remove(doomed_name)
        return

    # Remove the test database to clean up after ourselves. Connect to the
    # previous database (not the test database) to do so, because it's not
    # allowed to delete a database while being connected to it.
    cursor = connection.cursor()
    _set_autocommit(connection)
    time.sleep(1) # To avoid "database is being accessed by other users" errors.
    drop_sql = "DROP DATABASE %s" % connection.ops.quote_name(doomed_name)
    cursor.execute(drop_sql)
    connection.close()
Note: See TracBrowser for help on using the browser.