Ticket #2705: 2705-for_update-r16022.diff

File 2705-for_update-r16022.diff, 21.7 KB (added by Dan Fairs, 13 years ago)

Updated patch with better docs

  • django/AUTHORS

     
    168168    eriks@win.tue.nl
    169169    Tomáš Ehrlich <tomas.ehrlich@gmail.com>
    170170    Dirk Eschler <dirk.eschler@gmx.net>
     171    Dan Fairs <dan@fezconsulting.com>
    171172    Marc Fargas <telenieko@telenieko.com>
    172173    Szilveszter Farkas <szilveszter.farkas@gmail.com>
    173174    Grigory Fateyev <greg@dial.com.ru>
  • django/docs/ref/models/querysets.txt

     
    966966    # queries the database with the 'backup' alias
    967967    >>> Entry.objects.using('backup')
    968968
     969select_for_update
     970~~~~~~~~~~~~~~~~~
    969971
     972.. method:: select_for_update(nowait=False)
     973
     974.. versionadded:: 1.4
     975
     976Returns a queryset that will lock rows until the end of the transaction,
     977generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
     978
     979For example::
     980
     981    entries = Entry.objects.select_for_update().filter(author=request.user)
     982
     983All matched entries will be locked until the end of the transaction block,
     984meaning that other transactions will be prevented from changing or acquiring
     985locks on them.
     986
     987Usually, if another transaction has already acquired a lock on one of the
     988selected rows, the query will block until the lock is released. If this is
     989not the behaviour you want, call ``select_for_update(nowait=True)``. This will
     990make the call non-blocking. If a conflicting lock is already acquired by
     991another transaction, ``django.db.utils.DatabaseError`` will be raised when
     992the queryset is evaluated.
     993
     994Note that using ``select_for_update`` will cause the current transaction to be set
     995dirty, if under transaction management. This is to ensure that Django issues a
     996``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR
     997UPDATE``.
     998
     999Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
     1000database backends support ``select_for_update()``. However, MySQL has no
     1001support for the ``nowait`` argument.
     1002
     1003Passing ``nowait=True`` to ``select_for_update`` using database backends that
     1004do not support ``nowait``, such as MySQL, will cause a ``DatabaseError`` to be
     1005raised. This is in order to prevent code unexpectedly blocking.
     1006
     1007Using ``select_for_update`` on backends which do not support
     1008``SELECT ... FOR UPDATE`` (such as SQLite) will have no effect.
     1009
     1010
    9701011Methods that do not return QuerySets
    9711012------------------------------------
    9721013
  • django/docs/ref/databases.txt

     
    359359:class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField`
    360360respectively, a ``ValueError`` is raised rather than truncating data.
    361361
     362Row locking with ``QuerySet.select_for_update()``
     363-------------------------------------------------
     364
     365MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
     366statement. If ``select_for_update()`` is used with ``nowait=True`` then a
     367``DatabaseError`` will be raised.
     368
    362369.. _sqlite-notes:
    363370
    364371SQLite notes
     
    493500      This will simply make SQLite wait a bit longer before throwing "database
    494501      is locked" errors; it won't really do anything to solve them.
    495502
     503``QuerySet.select_for_update()`` not supported
     504----------------------------------------------
     505
     506SQLite does not support the ``SELECT ... FOR UPDATE`` syntax. Calling it will
     507have no effect.
     508
    496509.. _oracle-notes:
    497510
    498511Oracle notes
  • django/django/db/models/sql/compiler.py

     
    11from django.core.exceptions import FieldError
    22from django.db import connections
     3from django.db import transaction
    34from django.db.backends.util import truncate_name
    45from django.db.models.sql.constants import *
    56from django.db.models.sql.datastructures import EmptyResultSet
    67from django.db.models.sql.expressions import SQLEvaluator
    78from django.db.models.sql.query import get_proxied_model, get_order_dir, \
    89     select_related_descend, Query
     10from django.db.utils import DatabaseError
    911
    1012class SQLCompiler(object):
    1113    def __init__(self, query, connection, using):
     
    117119                        result.append('LIMIT %d' % val)
    118120                result.append('OFFSET %d' % self.query.low_mark)
    119121
     122        if self.query.select_for_update and self.connection.features.has_select_for_update:
     123            # If we've been asked for a NOWAIT query but the backend does not support it,
     124            # raise a DatabaseError otherwise we could get an unexpected deadlock.
     125            nowait = self.query.select_for_update_nowait
     126            if nowait and not self.connection.features.has_select_for_update_nowait:
     127                raise DatabaseError('NOWAIT is not supported on this database backend.')
     128            result.append(self.connection.ops.for_update_sql(nowait=nowait))
     129
    120130        return ' '.join(result), tuple(params)
    121131
    122132    def as_nested_sql(self):
     
    677687        resolve_columns = hasattr(self, 'resolve_columns')
    678688        fields = None
    679689        has_aggregate_select = bool(self.query.aggregate_select)
     690        # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
     691        # a subsequent commit/rollback is executed, so any database locks
     692        # are released.
     693        if self.query.select_for_update and transaction.is_managed(self.using):
     694            transaction.set_dirty(self.using)
    680695        for rows in self.execute_sql(MULTI):
    681696            for row in rows:
    682697                if resolve_columns:
  • django/django/db/models/sql/query.py

     
    125125        self.order_by = []
    126126        self.low_mark, self.high_mark = 0, None  # Used for offset/limit
    127127        self.distinct = False
     128        self.select_for_update = False
     129        self.select_for_update_nowait = False
    128130        self.select_related = False
    129131        self.related_select_cols = []
    130132
     
    254256        obj.order_by = self.order_by[:]
    255257        obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
    256258        obj.distinct = self.distinct
     259        obj.select_for_update = self.select_for_update
     260        obj.select_for_update_nowait = self.select_for_update_nowait
    257261        obj.select_related = self.select_related
    258262        obj.related_select_cols = []
    259263        obj.aggregates = copy.deepcopy(self.aggregates, memo=memo)
     
    360364
    361365        query.clear_ordering(True)
    362366        query.clear_limits()
     367        query.select_for_update = False
    363368        query.select_related = False
    364369        query.related_select_cols = []
    365370        query.related_select_fields = []
  • django/django/db/models/manager.py

     
    164164    def order_by(self, *args, **kwargs):
    165165        return self.get_query_set().order_by(*args, **kwargs)
    166166
     167    def select_for_update(self, *args, **kwargs):
     168        return self.get_query_set().select_for_update(*args, **kwargs)
     169
    167170    def select_related(self, *args, **kwargs):
    168171        return self.get_query_set().select_related(*args, **kwargs)
    169172
  • django/django/db/models/query.py

     
    435435        del_query._for_write = True
    436436
    437437        # Disable non-supported fields.
     438        del_query.query.select_for_update = False
    438439        del_query.query.select_related = False
    439440        del_query.query.clear_ordering()
    440441
     
    583584        else:
    584585            return self._filter_or_exclude(None, **filter_obj)
    585586
     587    def select_for_update(self, **kwargs):
     588        """
     589        Returns a new QuerySet instance that will select objects with a
     590        FOR UPDATE lock.
     591        """
     592        # Default to false for nowait
     593        nowait = kwargs.pop('nowait', False)
     594        obj = self._clone()
     595        obj.query.select_for_update = True
     596        obj.query.select_for_update_nowait = nowait
     597        return obj
     598
    586599    def select_related(self, *fields, **kwargs):
    587600        """
    588601        Returns a new QuerySet instance that will select related objects.
  • django/django/db/backends/mysql/base.py

     
    124124    allows_group_by_pk = True
    125125    related_fields_match_type = True
    126126    allow_sliced_subqueries = False
     127    has_select_for_update = True
     128    has_select_for_update_nowait = False
    127129    supports_forward_references = False
    128130    supports_long_model_names = False
    129131    supports_microsecond_precision = False
  • django/django/db/backends/oracle/base.py

     
    7070    needs_datetime_string_cast = False
    7171    interprets_empty_strings_as_nulls = True
    7272    uses_savepoints = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
    7375    can_return_id_from_insert = True
    7476    allow_sliced_subqueries = False
    7577    supports_subqueries_in_group_by = False
  • django/django/db/backends/__init__.py

     
    279279    # integer primary keys.
    280280    related_fields_match_type = False
    281281    allow_sliced_subqueries = True
     282    has_select_for_update = False
     283    has_select_for_update_nowait = False
    282284
    283285    # Does the default test database allow multiple connections?
    284286    # Usually an indication that the test database is in-memory
     
    476478        """
    477479        return []
    478480
     481    def for_update_sql(self, nowait=False):
     482        """
     483        Returns the FOR UPDATE SQL clause to lock rows for an update operation.
     484        """
     485        if nowait:
     486            return 'FOR UPDATE NOWAIT'
     487        else:
     488            return 'FOR UPDATE'
     489
    479490    def fulltext_search_sql(self, field_name):
    480491        """
    481492        Returns the SQL WHERE clause to use in order to perform a full-text
  • django/django/db/backends/postgresql_psycopg2/base.py

     
    7070    requires_rollback_on_dirty_transaction = True
    7171    has_real_datatype = True
    7272    can_defer_constraint_checks = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
     75   
    7376
    7477class DatabaseWrapper(BaseDatabaseWrapper):
    7578    vendor = 'postgresql'
  • django/tests/modeltests/select_for_update/__init__.py

     
     1#
  • django/tests/modeltests/select_for_update/tests.py

     
     1import time
     2from django.conf import settings
     3from django.db import transaction, connection
     4from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
     5from django.test import (TransactionTestCase, skipIfDBFeature,
     6    skipUnlessDBFeature)
     7from django.utils.functional import wraps
     8from django.utils import unittest
     9
     10from models import Person
     11
     12try:
     13    import threading
     14    def requires_threading(func):
     15        return func
     16except ImportError:
     17    # Note we can't use dummy_threading here, as our tests will actually
     18    # block. We just have to skip the test completely.
     19    def requires_threading(func):
     20        @wraps(func)
     21        def wrapped(*args, **kw):
     22            raise unittest.SkipTest('threading required')
     23
     24class SelectForUpdateTests(TransactionTestCase):
     25
     26    def setUp(self):
     27        transaction.enter_transaction_management(True)
     28        transaction.managed(True)
     29        self.person = Person.objects.create(name='Reinhardt')
     30
     31        # We have to commit here so that code in run_select_for_update can
     32        # see this data.
     33        transaction.commit()
     34
     35        # We need another database connection to test that one connection
     36        # issuing a SELECT ... FOR UPDATE will block.
     37        new_connections = ConnectionHandler(settings.DATABASES)
     38        self.new_connection = new_connections[DEFAULT_DB_ALIAS]
     39
     40        # We need to set settings.DEBUG to True so we can capture
     41        # the output SQL to examine.
     42        self._old_debug = settings.DEBUG
     43        settings.DEBUG = True
     44
     45    def tearDown(self):
     46        try:
     47            # We don't really care if this fails - some of the tests will set
     48            # this in the course of their run.
     49            transaction.managed(False)
     50            transaction.leave_transaction_management()
     51        except transaction.TransactionManagementError:
     52            pass
     53        self.new_connection.close()
     54        settings.DEBUG = self._old_debug
     55        try:
     56            self.end_blocking_transaction()
     57        except (DatabaseError, AttributeError):
     58            pass
     59
     60    def start_blocking_transaction(self):
     61        # Start a blocking transaction. At some point,
     62        # end_blocking_transaction() should be called.
     63        self.cursor = self.new_connection.cursor()
     64        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
     65            'db_table': Person._meta.db_table,
     66            'for_update': self.new_connection.ops.for_update_sql(),
     67            }
     68        self.cursor.execute(sql, ())
     69        result = self.cursor.fetchone()
     70
     71    def end_blocking_transaction(self):
     72        # Roll back the blocking transaction.
     73        self.new_connection._rollback()
     74
     75    def has_for_update_sql(self, tested_connection, nowait=False):
     76        # Examine the SQL that was executed to determine whether it
     77        # contains the 'SELECT..FOR UPDATE' stanza.
     78        for_update_sql = tested_connection.ops.for_update_sql(nowait)
     79        sql = tested_connection.queries[-1]['sql']
     80        return bool(sql.find(for_update_sql) > -1)
     81
     82    def check_exc(self, exc):
     83        self.failUnless(isinstance(exc, DatabaseError))
     84
     85    @skipUnlessDBFeature('has_select_for_update')
     86    def test_for_update_sql_generated(self):
     87        """
     88        Test that the backend's FOR UPDATE variant appears in
     89        generated SQL when select_for_update is invoked.
     90        """
     91        list(Person.objects.all().select_for_update())
     92        self.assertTrue(self.has_for_update_sql(connection))
     93
     94    @skipUnlessDBFeature('has_select_for_update_nowait')
     95    def test_for_update_sql_generated_nowait(self):
     96        """
     97        Test that the backend's FOR UPDATE NOWAIT variant appears in
     98        generated SQL when select_for_update is invoked.
     99        """
     100        list(Person.objects.all().select_for_update(nowait=True))
     101        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
     102
     103    @requires_threading
     104    @skipUnlessDBFeature('has_select_for_update_nowait')
     105    def test_nowait_raises_error_on_block(self):
     106        """
     107        If nowait is specified, we expect an error to be raised rather
     108        than blocking.
     109        """
     110        self.start_blocking_transaction()
     111        status = []
     112        thread = threading.Thread(
     113            target=self.run_select_for_update,
     114            args=(status,),
     115            kwargs={'nowait': True},
     116        )
     117
     118        thread.start()
     119        time.sleep(1)
     120        thread.join()
     121        self.end_blocking_transaction()
     122        self.check_exc(status[-1])
     123
     124    @skipIfDBFeature('has_select_for_update_nowait')
     125    @skipUnlessDBFeature('has_select_for_update')
     126    def test_unsupported_nowait_raises_error(self):
     127        """
     128        If a SELECT...FOR UPDATE NOWAIT is run on a database backend
     129        that supports FOR UPDATE but not NOWAIT, then we should find
     130        that a DatabaseError is raised.
     131        """
     132        self.assertRaises(
     133            DatabaseError,
     134            list,
     135            Person.objects.all().select_for_update(nowait=True)
     136        )
     137
     138    def run_select_for_update(self, status, nowait=False):
     139        """
     140        Utility method that runs a SELECT FOR UPDATE against all
     141        Person instances. After the select_for_update, it attempts
     142        to update the name of the only record, save, and commit.
     143
     144        In general, this will be run in a separate thread.
     145        """
     146        status.append('started')
     147        try:
     148            # We need to enter transaction management again, as this is done on
     149            # per-thread basis
     150            transaction.enter_transaction_management(True)
     151            transaction.managed(True)
     152            people = list(
     153                Person.objects.all().select_for_update(nowait=nowait)
     154            )
     155            people[0].name = 'Fred'
     156            people[0].save()
     157            transaction.commit()
     158        except DatabaseError, e:
     159            status.append(e)
     160        except Exception, e:
     161            raise
     162
     163    @requires_threading
     164    @skipUnlessDBFeature('has_select_for_update')
     165    @skipUnlessDBFeature('supports_transactions')
     166    def test_block(self):
     167        """
     168        Check that a thread running a select_for_update that
     169        accesses rows being touched by a similar operation
     170        on another connection blocks correctly.
     171        """
     172        # First, let's start the transaction in our thread.
     173        self.start_blocking_transaction()
     174
     175        # Now, try it again using the ORM's select_for_update
     176        # facility. Do this in a separate thread.
     177        status = []
     178        thread = threading.Thread(
     179            target=self.run_select_for_update, args=(status,)
     180        )
     181
     182        # The thread should immediately block, but we'll sleep
     183        # for a bit to make sure.
     184        thread.start()
     185        sanity_count = 0
     186        while len(status) != 1 and sanity_count < 10:
     187            sanity_count += 1
     188            time.sleep(1)
     189        if sanity_count >= 10:
     190            raise ValueError, 'Thread did not run and block'
     191
     192        # Check the person hasn't been updated. Since this isn't
     193        # using FOR UPDATE, it won't block.
     194        p = Person.objects.get(pk=self.person.pk)
     195        self.assertEqual('Reinhardt', p.name)
     196
     197        # When we end our blocking transaction, our thread should
     198        # be able to continue.
     199        self.end_blocking_transaction()
     200        thread.join(5.0)
     201
     202        # Check the thread has finished. Assuming it has, we should
     203        # find that it has updated the person's name.
     204        self.failIf(thread.isAlive())
     205        p = Person.objects.get(pk=self.person.pk)
     206        self.assertEqual('Fred', p.name)
     207
     208    @requires_threading
     209    @skipUnlessDBFeature('has_select_for_update')
     210    def test_raw_lock_not_available(self):
     211        """
     212        Check that running a raw query which can't obtain a FOR UPDATE lock
     213        raises the correct exception
     214        """
     215        self.start_blocking_transaction()
     216        def raw(status):
     217            try:
     218                list(
     219                    Person.objects.raw(
     220                        'SELECT * FROM %s %s' % (
     221                            Person._meta.db_table,
     222                            connection.ops.for_update_sql(nowait=True)
     223                        )
     224                    )
     225                )
     226            except DatabaseError, e:
     227                status.append(e)
     228        status = []
     229        thread = threading.Thread(target=raw, kwargs={'status': status})
     230        thread.start()
     231        time.sleep(1)
     232        thread.join()
     233        self.end_blocking_transaction()
     234        self.check_exc(status[-1])
     235
     236    @skipUnlessDBFeature('has_select_for_update')
     237    def test_transaction_dirty_managed(self):
     238        """ Check that a select_for_update sets the transaction to be
     239        dirty when executed under txn management. Setting the txn dirty
     240        means that it will be either committed or rolled back by Django,
     241        which will release any locks held by the SELECT FOR UPDATE.
     242        """
     243        people = list(Person.objects.select_for_update())
     244        self.assertTrue(transaction.is_dirty())
     245
     246    @skipUnlessDBFeature('has_select_for_update')
     247    def test_transaction_not_dirty_unmanaged(self):
     248        """ If we're not under txn management, the txn will never be
     249        marked as dirty.
     250        """
     251        transaction.managed(False)
     252        transaction.leave_transaction_management()
     253        people = list(Person.objects.select_for_update())
     254        self.assertFalse(transaction.is_dirty())
  • django/tests/modeltests/select_for_update/models.py

     
     1from django.db import models
     2
     3class Person(models.Model):
     4    name = models.CharField(max_length=30)
Back to Top