Code

Ticket #2705: 2705-for_update-r16053.diff

File 2705-for_update-r16053.diff, 22.1 KB (added by danfairs, 3 years ago)

Same as previous patch, but skip a test on Py2.6.

Line 
1Index: django/AUTHORS
2===================================================================
3--- django/AUTHORS      (revision 16053)
4+++ django/AUTHORS      (working copy)
5@@ -168,6 +168,7 @@
6     eriks@win.tue.nl
7     Tomáš Ehrlich <tomas.ehrlich@gmail.com>
8     Dirk Eschler <dirk.eschler@gmx.net>
9+    Dan Fairs <dan@fezconsulting.com>
10     Marc Fargas <telenieko@telenieko.com>
11     Szilveszter Farkas <szilveszter.farkas@gmail.com>
12     Grigory Fateyev <greg@dial.com.ru>
13Index: django/docs/ref/models/querysets.txt
14===================================================================
15--- django/docs/ref/models/querysets.txt        (revision 16053)
16+++ django/docs/ref/models/querysets.txt        (working copy)
17@@ -966,7 +966,48 @@
18     # queries the database with the 'backup' alias
19     >>> Entry.objects.using('backup')
20 
21+select_for_update
22+~~~~~~~~~~~~~~~~~
23 
24+.. method:: select_for_update(nowait=False)
25+
26+.. versionadded:: 1.4
27+
28+Returns a queryset that will lock rows until the end of the transaction,
29+generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
30+
31+For example::
32+
33+    entries = Entry.objects.select_for_update().filter(author=request.user)
34+
35+All matched entries will be locked until the end of the transaction block,
36+meaning that other transactions will be prevented from changing or acquiring
37+locks on them.
38+
39+Usually, if another transaction has already acquired a lock on one of the
40+selected rows, the query will block until the lock is released. If this is
41+not the behaviour you want, call ``select_for_update(nowait=True)``. This will
42+make the call non-blocking. If a conflicting lock is already acquired by
43+another transaction, ``django.db.utils.DatabaseError`` will be raised when
44+the queryset is evaluated.
45+
46+Note that using ``select_for_update`` will cause the current transaction to be set
47+dirty, if under transaction management. This is to ensure that Django issues a
48+``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR
49+UPDATE``.
50+
51+Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
52+database backends support ``select_for_update()``. However, MySQL has no
53+support for the ``nowait`` argument.
54+
55+Passing ``nowait=True`` to ``select_for_update`` using database backends that
56+do not support ``nowait``, such as MySQL, will cause a ``DatabaseError`` to be
57+raised. This is in order to prevent code unexpectedly blocking.
58+
59+Using ``select_for_update`` on backends which do not support
60+``SELECT ... FOR UPDATE`` (such as SQLite) will have no effect.
61+
62+
63 Methods that do not return QuerySets
64 ------------------------------------
65 
66Index: django/docs/ref/databases.txt
67===================================================================
68--- django/docs/ref/databases.txt       (revision 16053)
69+++ django/docs/ref/databases.txt       (working copy)
70@@ -359,6 +359,13 @@
71 :class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField`
72 respectively, a ``ValueError`` is raised rather than truncating data.
73 
74+Row locking with ``QuerySet.select_for_update()``
75+-------------------------------------------------
76+
77+MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
78+statement. If ``select_for_update()`` is used with ``nowait=True`` then a
79+``DatabaseError`` will be raised.
80+
81 .. _sqlite-notes:
82 
83 SQLite notes
84@@ -493,6 +500,12 @@
85       This will simply make SQLite wait a bit longer before throwing "database
86       is locked" errors; it won't really do anything to solve them.
87 
88+``QuerySet.select_for_update()`` not supported
89+----------------------------------------------
90+
91+SQLite does not support the ``SELECT ... FOR UPDATE`` syntax. Calling it will
92+have no effect.
93+
94 .. _oracle-notes:
95 
96 Oracle notes
97Index: django/tests/modeltests/select_for_update/__init__.py
98===================================================================
99--- django/tests/modeltests/select_for_update/__init__.py       (revision 0)
100+++ django/tests/modeltests/select_for_update/__init__.py       (revision 0)
101@@ -0,0 +1 @@
102+#
103Index: django/tests/modeltests/select_for_update/tests.py
104===================================================================
105--- django/tests/modeltests/select_for_update/tests.py  (revision 0)
106+++ django/tests/modeltests/select_for_update/tests.py  (revision 0)
107@@ -0,0 +1,262 @@
108+import sys
109+import time
110+import unittest
111+from django.conf import settings
112+from django.db import transaction, connection
113+from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
114+from django.test import (TransactionTestCase, skipIfDBFeature,
115+    skipUnlessDBFeature)
116+from django.utils.functional import wraps
117+from django.utils import unittest
118+
119+from models import Person
120+
121+try:
122+    import threading
123+    def requires_threading(func):
124+        return func
125+except ImportError:
126+    # Note we can't use dummy_threading here, as our tests will actually
127+    # block. We just have to skip the test completely.
128+    def requires_threading(func):
129+        @wraps(func)
130+        def wrapped(*args, **kw):
131+            raise unittest.SkipTest('threading required')
132+
133+class SelectForUpdateTests(TransactionTestCase):
134+
135+    def setUp(self):
136+        transaction.enter_transaction_management(True)
137+        transaction.managed(True)
138+        self.person = Person.objects.create(name='Reinhardt')
139+
140+        # We have to commit here so that code in run_select_for_update can
141+        # see this data.
142+        transaction.commit()
143+
144+        # We need another database connection to test that one connection
145+        # issuing a SELECT ... FOR UPDATE will block.
146+        new_connections = ConnectionHandler(settings.DATABASES)
147+        self.new_connection = new_connections[DEFAULT_DB_ALIAS]
148+
149+        # We need to set settings.DEBUG to True so we can capture
150+        # the output SQL to examine.
151+        self._old_debug = settings.DEBUG
152+        settings.DEBUG = True
153+
154+    def tearDown(self):
155+        try:
156+            # We don't really care if this fails - some of the tests will set
157+            # this in the course of their run.
158+            transaction.managed(False)
159+            transaction.leave_transaction_management()
160+        except transaction.TransactionManagementError:
161+            pass
162+        self.new_connection.close()
163+        settings.DEBUG = self._old_debug
164+        try:
165+            self.end_blocking_transaction()
166+        except (DatabaseError, AttributeError):
167+            pass
168+
169+    def start_blocking_transaction(self):
170+        # Start a blocking transaction. At some point,
171+        # end_blocking_transaction() should be called.
172+        self.cursor = self.new_connection.cursor()
173+        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
174+            'db_table': Person._meta.db_table,
175+            'for_update': self.new_connection.ops.for_update_sql(),
176+            }
177+        self.cursor.execute(sql, ())
178+        result = self.cursor.fetchone()
179+
180+    def end_blocking_transaction(self):
181+        # Roll back the blocking transaction.
182+        self.new_connection._rollback()
183+
184+    def has_for_update_sql(self, tested_connection, nowait=False):
185+        # Examine the SQL that was executed to determine whether it
186+        # contains the 'SELECT..FOR UPDATE' stanza.
187+        for_update_sql = tested_connection.ops.for_update_sql(nowait)
188+        sql = tested_connection.queries[-1]['sql']
189+        return bool(sql.find(for_update_sql) > -1)
190+
191+    def check_exc(self, exc):
192+        self.failUnless(isinstance(exc, DatabaseError))
193+
194+    @skipUnlessDBFeature('has_select_for_update')
195+    def test_for_update_sql_generated(self):
196+        """
197+        Test that the backend's FOR UPDATE variant appears in
198+        generated SQL when select_for_update is invoked.
199+        """
200+        list(Person.objects.all().select_for_update())
201+        self.assertTrue(self.has_for_update_sql(connection))
202+
203+    @skipUnlessDBFeature('has_select_for_update_nowait')
204+    def test_for_update_sql_generated_nowait(self):
205+        """
206+        Test that the backend's FOR UPDATE NOWAIT variant appears in
207+        generated SQL when select_for_update is invoked.
208+        """
209+        list(Person.objects.all().select_for_update(nowait=True))
210+        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
211+
212+    # In Python 2.6 beta and some final releases, exceptions raised in __len__
213+    # are swallowed (Python issue 1242657), so these cases return an empty
214+    # list, rather than raising an exception. Not a lot we can do about that,
215+    # unfortunately, due to the way Python handles list() calls internally.
216+    # Thus, we skip this test for Python 2.6.
217+    @requires_threading
218+    @skipUnlessDBFeature('has_select_for_update_nowait')
219+    @unittest.skipIf(sys.version_info[:2] == (2, 6), "Python version is 2.6")
220+    def test_nowait_raises_error_on_block(self):
221+        """
222+        If nowait is specified, we expect an error to be raised rather
223+        than blocking.
224+        """
225+        self.start_blocking_transaction()
226+        status = []
227+        thread = threading.Thread(
228+            target=self.run_select_for_update,
229+            args=(status,),
230+            kwargs={'nowait': True},
231+        )
232+
233+        thread.start()
234+        time.sleep(1)
235+        thread.join()
236+        self.end_blocking_transaction()
237+        self.check_exc(status[-1])
238+
239+    @skipIfDBFeature('has_select_for_update_nowait')
240+    @skipUnlessDBFeature('has_select_for_update')
241+    def test_unsupported_nowait_raises_error(self):
242+        """
243+        If a SELECT...FOR UPDATE NOWAIT is run on a database backend
244+        that supports FOR UPDATE but not NOWAIT, then we should find
245+        that a DatabaseError is raised.
246+        """
247+        self.assertRaises(
248+            DatabaseError,
249+            list,
250+            Person.objects.all().select_for_update(nowait=True)
251+        )
252+
253+    def run_select_for_update(self, status, nowait=False):
254+        """
255+        Utility method that runs a SELECT FOR UPDATE against all
256+        Person instances. After the select_for_update, it attempts
257+        to update the name of the only record, save, and commit.
258+
259+        In general, this will be run in a separate thread.
260+        """
261+        status.append('started')
262+        try:
263+            # We need to enter transaction management again, as this is done on
264+            # per-thread basis
265+            transaction.enter_transaction_management(True)
266+            transaction.managed(True)
267+            people = list(
268+                Person.objects.all().select_for_update(nowait=nowait)
269+            )
270+            people[0].name = 'Fred'
271+            people[0].save()
272+            transaction.commit()
273+        except DatabaseError, e:
274+            status.append(e)
275+        except Exception, e:
276+            raise
277+
278+    @requires_threading
279+    @skipUnlessDBFeature('has_select_for_update')
280+    @skipUnlessDBFeature('supports_transactions')
281+    def test_block(self):
282+        """
283+        Check that a thread running a select_for_update that
284+        accesses rows being touched by a similar operation
285+        on another connection blocks correctly.
286+        """
287+        # First, let's start the transaction in our thread.
288+        self.start_blocking_transaction()
289+
290+        # Now, try it again using the ORM's select_for_update
291+        # facility. Do this in a separate thread.
292+        status = []
293+        thread = threading.Thread(
294+            target=self.run_select_for_update, args=(status,)
295+        )
296+
297+        # The thread should immediately block, but we'll sleep
298+        # for a bit to make sure.
299+        thread.start()
300+        sanity_count = 0
301+        while len(status) != 1 and sanity_count < 10:
302+            sanity_count += 1
303+            time.sleep(1)
304+        if sanity_count >= 10:
305+            raise ValueError, 'Thread did not run and block'
306+
307+        # Check the person hasn't been updated. Since this isn't
308+        # using FOR UPDATE, it won't block.
309+        p = Person.objects.get(pk=self.person.pk)
310+        self.assertEqual('Reinhardt', p.name)
311+
312+        # When we end our blocking transaction, our thread should
313+        # be able to continue.
314+        self.end_blocking_transaction()
315+        thread.join(5.0)
316+
317+        # Check the thread has finished. Assuming it has, we should
318+        # find that it has updated the person's name.
319+        self.failIf(thread.isAlive())
320+        p = Person.objects.get(pk=self.person.pk)
321+        self.assertEqual('Fred', p.name)
322+
323+    @requires_threading
324+    @skipUnlessDBFeature('has_select_for_update')
325+    def test_raw_lock_not_available(self):
326+        """
327+        Check that running a raw query which can't obtain a FOR UPDATE lock
328+        raises the correct exception
329+        """
330+        self.start_blocking_transaction()
331+        def raw(status):
332+            try:
333+                list(
334+                    Person.objects.raw(
335+                        'SELECT * FROM %s %s' % (
336+                            Person._meta.db_table,
337+                            connection.ops.for_update_sql(nowait=True)
338+                        )
339+                    )
340+                )
341+            except DatabaseError, e:
342+                status.append(e)
343+        status = []
344+        thread = threading.Thread(target=raw, kwargs={'status': status})
345+        thread.start()
346+        time.sleep(1)
347+        thread.join()
348+        self.end_blocking_transaction()
349+        self.check_exc(status[-1])
350+
351+    @skipUnlessDBFeature('has_select_for_update')
352+    def test_transaction_dirty_managed(self):
353+        """ Check that a select_for_update sets the transaction to be
354+        dirty when executed under txn management. Setting the txn dirty
355+        means that it will be either committed or rolled back by Django,
356+        which will release any locks held by the SELECT FOR UPDATE.
357+        """
358+        people = list(Person.objects.select_for_update())
359+        self.assertTrue(transaction.is_dirty())
360+
361+    @skipUnlessDBFeature('has_select_for_update')
362+    def test_transaction_not_dirty_unmanaged(self):
363+        """ If we're not under txn management, the txn will never be
364+        marked as dirty.
365+        """
366+        transaction.managed(False)
367+        transaction.leave_transaction_management()
368+        people = list(Person.objects.select_for_update())
369+        self.assertFalse(transaction.is_dirty())
370Index: django/tests/modeltests/select_for_update/models.py
371===================================================================
372--- django/tests/modeltests/select_for_update/models.py (revision 0)
373+++ django/tests/modeltests/select_for_update/models.py (revision 0)
374@@ -0,0 +1,4 @@
375+from django.db import models
376+
377+class Person(models.Model):
378+    name = models.CharField(max_length=30)
379Index: django/django/db/models/sql/compiler.py
380===================================================================
381--- django/django/db/models/sql/compiler.py     (revision 16053)
382+++ django/django/db/models/sql/compiler.py     (working copy)
383@@ -1,11 +1,13 @@
384 from django.core.exceptions import FieldError
385 from django.db import connections
386+from django.db import transaction
387 from django.db.backends.util import truncate_name
388 from django.db.models.sql.constants import *
389 from django.db.models.sql.datastructures import EmptyResultSet
390 from django.db.models.sql.expressions import SQLEvaluator
391 from django.db.models.sql.query import get_proxied_model, get_order_dir, \
392      select_related_descend, Query
393+from django.db.utils import DatabaseError
394 
395 class SQLCompiler(object):
396     def __init__(self, query, connection, using):
397@@ -117,6 +119,14 @@
398                         result.append('LIMIT %d' % val)
399                 result.append('OFFSET %d' % self.query.low_mark)
400 
401+        if self.query.select_for_update and self.connection.features.has_select_for_update:
402+            # If we've been asked for a NOWAIT query but the backend does not support it,
403+            # raise a DatabaseError otherwise we could get an unexpected deadlock.
404+            nowait = self.query.select_for_update_nowait
405+            if nowait and not self.connection.features.has_select_for_update_nowait:
406+                raise DatabaseError('NOWAIT is not supported on this database backend.')
407+            result.append(self.connection.ops.for_update_sql(nowait=nowait))
408+
409         return ' '.join(result), tuple(params)
410 
411     def as_nested_sql(self):
412@@ -677,6 +687,11 @@
413         resolve_columns = hasattr(self, 'resolve_columns')
414         fields = None
415         has_aggregate_select = bool(self.query.aggregate_select)
416+        # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
417+        # a subsequent commit/rollback is executed, so any database locks
418+        # are released.
419+        if self.query.select_for_update and transaction.is_managed(self.using):
420+            transaction.set_dirty(self.using)
421         for rows in self.execute_sql(MULTI):
422             for row in rows:
423                 if resolve_columns:
424Index: django/django/db/models/sql/query.py
425===================================================================
426--- django/django/db/models/sql/query.py        (revision 16053)
427+++ django/django/db/models/sql/query.py        (working copy)
428@@ -125,6 +125,8 @@
429         self.order_by = []
430         self.low_mark, self.high_mark = 0, None  # Used for offset/limit
431         self.distinct = False
432+        self.select_for_update = False
433+        self.select_for_update_nowait = False
434         self.select_related = False
435         self.related_select_cols = []
436 
437@@ -254,6 +256,8 @@
438         obj.order_by = self.order_by[:]
439         obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
440         obj.distinct = self.distinct
441+        obj.select_for_update = self.select_for_update
442+        obj.select_for_update_nowait = self.select_for_update_nowait
443         obj.select_related = self.select_related
444         obj.related_select_cols = []
445         obj.aggregates = copy.deepcopy(self.aggregates, memo=memo)
446@@ -360,6 +364,7 @@
447 
448         query.clear_ordering(True)
449         query.clear_limits()
450+        query.select_for_update = False
451         query.select_related = False
452         query.related_select_cols = []
453         query.related_select_fields = []
454Index: django/django/db/models/manager.py
455===================================================================
456--- django/django/db/models/manager.py  (revision 16053)
457+++ django/django/db/models/manager.py  (working copy)
458@@ -164,6 +164,9 @@
459     def order_by(self, *args, **kwargs):
460         return self.get_query_set().order_by(*args, **kwargs)
461 
462+    def select_for_update(self, *args, **kwargs):
463+        return self.get_query_set().select_for_update(*args, **kwargs)
464+
465     def select_related(self, *args, **kwargs):
466         return self.get_query_set().select_related(*args, **kwargs)
467 
468Index: django/django/db/models/query.py
469===================================================================
470--- django/django/db/models/query.py    (revision 16053)
471+++ django/django/db/models/query.py    (working copy)
472@@ -435,6 +435,7 @@
473         del_query._for_write = True
474 
475         # Disable non-supported fields.
476+        del_query.query.select_for_update = False
477         del_query.query.select_related = False
478         del_query.query.clear_ordering()
479 
480@@ -583,6 +584,18 @@
481         else:
482             return self._filter_or_exclude(None, **filter_obj)
483 
484+    def select_for_update(self, **kwargs):
485+        """
486+        Returns a new QuerySet instance that will select objects with a
487+        FOR UPDATE lock.
488+        """
489+        # Default to false for nowait
490+        nowait = kwargs.pop('nowait', False)
491+        obj = self._clone()
492+        obj.query.select_for_update = True
493+        obj.query.select_for_update_nowait = nowait
494+        return obj
495+
496     def select_related(self, *fields, **kwargs):
497         """
498         Returns a new QuerySet instance that will select related objects.
499Index: django/django/db/backends/mysql/base.py
500===================================================================
501--- django/django/db/backends/mysql/base.py     (revision 16053)
502+++ django/django/db/backends/mysql/base.py     (working copy)
503@@ -124,6 +124,8 @@
504     allows_group_by_pk = True
505     related_fields_match_type = True
506     allow_sliced_subqueries = False
507+    has_select_for_update = True
508+    has_select_for_update_nowait = False
509     supports_forward_references = False
510     supports_long_model_names = False
511     supports_microsecond_precision = False
512Index: django/django/db/backends/oracle/base.py
513===================================================================
514--- django/django/db/backends/oracle/base.py    (revision 16053)
515+++ django/django/db/backends/oracle/base.py    (working copy)
516@@ -70,6 +70,8 @@
517     needs_datetime_string_cast = False
518     interprets_empty_strings_as_nulls = True
519     uses_savepoints = True
520+    has_select_for_update = True
521+    has_select_for_update_nowait = True
522     can_return_id_from_insert = True
523     allow_sliced_subqueries = False
524     supports_subqueries_in_group_by = False
525Index: django/django/db/backends/__init__.py
526===================================================================
527--- django/django/db/backends/__init__.py       (revision 16053)
528+++ django/django/db/backends/__init__.py       (working copy)
529@@ -279,6 +279,8 @@
530     # integer primary keys.
531     related_fields_match_type = False
532     allow_sliced_subqueries = True
533+    has_select_for_update = False
534+    has_select_for_update_nowait = False
535 
536     # Does the default test database allow multiple connections?
537     # Usually an indication that the test database is in-memory
538@@ -476,6 +478,15 @@
539         """
540         return []
541 
542+    def for_update_sql(self, nowait=False):
543+        """
544+        Returns the FOR UPDATE SQL clause to lock rows for an update operation.
545+        """
546+        if nowait:
547+            return 'FOR UPDATE NOWAIT'
548+        else:
549+            return 'FOR UPDATE'
550+
551     def fulltext_search_sql(self, field_name):
552         """
553         Returns the SQL WHERE clause to use in order to perform a full-text
554Index: django/django/db/backends/postgresql_psycopg2/base.py
555===================================================================
556--- django/django/db/backends/postgresql_psycopg2/base.py       (revision 16053)
557+++ django/django/db/backends/postgresql_psycopg2/base.py       (working copy)
558@@ -70,6 +70,9 @@
559     requires_rollback_on_dirty_transaction = True
560     has_real_datatype = True
561     can_defer_constraint_checks = True
562+    has_select_for_update = True
563+    has_select_for_update_nowait = True
564+   
565 
566 class DatabaseWrapper(BaseDatabaseWrapper):
567     vendor = 'postgresql'