| 1 | """ |
| 2 | Tests for the use of WSGI applications within Django. |
| 3 | |
| 4 | """ |
| 5 | from re import compile as compile_regex |
| 6 | from unittest import TestCase |
| 7 | |
| 8 | from django.views.wsgi import call_wsgi_app, make_wsgi_view, _ResponseStarter |
| 9 | from django.core.handlers.wsgi import WSGIRequest |
| 10 | |
| 11 | |
| 12 | class TestResponseStarter(TestCase): |
| 13 | """Tests for the internal _ResponseStarter.""" |
| 14 | |
| 15 | def test_constructor(self): |
| 16 | """The constructor should set all the attributes correctly.""" |
| 17 | start_response = _ResponseStarter() |
| 18 | self.assertEqual(None, start_response.status) |
| 19 | self.assertEqual([], start_response.response_headers) |
| 20 | self.assertEqual(None, start_response.exc_info) |
| 21 | |
| 22 | def test_call(self): |
| 23 | start_response = _ResponseStarter() |
| 24 | status = "200 Everything's alright" |
| 25 | headers = ( |
| 26 | ("X-FOO", "ABC"), |
| 27 | ("X-BAR", "XYZ"), |
| 28 | ) |
| 29 | exc_info = object() |
| 30 | # Starting the mock response: |
| 31 | start_response(status, headers, exc_info) |
| 32 | # Checking whether they were all set correctly: |
| 33 | self.assertEqual(start_response.status, 200) |
| 34 | self.assertEqual(start_response.response_headers, headers) |
| 35 | self.assertEqual(start_response.exc_info, exc_info) |
| 36 | |
| 37 | |
| 38 | class TestCallWSGIApp(TestCase): |
| 39 | """ |
| 40 | Tests for call_wsgi_app() |
| 41 | |
| 42 | """ |
| 43 | |
| 44 | def test_original_environ_not_modified(self): |
| 45 | """The original environ must have not been modified.""" |
| 46 | original_environ = complete_environ(SCRIPT_NAME="/blog", |
| 47 | PATH_INFO="/admin/models") |
| 48 | request = make_request(**original_environ) |
| 49 | expected_environ = original_environ.copy() |
| 50 | # Running the app: |
| 51 | app = MockApp("200 OK", []) |
| 52 | call_wsgi_app(app, request, "/admin") |
| 53 | self.assertEqual(request.environ, expected_environ) |
| 54 | |
| 55 | def test_routing_args_are_removed(self): |
| 56 | """The ``wsgiorg.routing_args`` environment key must be removed.""" |
| 57 | environ = { |
| 58 | 'wsgiorg.routing_args': ((), {}), |
| 59 | 'PATH_INFO': "/admin/models", |
| 60 | } |
| 61 | environ = complete_environ(**environ) |
| 62 | request = make_request(**environ) |
| 63 | # Running the app: |
| 64 | app = MockApp("200 OK", []) |
| 65 | call_wsgi_app(app, request, "/admin") |
| 66 | self.assertTrue("wsgiorg.routing_args" not in app.environ) |
| 67 | |
| 68 | def test_mount_point_as_string(self): |
| 69 | environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/trac/wiki") |
| 70 | request = make_request(**environ) |
| 71 | # Running the app: |
| 72 | app = MockApp("200 OK", []) |
| 73 | call_wsgi_app(app, request, "/trac") |
| 74 | self.assertEqual(app.environ['SCRIPT_NAME'], "/dev/trac") |
| 75 | self.assertEqual(app.environ['PATH_INFO'], "/wiki") |
| 76 | |
| 77 | def test_mount_point_as_regex(self): |
| 78 | environ = complete_environ(SCRIPT_NAME="/dev", |
| 79 | PATH_INFO="/project5/trac/wiki") |
| 80 | request = make_request(**environ) |
| 81 | mount_point = compile_regex(r"^(?P<project>\w+)/trac") |
| 82 | # Running the app: |
| 83 | app = MockApp("200 OK", []) |
| 84 | call_wsgi_app(app, request, mount_point) |
| 85 | self.assertEqual(app.environ['SCRIPT_NAME'], "/dev/project5/trac") |
| 86 | self.assertEqual(app.environ['PATH_INFO'], "/wiki") |
| 87 | |
| 88 | def test_implied_mount_point(self): |
| 89 | """The mount point must be implied by the view's path.""" |
| 90 | environ = complete_environ(SCRIPT_NAME="/dev", |
| 91 | PATH_INFO="/project5/trac/wiki") |
| 92 | request = make_request(**environ) |
| 93 | # Forging the matched URL: |
| 94 | request.matched_url_regex = compile_regex(r"^(?P<project>\w+)/trac") |
| 95 | # Running the app: |
| 96 | app = MockApp("200 OK", []) |
| 97 | call_wsgi_app(app, request) |
| 98 | self.assertEqual(app.environ['SCRIPT_NAME'], "/dev/project5/trac") |
| 99 | self.assertEqual(app.environ['PATH_INFO'], "/wiki") |
| 100 | |
| 101 | def test_incorrect_mount_point(self): |
| 102 | environ = complete_environ(SCRIPT_NAME="/dev", |
| 103 | PATH_INFO="/trac/wiki") |
| 104 | request = make_request(**environ) |
| 105 | mount_point_string = "/bugzilla" |
| 106 | mount_point_regex = compile_regex(r"/bugzilla") |
| 107 | # Running the app: |
| 108 | app = MockApp("200 OK", []) |
| 109 | self.assertRaises(ValueError, call_wsgi_app, app, request, |
| 110 | mount_point_string) |
| 111 | self.assertRaises(ValueError, call_wsgi_app, app, request, |
| 112 | mount_point_regex) |
| 113 | |
| 114 | def test_http_status_code(self): |
| 115 | environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/trac/wiki") |
| 116 | request = make_request(**environ) |
| 117 | # Running the app and make a valid request: |
| 118 | app_ok = MockApp("200 OK", []) |
| 119 | django_response_ok = call_wsgi_app(app_ok, request, "/trac") |
| 120 | self.assertEqual(200, django_response_ok.status_code) |
| 121 | # Running the app and make an invalid request: |
| 122 | app_bad = MockApp("403 What are you trying to do?", []) |
| 123 | django_response_bad = call_wsgi_app(app_bad, request, "/trac") |
| 124 | self.assertEqual(403, django_response_bad.status_code) |
| 125 | |
| 126 | def test_headers_are_copied_over(self): |
| 127 | environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/trac/wiki") |
| 128 | request = make_request(**environ) |
| 129 | headers = [ |
| 130 | ("X-Foo", "bar"), |
| 131 | ("Content-Type", "text/plain"), |
| 132 | ] |
| 133 | # The same headers, but set in the format used by HttpResponse |
| 134 | expected_headers = { |
| 135 | 'x-foo': ("X-Foo", "bar"), |
| 136 | 'content-type': ("Content-Type", "text/plain"), |
| 137 | } |
| 138 | # Running the app: |
| 139 | app = MockApp("200 OK", headers) |
| 140 | django_response = call_wsgi_app(app, request, "/trac") |
| 141 | self.assertEqual(expected_headers, django_response._headers) |
| 142 | |
| 143 | def test_authenticated_user(self): |
| 144 | environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/trac/wiki") |
| 145 | request = make_request(authenticated=True, **environ) |
| 146 | # Running the app: |
| 147 | app = MockApp("200 OK", []) |
| 148 | call_wsgi_app(app, request, "/trac") |
| 149 | self.assertEqual("foobar", app.environ['REMOTE_USER']) |
| 150 | |
| 151 | def test_cookies_sent(self): |
| 152 | environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/trac/wiki") |
| 153 | request = make_request(**environ) |
| 154 | headers = [ |
| 155 | ("Set-Cookie", "arg1=val1"), |
| 156 | ("Set-Cookie", "arg2=val2; expires=Fri,%2031-Dec-2010%2023:59:59%20GMT"), |
| 157 | ("Set-Cookie", "arg3=val3; path=/"), |
| 158 | ("Set-Cookie", "arg4=val4; path=/wiki"), |
| 159 | ("Set-Cookie", "arg5=val5; domain=.example.org"), |
| 160 | ("Set-Cookie", "arg6=val6; max-age=3600"), |
| 161 | ("Set-Cookie", "arg7=val7; expires=Fri,%2031-Dec-2010%2023:59:59%20GMT; max-age=3600; domain=.example.org; path=/wiki"), |
| 162 | # Now let's try an Unicode cookie: |
| 163 | ("Set-Cookie", u"arg8=val8; max-age=3600"), |
| 164 | # TODO: The "secure" cookie *attribute* is broken in SimpleCookie. |
| 165 | # See: http://bugs.python.org/issue1028088 |
| 166 | #("Set-Cookie", "arg9=val9; secure"), |
| 167 | ] |
| 168 | expected_cookies = { |
| 169 | 'arg1': {'value': "val1"}, |
| 170 | 'arg2': {'value': "val2", 'expires': "Fri,%2031-Dec-2010%2023:59:59%20GMT"}, |
| 171 | 'arg3': {'value': "val3", 'path': "/"}, |
| 172 | 'arg4': {'value': "val4", 'path': "/wiki"}, |
| 173 | 'arg5': {'value': "val5", 'domain': ".example.org"}, |
| 174 | 'arg6': {'value': "val6", 'max-age': "3600"}, |
| 175 | 'arg7': { |
| 176 | 'value': "val7", |
| 177 | 'expires': "Fri,%2031-Dec-2010%2023:59:59%20GMT", |
| 178 | 'path': "/wiki", |
| 179 | 'domain': ".example.org", |
| 180 | 'max-age': "3600", |
| 181 | }, |
| 182 | 'arg8': {'value': "val8", 'max-age': "3600"}, |
| 183 | # Why the next item as disabled? Check the `headers` variable above |
| 184 | #'arg9': {'value': "val9", 'secure': True}, |
| 185 | } |
| 186 | # Running the app: |
| 187 | app = MockApp("200 OK", headers) |
| 188 | django_response = call_wsgi_app(app, request, "/trac") |
| 189 | # Checking the cookies: |
| 190 | self.assertEqual(len(expected_cookies), len(django_response.cookies)) |
| 191 | # Finally, let's check each cookie: |
| 192 | for (cookie_set_name, cookie_set) in django_response.cookies.items(): |
| 193 | expected_cookie = expected_cookies[cookie_set_name] |
| 194 | expected_cookie_value = expected_cookie.pop("value") |
| 195 | self.assertEqual(expected_cookie_value, cookie_set.value, |
| 196 | 'Cookie "%s" has a wrong value ("%s")' % |
| 197 | (cookie_set_name, cookie_set.value)) |
| 198 | for (attr_key, attr_val) in expected_cookie.items(): |
| 199 | self.assertEqual(cookie_set[attr_key], attr_val, |
| 200 | 'Attribute "%s" in cookie "%s" is wrong (%s)' % |
| 201 | (attr_key, cookie_set_name, cookie_set[attr_key])) |
| 202 | |
| 203 | def test_string_as_response(self): |
| 204 | app = MockApp("200 It is OK", [("X-HEADER", "Foo")]) |
| 205 | django_view = make_wsgi_view(app, "/blog") |
| 206 | # Running a request: |
| 207 | environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/blog/posts") |
| 208 | request = make_request(**environ) |
| 209 | # Checking the response: |
| 210 | django_response = django_view(request) |
| 211 | http_response = ( |
| 212 | "X-HEADER: Foo\n" |
| 213 | "Content-Type: text/html; charset=utf-8\n" |
| 214 | "\n" |
| 215 | "body" |
| 216 | ) |
| 217 | self.assertEqual(http_response, str(django_response)) |
| 218 | |
| 219 | def test_iterable_as_response(self): |
| 220 | app = MockGeneratorApp("200 It is OK", [("X-HEADER", "Foo")]) |
| 221 | django_view = make_wsgi_view(app, "/blog") |
| 222 | # Running a request: |
| 223 | environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/blog/posts") |
| 224 | request = make_request(**environ) |
| 225 | # Checking the response: |
| 226 | django_response = django_view(request) |
| 227 | self.assertFalse(django_response._is_string) |
| 228 | self.assertTrue(django_response.has_header("X-HEADER")) |
| 229 | http_response = ( |
| 230 | "X-HEADER: Foo\n" |
| 231 | "Content-Type: text/html; charset=utf-8\n" |
| 232 | "\n" |
| 233 | "body as iterable" |
| 234 | ) |
| 235 | self.assertEqual(http_response, str(django_response)) |
| 236 | |
| 237 | def test_write_response(self): |
| 238 | app = MockWriteApp("200 It is OK", [("X-HEADER", "Foo")]) |
| 239 | django_view = make_wsgi_view(app, "/blog") |
| 240 | # Running a request: |
| 241 | environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/blog/posts") |
| 242 | request = make_request(**environ) |
| 243 | # Checking the response: |
| 244 | django_response = django_view(request) |
| 245 | self.assertFalse(django_response._is_string) |
| 246 | self.assertTrue(django_response.has_header("X-HEADER")) |
| 247 | http_response = ( |
| 248 | "X-HEADER: Foo\n" |
| 249 | "Content-Type: text/html; charset=utf-8\n" |
| 250 | "\n" |
| 251 | "body as iterable" |
| 252 | ) |
| 253 | self.assertEqual(http_response, str(django_response)) |
| 254 | |
| 255 | |
| 256 | class TestWSGIView(TestCase): |
| 257 | """ |
| 258 | Tests for make_wsgi_view(). |
| 259 | |
| 260 | """ |
| 261 | |
| 262 | def test_it(self): |
| 263 | # Loading a WSGI-powered Django view: |
| 264 | headers = [("X-SALUTATION", "Hey")] |
| 265 | app = MockApp("206 One step at a time", headers) |
| 266 | django_view = make_wsgi_view(app, "/blog") |
| 267 | # Running a request: |
| 268 | environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/blog/posts") |
| 269 | request = make_request(**environ) |
| 270 | # Checking the response: |
| 271 | django_response = django_view(request) |
| 272 | self.assertEqual(django_response.status_code, 206) |
| 273 | self.assertTrue(("X-SALUTATION", "Hey") == |
| 274 | django_response._headers['x-salutation']) |
| 275 | |
| 276 | |
| 277 | #{ Test utilities |
| 278 | |
| 279 | |
| 280 | class MockApp(object): |
| 281 | """ |
| 282 | Mock WSGI application. |
| 283 | |
| 284 | """ |
| 285 | |
| 286 | def __init__(self, status, headers): |
| 287 | self.status = status |
| 288 | self.headers = headers |
| 289 | |
| 290 | def __call__(self, environ, start_response): |
| 291 | self.environ = environ |
| 292 | start_response(self.status, self.headers) |
| 293 | return ["body"] |
| 294 | |
| 295 | |
| 296 | class MockGeneratorApp(MockApp): |
| 297 | """ |
| 298 | Mock WSGI application that returns an iterator. |
| 299 | |
| 300 | """ |
| 301 | |
| 302 | def __call__(self, environ, start_response): |
| 303 | self.environ = environ |
| 304 | start_response(self.status, self.headers) |
| 305 | def gen(): |
| 306 | yield "body" |
| 307 | yield " as" |
| 308 | yield " iterable" |
| 309 | return gen() |
| 310 | |
| 311 | |
| 312 | class MockWriteApp(MockApp): |
| 313 | """ |
| 314 | Mock WSGI app which uses the write() function. |
| 315 | |
| 316 | """ |
| 317 | |
| 318 | def __call__(self, environ, start_response): |
| 319 | self.environ = environ |
| 320 | write = start_response(self.status, self.headers) |
| 321 | write( "body") |
| 322 | write(" as") |
| 323 | write(" iterable") |
| 324 | return [] |
| 325 | |
| 326 | |
| 327 | def make_request(authenticated=False, **environ): |
| 328 | """ |
| 329 | Make a Django request from the items in the WSGI ``environ``. |
| 330 | |
| 331 | """ |
| 332 | class MockDjangoUser(object): |
| 333 | def __init__(self, authenticated): |
| 334 | self.username = "foobar" |
| 335 | self.authenticated = authenticated |
| 336 | def is_authenticated(self): |
| 337 | return self.authenticated |
| 338 | request = WSGIRequest(environ) |
| 339 | request.user = MockDjangoUser(authenticated) |
| 340 | return request |
| 341 | |
| 342 | |
| 343 | def complete_environ(**environ): |
| 344 | """ |
| 345 | Add the missing items in ``environ``. |
| 346 | |
| 347 | """ |
| 348 | full_environ = { |
| 349 | 'REQUEST_METHOD': "GET", |
| 350 | 'SERVER_NAME': "example.org", |
| 351 | } |
| 352 | full_environ.update(environ) |
| 353 | return full_environ |
| 354 | |
| 355 | |
| 356 | #} |