Code

Ticket #2705: for_update_r15007.diff

File for_update_r15007.diff, 19.2 KB (added by danfairs, 4 years ago)

Updated patch, with incomplete deadlock handling removed.

Line 
1Index: django/db/models/sql/compiler.py
2===================================================================
3--- django/db/models/sql/compiler.py    (revision 15007)
4+++ django/db/models/sql/compiler.py    (working copy)
5@@ -1,5 +1,6 @@
6 from django.core.exceptions import FieldError
7 from django.db import connections
8+from django.db import transaction
9 from django.db.backends.util import truncate_name
10 from django.db.models.sql.constants import *
11 from django.db.models.sql.datastructures import EmptyResultSet
12@@ -117,6 +118,10 @@
13                         result.append('LIMIT %d' % val)
14                 result.append('OFFSET %d' % self.query.low_mark)
15 
16+        if self.query.select_for_update and self.connection.features.has_select_for_update:
17+            nowait = self.query.select_for_update_nowait and self.connection.features.has_select_for_update
18+            result.append(self.connection.ops.for_update_sql(nowait=nowait))
19+
20         return ' '.join(result), tuple(params)
21 
22     def as_nested_sql(self):
23@@ -677,6 +682,11 @@
24         resolve_columns = hasattr(self, 'resolve_columns')
25         fields = None
26         has_aggregate_select = bool(self.query.aggregate_select)
27+        # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
28+        # a subsequent commit/rollback is exectuted, so any database locks
29+        # are released.
30+        if self.query.select_for_update and transaction.is_managed(self.using):
31+            transaction.set_dirty(self.using)
32         for rows in self.execute_sql(MULTI):
33             for row in rows:
34                 if resolve_columns:
35Index: django/db/models/sql/query.py
36===================================================================
37--- django/db/models/sql/query.py       (revision 15007)
38+++ django/db/models/sql/query.py       (working copy)
39@@ -131,6 +131,8 @@
40         self.order_by = []
41         self.low_mark, self.high_mark = 0, None  # Used for offset/limit
42         self.distinct = False
43+        self.select_for_update = False
44+        self.select_for_update_nowait = False
45         self.select_related = False
46         self.related_select_cols = []
47 
48@@ -260,6 +262,8 @@
49         obj.order_by = self.order_by[:]
50         obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
51         obj.distinct = self.distinct
52+        obj.select_for_update = self.select_for_update
53+        obj.select_for_update_nowait = self.select_for_update_nowait
54         obj.select_related = self.select_related
55         obj.related_select_cols = []
56         obj.aggregates = deepcopy(self.aggregates, memo=memo)
57@@ -366,6 +370,7 @@
58 
59         query.clear_ordering(True)
60         query.clear_limits()
61+        query.select_for_update = False
62         query.select_related = False
63         query.related_select_cols = []
64         query.related_select_fields = []
65Index: django/db/models/manager.py
66===================================================================
67--- django/db/models/manager.py (revision 15007)
68+++ django/db/models/manager.py (working copy)
69@@ -164,6 +164,9 @@
70     def order_by(self, *args, **kwargs):
71         return self.get_query_set().order_by(*args, **kwargs)
72 
73+    def select_for_update(self, *args, **kwargs):
74+        return self.get_query_set().select_for_update(*args, **kwargs)
75+
76     def select_related(self, *args, **kwargs):
77         return self.get_query_set().select_related(*args, **kwargs)
78 
79Index: django/db/models/query.py
80===================================================================
81--- django/db/models/query.py   (revision 15007)
82+++ django/db/models/query.py   (working copy)
83@@ -432,6 +432,7 @@
84         del_query._for_write = True
85 
86         # Disable non-supported fields.
87+        del_query.query.select_for_update = False
88         del_query.query.select_related = False
89         del_query.query.clear_ordering()
90 
91@@ -580,6 +581,18 @@
92         else:
93             return self._filter_or_exclude(None, **filter_obj)
94 
95+    def select_for_update(self, **kwargs):
96+        """
97+        Returns a new QuerySet instance that will select objects with a
98+        FOR UPDATE lock.
99+        """
100+        # Default to false for nowait
101+        nowait = kwargs.pop('nowait', False)
102+        obj = self._clone()
103+        obj.query.select_for_update = True
104+        obj.query.select_for_update_nowait = nowait
105+        return obj
106+
107     def select_related(self, *fields, **kwargs):
108         """
109         Returns a new QuerySet instance that will select related objects.
110Index: django/db/backends/mysql/base.py
111===================================================================
112--- django/db/backends/mysql/base.py    (revision 15007)
113+++ django/db/backends/mysql/base.py    (working copy)
114@@ -124,6 +124,8 @@
115     allows_group_by_pk = True
116     related_fields_match_type = True
117     allow_sliced_subqueries = False
118+    has_select_for_update = True
119+    has_select_for_update_nowait = False
120     supports_forward_references = False
121     supports_long_model_names = False
122     supports_microsecond_precision = False
123Index: django/db/backends/oracle/base.py
124===================================================================
125--- django/db/backends/oracle/base.py   (revision 15007)
126+++ django/db/backends/oracle/base.py   (working copy)
127@@ -70,6 +70,8 @@
128     needs_datetime_string_cast = False
129     interprets_empty_strings_as_nulls = True
130     uses_savepoints = True
131+    has_select_for_update = True
132+    has_select_for_update_nowait = True
133     can_return_id_from_insert = True
134     allow_sliced_subqueries = False
135     supports_subqueries_in_group_by = False
136Index: django/db/backends/__init__.py
137===================================================================
138--- django/db/backends/__init__.py      (revision 15007)
139+++ django/db/backends/__init__.py      (working copy)
140@@ -103,6 +103,8 @@
141     # integer primary keys.
142     related_fields_match_type = False
143     allow_sliced_subqueries = True
144+    has_select_for_update = False
145+    has_select_for_update_nowait = False
146 
147     # Does the default test database allow multiple connections?
148     # Usually an indication that the test database is in-memory
149@@ -282,6 +284,15 @@
150         """
151         return []
152 
153+    def for_update_sql(self, nowait=False):
154+        """
155+        Return FOR UPDATE SQL clause to lock row for update
156+        """
157+        if nowait:
158+            return 'FOR UPDATE NOWAIT'
159+        else:
160+            return 'FOR UPDATE'
161+
162     def fulltext_search_sql(self, field_name):
163         """
164         Returns the SQL WHERE clause to use in order to perform a full-text
165Index: django/db/backends/postgresql_psycopg2/base.py
166===================================================================
167--- django/db/backends/postgresql_psycopg2/base.py      (revision 15007)
168+++ django/db/backends/postgresql_psycopg2/base.py      (working copy)
169@@ -70,6 +70,9 @@
170     requires_rollback_on_dirty_transaction = True
171     has_real_datatype = True
172     can_defer_constraint_checks = True
173+    has_select_for_update = True
174+    has_select_for_update_nowait = True
175+   
176 
177 class DatabaseOperations(PostgresqlDatabaseOperations):
178     def last_executed_query(self, cursor, sql, params):
179Index: tests/modeltests/select_for_update/__init__.py
180===================================================================
181Index: tests/modeltests/select_for_update/tests.py
182===================================================================
183--- tests/modeltests/select_for_update/tests.py (revision 0)
184+++ tests/modeltests/select_for_update/tests.py (revision 0)
185@@ -0,0 +1,218 @@
186+import functools
187+import time
188+from django.conf import settings
189+from django.db import connection
190+from django.db import transaction, connection
191+from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
192+from django.test import TransactionTestCase, skipUnlessDBFeature
193+from django.utils import unittest as ut2
194+
195+from models import Person
196+
197+try:
198+    import threading
199+    def requires_threading(func):
200+        return func
201+except ImportError:
202+    # Note we can't use dummy_threading here, as our tests will actually
203+    # block. We just have to skip the test completely.
204+    def requires_threading(func):
205+        @functools.wraps(func)
206+        def wrapped(*args, **kw):
207+            raise ut2.SkipTest('threading required')
208+
209+class SelectForUpdateTests(TransactionTestCase):
210+
211+    def setUp(self):
212+        connection._rollback()
213+        connection._enter_transaction_management(True)
214+        self.new_connections = ConnectionHandler(settings.DATABASES)
215+        self.person = Person.objects.create(name='Reinhardt')
216+
217+        # We need to set settings.DEBUG to True so we can capture
218+        # the output SQL to examine.
219+        self._old_debug = settings.DEBUG
220+        settings.DEBUG = True
221+
222+    def tearDown(self):
223+        connection._leave_transaction_management(True)
224+        settings.DEBUG = self._old_debug
225+        try:
226+            self.end_blocking_transaction()
227+        except (DatabaseError, AttributeError):
228+            pass
229+
230+    def start_blocking_transaction(self):
231+        # Start a blocking transaction. At some point,
232+        # end_blocking_transaction() should be called.
233+        self.new_connection = self.new_connections[DEFAULT_DB_ALIAS]
234+        self.new_connection._enter_transaction_management(True)
235+        self.cursor = self.new_connection.cursor()
236+        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
237+            'db_table': Person._meta.db_table,
238+            'for_update': self.new_connection.ops.for_update_sql(),
239+            }
240+        self.cursor.execute(sql, ())
241+        result = self.cursor.fetchone()
242+
243+    def end_blocking_transaction(self):
244+        # Roll back the blocking transaction.
245+        self.new_connection._rollback()
246+        self.new_connection.close()
247+        self.new_connection._leave_transaction_management(True)
248+
249+    def has_for_update_sql(self, tested_connection, nowait=False):
250+        # Examine the SQL that was executed to determine whether it
251+        # contains the 'SELECT..FOR UPDATE' stanza.
252+        for_update_sql = tested_connection.ops.for_update_sql(nowait)
253+        sql = tested_connection.queries[-1]['sql']
254+        return bool(sql.find(for_update_sql) > -1)
255+
256+    def check_exc(self, exc):
257+        self.failUnless(isinstance(exc, DatabaseError))
258+
259+    @skipUnlessDBFeature('has_select_for_update')
260+    def test_for_update_sql_generated(self):
261+        """
262+        Test that the backend's FOR UPDATE variant appears in
263+        generated SQL when select_for_update is invoked.
264+        """
265+        list(Person.objects.all().select_for_update())
266+        self.assertTrue(self.has_for_update_sql(connection))
267+
268+    @skipUnlessDBFeature('has_select_for_update_nowait')
269+    def test_for_update_sql_generated_nowait(self):
270+        """
271+        Test that the backend's FOR UPDATE NOWAIT variant appears in
272+        generated SQL when select_for_update is invoked.
273+        """
274+        list(Person.objects.all().select_for_update(nowait=True))
275+        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
276+
277+    @requires_threading
278+    @skipUnlessDBFeature('has_select_for_update_nowait')
279+    def test_nowait_raises_error_on_block(self):
280+        """
281+        If nowait is specified, we expect an error to be raised rather
282+        than blocking.
283+        """
284+        self.start_blocking_transaction()
285+        status = []
286+        thread = threading.Thread(
287+            target=self.run_select_for_update,
288+            args=(status,),
289+            kwargs={'nowait': True},
290+        )
291+
292+        thread.start()
293+        time.sleep(1)
294+        thread.join()
295+        self.end_blocking_transaction()
296+        self.check_exc(status[-1])
297+
298+    def run_select_for_update(self, status, nowait=False):
299+        status.append('started')
300+        try:
301+            connection._rollback()
302+            people = list(Person.objects.all().select_for_update(nowait=nowait))
303+            people[0].name = 'Fred'
304+            people[0].save()
305+            connection._commit()
306+        except DatabaseError, e:
307+            status.append(e)
308+        except Exception, e:
309+            raise
310+
311+    @requires_threading
312+    @skipUnlessDBFeature('has_select_for_update')
313+    def test_block(self):
314+        """
315+        Check that a thread running a select_for_update that
316+        accesses rows being touched by a similar operation
317+        on another connection blocks correctly.
318+        """
319+        # First, let's start the transaction in our thread.
320+        self.start_blocking_transaction()
321+
322+        # Now, try it again using the ORM's select_for_update
323+        # facility. Do this in a separate thread.
324+        status = []
325+        thread = threading.Thread(target=self.run_select_for_update, args=(status,))
326+
327+        # The thread should immediately block, but we'll sleep
328+        # for a bit to make sure
329+        thread.start()
330+        sanity_count = 0
331+        while len(status) != 1 and sanity_count < 10:
332+            sanity_count += 1
333+            time.sleep(1)
334+        if sanity_count >= 10:
335+            raise ValueError, 'Thread did not run and block'
336+
337+        # Check the person hasn't been updated. Since this isn't
338+        # using FOR UPDATE, it won't block.
339+        p = Person.objects.get(pk=self.person.pk)
340+        self.assertEqual('Reinhardt', p.name)
341+
342+        # When we end our blocking transaction, our thread should
343+        # be able to continue.
344+        self.end_blocking_transaction()
345+        thread.join(5.0)
346+
347+        # Check the thread has finished. Assuming it has, we should
348+        # find that it has updated the person's name.
349+        self.failIf(thread.is_alive())
350+        p = Person.objects.get(pk=self.person.pk)
351+        self.assertEqual('Fred', p.name)
352+
353+    @requires_threading
354+    @skipUnlessDBFeature('has_select_for_update')
355+    def test_raw_lock_not_available(self):
356+        """
357+        Check that running a raw query which can't obtain a FOR UPDATE lock
358+        raises the correct exception
359+        """
360+        self.start_blocking_transaction()
361+        def raw(status):
362+            try:
363+                list(
364+                    Person.objects.raw(
365+                        'SELECT * FROM %s %s' % (
366+                            Person._meta.db_table,
367+                            connection.ops.for_update_sql(nowait=True)
368+                        )
369+                    )
370+                )
371+            except DatabaseError, e:
372+                status.append(e)
373+        status = []
374+        thread = threading.Thread(target=raw, kwargs={'status': status})
375+        thread.start()
376+        time.sleep(1)
377+        thread.join()
378+        self.end_blocking_transaction()
379+        self.check_exc(status[-1])
380+
381+    @skipUnlessDBFeature('has_select_for_update')
382+    def test_transaction_dirty_managed(self):
383+        """ Check that a select_for_update sets the transaction to be
384+        dirty when executed under txn management. Setting the txn dirty
385+        means that it will be either committed or rolled back by Django,
386+        which will release any locks held by the SELECT FOR UPDATE.
387+        """
388+        transaction.enter_transaction_management(True)
389+        transaction.managed(True)
390+        try:
391+            people = list(Person.objects.select_for_update())
392+            self.assertTrue(transaction.is_dirty())
393+        finally:
394+            transaction.rollback()
395+            transaction.leave_transaction_management()
396+
397+    @skipUnlessDBFeature('has_select_for_update')
398+    def test_transaction_not_dirty_unmanaged(self):
399+        """ If we're not under txn management, the txn will never be
400+        marked as dirty.
401+        """
402+        people = list(Person.objects.select_for_update())
403+        self.assertFalse(transaction.is_dirty())
404Index: tests/modeltests/select_for_update/models.py
405===================================================================
406--- tests/modeltests/select_for_update/models.py        (revision 0)
407+++ tests/modeltests/select_for_update/models.py        (revision 0)
408@@ -0,0 +1,4 @@
409+from django.db import models
410+
411+class Person(models.Model):
412+    name = models.CharField(max_length=30)
413\ No newline at end of file
414Index: AUTHORS
415===================================================================
416--- AUTHORS     (revision 15007)
417+++ AUTHORS     (working copy)
418@@ -524,6 +524,7 @@
419     Gasper Zejn <zejn@kiberpipa.org>
420     Jarek Zgoda <jarek.zgoda@gmail.com>
421     Cheng Zhang
422+    Dan Fairs <dan@fezconsulting.com>
423 
424 A big THANK YOU goes to:
425 
426Index: docs/ref/models/querysets.txt
427===================================================================
428--- docs/ref/models/querysets.txt       (revision 15007)
429+++ docs/ref/models/querysets.txt       (working copy)
430@@ -975,7 +975,40 @@
431     # queries the database with the 'backup' alias
432     >>> Entry.objects.using('backup')
433 
434+select_for_update
435+~~~~~~~~~~~~~~~~~
436 
437+.. method:: select_for_update(nowait=False)
438+
439+.. versionadded:: 1.3
440+
441+Returns a queryset that will lock rows until the end of the transaction,
442+generating a SELECT ... FOR UPDATE statement on supported databases.
443+
444+For example::
445+
446+    entries = Entry.objects.select_for_update().filter(author=request.user)
447+
448+All matched entries will be locked until the end of the transaction block,
449+meaning that other transactions will be prevented from changing or acquiring
450+locks on them.
451+
452+Usually, if another transaction has already acquired a lock on one of the
453+selected rows, the query will block until the lock is released. If this is
454+not the behaviour you want, call ``select_for_update(nowait=True)``. This will
455+make the call non-blocking. If a conflicting lock is already acquired by
456+another transaction, ``django.db.utils.DatabaseError`` will be raised when
457+the queryset is evaluated.
458+
459+Note that using ``select_related`` will cause the current transaction to be
460+set dirty, if under transaction management. This is to ensure that Django
461+issues a COMMIT or ROLLBACK, releasing any locks held by the SELECT FOR UPDATE.
462+
463+Currently the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
464+database backends support ``select_for_update()`` but MySQL has no
465+support for the ``nowait`` argument. Other backends will simply
466+generate queries as if ``select_for_update()`` had not been used.
467+
468 Methods that do not return QuerySets
469 ------------------------------------
470 
471@@ -1253,7 +1286,7 @@
472 the only restriction on the :class:`QuerySet` that is updated is that it can
473 only update columns in the model's main table. Filtering based on related
474 fields is still possible. You cannot call ``update()`` on a
475-:class:`QuerySet` that has had a slice taken or can otherwise no longer be
476+:class:`QuerySet` that has had a slice taken or can otherwise no longer be
477 filtered.
478 
479 For example, if you wanted to update all the entries in a particular blog
480Index: docs/ref/databases.txt
481===================================================================
482--- docs/ref/databases.txt      (revision 15007)
483+++ docs/ref/databases.txt      (working copy)
484@@ -364,6 +364,15 @@
485 column types have a maximum length restriction of 255 characters, regardless
486 of whether ``unique=True`` is specified or not.
487 
488+Row locking with ``QuerySet.select_for_update()``
489+-------------------------------------------------
490+
491+MySQL does not support the NOWAIT option to the SELECT ... FOR UPDATE 
492+statement. However, you may call the ``select_for_update()`` method of a 
493+queryset with ``nowait=True``. In that case, the argument will be silently 
494+discarded and the generated query will block until the requested lock can be 
495+acquired.
496+
497 .. _sqlite-notes:
498 
499 SQLite notes