Ticket #2705: for_update_tests_14778.diff

File for_update_tests_14778.diff, 20.0 KB (added by Dan Fairs, 13 years ago)

Updated version of patch with tests - really this time! (passed on MySQL, PostgreSQL, sqlite)

  • django/db/models/sql/compiler.py

     
    117117                        result.append('LIMIT %d' % val)
    118118                result.append('OFFSET %d' % self.query.low_mark)
    119119
     120        if self.query.select_for_update and self.connection.features.has_select_for_update:
     121            nowait = self.query.select_for_update_nowait and self.connection.features.has_select_for_update
     122            result.append("%s" % self.connection.ops.for_update_sql(nowait=nowait))
     123
    120124        return ' '.join(result), tuple(params)
    121125
    122126    def as_nested_sql(self):
  • django/db/models/sql/query.py

     
    1111from django.utils.tree import Node
    1212from django.utils.datastructures import SortedDict
    1313from django.utils.encoding import force_unicode
    14 from django.db import connections, DEFAULT_DB_ALIAS
     14from django.db import connections, DEFAULT_DB_ALIAS, DatabaseError
    1515from django.db.models import signals
    1616from django.db.models.fields import FieldDoesNotExist
    1717from django.db.models.query_utils import select_related_descend, InvalidQuery
     
    2323    ExtraWhere, AND, OR)
    2424from django.core.exceptions import FieldError
    2525
    26 __all__ = ['Query', 'RawQuery']
     26__all__ = ['Query', 'RawQuery', 'LockNotAvailable']
    2727
     28
     29class LockNotAvailable(DatabaseError):
     30    '''
     31    Raised when a query fails because a lock was not available.
     32    '''
     33    pass
     34
     35
    2836class RawQuery(object):
    2937    """
    3038    A single raw SQL query
     
    8391        return "<RawQuery: %r>" % (self.sql % self.params)
    8492
    8593    def _execute_query(self):
    86         self.cursor = connections[self.using].cursor()
    87         self.cursor.execute(self.sql, self.params)
     94        connection = connections[self.using]
     95        self.cursor = connection.cursor()
     96        try:
     97            self.cursor.execute(self.sql, self.params)
     98        except DatabaseError, e:
     99            if connection.features.has_select_for_update_nowait and connection.ops.signals_lock_not_available(e):
     100                raise LockNotAvailable(*e.args)
     101            raise
    88102
    89 
    90103class Query(object):
    91104    """
    92105    A single SQL query.
     
    131144        self.order_by = []
    132145        self.low_mark, self.high_mark = 0, None  # Used for offset/limit
    133146        self.distinct = False
     147        self.select_for_update = False
     148        self.select_for_update_nowait = False
    134149        self.select_related = False
    135150        self.related_select_cols = []
    136151
     
    260275        obj.order_by = self.order_by[:]
    261276        obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
    262277        obj.distinct = self.distinct
     278        obj.select_for_update = self.select_for_update
     279        obj.select_for_update_nowait = self.select_for_update_nowait
    263280        obj.select_related = self.select_related
    264281        obj.related_select_cols = []
    265282        obj.aggregates = deepcopy(self.aggregates, memo=memo)
     
    366383
    367384        query.clear_ordering(True)
    368385        query.clear_limits()
     386        query.select_for_update = False
    369387        query.select_related = False
    370388        query.related_select_cols = []
    371389        query.related_select_fields = []
  • django/db/models/manager.py

     
    164164    def order_by(self, *args, **kwargs):
    165165        return self.get_query_set().order_by(*args, **kwargs)
    166166
     167    def select_for_update(self, *args, **kwargs):
     168        return self.get_query_set().select_for_update(*args, **kwargs)
     169
    167170    def select_related(self, *args, **kwargs):
    168171        return self.get_query_set().select_related(*args, **kwargs)
    169172
  • django/db/models/query.py

     
    432432        del_query._for_write = True
    433433
    434434        # Disable non-supported fields.
     435        del_query.query.select_for_update = False
    435436        del_query.query.select_related = False
    436437        del_query.query.clear_ordering()
    437438
     
    580581        else:
    581582            return self._filter_or_exclude(None, **filter_obj)
    582583
     584    def select_for_update(self, **kwargs):
     585        """
     586        Returns a new QuerySet instance that will select objects with a
     587        FOR UPDATE lock.
     588        """
     589        # Default to false for nowait
     590        nowait = kwargs.pop('nowait', False)
     591        obj = self._clone()
     592        obj.query.select_for_update = True
     593        obj.query.select_for_update_nowait = nowait
     594        return obj
     595
    583596    def select_related(self, *fields, **kwargs):
    584597        """
    585598        Returns a new QuerySet instance that will select related objects.
  • django/db/backends/mysql/base.py

     
    2323    raise ImproperlyConfigured("MySQLdb-1.2.1p2 or newer is required; you have %s" % Database.__version__)
    2424
    2525from MySQLdb.converters import conversions
    26 from MySQLdb.constants import FIELD_TYPE, FLAG, CLIENT
     26from MySQLdb.constants import FIELD_TYPE, FLAG, CLIENT, ER
    2727
    2828from django.db import utils
    2929from django.db.backends import *
     
    124124    allows_group_by_pk = True
    125125    related_fields_match_type = True
    126126    allow_sliced_subqueries = False
     127    has_select_for_update = True
     128    has_select_for_update_nowait = False
    127129    supports_forward_references = False
    128130    supports_long_model_names = False
    129131    supports_microsecond_precision = False
     
    135137
    136138class DatabaseOperations(BaseDatabaseOperations):
    137139    compiler_module = "django.db.backends.mysql.compiler"
     140    signals_deadlock = lambda self, e: e.args[0] == ER.LOCK_DEADLOCK
    138141
    139142    def date_extract_sql(self, lookup_type, field_name):
    140143        # http://dev.mysql.com/doc/mysql/en/date-and-time-functions.html
  • django/db/backends/oracle/base.py

     
    4848    needs_datetime_string_cast = False
    4949    interprets_empty_strings_as_nulls = True
    5050    uses_savepoints = True
     51    has_select_for_update = True
     52    has_select_for_update_nowait = True
    5153    can_return_id_from_insert = True
    5254    allow_sliced_subqueries = False
    5355    supports_subqueries_in_group_by = False
     
    285287                                           'column': column_name})
    286288        return output
    287289
     290    def signals_deadlock(self, exception):
     291        return exception.args[0].code == 60
     292
     293    def signals_lock_not_available(self, exception):
     294        return exception.args[0].code == 54
     295
    288296    def start_transaction_sql(self):
    289297        return ''
    290298
  • django/db/backends/__init__.py

     
    103103    # integer primary keys.
    104104    related_fields_match_type = False
    105105    allow_sliced_subqueries = True
     106    has_select_for_update = False
     107    has_select_for_update_nowait = False
    106108
    107109    # Does the default test database allow multiple connections?
    108110    # Usually an indication that the test database is in-memory
     
    282284        """
    283285        return []
    284286
     287    def for_update_sql(self, nowait=False):
     288        """
     289        Return FOR UPDATE SQL clause to lock row for update
     290        """
     291        if nowait:
     292            nowaitstr = ' NOWAIT'
     293        else:
     294            nowaitstr = ''
     295        return 'FOR UPDATE' + nowaitstr
     296
    285297    def fulltext_search_sql(self, field_name):
    286298        """
    287299        Returns the SQL WHERE clause to use in order to perform a full-text
  • django/db/backends/postgresql_psycopg2/base.py

     
    1919try:
    2020    import psycopg2 as Database
    2121    import psycopg2.extensions
     22    from psycopg2 import errorcodes
    2223except ImportError, e:
    2324    from django.core.exceptions import ImproperlyConfigured
    2425    raise ImproperlyConfigured("Error loading psycopg2 module: %s" % e)
     
    7071    requires_rollback_on_dirty_transaction = True
    7172    has_real_datatype = True
    7273    can_defer_constraint_checks = True
     74    has_select_for_update = True
     75    has_select_for_update_nowait = True
     76   
    7377
    7478class DatabaseOperations(PostgresqlDatabaseOperations):
     79
     80    def _pg_error(self, e, code):
     81        return getattr(e, 'pgcode', None) == code
     82
     83    def signals_deadlock(self, e):
     84        return self._pg_error(e, errorcodes.DEADLOCK_DETECTED)
     85       
     86    def signals_lock_not_available(self, e):
     87        return self._pg_error(e, errorcodes.LOCK_NOT_AVAILABLE)
     88   
    7589    def last_executed_query(self, cursor, sql, params):
    7690        # With psycopg2, cursor objects have a "query" attribute that is the
    7791        # exact query sent to the database. See docs here:
  • tests/modeltests/select_for_update/tests.py

     
     1import threading
     2import time
     3from django.conf import settings
     4from django.db import connection
     5from django.db import transaction, connection
     6from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
     7from django.test import TransactionTestCase, skipUnlessDBFeature
     8
     9from models import Person
     10
     11class SelectForUpdateTests(TransactionTestCase):
     12
     13    def setUp(self):
     14        connection._rollback()
     15        connection._enter_transaction_management(True)
     16        self.new_connections = ConnectionHandler(settings.DATABASES)
     17        self.person = Person.objects.create(name='Reinhardt')
     18
     19        # We need to set settings.DEBUG to True so we can capture
     20        # the output SQL to examine.
     21        self._old_debug = settings.DEBUG
     22        settings.DEBUG = True
     23
     24    def tearDown(self):
     25        connection._leave_transaction_management(True)
     26        settings.DEBUG = self._old_debug
     27        try:
     28            self.end_blocking_transaction()
     29        except (DatabaseError, AttributeError):
     30            pass
     31
     32    def start_blocking_transaction(self):
     33        self.new_connection = self.new_connections[DEFAULT_DB_ALIAS]
     34        self.new_connection._enter_transaction_management(True)
     35        self.cursor = self.new_connection.cursor()
     36        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
     37            'db_table': Person._meta.db_table,
     38            'for_update': self.new_connection.ops.for_update_sql(),
     39            }
     40        self.cursor.execute(sql, ())
     41        result = self.cursor.fetchone()
     42
     43    def end_blocking_transaction(self):
     44        self.new_connection._rollback()
     45        self.new_connection.close()
     46        self.new_connection._leave_transaction_management(True)
     47
     48    def has_for_update_sql(self, tested_connection, nowait=False):
     49        for_update_sql = tested_connection.ops.for_update_sql(nowait)
     50        sql = tested_connection.queries[-1]['sql']
     51        return bool(sql.find(for_update_sql) > -1)
     52
     53    def check_exc(self, exc):
     54        self.failUnless(isinstance(exc, DatabaseError))
     55
     56    @skipUnlessDBFeature('has_select_for_update')
     57    def test_for_update_sql_generated(self):
     58        """
     59        Test that the backend's FOR UPDATE variant appears in
     60        generated SQL when select_for_update is invoked.
     61        """
     62        list(Person.objects.all().select_for_update())
     63        self.assertTrue(self.has_for_update_sql(connection))
     64
     65    @skipUnlessDBFeature('has_select_for_update_nowait')
     66    def test_for_update_sql_generated_nowait(self):
     67        """
     68        Test that the backend's FOR UPDATE NOWAIT variant appears in
     69        generated SQL when select_for_update is invoked.
     70        """
     71        list(Person.objects.all().select_for_update(nowait=True))
     72        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
     73
     74    @skipUnlessDBFeature('has_select_for_update_nowait')
     75    def test_nowait_raises_error_on_block(self):
     76        """
     77        If nowait is specified, we expect an error to be raised rather
     78        than blocking.
     79        """
     80        self.start_blocking_transaction()
     81        status = []
     82        thread = threading.Thread(
     83            target=self.run_select_for_update,
     84            args=(status,),
     85            kwargs={'nowait': True},
     86        )
     87
     88        thread.start()
     89        time.sleep(1)
     90        thread.join()
     91        self.end_blocking_transaction()
     92        self.check_exc(status[-1])
     93
     94    def run_select_for_update(self, status, nowait=False):
     95        status.append('started')
     96        try:
     97            connection._rollback()
     98            people = list(Person.objects.all().select_for_update(nowait=nowait))
     99            people[0].name = 'Fred'
     100            people[0].save()
     101            connection._commit()
     102        except DatabaseError, e:
     103            status.append(e)
     104        except Exception, e:
     105            raise
     106
     107    @skipUnlessDBFeature('has_select_for_update')
     108    def test_block(self):
     109        """
     110        Check that a thread running a select_for_update that
     111        accesses rows being touched by a similar operation
     112        on another connection blocks correctly.
     113        """
     114        # First, let's start the transaction in our thread.
     115        self.start_blocking_transaction()
     116
     117        # Now, try it again using the ORM's select_for_update
     118        # facility. Do this in a separate thread.
     119        status = []
     120        thread = threading.Thread(target=self.run_select_for_update, args=(status,))
     121
     122        # The thread should immediately block, but we'll sleep
     123        # for a bit to make sure
     124        thread.start()
     125        sanity_count = 0
     126        while len(status) != 1 and sanity_count < 10:
     127            sanity_count += 1
     128            time.sleep(1)
     129        if sanity_count >= 10:
     130            raise ValueError, 'Thread did not run and block'
     131
     132        # Check the person hasn't been updated. Since this isn't
     133        # using FOR UPDATE, it won't block.
     134        p = Person.objects.get(pk=self.person.pk)
     135        self.assertEqual('Reinhardt', p.name)
     136
     137        # When we end our blocking transaction, our thread should
     138        # be able to continue.
     139        self.end_blocking_transaction()
     140        thread.join(5.0)
     141
     142        # Check the thread has finished. Assuming it has, we should
     143        # find that it has updated the person's name.
     144        self.failIf(thread.is_alive())
     145        p = Person.objects.get(pk=self.person.pk)
     146        self.assertEqual('Fred', p.name)
     147
     148    @skipUnlessDBFeature('has_select_for_update')
     149    def test_raw_lock_not_available(self):
     150        """
     151        Check that running a raw query which can't obtain a FOR UPDATE lock
     152        raises the correct exception
     153        """
     154        self.start_blocking_transaction()
     155        def raw(status):
     156            try:
     157                list(
     158                    Person.objects.raw(
     159                        'SELECT * FROM %s %s' % (
     160                            Person._meta.db_table,
     161                            connection.ops.for_update_sql(nowait=True)
     162                        )
     163                    )
     164                )
     165            except DatabaseError, e:
     166                status.append(e)
     167        status = []
     168        thread = threading.Thread(target=raw, kwargs={'status': status})
     169        thread.start()
     170        time.sleep(1)
     171        thread.join()
     172        self.end_blocking_transaction()
     173        self.check_exc(status[-1])
  • tests/modeltests/select_for_update/models.py

     
     1from django.db import models
     2
     3class Person(models.Model):
     4    name = models.CharField(max_length=30)
     5 No newline at end of file
  • AUTHORS

     
    522522    Gasper Zejn <zejn@kiberpipa.org>
    523523    Jarek Zgoda <jarek.zgoda@gmail.com>
    524524    Cheng Zhang
     525    Dan Fairs <dan@fezconsulting.com>
    525526
    526527A big THANK YOU goes to:
    527528
  • docs/ref/models/querysets.txt

     
    975975    # queries the database with the 'backup' alias
    976976    >>> Entry.objects.using('backup')
    977977
     978``select_for_update(nowait=False)``
     979~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     980         
     981Returns a queryset that will lock rows until the end of the transaction, 
     982generating a SELECT ... FOR UPDATE statement on supported databases.
     983         
     984For example::
     985         
     986    entries = Entry.objects.select_for_update().filter(author=request.user)
     987 
     988All matched entries will be locked until the end of the transaction block, 
     989meaning that other transactions will be prevented from changing or acquiring 
     990locks on them.
     991 
     992Usually, if another transaction has already acquired a lock on one of the 
     993selected rows, the query will block until the lock is released. If this is 
     994not the behaviour you want, call ``select_for_update(nowait=True)``. This will 
     995make the call non-blocking. If a conflicting lock is already acquired by 
     996another transaction, ``django.db.models.LockNotAvailable`` will be raised when 
     997the queryset is evaluated.
     998 
     999Using blocking locks on a database can lead to deadlocks. This occurs when two 
     1000concurrent transactions are both waiting on a lock the other transaction 
     1001already holds. To deal with deadlocks, wrap your views that use 
     1002``select_for_update(nowait=False)`` with the 
     1003``django.views.decorators.deadlock.handle_deadlocks`` decorator. 
     1004 
     1005For example::
     1006 
     1007    from django.db import transaction
     1008    from django.views.decorators.deadlock import handle_deadlocks
     1009 
     1010    @handle_deadlocks(max_retries=2)
     1011    @transaction.commit_on_success
     1012    def my_view(request):
     1013        ...
     1014 
     1015If the database engine detects a deadlock involving ``my_view`` and decides 
     1016to abort its transaction, it will be automatically retried. If deadlocks keep 
     1017occurring after two repeated attempts, 
     1018``django.views.decorators.DeadlockError`` will be raised, which can be 
     1019propagated to the user or handled in a middleware.
     1020 
     1021Currently the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
     1022database backends support ``select_for_update()`` but MySQL has no
     1023support for the ``nowait`` argument. Other backends will simply
     1024generate queries as if ``select_for_update()`` had not been used.
    9781025
    9791026Methods that do not return QuerySets
    9801027------------------------------------
  • docs/ref/databases.txt

     
    362362column types have a maximum length restriction of 255 characters, regardless
    363363of whether ``unique=True`` is specified or not.
    364364
     365Row locking with ``QuerySet.select_for_update()``
     366-------------------------------------------------
     367 
     368MySQL does not support the NOWAIT option to the SELECT ... FOR UPDATE 
     369statement. However, you may call the ``select_for_update()`` method of a 
     370queryset with ``nowait=True``. In that case, the argument will be silently 
     371discarded and the generated query will block until the requested lock can be 
     372acquired.
     373
    365374.. _sqlite-notes:
    366375
    367376SQLite notes
Back to Top