Ticket #2705: for_update-1.0.1-v2.diff

File for_update-1.0.1-v2.diff, 18.7 KB (added by Sebastian Bauer, 16 years ago)

with all

  • django/db/models/sql/query.py

     
    1212from django.utils.tree import Node
    1313from django.utils.datastructures import SortedDict
    1414from django.utils.encoding import force_unicode
    15 from django.db import connection
     15from django.db import connection, DatabaseError
    1616from django.db.models import signals
    1717from django.db.models.fields import FieldDoesNotExist
    1818from django.db.models.query_utils import select_related_descend
     
    2727except NameError:
    2828    from sets import Set as set     # Python 2.3 fallback
    2929
    30 __all__ = ['Query', 'BaseQuery']
     30__all__ = ['Query', 'BaseQuery', 'LockNotAvailable']
    3131
     32class LockNotAvailable(DatabaseError):
     33    '''
     34    Raised when a query fails because a lock was not available.
     35    '''
     36    pass
     37
    3238class BaseQuery(object):
    3339    """
    3440    A single SQL query.
     
    7177        self.order_by = []
    7278        self.low_mark, self.high_mark = 0, None  # Used for offset/limit
    7379        self.distinct = False
     80        self.select_for_update = False
     81        self.select_for_update_nowait = False
    7482        self.select_related = False
    7583        self.related_select_cols = []
    7684
     
    179187        obj.order_by = self.order_by[:]
    180188        obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
    181189        obj.distinct = self.distinct
     190        obj.select_for_update = self.select_for_update
     191        obj.select_for_update_nowait = self.select_for_update_nowait
    182192        obj.select_related = self.select_related
    183193        obj.related_select_cols = []
    184194        obj.max_depth = self.max_depth
     
    225235        obj = self.clone()
    226236        obj.clear_ordering(True)
    227237        obj.clear_limits()
     238        obj.select_for_update = False
    228239        obj.select_related = False
    229240        obj.related_select_cols = []
    230241        obj.related_select_fields = []
     
    310321                        result.append('LIMIT %d' % val)
    311322                result.append('OFFSET %d' % self.low_mark)
    312323
     324        if self.select_for_update and self.connection.features.has_select_for_update:
     325            nowait = self.select_for_update_nowait and self.connection.features.has_select_for_update
     326            result.append("%s" % self.connection.ops.for_update_sql(nowait=nowait))
     327
    313328        params.extend(self.extra_params)
    314329        return ' '.join(result), tuple(params)
    315330
     
    17311746                return
    17321747
    17331748        cursor = self.connection.cursor()
    1734         cursor.execute(sql, params)
     1749        try:
     1750            cursor.execute(sql, params)
     1751        except DatabaseError, e:
     1752            if self.connection.features.has_select_for_update_nowait and self.connection.ops.signals_lock_not_available(e):
     1753                raise LockNotAvailable(*e.args)
     1754            raise
    17351755
    17361756        if not result_type:
    17371757            return cursor
  • django/db/models/manager.py

     
    119119    def order_by(self, *args, **kwargs):
    120120        return self.get_query_set().order_by(*args, **kwargs)
    121121
     122    def select_for_update(self, *args, **kwargs):
     123        return self.get_query_set().select_for_update(*args, **kwargs)
     124       
    122125    def select_related(self, *args, **kwargs):
    123126        return self.get_query_set().select_related(*args, **kwargs)
    124127
  • django/db/models/__init__.py

     
    99from django.db.models.fields.subclassing import SubfieldBase
    1010from django.db.models.fields.files import FileField, ImageField
    1111from django.db.models.fields.related import ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel, ManyToManyRel, OneToOneRel
     12from django.db.models.sql.query import LockNotAvailable
    1213from django.db.models import signals
    1314
    1415# Admin stages.
  • django/db/models/query.py

     
    385385        del_query = self._clone()
    386386
    387387        # Disable non-supported fields.
     388        del_query.query.select_for_update = False
    388389        del_query.query.select_related = False
    389390        del_query.query.clear_ordering()
    390391
     
    524525        else:
    525526            return self._filter_or_exclude(None, **filter_obj)
    526527
     528    def select_for_update(self, **kwargs):
     529        """
     530        Returns a new QuerySet instance that will select objects with a
     531        FOR UPDATE lock.
     532        """
     533        # Default to false for nowait
     534        nowait = kwargs.pop('nowait', False)
     535        obj = self._clone()
     536        obj.query.select_for_update = True
     537        obj.query.select_for_update_nowait = nowait
     538        return obj
     539
    527540    def select_related(self, *fields, **kwargs):
    528541        """
    529542        Returns a new QuerySet instance that will select related objects.
  • django/db/backends/mysql/base.py

     
    2222    raise ImproperlyConfigured("MySQLdb-1.2.1p2 or newer is required; you have %s" % Database.__version__)
    2323
    2424from MySQLdb.converters import conversions
    25 from MySQLdb.constants import FIELD_TYPE, FLAG
     25from MySQLdb.constants import FIELD_TYPE, FLAG, ER
    2626
    2727from django.db.backends import *
    2828from django.db.backends.mysql.client import DatabaseClient
     
    111111    empty_fetchmany_value = ()
    112112    update_can_self_select = False
    113113    related_fields_match_type = True
     114    has_select_for_update = True
     115    has_select_for_update_nowait = False
    114116
    115117class DatabaseOperations(BaseDatabaseOperations):
    116118    def date_extract_sql(self, lookup_type, field_name):
     
    193195        # MySQL doesn't support microseconds
    194196        return unicode(value.replace(microsecond=0))
    195197
     198    signals_deadlock = lambda self, e: e.args[0] == ER.LOCK_DEADLOCK
     199
    196200    def year_lookup_bounds(self, value):
    197201        # Again, no microseconds
    198202        first = '%s-01-01 00:00:00'
  • django/db/backends/__init__.py

     
    7474    # If True, don't use integer foreign keys referring to, e.g., positive
    7575    # integer primary keys.
    7676    related_fields_match_type = False
     77    has_select_for_update = False
     78    has_select_for_update_nowait = False
    7779
    7880class BaseDatabaseOperations(object):
    7981    """
     
    143145        """
    144146        return '%s'
    145147
     148    def for_update_sql(self, nowait=False):
     149        """
     150        Return FOR UPDATE SQL clause to lock row for update
     151        """
     152        if nowait:
     153            nowaitstr = ' NOWAIT'
     154        else:
     155            nowaitstr = ''
     156        return 'FOR UPDATE' + nowaitstr
     157
    146158    def fulltext_search_sql(self, field_name):
    147159        """
    148160        Returns the SQL WHERE clause to use in order to perform a full-text
  • django/db/backends/postgresql_psycopg2/base.py

     
    1515try:
    1616    import psycopg2 as Database
    1717    import psycopg2.extensions
     18    from psycopg2 import errorcodes
    1819except ImportError, e:
    1920    from django.core.exceptions import ImproperlyConfigured
    2021    raise ImproperlyConfigured("Error loading psycopg2 module: %s" % e)
     
    2930class DatabaseFeatures(BaseDatabaseFeatures):
    3031    needs_datetime_string_cast = False
    3132    uses_savepoints = True
     33    has_select_for_update = True
     34    has_select_for_update_nowait = True
    3235
    3336class DatabaseOperations(PostgresqlDatabaseOperations):
    3437    def last_executed_query(self, cursor, sql, params):
     
    3740        # http://www.initd.org/tracker/psycopg/wiki/psycopg2_documentation#postgresql-status-message-and-executed-query
    3841        return cursor.query
    3942
     43    signals_deadlock = lambda self, e: e.pgcode == errorcodes.DEADLOCK_DETECTED
     44
     45    signals_lock_not_available = lambda self, e: e.pgcode == errorcodes.LOCK_NOT_AVAILABLE
     46   
     47
    4048class DatabaseWrapper(BaseDatabaseWrapper):
    4149    operators = {
    4250        'exact': '= %s',
  • django/views/decorators/deadlock.py

     
     1"""
     2Decorators for deadlock handling.
     3"""
     4import sys
     5try:
     6    from functools import wraps
     7except ImportError:
     8    from django.utils.functional import wraps  # Python 2.3, 2.4 fallback.
     9
     10from django.db import transaction, connection, DatabaseError
     11
     12class DeadlockError(Exception):
     13    """
     14    Thrown by a view decorated by handle_deadlock(max_retries) when a deadlock
     15    has been detected and the view won't be called again to retry the aborted
     16    transaction.
     17    """
     18    pass
     19
     20def handle_deadlocks(max_retries=2):
     21    """
     22    Decorator to retry a view when a database deadlock is detected.
     23
     24    When there are no retries left, raises DeadlockError with the traceback of
     25    the original, database backend-specific exception.
     26
     27    Views using querysets constructed with select_for_update() should use this
     28    decorator. If the backend does not support locking and/or deadlock
     29    handling, this doesn't do anything.
     30    """
     31    if connection.features.has_select_for_update:
     32        signals_deadlock = connection.ops.signals_deadlock
     33    else:
     34        return lambda f: f
     35    def decorator(func):
     36        def inner(*args, **kwargs):
     37            retries = 0
     38            while 1:
     39                try:
     40                    return func(*args, **kwargs)
     41                except DatabaseError, e:
     42                    if signals_deadlock(e):
     43                        if retries == max_retries:
     44                            raise DeadlockError, 'Deadlock detected', sys.exc_info()[2]
     45                        retries += 1
     46                        # Rollback needed by PostgreSQL
     47                        transaction.rollback()
     48                        transaction.set_clean()
     49                        continue
     50                    raise
     51        return wraps(func)(inner)
     52    return decorator
     53
  • tests/regressiontests/select_for_update/__init__.py

     
     1
     2
  • tests/regressiontests/select_for_update/tests.py

     
     1import time
     2import threading
     3from unittest import TestCase
     4
     5from django.conf import settings
     6from django.db import transaction, connection
     7from django.db.models import LockNotAvailable
     8from django.views.decorators.deadlock import DeadlockError, handle_deadlocks
     9
     10from regressiontests.select_for_update.models import Tag
     11
     12class SelectForUpdateTests(TestCase):
     13
     14    def setUp(self):
     15        Tag.objects.create(name='1')
     16        Tag.objects.create(name='2')
     17
     18    def test_basics(self):
     19        def test():
     20            t = Tag(name='update')
     21            t.save()
     22            transaction.commit()
     23            tfound = Tag.objects.select_for_update().get(pk=t.id)
     24            tfound.name = 'update2'
     25            tfound.save()
     26            transaction.commit()
     27            tfound = Tag.objects.select_for_update().get(pk=t.id)
     28            tfound.delete()
     29            transaction.commit()
     30        test = transaction.commit_manually(test)
     31        test()
     32
     33    def test_backend_features(self):
     34        if settings.DATABASE_ENGINE == 'postgresql_psycopg2':
     35            self.failUnless(hasattr(connection.ops, 'signals_deadlock'))
     36            self.failUnless(hasattr(connection.ops, 'signals_lock_not_available'))
     37        elif settings.DATABASE_ENGINE == 'mysql':
     38            self.failUnless(hasattr(connection.ops, 'signals_deadlock'))
     39   
     40    def test_deadlock(self):
     41        '''
     42        This test will fail on MySQL if the storage engine is not InnoDB.
     43        '''
     44        # Don't look for deadlocks if the backend doesn't support SELECT FOR UPDATE
     45        if not connection.features.has_select_for_update:
     46            return
     47        def test(max_retries):
     48            vars = {0: None, 1: None}
     49            def view0():
     50                t1 = Tag.objects.select_for_update().get(pk=1)
     51                time.sleep(1)
     52                t2 = Tag.objects.select_for_update().get(pk=2)
     53                transaction.commit()
     54            view0 = handle_deadlocks(max_retries=max_retries)(transaction.commit_manually(view0))
     55            def view1():
     56                t2 = Tag.objects.select_for_update().get(pk=2)
     57                time.sleep(1)
     58                t1 = Tag.objects.select_for_update().get(pk=1)
     59                transaction.commit()
     60            view1 = handle_deadlocks(max_retries=max_retries)(transaction.commit_manually(view1))
     61            def thread0(vars):
     62                try:
     63                    view0()
     64                except Exception, e:
     65                    vars[0] = e
     66            def thread1(vars):
     67                try:
     68                    view1()
     69                except Exception, e:
     70                    vars[1] = e
     71            t0 = threading.Thread(target=thread0, args=(vars,))
     72            t1 = threading.Thread(target=thread1, args=(vars,))
     73            t0.start()
     74            t1.start()
     75            t0.join()
     76            t1.join()
     77            return vars[0], vars[1]
     78        # Make a deadlock and don't retry the aborted transaction
     79        # We are expecting a DeadlockError
     80        e0, e1 = test(0)
     81        self.assertEqual(e0 or e1, e1 or e0)
     82        self.assert_(isinstance(e0 or e1, DeadlockError))
     83        # Make a deadlock and retry the aborted transaction
     84        # We expect no errors
     85        e0, e1 = test(1)
     86        self.assertEqual(e0 or e1, None)
     87
     88    def test_nowait(self):
     89        if not connection.features.has_select_for_update_nowait:
     90            return
     91        def view():
     92            try:
     93                t1 = Tag.objects.select_for_update(nowait=True).get(pk=1)
     94                time.sleep(1)
     95            finally:
     96                transaction.rollback()
     97        view = transaction.commit_manually(view)
     98        t = threading.Thread(target=view)
     99        t.start()
     100        time.sleep(.25)
     101        try:
     102            view()
     103        except LockNotAvailable:
     104            pass
     105        else:
     106            self.fail('Expected view to raise LockNotAvailable')
     107
  • tests/regressiontests/select_for_update/models.py

     
     1from django.db import models
     2
     3class Tag(models.Model):
     4    name = models.CharField(max_length=10)
     5    parent = models.ForeignKey('self', blank=True, null=True,
     6            related_name='children')
     7
     8    class Meta:
     9        ordering = ['name']
     10
     11    def __unicode__(self):
     12        return self.name
     13
  • docs/ref/models/querysets.txt

     
    726726
    727727        Entry.objects.extra(where=['headline=%s'], params=['Lennon'])
    728728
     729``select_for_update(nowait=False)``
     730~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     731
     732Returns a queryset that will lock rows until the end of the transaction,
     733generating a SELECT ... FOR UPDATE statement on supported databases.
     734
     735For example::
     736
     737    entries = Entry.objects.select_for_update().filter(author=request.user)
     738
     739All matched entries will be locked until the end of the transaction block,
     740meaning that other transactions will be prevented from changing or acquiring
     741locks on them.
     742
     743Usually, if another transaction has already acquired a lock on one of the
     744selected rows, the query will block until the lock is released. If this is
     745not the behaviour you want, call ``select_for_update(nowait=True)``. This will
     746make the call non-blocking. If a conflicting lock is already acquired by
     747another transaction, ``django.db.models.LockNotAvailable`` will be raised when
     748the queryset is evaluated.
     749
     750Using blocking locks on a database can lead to deadlocks. This occurs when two
     751concurrent transactions are both waiting on a lock the other transaction
     752already holds. To deal with deadlocks, wrap your views that use
     753``select_for_update(nowait=False)`` with the
     754``django.views.decorators.deadlock.handle_deadlocks`` decorator.
     755
     756For example::
     757
     758    from django.db import transaction
     759    from django.views.decorators.deadlock import handle_deadlocks
     760
     761    @handle_deadlocks(max_retries=2)
     762    @transaction.commit_on_success
     763    def my_view(request):
     764        ...
     765
     766If the database engine detects a deadlock involving ``my_view`` and decides
     767to abort its transaction, it will be automatically retried. If deadlocks keep
     768occurring after two repeated attempts,
     769``django.views.decorators.DeadlockError`` will be raised, which can be
     770propagated to the user or handled in a middleware.
     771
     772Currently the ``postgresql_psycopg2`` and ``mysql`` database backends
     773support ``select_for_update()`` but MySQL has no support for the ``nowait``
     774argument. Other backends will simply generate queries as if
     775``select_for_update()`` had not been used.
     776
    729777QuerySet methods that do not return QuerySets
    730778---------------------------------------------
    731779
  • docs/ref/databases.txt

     
    244244matter unless you're printing out the field values and are expecting to see
    245245``True`` and ``False.``.
    246246
     247Row locking with ``QuerySet.select_for_update()``
     248-------------------------------------------------
     249
     250MySQL does not support the NOWAIT option to the SELECT ... FOR UPDATE
     251statement. However, you may call the ``select_for_update()`` method of a
     252queryset with ``nowait=True``. In that case, the argument will be silently
     253discarded and the generated query will block until the requested lock can be
     254acquired.
     255
    247256.. _sqlite-notes:
    248257
    249258SQLite notes
Back to Top