Ticket #2705: 2705-for_update-r15021.diff

File 2705-for_update-r15021.diff, 18.7 KB (added by Dan Fairs, 14 years ago)

Update to prevent spurious failure on MySQL MyISAM


    diff --git a/AUTHORS b/AUTHORS
    a b  
    164164    eriks@win.tue.nl
    165165    Tomáš Ehrlich <tomas.ehrlich@gmail.com>
    166166    Dirk Eschler <dirk.eschler@gmx.net>
     167    Dan Fairs <dan@fezconsulting.com>
    167168    Marc Fargas <telenieko@telenieko.com>
    168169    Szilveszter Farkas <szilveszter.farkas@gmail.com>
    169170    Grigory Fateyev <greg@dial.com.ru>
  • django/db/backends/__init__.py

    diff --git a/django/db/backends/__init__.py b/django/db/backends/__init__.py
    a b  
    103103    # integer primary keys.
    104104    related_fields_match_type = False
    105105    allow_sliced_subqueries = True
     106    has_select_for_update = False
     107    has_select_for_update_nowait = False
    107109    # Does the default test database allow multiple connections?
    108110    # Usually an indication that the test database is in-memory
    282284        """
    283285        return []
     287    def for_update_sql(self, nowait=False):
     288        """
     289        Returns the FOR UPDATE SQL clause to lock rows for an update operation.
     290        """
     291        if nowait:
     292            return 'FOR UPDATE NOWAIT'
     293        else:
     294            return 'FOR UPDATE'
    285296    def fulltext_search_sql(self, field_name):
    286297        """
    287298        Returns the SQL WHERE clause to use in order to perform a full-text
  • django/db/backends/mysql/base.py

    diff --git a/django/db/backends/mysql/base.py b/django/db/backends/mysql/base.py
    a b  
    124124    allows_group_by_pk = True
    125125    related_fields_match_type = True
    126126    allow_sliced_subqueries = False
     127    has_select_for_update = True
     128    has_select_for_update_nowait = False
    127129    supports_forward_references = False
    128130    supports_long_model_names = False
    129131    supports_microsecond_precision = False
  • django/db/backends/oracle/base.py

    diff --git a/django/db/backends/oracle/base.py b/django/db/backends/oracle/base.py
    a b  
    7070    needs_datetime_string_cast = False
    7171    interprets_empty_strings_as_nulls = True
    7272    uses_savepoints = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
    7375    can_return_id_from_insert = True
    7476    allow_sliced_subqueries = False
    7577    supports_subqueries_in_group_by = False
  • django/db/backends/postgresql_psycopg2/base.py

    diff --git a/django/db/backends/postgresql_psycopg2/base.py b/django/db/backends/postgresql_psycopg2/base.py
    a b  
    7070    requires_rollback_on_dirty_transaction = True
    7171    has_real_datatype = True
    7272    can_defer_constraint_checks = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
    7477class DatabaseOperations(PostgresqlDatabaseOperations):
    7578    def last_executed_query(self, cursor, sql, params):
  • django/db/models/manager.py

    diff --git a/django/db/models/manager.py b/django/db/models/manager.py
    a b  
    164164    def order_by(self, *args, **kwargs):
    165165        return self.get_query_set().order_by(*args, **kwargs)
     167    def select_for_update(self, *args, **kwargs):
     168        return self.get_query_set().select_for_update(*args, **kwargs)
    167170    def select_related(self, *args, **kwargs):
    168171        return self.get_query_set().select_related(*args, **kwargs)
  • django/db/models/query.py

    diff --git a/django/db/models/query.py b/django/db/models/query.py
    a b  
    432432        del_query._for_write = True
    434434        # Disable non-supported fields.
     435        del_query.query.select_for_update = False
    435436        del_query.query.select_related = False
    436437        del_query.query.clear_ordering()
    580581        else:
    581582            return self._filter_or_exclude(None, **filter_obj)
     584    def select_for_update(self, **kwargs):
     585        """
     586        Returns a new QuerySet instance that will select objects with a
     587        FOR UPDATE lock.
     588        """
     589        # Default to false for nowait
     590        nowait = kwargs.pop('nowait', False)
     591        obj = self._clone()
     592        obj.query.select_for_update = True
     593        obj.query.select_for_update_nowait = nowait
     594        return obj
    583596    def select_related(self, *fields, **kwargs):
    584597        """
    585598        Returns a new QuerySet instance that will select related objects.
  • django/db/models/sql/compiler.py

    diff --git a/django/db/models/sql/compiler.py b/django/db/models/sql/compiler.py
    a b  
    11from django.core.exceptions import FieldError
    22from django.db import connections
     3from django.db import transaction
    34from django.db.backends.util import truncate_name
    45from django.db.models.sql.constants import *
    56from django.db.models.sql.datastructures import EmptyResultSet
    117118                        result.append('LIMIT %d' % val)
    118119                result.append('OFFSET %d' % self.query.low_mark)
     121        if self.query.select_for_update and self.connection.features.has_select_for_update:
     122            nowait = self.query.select_for_update_nowait and self.connection.features.has_select_for_update
     123            result.append(self.connection.ops.for_update_sql(nowait=nowait))
    120125        return ' '.join(result), tuple(params)
    122127    def as_nested_sql(self):
    677682        resolve_columns = hasattr(self, 'resolve_columns')
    678683        fields = None
    679684        has_aggregate_select = bool(self.query.aggregate_select)
     685        # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
     686        # a subsequent commit/rollback is executed, so any database locks
     687        # are released.
     688        if self.query.select_for_update and transaction.is_managed(self.using):
     689            transaction.set_dirty(self.using)
    680690        for rows in self.execute_sql(MULTI):
    681691            for row in rows:
    682692                if resolve_columns:
  • django/db/models/sql/query.py

    diff --git a/django/db/models/sql/query.py b/django/db/models/sql/query.py
    a b  
    131131        self.order_by = []
    132132        self.low_mark, self.high_mark = 0, None  # Used for offset/limit
    133133        self.distinct = False
     134        self.select_for_update = False
     135        self.select_for_update_nowait = False
    134136        self.select_related = False
    135137        self.related_select_cols = []
    260262        obj.order_by = self.order_by[:]
    261263        obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
    262264        obj.distinct = self.distinct
     265        obj.select_for_update = self.select_for_update
     266        obj.select_for_update_nowait = self.select_for_update_nowait
    263267        obj.select_related = self.select_related
    264268        obj.related_select_cols = []
    265269        obj.aggregates = deepcopy(self.aggregates, memo=memo)
    367371        query.clear_ordering(True)
    368372        query.clear_limits()
     373        query.select_for_update = False
    369374        query.select_related = False
    370375        query.related_select_cols = []
    371376        query.related_select_fields = []
  • docs/ref/databases.txt

    diff --git a/docs/ref/databases.txt b/docs/ref/databases.txt
    a b  
    364364column types have a maximum length restriction of 255 characters, regardless
    365365of whether ``unique=True`` is specified or not.
     367Row locking with ``QuerySet.select_for_update()``
     370MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
     371statement. However, you may call the ``select_for_update()`` method of a
     372queryset with ``nowait=True``. In that case, the argument will be silently
     373discarded and the generated query will block until the requested lock can be
    367376.. _sqlite-notes:
    369378SQLite notes
  • docs/ref/models/querysets.txt

    diff --git a/docs/ref/models/querysets.txt b/docs/ref/models/querysets.txt
    a b  
    975975    # queries the database with the 'backup' alias
    976976    >>> Entry.objects.using('backup')
     981.. method:: select_for_update(nowait=False)
     983.. versionadded:: 1.3
     985Returns a queryset that will lock rows until the end of the transaction,
     986generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
     988For example::
     990    entries = Entry.objects.select_for_update().filter(author=request.user)
     992All matched entries will be locked until the end of the transaction block,
     993meaning that other transactions will be prevented from changing or acquiring
     994locks on them.
     996Usually, if another transaction has already acquired a lock on one of the
     997selected rows, the query will block until the lock is released. If this is
     998not the behaviour you want, call ``select_for_update(nowait=True)``. This will
     999make the call non-blocking. If a conflicting lock is already acquired by
     1000another transaction, ``django.db.utils.DatabaseError`` will be raised when
     1001the queryset is evaluated.
     1003Note that using ``select_related`` will cause the current transaction to be set
     1004dirty, if under transaction management. This is to ensure that Django issues a
     1005``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR
     1008Currently the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
     1009database backends support ``select_for_update()`` but MySQL has no
     1010support for the ``nowait`` argument. Other backends will simply
     1011generate queries as if ``select_for_update()`` had not been used.
    9791013Methods that do not return QuerySets
    12531287the only restriction on the :class:`QuerySet` that is updated is that it can
    12541288only update columns in the model's main table. Filtering based on related
    12551289fields is still possible. You cannot call ``update()`` on a
    1256 :class:`QuerySet` that has had a slice taken or can otherwise no longer be 
     1290:class:`QuerySet` that has had a slice taken or can otherwise no longer be
    12591293For example, if you wanted to update all the entries in a particular blog
  • new file tests/modeltests/select_for_update/__init__.py

    diff --git a/tests/modeltests/select_for_update/__init__.py b/tests/modeltests/select_for_update/__init__.py
    new file mode 100644
    - +  
  • new file tests/modeltests/select_for_update/models.py

    diff --git a/tests/modeltests/select_for_update/models.py b/tests/modeltests/select_for_update/models.py
    new file mode 100644
    - +  
     1from django.db import models
     3class Person(models.Model):
     4    name = models.CharField(max_length=30)
  • new file tests/modeltests/select_for_update/tests.py

    diff --git a/tests/modeltests/select_for_update/tests.py b/tests/modeltests/select_for_update/tests.py
    new file mode 100644
    - +  
     1import time
     2from django.conf import settings
     3from django.db import transaction, connection
     4from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
     5from django.test import TransactionTestCase, skipUnlessDBFeature
     6from django.utils.functional import wraps
     7from django.utils import unittest
     9from models import Person
     12    import threading
     13    def requires_threading(func):
     14        return func
     15except ImportError:
     16    # Note we can't use dummy_threading here, as our tests will actually
     17    # block. We just have to skip the test completely.
     18    def requires_threading(func):
     19        @wraps(func)
     20        def wrapped(*args, **kw):
     21            raise unittest.SkipTest('threading required')
     23class SelectForUpdateTests(TransactionTestCase):
     25    def setUp(self):
     26        connection._rollback()
     27        connection._enter_transaction_management(True)
     28        self.new_connections = ConnectionHandler(settings.DATABASES)
     29        self.person = Person.objects.create(name='Reinhardt')
     31        # We need to set settings.DEBUG to True so we can capture
     32        # the output SQL to examine.
     33        self._old_debug = settings.DEBUG
     34        settings.DEBUG = True
     36    def tearDown(self):
     37        connection._leave_transaction_management(True)
     38        settings.DEBUG = self._old_debug
     39        try:
     40            self.end_blocking_transaction()
     41        except (DatabaseError, AttributeError):
     42            pass
     44    def start_blocking_transaction(self):
     45        # Start a blocking transaction. At some point,
     46        # end_blocking_transaction() should be called.
     47        self.new_connection = self.new_connections[DEFAULT_DB_ALIAS]
     48        self.new_connection._enter_transaction_management(True)
     49        self.cursor = self.new_connection.cursor()
     50        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
     51            'db_table': Person._meta.db_table,
     52            'for_update': self.new_connection.ops.for_update_sql(),
     53            }
     54        self.cursor.execute(sql, ())
     55        result = self.cursor.fetchone()
     57    def end_blocking_transaction(self):
     58        # Roll back the blocking transaction.
     59        self.new_connection._rollback()
     60        self.new_connection.close()
     61        self.new_connection._leave_transaction_management(True)
     63    def has_for_update_sql(self, tested_connection, nowait=False):
     64        # Examine the SQL that was executed to determine whether it
     65        # contains the 'SELECT..FOR UPDATE' stanza.
     66        for_update_sql = tested_connection.ops.for_update_sql(nowait)
     67        sql = tested_connection.queries[-1]['sql']
     68        return bool(sql.find(for_update_sql) > -1)
     70    def check_exc(self, exc):
     71        self.failUnless(isinstance(exc, DatabaseError))
     73    @skipUnlessDBFeature('has_select_for_update')
     74    def test_for_update_sql_generated(self):
     75        """
     76        Test that the backend's FOR UPDATE variant appears in
     77        generated SQL when select_for_update is invoked.
     78        """
     79        list(Person.objects.all().select_for_update())
     80        self.assertTrue(self.has_for_update_sql(connection))
     82    @skipUnlessDBFeature('has_select_for_update_nowait')
     83    def test_for_update_sql_generated_nowait(self):
     84        """
     85        Test that the backend's FOR UPDATE NOWAIT variant appears in
     86        generated SQL when select_for_update is invoked.
     87        """
     88        list(Person.objects.all().select_for_update(nowait=True))
     89        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
     91    @requires_threading
     92    @skipUnlessDBFeature('has_select_for_update_nowait')
     93    def test_nowait_raises_error_on_block(self):
     94        """
     95        If nowait is specified, we expect an error to be raised rather
     96        than blocking.
     97        """
     98        self.start_blocking_transaction()
     99        status = []
     100        thread = threading.Thread(
     101            target=self.run_select_for_update,
     102            args=(status,),
     103            kwargs={'nowait': True},
     104        )
     106        thread.start()
     107        time.sleep(1)
     108        thread.join()
     109        self.end_blocking_transaction()
     110        self.check_exc(status[-1])
     112    def run_select_for_update(self, status, nowait=False):
     113        status.append('started')
     114        try:
     115            connection._rollback()
     116            people = list(Person.objects.all().select_for_update(nowait=nowait))
     117            people[0].name = 'Fred'
     118            people[0].save()
     119            connection._commit()
     120        except DatabaseError, e:
     121            status.append(e)
     122        except Exception, e:
     123            raise
     125    @requires_threading
     126    @skipUnlessDBFeature('has_select_for_update')
     127    @skipUnlessDBFeature('supports_transactions')
     128    def test_block(self):
     129        """
     130        Check that a thread running a select_for_update that
     131        accesses rows being touched by a similar operation
     132        on another connection blocks correctly.
     133        """
     134        # First, let's start the transaction in our thread.
     135        self.start_blocking_transaction()
     137        # Now, try it again using the ORM's select_for_update
     138        # facility. Do this in a separate thread.
     139        status = []
     140        thread = threading.Thread(target=self.run_select_for_update, args=(status,))
     142        # The thread should immediately block, but we'll sleep
     143        # for a bit to make sure
     144        thread.start()
     145        sanity_count = 0
     146        while len(status) != 1 and sanity_count < 10:
     147            sanity_count += 1
     148            time.sleep(1)
     149        if sanity_count >= 10:
     150            raise ValueError, 'Thread did not run and block'
     152        # Check the person hasn't been updated. Since this isn't
     153        # using FOR UPDATE, it won't block.
     154        p = Person.objects.get(pk=self.person.pk)
     155        self.assertEqual('Reinhardt', p.name)
     157        # When we end our blocking transaction, our thread should
     158        # be able to continue.
     159        self.end_blocking_transaction()
     160        thread.join(5.0)
     162        # Check the thread has finished. Assuming it has, we should
     163        # find that it has updated the person's name.
     164        self.failIf(thread.isAlive())
     165        p = Person.objects.get(pk=self.person.pk)
     166        self.assertEqual('Fred', p.name)
     168    @requires_threading
     169    @skipUnlessDBFeature('has_select_for_update')
     170    def test_raw_lock_not_available(self):
     171        """
     172        Check that running a raw query which can't obtain a FOR UPDATE lock
     173        raises the correct exception
     174        """
     175        self.start_blocking_transaction()
     176        def raw(status):
     177            try:
     178                list(
     179                    Person.objects.raw(
     180                        'SELECT * FROM %s %s' % (
     181                            Person._meta.db_table,
     182                            connection.ops.for_update_sql(nowait=True)
     183                        )
     184                    )
     185                )
     186            except DatabaseError, e:
     187                status.append(e)
     188        status = []
     189        thread = threading.Thread(target=raw, kwargs={'status': status})
     190        thread.start()
     191        time.sleep(1)
     192        thread.join()
     193        self.end_blocking_transaction()
     194        self.check_exc(status[-1])
     196    @skipUnlessDBFeature('has_select_for_update')
     197    def test_transaction_dirty_managed(self):
     198        """ Check that a select_for_update sets the transaction to be
     199        dirty when executed under txn management. Setting the txn dirty
     200        means that it will be either committed or rolled back by Django,
     201        which will release any locks held by the SELECT FOR UPDATE.
     202        """
     203        transaction.enter_transaction_management(True)
     204        transaction.managed(True)
     205        try:
     206            people = list(Person.objects.select_for_update())
     207            self.assertTrue(transaction.is_dirty())
     208        finally:
     209            transaction.rollback()
     210            transaction.leave_transaction_management()
     212    @skipUnlessDBFeature('has_select_for_update')
     213    def test_transaction_not_dirty_unmanaged(self):
     214        """ If we're not under txn management, the txn will never be
     215        marked as dirty.
     216        """
     217        people = list(Person.objects.select_for_update())
     218        self.assertFalse(transaction.is_dirty())
Back to Top