Ticket #2705: for_update_r15007.diff

File for_update_r15007.diff, 19.2 KB (added by Dan Fairs, 13 years ago)

Updated patch, with incomplete deadlock handling removed.

  • django/db/models/sql/compiler.py

     
    11from django.core.exceptions import FieldError
    22from django.db import connections
     3from django.db import transaction
    34from django.db.backends.util import truncate_name
    45from django.db.models.sql.constants import *
    56from django.db.models.sql.datastructures import EmptyResultSet
     
    117118                        result.append('LIMIT %d' % val)
    118119                result.append('OFFSET %d' % self.query.low_mark)
    119120
     121        if self.query.select_for_update and self.connection.features.has_select_for_update:
     122            nowait = self.query.select_for_update_nowait and self.connection.features.has_select_for_update
     123            result.append(self.connection.ops.for_update_sql(nowait=nowait))
     124
    120125        return ' '.join(result), tuple(params)
    121126
    122127    def as_nested_sql(self):
     
    677682        resolve_columns = hasattr(self, 'resolve_columns')
    678683        fields = None
    679684        has_aggregate_select = bool(self.query.aggregate_select)
     685        # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
     686        # a subsequent commit/rollback is exectuted, so any database locks
     687        # are released.
     688        if self.query.select_for_update and transaction.is_managed(self.using):
     689            transaction.set_dirty(self.using)
    680690        for rows in self.execute_sql(MULTI):
    681691            for row in rows:
    682692                if resolve_columns:
  • django/db/models/sql/query.py

     
    131131        self.order_by = []
    132132        self.low_mark, self.high_mark = 0, None  # Used for offset/limit
    133133        self.distinct = False
     134        self.select_for_update = False
     135        self.select_for_update_nowait = False
    134136        self.select_related = False
    135137        self.related_select_cols = []
    136138
     
    260262        obj.order_by = self.order_by[:]
    261263        obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
    262264        obj.distinct = self.distinct
     265        obj.select_for_update = self.select_for_update
     266        obj.select_for_update_nowait = self.select_for_update_nowait
    263267        obj.select_related = self.select_related
    264268        obj.related_select_cols = []
    265269        obj.aggregates = deepcopy(self.aggregates, memo=memo)
     
    366370
    367371        query.clear_ordering(True)
    368372        query.clear_limits()
     373        query.select_for_update = False
    369374        query.select_related = False
    370375        query.related_select_cols = []
    371376        query.related_select_fields = []
  • django/db/models/manager.py

     
    164164    def order_by(self, *args, **kwargs):
    165165        return self.get_query_set().order_by(*args, **kwargs)
    166166
     167    def select_for_update(self, *args, **kwargs):
     168        return self.get_query_set().select_for_update(*args, **kwargs)
     169
    167170    def select_related(self, *args, **kwargs):
    168171        return self.get_query_set().select_related(*args, **kwargs)
    169172
  • django/db/models/query.py

     
    432432        del_query._for_write = True
    433433
    434434        # Disable non-supported fields.
     435        del_query.query.select_for_update = False
    435436        del_query.query.select_related = False
    436437        del_query.query.clear_ordering()
    437438
     
    580581        else:
    581582            return self._filter_or_exclude(None, **filter_obj)
    582583
     584    def select_for_update(self, **kwargs):
     585        """
     586        Returns a new QuerySet instance that will select objects with a
     587        FOR UPDATE lock.
     588        """
     589        # Default to false for nowait
     590        nowait = kwargs.pop('nowait', False)
     591        obj = self._clone()
     592        obj.query.select_for_update = True
     593        obj.query.select_for_update_nowait = nowait
     594        return obj
     595
    583596    def select_related(self, *fields, **kwargs):
    584597        """
    585598        Returns a new QuerySet instance that will select related objects.
  • django/db/backends/mysql/base.py

     
    124124    allows_group_by_pk = True
    125125    related_fields_match_type = True
    126126    allow_sliced_subqueries = False
     127    has_select_for_update = True
     128    has_select_for_update_nowait = False
    127129    supports_forward_references = False
    128130    supports_long_model_names = False
    129131    supports_microsecond_precision = False
  • django/db/backends/oracle/base.py

     
    7070    needs_datetime_string_cast = False
    7171    interprets_empty_strings_as_nulls = True
    7272    uses_savepoints = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
    7375    can_return_id_from_insert = True
    7476    allow_sliced_subqueries = False
    7577    supports_subqueries_in_group_by = False
  • django/db/backends/__init__.py

     
    103103    # integer primary keys.
    104104    related_fields_match_type = False
    105105    allow_sliced_subqueries = True
     106    has_select_for_update = False
     107    has_select_for_update_nowait = False
    106108
    107109    # Does the default test database allow multiple connections?
    108110    # Usually an indication that the test database is in-memory
     
    282284        """
    283285        return []
    284286
     287    def for_update_sql(self, nowait=False):
     288        """
     289        Return FOR UPDATE SQL clause to lock row for update
     290        """
     291        if nowait:
     292            return 'FOR UPDATE NOWAIT'
     293        else:
     294            return 'FOR UPDATE'
     295
    285296    def fulltext_search_sql(self, field_name):
    286297        """
    287298        Returns the SQL WHERE clause to use in order to perform a full-text
  • django/db/backends/postgresql_psycopg2/base.py

     
    7070    requires_rollback_on_dirty_transaction = True
    7171    has_real_datatype = True
    7272    can_defer_constraint_checks = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
     75   
    7376
    7477class DatabaseOperations(PostgresqlDatabaseOperations):
    7578    def last_executed_query(self, cursor, sql, params):
  • tests/modeltests/select_for_update/tests.py

     
     1import functools
     2import time
     3from django.conf import settings
     4from django.db import connection
     5from django.db import transaction, connection
     6from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
     7from django.test import TransactionTestCase, skipUnlessDBFeature
     8from django.utils import unittest as ut2
     9
     10from models import Person
     11
     12try:
     13    import threading
     14    def requires_threading(func):
     15        return func
     16except ImportError:
     17    # Note we can't use dummy_threading here, as our tests will actually
     18    # block. We just have to skip the test completely.
     19    def requires_threading(func):
     20        @functools.wraps(func)
     21        def wrapped(*args, **kw):
     22            raise ut2.SkipTest('threading required')
     23
     24class SelectForUpdateTests(TransactionTestCase):
     25
     26    def setUp(self):
     27        connection._rollback()
     28        connection._enter_transaction_management(True)
     29        self.new_connections = ConnectionHandler(settings.DATABASES)
     30        self.person = Person.objects.create(name='Reinhardt')
     31
     32        # We need to set settings.DEBUG to True so we can capture
     33        # the output SQL to examine.
     34        self._old_debug = settings.DEBUG
     35        settings.DEBUG = True
     36
     37    def tearDown(self):
     38        connection._leave_transaction_management(True)
     39        settings.DEBUG = self._old_debug
     40        try:
     41            self.end_blocking_transaction()
     42        except (DatabaseError, AttributeError):
     43            pass
     44
     45    def start_blocking_transaction(self):
     46        # Start a blocking transaction. At some point,
     47        # end_blocking_transaction() should be called.
     48        self.new_connection = self.new_connections[DEFAULT_DB_ALIAS]
     49        self.new_connection._enter_transaction_management(True)
     50        self.cursor = self.new_connection.cursor()
     51        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
     52            'db_table': Person._meta.db_table,
     53            'for_update': self.new_connection.ops.for_update_sql(),
     54            }
     55        self.cursor.execute(sql, ())
     56        result = self.cursor.fetchone()
     57
     58    def end_blocking_transaction(self):
     59        # Roll back the blocking transaction.
     60        self.new_connection._rollback()
     61        self.new_connection.close()
     62        self.new_connection._leave_transaction_management(True)
     63
     64    def has_for_update_sql(self, tested_connection, nowait=False):
     65        # Examine the SQL that was executed to determine whether it
     66        # contains the 'SELECT..FOR UPDATE' stanza.
     67        for_update_sql = tested_connection.ops.for_update_sql(nowait)
     68        sql = tested_connection.queries[-1]['sql']
     69        return bool(sql.find(for_update_sql) > -1)
     70
     71    def check_exc(self, exc):
     72        self.failUnless(isinstance(exc, DatabaseError))
     73
     74    @skipUnlessDBFeature('has_select_for_update')
     75    def test_for_update_sql_generated(self):
     76        """
     77        Test that the backend's FOR UPDATE variant appears in
     78        generated SQL when select_for_update is invoked.
     79        """
     80        list(Person.objects.all().select_for_update())
     81        self.assertTrue(self.has_for_update_sql(connection))
     82
     83    @skipUnlessDBFeature('has_select_for_update_nowait')
     84    def test_for_update_sql_generated_nowait(self):
     85        """
     86        Test that the backend's FOR UPDATE NOWAIT variant appears in
     87        generated SQL when select_for_update is invoked.
     88        """
     89        list(Person.objects.all().select_for_update(nowait=True))
     90        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
     91
     92    @requires_threading
     93    @skipUnlessDBFeature('has_select_for_update_nowait')
     94    def test_nowait_raises_error_on_block(self):
     95        """
     96        If nowait is specified, we expect an error to be raised rather
     97        than blocking.
     98        """
     99        self.start_blocking_transaction()
     100        status = []
     101        thread = threading.Thread(
     102            target=self.run_select_for_update,
     103            args=(status,),
     104            kwargs={'nowait': True},
     105        )
     106
     107        thread.start()
     108        time.sleep(1)
     109        thread.join()
     110        self.end_blocking_transaction()
     111        self.check_exc(status[-1])
     112
     113    def run_select_for_update(self, status, nowait=False):
     114        status.append('started')
     115        try:
     116            connection._rollback()
     117            people = list(Person.objects.all().select_for_update(nowait=nowait))
     118            people[0].name = 'Fred'
     119            people[0].save()
     120            connection._commit()
     121        except DatabaseError, e:
     122            status.append(e)
     123        except Exception, e:
     124            raise
     125
     126    @requires_threading
     127    @skipUnlessDBFeature('has_select_for_update')
     128    def test_block(self):
     129        """
     130        Check that a thread running a select_for_update that
     131        accesses rows being touched by a similar operation
     132        on another connection blocks correctly.
     133        """
     134        # First, let's start the transaction in our thread.
     135        self.start_blocking_transaction()
     136
     137        # Now, try it again using the ORM's select_for_update
     138        # facility. Do this in a separate thread.
     139        status = []
     140        thread = threading.Thread(target=self.run_select_for_update, args=(status,))
     141
     142        # The thread should immediately block, but we'll sleep
     143        # for a bit to make sure
     144        thread.start()
     145        sanity_count = 0
     146        while len(status) != 1 and sanity_count < 10:
     147            sanity_count += 1
     148            time.sleep(1)
     149        if sanity_count >= 10:
     150            raise ValueError, 'Thread did not run and block'
     151
     152        # Check the person hasn't been updated. Since this isn't
     153        # using FOR UPDATE, it won't block.
     154        p = Person.objects.get(pk=self.person.pk)
     155        self.assertEqual('Reinhardt', p.name)
     156
     157        # When we end our blocking transaction, our thread should
     158        # be able to continue.
     159        self.end_blocking_transaction()
     160        thread.join(5.0)
     161
     162        # Check the thread has finished. Assuming it has, we should
     163        # find that it has updated the person's name.
     164        self.failIf(thread.is_alive())
     165        p = Person.objects.get(pk=self.person.pk)
     166        self.assertEqual('Fred', p.name)
     167
     168    @requires_threading
     169    @skipUnlessDBFeature('has_select_for_update')
     170    def test_raw_lock_not_available(self):
     171        """
     172        Check that running a raw query which can't obtain a FOR UPDATE lock
     173        raises the correct exception
     174        """
     175        self.start_blocking_transaction()
     176        def raw(status):
     177            try:
     178                list(
     179                    Person.objects.raw(
     180                        'SELECT * FROM %s %s' % (
     181                            Person._meta.db_table,
     182                            connection.ops.for_update_sql(nowait=True)
     183                        )
     184                    )
     185                )
     186            except DatabaseError, e:
     187                status.append(e)
     188        status = []
     189        thread = threading.Thread(target=raw, kwargs={'status': status})
     190        thread.start()
     191        time.sleep(1)
     192        thread.join()
     193        self.end_blocking_transaction()
     194        self.check_exc(status[-1])
     195
     196    @skipUnlessDBFeature('has_select_for_update')
     197    def test_transaction_dirty_managed(self):
     198        """ Check that a select_for_update sets the transaction to be
     199        dirty when executed under txn management. Setting the txn dirty
     200        means that it will be either committed or rolled back by Django,
     201        which will release any locks held by the SELECT FOR UPDATE.
     202        """
     203        transaction.enter_transaction_management(True)
     204        transaction.managed(True)
     205        try:
     206            people = list(Person.objects.select_for_update())
     207            self.assertTrue(transaction.is_dirty())
     208        finally:
     209            transaction.rollback()
     210            transaction.leave_transaction_management()
     211
     212    @skipUnlessDBFeature('has_select_for_update')
     213    def test_transaction_not_dirty_unmanaged(self):
     214        """ If we're not under txn management, the txn will never be
     215        marked as dirty.
     216        """
     217        people = list(Person.objects.select_for_update())
     218        self.assertFalse(transaction.is_dirty())
  • tests/modeltests/select_for_update/models.py

     
     1from django.db import models
     2
     3class Person(models.Model):
     4    name = models.CharField(max_length=30)
     5 No newline at end of file
  • AUTHORS

     
    524524    Gasper Zejn <zejn@kiberpipa.org>
    525525    Jarek Zgoda <jarek.zgoda@gmail.com>
    526526    Cheng Zhang
     527    Dan Fairs <dan@fezconsulting.com>
    527528
    528529A big THANK YOU goes to:
    529530
  • docs/ref/models/querysets.txt

     
    975975    # queries the database with the 'backup' alias
    976976    >>> Entry.objects.using('backup')
    977977
     978select_for_update
     979~~~~~~~~~~~~~~~~~
    978980
     981.. method:: select_for_update(nowait=False)
     982
     983.. versionadded:: 1.3
     984
     985Returns a queryset that will lock rows until the end of the transaction,
     986generating a SELECT ... FOR UPDATE statement on supported databases.
     987
     988For example::
     989
     990    entries = Entry.objects.select_for_update().filter(author=request.user)
     991
     992All matched entries will be locked until the end of the transaction block,
     993meaning that other transactions will be prevented from changing or acquiring
     994locks on them.
     995
     996Usually, if another transaction has already acquired a lock on one of the
     997selected rows, the query will block until the lock is released. If this is
     998not the behaviour you want, call ``select_for_update(nowait=True)``. This will
     999make the call non-blocking. If a conflicting lock is already acquired by
     1000another transaction, ``django.db.utils.DatabaseError`` will be raised when
     1001the queryset is evaluated.
     1002
     1003Note that using ``select_related`` will cause the current transaction to be
     1004set dirty, if under transaction management. This is to ensure that Django
     1005issues a COMMIT or ROLLBACK, releasing any locks held by the SELECT FOR UPDATE.
     1006
     1007Currently the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
     1008database backends support ``select_for_update()`` but MySQL has no
     1009support for the ``nowait`` argument. Other backends will simply
     1010generate queries as if ``select_for_update()`` had not been used.
     1011
    9791012Methods that do not return QuerySets
    9801013------------------------------------
    9811014
     
    12531286the only restriction on the :class:`QuerySet` that is updated is that it can
    12541287only update columns in the model's main table. Filtering based on related
    12551288fields is still possible. You cannot call ``update()`` on a
    1256 :class:`QuerySet` that has had a slice taken or can otherwise no longer be 
     1289:class:`QuerySet` that has had a slice taken or can otherwise no longer be
    12571290filtered.
    12581291
    12591292For example, if you wanted to update all the entries in a particular blog
  • docs/ref/databases.txt

     
    364364column types have a maximum length restriction of 255 characters, regardless
    365365of whether ``unique=True`` is specified or not.
    366366
     367Row locking with ``QuerySet.select_for_update()``
     368-------------------------------------------------
     369 
     370MySQL does not support the NOWAIT option to the SELECT ... FOR UPDATE 
     371statement. However, you may call the ``select_for_update()`` method of a 
     372queryset with ``nowait=True``. In that case, the argument will be silently 
     373discarded and the generated query will block until the requested lock can be 
     374acquired.
     375
    367376.. _sqlite-notes:
    368377
    369378SQLite notes
Back to Top