Opened 4 weeks ago

Last modified 10 days ago

#36714 new Bug

Async signals lose ContextVar state due to use of asyncio.gather

Reported by: Mykhailo Havelia
Owned by:
Component: HTTP handling
Version: dev
Severity: Normal
Keywords: asyncio, signals
Cc: Mykhailo Havelia, Carlton Gibson
Triage Stage: Accepted
Has patch: no
Needs documentation: no
Needs tests: no
Patch needs improvement: no
Easy pickings: no
UI/UX: no

Description

The natural way to share global, per-request state in asyncio is through contextvars. In Django, this is typically used via asgiref.local.Local. However, Django's async signal dispatch currently uses asyncio.gather, which internally creates new tasks (asyncio.create_task). This breaks context propagation, since each task gets its own copy of the context. As a result, it's impossible to set a global (context-based) variable inside a signal handler and have it shared with other signal handlers or parts of the same request/response cycle.

Example

from django.core import signals
from django.http import (
    HttpRequest,
    HttpResponse,
)
import contextvars
from django.urls import path


request_id = contextvars.ContextVar('request_id', default=None)


async def set_global_variable(*args, **kwargs):
    # set global variable
    request_id.set('request_id_value')
    print('get value', request_id.get())

signals.request_started.connect(set_global_variable)


async def index(request: HttpRequest) -> HttpResponse:
    # get global variable
    print('request_id', request_id.get())
    return HttpResponse(content=request_id.get())


urlpatterns = [path("", index), ]

result

get value request_id_value
request_id None

The value set inside the signal handler is lost, because the handler runs in a separate task with its own context.
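The same effect can be reproduced without Django. The following is a minimal sketch using only the standard library; the names handler, via_gather and via_await are illustrative and not part of Django's API.

```python
import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id", default=None)


async def handler():
    # Stand-in for a signal receiver that stores per-request state.
    request_id.set("request_id_value")


async def via_gather():
    # gather() wraps the coroutine in a Task, which runs in a *copy*
    # of the current context, so the set() above is not visible here.
    await asyncio.gather(handler())
    return request_id.get()


async def via_await():
    # A direct await runs the coroutine in the caller's own context.
    await handler()
    return request_id.get()


if __name__ == "__main__":
    print("gather:", asyncio.run(via_gather()))
    print("await: ", asyncio.run(via_await()))
```

With gather the caller still sees the default value, while the direct await sees the value set by the handler.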

This matters especially for signals.request_started and signals.request_finished, which are typically used for setting up and cleaning up per-request resources. With asyncio.gather, cleanup logic that relies on a ContextVar cannot work properly.

from django.core import signals
from django.http import (
    HttpRequest,
    HttpResponse,
)
import contextvars
from django.urls import path


db_connection = contextvars.ContextVar('db_connection', default=None)

async def get_or_create_connection():
    if not db_connection.get():
        db_connection.set('connection')

    return db_connection.get()

async def close_connection(*args, **kwargs):
    connection = db_connection.get()

    if not connection:
        print('cannot clean - connection does not exist')
        return

    print('close connection')
    db_connection.set(None)


signals.request_finished.connect(close_connection)


async def index(request: HttpRequest) -> HttpResponse:
    # create connection inside handler
    connection = await get_or_create_connection()
    # await get_data(connection)
    return HttpResponse(content="ok")


urlpatterns = [path("", index), ]

result

cannot clean - connection does not exist

Expected behavior

Signal handlers should run in the same async context as the request, preserving ContextVar and asgiref.local.Local state.

Proposed solution

Signal:

Dispatch async signal handlers sequentially (or via direct await) instead of using asyncio.gather, so that the existing execution context is preserved throughout the request lifecycle. Yes, this change removes parallelism, but that shouldn’t be a major concern. The only real benefit of running signal handlers in parallel would be for IO-bound operations - yet in most cases, these handlers interact with the same database connection. Since database operations aren’t truly parallel under the hood, the performance gain from asyncio.gather is negligible.
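As a rough illustration of the sequential approach, the sketch below awaits each receiver in turn so that all of them run in the caller's context. dispatch_sequentially and bump are hypothetical names used only for this example; this is not Django's Signal.asend() implementation.

```python
import asyncio
import contextvars

counter = contextvars.ContextVar("counter", default=0)


async def bump(**named):
    # Each receiver sees the value left behind by the previous one.
    counter.set(counter.get() + 1)
    return counter.get()


async def dispatch_sequentially(receivers, **named):
    # Hypothetical helper: await receivers one by one, without creating
    # tasks, so every receiver runs in the caller's own context.
    responses = []
    for receiver in receivers:
        responses.append((receiver, await receiver(**named)))
    return responses


async def main():
    responses = await dispatch_sequentially([bump, bump], sender=None)
    return [value for _, value in responses], counter.get()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The second receiver observes the first receiver's update, and the caller sees the final value after dispatch.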

ASGIHandler:

async def handle(self, scope, receive, send):
    ...
    await signals.request_started.asend(sender=self.__class__, scope=scope)
    
    tasks = [
        asyncio.create_task(self.listen_for_disconnect(receive)),
        asyncio.create_task(process_request(request, send)),
    ]

    ...

    await signals.request_finished.asend(sender=self.__class__)

Global variables created inside process_request are not visible to request_finished, because each task runs in a separate context. We can try using contextvars.copy_context() to preserve and share the same context between tasks and signal handlers.

async def handle(self, scope, receive, send):
    ...
    await signals.request_started.asend(sender=self.__class__, scope=scope)

    ctx = contextvars.copy_context()
    
    tasks = [
        asyncio.create_task(self.listen_for_disconnect(receive)),
        asyncio.create_task(process_request(request, send), context=ctx),
    ]

    ...

    await asyncio.create_task(signals.request_finished.asend(sender=self.__class__), context=ctx)

Here is a simple example

import asyncio
import contextvars

global_state = contextvars.ContextVar('stage', default=0)


async def inc():
    value = global_state.get()
    print('value: ', value)
    global_state.set(value + 1)


async def main():
    await asyncio.create_task(inc())
    await asyncio.create_task(inc())
    await asyncio.create_task(inc())
    
    print('first: ', global_state.get())
    
    ctx = contextvars.copy_context()
    
    await asyncio.create_task(inc(), context=ctx)
    await asyncio.create_task(inc(), context=ctx)
    await asyncio.create_task(inc(), context=ctx)
    
    print('second: ', ctx.get(global_state))


asyncio.run(main())

result

value:  0
value:  0
value:  0
first:  0
value:  0
value:  1
value:  2
second:  3

Change History (20)

comment:1 by Carlton Gibson, 4 weeks ago

Cc: Carlton Gibson added
Triage Stage: Unreviewed → Accepted

Hi Mykhailo.

Thanks for the report. (And thanks for writing it up so nicely.)

We can try using contextvars.copy_context() to preserve and share the same context between tasks and signal handlers.

So, yes — passing the context explicitly to the tasks would be my first thought. The whole idea of that API is to allow control over this, when trying to keep things structured, no?

Then we should resolve #36315 first no? TaskGroup is the newer preferred API. Do you fancy reviewing that?

I'd be a bit sad if we had to lose the concurrent async dispatch on signals. (I grant your point that it may not be a big gain in performance necessarily, but ... 🙂 )

comment:2 by Varun Kasyap Pentamaraju, 4 weeks ago

Owner: set to Varun Kasyap Pentamaraju
Status: new → assigned

comment:3 by Varun Kasyap Pentamaraju, 4 weeks ago

Owner: Varun Kasyap Pentamaraju removed
Status: assigned → new

in reply to:  1 comment:4 by Mykhailo Havelia, 4 weeks ago

Replying to Carlton Gibson:

The whole idea of that API is to allow control over this, when trying to keep things structured, no?

🙂 If we’re talking about ASGIHandler.handle, then yes. It’s easy to implement and would unblock my current work on the async backend.

Then we should resolve #36315 first no?

I think our priority should be to ensure proper async handling in these areas first:

  • database cursor
  • ORM
  • middlewares
  • signals
  • cache

After that, we can focus on optimization and improvements, since optimization often adds complexity and there are still some core challenges to solve in the async port 😔.

That said, if I can prepare a small MR with minimal changes that could be reviewed and merged quickly, I’d really prefer that approach, especially since #36315 has been pending for over six months. It would help unblock my current work.

If that’s not possible, I'm more than happy to contribute to #36315 to help move it forward and unblock these changes together.

What do you think?

The whole idea of that API is to allow control over this, when trying to keep things structured, no?

If we're talking about using asyncio.gather inside signals, things get a bit more complicated. I'm not entirely sure how context sharing behaves with parallel tasks yet (I'll need to look into it). We might need to do something similar to what asgiref does, for example:

def _restore_context(context: contextvars.Context) -> None:
    # Copy every variable changed in `context` back into the current context.
    for cvar in context:
        cvalue = context.get(cvar)
        try:
            if cvar.get() != cvalue:
                cvar.set(cvalue)
        except LookupError:
            cvar.set(cvalue)

Manually handling context like this can easily lead to unexpected behavior. For instance, overwriting a global variable when we shouldn't. So we'll need to test this carefully, especially with sync_to_async and async_to_sync. It might take a fair bit of time to review and verify that everything works as expected.

It's much easier to delete it. What do you think?
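To make the overwrite risk concrete, here is a small standard-library sketch. restore_context mirrors the snippet above with the loop over the context's variables spelled out; var, work and demo are names invented for this example.

```python
import contextvars

var = contextvars.ContextVar("var", default=0)


def restore_context(context):
    # Copy values from `context` back into the current context.
    for cvar in context:
        cvalue = context.get(cvar)
        try:
            if cvar.get() != cvalue:
                cvar.set(cvalue)
        except LookupError:
            cvar.set(cvalue)


def work():
    var.set(var.get() + 1)


def demo():
    ctx = contextvars.copy_context()  # snapshot taken while var is 0
    ctx.run(work)                     # inside ctx, var becomes 1
    var.set(10)                       # the caller updates var in the meantime
    restore_context(ctx)              # the caller's 10 is replaced by 1
    return var.get()


if __name__ == "__main__":
    print(contextvars.copy_context().run(demo))
```

The restore step cannot tell that the caller's value is newer than the copied one, which is the kind of unexpected overwrite that needs careful testing.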

comment:5 by Carlton Gibson, 4 weeks ago

There's already two small PRs for #36315. They look good, and just need a review, and confirmation. (They're sat on my list. 🤹) — My point was that we should resolve those and then implement the change here on top of that, rather than making a separate change here, and then having to redo it.

in reply to:  5 comment:6 by Mykhailo Havelia, 4 weeks ago

Replying to Carlton Gibson:

There's already two small PRs for #36315. They look good, and just need a review, and confirmation. (They're sat on my list. 🤹) — My point was that we should resolve those and then implement the change here on top of that, rather than making a separate change here, and then having to redo it.

Got it. I’ll take a look at them tonight and leave a review. Hopefully, that helps speed up the process.

comment:7 by Jacob Walls, 3 weeks ago

Type: UncategorizedBug

comment:8 by Carlton Gibson, 3 weeks ago

I reviewed #36315 and marked it as Ready for checkin, so hopefully that can progress to clear the path for work here.

in reply to:  8 comment:9 by Mykhailo Havelia, 3 weeks ago

Replying to Carlton Gibson:

I reviewed #36315 and marked it as Ready for checkin, so hopefully that can progress to clear the path for work here.

Test MR: https://github.com/Arfey/django/pull/3/files

I used the latest changes and prepared a fix for context sharing. At this point, we have:

  • Successful context sharing between async handlers for asend/send
  • Successful context sharing between sync handlers for asend/send
  • Successful cross-sharing between async/sync handlers for send

Current issue:

  • Cross-sharing between async/sync handlers for asend

The difference is in how send and asend execute handlers. send runs sync handlers first, and then runs async handlers in parallel and this works fine. asend tries to run all handlers "in parallel". In this case, sync_to_async copies the context instead of using the existing one, so context sharing breaks. I propose splitting the execution of sync and async handlers for asend as well.

Current code:

await _gather(
    sync_send(),
    _gather(
        *(
            receiver(signal=self, sender=sender, **named)
            for receiver in async_receivers
        )
    ),
)

Proposed adjustment:

await sync_send()
await _gather(
    *(
        receiver(signal=self, sender=sender, **named)
        for receiver in async_receivers
    )
)

What do you think?

comment:10 by Carlton Gibson, 3 weeks ago

I continue to think that it would be a shame to not at least try to maintain the concurrent signal dispatch here. We should be able to pass the current context to the sync_send task, using the context parameter, AFAICS. I'd like us to at least try that. If there's some blocking reason why that's not feasible then (sure) we can look at running the sync signals first.

in reply to:  10 comment:11 by Mykhailo Havelia, 3 weeks ago

Replying to Carlton Gibson:

I continue to think that it would be a shame to not at least try to maintain the concurrent signal dispatch here. We should be able to pass the current context to the sync_send task, using the context parameter, AFAICS. I'd like us to at least try that. If there's some blocking reason why that's not feasible then (sure) we can look at running the sync signals first.

This would require changes to sync_to_async. It might look something like:

ctx = contextvars.copy_context()

@sync_to_async(context=ctx)
def foo():
    ...

If this approach is acceptable, I can prepare an MR.

comment:12 by Carlton Gibson, 3 weeks ago

sync_to_async already propagates the parent context. See tests. Is it not the case that the asyncio.TaskGroup() call in _gather needs to take the context parameter? 🤔

I think this is going to be easier to look at in a PR than a series of comments on the issue here.

in reply to:  12 comment:13 by Mykhailo Havelia, 3 weeks ago

Replying to Carlton Gibson:

sync_to_async already propagates the parent context. See tests. Is it not the case that the asyncio.TaskGroup() call in _gather needs to take the context parameter? 🤔

I think this is going to be easier to look at in a PR than a series of comments on the issue here.

I’ve already attached the link to the MR above: https://github.com/Arfey/django/pull/3/files. Did you have a chance to look at it? 🙂

sync_to_async does support modifying contextvars, but what we need is the ability to share the same context between tasks running in parallel. There’s a test included in my MR that demonstrates exactly the behavior we need to achieve:

async def test_asend_correct_contextvars_sharing_mix_receivers(self):
    handler1 = self.CtxSyncHandler(self.ctx_var)
    handler2 = self.CtxAsyncHandler(self.ctx_var)
    signal = dispatch.Signal()
    signal.connect(handler1)
    signal.connect(handler2)

    # set a custom value outside the signal
    self.ctx_var.set(1)

    await signal.asend(self.__class__)

    self.assertEqual(len(handler1.values), 1)
    self.assertEqual(len(handler2.values), 1)
    self.assertEqual(
        sorted([*handler1.values, *handler2.values]),
        [2, 3]
    )
    self.assertEqual(self.ctx_var.get(), 3)

Right now we never reach 3. We only get 2, because handler1 and handler2 don't share context with each other. The reason is that sync_to_async copies the context (see: https://github.com/django/asgiref/blob/2138f0317d79cedd065571447ae0a7571989550e/asgiref/sync.py#L483), but in our case we need them to share it.

comment:14 by Carlton Gibson, 3 weeks ago

OK, so first step is an issue on asgiref showing the necessary adjustment there, yes?

in reply to:  14 comment:15 by Mykhailo Havelia, 3 weeks ago

Replying to Carlton Gibson:

OK, so first step is an issue on asgiref showing the necessary adjustment there, yes?

If we want to try to keep the async dispatch concurrent, then yes.

comment:16 by Carlton Gibson, 3 weeks ago

Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!

in reply to:  16 comment:17 by Mykhailo Havelia, 3 weeks ago

Replying to Carlton Gibson:

Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!

Got it. I'll prepare the MR soon.

in reply to:  16 comment:18 by Mykhailo Havelia, 3 weeks ago

Replying to Carlton Gibson:

Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!

Done https://github.com/django/asgiref/pull/536 😌

in reply to:  18 comment:19 by Mykhailo Havelia, 3 weeks ago

Replying to Mykhailo Havelia:

Replying to Carlton Gibson:

Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!

Done https://github.com/django/asgiref/pull/536 😌

I tested the updated asgiref with Django signals using my branch

pip install git+https://github.com/Arfey/asgiref.git@feat/added-custom-context-parameter-for-sync-to-async#egg=asgiref

Everything works correctly:

./runtests.py signals
Testing against Django installed in ... with up to 10 processes
Found 28 test(s).
............................
----------------------------------------------------------------------
Ran 28 tests in 0.333s

in reply to:  16 comment:20 by Mykhailo Havelia, 10 days ago

Replying to Carlton Gibson:

Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!

I've prepared a patch — https://github.com/django/django/pull/20288 — but ran into a problem 😔
I didn't run the full test suite and overlooked an issue in asgi.tests.ASGITest.test_assert_in_listen_for_disconnect. That test uses ApplicationCommunicator, which internally does:

self._future = contextvars.Context().run(
    asyncio.create_task,
    self.application(
        self.scope, self.input_queue.get, self.output_queue.put
    ),
)

As a result, we hit this error:

RuntimeError: cannot enter context: <_contextvars.Context object at 0x104939040> is already entered

So ApplicationCommunicator is already setting a context, and we can't override it using asyncio.create_task(coro(), context=ctx). I'm looking for a workaround, but it seems like this may require another asgiref release 😔

If you have any ideas on how to handle this, please let me know.
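The underlying rule behind this error message can be shown in isolation: a Context object cannot be entered while it is already entered. The sketch below only illustrates that rule with the standard library; it does not reproduce the ApplicationCommunicator or ASGIHandler code path, and reenter and try_nested_run are names made up for the example.

```python
import contextvars

ctx = contextvars.copy_context()


def reenter():
    # ctx is already entered by the outer ctx.run() call below,
    # so entering it a second time is rejected.
    ctx.run(lambda: None)


def try_nested_run():
    try:
        ctx.run(reenter)
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(try_nested_run())
```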
