Code

Ticket #2705: for_update-1.0.1-v2.diff

File for_update-1.0.1-v2.diff, 18.7 KB (added by anih, 5 years ago)

with all

Line 
1Index: django/db/models/sql/query.py
2===================================================================
3--- django/db/models/sql/query.py       (revision 9472)
4+++ django/db/models/sql/query.py       (working copy)
5@@ -12,7 +12,7 @@
6 from django.utils.tree import Node
7 from django.utils.datastructures import SortedDict
8 from django.utils.encoding import force_unicode
9-from django.db import connection
10+from django.db import connection, DatabaseError
11 from django.db.models import signals
12 from django.db.models.fields import FieldDoesNotExist
13 from django.db.models.query_utils import select_related_descend
14@@ -27,8 +27,14 @@
15 except NameError:
16     from sets import Set as set     # Python 2.3 fallback
17 
18-__all__ = ['Query', 'BaseQuery']
19+__all__ = ['Query', 'BaseQuery', 'LockNotAvailable']
20 
21+class LockNotAvailable(DatabaseError):
22+    '''
23+    Raised when a query fails because a lock was not available.
24+    '''
25+    pass
26+
27 class BaseQuery(object):
28     """
29     A single SQL query.
30@@ -71,6 +77,8 @@
31         self.order_by = []
32         self.low_mark, self.high_mark = 0, None  # Used for offset/limit
33         self.distinct = False
34+        self.select_for_update = False
35+        self.select_for_update_nowait = False
36         self.select_related = False
37         self.related_select_cols = []
38 
39@@ -179,6 +187,8 @@
40         obj.order_by = self.order_by[:]
41         obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
42         obj.distinct = self.distinct
43+        obj.select_for_update = self.select_for_update
44+        obj.select_for_update_nowait = self.select_for_update_nowait
45         obj.select_related = self.select_related
46         obj.related_select_cols = []
47         obj.max_depth = self.max_depth
48@@ -225,6 +235,7 @@
49         obj = self.clone()
50         obj.clear_ordering(True)
51         obj.clear_limits()
52+        obj.select_for_update = False
53         obj.select_related = False
54         obj.related_select_cols = []
55         obj.related_select_fields = []
56@@ -310,6 +321,10 @@
57                         result.append('LIMIT %d' % val)
58                 result.append('OFFSET %d' % self.low_mark)
59 
60+        if self.select_for_update and self.connection.features.has_select_for_update:
61+            nowait = self.select_for_update_nowait and self.connection.features.has_select_for_update
62+            result.append("%s" % self.connection.ops.for_update_sql(nowait=nowait))
63+
64         params.extend(self.extra_params)
65         return ' '.join(result), tuple(params)
66 
67@@ -1731,7 +1746,12 @@
68                 return
69 
70         cursor = self.connection.cursor()
71-        cursor.execute(sql, params)
72+        try:
73+            cursor.execute(sql, params)
74+        except DatabaseError, e:
75+            if self.connection.features.has_select_for_update_nowait and self.connection.ops.signals_lock_not_available(e):
76+                raise LockNotAvailable(*e.args)
77+            raise
78 
79         if not result_type:
80             return cursor
81Index: django/db/models/manager.py
82===================================================================
83--- django/db/models/manager.py (revision 9472)
84+++ django/db/models/manager.py (working copy)
85@@ -119,6 +119,9 @@
86     def order_by(self, *args, **kwargs):
87         return self.get_query_set().order_by(*args, **kwargs)
88 
89+    def select_for_update(self, *args, **kwargs):
90+        return self.get_query_set().select_for_update(*args, **kwargs)
91+       
92     def select_related(self, *args, **kwargs):
93         return self.get_query_set().select_related(*args, **kwargs)
94 
95Index: django/db/models/__init__.py
96===================================================================
97--- django/db/models/__init__.py        (revision 9472)
98+++ django/db/models/__init__.py        (working copy)
99@@ -9,6 +9,7 @@
100 from django.db.models.fields.subclassing import SubfieldBase
101 from django.db.models.fields.files import FileField, ImageField
102 from django.db.models.fields.related import ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel, ManyToManyRel, OneToOneRel
103+from django.db.models.sql.query import LockNotAvailable
104 from django.db.models import signals
105 
106 # Admin stages.
107Index: django/db/models/query.py
108===================================================================
109--- django/db/models/query.py   (revision 9472)
110+++ django/db/models/query.py   (working copy)
111@@ -385,6 +385,7 @@
112         del_query = self._clone()
113 
114         # Disable non-supported fields.
115+        del_query.query.select_for_update = False
116         del_query.query.select_related = False
117         del_query.query.clear_ordering()
118 
119@@ -524,6 +525,18 @@
120         else:
121             return self._filter_or_exclude(None, **filter_obj)
122 
123+    def select_for_update(self, **kwargs):
124+        """
125+        Returns a new QuerySet instance that will select objects with a
126+        FOR UPDATE lock.
127+        """
128+        # Default to false for nowait
129+        nowait = kwargs.pop('nowait', False)
130+        obj = self._clone()
131+        obj.query.select_for_update = True
132+        obj.query.select_for_update_nowait = nowait
133+        return obj
134+
135     def select_related(self, *fields, **kwargs):
136         """
137         Returns a new QuerySet instance that will select related objects.
138Index: django/db/backends/mysql/base.py
139===================================================================
140--- django/db/backends/mysql/base.py    (revision 9472)
141+++ django/db/backends/mysql/base.py    (working copy)
142@@ -22,7 +22,7 @@
143     raise ImproperlyConfigured("MySQLdb-1.2.1p2 or newer is required; you have %s" % Database.__version__)
144 
145 from MySQLdb.converters import conversions
146-from MySQLdb.constants import FIELD_TYPE, FLAG
147+from MySQLdb.constants import FIELD_TYPE, FLAG, ER
148 
149 from django.db.backends import *
150 from django.db.backends.mysql.client import DatabaseClient
151@@ -111,6 +111,8 @@
152     empty_fetchmany_value = ()
153     update_can_self_select = False
154     related_fields_match_type = True
155+    has_select_for_update = True
156+    has_select_for_update_nowait = False
157 
158 class DatabaseOperations(BaseDatabaseOperations):
159     def date_extract_sql(self, lookup_type, field_name):
160@@ -193,6 +195,8 @@
161         # MySQL doesn't support microseconds
162         return unicode(value.replace(microsecond=0))
163 
164+    signals_deadlock = lambda self, e: e.args[0] == ER.LOCK_DEADLOCK
165+
166     def year_lookup_bounds(self, value):
167         # Again, no microseconds
168         first = '%s-01-01 00:00:00'
169Index: django/db/backends/__init__.py
170===================================================================
171--- django/db/backends/__init__.py      (revision 9472)
172+++ django/db/backends/__init__.py      (working copy)
173@@ -74,6 +74,8 @@
174     # If True, don't use integer foreign keys referring to, e.g., positive
175     # integer primary keys.
176     related_fields_match_type = False
177+    has_select_for_update = False
178+    has_select_for_update_nowait = False
179 
180 class BaseDatabaseOperations(object):
181     """
182@@ -143,6 +145,16 @@
183         """
184         return '%s'
185 
186+    def for_update_sql(self, nowait=False):
187+        """
188+        Return FOR UPDATE SQL clause to lock row for update
189+        """
190+        if nowait:
191+            nowaitstr = ' NOWAIT'
192+        else:
193+            nowaitstr = ''
194+        return 'FOR UPDATE' + nowaitstr
195+
196     def fulltext_search_sql(self, field_name):
197         """
198         Returns the SQL WHERE clause to use in order to perform a full-text
199Index: django/db/backends/postgresql_psycopg2/base.py
200===================================================================
201--- django/db/backends/postgresql_psycopg2/base.py      (revision 9472)
202+++ django/db/backends/postgresql_psycopg2/base.py      (working copy)
203@@ -15,6 +15,7 @@
204 try:
205     import psycopg2 as Database
206     import psycopg2.extensions
207+    from psycopg2 import errorcodes
208 except ImportError, e:
209     from django.core.exceptions import ImproperlyConfigured
210     raise ImproperlyConfigured("Error loading psycopg2 module: %s" % e)
211@@ -29,6 +30,8 @@
212 class DatabaseFeatures(BaseDatabaseFeatures):
213     needs_datetime_string_cast = False
214     uses_savepoints = True
215+    has_select_for_update = True
216+    has_select_for_update_nowait = True
217 
218 class DatabaseOperations(PostgresqlDatabaseOperations):
219     def last_executed_query(self, cursor, sql, params):
220@@ -37,6 +40,11 @@
221         # http://www.initd.org/tracker/psycopg/wiki/psycopg2_documentation#postgresql-status-message-and-executed-query
222         return cursor.query
223 
224+    signals_deadlock = lambda self, e: e.pgcode == errorcodes.DEADLOCK_DETECTED
225+
226+    signals_lock_not_available = lambda self, e: e.pgcode == errorcodes.LOCK_NOT_AVAILABLE
227+   
228+
229 class DatabaseWrapper(BaseDatabaseWrapper):
230     operators = {
231         'exact': '= %s',
232Index: django/views/decorators/deadlock.py
233===================================================================
234--- django/views/decorators/deadlock.py (revision 0)
235+++ django/views/decorators/deadlock.py (revision 0)
236@@ -0,0 +1,53 @@
237+"""
238+Decorators for deadlock handling.
239+"""
240+import sys
241+try:
242+    from functools import wraps
243+except ImportError:
244+    from django.utils.functional import wraps  # Python 2.3, 2.4 fallback.
245+
246+from django.db import transaction, connection, DatabaseError
247+
248+class DeadlockError(Exception):
249+    """
250+    Thrown by a view decorated by handle_deadlock(max_retries) when a deadlock
251+    has been detected and the view won't be called again to retry the aborted
252+    transaction.
253+    """
254+    pass
255+
256+def handle_deadlocks(max_retries=2):
257+    """
258+    Decorator to retry a view when a database deadlock is detected.
259+
260+    When there are no retries left, raises DeadlockError with the traceback of
261+    the original, database backend-specific exception.
262+
263+    Views using querysets constructed with select_for_update() should use this
264+    decorator. If the backend does not support locking and/or deadlock
265+    handling, this doesn't do anything.
266+    """
267+    if connection.features.has_select_for_update:
268+        signals_deadlock = connection.ops.signals_deadlock
269+    else:
270+        return lambda f: f
271+    def decorator(func):
272+        def inner(*args, **kwargs):
273+            retries = 0
274+            while 1:
275+                try:
276+                    return func(*args, **kwargs)
277+                except DatabaseError, e:
278+                    if signals_deadlock(e):
279+                        if retries == max_retries:
280+                            raise DeadlockError, 'Deadlock detected', sys.exc_info()[2]
281+                        retries += 1
282+                        # Rollback needed by PostgreSQL
283+                        transaction.rollback()
284+                        transaction.set_clean()
285+                        continue
286+                    raise
287+        return wraps(func)(inner)
288+    return decorator
289+
290Index: tests/regressiontests/select_for_update/__init__.py
291===================================================================
292--- tests/regressiontests/select_for_update/__init__.py (revision 0)
293+++ tests/regressiontests/select_for_update/__init__.py (revision 0)
294@@ -0,0 +1,2 @@
295+
296+
297Index: tests/regressiontests/select_for_update/tests.py
298===================================================================
299--- tests/regressiontests/select_for_update/tests.py    (revision 0)
300+++ tests/regressiontests/select_for_update/tests.py    (revision 0)
301@@ -0,0 +1,107 @@
302+import time
303+import threading
304+from unittest import TestCase
305+
306+from django.conf import settings
307+from django.db import transaction, connection
308+from django.db.models import LockNotAvailable
309+from django.views.decorators.deadlock import DeadlockError, handle_deadlocks
310+
311+from regressiontests.select_for_update.models import Tag
312+
313+class SelectForUpdateTests(TestCase):
314+
315+    def setUp(self):
316+        Tag.objects.create(name='1')
317+        Tag.objects.create(name='2')
318+
319+    def test_basics(self):
320+        def test():
321+            t = Tag(name='update')
322+            t.save()
323+            transaction.commit()
324+            tfound = Tag.objects.select_for_update().get(pk=t.id)
325+            tfound.name = 'update2'
326+            tfound.save()
327+            transaction.commit()
328+            tfound = Tag.objects.select_for_update().get(pk=t.id)
329+            tfound.delete()
330+            transaction.commit()
331+        test = transaction.commit_manually(test)
332+        test()
333+
334+    def test_backend_features(self):
335+        if settings.DATABASE_ENGINE == 'postgresql_psycopg2':
336+            self.failUnless(hasattr(connection.ops, 'signals_deadlock'))
337+            self.failUnless(hasattr(connection.ops, 'signals_lock_not_available'))
338+        elif settings.DATABASE_ENGINE == 'mysql':
339+            self.failUnless(hasattr(connection.ops, 'signals_deadlock'))
340+   
341+    def test_deadlock(self):
342+        '''
343+        This test will fail on MySQL if the storage engine is not InnoDB.
344+        '''
345+        # Don't look for deadlocks if the backend doesn't support SELECT FOR UPDATE
346+        if not connection.features.has_select_for_update:
347+            return
348+        def test(max_retries):
349+            vars = {0: None, 1: None}
350+            def view0():
351+                t1 = Tag.objects.select_for_update().get(pk=1)
352+                time.sleep(1)
353+                t2 = Tag.objects.select_for_update().get(pk=2)
354+                transaction.commit()
355+            view0 = handle_deadlocks(max_retries=max_retries)(transaction.commit_manually(view0))
356+            def view1():
357+                t2 = Tag.objects.select_for_update().get(pk=2)
358+                time.sleep(1)
359+                t1 = Tag.objects.select_for_update().get(pk=1)
360+                transaction.commit()
361+            view1 = handle_deadlocks(max_retries=max_retries)(transaction.commit_manually(view1))
362+            def thread0(vars):
363+                try:
364+                    view0()
365+                except Exception, e:
366+                    vars[0] = e
367+            def thread1(vars):
368+                try:
369+                    view1()
370+                except Exception, e:
371+                    vars[1] = e
372+            t0 = threading.Thread(target=thread0, args=(vars,))
373+            t1 = threading.Thread(target=thread1, args=(vars,))
374+            t0.start()
375+            t1.start()
376+            t0.join()
377+            t1.join()
378+            return vars[0], vars[1]
379+        # Make a deadlock and don't retry the aborted transaction
380+        # We are expecting a DeadlockError
381+        e0, e1 = test(0)
382+        self.assertEqual(e0 or e1, e1 or e0)
383+        self.assert_(isinstance(e0 or e1, DeadlockError))
384+        # Make a deadlock and retry the aborted transaction
385+        # We expect no errors
386+        e0, e1 = test(1)
387+        self.assertEqual(e0 or e1, None)
388+
389+    def test_nowait(self):
390+        if not connection.features.has_select_for_update_nowait:
391+            return
392+        def view():
393+            try:
394+                t1 = Tag.objects.select_for_update(nowait=True).get(pk=1)
395+                time.sleep(1)
396+            finally:
397+                transaction.rollback()
398+        view = transaction.commit_manually(view)
399+        t = threading.Thread(target=view)
400+        t.start()
401+        time.sleep(.25)
402+        try:
403+            view()
404+        except LockNotAvailable:
405+            pass
406+        else:
407+            self.fail('Expected view to raise LockNotAvailable')
408+
409Index: tests/regressiontests/select_for_update/models.py
410===================================================================
411--- tests/regressiontests/select_for_update/models.py   (revision 0)
412+++ tests/regressiontests/select_for_update/models.py   (revision 0)
413@@ -0,0 +1,13 @@
414+from django.db import models
415+
416+class Tag(models.Model):
417+    name = models.CharField(max_length=10)
418+    parent = models.ForeignKey('self', blank=True, null=True,
419+            related_name='children')
420+
421+    class Meta:
422+        ordering = ['name']
423+
424+    def __unicode__(self):
425+        return self.name
426+
427Index: docs/ref/models/querysets.txt
428===================================================================
429--- docs/ref/models/querysets.txt       (revision 9472)
430+++ docs/ref/models/querysets.txt       (working copy)
431@@ -726,6 +726,54 @@
432 
433         Entry.objects.extra(where=['headline=%s'], params=['Lennon'])
434 
435+``select_for_update(nowait=False)``
436+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
437+
438+Returns a queryset that will lock rows until the end of the transaction,
439+generating a SELECT ... FOR UPDATE statement on supported databases.
440+
441+For example::
442+
443+    entries = Entry.objects.select_for_update().filter(author=request.user)
444+
445+All matched entries will be locked until the end of the transaction block,
446+meaning that other transactions will be prevented from changing or acquiring
447+locks on them.
448+
449+Usually, if another transaction has already acquired a lock on one of the
450+selected rows, the query will block until the lock is released. If this is
451+not the behaviour you want, call ``select_for_update(nowait=True)``. This will
452+make the call non-blocking. If a conflicting lock is already acquired by
453+another transaction, ``django.db.models.LockNotAvailable`` will be raised when
454+the queryset is evaluated.
455+
456+Using blocking locks on a database can lead to deadlocks. This occurs when two
457+concurrent transactions are both waiting on a lock the other transaction
458+already holds. To deal with deadlocks, wrap your views that use
459+``select_for_update(nowait=False)`` with the
460+``django.views.decorators.deadlock.handle_deadlocks`` decorator.
461+
462+For example::
463+
464+    from django.db import transaction
465+    from django.views.decorators.deadlock import handle_deadlocks
466+
467+    @handle_deadlocks(max_retries=2)
468+    @transaction.commit_on_success
469+    def my_view(request):
470+        ...
471+
472+If the database engine detects a deadlock involving ``my_view`` and decides
473+to abort its transaction, it will be automatically retried. If deadlocks keep
474+occurring after two repeated attempts,
475+``django.views.decorators.DeadlockError`` will be raised, which can be
476+propagated to the user or handled in a middleware.
477+
478+Currently the ``postgresql_psycopg2`` and ``mysql`` database backends
479+support ``select_for_update()`` but MySQL has no support for the ``nowait``
480+argument. Other backends will simply generate queries as if
481+``select_for_update()`` had not been used.
482+
483 QuerySet methods that do not return QuerySets
484 ---------------------------------------------
485 
486Index: docs/ref/databases.txt
487===================================================================
488--- docs/ref/databases.txt      (revision 9472)
489+++ docs/ref/databases.txt      (working copy)
490@@ -244,6 +244,15 @@
491 matter unless you're printing out the field values and are expecting to see
492 ``True`` and ``False.``.
493 
494+Row locking with ``QuerySet.select_for_update()``
495+-------------------------------------------------
496+
497+MySQL does not support the NOWAIT option to the SELECT ... FOR UPDATE
498+statement. However, you may call the ``select_for_update()`` method of a
499+queryset with ``nowait=True``. In that case, the argument will be silently
500+discarded and the generated query will block until the requested lock can be
501+acquired.
502+
503 .. _sqlite-notes:
504 
505 SQLite notes