1 | import asyncio
|
---|
2 | import os
|
---|
3 | import random
|
---|
4 |
|
---|
5 | from aiohttp import web
|
---|
6 | from asgiref.sync import sync_to_async
|
---|
7 | from django.conf import settings
|
---|
8 | from django.contrib.auth import get_user_model
|
---|
9 | from django.core import management
|
---|
10 | from django.core.wsgi import get_wsgi_application
|
---|
11 | from django.db import close_old_connections
|
---|
12 | from django.db.transaction import Atomic
|
---|
13 | from functools import wraps, WRAPPER_ASSIGNMENTS
|
---|
14 |
|
---|
15 |
|
---|
16 | # https://stackoverflow.com/questions/74575922/how-to-use-transaction-with-async-functions-in-django
|
---|
17 | class AsyncAtomicContextManager(Atomic):
|
---|
18 | def __init__(self, using=None, savepoint=True, durable=False):
|
---|
19 | super().__init__(using, savepoint, durable)
|
---|
20 |
|
---|
21 | async def __aenter__(self):
|
---|
22 | await sync_to_async(super().__enter__, thread_sensitive=True)()
|
---|
23 | return self
|
---|
24 |
|
---|
25 | async def __aexit__(self, exc_type, exc_value, traceback):
|
---|
26 | await sync_to_async(super().__exit__, thread_sensitive=True)(exc_type, exc_value, traceback)
|
---|
27 |
|
---|
28 |
|
---|
29 | def aatomic(using=None, savepoint=True, durable=False):
|
---|
30 |
|
---|
31 | def decorator(func):
|
---|
32 | @wraps(func, assigned=WRAPPER_ASSIGNMENTS)
|
---|
33 | async def _wrapped(*args, **kwargs):
|
---|
34 | async with AsyncAtomicContextManager():
|
---|
35 | return await func(*args, **kwargs)
|
---|
36 |
|
---|
37 | return _wrapped
|
---|
38 |
|
---|
39 | return decorator
|
---|
40 |
|
---|
41 |
|
---|
42 | # Configure Django so the ORM works
|
---|
43 | if not settings.configured:
|
---|
44 | os.environ.setdefault("DJANGO_SETTINGS_MODULE", "failure_settings")
|
---|
45 |
|
---|
46 | application = get_wsgi_application()
|
---|
47 | management.call_command("migrate")
|
---|
48 |
|
---|
49 |
|
---|
50 | @aatomic()
|
---|
51 | async def i_am_atomic(num):
|
---|
52 | user = await User.objects.aget(username=f"user{num}")
|
---|
53 | return user
|
---|
54 |
|
---|
55 |
|
---|
56 | async def handle(request):
|
---|
57 | await sync_to_async(close_old_connections)()
|
---|
58 | num = request.query["num"]
|
---|
59 | user = await i_am_atomic(num)
|
---|
60 | return web.Response(text=user.username)
|
---|
61 |
|
---|
62 |
|
---|
63 | app = web.Application()
|
---|
64 | app.add_routes([web.get("/", handle),
|
---|
65 | web.get("/{name}", handle)])
|
---|
66 |
|
---|
67 |
|
---|
68 | if __name__ == '__main__':
|
---|
69 | # Create users
|
---|
70 | User = get_user_model()
|
---|
71 | for i in range(100):
|
---|
72 | User.objects.get_or_create(username=f"user{i}", email=f"user{i}@aaa.com")
|
---|
73 |
|
---|
74 | web.run_app(app)
|
---|