Opened 3 weeks ago
Last modified 3 weeks ago
#36916 assigned New feature
Add support for streaming with TaskGroups
| Reported by: | Thomas Grainger | Owned by: | Thomas Grainger |
|---|---|---|---|
| Component: | HTTP handling | Version: | dev |
| Severity: | Normal | Keywords: | structured-concurrency, taskgroups |
| Cc: | Thomas Grainger | Triage Stage: | Accepted |
| Has patch: | yes | Needs documentation: | yes |
| Needs tests: | no | Patch needs improvement: | no |
| Easy pickings: | no | UI/UX: | no |
Description
https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4
https://github.com/django/new-features/issues/117
Feature Description
See https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4
I’d like to be able to write code that combines multiple streams of data:
async def news_and_weather(request: HttpRequest) -> StreamingHttpResponse:
    async def gen() -> AsyncGenerator[bytes]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                async for msg in rx:
                    yield msg  # yield in async generator!! illegal inside TaskGroup!

    return StreamingHttpResponse(gen())
Problem
However, this doesn’t work: the yield sits inside an async generator rather than a context manager, and calling aclosing() on that async generator is not sufficient to allow a TaskGroup to cancel itself and catch the cancel error.
from typing import Protocol

from useful_types import SupportsAnext


class AsyncIteratorBytesResource(Protocol):
    """
    All the machinery needed to safely run an AsyncGenerator[bytes].

    (For django-stubs.) This allows AsyncGenerator[bytes] but is less strict,
    so it would also allow an anyio MemoryObjectReceiveStream[bytes].
    """

    # __aiter__ is a plain method that returns the iterator, not a coroutine.
    def __aiter__(self) -> SupportsAnext[bytes]: ...
    async def aclose(self) -> object: ...

async def news_and_weather(request: HttpRequest) -> StreamingAcmgrHttpResponse:
    @contextlib.asynccontextmanager
    async def acmgr_gen() -> AsyncGenerator[AsyncIteratorBytesResource]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                yield rx  # yield inside asynccontextmanager, permitted inside TaskGroup

    return StreamingAcmgrHttpResponse(acmgr_gen())
Implementation Suggestions
Change History (5)
comment:1 by , 3 weeks ago
| Description: | modified (diff) |
|---|---|
| Owner: | set to |
| Status: | new → assigned |
comment:2 by , 3 weeks ago
| Owner: | removed |
|---|---|
| Status: | assigned → new |
comment:3 by , 3 weeks ago
| Description: | modified (diff) |
|---|---|
comment:4 by , 3 weeks ago
| Needs documentation: | set |
|---|---|
comment:5 by , 3 weeks ago
| Easy pickings: | unset |
|---|---|
| Owner: | set to |
| Status: | new → assigned |
| Summary: | add support for streaming with TaskGroups → Add support for streaming with TaskGroups |
| Triage Stage: | Unreviewed → Accepted |
| Type: | Uncategorized → New feature |
Thanks. Reception on the new-features repo issue seems positive, so although it hasn't moved through any swimlanes there yet, I will speculatively move this ticket to Accepted on the assumption that it will.