Opened 7 weeks ago
Last modified 11 hours ago
#36714 assigned Bug
Async signals lose ContextVar state due to use of asyncio.gather
| Reported by: | Mykhailo Havelia | Owned by: | Mykhailo Havelia |
|---|---|---|---|
| Component: | Core (Other) | Version: | dev |
| Severity: | Normal | Keywords: | asyncio, signals |
| Cc: | Mykhailo Havelia, Carlton Gibson | Triage Stage: | Ready for checkin |
| Has patch: | yes | Needs documentation: | no |
| Needs tests: | no | Patch needs improvement: | no |
| Easy pickings: | no | UI/UX: | no |
Description
The natural way to share global, per-request state in asyncio is through contextvars. In Django, this is typically used via asgiref.local.Local. However, Django's async signal dispatch currently uses asyncio.gather, which internally creates new tasks (asyncio.create_task). This breaks context propagation, since each task gets its own copy of the context. As a result, it's impossible to set a global (context-based) variable inside a signal handler and have it shared with other signal handlers or parts of the same request/response cycle.
Example
from django.core import signals
from django.http import (
HttpRequest,
HttpResponse,
)
import contextvars
from django.urls import path
request_id = contextvars.ContextVar('request_id', default=None)
async def set_global_variable(*args, **kwargs):
# set global variable
request_id.set('request_id_value')
print('get value', request_id.get())
signals.request_started.connect(set_global_variable)
async def index(request: HttpRequest) -> HttpResponse:
# get global variable
print('request_id', request_id.get())
return HttpResponse(content=request_id.get())
urlpatterns = [path("", index), ]
result
get value request_id_value request_id None
The value set inside the signal handler is lost, because the handler runs in a separate task with its own context.
If we are talking exactly about signals.request_started and signals.request_finished, they are typically used for setting up and cleaning up per-request resources. With asyncio.gather, cleanup logic that relies on ContextVar cannot work properly.
from django.core import signals
from django.http import (
HttpRequest,
HttpResponse,
)
import contextvars
from django.urls import path
db_connection = contextvars.ContextVar('db_connection', default=None)
async def get_or_create_connection():
if not db_connection.get():
db_connection.set('connection')
return db_connection.get()
async def close_connection(*args, **kwargs):
connection = db_connection.get()
if not connection:
print('cannot clean - connection does not exist')
return
print('close connection')
connection.set(None)
signals.request_finished.connect(close_connection)
async def index(request: HttpRequest) -> HttpResponse:
# create connection inside handler
connection = await get_or_create_connection()
# await get_data(connection)
return HttpResponse(content="ok")
urlpatterns = [path("", index), ]
result
cannot clean - connection does not exist
Expected behavior
Signal handlers should run in the same async context as the request, preserving ContextVar and asgiref.local.Local state.
Proposed solution
Signal:
Dispatch async signal handlers sequentially (or via direct await) instead of using asyncio.gather, so that the existing execution context is preserved throughout the request lifecycle. Yes, this change removes parallelism, but that shouldn’t be a major concern. The only real benefit of running signal handlers in parallel would be for IO-bound operations - yet in most cases, these handlers interact with the same database connection. Since database operations aren’t truly parallel under the hood, the performance gain from asyncio.gather is negligible.
ASGIHandler:
async def handle(self, scope, receive, send):
...
await signals.request_started.asend(sender=self.__class__, scope=scope)
tasks = [
asyncio.create_task(self.listen_for_disconnect(receive)),
asyncio.create_task(process_request(request, send)),
]
...
await signals.request_finished.asend(sender=self.__class__)
Global variables created inside process_request are not visible to request_finished, because each task runs in a separate context. We can try using contextvars.copy_context() to preserve and share the same context between tasks and signal handlers.
async def handle(self, scope, receive, send):
...
await signals.request_started.asend(sender=self.__class__, scope=scope)
ctx = contextvars.copy_context()
tasks = [
asyncio.create_task(self.listen_for_disconnect(receive)),
asyncio.create_task(process_request(request, send), context=ctx),
]
...
await asyncio.create_task(signals.request_finished.asend(sender=self.__class__), context=ctx)
Here is a simple example
import asyncio
import contextvars
global_state = contextvars.ContextVar('stage', default=0)
async def inc():
value = global_state.get()
print('value: ', value)
global_state.set(value + 1)
async def main():
await asyncio.create_task(inc())
await asyncio.create_task(inc())
await asyncio.create_task(inc())
print('first: ', global_state.get())
ctx = contextvars.copy_context()
await asyncio.create_task(inc(), context=ctx)
await asyncio.create_task(inc(), context=ctx)
await asyncio.create_task(inc(), context=ctx)
print('second: ', ctx.get(global_state))
await main()
result
value: 0 value: 0 value: 0 first: 0 value: 0 value: 1 value: 2 second: 3
Change History (28)
follow-up: 4 comment:1 by , 7 weeks ago
| Cc: | added |
|---|---|
| Triage Stage: | Unreviewed → Accepted |
comment:2 by , 7 weeks ago
| Owner: | set to |
|---|---|
| Status: | new → assigned |
comment:3 by , 7 weeks ago
| Owner: | removed |
|---|---|
| Status: | assigned → new |
comment:4 by , 7 weeks ago
Replying to Carlton Gibson:
The whole idea of that API is to allow control over this, when trying to keep things structured, no?
🙂 If we’re talking about ASGIHandler.handler, then yes. It’s easy to implement and would unblock my current work on the async backend.
Then we should resolve #36315 first no?
I think our priority should be to focus first on the:
- database cursor
- ORM
- middlewares,
- signals
- cache
to ensure proper async handling first. After that, we can focus on optimization and improvements since optimization often adds complexity, and there are still some core challenges to solve in the async port 😔.
That said, if I can prepare a small MR with minimal changes that could be reviewed and merged quickly, I’d really prefer that approach, especially since #36315 has been pending for over six months. It would help unblock my current work.
If that’s not possible, I'm more than happy to contribute to #36315 to help move it forward and unblock these changes together.
What do you think?
The whole idea of that API is to allow control over this, when trying to keep things structured, no?
If we're talking about using asyncio.gather inside signals, things get a bit more complicated. I'm not entirely sure how context sharing behaves with parallel tasks yet (I'll need to look into it). We might need to do something similar to what asgiref does, for example:
def _restore_context(context: contextvars.Context) -> None:
cvalue = context.get(cvar)
try:
if cvar.get() != cvalue:
cvar.set(cvalue)
except LookupError:
cvar.set(cvalue)
Manually handling context like this can easily lead to unexpected behavior. For instance, overwriting a global variable when we shouldn't. So we'll need to test this carefully, especially with sync_to_async and async_to_sync. It might take a fair bit of time to review and verify that everything works as expected.
It's much easier to delete it. What do you think?
follow-up: 6 comment:5 by , 7 weeks ago
There's already two small PRs for #36315. They look good, and just need a review, and confirmation. (They're sat on my list. 🤹) — My point was that we should resolve those and then implement the change here on top of that, rather than making a separate change here, and then having to redo it.
comment:6 by , 7 weeks ago
Replying to Carlton Gibson:
There's already two small PRs for #36315. They look good, and just need a review, and confirmation. (They're sat on my list. 🤹) — My point was that we should resolve those and then implement the change here on top of that, rather than making a separate change here, and then having to redo it.
Got it. I’ll take a look at them tonight and leave a review. Hopefully, that helps speed up the process.
comment:7 by , 7 weeks ago
| Type: | Uncategorized → Bug |
|---|
follow-up: 9 comment:8 by , 7 weeks ago
I reviewed #36315 and marked it as Ready for checkin, so hopefully that can progress to clear the path for work here.
comment:9 by , 6 weeks ago
Replying to Carlton Gibson:
I reviewed #36315 and marked it as Ready for checkin, so hopefully that can progress to clear the path for work here.
Test MR: https://github.com/Arfey/django/pull/3/files
I used the latest changes and prepared a fix for context sharing. At this point, we have:
- Successful context sharing between async handlers for asend/send
- Successful context sharing between sync handlers for asend/send
- Successful cross-sharing between async/sync handlers for send
Current issue:
- Cross-sharing between async/sync handlers for asend
The difference is in how send and asend execute handlers. send runs sync handlers first, and then runs async handlers in parallel and this works fine. asend tries to run all handlers "in parallel". In this case, sync_to_async copies the context instead of using the existing one, so context sharing breaks. I propose splitting the execution of sync and async handlers for asend as well.
Current code:
await _gather(
sync_send(),
_gather(
*(
receiver(signal=self, sender=sender, **named)
for receiver in async_receivers
)
),
)
Proposed adjustment:
await sync_send()
await _gather(
*(
receiver(signal=self, sender=sender, **named)
for receiver in async_receivers
)
)
What do you think?
follow-ups: 11 21 comment:10 by , 6 weeks ago
I continue to think that it would be a shame to not at least try to maintain the concurrent signal dispatch here. We should be able to pass the current context to the sync_send task, using the context parameter, AFAICS. I'd like us to at least try that. If there's some blocking reason why that's not feasible then (sure) we can look at running the sync signals first.
comment:11 by , 6 weeks ago
Replying to Carlton Gibson:
I continue to think that it would be a shame to not at least try to maintain the concurrent signal dispatch here. We should be able to pass the current context to the sync_send task, using the context parameter, AFAICS. I'd like us to at least try that. If there's some blocking reason why that's not feasible then (sure) we can look at running the sync signals first.
This would require changes to sync_to_async. It might look something like:
context = contextvars.copy_context()
@sync_to_async(context=ctx)
def foo():
...
If this approach is acceptable, I can prepare an MR.
follow-up: 13 comment:12 by , 6 weeks ago
sync_to_async already propagates the parent context. See tests. Is it not the case that the asyncio.TaskGroup() call in _gather needs to take the context parameter? 🤔
I think this is going to be easier to look at in a PR than a series of comments on the issue here.
comment:13 by , 6 weeks ago
Replying to Carlton Gibson:
sync_to_async already propagates the parent context. See tests. Is it not the case that the
asyncio.TaskGroup()call in_gatherneeds to take the context parameter? 🤔
I think this is going to be easier to look at in a PR than a series of comments on the issue here.
I’ve already attached the link to the MR above: https://github.com/Arfey/django/pull/3/files. Did you have a chance to look at it? 🙂
sync_to_async does support modifying contextvars, but what we need is the ability to share the same context between tasks running in parallel. There’s a test included in my MR that demonstrates exactly what behavior we have to achieve
async def test_asend_correct_contextvars_sharing_mix_receivers(self):
handler1 = self.CtxSyncHandler(self.ctx_var)
handler2 = self.CtxAsyncHandler(self.ctx_var)
signal = dispatch.Signal()
signal.connect(handler1)
signal.connect(handler2)
# set custom value outer signal
self.ctx_var.set(1)
await signal.asend(self.__class__)
self.assertEqual(len(handler1.values), 1)
self.assertEqual(len(handler2.values), 1)
self.assertEqual(
sorted([*handler1.values, *handler2.values]),
[2, 3]
)
self.assertEqual(self.ctx_var.get(), 3)
Right now we don't end up with all 3. We only get 2, because handler1 and handler2 don't share context with each other. The reason is that sync_to_async copies the context (see: https://github.com/django/asgiref/blob/2138f0317d79cedd065571447ae0a7571989550e/asgiref/sync.py#L483), but in our case we need them to share it.
follow-up: 15 comment:14 by , 6 weeks ago
OK, so first step is an issue on asgiref showing the necessary adjust there, yes?
comment:15 by , 6 weeks ago
Replying to Carlton Gibson:
OK, so first step is an issue on asgiref showing the necessary adjust there, yes?
If we want to try keep async running then yes.
follow-ups: 17 18 20 comment:16 by , 6 weeks ago
Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!
comment:17 by , 6 weeks ago
Replying to Carlton Gibson:
Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!
Got it. I'll prepare the MR soon.
follow-up: 19 comment:18 by , 6 weeks ago
Replying to Carlton Gibson:
Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!
comment:19 by , 6 weeks ago
Replying to Mykhailo Havelia:
Replying to Carlton Gibson:
Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!
I tested the updated asgiref with Django signals using my branch
pip install git+https://github.com/Arfey/asgiref.git@feat/added-custom-context-parameter-for-sync-to-async#egg=asgiref
Everything works correctly:
./runtests.py signals Testing against Django installed in ... with up to 10 processes Found 28 test(s). ............................ ---------------------------------------------------------------------- Ran 28 tests in 0.333s
comment:20 by , 5 weeks ago
Replying to Carlton Gibson:
Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!
I've prepared a patch — https://github.com/django/django/pull/20288 — but ran into a problem 😔
I didn't run the full test suite and overlooked an issue in asgi.tests.ASGITest.test_assert_in_listen_for_disconnect. That test uses ApplicationCommunicator, which internally does:
self._future = contextvars.Context().run(
asyncio.create_task,
self.application(
self.scope, self.input_queue.get, self.output_queue.put
),
)
As a result, we hit this error:
RuntimeError: cannot enter context: <_contextvars.Context object at 0x104939040> is already entered
So ApplicationCommunicator is already setting a context, and we can't override it using asyncio.create_task(coro(), context=ctx). I'm looking for a workaround, but it seems like this may require another asgiref release 😔
If you have any ideas on how to handle this, please let me know.
follow-up: 22 comment:21 by , 3 weeks ago
Replying to Carlton Gibson:
Hi ✋
I continue to think that it would be a shame to not at least try to maintain the concurrent signal dispatch here.
I've spent quite a bit of time trying to make this work, and I believe I now have enough evidence that, unfortunately, we can't do it 😅.
Consider the example below:
import asyncio
import contextvars
import time
from asgiref.sync import async_to_sync, sync_to_async
async def sync_to_async_version():
context = contextvars.copy_context()
def sync_fn():
# some cpu-bound task
time.sleep(0.1)
async def async_fn():
# to enforce release an event loop (or replace with time.sleep(0.1))
asyncio.sleep(0)
asyncio.sleep(0)
async with asyncio.TaskGroup() as tg:
tg.create_task(sync_to_async(context=context)(sync_fn)())
tg.create_task(async_fn(), context=context)
async_to_sync(sync_to_async_version)()
Running this results in (probably 2-3 times to reproduce):
RuntimeError: cannot enter context: <_contextvars.Context object at 0x7f08aa52c100> is already entered
Why this happens
The core issue is that a single contextvars.Context cannot be entered more than once at the same time. Under normal circumstances, this error does not occur because the asyncio event loop is cooperative and single-threaded. Tasks are interleaved, not executed in parallel. Internally, the loop drives each coroutine step by step like this:
ctx.run(coro.send(None))
(see: asyncio/tasks.py)
Because everything runs on the same thread, the context is entered and exited sequentially, so no conflict occurs.
In our case, however, asgiref.sync_to_async uses a ThreadPoolExecutor to run the synchronous function on a separate OS thread. Here’s the problematic sequence:
- The thread pool starts executing `ctx.run(long_sync_task)`, and the Context becomes entered on the worker thread.
- While that thread is still running, the OS scheduler switches back to the event loop thread.
- The event loop continues executing the coroutine and tries to resume it using:
ctx.run(coro.send(None))
- This attempts to enter the same Context again, while it's already active on another thread, and because Context objects are not re-entrant, Python raises:
RuntimeError: cannot enter context: <Context> is already entered
Key takeaways
We cannot control OS thread scheduling, so we can't avoid conflicts when the same Context is shared across threads. Consequently, the only safe way to share a Context between sync and async signal handlers is to run sync handlers and async handlers serially rather than concurrently.
Can I prepare the MR with these changes?
BTW:
There are a lot of passing tests in the PR (see), mainly because the handlers run too quickly to trigger the error. In real-world projects, signal handlers are usually slower, which makes these race conditions much more likely to appear 😅.
follow-ups: 23 24 comment:22 by , 3 weeks ago
Replying to Mykhailo Havelia:
Thanks for the write up Mykhailo. That's a nice example. Let me play with it. I don't have an immediate conclusion to offer you.
I continue to think...
The issue here is the sync handlers. If I've gone to the effort to set up a fully asynchronous request-response flow, to then loose concurrent signal handling for sync signal handlers that I'm not using is going to be a pain. Can we not (at least) dispatch the async handlers concurrently, and then handle the synchronous ones sequentially, even if the requirement to pass the shared context here can't be worked around? 🤔
comment:23 by , 3 weeks ago
Replying to Carlton Gibson:
Can we not (at least) dispatch the async handlers concurrently, and then handle the synchronous ones sequentially
Yes, I've thought about it. It should be fairly easy to implement and maintain going forward 😌. So my plan is to:
- Rewrite the
asendmethod to run all async handlers in parallel first, and then execute sync handlers afterward. - Share the same context across all handlers.
comment:24 by , 3 weeks ago
Replying to Carlton Gibson:
Done - https://github.com/django/django/pull/20288. Please check when you have time.
comment:25 by , 3 weeks ago
| Component: | HTTP handling → Core (Other) |
|---|---|
| Has patch: | set |
comment:26 by , 3 weeks ago
| Owner: | set to |
|---|---|
| Status: | new → assigned |
comment:27 by , 5 days ago
| Patch needs improvement: | set |
|---|
comment:28 by , 11 hours ago
| Patch needs improvement: | unset |
|---|---|
| Triage Stage: | Accepted → Ready for checkin |
Hi Mykhailo.
Thanks for the report. (And thanks for writing it up so nicely.)
So, yes — passing the
contextexplicitly to the tasks would be my first thought. The whole idea of that API is to allow control over this, when trying to keep things structured, no?Then we should resolve #36315 first no? TaskGroup is the newer preferred API. Do you fancy reviewing that?
I'd be a bit sad if we had to lose the concurrent async dispatch on signals. (I grant your point that it may not be a big gain in performance necessarily, but ... 🙂 )