diff --git a/AUTHORS b/AUTHORS
--- a/AUTHORS
+++ b/AUTHORS
@@ -164,6 +164,7 @@
     eriks@win.tue.nl
     Tomáš Ehrlich <tomas.ehrlich@gmail.com>
     Dirk Eschler <dirk.eschler@gmx.net>
+    Dan Fairs <dan@fezconsulting.com>
     Marc Fargas <telenieko@telenieko.com>
     Szilveszter Farkas <szilveszter.farkas@gmail.com>
     Grigory Fateyev <greg@dial.com.ru>
diff --git a/django/db/backends/__init__.py b/django/db/backends/__init__.py
--- a/django/db/backends/__init__.py
+++ b/django/db/backends/__init__.py
@@ -103,6 +103,8 @@
     # integer primary keys.
     related_fields_match_type = False
     allow_sliced_subqueries = True
+    has_select_for_update = False
+    has_select_for_update_nowait = False
 
     # Does the default test database allow multiple connections?
     # Usually an indication that the test database is in-memory
@@ -282,6 +284,15 @@
         """
         return []
 
+    def for_update_sql(self, nowait=False):
+        """
+        Returns the FOR UPDATE SQL clause to lock rows for an update operation.
+        """
+        if nowait:
+            return 'FOR UPDATE NOWAIT'
+        else:
+            return 'FOR UPDATE'
+
     def fulltext_search_sql(self, field_name):
         """
         Returns the SQL WHERE clause to use in order to perform a full-text
diff --git a/django/db/backends/mysql/base.py b/django/db/backends/mysql/base.py
--- a/django/db/backends/mysql/base.py
+++ b/django/db/backends/mysql/base.py
@@ -124,6 +124,8 @@
     allows_group_by_pk = True
     related_fields_match_type = True
     allow_sliced_subqueries = False
+    has_select_for_update = True
+    has_select_for_update_nowait = False
     supports_forward_references = False
     supports_long_model_names = False
     supports_microsecond_precision = False
diff --git a/django/db/backends/oracle/base.py b/django/db/backends/oracle/base.py
--- a/django/db/backends/oracle/base.py
+++ b/django/db/backends/oracle/base.py
@@ -70,6 +70,8 @@
     needs_datetime_string_cast = False
     interprets_empty_strings_as_nulls = True
     uses_savepoints = True
+    has_select_for_update = True 
+    has_select_for_update_nowait = True
     can_return_id_from_insert = True
     allow_sliced_subqueries = False
     supports_subqueries_in_group_by = False
diff --git a/django/db/backends/postgresql_psycopg2/base.py b/django/db/backends/postgresql_psycopg2/base.py
--- a/django/db/backends/postgresql_psycopg2/base.py
+++ b/django/db/backends/postgresql_psycopg2/base.py
@@ -70,6 +70,9 @@
     requires_rollback_on_dirty_transaction = True
     has_real_datatype = True
     can_defer_constraint_checks = True
+    has_select_for_update = True
+    has_select_for_update_nowait = True
+    
 
 class DatabaseOperations(PostgresqlDatabaseOperations):
     def last_executed_query(self, cursor, sql, params):
diff --git a/django/db/models/manager.py b/django/db/models/manager.py
--- a/django/db/models/manager.py
+++ b/django/db/models/manager.py
@@ -164,6 +164,9 @@
     def order_by(self, *args, **kwargs):
         return self.get_query_set().order_by(*args, **kwargs)
 
+    def select_for_update(self, *args, **kwargs):
+        return self.get_query_set().select_for_update(*args, **kwargs)
+
     def select_related(self, *args, **kwargs):
         return self.get_query_set().select_related(*args, **kwargs)
 
diff --git a/django/db/models/query.py b/django/db/models/query.py
--- a/django/db/models/query.py
+++ b/django/db/models/query.py
@@ -432,6 +432,7 @@
         del_query._for_write = True
 
         # Disable non-supported fields.
+        del_query.query.select_for_update = False
         del_query.query.select_related = False
         del_query.query.clear_ordering()
 
@@ -580,6 +581,18 @@
         else:
             return self._filter_or_exclude(None, **filter_obj)
 
+    def select_for_update(self, **kwargs): 
+        """ 
+        Returns a new QuerySet instance that will select objects with a 
+        FOR UPDATE lock. 
+        """ 
+        # Default to false for nowait 
+        nowait = kwargs.pop('nowait', False) 
+        obj = self._clone() 
+        obj.query.select_for_update = True 
+        obj.query.select_for_update_nowait = nowait 
+        return obj
+
     def select_related(self, *fields, **kwargs):
         """
         Returns a new QuerySet instance that will select related objects.
diff --git a/django/db/models/sql/compiler.py b/django/db/models/sql/compiler.py
--- a/django/db/models/sql/compiler.py
+++ b/django/db/models/sql/compiler.py
@@ -1,5 +1,6 @@
 from django.core.exceptions import FieldError
 from django.db import connections
+from django.db import transaction
 from django.db.backends.util import truncate_name
 from django.db.models.sql.constants import *
 from django.db.models.sql.datastructures import EmptyResultSet
@@ -117,6 +118,10 @@
                         result.append('LIMIT %d' % val)
                 result.append('OFFSET %d' % self.query.low_mark)
 
+        if self.query.select_for_update and self.connection.features.has_select_for_update:
+            nowait = self.query.select_for_update_nowait and self.connection.features.has_select_for_update
+            result.append(self.connection.ops.for_update_sql(nowait=nowait))
+
         return ' '.join(result), tuple(params)
 
     def as_nested_sql(self):
@@ -677,6 +682,11 @@
         resolve_columns = hasattr(self, 'resolve_columns')
         fields = None
         has_aggregate_select = bool(self.query.aggregate_select)
+        # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
+        # a subsequent commit/rollback is executed, so any database locks
+        # are released.
+        if self.query.select_for_update and transaction.is_managed(self.using):
+            transaction.set_dirty(self.using)
         for rows in self.execute_sql(MULTI):
             for row in rows:
                 if resolve_columns:
diff --git a/django/db/models/sql/query.py b/django/db/models/sql/query.py
--- a/django/db/models/sql/query.py
+++ b/django/db/models/sql/query.py
@@ -131,6 +131,8 @@
         self.order_by = []
         self.low_mark, self.high_mark = 0, None  # Used for offset/limit
         self.distinct = False
+        self.select_for_update = False
+        self.select_for_update_nowait = False
         self.select_related = False
         self.related_select_cols = []
 
@@ -260,6 +262,8 @@
         obj.order_by = self.order_by[:]
         obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
         obj.distinct = self.distinct
+        obj.select_for_update = self.select_for_update
+        obj.select_for_update_nowait = self.select_for_update_nowait
         obj.select_related = self.select_related
         obj.related_select_cols = []
         obj.aggregates = deepcopy(self.aggregates, memo=memo)
@@ -366,6 +370,7 @@
 
         query.clear_ordering(True)
         query.clear_limits()
+        query.select_for_update = False
         query.select_related = False
         query.related_select_cols = []
         query.related_select_fields = []
diff --git a/docs/ref/databases.txt b/docs/ref/databases.txt
--- a/docs/ref/databases.txt
+++ b/docs/ref/databases.txt
@@ -364,6 +364,15 @@
 column types have a maximum length restriction of 255 characters, regardless
 of whether ``unique=True`` is specified or not.
 
+Row locking with ``QuerySet.select_for_update()``
+-------------------------------------------------
+
+MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
+statement. However, you may call the ``select_for_update()`` method of a
+queryset with ``nowait=True``. In that case, the argument will be silently
+discarded and the generated query will block until the requested lock can be
+acquired.
+
 .. _sqlite-notes:
 
 SQLite notes
diff --git a/docs/ref/models/querysets.txt b/docs/ref/models/querysets.txt
--- a/docs/ref/models/querysets.txt
+++ b/docs/ref/models/querysets.txt
@@ -975,6 +975,40 @@
     # queries the database with the 'backup' alias
     >>> Entry.objects.using('backup')
 
+select_for_update
+~~~~~~~~~~~~~~~~~
+
+.. method:: select_for_update(nowait=False)
+
+.. versionadded:: 1.3
+
+Returns a queryset that will lock rows until the end of the transaction,
+generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
+
+For example::
+
+    entries = Entry.objects.select_for_update().filter(author=request.user)
+
+All matched entries will be locked until the end of the transaction block,
+meaning that other transactions will be prevented from changing or acquiring
+locks on them.
+
+Usually, if another transaction has already acquired a lock on one of the
+selected rows, the query will block until the lock is released. If this is
+not the behaviour you want, call ``select_for_update(nowait=True)``. This will
+make the call non-blocking. If a conflicting lock is already acquired by
+another transaction, ``django.db.utils.DatabaseError`` will be raised when
+the queryset is evaluated.
+
+Note that using ``select_related`` will cause the current transaction to be set
+dirty, if under transaction management. This is to ensure that Django issues a
+``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR
+UPDATE``.
+
+Currently the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
+database backends support ``select_for_update()`` but MySQL has no
+support for the ``nowait`` argument. Other backends will simply
+generate queries as if ``select_for_update()`` had not been used.
 
 Methods that do not return QuerySets
 ------------------------------------
@@ -1253,7 +1287,7 @@
 the only restriction on the :class:`QuerySet` that is updated is that it can
 only update columns in the model's main table. Filtering based on related
 fields is still possible. You cannot call ``update()`` on a
-:class:`QuerySet` that has had a slice taken or can otherwise no longer be 
+:class:`QuerySet` that has had a slice taken or can otherwise no longer be
 filtered.
 
 For example, if you wanted to update all the entries in a particular blog
diff --git a/tests/modeltests/select_for_update/__init__.py b/tests/modeltests/select_for_update/__init__.py
new file mode 100644
--- /dev/null
+++ b/tests/modeltests/select_for_update/__init__.py
@@ -0,0 +1,1 @@
+#
diff --git a/tests/modeltests/select_for_update/models.py b/tests/modeltests/select_for_update/models.py
new file mode 100644
--- /dev/null
+++ b/tests/modeltests/select_for_update/models.py
@@ -0,0 +1,4 @@
+from django.db import models
+
+class Person(models.Model):
+    name = models.CharField(max_length=30)
diff --git a/tests/modeltests/select_for_update/tests.py b/tests/modeltests/select_for_update/tests.py
new file mode 100644
--- /dev/null
+++ b/tests/modeltests/select_for_update/tests.py
@@ -0,0 +1,218 @@
+import time
+from django.conf import settings
+from django.db import transaction, connection
+from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
+from django.test import TransactionTestCase, skipUnlessDBFeature
+from django.utils.functional import wraps
+from django.utils import unittest
+
+from models import Person
+
+try:
+    import threading
+    def requires_threading(func):
+        return func
+except ImportError:
+    # Note we can't use dummy_threading here, as our tests will actually
+    # block. We just have to skip the test completely.
+    def requires_threading(func):
+        @wraps(func)
+        def wrapped(*args, **kw):
+            raise unittest.SkipTest('threading required')
+
+class SelectForUpdateTests(TransactionTestCase):
+
+    def setUp(self):
+        connection._rollback()
+        connection._enter_transaction_management(True)
+        self.new_connections = ConnectionHandler(settings.DATABASES)
+        self.person = Person.objects.create(name='Reinhardt')
+
+        # We need to set settings.DEBUG to True so we can capture
+        # the output SQL to examine.
+        self._old_debug = settings.DEBUG
+        settings.DEBUG = True
+
+    def tearDown(self):
+        connection._leave_transaction_management(True)
+        settings.DEBUG = self._old_debug
+        try:
+            self.end_blocking_transaction()
+        except (DatabaseError, AttributeError):
+            pass
+
+    def start_blocking_transaction(self):
+        # Start a blocking transaction. At some point,
+        # end_blocking_transaction() should be called.
+        self.new_connection = self.new_connections[DEFAULT_DB_ALIAS]
+        self.new_connection._enter_transaction_management(True)
+        self.cursor = self.new_connection.cursor()
+        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
+            'db_table': Person._meta.db_table,
+            'for_update': self.new_connection.ops.for_update_sql(),
+            }
+        self.cursor.execute(sql, ())
+        result = self.cursor.fetchone()
+
+    def end_blocking_transaction(self):
+        # Roll back the blocking transaction.
+        self.new_connection._rollback()
+        self.new_connection.close()
+        self.new_connection._leave_transaction_management(True)
+
+    def has_for_update_sql(self, tested_connection, nowait=False):
+        # Examine the SQL that was executed to determine whether it
+        # contains the 'SELECT..FOR UPDATE' stanza.
+        for_update_sql = tested_connection.ops.for_update_sql(nowait)
+        sql = tested_connection.queries[-1]['sql']
+        return bool(sql.find(for_update_sql) > -1)
+
+    def check_exc(self, exc):
+        self.failUnless(isinstance(exc, DatabaseError))
+
+    @skipUnlessDBFeature('has_select_for_update')
+    def test_for_update_sql_generated(self):
+        """
+        Test that the backend's FOR UPDATE variant appears in
+        generated SQL when select_for_update is invoked.
+        """
+        list(Person.objects.all().select_for_update())
+        self.assertTrue(self.has_for_update_sql(connection))
+
+    @skipUnlessDBFeature('has_select_for_update_nowait')
+    def test_for_update_sql_generated_nowait(self):
+        """
+        Test that the backend's FOR UPDATE NOWAIT variant appears in
+        generated SQL when select_for_update is invoked.
+        """
+        list(Person.objects.all().select_for_update(nowait=True))
+        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
+
+    @requires_threading
+    @skipUnlessDBFeature('has_select_for_update_nowait')
+    def test_nowait_raises_error_on_block(self):
+        """
+        If nowait is specified, we expect an error to be raised rather
+        than blocking.
+        """
+        self.start_blocking_transaction()
+        status = []
+        thread = threading.Thread(
+            target=self.run_select_for_update,
+            args=(status,),
+            kwargs={'nowait': True},
+        )
+
+        thread.start()
+        time.sleep(1)
+        thread.join()
+        self.end_blocking_transaction()
+        self.check_exc(status[-1])
+
+    def run_select_for_update(self, status, nowait=False):
+        status.append('started')
+        try:
+            connection._rollback()
+            people = list(Person.objects.all().select_for_update(nowait=nowait))
+            people[0].name = 'Fred'
+            people[0].save()
+            connection._commit()
+        except DatabaseError, e:
+            status.append(e)
+        except Exception, e:
+            raise
+
+    @requires_threading
+    @skipUnlessDBFeature('has_select_for_update')
+    @skipUnlessDBFeature('supports_transactions')
+    def test_block(self):
+        """
+        Check that a thread running a select_for_update that
+        accesses rows being touched by a similar operation
+        on another connection blocks correctly.
+        """
+        # First, let's start the transaction in our thread.
+        self.start_blocking_transaction()
+
+        # Now, try it again using the ORM's select_for_update
+        # facility. Do this in a separate thread.
+        status = []
+        thread = threading.Thread(target=self.run_select_for_update, args=(status,))
+
+        # The thread should immediately block, but we'll sleep
+        # for a bit to make sure
+        thread.start()
+        sanity_count = 0
+        while len(status) != 1 and sanity_count < 10:
+            sanity_count += 1
+            time.sleep(1)
+        if sanity_count >= 10:
+            raise ValueError, 'Thread did not run and block'
+
+        # Check the person hasn't been updated. Since this isn't
+        # using FOR UPDATE, it won't block.
+        p = Person.objects.get(pk=self.person.pk)
+        self.assertEqual('Reinhardt', p.name)
+
+        # When we end our blocking transaction, our thread should
+        # be able to continue.
+        self.end_blocking_transaction()
+        thread.join(5.0)
+
+        # Check the thread has finished. Assuming it has, we should
+        # find that it has updated the person's name.
+        self.failIf(thread.isAlive())
+        p = Person.objects.get(pk=self.person.pk)
+        self.assertEqual('Fred', p.name)
+
+    @requires_threading
+    @skipUnlessDBFeature('has_select_for_update')
+    def test_raw_lock_not_available(self):
+        """
+        Check that running a raw query which can't obtain a FOR UPDATE lock
+        raises the correct exception
+        """
+        self.start_blocking_transaction()
+        def raw(status):
+            try:
+                list(
+                    Person.objects.raw(
+                        'SELECT * FROM %s %s' % (
+                            Person._meta.db_table,
+                            connection.ops.for_update_sql(nowait=True)
+                        )
+                    )
+                )
+            except DatabaseError, e:
+                status.append(e)
+        status = []
+        thread = threading.Thread(target=raw, kwargs={'status': status})
+        thread.start()
+        time.sleep(1)
+        thread.join()
+        self.end_blocking_transaction()
+        self.check_exc(status[-1])
+
+    @skipUnlessDBFeature('has_select_for_update')
+    def test_transaction_dirty_managed(self):
+        """ Check that a select_for_update sets the transaction to be
+        dirty when executed under txn management. Setting the txn dirty
+        means that it will be either committed or rolled back by Django,
+        which will release any locks held by the SELECT FOR UPDATE.
+        """
+        transaction.enter_transaction_management(True)
+        transaction.managed(True)
+        try:
+            people = list(Person.objects.select_for_update())
+            self.assertTrue(transaction.is_dirty())
+        finally:
+            transaction.rollback()
+            transaction.leave_transaction_management()
+
+    @skipUnlessDBFeature('has_select_for_update')
+    def test_transaction_not_dirty_unmanaged(self):
+        """ If we're not under txn management, the txn will never be
+        marked as dirty.
+        """
+        people = list(Person.objects.select_for_update())
+        self.assertFalse(transaction.is_dirty())
