﻿id	summary	reporter	owner	description	type	status	component	version	severity	resolution	keywords	cc	stage	has_patch	needs_docs	needs_tests	needs_better_patch	easy	ui_ux
36916	add support for streaming with TaskGroups	Thomas Grainger		"https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4
https://github.com/django/new-features/issues/117


=== Feature Description ===

see https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4

I’d like to be able to write code that combines multiple streams of data:


{{{
async def news_and_weather(request: HttpRequest) -> StreamingHttpResponse:
    async def gen() -> AsyncGenerator[bytes]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx =  anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, ""ws://example.com/news"", tx.clone())
                tg.start_soon(push, ""ws://example.com/weather"", tx.clone())
                tx.close()
                async for msg in rx:
                    yield msg  # yield in async generator!! illegal inside TaskGroup!
    return StreamingHttpResponse(gen())
}}}



=== Problem ===

however this doesn’t work because I’m using a yield inside an async generator that’s not a context manager, and calling aclosing() on that async generator is not sufficient to allow a TaskGroup to cancel itself and catch the cancel error.



{{{
from useful_types import SupportsAnext

class AsyncIteratorBytesResource(Protocol):
    """"""
    all the machinery needed to safely run an AsyncGenerator[Bytes]

    (for django-stubs) this allows AsyncGenerator[bytes] but is less strict
    so would also allow a anyio MemoryObjectRecieveStream[bytes]]
    """"""

    async def __aiter__(self) -> SupportsAnext[bytes]: ...
    async def aclose(self) -> object: ...


async def news_and_weather(request: HttpRequest) -> StreamingAcmgrHttpResponse:
    @contextlib.asynccontextmanager
    async def acmgr_gen() -> AsyncGenerator[AsyncIteratorBytesResource]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx =  anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, ""ws://example.com/news"", tx.clone())
                tg.start_soon(push, ""ws://example.com/weather"", tx.clone())
                tx.close()
                yield rx  # yield inside asynccontextmanager, permitted inside TaskGroup

    return StreamingAcmgrHttpResponse(acmgr_gen())
}}}


=== Implementation Suggestions ===

https://github.com/django/django/pull/19364/changes"	Uncategorized	new	HTTP handling	dev	Normal		structured-concurrency, taskgroups	Thomas Grainger	Unreviewed	1	0	0	0	1	0
