Ticket #2705: 2705-for_update-r16053.diff

File 2705-for_update-r16053.diff, 22.1 KB (added by Dan Fairs, 13 years ago)

Same as previous patch, but skip a test on Py2.6.

  • django/AUTHORS

     
    168168    eriks@win.tue.nl
    169169    Tomáš Ehrlich <tomas.ehrlich@gmail.com>
    170170    Dirk Eschler <dirk.eschler@gmx.net>
     171    Dan Fairs <dan@fezconsulting.com>
    171172    Marc Fargas <telenieko@telenieko.com>
    172173    Szilveszter Farkas <szilveszter.farkas@gmail.com>
    173174    Grigory Fateyev <greg@dial.com.ru>
  • django/docs/ref/models/querysets.txt

     
    966966    # queries the database with the 'backup' alias
    967967    >>> Entry.objects.using('backup')
    968968
     969select_for_update
     970~~~~~~~~~~~~~~~~~
    969971
     972.. method:: select_for_update(nowait=False)
     973
     974.. versionadded:: 1.4
     975
     976Returns a queryset that will lock rows until the end of the transaction,
     977generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
     978
     979For example::
     980
     981    entries = Entry.objects.select_for_update().filter(author=request.user)
     982
     983All matched entries will be locked until the end of the transaction block,
     984meaning that other transactions will be prevented from changing or acquiring
     985locks on them.
     986
     987Usually, if another transaction has already acquired a lock on one of the
     988selected rows, the query will block until the lock is released. If this is
     989not the behaviour you want, call ``select_for_update(nowait=True)``. This will
     990make the call non-blocking. If a conflicting lock is already acquired by
     991another transaction, ``django.db.utils.DatabaseError`` will be raised when
     992the queryset is evaluated.
     993
     994Note that using ``select_for_update`` will cause the current transaction to be set
     995dirty, if under transaction management. This is to ensure that Django issues a
     996``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR
     997UPDATE``.
     998
     999Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
     1000database backends support ``select_for_update()``. However, MySQL has no
     1001support for the ``nowait`` argument.
     1002
     1003Passing ``nowait=True`` to ``select_for_update`` using database backends that
     1004do not support ``nowait``, such as MySQL, will cause a ``DatabaseError`` to be
     1005raised. This is in order to prevent code unexpectedly blocking.
     1006
     1007Using ``select_for_update`` on backends which do not support
     1008``SELECT ... FOR UPDATE`` (such as SQLite) will have no effect.
     1009
     1010
    9701011Methods that do not return QuerySets
    9711012------------------------------------
    9721013
  • django/docs/ref/databases.txt

     
    359359:class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField`
    360360respectively, a ``ValueError`` is raised rather than truncating data.
    361361
     362Row locking with ``QuerySet.select_for_update()``
     363-------------------------------------------------
     364
     365MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
     366statement. If ``select_for_update()`` is used with ``nowait=True`` then a
     367``DatabaseError`` will be raised.
     368
    362369.. _sqlite-notes:
    363370
    364371SQLite notes
     
    493500      This will simply make SQLite wait a bit longer before throwing "database
    494501      is locked" errors; it won't really do anything to solve them.
    495502
     503``QuerySet.select_for_update()`` not supported
     504----------------------------------------------
     505
     506SQLite does not support the ``SELECT ... FOR UPDATE`` syntax. Calling it will
     507have no effect.
     508
    496509.. _oracle-notes:
    497510
    498511Oracle notes
  • django/tests/modeltests/select_for_update/__init__.py

     
     1#
  • django/tests/modeltests/select_for_update/tests.py

     
     1import sys
     2import time
     3import unittest
     4from django.conf import settings
     5from django.db import transaction, connection
     6from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
     7from django.test import (TransactionTestCase, skipIfDBFeature,
     8    skipUnlessDBFeature)
     9from django.utils.functional import wraps
     10from django.utils import unittest
     11
     12from models import Person
     13
     14try:
     15    import threading
     16    def requires_threading(func):
     17        return func
     18except ImportError:
     19    # Note we can't use dummy_threading here, as our tests will actually
     20    # block. We just have to skip the test completely.
     21    def requires_threading(func):
     22        @wraps(func)
     23        def wrapped(*args, **kw):
     24            raise unittest.SkipTest('threading required')
     25
     26class SelectForUpdateTests(TransactionTestCase):
     27
     28    def setUp(self):
     29        transaction.enter_transaction_management(True)
     30        transaction.managed(True)
     31        self.person = Person.objects.create(name='Reinhardt')
     32
     33        # We have to commit here so that code in run_select_for_update can
     34        # see this data.
     35        transaction.commit()
     36
     37        # We need another database connection to test that one connection
     38        # issuing a SELECT ... FOR UPDATE will block.
     39        new_connections = ConnectionHandler(settings.DATABASES)
     40        self.new_connection = new_connections[DEFAULT_DB_ALIAS]
     41
     42        # We need to set settings.DEBUG to True so we can capture
     43        # the output SQL to examine.
     44        self._old_debug = settings.DEBUG
     45        settings.DEBUG = True
     46
     47    def tearDown(self):
     48        try:
     49            # We don't really care if this fails - some of the tests will set
     50            # this in the course of their run.
     51            transaction.managed(False)
     52            transaction.leave_transaction_management()
     53        except transaction.TransactionManagementError:
     54            pass
     55        self.new_connection.close()
     56        settings.DEBUG = self._old_debug
     57        try:
     58            self.end_blocking_transaction()
     59        except (DatabaseError, AttributeError):
     60            pass
     61
     62    def start_blocking_transaction(self):
     63        # Start a blocking transaction. At some point,
     64        # end_blocking_transaction() should be called.
     65        self.cursor = self.new_connection.cursor()
     66        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
     67            'db_table': Person._meta.db_table,
     68            'for_update': self.new_connection.ops.for_update_sql(),
     69            }
     70        self.cursor.execute(sql, ())
     71        result = self.cursor.fetchone()
     72
     73    def end_blocking_transaction(self):
     74        # Roll back the blocking transaction.
     75        self.new_connection._rollback()
     76
     77    def has_for_update_sql(self, tested_connection, nowait=False):
     78        # Examine the SQL that was executed to determine whether it
     79        # contains the 'SELECT..FOR UPDATE' stanza.
     80        for_update_sql = tested_connection.ops.for_update_sql(nowait)
     81        sql = tested_connection.queries[-1]['sql']
     82        return bool(sql.find(for_update_sql) > -1)
     83
     84    def check_exc(self, exc):
     85        self.failUnless(isinstance(exc, DatabaseError))
     86
     87    @skipUnlessDBFeature('has_select_for_update')
     88    def test_for_update_sql_generated(self):
     89        """
     90        Test that the backend's FOR UPDATE variant appears in
     91        generated SQL when select_for_update is invoked.
     92        """
     93        list(Person.objects.all().select_for_update())
     94        self.assertTrue(self.has_for_update_sql(connection))
     95
     96    @skipUnlessDBFeature('has_select_for_update_nowait')
     97    def test_for_update_sql_generated_nowait(self):
     98        """
     99        Test that the backend's FOR UPDATE NOWAIT variant appears in
     100        generated SQL when select_for_update is invoked.
     101        """
     102        list(Person.objects.all().select_for_update(nowait=True))
     103        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
     104
     105    # In Python 2.6 beta and some final releases, exceptions raised in __len__
     106    # are swallowed (Python issue 1242657), so these cases return an empty
     107    # list, rather than raising an exception. Not a lot we can do about that,
     108    # unfortunately, due to the way Python handles list() calls internally.
     109    # Thus, we skip this test for Python 2.6.
     110    @requires_threading
     111    @skipUnlessDBFeature('has_select_for_update_nowait')
     112    @unittest.skipIf(sys.version_info[:2] == (2, 6), "Python version is 2.6")
     113    def test_nowait_raises_error_on_block(self):
     114        """
     115        If nowait is specified, we expect an error to be raised rather
     116        than blocking.
     117        """
     118        self.start_blocking_transaction()
     119        status = []
     120        thread = threading.Thread(
     121            target=self.run_select_for_update,
     122            args=(status,),
     123            kwargs={'nowait': True},
     124        )
     125
     126        thread.start()
     127        time.sleep(1)
     128        thread.join()
     129        self.end_blocking_transaction()
     130        self.check_exc(status[-1])
     131
     132    @skipIfDBFeature('has_select_for_update_nowait')
     133    @skipUnlessDBFeature('has_select_for_update')
     134    def test_unsupported_nowait_raises_error(self):
     135        """
     136        If a SELECT...FOR UPDATE NOWAIT is run on a database backend
     137        that supports FOR UPDATE but not NOWAIT, then we should find
     138        that a DatabaseError is raised.
     139        """
     140        self.assertRaises(
     141            DatabaseError,
     142            list,
     143            Person.objects.all().select_for_update(nowait=True)
     144        )
     145
     146    def run_select_for_update(self, status, nowait=False):
     147        """
     148        Utility method that runs a SELECT FOR UPDATE against all
     149        Person instances. After the select_for_update, it attempts
     150        to update the name of the only record, save, and commit.
     151
     152        In general, this will be run in a separate thread.
     153        """
     154        status.append('started')
     155        try:
     156            # We need to enter transaction management again, as this is done on
     157            # per-thread basis
     158            transaction.enter_transaction_management(True)
     159            transaction.managed(True)
     160            people = list(
     161                Person.objects.all().select_for_update(nowait=nowait)
     162            )
     163            people[0].name = 'Fred'
     164            people[0].save()
     165            transaction.commit()
     166        except DatabaseError, e:
     167            status.append(e)
     168        except Exception, e:
     169            raise
     170
     171    @requires_threading
     172    @skipUnlessDBFeature('has_select_for_update')
     173    @skipUnlessDBFeature('supports_transactions')
     174    def test_block(self):
     175        """
     176        Check that a thread running a select_for_update that
     177        accesses rows being touched by a similar operation
     178        on another connection blocks correctly.
     179        """
     180        # First, let's start the transaction in our thread.
     181        self.start_blocking_transaction()
     182
     183        # Now, try it again using the ORM's select_for_update
     184        # facility. Do this in a separate thread.
     185        status = []
     186        thread = threading.Thread(
     187            target=self.run_select_for_update, args=(status,)
     188        )
     189
     190        # The thread should immediately block, but we'll sleep
     191        # for a bit to make sure.
     192        thread.start()
     193        sanity_count = 0
     194        while len(status) != 1 and sanity_count < 10:
     195            sanity_count += 1
     196            time.sleep(1)
     197        if sanity_count >= 10:
     198            raise ValueError, 'Thread did not run and block'
     199
     200        # Check the person hasn't been updated. Since this isn't
     201        # using FOR UPDATE, it won't block.
     202        p = Person.objects.get(pk=self.person.pk)
     203        self.assertEqual('Reinhardt', p.name)
     204
     205        # When we end our blocking transaction, our thread should
     206        # be able to continue.
     207        self.end_blocking_transaction()
     208        thread.join(5.0)
     209
     210        # Check the thread has finished. Assuming it has, we should
     211        # find that it has updated the person's name.
     212        self.failIf(thread.isAlive())
     213        p = Person.objects.get(pk=self.person.pk)
     214        self.assertEqual('Fred', p.name)
     215
     216    @requires_threading
     217    @skipUnlessDBFeature('has_select_for_update')
     218    def test_raw_lock_not_available(self):
     219        """
     220        Check that running a raw query which can't obtain a FOR UPDATE lock
     221        raises the correct exception
     222        """
     223        self.start_blocking_transaction()
     224        def raw(status):
     225            try:
     226                list(
     227                    Person.objects.raw(
     228                        'SELECT * FROM %s %s' % (
     229                            Person._meta.db_table,
     230                            connection.ops.for_update_sql(nowait=True)
     231                        )
     232                    )
     233                )
     234            except DatabaseError, e:
     235                status.append(e)
     236        status = []
     237        thread = threading.Thread(target=raw, kwargs={'status': status})
     238        thread.start()
     239        time.sleep(1)
     240        thread.join()
     241        self.end_blocking_transaction()
     242        self.check_exc(status[-1])
     243
     244    @skipUnlessDBFeature('has_select_for_update')
     245    def test_transaction_dirty_managed(self):
     246        """ Check that a select_for_update sets the transaction to be
     247        dirty when executed under txn management. Setting the txn dirty
     248        means that it will be either committed or rolled back by Django,
     249        which will release any locks held by the SELECT FOR UPDATE.
     250        """
     251        people = list(Person.objects.select_for_update())
     252        self.assertTrue(transaction.is_dirty())
     253
     254    @skipUnlessDBFeature('has_select_for_update')
     255    def test_transaction_not_dirty_unmanaged(self):
     256        """ If we're not under txn management, the txn will never be
     257        marked as dirty.
     258        """
     259        transaction.managed(False)
     260        transaction.leave_transaction_management()
     261        people = list(Person.objects.select_for_update())
     262        self.assertFalse(transaction.is_dirty())
  • django/tests/modeltests/select_for_update/models.py

     
     1from django.db import models
     2
     3class Person(models.Model):
     4    name = models.CharField(max_length=30)
  • django/django/db/models/sql/compiler.py

     
    11from django.core.exceptions import FieldError
    22from django.db import connections
     3from django.db import transaction
    34from django.db.backends.util import truncate_name
    45from django.db.models.sql.constants import *
    56from django.db.models.sql.datastructures import EmptyResultSet
    67from django.db.models.sql.expressions import SQLEvaluator
    78from django.db.models.sql.query import get_proxied_model, get_order_dir, \
    89     select_related_descend, Query
     10from django.db.utils import DatabaseError
    911
    1012class SQLCompiler(object):
    1113    def __init__(self, query, connection, using):
     
    117119                        result.append('LIMIT %d' % val)
    118120                result.append('OFFSET %d' % self.query.low_mark)
    119121
     122        if self.query.select_for_update and self.connection.features.has_select_for_update:
     123            # If we've been asked for a NOWAIT query but the backend does not support it,
     124            # raise a DatabaseError otherwise we could get an unexpected deadlock.
     125            nowait = self.query.select_for_update_nowait
     126            if nowait and not self.connection.features.has_select_for_update_nowait:
     127                raise DatabaseError('NOWAIT is not supported on this database backend.')
     128            result.append(self.connection.ops.for_update_sql(nowait=nowait))
     129
    120130        return ' '.join(result), tuple(params)
    121131
    122132    def as_nested_sql(self):
     
    677687        resolve_columns = hasattr(self, 'resolve_columns')
    678688        fields = None
    679689        has_aggregate_select = bool(self.query.aggregate_select)
     690        # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
     691        # a subsequent commit/rollback is executed, so any database locks
     692        # are released.
     693        if self.query.select_for_update and transaction.is_managed(self.using):
     694            transaction.set_dirty(self.using)
    680695        for rows in self.execute_sql(MULTI):
    681696            for row in rows:
    682697                if resolve_columns:
  • django/django/db/models/sql/query.py

     
    125125        self.order_by = []
    126126        self.low_mark, self.high_mark = 0, None  # Used for offset/limit
    127127        self.distinct = False
     128        self.select_for_update = False
     129        self.select_for_update_nowait = False
    128130        self.select_related = False
    129131        self.related_select_cols = []
    130132
     
    254256        obj.order_by = self.order_by[:]
    255257        obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
    256258        obj.distinct = self.distinct
     259        obj.select_for_update = self.select_for_update
     260        obj.select_for_update_nowait = self.select_for_update_nowait
    257261        obj.select_related = self.select_related
    258262        obj.related_select_cols = []
    259263        obj.aggregates = copy.deepcopy(self.aggregates, memo=memo)
     
    360364
    361365        query.clear_ordering(True)
    362366        query.clear_limits()
     367        query.select_for_update = False
    363368        query.select_related = False
    364369        query.related_select_cols = []
    365370        query.related_select_fields = []
  • django/django/db/models/manager.py

     
    164164    def order_by(self, *args, **kwargs):
    165165        return self.get_query_set().order_by(*args, **kwargs)
    166166
     167    def select_for_update(self, *args, **kwargs):
     168        return self.get_query_set().select_for_update(*args, **kwargs)
     169
    167170    def select_related(self, *args, **kwargs):
    168171        return self.get_query_set().select_related(*args, **kwargs)
    169172
  • django/django/db/models/query.py

     
    435435        del_query._for_write = True
    436436
    437437        # Disable non-supported fields.
     438        del_query.query.select_for_update = False
    438439        del_query.query.select_related = False
    439440        del_query.query.clear_ordering()
    440441
     
    583584        else:
    584585            return self._filter_or_exclude(None, **filter_obj)
    585586
     587    def select_for_update(self, **kwargs):
     588        """
     589        Returns a new QuerySet instance that will select objects with a
     590        FOR UPDATE lock.
     591        """
     592        # Default to false for nowait
     593        nowait = kwargs.pop('nowait', False)
     594        obj = self._clone()
     595        obj.query.select_for_update = True
     596        obj.query.select_for_update_nowait = nowait
     597        return obj
     598
    586599    def select_related(self, *fields, **kwargs):
    587600        """
    588601        Returns a new QuerySet instance that will select related objects.
  • django/django/db/backends/mysql/base.py

     
    124124    allows_group_by_pk = True
    125125    related_fields_match_type = True
    126126    allow_sliced_subqueries = False
     127    has_select_for_update = True
     128    has_select_for_update_nowait = False
    127129    supports_forward_references = False
    128130    supports_long_model_names = False
    129131    supports_microsecond_precision = False
  • django/django/db/backends/oracle/base.py

     
    7070    needs_datetime_string_cast = False
    7171    interprets_empty_strings_as_nulls = True
    7272    uses_savepoints = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
    7375    can_return_id_from_insert = True
    7476    allow_sliced_subqueries = False
    7577    supports_subqueries_in_group_by = False
  • django/django/db/backends/__init__.py

     
    279279    # integer primary keys.
    280280    related_fields_match_type = False
    281281    allow_sliced_subqueries = True
     282    has_select_for_update = False
     283    has_select_for_update_nowait = False
    282284
    283285    # Does the default test database allow multiple connections?
    284286    # Usually an indication that the test database is in-memory
     
    476478        """
    477479        return []
    478480
     481    def for_update_sql(self, nowait=False):
     482        """
     483        Returns the FOR UPDATE SQL clause to lock rows for an update operation.
     484        """
     485        if nowait:
     486            return 'FOR UPDATE NOWAIT'
     487        else:
     488            return 'FOR UPDATE'
     489
    479490    def fulltext_search_sql(self, field_name):
    480491        """
    481492        Returns the SQL WHERE clause to use in order to perform a full-text
  • django/django/db/backends/postgresql_psycopg2/base.py

     
    7070    requires_rollback_on_dirty_transaction = True
    7171    has_real_datatype = True
    7272    can_defer_constraint_checks = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
     75   
    7376
    7477class DatabaseWrapper(BaseDatabaseWrapper):
    7578    vendor = 'postgresql'
Back to Top