Code

Ticket #2705: 2705-for_update-r15174.diff

File 2705-for_update-r15174.diff, 21.2 KB (added by danfairs, 4 years ago)

Updated patch to resolve incorrect transaction handling in the tests, and modify NOWAIT behaviour on backends that do not support it.

Line 
1Index: django/db/models/sql/compiler.py
2===================================================================
3--- django/db/models/sql/compiler.py    (revision 15174)
4+++ django/db/models/sql/compiler.py    (working copy)
5@@ -1,11 +1,13 @@
6 from django.core.exceptions import FieldError
7 from django.db import connections
8+from django.db import transaction
9 from django.db.backends.util import truncate_name
10 from django.db.models.sql.constants import *
11 from django.db.models.sql.datastructures import EmptyResultSet
12 from django.db.models.sql.expressions import SQLEvaluator
13 from django.db.models.sql.query import get_proxied_model, get_order_dir, \
14      select_related_descend, Query
15+from django.db.utils import DatabaseError
16 
17 class SQLCompiler(object):
18     def __init__(self, query, connection, using):
19@@ -117,6 +119,14 @@
20                         result.append('LIMIT %d' % val)
21                 result.append('OFFSET %d' % self.query.low_mark)
22 
23+        if self.query.select_for_update and self.connection.features.has_select_for_update:
24+            # If we've been asked for a NOWAIT query but the backend does not support it,
25+            # raise a DatabaseError otherwise we could get an unexpected deadlock.
26+            nowait = self.query.select_for_update_nowait
27+            if nowait and not self.connection.features.has_select_for_update_nowait:
28+                raise DatabaseError('NOWAIT is not supported on this database backend.')
29+            result.append(self.connection.ops.for_update_sql(nowait=nowait))
30+
31         return ' '.join(result), tuple(params)
32 
33     def as_nested_sql(self):
34@@ -677,6 +687,11 @@
35         resolve_columns = hasattr(self, 'resolve_columns')
36         fields = None
37         has_aggregate_select = bool(self.query.aggregate_select)
38+        # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
39+        # a subsequent commit/rollback is executed, so any database locks
40+        # are released.
41+        if self.query.select_for_update and transaction.is_managed(self.using):
42+            transaction.set_dirty(self.using)
43         for rows in self.execute_sql(MULTI):
44             for row in rows:
45                 if resolve_columns:
46Index: django/db/models/sql/query.py
47===================================================================
48--- django/db/models/sql/query.py       (revision 15174)
49+++ django/db/models/sql/query.py       (working copy)
50@@ -131,6 +131,8 @@
51         self.order_by = []
52         self.low_mark, self.high_mark = 0, None  # Used for offset/limit
53         self.distinct = False
54+        self.select_for_update = False
55+        self.select_for_update_nowait = False
56         self.select_related = False
57         self.related_select_cols = []
58 
59@@ -260,6 +262,8 @@
60         obj.order_by = self.order_by[:]
61         obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
62         obj.distinct = self.distinct
63+        obj.select_for_update = self.select_for_update
64+        obj.select_for_update_nowait = self.select_for_update_nowait
65         obj.select_related = self.select_related
66         obj.related_select_cols = []
67         obj.aggregates = deepcopy(self.aggregates, memo=memo)
68@@ -366,6 +370,7 @@
69 
70         query.clear_ordering(True)
71         query.clear_limits()
72+        query.select_for_update = False
73         query.select_related = False
74         query.related_select_cols = []
75         query.related_select_fields = []
76Index: django/db/models/manager.py
77===================================================================
78--- django/db/models/manager.py (revision 15174)
79+++ django/db/models/manager.py (working copy)
80@@ -164,6 +164,9 @@
81     def order_by(self, *args, **kwargs):
82         return self.get_query_set().order_by(*args, **kwargs)
83 
84+    def select_for_update(self, *args, **kwargs):
85+        return self.get_query_set().select_for_update(*args, **kwargs)
86+
87     def select_related(self, *args, **kwargs):
88         return self.get_query_set().select_related(*args, **kwargs)
89 
90Index: django/db/models/query.py
91===================================================================
92--- django/db/models/query.py   (revision 15174)
93+++ django/db/models/query.py   (working copy)
94@@ -436,6 +436,7 @@
95         del_query._for_write = True
96 
97         # Disable non-supported fields.
98+        del_query.query.select_for_update = False
99         del_query.query.select_related = False
100         del_query.query.clear_ordering()
101 
102@@ -584,6 +585,18 @@
103         else:
104             return self._filter_or_exclude(None, **filter_obj)
105 
106+    def select_for_update(self, **kwargs):
107+        """
108+        Returns a new QuerySet instance that will select objects with a
109+        FOR UPDATE lock.
110+        """
111+        # Default to false for nowait
112+        nowait = kwargs.pop('nowait', False)
113+        obj = self._clone()
114+        obj.query.select_for_update = True
115+        obj.query.select_for_update_nowait = nowait
116+        return obj
117+
118     def select_related(self, *fields, **kwargs):
119         """
120         Returns a new QuerySet instance that will select related objects.
121Index: django/db/backends/mysql/base.py
122===================================================================
123--- django/db/backends/mysql/base.py    (revision 15174)
124+++ django/db/backends/mysql/base.py    (working copy)
125@@ -124,6 +124,8 @@
126     allows_group_by_pk = True
127     related_fields_match_type = True
128     allow_sliced_subqueries = False
129+    has_select_for_update = True
130+    has_select_for_update_nowait = False
131     supports_forward_references = False
132     supports_long_model_names = False
133     supports_microsecond_precision = False
134Index: django/db/backends/oracle/base.py
135===================================================================
136--- django/db/backends/oracle/base.py   (revision 15174)
137+++ django/db/backends/oracle/base.py   (working copy)
138@@ -70,6 +70,8 @@
139     needs_datetime_string_cast = False
140     interprets_empty_strings_as_nulls = True
141     uses_savepoints = True
142+    has_select_for_update = True
143+    has_select_for_update_nowait = True
144     can_return_id_from_insert = True
145     allow_sliced_subqueries = False
146     supports_subqueries_in_group_by = False
147Index: django/db/backends/__init__.py
148===================================================================
149--- django/db/backends/__init__.py      (revision 15174)
150+++ django/db/backends/__init__.py      (working copy)
151@@ -103,6 +103,8 @@
152     # integer primary keys.
153     related_fields_match_type = False
154     allow_sliced_subqueries = True
155+    has_select_for_update = False
156+    has_select_for_update_nowait = False
157 
158     # Does the default test database allow multiple connections?
159     # Usually an indication that the test database is in-memory
160@@ -291,6 +293,15 @@
161         """
162         return []
163 
164+    def for_update_sql(self, nowait=False):
165+        """
166+        Returns the FOR UPDATE SQL clause to lock rows for an update operation.
167+        """
168+        if nowait:
169+            return 'FOR UPDATE NOWAIT'
170+        else:
171+            return 'FOR UPDATE'
172+
173     def fulltext_search_sql(self, field_name):
174         """
175         Returns the SQL WHERE clause to use in order to perform a full-text
176Index: django/db/backends/postgresql_psycopg2/base.py
177===================================================================
178--- django/db/backends/postgresql_psycopg2/base.py      (revision 15174)
179+++ django/db/backends/postgresql_psycopg2/base.py      (working copy)
180@@ -70,6 +70,9 @@
181     requires_rollback_on_dirty_transaction = True
182     has_real_datatype = True
183     can_defer_constraint_checks = True
184+    has_select_for_update = True
185+    has_select_for_update_nowait = True
186+   
187 
188 class DatabaseOperations(PostgresqlDatabaseOperations):
189     def last_executed_query(self, cursor, sql, params):
190Index: tests/modeltests/select_for_update/__init__.py
191===================================================================
192--- tests/modeltests/select_for_update/__init__.py      (revision 0)
193+++ tests/modeltests/select_for_update/__init__.py      (revision 0)
194@@ -0,0 +1 @@
195+#
196Index: tests/modeltests/select_for_update/tests.py
197===================================================================
198--- tests/modeltests/select_for_update/tests.py (revision 0)
199+++ tests/modeltests/select_for_update/tests.py (revision 0)
200@@ -0,0 +1,254 @@
201+import time
202+from django.conf import settings
203+from django.db import transaction, connection
204+from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
205+from django.test import (TransactionTestCase, skipIfDBFeature,
206+    skipUnlessDBFeature)
207+from django.utils.functional import wraps
208+from django.utils import unittest
209+
210+from models import Person
211+
212+try:
213+    import threading
214+    def requires_threading(func):
215+        return func
216+except ImportError:
217+    # Note we can't use dummy_threading here, as our tests will actually
218+    # block. We just have to skip the test completely.
219+    def requires_threading(func):
220+        @wraps(func)
221+        def wrapped(*args, **kw):
222+            raise unittest.SkipTest('threading required')
223+
224+class SelectForUpdateTests(TransactionTestCase):
225+
226+    def setUp(self):
227+        transaction.enter_transaction_management(True)
228+        transaction.managed(True)
229+        self.person = Person.objects.create(name='Reinhardt')
230+
231+        # We have to commit here so that code in run_select_for_update can
232+        # see this data.
233+        transaction.commit()
234+
235+        # We need another database connection to test that one connection
236+        # issuing a SELECT ... FOR UPDATE will block.
237+        new_connections = ConnectionHandler(settings.DATABASES)
238+        self.new_connection = new_connections[DEFAULT_DB_ALIAS]
239+
240+        # We need to set settings.DEBUG to True so we can capture
241+        # the output SQL to examine.
242+        self._old_debug = settings.DEBUG
243+        settings.DEBUG = True
244+
245+    def tearDown(self):
246+        try:
247+            # We don't really care if this fails - some of the tests will set
248+            # this in the course of their run.
249+            transaction.managed(False)
250+            transaction.leave_transaction_management()
251+        except transaction.TransactionManagementError:
252+            pass
253+        self.new_connection.close()
254+        settings.DEBUG = self._old_debug
255+        try:
256+            self.end_blocking_transaction()
257+        except (DatabaseError, AttributeError):
258+            pass
259+
260+    def start_blocking_transaction(self):
261+        # Start a blocking transaction. At some point,
262+        # end_blocking_transaction() should be called.
263+        self.cursor = self.new_connection.cursor()
264+        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
265+            'db_table': Person._meta.db_table,
266+            'for_update': self.new_connection.ops.for_update_sql(),
267+            }
268+        self.cursor.execute(sql, ())
269+        result = self.cursor.fetchone()
270+
271+    def end_blocking_transaction(self):
272+        # Roll back the blocking transaction.
273+        self.new_connection._rollback()
274+
275+    def has_for_update_sql(self, tested_connection, nowait=False):
276+        # Examine the SQL that was executed to determine whether it
277+        # contains the 'SELECT..FOR UPDATE' stanza.
278+        for_update_sql = tested_connection.ops.for_update_sql(nowait)
279+        sql = tested_connection.queries[-1]['sql']
280+        return bool(sql.find(for_update_sql) > -1)
281+
282+    def check_exc(self, exc):
283+        self.failUnless(isinstance(exc, DatabaseError))
284+
285+    @skipUnlessDBFeature('has_select_for_update')
286+    def test_for_update_sql_generated(self):
287+        """
288+        Test that the backend's FOR UPDATE variant appears in
289+        generated SQL when select_for_update is invoked.
290+        """
291+        list(Person.objects.all().select_for_update())
292+        self.assertTrue(self.has_for_update_sql(connection))
293+
294+    @skipUnlessDBFeature('has_select_for_update_nowait')
295+    def test_for_update_sql_generated_nowait(self):
296+        """
297+        Test that the backend's FOR UPDATE NOWAIT variant appears in
298+        generated SQL when select_for_update is invoked.
299+        """
300+        list(Person.objects.all().select_for_update(nowait=True))
301+        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
302+
303+    @requires_threading
304+    @skipUnlessDBFeature('has_select_for_update_nowait')
305+    def test_nowait_raises_error_on_block(self):
306+        """
307+        If nowait is specified, we expect an error to be raised rather
308+        than blocking.
309+        """
310+        self.start_blocking_transaction()
311+        status = []
312+        thread = threading.Thread(
313+            target=self.run_select_for_update,
314+            args=(status,),
315+            kwargs={'nowait': True},
316+        )
317+
318+        thread.start()
319+        time.sleep(1)
320+        thread.join()
321+        self.end_blocking_transaction()
322+        self.check_exc(status[-1])
323+
324+    @skipIfDBFeature('has_select_for_update_nowait')
325+    @skipUnlessDBFeature('has_select_for_update')
326+    def test_unsupported_nowait_raises_error(self):
327+        """
328+        If a SELECT...FOR UPDATE NOWAIT is run on a database backend
329+        that supports FOR UPDATE but not NOWAIT, then we should find
330+        that a DatabaseError is raised.
331+        """
332+        self.assertRaises(
333+            DatabaseError,
334+            list,
335+            Person.objects.all().select_for_update(nowait=True)
336+        )
337+
338+    def run_select_for_update(self, status, nowait=False):
339+        """
340+        Utility method that runs a SELECT FOR UPDATE against all
341+        Person instances. After the select_for_update, it attempts
342+        to update the name of the only record, save, and commit.
343+
344+        In general, this will be run in a separate thread.
345+        """
346+        status.append('started')
347+        try:
348+            # We need to enter transaction management again, as this is done on
349+            # per-thread basis
350+            transaction.enter_transaction_management(True)
351+            transaction.managed(True)
352+            people = list(
353+                Person.objects.all().select_for_update(nowait=nowait)
354+            )
355+            people[0].name = 'Fred'
356+            people[0].save()
357+            transaction.commit()
358+        except DatabaseError, e:
359+            status.append(e)
360+        except Exception, e:
361+            raise
362+
363+    @requires_threading
364+    @skipUnlessDBFeature('has_select_for_update')
365+    @skipUnlessDBFeature('supports_transactions')
366+    def test_block(self):
367+        """
368+        Check that a thread running a select_for_update that
369+        accesses rows being touched by a similar operation
370+        on another connection blocks correctly.
371+        """
372+        # First, let's start the transaction in our thread.
373+        self.start_blocking_transaction()
374+
375+        # Now, try it again using the ORM's select_for_update
376+        # facility. Do this in a separate thread.
377+        status = []
378+        thread = threading.Thread(
379+            target=self.run_select_for_update, args=(status,)
380+        )
381+
382+        # The thread should immediately block, but we'll sleep
383+        # for a bit to make sure.
384+        thread.start()
385+        sanity_count = 0
386+        while len(status) != 1 and sanity_count < 10:
387+            sanity_count += 1
388+            time.sleep(1)
389+        if sanity_count >= 10:
390+            raise ValueError, 'Thread did not run and block'
391+
392+        # Check the person hasn't been updated. Since this isn't
393+        # using FOR UPDATE, it won't block.
394+        p = Person.objects.get(pk=self.person.pk)
395+        self.assertEqual('Reinhardt', p.name)
396+
397+        # When we end our blocking transaction, our thread should
398+        # be able to continue.
399+        self.end_blocking_transaction()
400+        thread.join(5.0)
401+
402+        # Check the thread has finished. Assuming it has, we should
403+        # find that it has updated the person's name.
404+        self.failIf(thread.isAlive())
405+        p = Person.objects.get(pk=self.person.pk)
406+        self.assertEqual('Fred', p.name)
407+
408+    @requires_threading
409+    @skipUnlessDBFeature('has_select_for_update')
410+    def test_raw_lock_not_available(self):
411+        """
412+        Check that running a raw query which can't obtain a FOR UPDATE lock
413+        raises the correct exception
414+        """
415+        self.start_blocking_transaction()
416+        def raw(status):
417+            try:
418+                list(
419+                    Person.objects.raw(
420+                        'SELECT * FROM %s %s' % (
421+                            Person._meta.db_table,
422+                            connection.ops.for_update_sql(nowait=True)
423+                        )
424+                    )
425+                )
426+            except DatabaseError, e:
427+                status.append(e)
428+        status = []
429+        thread = threading.Thread(target=raw, kwargs={'status': status})
430+        thread.start()
431+        time.sleep(1)
432+        thread.join()
433+        self.end_blocking_transaction()
434+        self.check_exc(status[-1])
435+
436+    @skipUnlessDBFeature('has_select_for_update')
437+    def test_transaction_dirty_managed(self):
438+        """ Check that a select_for_update sets the transaction to be
439+        dirty when executed under txn management. Setting the txn dirty
440+        means that it will be either committed or rolled back by Django,
441+        which will release any locks held by the SELECT FOR UPDATE.
442+        """
443+        people = list(Person.objects.select_for_update())
444+        self.assertTrue(transaction.is_dirty())
445+
446+    @skipUnlessDBFeature('has_select_for_update')
447+    def test_transaction_not_dirty_unmanaged(self):
448+        """ If we're not under txn management, the txn will never be
449+        marked as dirty.
450+        """
451+        transaction.managed(False)
452+        transaction.leave_transaction_management()
453+        people = list(Person.objects.select_for_update())
454+        self.assertFalse(transaction.is_dirty())
455Index: tests/modeltests/select_for_update/models.py
456===================================================================
457--- tests/modeltests/select_for_update/models.py        (revision 0)
458+++ tests/modeltests/select_for_update/models.py        (revision 0)
459@@ -0,0 +1,4 @@
460+from django.db import models
461+
462+class Person(models.Model):
463+    name = models.CharField(max_length=30)
464Index: AUTHORS
465===================================================================
466--- AUTHORS     (revision 15174)
467+++ AUTHORS     (working copy)
468@@ -165,6 +165,7 @@
469     eriks@win.tue.nl
470     Tomáš Ehrlich <tomas.ehrlich@gmail.com>
471     Dirk Eschler <dirk.eschler@gmx.net>
472+    Dan Fairs <dan@fezconsulting.com>
473     Marc Fargas <telenieko@telenieko.com>
474     Szilveszter Farkas <szilveszter.farkas@gmail.com>
475     Grigory Fateyev <greg@dial.com.ru>
476Index: docs/ref/models/querysets.txt
477===================================================================
478--- docs/ref/models/querysets.txt       (revision 15174)
479+++ docs/ref/models/querysets.txt       (working copy)
480@@ -940,7 +940,48 @@
481     # queries the database with the 'backup' alias
482     >>> Entry.objects.using('backup')
483 
484+select_for_update
485+~~~~~~~~~~~~~~~~~
486 
487+.. method:: select_for_update(nowait=False)
488+
489+.. versionadded:: 1.4
490+
491+Returns a queryset that will lock rows until the end of the transaction,
492+generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
493+
494+For example::
495+
496+    entries = Entry.objects.select_for_update().filter(author=request.user)
497+
498+All matched entries will be locked until the end of the transaction block,
499+meaning that other transactions will be prevented from changing or acquiring
500+locks on them.
501+
502+Usually, if another transaction has already acquired a lock on one of the
503+selected rows, the query will block until the lock is released. If this is
504+not the behaviour you want, call ``select_for_update(nowait=True)``. This will
505+make the call non-blocking. If a conflicting lock is already acquired by
506+another transaction, ``django.db.utils.DatabaseError`` will be raised when
507+the queryset is evaluated.
508+
509+Note that using ``select_related`` will cause the current transaction to be set
510+dirty, if under transaction management. This is to ensure that Django issues a
511+``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR
512+UPDATE``.
513+
514+Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
515+database backends support ``select_for_update()``. However, MySQL has no
516+support for the ``nowait`` argument.
517+
518+Passing ``nowait=True`` to ``select_for_update`` using database backends that
519+do not support ``nowait``, such as MySQL, will cause a ``DatabaseError`` to be
520+raised. This is in order to prevent code unexpectedly blocking.
521+
522+Using ``select_for_update`` on backends which do not support
523+``SELECT ... FOR UPDATE`` (such as SQLite) will have no effect.
524+
525+
526 Methods that do not return QuerySets
527 ------------------------------------
528 
529Index: docs/ref/databases.txt
530===================================================================
531--- docs/ref/databases.txt      (revision 15174)
532+++ docs/ref/databases.txt      (working copy)
533@@ -368,6 +368,15 @@
534 :class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField`
535 respectively, a ``ValueError`` is raised rather than truncating data.
536 
537+Row locking with ``QuerySet.select_for_update()``
538+-------------------------------------------------
539+
540+MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
541+statement. However, you may call the ``select_for_update()`` method of a
542+queryset with ``nowait=True``. In that case, the argument will be silently
543+discarded and the generated query will block until the requested lock can be
544+acquired.
545+
546 .. _sqlite-notes:
547 
548 SQLite notes