Opened 6 hours ago

Last modified 2 hours ago

#36916 assigned New feature

add support for streaming with TaskGroups — at Version 3

Reported by: Thomas Grainger
Owned by:
Component: HTTP handling
Version: dev
Severity: Normal
Keywords: structured-concurrency, taskgroups
Cc: Thomas Grainger
Triage Stage: Accepted
Has patch: yes
Needs documentation: yes
Needs tests: no
Patch needs improvement: no
Easy pickings: no
UI/UX: no

Description (last modified by Thomas Grainger)

https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4
https://github.com/django/new-features/issues/117

Feature Description

see https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4

I’d like to be able to write code that combines multiple streams of data:

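from collections.abc import AsyncGenerator

import anyio
from anyio.streams.memory import MemoryObjectSendStream
from django.http import HttpRequest, StreamingHttpResponse

# connect_ws is assumed to be an async websocket client helper; the ticket does not say where it comes from.


async def news_and_weather(request: HttpRequest) -> StreamingHttpResponse:
    async def gen() -> AsyncGenerator[bytes]:
        async def push(ws_url: str, tx: MemoryObjectSendStream[bytes]) -> None:
            # Forward every websocket message into this producer's clone of the send stream.
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()  # only the clones stay open, so rx ends once both producers finish
                async for msg in rx:
                    yield msg  # yield in async generator!! illegal inside TaskGroup!
    return StreamingHttpResponse(gen())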

Problem

However, this doesn’t work: the yield sits inside a plain async generator rather than a context manager, and wrapping that generator in aclosing() is not sufficient to let the TaskGroup cancel itself and catch the resulting cancellation error.

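Proposed alternative: the response takes an async context manager that yields the byte stream (StreamingAcmgrHttpResponse below is the proposed class, not an existing Django API):

import contextlib
from collections.abc import AsyncGenerator
from typing import Protocol

from useful_types import SupportsAnext

class AsyncIteratorBytesResource(Protocol):
    """
    all the machinery needed to safely run an AsyncGenerator[bytes]

    (for django-stubs) this allows AsyncGenerator[bytes] but is less strict
    so would also allow an anyio MemoryObjectReceiveStream[bytes]
    """

    # __aiter__ is a regular method: it returns the async iterator directly.
    def __aiter__(self) -> SupportsAnext[bytes]: ...
    async def aclose(self) -> object: ...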


async def news_and_weather(request: HttpRequest) -> StreamingAcmgrHttpResponse:
    @contextlib.asynccontextmanager
    async def acmgr_gen() -> AsyncGenerator[AsyncIteratorBytesResource]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                yield rx  # yield inside asynccontextmanager, permitted inside TaskGroup

    return StreamingAcmgrHttpResponse(acmgr_gen())

Implementation Suggestions

https://github.com/django/django/pull/19364/changes

Change History (3)

comment:1 by Amar, 5 hours ago

Description: modified (diff)
Owner: set to Amar
Status: new → assigned

comment:2 by Amar, 5 hours ago

Owner: Amar removed
Status: assigned → new

comment:3 by Thomas Grainger, 2 hours ago

Description: modified (diff)