Ticket #12091: wsgi-apps.diff

File wsgi-apps.diff, 19.4 KB (added by Gustavo Narea, 15 years ago)

Support for WSGI apps within Django

  • django/test/test_wsgi_views.py

     
     1"""
     2Tests for the use of WSGI applications within Django.
     3
     4"""
     5from re import compile as compile_regex
     6from unittest import TestCase
     7
     8from django.views.wsgi import call_wsgi_app, make_wsgi_view, _ResponseStarter
     9from django.core.handlers.wsgi import WSGIRequest
     10
     11
     12class TestResponseStarter(TestCase):
     13    """Tests for the internal _ResponseStarter."""
     14   
     15    def test_constructor(self):
     16        """The constructor should set all the attributes correctly."""
     17        start_response = _ResponseStarter()
     18        self.assertEqual(None, start_response.status)
     19        self.assertEqual([], start_response.response_headers)
     20        self.assertEqual(None, start_response.exc_info)
     21   
     22    def test_call(self):
     23        start_response = _ResponseStarter()
     24        status = "200 Everything's alright"
     25        headers = (
     26            ("X-FOO", "ABC"),
     27            ("X-BAR", "XYZ"),
     28            )
     29        exc_info = object()
     30        # Starting the mock response:
     31        start_response(status, headers, exc_info)
     32        # Checking whether they were all set correctly:
     33        self.assertEqual(start_response.status, 200)
     34        self.assertEqual(start_response.response_headers, headers)
     35        self.assertEqual(start_response.exc_info, exc_info)
     36
     37
     38class TestCallWSGIApp(TestCase):
     39    """
     40    Tests for call_wsgi_app()
     41   
     42    """
     43   
     44    def test_original_environ_not_modified(self):
     45        """The original environ must have not been modified."""
     46        original_environ = complete_environ(SCRIPT_NAME="/blog",
     47                                            PATH_INFO="/admin/models")
     48        request = make_request(**original_environ)
     49        expected_environ = original_environ.copy()
     50        # Running the app:
     51        app = MockApp("200 OK", [])
     52        call_wsgi_app(app, request, "/admin")
     53        self.assertEqual(request.environ, expected_environ)
     54   
     55    def test_routing_args_are_removed(self):
     56        """The ``wsgiorg.routing_args`` environment key must be removed."""
     57        environ = {
     58            'wsgiorg.routing_args': ((), {}),
     59            'PATH_INFO': "/admin/models",
     60            }
     61        environ = complete_environ(**environ)
     62        request = make_request(**environ)
     63        # Running the app:
     64        app = MockApp("200 OK", [])
     65        call_wsgi_app(app, request, "/admin")
     66        self.assertTrue("wsgiorg.routing_args" not in app.environ)
     67   
     68    def test_mount_point_as_string(self):
     69        environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/trac/wiki")
     70        request = make_request(**environ)
     71        # Running the app:
     72        app = MockApp("200 OK", [])
     73        call_wsgi_app(app, request, "/trac")
     74        self.assertEqual(app.environ['SCRIPT_NAME'], "/dev/trac")
     75        self.assertEqual(app.environ['PATH_INFO'], "/wiki")
     76       
     77    def test_mount_point_as_regex(self):
     78        environ = complete_environ(SCRIPT_NAME="/dev",
     79                                   PATH_INFO="/project5/trac/wiki")
     80        request = make_request(**environ)
     81        mount_point = compile_regex(r"^(?P<project>\w+)/trac")
     82        # Running the app:
     83        app = MockApp("200 OK", [])
     84        call_wsgi_app(app, request, mount_point)
     85        self.assertEqual(app.environ['SCRIPT_NAME'], "/dev/project5/trac")
     86        self.assertEqual(app.environ['PATH_INFO'], "/wiki")
     87       
     88    def test_implied_mount_point(self):
     89        """The mount point must be implied by the view's path."""
     90        environ = complete_environ(SCRIPT_NAME="/dev",
     91                                   PATH_INFO="/project5/trac/wiki")
     92        request = make_request(**environ)
     93        # Forging the matched URL:
     94        request.matched_url_regex = compile_regex(r"^(?P<project>\w+)/trac")
     95        # Running the app:
     96        app = MockApp("200 OK", [])
     97        call_wsgi_app(app, request)
     98        self.assertEqual(app.environ['SCRIPT_NAME'], "/dev/project5/trac")
     99        self.assertEqual(app.environ['PATH_INFO'], "/wiki")
     100   
     101    def test_incorrect_mount_point(self):
     102        environ = complete_environ(SCRIPT_NAME="/dev",
     103                                   PATH_INFO="/trac/wiki")
     104        request = make_request(**environ)
     105        mount_point_string = "/bugzilla"
     106        mount_point_regex = compile_regex(r"/bugzilla")
     107        # Running the app:
     108        app = MockApp("200 OK", [])
     109        self.assertRaises(ValueError, call_wsgi_app, app, request,
     110                          mount_point_string)
     111        self.assertRaises(ValueError, call_wsgi_app, app, request,
     112                          mount_point_regex)
     113   
     114    def test_http_status_code(self):
     115        environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/trac/wiki")
     116        request = make_request(**environ)
     117        # Running the app and make a valid request:
     118        app_ok = MockApp("200 OK", [])
     119        django_response_ok = call_wsgi_app(app_ok, request, "/trac")
     120        self.assertEqual(200, django_response_ok.status_code)
     121        # Running the app and make an invalid request:
     122        app_bad = MockApp("403 What are you trying to do?", [])
     123        django_response_bad = call_wsgi_app(app_bad, request, "/trac")
     124        self.assertEqual(403, django_response_bad.status_code)
     125   
     126    def test_headers_are_copied_over(self):
     127        environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/trac/wiki")
     128        request = make_request(**environ)
     129        headers = [
     130            ("X-Foo", "bar"),
     131            ("Content-Type", "text/plain"),
     132            ]
     133        # The same headers, but set in the format used by HttpResponse
     134        expected_headers = {
     135            'x-foo': ("X-Foo", "bar"),
     136            'content-type': ("Content-Type", "text/plain"),
     137            }
     138        # Running the app:
     139        app = MockApp("200 OK", headers)
     140        django_response = call_wsgi_app(app, request, "/trac")
     141        self.assertEqual(expected_headers, django_response._headers)
     142   
     143    def test_authenticated_user(self):
     144        environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/trac/wiki")
     145        request = make_request(authenticated=True, **environ)
     146        # Running the app:
     147        app = MockApp("200 OK", [])
     148        call_wsgi_app(app, request, "/trac")
     149        self.assertEqual("foobar", app.environ['REMOTE_USER'])
     150   
     151    def test_cookies_sent(self):
     152        environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/trac/wiki")
     153        request = make_request(**environ)
     154        headers = [
     155            ("Set-Cookie", "arg1=val1"),
     156            ("Set-Cookie", "arg2=val2; expires=Fri,%2031-Dec-2010%2023:59:59%20GMT"),
     157            ("Set-Cookie", "arg3=val3; path=/"),
     158            ("Set-Cookie", "arg4=val4; path=/wiki"),
     159            ("Set-Cookie", "arg5=val5; domain=.example.org"),
     160            ("Set-Cookie", "arg6=val6; max-age=3600"),
     161            ("Set-Cookie", "arg7=val7; expires=Fri,%2031-Dec-2010%2023:59:59%20GMT; max-age=3600; domain=.example.org; path=/wiki"),
     162            # Now let's try an Unicode cookie:
     163            ("Set-Cookie", u"arg8=val8; max-age=3600"),
     164            # TODO: The "secure" cookie *attribute* is broken in SimpleCookie.
     165            # See: http://bugs.python.org/issue1028088
     166            #("Set-Cookie", "arg9=val9; secure"),
     167            ]
     168        expected_cookies = {
     169            'arg1': {'value': "val1"},
     170            'arg2': {'value': "val2", 'expires': "Fri,%2031-Dec-2010%2023:59:59%20GMT"},
     171            'arg3': {'value': "val3", 'path': "/"},
     172            'arg4': {'value': "val4", 'path': "/wiki"},
     173            'arg5': {'value': "val5", 'domain': ".example.org"},
     174            'arg6': {'value': "val6", 'max-age': "3600"},
     175            'arg7': {
     176                'value': "val7",
     177                'expires': "Fri,%2031-Dec-2010%2023:59:59%20GMT",
     178                'path': "/wiki",
     179                'domain': ".example.org",
     180                'max-age': "3600",
     181                },
     182            'arg8': {'value': "val8", 'max-age': "3600"},
     183            # Why the next item as disabled? Check the `headers` variable above
     184            #'arg9': {'value': "val9", 'secure': True},
     185            }
     186        # Running the app:
     187        app = MockApp("200 OK", headers)
     188        django_response = call_wsgi_app(app, request, "/trac")
     189        # Checking the cookies:
     190        self.assertEqual(len(expected_cookies), len(django_response.cookies))
     191        # Finally, let's check each cookie:
     192        for (cookie_set_name, cookie_set) in django_response.cookies.items():
     193            expected_cookie = expected_cookies[cookie_set_name]
     194            expected_cookie_value = expected_cookie.pop("value")
     195            self.assertEqual(expected_cookie_value, cookie_set.value,
     196                             'Cookie "%s" has a wrong value ("%s")' %
     197                             (cookie_set_name, cookie_set.value))
     198            for (attr_key, attr_val) in expected_cookie.items():
     199                self.assertEqual(cookie_set[attr_key], attr_val,
     200                                 'Attribute "%s" in cookie "%s" is wrong (%s)' %
     201                                 (attr_key, cookie_set_name, cookie_set[attr_key]))
     202   
     203    def test_string_as_response(self):
     204        app = MockApp("200 It is OK", [("X-HEADER", "Foo")])
     205        django_view = make_wsgi_view(app, "/blog")
     206        # Running a request:
     207        environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/blog/posts")
     208        request = make_request(**environ)
     209        # Checking the response:
     210        django_response = django_view(request)
     211        http_response = (
     212            "X-HEADER: Foo\n"
     213            "Content-Type: text/html; charset=utf-8\n"
     214            "\n"
     215            "body"
     216            )
     217        self.assertEqual(http_response, str(django_response))
     218   
     219    def test_iterable_as_response(self):
     220        app = MockGeneratorApp("200 It is OK", [("X-HEADER", "Foo")])
     221        django_view = make_wsgi_view(app, "/blog")
     222        # Running a request:
     223        environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/blog/posts")
     224        request = make_request(**environ)
     225        # Checking the response:
     226        django_response = django_view(request)
     227        self.assertFalse(django_response._is_string)
     228        self.assertTrue(django_response.has_header("X-HEADER"))
     229        http_response = (
     230            "X-HEADER: Foo\n"
     231            "Content-Type: text/html; charset=utf-8\n"
     232            "\n"
     233            "body as iterable"
     234            )
     235        self.assertEqual(http_response, str(django_response))
     236   
     237    def test_write_response(self):
     238        app = MockWriteApp("200 It is OK", [("X-HEADER", "Foo")])
     239        django_view = make_wsgi_view(app, "/blog")
     240        # Running a request:
     241        environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/blog/posts")
     242        request = make_request(**environ)
     243        # Checking the response:
     244        django_response = django_view(request)
     245        self.assertFalse(django_response._is_string)
     246        self.assertTrue(django_response.has_header("X-HEADER"))
     247        http_response = (
     248            "X-HEADER: Foo\n"
     249            "Content-Type: text/html; charset=utf-8\n"
     250            "\n"
     251            "body as iterable"
     252            )
     253        self.assertEqual(http_response, str(django_response))
     254
     255
     256class TestWSGIView(TestCase):
     257    """
     258    Tests for make_wsgi_view().
     259   
     260    """
     261   
     262    def test_it(self):
     263        # Loading a WSGI-powered Django view:
     264        headers = [("X-SALUTATION", "Hey")]
     265        app = MockApp("206 One step at a time", headers)
     266        django_view = make_wsgi_view(app, "/blog")
     267        # Running a request:
     268        environ = complete_environ(SCRIPT_NAME="/dev", PATH_INFO="/blog/posts")
     269        request = make_request(**environ)
     270        # Checking the response:
     271        django_response = django_view(request)
     272        self.assertEqual(django_response.status_code, 206)
     273        self.assertTrue(("X-SALUTATION", "Hey") ==
     274                        django_response._headers['x-salutation'])
     275
     276
     277#{ Test utilities
     278
     279
     280class MockApp(object):
     281    """
     282    Mock WSGI application.
     283   
     284    """
     285   
     286    def __init__(self, status, headers):
     287        self.status = status
     288        self.headers = headers
     289
     290    def __call__(self, environ, start_response):
     291        self.environ = environ
     292        start_response(self.status, self.headers)
     293        return ["body"]
     294
     295
     296class MockGeneratorApp(MockApp):
     297    """
     298    Mock WSGI application that returns an iterator.
     299   
     300    """
     301
     302    def __call__(self, environ, start_response):
     303        self.environ = environ
     304        start_response(self.status, self.headers)
     305        def gen():
     306            yield "body"
     307            yield " as"
     308            yield " iterable"
     309        return gen()
     310
     311
     312class MockWriteApp(MockApp):
     313    """
     314    Mock WSGI app which uses the write() function.
     315   
     316    """
     317
     318    def __call__(self, environ, start_response):
     319        self.environ = environ
     320        write = start_response(self.status, self.headers)
     321        write( "body")
     322        write(" as")
     323        write(" iterable")
     324        return []
     325
     326
     327def make_request(authenticated=False, **environ):
     328    """
     329    Make a Django request from the items in the WSGI ``environ``.
     330   
     331    """
     332    class MockDjangoUser(object):
     333        def __init__(self, authenticated):
     334            self.username = "foobar"
     335            self.authenticated = authenticated
     336        def is_authenticated(self):
     337            return self.authenticated
     338    request = WSGIRequest(environ)
     339    request.user = MockDjangoUser(authenticated)
     340    return request
     341
     342
     343def complete_environ(**environ):
     344    """
     345    Add the missing items in ``environ``.
     346   
     347    """
     348    full_environ = {
     349        'REQUEST_METHOD': "GET",
     350        'SERVER_NAME': "example.org",
     351        }
     352    full_environ.update(environ)
     353    return full_environ
     354
     355
     356#}
  • django/core/handlers/base.py

     
    7878        urlconf = getattr(request, "urlconf", settings.ROOT_URLCONF)
    7979
    8080        resolver = urlresolvers.RegexURLResolver(r'^/', urlconf)
     81        # Let's include the matched URL pattern in the request, which would be
     82        # useful when dealing with WSGI applications within Django:
     83        request.matched_url_regex = resolver.regex
    8184        try:
    8285            callback, callback_args, callback_kwargs = resolver.resolve(
    8386                    request.path_info)
  • django/views/wsgi.py

     
     1"""
     2Utilities to use WSGI applications within Django.
     3
     4"""
     5
     6from Cookie import SimpleCookie
     7from itertools import chain
     8
     9from django.http import HttpResponse
     10
     11
     12__all__ = ("call_wsgi_app", "make_wsgi_view")
     13
     14
     15def call_wsgi_app(wsgi_app, request, mount_point=None):
     16    """
     17    Call the ``wsgi_app`` with ``request`` and return its response.
     18   
     19    :param wsgi_app: The WSGI application to be run.
     20    :type wsgi_app: callable
     21    :param request: The Django request.
     22    :type request: :class:`django.http.HttpRequest`
     23    :param mount_point: The path where the WSGI application should be mounted.
     24    :type mount_point: regex pattern or :class:`basestring`
     25    :return: The response from the WSGI application, turned into a Django
     26        response.
     27    :rtype: :class:`django.http.HttpResponse`
     28   
     29    If ``mount_point`` is not present, the URL matched for the current request
     30    in Django will be used -- This is the desired behavior is most situations.
     31   
     32    """
     33    environ = request.environ.copy()
     34   
     35    # Moving the portion of the path consumed by the current view, from the
     36    # PATH_INTO to the SCRIPT_NAME:
     37    final_mount_point = mount_point or request.matched_url_regex
     38    if isinstance(final_mount_point, basestring):
     39        # It's already an string, so we just have to make sure it's valid:
     40        if not environ['PATH_INFO'].startswith(final_mount_point):
     41            raise ValueError("Path %s has not been consumed in PATH_INFO" %
     42                             final_mount_point)
     43    else:
     44        # It's a regular expression:
     45        match = final_mount_point.search(environ['PATH_INFO'][1:])
     46        if not match:
     47            regex = final_mount_point.pattern
     48            raise ValueError("Path pattern %s has not been consumed in "
     49                             "PATH_INFO" % regex)
     50        final_mount_point = "/%s" % match.group()
     51    environ['PATH_INFO'] = environ['PATH_INFO'][len(final_mount_point):]
     52    environ['SCRIPT_NAME'] = environ['SCRIPT_NAME'] + final_mount_point
     53   
     54    # If the user has been authenticated in Django, log him in the WSGI app:
     55    if request.user.is_authenticated():
     56        environ['REMOTE_USER'] = request.user.username
     57   
     58    # Cleaning the routing_args, if any. The application should have its own
     59    # arguments, without relying on any arguments from a parent application:
     60    if "wsgiorg.routing_args" in environ:
     61        del environ['wsgiorg.routing_args']
     62   
     63    # Calling the WSGI application and getting its response:
     64    response_wrapper = _ResponseStarter()
     65    wsgi_response = wsgi_app(environ, response_wrapper)
     66    body = chain(response_wrapper.body, wsgi_response)
     67   
     68    # Turning its response into a Django response:
     69    cookies = SimpleCookie()
     70    django_response = HttpResponse(body, status=response_wrapper.status)
     71    for (header, value) in response_wrapper.response_headers:
     72        if header.upper() == "SET-COOKIE":
     73            if isinstance(value, unicode):
     74                # It can't be Unicode:
     75                value = value.encode("us-ascii")
     76            cookies.load(value)
     77        else:
     78            django_response[header] = value
     79   
     80    # Setting the cookies from Django:
     81    for (cookie_name, cookie) in cookies.items():
     82        cookie_attributes = {
     83            'key': cookie_name,
     84            'value': cookie.value,
     85            'max_age': cookie.get("max-age"),
     86            'expires': cookie.get("expires"),
     87            'path': cookie.get("path", "/"),
     88            'domain': cookie.get("domain"),
     89            }
     90        django_response.set_cookie(**cookie_attributes)
     91    return django_response
     92
     93
     94def make_wsgi_view(wsgi_app, mount_point=None):
     95    """
     96    Return a callable which can be used as a Django view powered by the
     97    ``wsgi_app``.
     98   
     99    :param wsgi_app: The WSGI which will run the view.
     100    :return: The callable.
     101   
     102    """
     103    def view(request):
     104        return call_wsgi_app(wsgi_app, request, mount_point)
     105    return view
     106
     107
     108#{ Internal WSGI stuff
     109
     110
     111class _ResponseStarter(object):
     112    """
     113    Callable to be used as ``start_response`` in order to extract the HTTP
     114    status code and headers.
     115   
     116    """
     117   
     118    def __init__(self):
     119        self.status = None
     120        self.response_headers = []
     121        self.exc_info = None
     122        self.body = []
     123   
     124    def __call__(self, status, response_headers, exc_info=None):
     125        self.status = int(status[:3])
     126        self.response_headers = response_headers
     127        # exc_info is not used at all. It does not seem to be possible to use
     128        # it in Django.
     129        self.exc_info = exc_info
     130       
     131        def write(data):
     132            self.body.append(data)
     133       
     134        return write
     135
     136
     137#}
Back to Top