Opened 2 months ago
Last modified 6 days ago
#36916 assigned New feature
Add support for streaming with TaskGroups
| Reported by: | Thomas Grainger | Owned by: | Carlton Gibson |
|---|---|---|---|
| Component: | HTTP handling | Version: | dev |
| Severity: | Normal | Keywords: | structured-concurrency, taskgroups |
| Cc: | Thomas Grainger | Triage Stage: | Accepted |
| Has patch: | no | Needs documentation: | no |
| Needs tests: | no | Patch needs improvement: | no |
| Easy pickings: | no | UI/UX: | no |
Description (last modified by )
https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4
https://github.com/django/new-features/issues/117
Feature Description
see https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4
I’d like to be able to write code that combines multiple streams of data:
async def news_and_weather(request: HttpRequest) -> StreamingHttpResponse:
async def gen() -> AsyncGenerator[bytes]:
async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
async with tx, connect_ws(ws_url) as conn:
async for msg in conn:
await tx.send(msg)
async with anyio.create_task_group() as tg:
tx, rx = anyio.create_memory_object_stream[bytes]()
with tx, rx:
tg.start_soon(push, "ws://example.com/news", tx.clone())
tg.start_soon(push, "ws://example.com/weather", tx.clone())
tx.close()
async for msg in rx:
yield msg # yield in async generator!! illegal inside TaskGroup!
return StreamingHttpResponse(gen())
Problem
however this doesn’t work because I’m using a yield inside an async generator that’s not a context manager, and calling aclosing() on that async generator is not sufficient to allow a TaskGroup to cancel itself and catch the cancel error.
from useful_types import SupportsAnext
class AsyncIteratorBytesResource(Protocol):
"""
all the machinery needed to safely run an AsyncGenerator[Bytes]
(for django-stubs) this allows AsyncGenerator[bytes] but is less strict
so would also allow a anyio MemoryObjectRecieveStream[bytes]]
"""
async def __aiter__(self) -> SupportsAnext[bytes]: ...
async def aclose(self) -> object: ...
async def news_and_weather(request: HttpRequest) -> StreamingAcmgrHttpResponse:
@contextlib.asynccontextmanager
async def acmgr_gen() -> AsyncGenerator[AsyncIteratorBytesResource]:
async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
async with tx, connect_ws(ws_url) as conn:
async for msg in conn:
await tx.send(msg)
async with anyio.create_task_group() as tg:
tx, rx = anyio.create_memory_object_stream[bytes]()
with tx, rx:
tg.start_soon(push, "ws://example.com/news", tx.clone())
tg.start_soon(push, "ws://example.com/weather", tx.clone())
tx.close()
yield rx # yield inside asynccontextmanager, permitted inside TaskGroup
return StreamingAcmgrHttpResponse(acmgr_gen())
Implementation Suggestions
Change History (10)
comment:1 by , 2 months ago
| Description: | modified (diff) |
|---|---|
| Owner: | set to |
| Status: | new → assigned |
comment:2 by , 2 months ago
| Owner: | removed |
|---|---|
| Status: | assigned → new |
comment:3 by , 2 months ago
| Description: | modified (diff) |
|---|
comment:4 by , 2 months ago
| Needs documentation: | set |
|---|
comment:5 by , 2 months ago
| Easy pickings: | unset |
|---|---|
| Owner: | set to |
| Status: | new → assigned |
| Summary: | add support for streaming with TaskGroups → Add support for streaming with TaskGroups |
| Triage Stage: | Unreviewed → Accepted |
| Type: | Uncategorized → New feature |
comment:6 by , 3 weeks ago
| Patch needs improvement: | set |
|---|
General approach looks good. I left some comments, and there’s some discussion about a possible alternative approach (but I’m not sure if that’s viable or not, so we should see if that appears.)
comment:7 by , 7 days ago
| Has patch: | unset |
|---|---|
| Patch needs improvement: | unset |
comment:8 by , 7 days ago
How about this approach? Working from my phone so posting as a patch rather than pushing directly.
Summary of changes (from Claude)
django/http/response.py
_set_streaming_contentdetects async context managers viahasattr(__aenter__, __aexit__), setsself.is_acmgr = Trueand stores the value inself.__acmgrstreaming_contentproperty:is_acmgris an early return at the top (unnested fromis_async). Returns an@asynccontextmanagerwhenis_acmgris True — callers that checkis_acmgruseasync with response.streaming_content as agen. Both acmgr and regular async paths applymake_bytesandaclosetheir underlying iterator in afinallyblockStreamingHttpResponsegains__aenter__/__aexit__: for acmgr responses,__aenter__calls intoself.streaming_content(no duplication ofawrapperlogic needed), stores the context and the yielded generator, then__aexit__closes the generator and exits the context in the right order soTaskGroupcleans up correctly__iter__,__aiter__, andgetvalueall raiseIsAcmgrExceptionwhenis_acmgris TrueIsAcmgrExceptionis exported fromdjango.http
django/core/handlers/asgi.py
send_responsesimplified toasync with response as content— works for both regular streaming and acmgr responses.aclosingno longer needed
django/middleware/gzip.py
- Adds
is_acmgrbranch: capturesresponse.streaming_content(an acmgr) and wraps it in a new@asynccontextmanagerthat feeds the yielded generator intoacompress_sequence
django/utils/text.py
acompress_sequencewraps its entire body intry/finallytoacloseits operand on exit if the method is present
-
django/core/handlers/asgi.py
diff --git a/django/core/handlers/asgi.py b/django/core/handlers/asgi.py index 9555860..0f55717 100644
a b import sys 4 4 import tempfile 5 5 import traceback 6 6 from collections import defaultdict 7 from contextlib import aclosing,closing7 from contextlib import closing 8 8 9 9 from asgiref.sync import ThreadSensitiveContext, sync_to_async 10 10 … … class ASGIHandler(base.BaseHandler): 315 315 ) 316 316 # Streaming responses need to be pinned to their iterator. 317 317 if response.streaming: 318 # - Consume via `__aiter__` and not `streaming_content` directly, 319 # to allow mapping of a sync iterator. 320 # - Use aclosing() when consuming aiter. See 321 # https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e 322 async with aclosing(aiter(response)) as content: 318 async with response as content: 323 319 async for part in content: 324 320 for chunk, _ in self.chunk_bytes(part): 325 321 await send( -
django/http/__init__.py
diff --git a/django/http/__init__.py b/django/http/__init__.py index 628564e..2df8fa6 100644
a b from django.http.request import ( 8 8 ) 9 9 from django.http.response import ( 10 10 BadHeaderError, 11 IsAcmgrException, 11 12 FileResponse, 12 13 Http404, 13 14 HttpResponse, … … __all__ = [ 47 48 "HttpResponseServerError", 48 49 "Http404", 49 50 "BadHeaderError", 51 "IsAcmgrException", 50 52 "JsonResponse", 51 53 "FileResponse", 52 54 ] -
django/http/response.py
diff --git a/django/http/response.py b/django/http/response.py index 9bf0b14..e3976da 100644
a b import re 7 7 import sys 8 8 import time 9 9 import warnings 10 from contextlib import asynccontextmanager 10 11 from email.header import Header 11 12 from http.client import responses 12 13 from urllib.parse import urlsplit … … class BadHeaderError(ValueError): 104 105 pass 105 106 106 107 108 class IsAcmgrException(Exception): 109 pass 110 111 107 112 class HttpResponseBase: 108 113 """ 109 114 An HTTP response base class with dictionary-accessed headers. … … class StreamingHttpResponse(HttpResponseBase): 479 484 480 485 @property 481 486 def streaming_content(self): 487 if self.is_acmgr: 488 # Pull to lexical scope in case streaming_content is set again. 489 _acmgr = self.__acmgr 490 491 @asynccontextmanager 492 async def acmgr_wrapper(): 493 async with _acmgr as agen: 494 try: 495 async def awrapper(): 496 async for part in agen: 497 yield self.make_bytes(part) 498 499 yield awrapper() 500 finally: 501 if hasattr(agen, "aclose"): 502 await agen.aclose() 503 504 return acmgr_wrapper() 482 505 if self.is_async: 483 506 # pull to lexical scope to capture fixed reference in case 484 507 # streaming_content is set again later. 485 508 _iterator = self._iterator 486 509 487 510 async def awrapper(): 488 async for part in _iterator: 489 yield self.make_bytes(part) 511 try: 512 async for part in _iterator: 513 yield self.make_bytes(part) 514 finally: 515 if hasattr(_iterator, "aclose"): 516 await _iterator.aclose() 490 517 491 518 return awrapper() 492 519 else: … … class StreamingHttpResponse(HttpResponseBase): 498 525 499 526 def _set_streaming_content(self, value): 500 527 # Ensure we can never iterate on "value" more than once. 528 if hasattr(value, "__aenter__") and hasattr(value, "__aexit__"): 529 self.__acmgr = value 530 self.is_acmgr = True 531 self.is_async = True 532 return 533 self.is_acmgr = False 501 534 try: 502 535 self._iterator = iter(value) 503 536 self.is_async = False … … class StreamingHttpResponse(HttpResponseBase): 507 540 if hasattr(value, "close"): 508 541 self._resource_closers.append(value.close) 509 542 543 async def __aenter__(self): 544 if self.is_acmgr: 545 self.__acmgr_ctx = self.streaming_content 546 self.__agen = await self.__acmgr_ctx.__aenter__() 547 else: 548 self.__agen = aiter(self) 549 return self.__agen 550 551 async def __aexit__(self, *exc_info): 552 try: 553 await self.__agen.aclose() 554 finally: 555 if self.is_acmgr: 556 return await self.__acmgr_ctx.__aexit__(*exc_info) 557 return None 558 510 559 def __iter__(self): 560 if self.is_acmgr: 561 raise IsAcmgrException( 562 "%s must be consumed via `async with`. Use `async with response` " 563 "and iterate the yielded content." % self.__class__.__name__ 564 ) 511 565 try: 512 566 return iter(self.streaming_content) 513 567 except TypeError: … … class StreamingHttpResponse(HttpResponseBase): 528 582 return map(self.make_bytes, iter(async_to_sync(to_list)(self._iterator))) 529 583 530 584 async def __aiter__(self): 585 if self.is_acmgr: 586 raise IsAcmgrException( 587 "%s must be consumed via `async with`. Use `async with response` " 588 "and iterate the yielded content." % self.__class__.__name__ 589 ) 531 590 try: 532 591 async for part in self.streaming_content: 533 592 yield part … … class StreamingHttpResponse(HttpResponseBase): 544 603 yield part 545 604 546 605 def getvalue(self): 606 if self.is_acmgr: 607 raise IsAcmgrException( 608 "%s must be consumed via `async with`. Use `async with response` " 609 "and iterate the yielded content." % self.__class__.__name__ 610 ) 547 611 return b"".join(self.streaming_content) 548 612 549 613 -
django/middleware/gzip.py
diff --git a/django/middleware/gzip.py b/django/middleware/gzip.py index eb151d7..78b5739 100644
a b 1 from contextlib import asynccontextmanager 2 1 3 from django.utils.cache import patch_vary_headers 2 4 from django.utils.deprecation import MiddlewareMixin 3 5 from django.utils.regex_helper import _lazy_re_compile … … class GZipMiddleware(MiddlewareMixin): 31 33 return response 32 34 33 35 if response.streaming: 34 if response.is_async: 36 if response.is_acmgr: 37 original_acmgr = response.streaming_content 38 max_random_bytes = self.max_random_bytes 39 40 @asynccontextmanager 41 async def compressed_acmgr(): 42 async with original_acmgr as agen: 43 yield acompress_sequence( 44 agen, 45 max_random_bytes=max_random_bytes, 46 ) 47 48 response.streaming_content = compressed_acmgr() 49 elif response.is_async: 35 50 response.streaming_content = acompress_sequence( 36 51 response.streaming_content, 37 52 max_random_bytes=self.max_random_bytes, -
django/utils/text.py
diff --git a/django/utils/text.py b/django/utils/text.py index d1306f9..55bd6f5 100644
a b def compress_sequence(sequence, *, max_random_bytes=None): 390 390 391 391 392 392 async def acompress_sequence(sequence, *, max_random_bytes=None): 393 buf = StreamingBuffer() 394 filename = _get_random_filename(max_random_bytes) if max_random_bytes else None 395 with GzipFile( 396 filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0 397 ) as zfile: 398 # Output headers... 393 try: 394 buf = StreamingBuffer() 395 filename = _get_random_filename(max_random_bytes) if max_random_bytes else None 396 with GzipFile( 397 filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0 398 ) as zfile: 399 # Output headers... 400 yield buf.read() 401 async for item in sequence: 402 zfile.write(item) 403 zfile.flush() 404 data = buf.read() 405 if data: 406 yield data 399 407 yield buf.read() 400 async for item in sequence: 401 zfile.write(item) 402 zfile.flush() 403 data = buf.read() 404 if data: 405 yield data 406 yield buf.read() 408 finally: 409 if hasattr(sequence, "aclose"): 410 await sequence.aclose()
comment:9 by , 7 days ago
Attaching updated patch based on the approach discussed in PR #19364.
Summary of changes
django/http/response.py:
_set_streaming_contentdetects async context managers viahasattr(__aenter__, __aexit__), setsis_acmgr = Trueand stores the valuestreaming_contentproperty returns an@asynccontextmanagerwrappingmake_byteswhenis_acmgris TrueStreamingHttpResponsegains__aenter__/__aexit__: for acmgr responses, enters the streaming_content CM; for regular responses, returnsaiter(self).__aexit__only handles acmgr CM cleanup —aclosingin the ASGI handler handles iterator close__iter__,__aiter__, andgetvalueraiseIsAcmgrExceptionwhenis_acmgris True
django/core/handlers/asgi.py:
- Single code path:
async with response as agen, aclosing(agen) as content:— works for both regular streaming and acmgr responses
django/middleware/gzip.py:
- Adds
is_acmgrbranch that wraps the acmgr in a new@asynccontextmanagerfeeding intoacompress_sequence
django/utils/text.py:
acompress_sequencewraps its body intry/finallytoacloseits operand
Tests
- 13 new tests in
tests/httpwrappers/tests.pycovering: basic acmgr streaming, make_bytes coercion,IsAcmgrExceptionguards on__iter__/__aiter__/getvalue,__aexit__on error and break, non-acmgr__aenter__fallback, reassignment, single-producer TaskGroup+Queue, and multi-producer fan-in (news-and-weather pattern) - 1 new test in
tests/middleware/tests.pyfor gzip compression of acmgr streaming responses
Docs
- New "Streaming with TaskGroup" section in
docs/ref/request-response.txtwith fullnews_and_weatherexample, key points re PEP 789, and anyio note - Release note in
docs/releases/6.1.txt
Patch
-
django/core/handlers/asgi.py
diff --git a/django/core/handlers/asgi.py b/django/core/handlers/asgi.py index 7ee5208..bcdafdc 100644
a b class ASGIHandler(base.BaseHandler): 318 318 ) 319 319 # Streaming responses need to be pinned to their iterator. 320 320 if response.streaming: 321 # - Consume via `__aiter__` and not `streaming_content` directly, 322 # to allow mapping of a sync iterator. 323 # - Use aclosing() when consuming aiter. See 324 # https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e 325 async with aclosing(aiter(response)) as content: 321 # Use aclosing() when consuming aiter. See 322 # https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e 323 async with response as agen, aclosing(agen) as content: 326 324 async for part in content: 327 325 for chunk, _ in self.chunk_bytes(part): 328 326 await send( -
django/http/__init__.py
diff --git a/django/http/__init__.py b/django/http/__init__.py index 628564e..2df8fa6 100644
a b from django.http.request import ( 8 8 ) 9 9 from django.http.response import ( 10 10 BadHeaderError, 11 IsAcmgrException, 11 12 FileResponse, 12 13 Http404, 13 14 HttpResponse, … … __all__ = [ 47 48 "HttpResponseServerError", 48 49 "Http404", 49 50 "BadHeaderError", 51 "IsAcmgrException", 50 52 "JsonResponse", 51 53 "FileResponse", 52 54 ] -
django/http/response.py
diff --git a/django/http/response.py b/django/http/response.py index 9bf0b14..c1c0c20 100644
a b import re 7 7 import sys 8 8 import time 9 9 import warnings 10 from contextlib import asynccontextmanager 10 11 from email.header import Header 11 12 from http.client import responses 12 13 from urllib.parse import urlsplit … … class BadHeaderError(ValueError): 104 105 pass 105 106 106 107 108 class IsAcmgrException(Exception): 109 pass 110 111 107 112 class HttpResponseBase: 108 113 """ 109 114 An HTTP response base class with dictionary-accessed headers. … … class StreamingHttpResponse(HttpResponseBase): 479 484 480 485 @property 481 486 def streaming_content(self): 487 if self.is_acmgr: 488 # Pull to lexical scope in case streaming_content is set again. 489 _acmgr = self.__acmgr 490 491 @asynccontextmanager 492 async def acmgr_wrapper(): 493 async with _acmgr as agen: 494 async def awrapper(): 495 try: 496 async for part in agen: 497 yield self.make_bytes(part) 498 finally: 499 if hasattr(agen, "aclose"): 500 await agen.aclose() 501 502 yield awrapper() 503 504 return acmgr_wrapper() 482 505 if self.is_async: 483 506 # pull to lexical scope to capture fixed reference in case 484 507 # streaming_content is set again later. 485 508 _iterator = self._iterator 486 509 487 510 async def awrapper(): 488 async for part in _iterator: 489 yield self.make_bytes(part) 511 try: 512 async for part in _iterator: 513 yield self.make_bytes(part) 514 finally: 515 if hasattr(_iterator, "aclose"): 516 await _iterator.aclose() 490 517 491 518 return awrapper() 492 519 else: … … class StreamingHttpResponse(HttpResponseBase): 498 525 499 526 def _set_streaming_content(self, value): 500 527 # Ensure we can never iterate on "value" more than once. 528 if hasattr(value, "__aenter__") and hasattr(value, "__aexit__"): 529 self.__acmgr = value 530 self.is_acmgr = True 531 self.is_async = True 532 return 533 self.is_acmgr = False 501 534 try: 502 535 self._iterator = iter(value) 503 536 self.is_async = False … … class StreamingHttpResponse(HttpResponseBase): 507 540 if hasattr(value, "close"): 508 541 self._resource_closers.append(value.close) 509 542 543 async def __aenter__(self): 544 if self.is_acmgr: 545 self.__acmgr_ctx = self.streaming_content 546 return await self.__acmgr_ctx.__aenter__() 547 return aiter(self) 548 549 async def __aexit__(self, *exc_info): 550 if self.is_acmgr: 551 return await self.__acmgr_ctx.__aexit__(*exc_info) 552 510 553 def __iter__(self): 554 if self.is_acmgr: 555 raise IsAcmgrException( 556 "%s must be consumed via `async with`. Use `async with response` " 557 "and iterate the yielded content." % self.__class__.__name__ 558 ) 511 559 try: 512 560 return iter(self.streaming_content) 513 561 except TypeError: … … class StreamingHttpResponse(HttpResponseBase): 528 576 return map(self.make_bytes, iter(async_to_sync(to_list)(self._iterator))) 529 577 530 578 async def __aiter__(self): 579 if self.is_acmgr: 580 raise IsAcmgrException( 581 "%s must be consumed via `async with`. Use `async with response` " 582 "and iterate the yielded content." % self.__class__.__name__ 583 ) 531 584 try: 532 585 async for part in self.streaming_content: 533 586 yield part … … class StreamingHttpResponse(HttpResponseBase): 544 597 yield part 545 598 546 599 def getvalue(self): 600 if self.is_acmgr: 601 raise IsAcmgrException( 602 "%s must be consumed via `async with`. Use `async with response` " 603 "and iterate the yielded content." % self.__class__.__name__ 604 ) 547 605 return b"".join(self.streaming_content) 548 606 549 607 -
django/middleware/gzip.py
diff --git a/django/middleware/gzip.py b/django/middleware/gzip.py index eb151d7..78b5739 100644
a b 1 from contextlib import asynccontextmanager 2 1 3 from django.utils.cache import patch_vary_headers 2 4 from django.utils.deprecation import MiddlewareMixin 3 5 from django.utils.regex_helper import _lazy_re_compile … … class GZipMiddleware(MiddlewareMixin): 31 33 return response 32 34 33 35 if response.streaming: 34 if response.is_async: 36 if response.is_acmgr: 37 original_acmgr = response.streaming_content 38 max_random_bytes = self.max_random_bytes 39 40 @asynccontextmanager 41 async def compressed_acmgr(): 42 async with original_acmgr as agen: 43 yield acompress_sequence( 44 agen, 45 max_random_bytes=max_random_bytes, 46 ) 47 48 response.streaming_content = compressed_acmgr() 49 elif response.is_async: 35 50 response.streaming_content = acompress_sequence( 36 51 response.streaming_content, 37 52 max_random_bytes=self.max_random_bytes, -
django/utils/text.py
diff --git a/django/utils/text.py b/django/utils/text.py index d1306f9..55bd6f5 100644
a b def compress_sequence(sequence, *, max_random_bytes=None): 390 390 391 391 392 392 async def acompress_sequence(sequence, *, max_random_bytes=None): 393 buf = StreamingBuffer() 394 filename = _get_random_filename(max_random_bytes) if max_random_bytes else None 395 with GzipFile( 396 filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0 397 ) as zfile: 398 # Output headers... 393 try: 394 buf = StreamingBuffer() 395 filename = _get_random_filename(max_random_bytes) if max_random_bytes else None 396 with GzipFile( 397 filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0 398 ) as zfile: 399 # Output headers... 400 yield buf.read() 401 async for item in sequence: 402 zfile.write(item) 403 zfile.flush() 404 data = buf.read() 405 if data: 406 yield data 399 407 yield buf.read() 400 async for item in sequence: 401 zfile.write(item) 402 zfile.flush() 403 data = buf.read() 404 if data: 405 yield data 406 yield buf.read() 408 finally: 409 if hasattr(sequence, "aclose"): 410 await sequence.aclose() 407 411 408 412 409 413 # Expression to match some_token and some_token="with spaces" (and similarly -
docs/ref/request-response.txt
diff --git a/docs/ref/request-response.txt b/docs/ref/request-response.txt index ba60415..945683a 100644
a b is streaming. If you perform long-running operations in your view before 1424 1424 returning the ``StreamingHttpResponse`` object, then you may also want to 1425 1425 :ref:`handle disconnections in the view <async-handling-disconnect>` itself. 1426 1426 1427 .. _request-response-streaming-task-groups: 1428 1429 Streaming with ``TaskGroup`` 1430 ---------------------------- 1431 1432 .. versionadded:: 6.1 1433 1434 When serving under ASGI you may want to stream content that is produced by 1435 background tasks — for example, fan-in from multiple websocket connections or 1436 long-running computations. Python's :class:`asyncio.TaskGroup` is the natural 1437 way to manage those tasks, but an ``async for`` loop cannot ``yield`` inside a 1438 ``TaskGroup`` context (see :pep:`789` for details). 1439 1440 The solution is to pass an :func:`~contextlib.asynccontextmanager` instance as 1441 the ``streaming_content`` argument. The context manager's ``__aenter__`` sets up 1442 the ``TaskGroup`` (and any other resources), then ``yield`` s an async iterator 1443 that the framework will consume. When the response finishes — or the client 1444 disconnects — the context manager's ``__aexit__`` tears everything down in the 1445 correct order. 1446 1447 Here is a complete example that merges two websocket feeds into a single 1448 streaming response:: 1449 1450 import asyncio 1451 from collections.abc import AsyncGenerator 1452 from contextlib import asynccontextmanager 1453 1454 from django.http import StreamingHttpResponse 1455 1456 1457 @asynccontextmanager 1458 async def news_and_weather() -> AsyncGenerator[AsyncGenerator[bytes, None], None]: 1459 queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=1) 1460 1461 async def consume_ws(url: str) -> None: 1462 async with await connect_ws(url) as ws: 1463 async for msg in ws: 1464 await queue.put(msg) 1465 1466 async def run_producers() -> None: 1467 async with asyncio.TaskGroup() as tg: 1468 tg.create_task(consume_ws("ws://news.example.com/feed")) 1469 tg.create_task(consume_ws("ws://weather.example.com/feed")) 1470 await queue.put(None) # sentinel: all producers finished 1471 1472 async def items() -> AsyncGenerator[bytes, None]: 1473 while (item := await queue.get()) is not None: 1474 yield item 1475 1476 async with asyncio.TaskGroup() as tg: 1477 tg.create_task(run_producers()) 1478 yield items() 1479 1480 1481 async def news_and_weather_view(request): 1482 return StreamingHttpResponse(news_and_weather()) 1483 1484 Key points: 1485 1486 * ``news_and_weather()`` is decorated with 1487 :func:`~contextlib.asynccontextmanager`. Django detects this via 1488 ``hasattr(value, "__aenter__")`` in ``_set_streaming_content`` and sets 1489 :attr:`~StreamingHttpResponse.is_acmgr` to ``True``. 1490 1491 * The ``yield`` that suspends the frame happens inside 1492 ``@asynccontextmanager``, which correctly routes exceptions back to the 1493 ``TaskGroup`` cancel scope. The ``items()`` async generator yielded *out* to 1494 the caller never yields inside a cancel scope, so it is safe. 1495 1496 * Middleware that wraps ``streaming_content`` (such as 1497 :class:`~django.middleware.gzip.GZipMiddleware`) is aware of the 1498 ``is_acmgr`` flag and preserves the context-manager protocol. 1499 1500 * Attempting to iterate an ``is_acmgr`` response directly (via ``__iter__``, 1501 ``__aiter__``, or ``getvalue()``) raises 1502 :exc:`~django.http.IsAcmgrException`. The response must be consumed with 1503 ``async with``:: 1504 1505 async with response as agen: 1506 async for chunk in agen: 1507 ... 1508 1509 .. note:: 1510 1511 The `anyio <https://anyio.readthedocs.io/>`_ library provides 1512 :func:`anyio.create_task_group` and :func:`anyio.create_memory_object_stream`, 1513 which can be used as an alternative to :class:`asyncio.TaskGroup` and 1514 :class:`asyncio.Queue`. A ``MemoryObjectReceiveStream`` is an async 1515 iterable and can be yielded directly from the context manager. 1516 1517 .. attribute:: StreamingHttpResponse.is_acmgr 1518 1519 Boolean indicating whether the response was created with an async context 1520 manager as its streaming content. When ``True``, 1521 :attr:`~StreamingHttpResponse.is_async` is also ``True`` and the response 1522 must be consumed via ``async with``. 1523 1427 1524 ``FileResponse`` objects 1428 1525 ======================== 1429 1526 -
docs/releases/6.1.txt
diff --git a/docs/releases/6.1.txt b/docs/releases/6.1.txt index 00eaf53..790782c 100644
a b Pagination 323 323 Requests and Responses 324 324 ~~~~~~~~~~~~~~~~~~~~~~ 325 325 326 * :class:`~django.http.StreamingHttpResponse` now accepts an async context 327 manager as ``streaming_content``, enabling streaming patterns that require 328 :class:`asyncio.TaskGroup` or similar structured concurrency primitives. See 329 :ref:`request-response-streaming-task-groups` for details. 330 326 331 * :attr:`HttpRequest.multipart_parser_class <django.http.HttpRequest.multipart_parser_class>` 327 332 can now be customized to use a different multipart parser class. 328 333 -
tests/asgi/urls.py
diff --git a/tests/asgi/urls.py b/tests/asgi/urls.py index 0311cf3..b7fa206 100644
a b 1 1 import asyncio 2 import contextlib 2 3 import threading 3 4 import time 4 5 -
tests/httpwrappers/tests.py
diff --git a/tests/httpwrappers/tests.py b/tests/httpwrappers/tests.py index 3e8364e..a2add61 100644
a b 1 import asyncio 2 import contextlib 1 3 import copy 2 4 import json 3 5 import os 4 6 import pickle 7 import sys 5 8 import unittest 6 9 import uuid 7 10 … … from django.http import ( 16 19 HttpResponseNotModified, 17 20 HttpResponsePermanentRedirect, 18 21 HttpResponseRedirect, 22 IsAcmgrException, 19 23 JsonResponse, 20 24 QueryDict, 21 25 SimpleCookie, … … class StreamingHttpResponseTests(SimpleTestCase): 808 812 with self.assertWarnsMessage(Warning, msg): 809 813 self.assertEqual(b"hello", await anext(aiter(r))) 810 814 815 async def test_async_context_manager_streaming_response(self): 816 """StreamingHttpResponse accepts an async context manager as content.""" 817 818 @contextlib.asynccontextmanager 819 async def acmgr(): 820 async def iterator(): 821 yield b"hello" 822 yield b"world" 823 824 yield iterator() 825 826 r = StreamingHttpResponse(acmgr()) 827 self.assertTrue(r.is_acmgr) 828 self.assertTrue(r.is_async) 829 830 chunks = [] 831 async with r as agen, contextlib.aclosing(agen) as content: 832 async for chunk in content: 833 chunks.append(chunk) 834 self.assertEqual(chunks, [b"hello", b"world"]) 835 836 async def test_acmgr_make_bytes(self): 837 """Acmgr streaming content is coerced to bytes via make_bytes.""" 838 839 @contextlib.asynccontextmanager 840 async def acmgr(): 841 async def iterator(): 842 yield "hello" 843 yield "café" 844 845 yield iterator() 846 847 r = StreamingHttpResponse(acmgr()) 848 chunks = [] 849 async with r as agen, contextlib.aclosing(agen) as content: 850 async for chunk in content: 851 chunks.append(chunk) 852 self.assertEqual(chunks, [b"hello", b"caf\xc3\xa9"]) 853 854 async def test_acmgr_iter_raises(self): 855 """Iterating an acmgr response via __iter__ raises IsAcmgrException.""" 856 857 @contextlib.asynccontextmanager 858 async def acmgr(): 859 yield iter([b"hello"]) 860 861 r = StreamingHttpResponse(acmgr()) 862 msg = ( 863 "StreamingHttpResponse must be consumed via `async with`. " 864 "Use `async with response` and iterate the yielded content." 865 ) 866 with self.assertRaisesMessage(IsAcmgrException, msg): 867 iter(r) 868 869 async def test_acmgr_aiter_raises(self): 870 """Iterating an acmgr response via __aiter__ raises IsAcmgrException.""" 871 872 @contextlib.asynccontextmanager 873 async def acmgr(): 874 yield iter([b"hello"]) 875 876 r = StreamingHttpResponse(acmgr()) 877 msg = ( 878 "StreamingHttpResponse must be consumed via `async with`. " 879 "Use `async with response` and iterate the yielded content." 880 ) 881 with self.assertRaisesMessage(IsAcmgrException, msg): 882 async for _ in r: 883 pass 884 885 async def test_acmgr_getvalue_raises(self): 886 """Calling getvalue() on an acmgr response raises IsAcmgrException.""" 887 888 @contextlib.asynccontextmanager 889 async def acmgr(): 890 yield iter([b"hello"]) 891 892 r = StreamingHttpResponse(acmgr()) 893 with self.assertRaises(IsAcmgrException): 894 r.getvalue() 895 896 async def test_acmgr_aexit_called_on_error(self): 897 """The acmgr's __aexit__ is called even when iteration raises.""" 898 exited = [] 899 900 @contextlib.asynccontextmanager 901 async def acmgr(): 902 try: 903 async def iterator(): 904 yield b"hello" 905 raise ValueError("boom") 906 907 yield iterator() 908 finally: 909 exited.append(True) 910 911 r = StreamingHttpResponse(acmgr()) 912 with self.assertRaises(ValueError): 913 async with r as agen, contextlib.aclosing(agen) as content: 914 async for _ in content: 915 pass 916 self.assertEqual(exited, [True]) 917 918 async def test_acmgr_aexit_called_on_break(self): 919 """The acmgr cleans up when the consumer breaks out early.""" 920 exited = [] 921 922 @contextlib.asynccontextmanager 923 async def acmgr(): 924 try: 925 async def iterator(): 926 yield b"hello" 927 yield b"world" 928 929 yield iterator() 930 finally: 931 exited.append(True) 932 933 r = StreamingHttpResponse(acmgr()) 934 async with r as agen, contextlib.aclosing(agen) as content: 935 async for _ in content: 936 break 937 self.assertEqual(exited, [True]) 938 939 async def test_acmgr_non_acmgr_aenter_returns_aiter(self): 940 """__aenter__ on a non-acmgr async response returns aiter(self).""" 941 942 async def async_iter(): 943 yield b"hello" 944 945 r = StreamingHttpResponse(async_iter()) 946 self.assertFalse(r.is_acmgr) 947 agen = await r.__aenter__() 948 chunk = await agen.__anext__() 949 self.assertEqual(chunk, b"hello") 950 await r.__aexit__(None, None, None) 951 952 async def test_acmgr_reassign_streaming_content(self): 953 """Assigning a new acmgr to streaming_content replaces the old one.""" 954 955 @contextlib.asynccontextmanager 956 async def acmgr1(): 957 async def it(): 958 yield b"first" 959 960 yield it() 961 962 @contextlib.asynccontextmanager 963 async def acmgr2(): 964 async def it(): 965 yield b"second" 966 967 yield it() 968 969 r = StreamingHttpResponse(acmgr1()) 970 r.streaming_content = acmgr2() 971 self.assertTrue(r.is_acmgr) 972 973 chunks = [] 974 async with r as agen, contextlib.aclosing(agen) as content: 975 async for chunk in content: 976 chunks.append(chunk) 977 self.assertEqual(chunks, [b"second"]) 978 979 @unittest.skipUnless( 980 sys.version_info >= (3, 11), "asyncio.TaskGroup requires Python 3.11+" 981 ) 982 async def test_taskgroup_queue_streaming_response(self): 983 """ 984 StreamingHttpResponse works with asyncio.TaskGroup and asyncio.Queue, 985 the canonical pattern for background-task-driven streaming. 986 """ 987 988 @contextlib.asynccontextmanager 989 async def taskgroup_stream(): 990 queue = asyncio.Queue() 991 992 async def producer(): 993 for chunk in [b"hello", b"world"]: 994 await queue.put(chunk) 995 await queue.put(None) # sentinel 996 997 async def generator(): 998 while True: 999 chunk = await queue.get() 1000 if chunk is None: 1001 break 1002 yield chunk 1003 1004 async with asyncio.TaskGroup() as tg: 1005 tg.create_task(producer()) 1006 yield generator() 1007 1008 r = StreamingHttpResponse(taskgroup_stream()) 1009 self.assertTrue(r.is_acmgr) 1010 1011 chunks = [] 1012 async with r as agen, contextlib.aclosing(agen) as content: 1013 async for chunk in content: 1014 chunks.append(chunk) 1015 self.assertEqual(chunks, [b"hello", b"world"]) 1016 1017 @unittest.skipUnless( 1018 sys.version_info >= (3, 11), "asyncio.TaskGroup requires Python 3.11+" 1019 ) 1020 async def test_taskgroup_multiple_producers_streaming_response(self): 1021 """ 1022 StreamingHttpResponse works with multiple producer tasks feeding a 1023 shared queue, as in the news-and-weather websocket fan-in pattern. 1024 """ 1025 1026 @contextlib.asynccontextmanager 1027 async def multi_producer_stream(): 1028 queue = asyncio.Queue(maxsize=1) 1029 1030 async def consume(source): 1031 """Simulate a websocket consumer pushing to the shared queue.""" 1032 for msg in source: 1033 await queue.put(msg) 1034 1035 async def run_producers(): 1036 async with asyncio.TaskGroup() as tg: 1037 tg.create_task(consume([b"news-1", b"news-2"])) 1038 tg.create_task(consume([b"weather-1"])) 1039 await queue.put(None) # sentinel after all producers finish 1040 1041 async def items(): 1042 while (item := await queue.get()) is not None: 1043 yield item 1044 1045 async with asyncio.TaskGroup() as tg: 1046 tg.create_task(run_producers()) 1047 yield items() 1048 1049 r = StreamingHttpResponse(multi_producer_stream()) 1050 self.assertTrue(r.is_acmgr) 1051 1052 chunks = [] 1053 async with r as agen, contextlib.aclosing(agen) as content: 1054 async for chunk in content: 1055 chunks.append(chunk) 1056 # All three messages arrive (order depends on scheduling). 1057 self.assertEqual(sorted(chunks), [b"news-1", b"news-2", b"weather-1"]) 1058 811 1059 def test_text_attribute_error(self): 812 1060 r = StreamingHttpResponse(iter(["hello", "world"])) 813 1061 msg = "This %s instance has no `text` attribute." % r.__class__.__name__ -
tests/middleware/tests.py
diff --git a/tests/middleware/tests.py b/tests/middleware/tests.py index a61c4b1..4192ada 100644
a b 1 import contextlib 1 2 import gzip 2 3 import random 3 4 import re … … class GZipMiddlewareTest(SimpleTestCase): 936 937 self.assertEqual(r.get("Content-Encoding"), "gzip") 937 938 self.assertFalse(r.has_header("Content-Length")) 938 939 940 async def test_compress_acmgr_streaming_response(self): 941 """ 942 Compression is performed on responses with async context manager content. 943 """ 944 945 async def get_stream_response(request): 946 @contextlib.asynccontextmanager 947 async def acmgr(): 948 async def iterator(): 949 for chunk in self.sequence: 950 yield chunk 951 952 yield iterator() 953 954 resp = StreamingHttpResponse(acmgr()) 955 resp["Content-Type"] = "text/html; charset=UTF-8" 956 return resp 957 958 r = await GZipMiddleware(get_stream_response)(self.req) 959 self.assertTrue(r.is_acmgr) 960 chunks = [] 961 async with r as agen, contextlib.aclosing(agen) as content: 962 async for chunk in content: 963 chunks.append(chunk) 964 self.assertEqual( 965 self.decompress(b"".join(chunks)), 966 b"".join(self.sequence), 967 ) 968 self.assertEqual(r.get("Content-Encoding"), "gzip") 969 self.assertFalse(r.has_header("Content-Length")) 970 939 971 def test_compress_streaming_response_unicode(self): 940 972 """ 941 973 Compression is performed on responses with streaming Unicode content.
comment:10 by , 6 days ago
| Needs documentation: | unset |
|---|---|
| Owner: | changed from to |
Thanks. Reception on the new-features repo issue seems positive, so though it hasn't moved through any swimlanes there yet, I will speculatively move this one to Accepted assuming it likely will.