Opened 4 weeks ago
Last modified 10 days ago
#36714 new Bug
Async signals lose ContextVar state due to use of asyncio.gather
| Reported by: | Mykhailo Havelia | Owned by: | |
|---|---|---|---|
| Component: | HTTP handling | Version: | dev |
| Severity: | Normal | Keywords: | asyncio, signals |
| Cc: | Mykhailo Havelia, Carlton Gibson | Triage Stage: | Accepted |
| Has patch: | no | Needs documentation: | no |
| Needs tests: | no | Patch needs improvement: | no |
| Easy pickings: | no | UI/UX: | no |
Description
The natural way to share global, per-request state in asyncio is through contextvars. In Django, this is typically used via asgiref.local.Local. However, Django's async signal dispatch currently uses asyncio.gather, which internally creates new tasks (asyncio.create_task). This breaks context propagation, since each task gets its own copy of the context. As a result, it's impossible to set a global (context-based) variable inside a signal handler and have it shared with other signal handlers or parts of the same request/response cycle.
Example
from django.core import signals
from django.http import (
    HttpRequest,
    HttpResponse,
)
import contextvars
from django.urls import path

request_id = contextvars.ContextVar('request_id', default=None)

async def set_global_variable(*args, **kwargs):
    # set global variable
    request_id.set('request_id_value')
    print('get value', request_id.get())

signals.request_started.connect(set_global_variable)

async def index(request: HttpRequest) -> HttpResponse:
    # get global variable
    print('request_id', request_id.get())
    return HttpResponse(content=request_id.get())

urlpatterns = [path("", index), ]
result
get value request_id_value
request_id None
The value set inside the signal handler is lost, because the handler runs in a separate task with its own context.
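The effect can be reproduced without Django. The following is a minimal sketch using only the standard library; the names set_request_id and via_gather are illustrative and do not come from Django. asyncio.gather wraps the coroutine in a task, the task runs in a copy of the caller's context, and the value set inside never reaches the caller.

```python
import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id", default=None)


async def set_request_id():
    # Runs inside the task that gather creates, i.e. in a copied context.
    request_id.set("request_id_value")


async def via_gather():
    await asyncio.gather(set_request_id())
    # Back in the caller's context: the write above is not visible here.
    return request_id.get()


gather_result = asyncio.run(via_gather())
print("after gather:", gather_result)
```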
This matters most for signals.request_started and signals.request_finished, which are typically used for setting up and cleaning up per-request resources. With asyncio.gather, cleanup logic that relies on ContextVar cannot work properly.
from django.core import signals
from django.http import (
    HttpRequest,
    HttpResponse,
)
import contextvars
from django.urls import path

db_connection = contextvars.ContextVar('db_connection', default=None)

async def get_or_create_connection():
    if not db_connection.get():
        db_connection.set('connection')
    return db_connection.get()

async def close_connection(*args, **kwargs):
    connection = db_connection.get()
    if not connection:
        print('cannot clean - connection does not exist')
        return
    print('close connection')
    # reset the ContextVar itself (the stored value is a plain string)
    db_connection.set(None)

signals.request_finished.connect(close_connection)

async def index(request: HttpRequest) -> HttpResponse:
    # create connection inside handler
    connection = await get_or_create_connection()
    # await get_data(connection)
    return HttpResponse(content="ok")

urlpatterns = [path("", index), ]
result
cannot clean - connection does not exist
Expected behavior
Signal handlers should run in the same async context as the request, preserving ContextVar and asgiref.local.Local state.
Proposed solution
Signal:
Dispatch async signal handlers sequentially (or via direct await) instead of using asyncio.gather, so that the existing execution context is preserved throughout the request lifecycle. Yes, this change removes parallelism, but that shouldn’t be a major concern. The only real benefit of running signal handlers in parallel would be for IO-bound operations - yet in most cases, these handlers interact with the same database connection. Since database operations aren’t truly parallel under the hood, the performance gain from asyncio.gather is negligible.
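As a rough illustration of the sequential approach, here is a standalone sketch. dispatch_sequentially is a hypothetical helper, not Django's Signal.asend. Because each handler is awaited directly, all handlers run in the caller's context and see each other's writes.

```python
import asyncio
import contextvars

state = contextvars.ContextVar("state", default=0)
seen = []


async def first_handler():
    state.set(state.get() + 1)


async def second_handler():
    # Sees the value written by first_handler.
    seen.append(state.get())
    state.set(state.get() + 1)


async def dispatch_sequentially(handlers):
    # Hypothetical helper: a direct await creates no new task,
    # so no context copy is made.
    for handler in handlers:
        await handler()


async def main():
    await dispatch_sequentially([first_handler, second_handler])
    return state.get()


final_value = asyncio.run(main())
print(seen, final_value)
```

The trade-off is the one described above: handlers no longer overlap their IO waits.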
ASGIHandler:
async def handle(self, scope, receive, send):
    ...
    await signals.request_started.asend(sender=self.__class__, scope=scope)
    tasks = [
        asyncio.create_task(self.listen_for_disconnect(receive)),
        asyncio.create_task(process_request(request, send)),
    ]
    ...
    await signals.request_finished.asend(sender=self.__class__)
Global variables created inside process_request are not visible to request_finished, because each task runs in a separate context. We can try using contextvars.copy_context() to preserve and share the same context between tasks and signal handlers.
async def handle(self, scope, receive, send):
    ...
    await signals.request_started.asend(sender=self.__class__, scope=scope)
    ctx = contextvars.copy_context()
    tasks = [
        asyncio.create_task(self.listen_for_disconnect(receive)),
        asyncio.create_task(process_request(request, send), context=ctx),
    ]
    ...
    await asyncio.create_task(signals.request_finished.asend(sender=self.__class__), context=ctx)
Here is a simple example
import asyncio
import contextvars

global_state = contextvars.ContextVar('stage', default=0)

async def inc():
    value = global_state.get()
    print('value: ', value)
    global_state.set(value + 1)

async def main():
    # each task gets its own copy of the current context
    await asyncio.create_task(inc())
    await asyncio.create_task(inc())
    await asyncio.create_task(inc())
    print('first: ', global_state.get())
    # all three tasks share ctx (the context argument needs Python 3.11+)
    ctx = contextvars.copy_context()
    await asyncio.create_task(inc(), context=ctx)
    await asyncio.create_task(inc(), context=ctx)
    await asyncio.create_task(inc(), context=ctx)
    print('second: ', ctx.get(global_state))

asyncio.run(main())
result
value: 0
value: 0
value: 0
first: 0
value: 0
value: 1
value: 2
second: 3
Change History (20)
follow-up: 4 comment:1 by , 4 weeks ago
| Cc: | added |
|---|---|
| Triage Stage: | Unreviewed → Accepted |
comment:2 by , 4 weeks ago
| Owner: | set to |
|---|---|
| Status: | new → assigned |
comment:3 by , 4 weeks ago
| Owner: | removed |
|---|---|
| Status: | assigned → new |
comment:4 by , 4 weeks ago
Replying to Carlton Gibson:
The whole idea of that API is to allow control over this, when trying to keep things structured, no?
🙂 If we’re talking about ASGIHandler.handle, then yes. It’s easy to implement and would unblock my current work on the async backend.
Then we should resolve #36315 first no?
I think our priority should be to get proper async handling working in these areas first:
- database cursor
- ORM
- middlewares
- signals
- cache
After that, we can focus on optimization and improvements, since optimization often adds complexity and there are still some core challenges to solve in the async port 😔.
That said, if I can prepare a small MR with minimal changes that could be reviewed and merged quickly, I’d really prefer that approach, especially since #36315 has been pending for over six months. It would help unblock my current work.
If that’s not possible, I'm more than happy to contribute to #36315 to help move it forward and unblock these changes together.
What do you think?
The whole idea of that API is to allow control over this, when trying to keep things structured, no?
If we're talking about using asyncio.gather inside signals, things get a bit more complicated. I'm not entirely sure how context sharing behaves with parallel tasks yet (I'll need to look into it). We might need to do something similar to what asgiref does, for example:
def _restore_context(context: contextvars.Context) -> None:
    # copy changed values from `context` back into the current context
    for cvar in context:
        cvalue = context.get(cvar)
        try:
            if cvar.get() != cvalue:
                cvar.set(cvalue)
        except LookupError:
            cvar.set(cvalue)
Manually handling context like this can easily lead to unexpected behavior. For instance, overwriting a global variable when we shouldn't. So we'll need to test this carefully, especially with sync_to_async and async_to_sync. It might take a fair bit of time to review and verify that everything works as expected.
It's much easier to delete it. What do you think?
follow-up: 6 comment:5 by , 4 weeks ago
There's already two small PRs for #36315. They look good, and just need a review, and confirmation. (They're sat on my list. 🤹) — My point was that we should resolve those and then implement the change here on top of that, rather than making a separate change here, and then having to redo it.
comment:6 by , 4 weeks ago
Replying to Carlton Gibson:
There's already two small PRs for #36315. They look good, and just need a review, and confirmation. (They're sat on my list. 🤹) — My point was that we should resolve those and then implement the change here on top of that, rather than making a separate change here, and then having to redo it.
Got it. I’ll take a look at them tonight and leave a review. Hopefully, that helps speed up the process.
comment:7 by , 3 weeks ago
| Type: | Uncategorized → Bug |
|---|
follow-up: 9 comment:8 by , 3 weeks ago
I reviewed #36315 and marked it as Ready for checkin, so hopefully that can progress to clear the path for work here.
comment:9 by , 3 weeks ago
Replying to Carlton Gibson:
I reviewed #36315 and marked it as Ready for checkin, so hopefully that can progress to clear the path for work here.
Test MR: https://github.com/Arfey/django/pull/3/files
I used the latest changes and prepared a fix for context sharing. At this point, we have:
- Successful context sharing between async handlers for asend/send
- Successful context sharing between sync handlers for asend/send
- Successful cross-sharing between async/sync handlers for send
Current issue:
- Cross-sharing between async/sync handlers for asend
The difference is in how send and asend execute handlers. send runs sync handlers first and then runs async handlers in parallel, which works fine. asend tries to run all handlers "in parallel". In that case, sync_to_async copies the context instead of using the existing one, so context sharing breaks. I propose splitting the execution of sync and async handlers for asend as well.
Current code:
await _gather(
    sync_send(),
    _gather(
        *(
            receiver(signal=self, sender=sender, **named)
            for receiver in async_receivers
        )
    ),
)
Proposed adjustment:
await sync_send()
await _gather(
    *(
        receiver(signal=self, sender=sender, **named)
        for receiver in async_receivers
    )
)
What do you think?
follow-up: 11 comment:10 by , 3 weeks ago
I continue to think that it would be a shame to not at least try to maintain the concurrent signal dispatch here. We should be able to pass the current context to the sync_send task, using the context parameter, AFAICS. I'd like us to at least try that. If there's some blocking reason why that's not feasible then (sure) we can look at running the sync signals first.
comment:11 by , 3 weeks ago
Replying to Carlton Gibson:
I continue to think that it would be a shame to not at least try to maintain the concurrent signal dispatch here. We should be able to pass the current context to the sync_send task, using the context parameter, AFAICS. I'd like us to at least try that. If there's some blocking reason why that's not feasible then (sure) we can look at running the sync signals first.
This would require changes to sync_to_async. It might look something like:
ctx = contextvars.copy_context()

# proposed parameter; sync_to_async does not accept it today
@sync_to_async(context=ctx)
def foo():
    ...
If this approach is acceptable, I can prepare an MR.
follow-up: 13 comment:12 by , 3 weeks ago
sync_to_async already propagates the parent context. See tests. Is it not the case that the asyncio.TaskGroup() call in _gather needs to take the context parameter? 🤔
I think this is going to be easier to look at in a PR than a series of comments on the issue here.
comment:13 by , 3 weeks ago
Replying to Carlton Gibson:
sync_to_async already propagates the parent context. See tests. Is it not the case that the asyncio.TaskGroup() call in _gather needs to take the context parameter? 🤔
I think this is going to be easier to look at in a PR than a series of comments on the issue here.
I’ve already attached the link to the MR above: https://github.com/Arfey/django/pull/3/files. Did you have a chance to look at it? 🙂
sync_to_async does support modifying contextvars, but what we need is the ability to share the same context between tasks running in parallel. There’s a test included in my MR that demonstrates exactly what behavior we have to achieve:
async def test_asend_correct_contextvars_sharing_mix_receivers(self):
    handler1 = self.CtxSyncHandler(self.ctx_var)
    handler2 = self.CtxAsyncHandler(self.ctx_var)
    signal = dispatch.Signal()
    signal.connect(handler1)
    signal.connect(handler2)

    # set custom value outer signal
    self.ctx_var.set(1)
    await signal.asend(self.__class__)

    self.assertEqual(len(handler1.values), 1)
    self.assertEqual(len(handler2.values), 1)
    self.assertEqual(
        sorted([*handler1.values, *handler2.values]),
        [2, 3]
    )
    self.assertEqual(self.ctx_var.get(), 3)
Right now we don't end up with all 3. We only get 2, because handler1 and handler2 don't share context with each other. The reason is that sync_to_async copies the context (see: https://github.com/django/asgiref/blob/2138f0317d79cedd065571447ae0a7571989550e/asgiref/sync.py#L483), but in our case we need them to share it.
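The "2 instead of 3" outcome can be sketched with plain contextvars. This assumes each handler increments the variable by one, which the test suggests but the handler classes are not shown here. Each copy starts from the caller's value, so two handlers running in separate copies both produce 2, and neither write reaches the caller.

```python
import contextvars

ctx_var = contextvars.ContextVar("ctx_var")


def handler():
    # Assumed handler behaviour: increment and report the new value.
    ctx_var.set(ctx_var.get() + 1)
    return ctx_var.get()


ctx_var.set(1)

# Each handler runs in its own copy of the caller's context.
first_copy = contextvars.copy_context()
second_copy = contextvars.copy_context()
first_value = first_copy.run(handler)
second_value = second_copy.run(handler)

print(first_value, second_value, ctx_var.get())
```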
follow-up: 15 comment:14 by , 3 weeks ago
OK, so first step is an issue on asgiref showing the necessary adjustment there, yes?
comment:15 by , 3 weeks ago
Replying to Carlton Gibson:
OK, so first step is an issue on asgiref showing the necessary adjustment there, yes?
If we want to try to keep async running, then yes.
follow-ups: 17 18 20 comment:16 by , 3 weeks ago
Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!
comment:17 by , 3 weeks ago
Replying to Carlton Gibson:
Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!
Got it. I'll prepare the MR soon.
follow-up: 19 comment:18 by , 3 weeks ago
Replying to Carlton Gibson:
Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!
comment:19 by , 3 weeks ago
Replying to Mykhailo Havelia:
Replying to Carlton Gibson:
Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!
I tested the updated asgiref with Django signals using my branch
pip install git+https://github.com/Arfey/asgiref.git@feat/added-custom-context-parameter-for-sync-to-async#egg=asgiref
Everything works correctly:
./runtests.py signals
Testing against Django installed in ... with up to 10 processes
Found 28 test(s).
............................
----------------------------------------------------------------------
Ran 28 tests in 0.333s
comment:20 by , 10 days ago
Replying to Carlton Gibson:
Being able to share the context between multiple concurrent sync_to_async tasks seems like something that's generally useful, and in-line with the create_task and TaskGroup API. So, that's a useful step independently I'd think. Thanks!
I've prepared a patch — https://github.com/django/django/pull/20288 — but ran into a problem 😔
I didn't run the full test suite and overlooked an issue in asgi.tests.ASGITest.test_assert_in_listen_for_disconnect. That test uses ApplicationCommunicator, which internally does:
self._future = contextvars.Context().run(
    asyncio.create_task,
    self.application(
        self.scope, self.input_queue.get, self.output_queue.put
    ),
)
As a result, we hit this error:
RuntimeError: cannot enter context: <_contextvars.Context object at 0x104939040> is already entered
So ApplicationCommunicator is already setting a context, and we can't override it using asyncio.create_task(coro(), context=ctx). I'm looking for a workaround, but it seems like this may require another asgiref release 😔
If you have any ideas on how to handle this, please let me know.
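For readers unfamiliar with this error: a Context object can only be entered once at a time. The sketch below reproduces the same class of RuntimeError with the standard library only. It does not replicate the exact code path in ApplicationCommunicator or the patch, which is not shown here.

```python
import contextvars

ctx = contextvars.Context()


def reenter():
    # ctx is already entered by the outer ctx.run call, so this raises.
    return ctx.run(lambda: "unreachable")


try:
    ctx.run(reenter)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(error_message)
```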
Hi Mykhailo.
Thanks for the report. (And thanks for writing it up so nicely.)
So, yes: passing the context explicitly to the tasks would be my first thought. The whole idea of that API is to allow control over this, when trying to keep things structured, no?
Then we should resolve #36315 first no? TaskGroup is the newer preferred API. Do you fancy reviewing that?
I'd be a bit sad if we had to lose the concurrent async dispatch on signals. (I grant your point that it may not be a big gain in performance necessarily, but ... 🙂 )