Code

Ticket #2705: for_update_tests_14778.diff

File for_update_tests_14778.diff, 20.0 KB (added by danfairs, 3 years ago)

Updated version of patch with tests - really this time! (passed on MySQL, PostgreSQL, sqlite)

Line 
1Index: django/db/models/sql/compiler.py
2===================================================================
3--- django/db/models/sql/compiler.py    (revision 14778)
4+++ django/db/models/sql/compiler.py    (working copy)
5@@ -117,6 +117,10 @@
6                         result.append('LIMIT %d' % val)
7                 result.append('OFFSET %d' % self.query.low_mark)
8 
9+        if self.query.select_for_update and self.connection.features.has_select_for_update:
10+            nowait = self.query.select_for_update_nowait and self.connection.features.has_select_for_update
11+            result.append("%s" % self.connection.ops.for_update_sql(nowait=nowait))
12+
13         return ' '.join(result), tuple(params)
14 
15     def as_nested_sql(self):
16Index: django/db/models/sql/query.py
17===================================================================
18--- django/db/models/sql/query.py       (revision 14778)
19+++ django/db/models/sql/query.py       (working copy)
20@@ -11,7 +11,7 @@
21 from django.utils.tree import Node
22 from django.utils.datastructures import SortedDict
23 from django.utils.encoding import force_unicode
24-from django.db import connections, DEFAULT_DB_ALIAS
25+from django.db import connections, DEFAULT_DB_ALIAS, DatabaseError
26 from django.db.models import signals
27 from django.db.models.fields import FieldDoesNotExist
28 from django.db.models.query_utils import select_related_descend, InvalidQuery
29@@ -23,8 +23,16 @@
30     ExtraWhere, AND, OR)
31 from django.core.exceptions import FieldError
32 
33-__all__ = ['Query', 'RawQuery']
34+__all__ = ['Query', 'RawQuery', 'LockNotAvailable']
35 
36+
37+class LockNotAvailable(DatabaseError):
38+    '''
39+    Raised when a query fails because a lock was not available.
40+    '''
41+    pass
42+
43+
44 class RawQuery(object):
45     """
46     A single raw SQL query
47@@ -83,10 +91,15 @@
48         return "<RawQuery: %r>" % (self.sql % self.params)
49 
50     def _execute_query(self):
51-        self.cursor = connections[self.using].cursor()
52-        self.cursor.execute(self.sql, self.params)
53+        connection = connections[self.using]
54+        self.cursor = connection.cursor()
55+        try:
56+            self.cursor.execute(self.sql, self.params)
57+        except DatabaseError, e:
58+            if connection.features.has_select_for_update_nowait and connection.ops.signals_lock_not_available(e):
59+                raise LockNotAvailable(*e.args)
60+            raise
61 
62-
63 class Query(object):
64     """
65     A single SQL query.
66@@ -131,6 +144,8 @@
67         self.order_by = []
68         self.low_mark, self.high_mark = 0, None  # Used for offset/limit
69         self.distinct = False
70+        self.select_for_update = False
71+        self.select_for_update_nowait = False
72         self.select_related = False
73         self.related_select_cols = []
74 
75@@ -260,6 +275,8 @@
76         obj.order_by = self.order_by[:]
77         obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
78         obj.distinct = self.distinct
79+        obj.select_for_update = self.select_for_update
80+        obj.select_for_update_nowait = self.select_for_update_nowait
81         obj.select_related = self.select_related
82         obj.related_select_cols = []
83         obj.aggregates = deepcopy(self.aggregates, memo=memo)
84@@ -366,6 +383,7 @@
85 
86         query.clear_ordering(True)
87         query.clear_limits()
88+        query.select_for_update = False
89         query.select_related = False
90         query.related_select_cols = []
91         query.related_select_fields = []
92Index: django/db/models/manager.py
93===================================================================
94--- django/db/models/manager.py (revision 14778)
95+++ django/db/models/manager.py (working copy)
96@@ -164,6 +164,9 @@
97     def order_by(self, *args, **kwargs):
98         return self.get_query_set().order_by(*args, **kwargs)
99 
100+    def select_for_update(self, *args, **kwargs):
101+        return self.get_query_set().select_for_update(*args, **kwargs)
102+
103     def select_related(self, *args, **kwargs):
104         return self.get_query_set().select_related(*args, **kwargs)
105 
106Index: django/db/models/query.py
107===================================================================
108--- django/db/models/query.py   (revision 14778)
109+++ django/db/models/query.py   (working copy)
110@@ -432,6 +432,7 @@
111         del_query._for_write = True
112 
113         # Disable non-supported fields.
114+        del_query.query.select_for_update = False
115         del_query.query.select_related = False
116         del_query.query.clear_ordering()
117 
118@@ -580,6 +581,18 @@
119         else:
120             return self._filter_or_exclude(None, **filter_obj)
121 
122+    def select_for_update(self, **kwargs):
123+        """
124+        Returns a new QuerySet instance that will select objects with a
125+        FOR UPDATE lock.
126+        """
127+        # Default to false for nowait
128+        nowait = kwargs.pop('nowait', False)
129+        obj = self._clone()
130+        obj.query.select_for_update = True
131+        obj.query.select_for_update_nowait = nowait
132+        return obj
133+
134     def select_related(self, *fields, **kwargs):
135         """
136         Returns a new QuerySet instance that will select related objects.
137Index: django/db/backends/mysql/base.py
138===================================================================
139--- django/db/backends/mysql/base.py    (revision 14778)
140+++ django/db/backends/mysql/base.py    (working copy)
141@@ -23,7 +23,7 @@
142     raise ImproperlyConfigured("MySQLdb-1.2.1p2 or newer is required; you have %s" % Database.__version__)
143 
144 from MySQLdb.converters import conversions
145-from MySQLdb.constants import FIELD_TYPE, FLAG, CLIENT
146+from MySQLdb.constants import FIELD_TYPE, FLAG, CLIENT, ER
147 
148 from django.db import utils
149 from django.db.backends import *
150@@ -124,6 +124,8 @@
151     allows_group_by_pk = True
152     related_fields_match_type = True
153     allow_sliced_subqueries = False
154+    has_select_for_update = True
155+    has_select_for_update_nowait = False
156     supports_forward_references = False
157     supports_long_model_names = False
158     supports_microsecond_precision = False
159@@ -135,6 +137,7 @@
160 
161 class DatabaseOperations(BaseDatabaseOperations):
162     compiler_module = "django.db.backends.mysql.compiler"
163+    signals_deadlock = lambda self, e: e.args[0] == ER.LOCK_DEADLOCK
164 
165     def date_extract_sql(self, lookup_type, field_name):
166         # http://dev.mysql.com/doc/mysql/en/date-and-time-functions.html
167Index: django/db/backends/oracle/base.py
168===================================================================
169--- django/db/backends/oracle/base.py   (revision 14778)
170+++ django/db/backends/oracle/base.py   (working copy)
171@@ -48,6 +48,8 @@
172     needs_datetime_string_cast = False
173     interprets_empty_strings_as_nulls = True
174     uses_savepoints = True
175+    has_select_for_update = True
176+    has_select_for_update_nowait = True
177     can_return_id_from_insert = True
178     allow_sliced_subqueries = False
179     supports_subqueries_in_group_by = False
180@@ -285,6 +287,12 @@
181                                            'column': column_name})
182         return output
183 
184+    def signals_deadlock(self, exception):
185+        return exception.args[0].code == 60
186+
187+    def signals_lock_not_available(self, exception):
188+        return exception.args[0].code == 54
189+
190     def start_transaction_sql(self):
191         return ''
192 
193Index: django/db/backends/__init__.py
194===================================================================
195--- django/db/backends/__init__.py      (revision 14778)
196+++ django/db/backends/__init__.py      (working copy)
197@@ -103,6 +103,8 @@
198     # integer primary keys.
199     related_fields_match_type = False
200     allow_sliced_subqueries = True
201+    has_select_for_update = False
202+    has_select_for_update_nowait = False
203 
204     # Does the default test database allow multiple connections?
205     # Usually an indication that the test database is in-memory
206@@ -282,6 +284,16 @@
207         """
208         return []
209 
210+    def for_update_sql(self, nowait=False):
211+        """
212+        Return FOR UPDATE SQL clause to lock row for update
213+        """
214+        if nowait:
215+            nowaitstr = ' NOWAIT'
216+        else:
217+            nowaitstr = ''
218+        return 'FOR UPDATE' + nowaitstr
219+
220     def fulltext_search_sql(self, field_name):
221         """
222         Returns the SQL WHERE clause to use in order to perform a full-text
223Index: django/db/backends/postgresql_psycopg2/base.py
224===================================================================
225--- django/db/backends/postgresql_psycopg2/base.py      (revision 14778)
226+++ django/db/backends/postgresql_psycopg2/base.py      (working copy)
227@@ -19,6 +19,7 @@
228 try:
229     import psycopg2 as Database
230     import psycopg2.extensions
231+    from psycopg2 import errorcodes
232 except ImportError, e:
233     from django.core.exceptions import ImproperlyConfigured
234     raise ImproperlyConfigured("Error loading psycopg2 module: %s" % e)
235@@ -70,8 +71,21 @@
236     requires_rollback_on_dirty_transaction = True
237     has_real_datatype = True
238     can_defer_constraint_checks = True
239+    has_select_for_update = True
240+    has_select_for_update_nowait = True
241+   
242 
243 class DatabaseOperations(PostgresqlDatabaseOperations):
244+
245+    def _pg_error(self, e, code):
246+        return getattr(e, 'pgcode', None) == code
247+
248+    def signals_deadlock(self, e):
249+        return self._pg_error(e, errorcodes.DEADLOCK_DETECTED)
250+       
251+    def signals_lock_not_available(self, e):
252+        return self._pg_error(e, errorcodes.LOCK_NOT_AVAILABLE)
253+   
254     def last_executed_query(self, cursor, sql, params):
255         # With psycopg2, cursor objects have a "query" attribute that is the
256         # exact query sent to the database. See docs here:
257Index: tests/modeltests/select_for_update/__init__.py
258===================================================================
259Index: tests/modeltests/select_for_update/tests.py
260===================================================================
261--- tests/modeltests/select_for_update/tests.py (revision 0)
262+++ tests/modeltests/select_for_update/tests.py (revision 0)
263@@ -0,0 +1,173 @@
264+import threading
265+import time
266+from django.conf import settings
267+from django.db import connection
268+from django.db import transaction, connection
269+from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
270+from django.test import TransactionTestCase, skipUnlessDBFeature
271+
272+from models import Person
273+
274+class SelectForUpdateTests(TransactionTestCase):
275+
276+    def setUp(self):
277+        connection._rollback()
278+        connection._enter_transaction_management(True)
279+        self.new_connections = ConnectionHandler(settings.DATABASES)
280+        self.person = Person.objects.create(name='Reinhardt')
281+
282+        # We need to set settings.DEBUG to True so we can capture
283+        # the output SQL to examine.
284+        self._old_debug = settings.DEBUG
285+        settings.DEBUG = True
286+
287+    def tearDown(self):
288+        connection._leave_transaction_management(True)
289+        settings.DEBUG = self._old_debug
290+        try:
291+            self.end_blocking_transaction()
292+        except (DatabaseError, AttributeError):
293+            pass
294+
295+    def start_blocking_transaction(self):
296+        self.new_connection = self.new_connections[DEFAULT_DB_ALIAS]
297+        self.new_connection._enter_transaction_management(True)
298+        self.cursor = self.new_connection.cursor()
299+        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
300+            'db_table': Person._meta.db_table,
301+            'for_update': self.new_connection.ops.for_update_sql(),
302+            }
303+        self.cursor.execute(sql, ())
304+        result = self.cursor.fetchone()
305+
306+    def end_blocking_transaction(self):
307+        self.new_connection._rollback()
308+        self.new_connection.close()
309+        self.new_connection._leave_transaction_management(True)
310+
311+    def has_for_update_sql(self, tested_connection, nowait=False):
312+        for_update_sql = tested_connection.ops.for_update_sql(nowait)
313+        sql = tested_connection.queries[-1]['sql']
314+        return bool(sql.find(for_update_sql) > -1)
315+
316+    def check_exc(self, exc):
317+        self.failUnless(isinstance(exc, DatabaseError))
318+
319+    @skipUnlessDBFeature('has_select_for_update')
320+    def test_for_update_sql_generated(self):
321+        """
322+        Test that the backend's FOR UPDATE variant appears in
323+        generated SQL when select_for_update is invoked.
324+        """
325+        list(Person.objects.all().select_for_update())
326+        self.assertTrue(self.has_for_update_sql(connection))
327+
328+    @skipUnlessDBFeature('has_select_for_update_nowait')
329+    def test_for_update_sql_generated_nowait(self):
330+        """
331+        Test that the backend's FOR UPDATE NOWAIT variant appears in
332+        generated SQL when select_for_update is invoked.
333+        """
334+        list(Person.objects.all().select_for_update(nowait=True))
335+        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
336+
337+    @skipUnlessDBFeature('has_select_for_update_nowait')
338+    def test_nowait_raises_error_on_block(self):
339+        """
340+        If nowait is specified, we expect an error to be raised rather
341+        than blocking.
342+        """
343+        self.start_blocking_transaction()
344+        status = []
345+        thread = threading.Thread(
346+            target=self.run_select_for_update,
347+            args=(status,),
348+            kwargs={'nowait': True},
349+        )
350+
351+        thread.start()
352+        time.sleep(1)
353+        thread.join()
354+        self.end_blocking_transaction()
355+        self.check_exc(status[-1])
356+
357+    def run_select_for_update(self, status, nowait=False):
358+        status.append('started')
359+        try:
360+            connection._rollback()
361+            people = list(Person.objects.all().select_for_update(nowait=nowait))
362+            people[0].name = 'Fred'
363+            people[0].save()
364+            connection._commit()
365+        except DatabaseError, e:
366+            status.append(e)
367+        except Exception, e:
368+            raise
369+
370+    @skipUnlessDBFeature('has_select_for_update')
371+    def test_block(self):
372+        """
373+        Check that a thread running a select_for_update that
374+        accesses rows being touched by a similar operation
375+        on another connection blocks correctly.
376+        """
377+        # First, let's start the transaction in our thread.
378+        self.start_blocking_transaction()
379+
380+        # Now, try it again using the ORM's select_for_update
381+        # facility. Do this in a separate thread.
382+        status = []
383+        thread = threading.Thread(target=self.run_select_for_update, args=(status,))
384+
385+        # The thread should immediately block, but we'll sleep
386+        # for a bit to make sure
387+        thread.start()
388+        sanity_count = 0
389+        while len(status) != 1 and sanity_count < 10:
390+            sanity_count += 1
391+            time.sleep(1)
392+        if sanity_count >= 10:
393+            raise ValueError, 'Thread did not run and block'
394+
395+        # Check the person hasn't been updated. Since this isn't
396+        # using FOR UPDATE, it won't block.
397+        p = Person.objects.get(pk=self.person.pk)
398+        self.assertEqual('Reinhardt', p.name)
399+
400+        # When we end our blocking transaction, our thread should
401+        # be able to continue.
402+        self.end_blocking_transaction()
403+        thread.join(5.0)
404+
405+        # Check the thread has finished. Assuming it has, we should
406+        # find that it has updated the person's name.
407+        self.failIf(thread.is_alive())
408+        p = Person.objects.get(pk=self.person.pk)
409+        self.assertEqual('Fred', p.name)
410+
411+    @skipUnlessDBFeature('has_select_for_update')
412+    def test_raw_lock_not_available(self):
413+        """
414+        Check that running a raw query which can't obtain a FOR UPDATE lock
415+        raises the correct exception
416+        """
417+        self.start_blocking_transaction()
418+        def raw(status):
419+            try:
420+                list(
421+                    Person.objects.raw(
422+                        'SELECT * FROM %s %s' % (
423+                            Person._meta.db_table,
424+                            connection.ops.for_update_sql(nowait=True)
425+                        )
426+                    )
427+                )
428+            except DatabaseError, e:
429+                status.append(e)
430+        status = []
431+        thread = threading.Thread(target=raw, kwargs={'status': status})
432+        thread.start()
433+        time.sleep(1)
434+        thread.join()
435+        self.end_blocking_transaction()
436+        self.check_exc(status[-1])
437Index: tests/modeltests/select_for_update/models.py
438===================================================================
439--- tests/modeltests/select_for_update/models.py        (revision 0)
440+++ tests/modeltests/select_for_update/models.py        (revision 0)
441@@ -0,0 +1,4 @@
442+from django.db import models
443+
444+class Person(models.Model):
445+    name = models.CharField(max_length=30)
446\ No newline at end of file
447Index: AUTHORS
448===================================================================
449--- AUTHORS     (revision 14778)
450+++ AUTHORS     (working copy)
451@@ -522,6 +522,7 @@
452     Gasper Zejn <zejn@kiberpipa.org>
453     Jarek Zgoda <jarek.zgoda@gmail.com>
454     Cheng Zhang
455+    Dan Fairs <dan@fezconsulting.com>
456 
457 A big THANK YOU goes to:
458 
459Index: docs/ref/models/querysets.txt
460===================================================================
461--- docs/ref/models/querysets.txt       (revision 14778)
462+++ docs/ref/models/querysets.txt       (working copy)
463@@ -975,6 +975,53 @@
464     # queries the database with the 'backup' alias
465     >>> Entry.objects.using('backup')
466 
467+``select_for_update(nowait=False)``
468+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
469+       
470+Returns a queryset that will lock rows until the end of the transaction, 
471+generating a SELECT ... FOR UPDATE statement on supported databases.
472+       
473+For example::
474+       
475+    entries = Entry.objects.select_for_update().filter(author=request.user)
476+
477+All matched entries will be locked until the end of the transaction block, 
478+meaning that other transactions will be prevented from changing or acquiring 
479+locks on them.
480+
481+Usually, if another transaction has already acquired a lock on one of the 
482+selected rows, the query will block until the lock is released. If this is 
483+not the behaviour you want, call ``select_for_update(nowait=True)``. This will 
484+make the call non-blocking. If a conflicting lock is already acquired by 
485+another transaction, ``django.db.models.LockNotAvailable`` will be raised when 
486+the queryset is evaluated.
487+
488+Using blocking locks on a database can lead to deadlocks. This occurs when two 
489+concurrent transactions are both waiting on a lock the other transaction 
490+already holds. To deal with deadlocks, wrap your views that use 
491+``select_for_update(nowait=False)`` with the 
492+``django.views.decorators.deadlock.handle_deadlocks`` decorator. 
493+
494+For example::
495+
496+    from django.db import transaction
497+    from django.views.decorators.deadlock import handle_deadlocks
498+
499+    @handle_deadlocks(max_retries=2)
500+    @transaction.commit_on_success
501+    def my_view(request):
502+        ...
503+
504+If the database engine detects a deadlock involving ``my_view`` and decides 
505+to abort its transaction, it will be automatically retried. If deadlocks keep 
506+occurring after two repeated attempts, 
507+``django.views.decorators.DeadlockError`` will be raised, which can be 
508+propagated to the user or handled in a middleware.
509+
510+Currently the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
511+database backends support ``select_for_update()`` but MySQL has no
512+support for the ``nowait`` argument. Other backends will simply
513+generate queries as if ``select_for_update()`` had not been used.
514 
515 Methods that do not return QuerySets
516 ------------------------------------
517Index: docs/ref/databases.txt
518===================================================================
519--- docs/ref/databases.txt      (revision 14778)
520+++ docs/ref/databases.txt      (working copy)
521@@ -362,6 +362,15 @@
522 column types have a maximum length restriction of 255 characters, regardless
523 of whether ``unique=True`` is specified or not.
524 
525+Row locking with ``QuerySet.select_for_update()``
526+-------------------------------------------------
527+
528+MySQL does not support the NOWAIT option to the SELECT ... FOR UPDATE 
529+statement. However, you may call the ``select_for_update()`` method of a 
530+queryset with ``nowait=True``. In that case, the argument will be silently 
531+discarded and the generated query will block until the requested lock can be 
532+acquired.
533+
534 .. _sqlite-notes:
535 
536 SQLite notes