Opened 2 months ago

Last modified 6 days ago

#36916 assigned New feature

Add support for streaming with TaskGroups

Reported by: Thomas Grainger Owned by: Carlton Gibson
Component: HTTP handling Version: dev
Severity: Normal Keywords: structured-concurrency, taskgroups
Cc: Thomas Grainger Triage Stage: Accepted
Has patch: no Needs documentation: no
Needs tests: no Patch needs improvement: no
Easy pickings: no UI/UX: no

Description (last modified by Thomas Grainger)

https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4
https://github.com/django/new-features/issues/117

Feature Description

see https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4

I’d like to be able to write code that combines multiple streams of data:

async def news_and_weather(request: HttpRequest) -> StreamingHttpResponse:
    async def gen() -> AsyncGenerator[bytes]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx =  anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                async for msg in rx:
                    yield msg  # yield in async generator!! illegal inside TaskGroup!
    return StreamingHttpResponse(gen())

Problem

however this doesn’t work because I’m using a yield inside an async generator that’s not a context manager, and calling aclosing() on that async generator is not sufficient to allow a TaskGroup to cancel itself and catch the cancel error.

from useful_types import SupportsAnext

class AsyncIteratorBytesResource(Protocol):
    """
    all the machinery needed to safely run an AsyncGenerator[Bytes]

    (for django-stubs) this allows AsyncGenerator[bytes] but is less strict
    so would also allow a anyio MemoryObjectRecieveStream[bytes]]
    """

    async def __aiter__(self) -> SupportsAnext[bytes]: ...
    async def aclose(self) -> object: ...


async def news_and_weather(request: HttpRequest) -> StreamingAcmgrHttpResponse:
    @contextlib.asynccontextmanager
    async def acmgr_gen() -> AsyncGenerator[AsyncIteratorBytesResource]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx =  anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                yield rx  # yield inside asynccontextmanager, permitted inside TaskGroup

    return StreamingAcmgrHttpResponse(acmgr_gen())

Implementation Suggestions

https://github.com/django/django/pull/19364/changes

Change History (10)

comment:1 by Amar, 2 months ago

Description: modified (diff)
Owner: set to Amar
Status: newassigned

comment:2 by Amar, 2 months ago

Owner: Amar removed
Status: assignednew

comment:3 by Thomas Grainger, 2 months ago

Description: modified (diff)

comment:4 by Thomas Grainger, 2 months ago

Needs documentation: set

comment:5 by Jacob Walls, 2 months ago

Easy pickings: unset
Owner: set to Thomas Grainger
Status: newassigned
Summary: add support for streaming with TaskGroupsAdd support for streaming with TaskGroups
Triage Stage: UnreviewedAccepted
Type: UncategorizedNew feature

Thanks. Reception on the new-features repo issue seems positive, so though it hasn't moved through any swimlanes there yet, I will speculatively move this one to Accepted assuming it likely will.

comment:6 by Carlton Gibson, 3 weeks ago

Patch needs improvement: set

General approach looks good. I left some comments, and there’s some discussion about a possible alternative approach (but I’m not sure if that’s viable or not, so we should see if that appears.)

comment:7 by Thomas Grainger, 7 days ago

Has patch: unset
Patch needs improvement: unset

comment:8 by Thomas Grainger, 7 days ago

How about this approach? Working from my phone so posting as a patch rather than pushing directly.

Summary of changes (from Claude)

django/http/response.py

  • _set_streaming_content detects async context managers via hasattr(__aenter__, __aexit__), sets self.is_acmgr = True and stores the value in self.__acmgr
  • streaming_content property: is_acmgr is an early return at the top (unnested from is_async). Returns an @asynccontextmanager when is_acmgr is True — callers that check is_acmgr use async with response.streaming_content as agen. Both acmgr and regular async paths apply make_bytes and aclose their underlying iterator in a finally block
  • StreamingHttpResponse gains __aenter__/__aexit__: for acmgr responses, __aenter__ calls into self.streaming_content (no duplication of awrapper logic needed), stores the context and the yielded generator, then __aexit__ closes the generator and exits the context in the right order so TaskGroup cleans up correctly
  • __iter__, __aiter__, and getvalue all raise IsAcmgrException when is_acmgr is True
  • IsAcmgrException is exported from django.http

django/core/handlers/asgi.py

  • send_response simplified to async with response as content — works for both regular streaming and acmgr responses. aclosing no longer needed

django/middleware/gzip.py

  • Adds is_acmgr branch: captures response.streaming_content (an acmgr) and wraps it in a new @asynccontextmanager that feeds the yielded generator into acompress_sequence

django/utils/text.py

  • acompress_sequence wraps its entire body in try/finally to aclose its operand on exit if the method is present
  • django/core/handlers/asgi.py

    diff --git a/django/core/handlers/asgi.py b/django/core/handlers/asgi.py
    index 9555860..0f55717 100644
    a b import sys  
    44import tempfile
    55import traceback
    66from collections import defaultdict
    7 from contextlib import aclosing, closing
     7from contextlib import closing
    88
    99from asgiref.sync import ThreadSensitiveContext, sync_to_async
    1010
    class ASGIHandler(base.BaseHandler):  
    315315        )
    316316        # Streaming responses need to be pinned to their iterator.
    317317        if response.streaming:
    318             # - Consume via `__aiter__` and not `streaming_content` directly,
    319             #   to allow mapping of a sync iterator.
    320             # - Use aclosing() when consuming aiter. See
    321             #   https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e
    322             async with aclosing(aiter(response)) as content:
     318            async with response as content:
    323319                async for part in content:
    324320                    for chunk, _ in self.chunk_bytes(part):
    325321                        await send(
  • django/http/__init__.py

    diff --git a/django/http/__init__.py b/django/http/__init__.py
    index 628564e..2df8fa6 100644
    a b from django.http.request import (  
    88)
    99from django.http.response import (
    1010    BadHeaderError,
     11    IsAcmgrException,
    1112    FileResponse,
    1213    Http404,
    1314    HttpResponse,
    __all__ = [  
    4748    "HttpResponseServerError",
    4849    "Http404",
    4950    "BadHeaderError",
     51    "IsAcmgrException",
    5052    "JsonResponse",
    5153    "FileResponse",
    5254]
  • django/http/response.py

    diff --git a/django/http/response.py b/django/http/response.py
    index 9bf0b14..e3976da 100644
    a b import re  
    77import sys
    88import time
    99import warnings
     10from contextlib import asynccontextmanager
    1011from email.header import Header
    1112from http.client import responses
    1213from urllib.parse import urlsplit
    class BadHeaderError(ValueError):  
    104105    pass
    105106
    106107
     108class IsAcmgrException(Exception):
     109    pass
     110
     111
    107112class HttpResponseBase:
    108113    """
    109114    An HTTP response base class with dictionary-accessed headers.
    class StreamingHttpResponse(HttpResponseBase):  
    479484
    480485    @property
    481486    def streaming_content(self):
     487        if self.is_acmgr:
     488            # Pull to lexical scope in case streaming_content is set again.
     489            _acmgr = self.__acmgr
     490
     491            @asynccontextmanager
     492            async def acmgr_wrapper():
     493                async with _acmgr as agen:
     494                    try:
     495                        async def awrapper():
     496                            async for part in agen:
     497                                yield self.make_bytes(part)
     498
     499                        yield awrapper()
     500                    finally:
     501                        if hasattr(agen, "aclose"):
     502                            await agen.aclose()
     503
     504            return acmgr_wrapper()
    482505        if self.is_async:
    483506            # pull to lexical scope to capture fixed reference in case
    484507            # streaming_content is set again later.
    485508            _iterator = self._iterator
    486509
    487510            async def awrapper():
    488                 async for part in _iterator:
    489                     yield self.make_bytes(part)
     511                try:
     512                    async for part in _iterator:
     513                        yield self.make_bytes(part)
     514                finally:
     515                    if hasattr(_iterator, "aclose"):
     516                        await _iterator.aclose()
    490517
    491518            return awrapper()
    492519        else:
    class StreamingHttpResponse(HttpResponseBase):  
    498525
    499526    def _set_streaming_content(self, value):
    500527        # Ensure we can never iterate on "value" more than once.
     528        if hasattr(value, "__aenter__") and hasattr(value, "__aexit__"):
     529            self.__acmgr = value
     530            self.is_acmgr = True
     531            self.is_async = True
     532            return
     533        self.is_acmgr = False
    501534        try:
    502535            self._iterator = iter(value)
    503536            self.is_async = False
    class StreamingHttpResponse(HttpResponseBase):  
    507540        if hasattr(value, "close"):
    508541            self._resource_closers.append(value.close)
    509542
     543    async def __aenter__(self):
     544        if self.is_acmgr:
     545            self.__acmgr_ctx = self.streaming_content
     546            self.__agen = await self.__acmgr_ctx.__aenter__()
     547        else:
     548            self.__agen = aiter(self)
     549        return self.__agen
     550
     551    async def __aexit__(self, *exc_info):
     552        try:
     553            await self.__agen.aclose()
     554        finally:
     555            if self.is_acmgr:
     556                return await self.__acmgr_ctx.__aexit__(*exc_info)
     557        return None
     558
    510559    def __iter__(self):
     560        if self.is_acmgr:
     561            raise IsAcmgrException(
     562                "%s must be consumed via `async with`. Use `async with response` "
     563                "and iterate the yielded content." % self.__class__.__name__
     564            )
    511565        try:
    512566            return iter(self.streaming_content)
    513567        except TypeError:
    class StreamingHttpResponse(HttpResponseBase):  
    528582            return map(self.make_bytes, iter(async_to_sync(to_list)(self._iterator)))
    529583
    530584    async def __aiter__(self):
     585        if self.is_acmgr:
     586            raise IsAcmgrException(
     587                "%s must be consumed via `async with`. Use `async with response` "
     588                "and iterate the yielded content." % self.__class__.__name__
     589            )
    531590        try:
    532591            async for part in self.streaming_content:
    533592                yield part
    class StreamingHttpResponse(HttpResponseBase):  
    544603                yield part
    545604
    546605    def getvalue(self):
     606        if self.is_acmgr:
     607            raise IsAcmgrException(
     608                "%s must be consumed via `async with`. Use `async with response` "
     609                "and iterate the yielded content." % self.__class__.__name__
     610            )
    547611        return b"".join(self.streaming_content)
    548612
    549613
  • django/middleware/gzip.py

    diff --git a/django/middleware/gzip.py b/django/middleware/gzip.py
    index eb151d7..78b5739 100644
    a b  
     1from contextlib import asynccontextmanager
     2
    13from django.utils.cache import patch_vary_headers
    24from django.utils.deprecation import MiddlewareMixin
    35from django.utils.regex_helper import _lazy_re_compile
    class GZipMiddleware(MiddlewareMixin):  
    3133            return response
    3234
    3335        if response.streaming:
    34             if response.is_async:
     36            if response.is_acmgr:
     37                original_acmgr = response.streaming_content
     38                max_random_bytes = self.max_random_bytes
     39
     40                @asynccontextmanager
     41                async def compressed_acmgr():
     42                    async with original_acmgr as agen:
     43                        yield acompress_sequence(
     44                            agen,
     45                            max_random_bytes=max_random_bytes,
     46                        )
     47
     48                response.streaming_content = compressed_acmgr()
     49            elif response.is_async:
    3550                response.streaming_content = acompress_sequence(
    3651                    response.streaming_content,
    3752                    max_random_bytes=self.max_random_bytes,
  • django/utils/text.py

    diff --git a/django/utils/text.py b/django/utils/text.py
    index d1306f9..55bd6f5 100644
    a b def compress_sequence(sequence, *, max_random_bytes=None):  
    390390
    391391
    392392async def acompress_sequence(sequence, *, max_random_bytes=None):
    393     buf = StreamingBuffer()
    394     filename = _get_random_filename(max_random_bytes) if max_random_bytes else None
    395     with GzipFile(
    396         filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0
    397     ) as zfile:
    398         # Output headers...
     393    try:
     394        buf = StreamingBuffer()
     395        filename = _get_random_filename(max_random_bytes) if max_random_bytes else None
     396        with GzipFile(
     397            filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0
     398        ) as zfile:
     399            # Output headers...
     400            yield buf.read()
     401            async for item in sequence:
     402                zfile.write(item)
     403                zfile.flush()
     404                data = buf.read()
     405                if data:
     406                    yield data
    399407        yield buf.read()
    400         async for item in sequence:
    401             zfile.write(item)
    402             zfile.flush()
    403             data = buf.read()
    404             if data:
    405                 yield data
    406     yield buf.read()
     408    finally:
     409        if hasattr(sequence, "aclose"):
     410            await sequence.aclose()

comment:9 by Thomas Grainger, 7 days ago

Attaching updated patch based on the approach discussed in PR #19364.

Summary of changes

django/http/response.py:

  • _set_streaming_content detects async context managers via hasattr(__aenter__, __aexit__), sets is_acmgr = True and stores the value
  • streaming_content property returns an @asynccontextmanager wrapping make_bytes when is_acmgr is True
  • StreamingHttpResponse gains __aenter__/__aexit__: for acmgr responses, enters the streaming_content CM; for regular responses, returns aiter(self). __aexit__ only handles acmgr CM cleanup — aclosing in the ASGI handler handles iterator close
  • __iter__, __aiter__, and getvalue raise IsAcmgrException when is_acmgr is True

django/core/handlers/asgi.py:

  • Single code path: async with response as agen, aclosing(agen) as content: — works for both regular streaming and acmgr responses

django/middleware/gzip.py:

  • Adds is_acmgr branch that wraps the acmgr in a new @asynccontextmanager feeding into acompress_sequence

django/utils/text.py:

  • acompress_sequence wraps its body in try/finally to aclose its operand

Tests

  • 13 new tests in tests/httpwrappers/tests.py covering: basic acmgr streaming, make_bytes coercion, IsAcmgrException guards on __iter__/__aiter__/getvalue, __aexit__ on error and break, non-acmgr __aenter__ fallback, reassignment, single-producer TaskGroup+Queue, and multi-producer fan-in (news-and-weather pattern)
  • 1 new test in tests/middleware/tests.py for gzip compression of acmgr streaming responses

Docs

  • New "Streaming with TaskGroup" section in docs/ref/request-response.txt with full news_and_weather example, key points re PEP 789, and anyio note
  • Release note in docs/releases/6.1.txt

Patch

  • django/core/handlers/asgi.py

    diff --git a/django/core/handlers/asgi.py b/django/core/handlers/asgi.py
    index 7ee5208..bcdafdc 100644
    a b class ASGIHandler(base.BaseHandler):  
    318318        )
    319319        # Streaming responses need to be pinned to their iterator.
    320320        if response.streaming:
    321             # - Consume via `__aiter__` and not `streaming_content` directly,
    322             #   to allow mapping of a sync iterator.
    323             # - Use aclosing() when consuming aiter. See
    324             #   https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e
    325             async with aclosing(aiter(response)) as content:
     321            # Use aclosing() when consuming aiter. See
     322            # https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e
     323            async with response as agen, aclosing(agen) as content:
    326324                async for part in content:
    327325                    for chunk, _ in self.chunk_bytes(part):
    328326                        await send(
  • django/http/__init__.py

    diff --git a/django/http/__init__.py b/django/http/__init__.py
    index 628564e..2df8fa6 100644
    a b from django.http.request import (  
    88)
    99from django.http.response import (
    1010    BadHeaderError,
     11    IsAcmgrException,
    1112    FileResponse,
    1213    Http404,
    1314    HttpResponse,
    __all__ = [  
    4748    "HttpResponseServerError",
    4849    "Http404",
    4950    "BadHeaderError",
     51    "IsAcmgrException",
    5052    "JsonResponse",
    5153    "FileResponse",
    5254]
  • django/http/response.py

    diff --git a/django/http/response.py b/django/http/response.py
    index 9bf0b14..c1c0c20 100644
    a b import re  
    77import sys
    88import time
    99import warnings
     10from contextlib import asynccontextmanager
    1011from email.header import Header
    1112from http.client import responses
    1213from urllib.parse import urlsplit
    class BadHeaderError(ValueError):  
    104105    pass
    105106
    106107
     108class IsAcmgrException(Exception):
     109    pass
     110
     111
    107112class HttpResponseBase:
    108113    """
    109114    An HTTP response base class with dictionary-accessed headers.
    class StreamingHttpResponse(HttpResponseBase):  
    479484
    480485    @property
    481486    def streaming_content(self):
     487        if self.is_acmgr:
     488            # Pull to lexical scope in case streaming_content is set again.
     489            _acmgr = self.__acmgr
     490
     491            @asynccontextmanager
     492            async def acmgr_wrapper():
     493                async with _acmgr as agen:
     494                    async def awrapper():
     495                        try:
     496                            async for part in agen:
     497                                yield self.make_bytes(part)
     498                        finally:
     499                            if hasattr(agen, "aclose"):
     500                                await agen.aclose()
     501
     502                    yield awrapper()
     503
     504            return acmgr_wrapper()
    482505        if self.is_async:
    483506            # pull to lexical scope to capture fixed reference in case
    484507            # streaming_content is set again later.
    485508            _iterator = self._iterator
    486509
    487510            async def awrapper():
    488                 async for part in _iterator:
    489                     yield self.make_bytes(part)
     511                try:
     512                    async for part in _iterator:
     513                        yield self.make_bytes(part)
     514                finally:
     515                    if hasattr(_iterator, "aclose"):
     516                        await _iterator.aclose()
    490517
    491518            return awrapper()
    492519        else:
    class StreamingHttpResponse(HttpResponseBase):  
    498525
    499526    def _set_streaming_content(self, value):
    500527        # Ensure we can never iterate on "value" more than once.
     528        if hasattr(value, "__aenter__") and hasattr(value, "__aexit__"):
     529            self.__acmgr = value
     530            self.is_acmgr = True
     531            self.is_async = True
     532            return
     533        self.is_acmgr = False
    501534        try:
    502535            self._iterator = iter(value)
    503536            self.is_async = False
    class StreamingHttpResponse(HttpResponseBase):  
    507540        if hasattr(value, "close"):
    508541            self._resource_closers.append(value.close)
    509542
     543    async def __aenter__(self):
     544        if self.is_acmgr:
     545            self.__acmgr_ctx = self.streaming_content
     546            return await self.__acmgr_ctx.__aenter__()
     547        return aiter(self)
     548
     549    async def __aexit__(self, *exc_info):
     550        if self.is_acmgr:
     551            return await self.__acmgr_ctx.__aexit__(*exc_info)
     552
    510553    def __iter__(self):
     554        if self.is_acmgr:
     555            raise IsAcmgrException(
     556                "%s must be consumed via `async with`. Use `async with response` "
     557                "and iterate the yielded content." % self.__class__.__name__
     558            )
    511559        try:
    512560            return iter(self.streaming_content)
    513561        except TypeError:
    class StreamingHttpResponse(HttpResponseBase):  
    528576            return map(self.make_bytes, iter(async_to_sync(to_list)(self._iterator)))
    529577
    530578    async def __aiter__(self):
     579        if self.is_acmgr:
     580            raise IsAcmgrException(
     581                "%s must be consumed via `async with`. Use `async with response` "
     582                "and iterate the yielded content." % self.__class__.__name__
     583            )
    531584        try:
    532585            async for part in self.streaming_content:
    533586                yield part
    class StreamingHttpResponse(HttpResponseBase):  
    544597                yield part
    545598
    546599    def getvalue(self):
     600        if self.is_acmgr:
     601            raise IsAcmgrException(
     602                "%s must be consumed via `async with`. Use `async with response` "
     603                "and iterate the yielded content." % self.__class__.__name__
     604            )
    547605        return b"".join(self.streaming_content)
    548606
    549607
  • django/middleware/gzip.py

    diff --git a/django/middleware/gzip.py b/django/middleware/gzip.py
    index eb151d7..78b5739 100644
    a b  
     1from contextlib import asynccontextmanager
     2
    13from django.utils.cache import patch_vary_headers
    24from django.utils.deprecation import MiddlewareMixin
    35from django.utils.regex_helper import _lazy_re_compile
    class GZipMiddleware(MiddlewareMixin):  
    3133            return response
    3234
    3335        if response.streaming:
    34             if response.is_async:
     36            if response.is_acmgr:
     37                original_acmgr = response.streaming_content
     38                max_random_bytes = self.max_random_bytes
     39
     40                @asynccontextmanager
     41                async def compressed_acmgr():
     42                    async with original_acmgr as agen:
     43                        yield acompress_sequence(
     44                            agen,
     45                            max_random_bytes=max_random_bytes,
     46                        )
     47
     48                response.streaming_content = compressed_acmgr()
     49            elif response.is_async:
    3550                response.streaming_content = acompress_sequence(
    3651                    response.streaming_content,
    3752                    max_random_bytes=self.max_random_bytes,
  • django/utils/text.py

    diff --git a/django/utils/text.py b/django/utils/text.py
    index d1306f9..55bd6f5 100644
    a b def compress_sequence(sequence, *, max_random_bytes=None):  
    390390
    391391
    392392async def acompress_sequence(sequence, *, max_random_bytes=None):
    393     buf = StreamingBuffer()
    394     filename = _get_random_filename(max_random_bytes) if max_random_bytes else None
    395     with GzipFile(
    396         filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0
    397     ) as zfile:
    398         # Output headers...
     393    try:
     394        buf = StreamingBuffer()
     395        filename = _get_random_filename(max_random_bytes) if max_random_bytes else None
     396        with GzipFile(
     397            filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0
     398        ) as zfile:
     399            # Output headers...
     400            yield buf.read()
     401            async for item in sequence:
     402                zfile.write(item)
     403                zfile.flush()
     404                data = buf.read()
     405                if data:
     406                    yield data
    399407        yield buf.read()
    400         async for item in sequence:
    401             zfile.write(item)
    402             zfile.flush()
    403             data = buf.read()
    404             if data:
    405                 yield data
    406     yield buf.read()
     408    finally:
     409        if hasattr(sequence, "aclose"):
     410            await sequence.aclose()
    407411
    408412
    409413# Expression to match some_token and some_token="with spaces" (and similarly
  • docs/ref/request-response.txt

    diff --git a/docs/ref/request-response.txt b/docs/ref/request-response.txt
    index ba60415..945683a 100644
    a b is streaming. If you perform long-running operations in your view before  
    14241424returning the ``StreamingHttpResponse`` object, then you may also want to
    14251425:ref:`handle disconnections in the view <async-handling-disconnect>` itself.
    14261426
     1427.. _request-response-streaming-task-groups:
     1428
     1429Streaming with ``TaskGroup``
     1430----------------------------
     1431
     1432.. versionadded:: 6.1
     1433
     1434When serving under ASGI you may want to stream content that is produced by
     1435background tasks — for example, fan-in from multiple websocket connections or
     1436long-running computations. Python's :class:`asyncio.TaskGroup` is the natural
     1437way to manage those tasks, but an ``async for`` loop cannot ``yield`` inside a
     1438``TaskGroup`` context (see :pep:`789` for details).
     1439
     1440The solution is to pass an :func:`~contextlib.asynccontextmanager` instance as
     1441the ``streaming_content`` argument. The context manager's ``__aenter__`` sets up
     1442the ``TaskGroup`` (and any other resources), then ``yield`` s an async iterator
     1443that the framework will consume. When the response finishes — or the client
     1444disconnects — the context manager's ``__aexit__`` tears everything down in the
     1445correct order.
     1446
     1447Here is a complete example that merges two websocket feeds into a single
     1448streaming response::
     1449
     1450    import asyncio
     1451    from collections.abc import AsyncGenerator
     1452    from contextlib import asynccontextmanager
     1453
     1454    from django.http import StreamingHttpResponse
     1455
     1456
     1457    @asynccontextmanager
     1458    async def news_and_weather() -> AsyncGenerator[AsyncGenerator[bytes, None], None]:
     1459        queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=1)
     1460
     1461        async def consume_ws(url: str) -> None:
     1462            async with await connect_ws(url) as ws:
     1463                async for msg in ws:
     1464                    await queue.put(msg)
     1465
     1466        async def run_producers() -> None:
     1467            async with asyncio.TaskGroup() as tg:
     1468                tg.create_task(consume_ws("ws://news.example.com/feed"))
     1469                tg.create_task(consume_ws("ws://weather.example.com/feed"))
     1470            await queue.put(None)  # sentinel: all producers finished
     1471
     1472        async def items() -> AsyncGenerator[bytes, None]:
     1473            while (item := await queue.get()) is not None:
     1474                yield item
     1475
     1476        async with asyncio.TaskGroup() as tg:
     1477            tg.create_task(run_producers())
     1478            yield items()
     1479
     1480
     1481    async def news_and_weather_view(request):
     1482        return StreamingHttpResponse(news_and_weather())
     1483
     1484Key points:
     1485
     1486* ``news_and_weather()`` is decorated with
     1487  :func:`~contextlib.asynccontextmanager`. Django detects this via
     1488  ``hasattr(value, "__aenter__")`` in ``_set_streaming_content`` and sets
     1489  :attr:`~StreamingHttpResponse.is_acmgr` to ``True``.
     1490
     1491* The ``yield`` that suspends the frame happens inside
     1492  ``@asynccontextmanager``, which correctly routes exceptions back to the
     1493  ``TaskGroup`` cancel scope. The ``items()`` async generator yielded *out* to
     1494  the caller never yields inside a cancel scope, so it is safe.
     1495
     1496* Middleware that wraps ``streaming_content`` (such as
     1497  :class:`~django.middleware.gzip.GZipMiddleware`) is aware of the
     1498  ``is_acmgr`` flag and preserves the context-manager protocol.
     1499
     1500* Attempting to iterate an ``is_acmgr`` response directly (via ``__iter__``,
     1501  ``__aiter__``, or ``getvalue()``) raises
     1502  :exc:`~django.http.IsAcmgrException`. The response must be consumed with
     1503  ``async with``::
     1504
     1505      async with response as agen:
     1506          async for chunk in agen:
     1507              ...
     1508
     1509.. note::
     1510
     1511    The `anyio <https://anyio.readthedocs.io/>`_ library provides
     1512    :func:`anyio.create_task_group` and :func:`anyio.create_memory_object_stream`,
     1513    which can be used as an alternative to :class:`asyncio.TaskGroup` and
     1514    :class:`asyncio.Queue`. A ``MemoryObjectReceiveStream`` is an async
     1515    iterable and can be yielded directly from the context manager.
     1516
     1517.. attribute:: StreamingHttpResponse.is_acmgr
     1518
     1519    Boolean indicating whether the response was created with an async context
     1520    manager as its streaming content. When ``True``,
     1521    :attr:`~StreamingHttpResponse.is_async` is also ``True`` and the response
     1522    must be consumed via ``async with``.
     1523
    14271524``FileResponse`` objects
    14281525========================
    14291526
  • docs/releases/6.1.txt

    diff --git a/docs/releases/6.1.txt b/docs/releases/6.1.txt
    index 00eaf53..790782c 100644
    a b Pagination  
    323323Requests and Responses
    324324~~~~~~~~~~~~~~~~~~~~~~
    325325
     326* :class:`~django.http.StreamingHttpResponse` now accepts an async context
     327  manager as ``streaming_content``, enabling streaming patterns that require
     328  :class:`asyncio.TaskGroup` or similar structured concurrency primitives. See
     329  :ref:`request-response-streaming-task-groups` for details.
     330
    326331* :attr:`HttpRequest.multipart_parser_class <django.http.HttpRequest.multipart_parser_class>`
    327332  can now be customized to use a different multipart parser class.
    328333
  • tests/asgi/urls.py

    diff --git a/tests/asgi/urls.py b/tests/asgi/urls.py
    index 0311cf3..b7fa206 100644
    a b  
    11import asyncio
     2import contextlib
    23import threading
    34import time
    45
  • tests/httpwrappers/tests.py

    diff --git a/tests/httpwrappers/tests.py b/tests/httpwrappers/tests.py
    index 3e8364e..a2add61 100644
    a b  
     1import asyncio
     2import contextlib
    13import copy
    24import json
    35import os
    46import pickle
     7import sys
    58import unittest
    69import uuid
    710
    from django.http import (  
    1619    HttpResponseNotModified,
    1720    HttpResponsePermanentRedirect,
    1821    HttpResponseRedirect,
     22    IsAcmgrException,
    1923    JsonResponse,
    2024    QueryDict,
    2125    SimpleCookie,
    class StreamingHttpResponseTests(SimpleTestCase):  
    808812        with self.assertWarnsMessage(Warning, msg):
    809813            self.assertEqual(b"hello", await anext(aiter(r)))
    810814
     815    async def test_async_context_manager_streaming_response(self):
     816        """StreamingHttpResponse accepts an async context manager as content."""
     817
     818        @contextlib.asynccontextmanager
     819        async def acmgr():
     820            async def iterator():
     821                yield b"hello"
     822                yield b"world"
     823
     824            yield iterator()
     825
     826        r = StreamingHttpResponse(acmgr())
     827        self.assertTrue(r.is_acmgr)
     828        self.assertTrue(r.is_async)
     829
     830        chunks = []
     831        async with r as agen, contextlib.aclosing(agen) as content:
     832            async for chunk in content:
     833                chunks.append(chunk)
     834        self.assertEqual(chunks, [b"hello", b"world"])
     835
     836    async def test_acmgr_make_bytes(self):
     837        """Acmgr streaming content is coerced to bytes via make_bytes."""
     838
     839        @contextlib.asynccontextmanager
     840        async def acmgr():
     841            async def iterator():
     842                yield "hello"
     843                yield "café"
     844
     845            yield iterator()
     846
     847        r = StreamingHttpResponse(acmgr())
     848        chunks = []
     849        async with r as agen, contextlib.aclosing(agen) as content:
     850            async for chunk in content:
     851                chunks.append(chunk)
     852        self.assertEqual(chunks, [b"hello", b"caf\xc3\xa9"])
     853
     854    async def test_acmgr_iter_raises(self):
     855        """Iterating an acmgr response via __iter__ raises IsAcmgrException."""
     856
     857        @contextlib.asynccontextmanager
     858        async def acmgr():
     859            yield iter([b"hello"])
     860
     861        r = StreamingHttpResponse(acmgr())
     862        msg = (
     863            "StreamingHttpResponse must be consumed via `async with`. "
     864            "Use `async with response` and iterate the yielded content."
     865        )
     866        with self.assertRaisesMessage(IsAcmgrException, msg):
     867            iter(r)
     868
     869    async def test_acmgr_aiter_raises(self):
     870        """Iterating an acmgr response via __aiter__ raises IsAcmgrException."""
     871
     872        @contextlib.asynccontextmanager
     873        async def acmgr():
     874            yield iter([b"hello"])
     875
     876        r = StreamingHttpResponse(acmgr())
     877        msg = (
     878            "StreamingHttpResponse must be consumed via `async with`. "
     879            "Use `async with response` and iterate the yielded content."
     880        )
     881        with self.assertRaisesMessage(IsAcmgrException, msg):
     882            async for _ in r:
     883                pass
     884
     885    async def test_acmgr_getvalue_raises(self):
     886        """Calling getvalue() on an acmgr response raises IsAcmgrException."""
     887
     888        @contextlib.asynccontextmanager
     889        async def acmgr():
     890            yield iter([b"hello"])
     891
     892        r = StreamingHttpResponse(acmgr())
     893        with self.assertRaises(IsAcmgrException):
     894            r.getvalue()
     895
     896    async def test_acmgr_aexit_called_on_error(self):
     897        """The acmgr's __aexit__ is called even when iteration raises."""
     898        exited = []
     899
     900        @contextlib.asynccontextmanager
     901        async def acmgr():
     902            try:
     903                async def iterator():
     904                    yield b"hello"
     905                    raise ValueError("boom")
     906
     907                yield iterator()
     908            finally:
     909                exited.append(True)
     910
     911        r = StreamingHttpResponse(acmgr())
     912        with self.assertRaises(ValueError):
     913            async with r as agen, contextlib.aclosing(agen) as content:
     914                async for _ in content:
     915                    pass
     916        self.assertEqual(exited, [True])
     917
     918    async def test_acmgr_aexit_called_on_break(self):
     919        """The acmgr cleans up when the consumer breaks out early."""
     920        exited = []
     921
     922        @contextlib.asynccontextmanager
     923        async def acmgr():
     924            try:
     925                async def iterator():
     926                    yield b"hello"
     927                    yield b"world"
     928
     929                yield iterator()
     930            finally:
     931                exited.append(True)
     932
     933        r = StreamingHttpResponse(acmgr())
     934        async with r as agen, contextlib.aclosing(agen) as content:
     935            async for _ in content:
     936                break
     937        self.assertEqual(exited, [True])
     938
     939    async def test_acmgr_non_acmgr_aenter_returns_aiter(self):
     940        """__aenter__ on a non-acmgr async response returns aiter(self)."""
     941
     942        async def async_iter():
     943            yield b"hello"
     944
     945        r = StreamingHttpResponse(async_iter())
     946        self.assertFalse(r.is_acmgr)
     947        agen = await r.__aenter__()
     948        chunk = await agen.__anext__()
     949        self.assertEqual(chunk, b"hello")
     950        await r.__aexit__(None, None, None)
     951
     952    async def test_acmgr_reassign_streaming_content(self):
     953        """Assigning a new acmgr to streaming_content replaces the old one."""
     954
     955        @contextlib.asynccontextmanager
     956        async def acmgr1():
     957            async def it():
     958                yield b"first"
     959
     960            yield it()
     961
     962        @contextlib.asynccontextmanager
     963        async def acmgr2():
     964            async def it():
     965                yield b"second"
     966
     967            yield it()
     968
     969        r = StreamingHttpResponse(acmgr1())
     970        r.streaming_content = acmgr2()
     971        self.assertTrue(r.is_acmgr)
     972
     973        chunks = []
     974        async with r as agen, contextlib.aclosing(agen) as content:
     975            async for chunk in content:
     976                chunks.append(chunk)
     977        self.assertEqual(chunks, [b"second"])
     978
     979    @unittest.skipUnless(
     980        sys.version_info >= (3, 11), "asyncio.TaskGroup requires Python 3.11+"
     981    )
     982    async def test_taskgroup_queue_streaming_response(self):
     983        """
     984        StreamingHttpResponse works with asyncio.TaskGroup and asyncio.Queue,
     985        the canonical pattern for background-task-driven streaming.
     986        """
     987
     988        @contextlib.asynccontextmanager
     989        async def taskgroup_stream():
     990            queue = asyncio.Queue()
     991
     992            async def producer():
     993                for chunk in [b"hello", b"world"]:
     994                    await queue.put(chunk)
     995                await queue.put(None)  # sentinel
     996
     997            async def generator():
     998                while True:
     999                    chunk = await queue.get()
     1000                    if chunk is None:
     1001                        break
     1002                    yield chunk
     1003
     1004            async with asyncio.TaskGroup() as tg:
     1005                tg.create_task(producer())
     1006                yield generator()
     1007
     1008        r = StreamingHttpResponse(taskgroup_stream())
     1009        self.assertTrue(r.is_acmgr)
     1010
     1011        chunks = []
     1012        async with r as agen, contextlib.aclosing(agen) as content:
     1013            async for chunk in content:
     1014                chunks.append(chunk)
     1015        self.assertEqual(chunks, [b"hello", b"world"])
     1016
     1017    @unittest.skipUnless(
     1018        sys.version_info >= (3, 11), "asyncio.TaskGroup requires Python 3.11+"
     1019    )
     1020    async def test_taskgroup_multiple_producers_streaming_response(self):
     1021        """
     1022        StreamingHttpResponse works with multiple producer tasks feeding a
     1023        shared queue, as in the news-and-weather websocket fan-in pattern.
     1024        """
     1025
     1026        @contextlib.asynccontextmanager
     1027        async def multi_producer_stream():
     1028            queue = asyncio.Queue(maxsize=1)
     1029
     1030            async def consume(source):
     1031                """Simulate a websocket consumer pushing to the shared queue."""
     1032                for msg in source:
     1033                    await queue.put(msg)
     1034
     1035            async def run_producers():
     1036                async with asyncio.TaskGroup() as tg:
     1037                    tg.create_task(consume([b"news-1", b"news-2"]))
     1038                    tg.create_task(consume([b"weather-1"]))
     1039                await queue.put(None)  # sentinel after all producers finish
     1040
     1041            async def items():
     1042                while (item := await queue.get()) is not None:
     1043                    yield item
     1044
     1045            async with asyncio.TaskGroup() as tg:
     1046                tg.create_task(run_producers())
     1047                yield items()
     1048
     1049        r = StreamingHttpResponse(multi_producer_stream())
     1050        self.assertTrue(r.is_acmgr)
     1051
     1052        chunks = []
     1053        async with r as agen, contextlib.aclosing(agen) as content:
     1054            async for chunk in content:
     1055                chunks.append(chunk)
     1056        # All three messages arrive (order depends on scheduling).
     1057        self.assertEqual(sorted(chunks), [b"news-1", b"news-2", b"weather-1"])
     1058
    8111059    def test_text_attribute_error(self):
    8121060        r = StreamingHttpResponse(iter(["hello", "world"]))
    8131061        msg = "This %s instance has no `text` attribute." % r.__class__.__name__
  • tests/middleware/tests.py

    diff --git a/tests/middleware/tests.py b/tests/middleware/tests.py
    index a61c4b1..4192ada 100644
    a b  
     1import contextlib
    12import gzip
    23import random
    34import re
    class GZipMiddlewareTest(SimpleTestCase):  
    936937        self.assertEqual(r.get("Content-Encoding"), "gzip")
    937938        self.assertFalse(r.has_header("Content-Length"))
    938939
     940    async def test_compress_acmgr_streaming_response(self):
     941        """
     942        Compression is performed on responses with async context manager content.
     943        """
     944
     945        async def get_stream_response(request):
     946            @contextlib.asynccontextmanager
     947            async def acmgr():
     948                async def iterator():
     949                    for chunk in self.sequence:
     950                        yield chunk
     951
     952                yield iterator()
     953
     954            resp = StreamingHttpResponse(acmgr())
     955            resp["Content-Type"] = "text/html; charset=UTF-8"
     956            return resp
     957
     958        r = await GZipMiddleware(get_stream_response)(self.req)
     959        self.assertTrue(r.is_acmgr)
     960        chunks = []
     961        async with r as agen, contextlib.aclosing(agen) as content:
     962            async for chunk in content:
     963                chunks.append(chunk)
     964        self.assertEqual(
     965            self.decompress(b"".join(chunks)),
     966            b"".join(self.sequence),
     967        )
     968        self.assertEqual(r.get("Content-Encoding"), "gzip")
     969        self.assertFalse(r.has_header("Content-Length"))
     970
    939971    def test_compress_streaming_response_unicode(self):
    940972        """
    941973        Compression is performed on responses with streaming Unicode content.

comment:10 by Jacob Walls, 6 days ago

Needs documentation: unset
Owner: changed from Thomas Grainger to Carlton Gibson
Note: See TracTickets for help on using tickets.
Back to Top