Ticket #2705: 2705-for_update-r15013.diff

File 2705-for_update-r15013.diff, 18.6 KB (added by Ramiro Morales, 13 years ago)

Tweaks to Dan Fairs' for_update_r15007.diff

  • AUTHORS

    diff --git a/AUTHORS b/AUTHORS
    a b  
    164164    eriks@win.tue.nl
    165165    Tomáš Ehrlich <tomas.ehrlich@gmail.com>
    166166    Dirk Eschler <dirk.eschler@gmx.net>
     167    Dan Fairs <dan@fezconsulting.com>
    167168    Marc Fargas <telenieko@telenieko.com>
    168169    Szilveszter Farkas <szilveszter.farkas@gmail.com>
    169170    Grigory Fateyev <greg@dial.com.ru>
  • django/db/backends/__init__.py

    diff --git a/django/db/backends/__init__.py b/django/db/backends/__init__.py
    a b  
    103103    # integer primary keys.
    104104    related_fields_match_type = False
    105105    allow_sliced_subqueries = True
     106    has_select_for_update = False
     107    has_select_for_update_nowait = False
    106108
    107109    # Does the default test database allow multiple connections?
    108110    # Usually an indication that the test database is in-memory
     
    282284        """
    283285        return []
    284286
     287    def for_update_sql(self, nowait=False):
     288        """
     289        Returns the FOR UPDATE SQL clause to lock rows for an update operation.
     290        """
     291        if nowait:
     292            return 'FOR UPDATE NOWAIT'
     293        else:
     294            return 'FOR UPDATE'
     295
    285296    def fulltext_search_sql(self, field_name):
    286297        """
    287298        Returns the SQL WHERE clause to use in order to perform a full-text
  • django/db/backends/mysql/base.py

    diff --git a/django/db/backends/mysql/base.py b/django/db/backends/mysql/base.py
    a b  
    124124    allows_group_by_pk = True
    125125    related_fields_match_type = True
    126126    allow_sliced_subqueries = False
     127    has_select_for_update = True
     128    has_select_for_update_nowait = False
    127129    supports_forward_references = False
    128130    supports_long_model_names = False
    129131    supports_microsecond_precision = False
  • django/db/backends/oracle/base.py

    diff --git a/django/db/backends/oracle/base.py b/django/db/backends/oracle/base.py
    a b  
    7070    needs_datetime_string_cast = False
    7171    interprets_empty_strings_as_nulls = True
    7272    uses_savepoints = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
    7375    can_return_id_from_insert = True
    7476    allow_sliced_subqueries = False
    7577    supports_subqueries_in_group_by = False
  • django/db/backends/postgresql_psycopg2/base.py

    diff --git a/django/db/backends/postgresql_psycopg2/base.py b/django/db/backends/postgresql_psycopg2/base.py
    a b  
    7070    requires_rollback_on_dirty_transaction = True
    7171    has_real_datatype = True
    7272    can_defer_constraint_checks = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
     75   
    7376
    7477class DatabaseOperations(PostgresqlDatabaseOperations):
    7578    def last_executed_query(self, cursor, sql, params):
  • django/db/models/manager.py

    diff --git a/django/db/models/manager.py b/django/db/models/manager.py
    a b  
    164164    def order_by(self, *args, **kwargs):
    165165        return self.get_query_set().order_by(*args, **kwargs)
    166166
     167    def select_for_update(self, *args, **kwargs):
     168        return self.get_query_set().select_for_update(*args, **kwargs)
     169
    167170    def select_related(self, *args, **kwargs):
    168171        return self.get_query_set().select_related(*args, **kwargs)
    169172
  • django/db/models/query.py

    diff --git a/django/db/models/query.py b/django/db/models/query.py
    a b  
    432432        del_query._for_write = True
    433433
    434434        # Disable non-supported fields.
     435        del_query.query.select_for_update = False
    435436        del_query.query.select_related = False
    436437        del_query.query.clear_ordering()
    437438
     
    580581        else:
    581582            return self._filter_or_exclude(None, **filter_obj)
    582583
     584    def select_for_update(self, **kwargs):
     585        """
     586        Returns a new QuerySet instance that will select objects with a
     587        FOR UPDATE lock.
     588        """
     589        # Default to false for nowait
     590        nowait = kwargs.pop('nowait', False)
     591        obj = self._clone()
     592        obj.query.select_for_update = True
     593        obj.query.select_for_update_nowait = nowait
     594        return obj
     595
    583596    def select_related(self, *fields, **kwargs):
    584597        """
    585598        Returns a new QuerySet instance that will select related objects.
  • django/db/models/sql/compiler.py

    diff --git a/django/db/models/sql/compiler.py b/django/db/models/sql/compiler.py
    a b  
    11from django.core.exceptions import FieldError
    22from django.db import connections
     3from django.db import transaction
    34from django.db.backends.util import truncate_name
    45from django.db.models.sql.constants import *
    56from django.db.models.sql.datastructures import EmptyResultSet
     
    117118                        result.append('LIMIT %d' % val)
    118119                result.append('OFFSET %d' % self.query.low_mark)
    119120
     121        if self.query.select_for_update and self.connection.features.has_select_for_update:
     122            nowait = self.query.select_for_update_nowait and self.connection.features.has_select_for_update
     123            result.append(self.connection.ops.for_update_sql(nowait=nowait))
     124
    120125        return ' '.join(result), tuple(params)
    121126
    122127    def as_nested_sql(self):
     
    677682        resolve_columns = hasattr(self, 'resolve_columns')
    678683        fields = None
    679684        has_aggregate_select = bool(self.query.aggregate_select)
     685        # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
     686        # a subsequent commit/rollback is executed, so any database locks
     687        # are released.
     688        if self.query.select_for_update and transaction.is_managed(self.using):
     689            transaction.set_dirty(self.using)
    680690        for rows in self.execute_sql(MULTI):
    681691            for row in rows:
    682692                if resolve_columns:
  • django/db/models/sql/query.py

    diff --git a/django/db/models/sql/query.py b/django/db/models/sql/query.py
    a b  
    131131        self.order_by = []
    132132        self.low_mark, self.high_mark = 0, None  # Used for offset/limit
    133133        self.distinct = False
     134        self.select_for_update = False
     135        self.select_for_update_nowait = False
    134136        self.select_related = False
    135137        self.related_select_cols = []
    136138
     
    260262        obj.order_by = self.order_by[:]
    261263        obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
    262264        obj.distinct = self.distinct
     265        obj.select_for_update = self.select_for_update
     266        obj.select_for_update_nowait = self.select_for_update_nowait
    263267        obj.select_related = self.select_related
    264268        obj.related_select_cols = []
    265269        obj.aggregates = deepcopy(self.aggregates, memo=memo)
     
    366370
    367371        query.clear_ordering(True)
    368372        query.clear_limits()
     373        query.select_for_update = False
    369374        query.select_related = False
    370375        query.related_select_cols = []
    371376        query.related_select_fields = []
  • docs/ref/databases.txt

    diff --git a/docs/ref/databases.txt b/docs/ref/databases.txt
    a b  
    364364column types have a maximum length restriction of 255 characters, regardless
    365365of whether ``unique=True`` is specified or not.
    366366
     367Row locking with ``QuerySet.select_for_update()``
     368-------------------------------------------------
     369
     370MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
     371statement. However, you may call the ``select_for_update()`` method of a
     372queryset with ``nowait=True``. In that case, the argument will be silently
     373discarded and the generated query will block until the requested lock can be
     374acquired.
     375
    367376.. _sqlite-notes:
    368377
    369378SQLite notes
  • docs/ref/models/querysets.txt

    diff --git a/docs/ref/models/querysets.txt b/docs/ref/models/querysets.txt
    a b  
    975975    # queries the database with the 'backup' alias
    976976    >>> Entry.objects.using('backup')
    977977
     978select_for_update
     979~~~~~~~~~~~~~~~~~
     980
     981.. method:: select_for_update(nowait=False)
     982
     983.. versionadded:: 1.3
     984
     985Returns a queryset that will lock rows until the end of the transaction,
     986generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
     987
     988For example::
     989
     990    entries = Entry.objects.select_for_update().filter(author=request.user)
     991
     992All matched entries will be locked until the end of the transaction block,
     993meaning that other transactions will be prevented from changing or acquiring
     994locks on them.
     995
     996Usually, if another transaction has already acquired a lock on one of the
     997selected rows, the query will block until the lock is released. If this is
     998not the behaviour you want, call ``select_for_update(nowait=True)``. This will
     999make the call non-blocking. If a conflicting lock is already acquired by
     1000another transaction, ``django.db.utils.DatabaseError`` will be raised when
     1001the queryset is evaluated.
     1002
     1003Note that using ``select_related`` will cause the current transaction to be set
     1004dirty, if under transaction management. This is to ensure that Django issues a
     1005``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR
     1006UPDATE``.
     1007
     1008Currently the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
     1009database backends support ``select_for_update()`` but MySQL has no
     1010support for the ``nowait`` argument. Other backends will simply
     1011generate queries as if ``select_for_update()`` had not been used.
    9781012
    9791013Methods that do not return QuerySets
    9801014------------------------------------
     
    12531287the only restriction on the :class:`QuerySet` that is updated is that it can
    12541288only update columns in the model's main table. Filtering based on related
    12551289fields is still possible. You cannot call ``update()`` on a
    1256 :class:`QuerySet` that has had a slice taken or can otherwise no longer be 
     1290:class:`QuerySet` that has had a slice taken or can otherwise no longer be
    12571291filtered.
    12581292
    12591293For example, if you wanted to update all the entries in a particular blog
  • new file tests/modeltests/select_for_update/__init__.py

    diff --git a/tests/modeltests/select_for_update/__init__.py b/tests/modeltests/select_for_update/__init__.py
    new file mode 100644
    - +  
     1#
  • new file tests/modeltests/select_for_update/models.py

    diff --git a/tests/modeltests/select_for_update/models.py b/tests/modeltests/select_for_update/models.py
    new file mode 100644
    - +  
     1from django.db import models
     2
     3class Person(models.Model):
     4    name = models.CharField(max_length=30)
  • new file tests/modeltests/select_for_update/tests.py

    diff --git a/tests/modeltests/select_for_update/tests.py b/tests/modeltests/select_for_update/tests.py
    new file mode 100644
    - +  
     1import time
     2from django.conf import settings
     3from django.db import transaction, connection
     4from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
     5from django.test import TransactionTestCase, skipUnlessDBFeature
     6from django.utils.functional import wraps
     7from django.utils import unittest
     8
     9from models import Person
     10
     11try:
     12    import threading
     13    def requires_threading(func):
     14        return func
     15except ImportError:
     16    # Note we can't use dummy_threading here, as our tests will actually
     17    # block. We just have to skip the test completely.
     18    def requires_threading(func):
     19        @wraps(func)
     20        def wrapped(*args, **kw):
     21            raise unittest.SkipTest('threading required')
     22
     23class SelectForUpdateTests(TransactionTestCase):
     24
     25    def setUp(self):
     26        connection._rollback()
     27        connection._enter_transaction_management(True)
     28        self.new_connections = ConnectionHandler(settings.DATABASES)
     29        self.person = Person.objects.create(name='Reinhardt')
     30
     31        # We need to set settings.DEBUG to True so we can capture
     32        # the output SQL to examine.
     33        self._old_debug = settings.DEBUG
     34        settings.DEBUG = True
     35
     36    def tearDown(self):
     37        connection._leave_transaction_management(True)
     38        settings.DEBUG = self._old_debug
     39        try:
     40            self.end_blocking_transaction()
     41        except (DatabaseError, AttributeError):
     42            pass
     43
     44    def start_blocking_transaction(self):
     45        # Start a blocking transaction. At some point,
     46        # end_blocking_transaction() should be called.
     47        self.new_connection = self.new_connections[DEFAULT_DB_ALIAS]
     48        self.new_connection._enter_transaction_management(True)
     49        self.cursor = self.new_connection.cursor()
     50        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
     51            'db_table': Person._meta.db_table,
     52            'for_update': self.new_connection.ops.for_update_sql(),
     53            }
     54        self.cursor.execute(sql, ())
     55        result = self.cursor.fetchone()
     56
     57    def end_blocking_transaction(self):
     58        # Roll back the blocking transaction.
     59        self.new_connection._rollback()
     60        self.new_connection.close()
     61        self.new_connection._leave_transaction_management(True)
     62
     63    def has_for_update_sql(self, tested_connection, nowait=False):
     64        # Examine the SQL that was executed to determine whether it
     65        # contains the 'SELECT..FOR UPDATE' stanza.
     66        for_update_sql = tested_connection.ops.for_update_sql(nowait)
     67        sql = tested_connection.queries[-1]['sql']
     68        return bool(sql.find(for_update_sql) > -1)
     69
     70    def check_exc(self, exc):
     71        self.failUnless(isinstance(exc, DatabaseError))
     72
     73    @skipUnlessDBFeature('has_select_for_update')
     74    def test_for_update_sql_generated(self):
     75        """
     76        Test that the backend's FOR UPDATE variant appears in
     77        generated SQL when select_for_update is invoked.
     78        """
     79        list(Person.objects.all().select_for_update())
     80        self.assertTrue(self.has_for_update_sql(connection))
     81
     82    @skipUnlessDBFeature('has_select_for_update_nowait')
     83    def test_for_update_sql_generated_nowait(self):
     84        """
     85        Test that the backend's FOR UPDATE NOWAIT variant appears in
     86        generated SQL when select_for_update is invoked.
     87        """
     88        list(Person.objects.all().select_for_update(nowait=True))
     89        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
     90
     91    @requires_threading
     92    @skipUnlessDBFeature('has_select_for_update_nowait')
     93    def test_nowait_raises_error_on_block(self):
     94        """
     95        If nowait is specified, we expect an error to be raised rather
     96        than blocking.
     97        """
     98        self.start_blocking_transaction()
     99        status = []
     100        thread = threading.Thread(
     101            target=self.run_select_for_update,
     102            args=(status,),
     103            kwargs={'nowait': True},
     104        )
     105
     106        thread.start()
     107        time.sleep(1)
     108        thread.join()
     109        self.end_blocking_transaction()
     110        self.check_exc(status[-1])
     111
     112    def run_select_for_update(self, status, nowait=False):
     113        status.append('started')
     114        try:
     115            connection._rollback()
     116            people = list(Person.objects.all().select_for_update(nowait=nowait))
     117            people[0].name = 'Fred'
     118            people[0].save()
     119            connection._commit()
     120        except DatabaseError, e:
     121            status.append(e)
     122        except Exception, e:
     123            raise
     124
     125    @requires_threading
     126    @skipUnlessDBFeature('has_select_for_update')
     127    def test_block(self):
     128        """
     129        Check that a thread running a select_for_update that
     130        accesses rows being touched by a similar operation
     131        on another connection blocks correctly.
     132        """
     133        # First, let's start the transaction in our thread.
     134        self.start_blocking_transaction()
     135
     136        # Now, try it again using the ORM's select_for_update
     137        # facility. Do this in a separate thread.
     138        status = []
     139        thread = threading.Thread(target=self.run_select_for_update, args=(status,))
     140
     141        # The thread should immediately block, but we'll sleep
     142        # for a bit to make sure
     143        thread.start()
     144        sanity_count = 0
     145        while len(status) != 1 and sanity_count < 10:
     146            sanity_count += 1
     147            time.sleep(1)
     148        if sanity_count >= 10:
     149            raise ValueError, 'Thread did not run and block'
     150
     151        # Check the person hasn't been updated. Since this isn't
     152        # using FOR UPDATE, it won't block.
     153        p = Person.objects.get(pk=self.person.pk)
     154        self.assertEqual('Reinhardt', p.name)
     155
     156        # When we end our blocking transaction, our thread should
     157        # be able to continue.
     158        self.end_blocking_transaction()
     159        thread.join(5.0)
     160
     161        # Check the thread has finished. Assuming it has, we should
     162        # find that it has updated the person's name.
     163        self.failIf(thread.isAlive())
     164        p = Person.objects.get(pk=self.person.pk)
     165        self.assertEqual('Fred', p.name)
     166
     167    @requires_threading
     168    @skipUnlessDBFeature('has_select_for_update')
     169    def test_raw_lock_not_available(self):
     170        """
     171        Check that running a raw query which can't obtain a FOR UPDATE lock
     172        raises the correct exception
     173        """
     174        self.start_blocking_transaction()
     175        def raw(status):
     176            try:
     177                list(
     178                    Person.objects.raw(
     179                        'SELECT * FROM %s %s' % (
     180                            Person._meta.db_table,
     181                            connection.ops.for_update_sql(nowait=True)
     182                        )
     183                    )
     184                )
     185            except DatabaseError, e:
     186                status.append(e)
     187        status = []
     188        thread = threading.Thread(target=raw, kwargs={'status': status})
     189        thread.start()
     190        time.sleep(1)
     191        thread.join()
     192        self.end_blocking_transaction()
     193        self.check_exc(status[-1])
     194
     195    @skipUnlessDBFeature('has_select_for_update')
     196    def test_transaction_dirty_managed(self):
     197        """ Check that a select_for_update sets the transaction to be
     198        dirty when executed under txn management. Setting the txn dirty
     199        means that it will be either committed or rolled back by Django,
     200        which will release any locks held by the SELECT FOR UPDATE.
     201        """
     202        transaction.enter_transaction_management(True)
     203        transaction.managed(True)
     204        try:
     205            people = list(Person.objects.select_for_update())
     206            self.assertTrue(transaction.is_dirty())
     207        finally:
     208            transaction.rollback()
     209            transaction.leave_transaction_management()
     210
     211    @skipUnlessDBFeature('has_select_for_update')
     212    def test_transaction_not_dirty_unmanaged(self):
     213        """ If we're not under txn management, the txn will never be
     214        marked as dirty.
     215        """
     216        people = list(Person.objects.select_for_update())
     217        self.assertFalse(transaction.is_dirty())
Back to Top