Ticket #10154: 10154-r14446.diff

File 10154-r14446.diff, 19.9 KB (added by Karen Tracey, 13 years ago)
  • django/db/models/expressions.py

     
    1 from datetime import datetime
     1import datetime
    22
    33from django.utils import tree
    44from django.utils.copycompat import deepcopy
     
    2626        super(ExpressionNode, self).__init__(children, connector, negated)
    2727
    2828    def _combine(self, other, connector, reversed, node=None):
     29        if isinstance(other, datetime.timedelta):
     30            return DateModifierNode([self, other], connector)
     31
    2932        if reversed:
    3033            obj = ExpressionNode([other], connector)
    3134            obj.add(node or self, connector)
     
    111114
    112115    def evaluate(self, evaluator, qn, connection):
    113116        return evaluator.evaluate_leaf(self, qn, connection)
     117
     118class DateModifierNode(ExpressionNode):
     119    """
     120    Node that implements the following syntax:
     121    filter(end_date__gt=F('start_date') + datetime.timedelta(days=3, seconds=200))
     122
     123    which translates into:
     124    POSTGRES:
     125        WHERE end_date > (start_date + INTERVAL '3 days 200 seconds')
     126
     127    MYSQL:
     128        WHERE end_date > (start_date + INTERVAL '3 0:0:200:0' DAY_MICROSECOND)
     129
     130    ORACLE:
     131        WHERE end_date > (start_date + INTERVAL '3 00:03:20.000000' DAY(1) TO SECOND(6))
     132
     133    SQLITE:
     134        WHERE end_date > django_format_dtdelta(start_date, "+" "3", "200", "0")
     135        (A custom function is used in order to preserve six digits of fractional
     136        second information on sqlite, and to format both date and datetime values.)
     137
     138    Note that microsecond comparisons are not well supported with MySQL, since
     139    MySQL does not store microsecond information.
     140
     141    Only adding and subtracting timedeltas is supported, attempts to use other
     142    operations raise a TypeError.
     143    """
     144    def __init__(self, children, connector, negated=False):
     145        if len(children) != 2:
     146            raise TypeError('Must specify a node and a timedelta.')
     147        if not isinstance(children[1], datetime.timedelta):
     148            raise TypeError('Second child must be a timedelta.')
     149        if connector not in (self.ADD, self.SUB):
     150            raise TypeError('Connector must be + or -, not %s' % connector)
     151        super(DateModifierNode, self).__init__(children, connector, negated)
     152
     153    def evaluate(self, evaluator, qn, connection):
     154        timedelta = self.children.pop()
     155        sql, params = evaluator.evaluate_node(self, qn, connection)
     156
     157        if timedelta.days == 0 and timedelta.seconds == 0 and \
     158                timedelta.microseconds == 0:
     159            return sql, params
     160
     161        return connection.ops.date_interval_sql(sql, self.connector, timedelta), params
  • django/db/backends/postgresql/operations.py

     
    2727        else:
    2828            return "EXTRACT('%s' FROM %s)" % (lookup_type, field_name)
    2929
     30    def date_interval_sql(self, sql, connector, timedelta):
     31        """
     32        implements the interval functionality for expressions
     33        format for Postgres:
     34            (datefield + interval '3 days 200 seconds 5 microseconds')
     35        """
     36        modifiers = []
     37        if timedelta.days:
     38            modifiers.append(u'%s days' % timedelta.days)
     39        if timedelta.seconds:
     40            modifiers.append(u'%s seconds' % timedelta.seconds)
     41        if timedelta.microseconds:
     42            modifiers.append(u'%s microseconds' % timedelta.microseconds)
     43        mods = u' '.join(modifiers)
     44        conn = u' %s ' % connector
     45        return u'(%s)' % conn.join([sql, u'interval \'%s\'' % mods])
     46
    3047    def date_trunc_sql(self, lookup_type, field_name):
    3148        # http://www.postgresql.org/docs/8.0/static/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC
    3249        return "DATE_TRUNC('%s', %s)" % (lookup_type, field_name)
  • django/db/backends/sqlite3/base.py

     
    99
    1010import re
    1111import sys
     12import datetime
    1213
    1314from django.db import utils
    1415from django.db.backends import *
     
    9091        # cause a collision with a field name).
    9192        return "django_extract('%s', %s)" % (lookup_type.lower(), field_name)
    9293
     94    def date_interval_sql(self, sql, connector, timedelta):
     95        # It would be more straightforward if we could use the sqlite strftime
     96        # function, but it does not allow for keeping six digits of fractional
     97        # second information, nor does it allow for formatting date and datetime
     98        # values differently. So instead we register our own function that
     99        # formats the datetime combined with the delta in a manner suitable
     100        # for comparisons.
     101        return  u'django_format_dtdelta(%s, "%s", "%d", "%d", "%d")' % (sql,
     102            connector, timedelta.days, timedelta.seconds, timedelta.microseconds)
     103
    93104    def date_trunc_sql(self, lookup_type, field_name):
    94105        # sqlite doesn't support DATE_TRUNC, so we fake it with a user-defined
    95106        # function django_date_trunc that's registered in connect(). Note that
     
    197208            self.connection.create_function("django_extract", 2, _sqlite_extract)
    198209            self.connection.create_function("django_date_trunc", 2, _sqlite_date_trunc)
    199210            self.connection.create_function("regexp", 2, _sqlite_regexp)
     211            self.connection.create_function("django_format_dtdelta", 5, _sqlite_format_dtdelta)
    200212            connection_created.send(sender=self.__class__, connection=self)
    201213        return self.connection.cursor(factory=SQLiteCursorWrapper)
    202214
     
    260272    elif lookup_type == 'day':
    261273        return "%i-%02i-%02i 00:00:00" % (dt.year, dt.month, dt.day)
    262274
     275def _sqlite_format_dtdelta(dt, conn, days, secs, usecs):
     276    try:
     277        dt = util.typecast_timestamp(dt)
     278        delta = datetime.timedelta(int(days), int(secs), int(usecs))
     279        if conn.strip() == '+':
     280            dt = dt + delta
     281        else:
     282            dt = dt - delta
     283    except (ValueError, TypeError):
     284        return None
     285
     286    if isinstance(dt, datetime.datetime):
     287        rv = dt.strftime("%Y-%m-%d %H:%M:%S")
     288        if dt.microsecond:
     289            rv = "%s.%0.6d" % (rv, dt.microsecond)
     290    else:
     291        rv = dt.strftime("%Y-%m-%d")
     292    return rv
     293
    263294def _sqlite_regexp(re_pattern, re_string):
    264295    import re
    265296    try:
  • django/db/backends/mysql/base.py

     
    158158            sql = "CAST(DATE_FORMAT(%s, '%s') AS DATETIME)" % (field_name, format_str)
    159159        return sql
    160160
     161    def date_interval_sql(self, sql, connector, timedelta):
     162        return "(%s %s INTERVAL '%d 0:0:%d:%d' DAY_MICROSECOND)" % (sql, connector,
     163                timedelta.days, timedelta.seconds, timedelta.microseconds)
     164
    161165    def drop_foreignkey_sql(self):
    162166        return "DROP FOREIGN KEY"
    163167
  • django/db/backends/oracle/base.py

     
    9595        else:
    9696            return "EXTRACT(%s FROM %s)" % (lookup_type, field_name)
    9797
     98    def date_interval_sql(self, sql, connector, timedelta):
     99        """
     100        Implements the interval functionality for expressions
     101        format for Oracle:
     102        (datefield + INTERVAL '3 00:03:20.000000' DAY(1) TO SECOND(6))
     103        """
     104        minutes, seconds = divmod(timedelta.seconds, 60)
     105        hours, minutes = divmod(minutes, 60)
     106        days = str(timedelta.days)
     107        day_precision = len(days)
     108        fmt = "(%s %s INTERVAL '%s %02d:%02d:%02d.%06d' DAY(%d) TO SECOND(6))"
     109        return fmt % (sql, connector, days, hours, minutes, seconds,
     110                timedelta.microseconds, day_precision)
     111
    98112    def date_trunc_sql(self, lookup_type, field_name):
    99113        # Oracle uses TRUNC() for both dates and numbers.
    100114        # http://download-east.oracle.com/docs/cd/B10501_01/server.920/a96540/functions155a.htm#SQLRF06151
  • django/db/backends/__init__.py

     
    216216        """
    217217        raise NotImplementedError()
    218218
     219    def date_interval_sql(self, sql, connector, timedelta):
     220        """
     221        Implements the date interval functionality for expressions
     222        """
     223        raise NotImplementedError()
     224
    219225    def date_trunc_sql(self, lookup_type, field_name):
    220226        """
    221227        Given a lookup_type of 'year', 'month' or 'day', returns the SQL that
  • tests/regressiontests/expressions_regress/tests.py

     
     1import datetime
     2
     3from django.test import TestCase
     4from django.db.models import F
     5from django.conf import settings
     6from django.db import DEFAULT_DB_ALIAS
     7
     8from models import Experiment
     9
     10class FTimeDeltaTests(TestCase):
     11
     12    def setUp(self):
     13        db = settings.DATABASES[DEFAULT_DB_ALIAS]
     14        self.db_is_mysql = db['ENGINE'] == 'django.db.backends.mysql'
     15        self.db_is_sqlite = db['ENGINE'] == 'django.db.backends.sqlite3'
     16
     17        sday = datetime.date(2010, 6, 25)
     18        stime = datetime.datetime(2010, 6, 25, 12, 15, 30, 747000)
     19        midnight = datetime.time(0)
     20
     21        delta0 = datetime.timedelta(0)
     22        delta1 = datetime.timedelta(microseconds=253000)
     23        delta2 = datetime.timedelta(seconds=44)
     24        delta3 = datetime.timedelta(hours=21, minutes=8)
     25        delta4 = datetime.timedelta(days=10)
     26
     27        # Test data is set so that deltas and delays will be
     28        # strictly increasing.
     29        self.deltas = []
     30        self.delays = []
     31        self.days_long = []
     32
     33        # e0: started same day as assigned, zero duration
     34        end = stime+delta0
     35        e0 = Experiment.objects.create(name='e0', assigned=sday, start=stime,
     36            end=end, completed=end.date())
     37        self.deltas.append(delta0)
     38        self.delays.append(e0.start-
     39            datetime.datetime.combine(e0.assigned, midnight))
     40        self.days_long.append(e0.completed-e0.assigned)
     41
     42        # e1: started one day after assigned, tiny duration, data
     43        # set so that end time has no fractional seconds, which
     44        # tests an edge case on sqlite. This Experiment is only
     45        # included in the test data when DB is not MySQL (since
     46        # on MySQL microseconds are dropped from datetime fields).
     47        if not self.db_is_mysql:
     48            delay = datetime.timedelta(1)
     49            end = stime + delay + delta1
     50            e1 = Experiment.objects.create(name='e1', assigned=sday,
     51                start=stime+delay, end=end, completed=end.date())
     52            self.deltas.append(delta1)
     53            self.delays.append(e1.start-
     54                datetime.datetime.combine(e1.assigned, midnight))
     55            self.days_long.append(e1.completed-e1.assigned)
     56
     57        # e2: started three days after assigned, small duration
     58        end = stime+delta2
     59        e2 = Experiment.objects.create(name='e2',
     60            assigned=sday-datetime.timedelta(3), start=stime, end=end,
     61            completed=end.date())
     62        self.deltas.append(delta2)
     63        self.delays.append(e2.start-
     64            datetime.datetime.combine(e2.assigned, midnight))
     65        self.days_long.append(e2.completed-e2.assigned)
     66
     67        # e3: started four days after assigned, medium duration
     68        delay = datetime.timedelta(4)
     69        end = stime + delay + delta3
     70        e3 = Experiment.objects.create(name='e3',
     71            assigned=sday, start=stime+delay, end=end, completed=end.date())
     72        self.deltas.append(delta3)
     73        self.delays.append(e3.start-
     74            datetime.datetime.combine(e3.assigned, midnight))
     75        self.days_long.append(e3.completed-e3.assigned)
     76
     77        # e4: started 10 days after assignment, long duration
     78        end = stime + delta4
     79        e4 = Experiment.objects.create(name='e4',
     80            assigned=sday-datetime.timedelta(10), start=stime, end=end,
     81            completed=end.date())
     82        self.deltas.append(delta4)
     83        self.delays.append(e4.start-
     84            datetime.datetime.combine(e4.assigned, midnight))
     85        self.days_long.append(e4.completed-e4.assigned)
     86        self.expnames = [e.name for e in Experiment.objects.all()]
     87
     88    def test_delta_add(self):
     89        for i in range(len(self.deltas)):
     90            delta = self.deltas[i]
     91            test_set = [e.name for e in
     92                Experiment.objects.filter(end__lt=F('start')+delta)]
     93            self.assertEqual(test_set, self.expnames[:i])
     94
     95            test_set = [e.name for e in
     96                Experiment.objects.filter(end__lte=F('start')+delta)]
     97            self.assertEqual(test_set, self.expnames[:i+1])
     98
     99    def test_delta_subtract(self):
     100        for i in range(len(self.deltas)):
     101            delta = self.deltas[i]
     102            test_set = [e.name for e in
     103                Experiment.objects.filter(start__gt=F('end')-delta)]
     104            self.assertEqual(test_set, self.expnames[:i])
     105
     106            test_set = [e.name for e in
     107                Experiment.objects.filter(start__gte=F('end')-delta)]
     108            self.assertEqual(test_set, self.expnames[:i+1])
     109
     110    def test_exclude(self):
     111        for i in range(len(self.deltas)):
     112            delta = self.deltas[i]
     113            test_set = [e.name for e in
     114                Experiment.objects.exclude(end__lt=F('start')+delta)]
     115            self.assertEqual(test_set, self.expnames[i:])
     116
     117            test_set = [e.name for e in
     118                Experiment.objects.exclude(end__lte=F('start')+delta)]
     119            self.assertEqual(test_set, self.expnames[i+1:])
     120
     121    def test_date_comparison(self):
     122        for i in range(len(self.days_long)):
     123            days = self.days_long[i]
     124            test_set = [e.name for e in
     125                Experiment.objects.filter(completed__lt=F('assigned')+days)]
     126            self.assertEqual(test_set, self.expnames[:i])
     127
     128            test_set = [e.name for e in
     129                Experiment.objects.filter(completed__lte=F('assigned')+days)]
     130            self.assertEqual(test_set, self.expnames[:i+1])
     131
     132    def test_mixed_comparisons1(self):
     133        for i in range(len(self.delays)):
     134            delay = self.delays[i]
     135            if self.db_is_mysql:
     136                delay = datetime.timedelta(delay.days, delay.seconds)
     137            test_set = [e.name for e in
     138                Experiment.objects.filter(assigned__gt=F('start')-delay)]
     139            self.assertEqual(test_set, self.expnames[:i])
     140
     141            test_set = [e.name for e in
     142                Experiment.objects.filter(assigned__gte=F('start')-delay)]
     143            self.assertEqual(test_set, self.expnames[:i+1])
     144
     145    def test_mixed_comparisons2(self):
     146        delays = [datetime.timedelta(delay.days) for delay in self.delays]
     147        for i in range(len(delays)):
     148            delay = delays[i]
     149            test_set = [e.name for e in
     150                Experiment.objects.filter(start__lt=F('assigned')+delay)]
     151            self.assertEqual(test_set, self.expnames[:i])
     152
     153            test_set = [e.name for e in
     154                Experiment.objects.filter(start__lte=F('assigned')+delay+
     155                    datetime.timedelta(1))]
     156            self.assertEqual(test_set, self.expnames[:i+1])
     157
     158    def test_delta_update(self):
     159        for i in range(len(self.deltas)):
     160            delta = self.deltas[i]
     161            exps = Experiment.objects.all()
     162            expected_durations = [e.duration() for e in exps]
     163            expected_starts = [e.start+delta for e in exps]
     164            expected_ends = [e.end+delta for e in exps]
     165
     166            Experiment.objects.update(start=F('start')+delta, end=F('end')+delta)
     167            exps = Experiment.objects.all()
     168            new_starts = [e.start for e in exps]
     169            new_ends = [e.end for e in exps]
     170            new_durations = [e.duration() for e in exps]
     171            self.assertEqual(expected_starts, new_starts)
     172            self.assertEqual(expected_ends, new_ends)
     173            self.assertEqual(expected_durations, new_durations)
     174
     175    def test_delta_invalid_op_mult(self):
     176        raised = False
     177        try:
     178            r = repr(Experiment.objects.filter(end__lt=F('start')*self.deltas[0]))
     179        except TypeError:
     180            raised = True
     181        self.assertTrue(raised, "TypeError not raised on attempt to multiply datetime by timedelta.")
     182
     183    def test_delta_invalid_op_div(self):
     184        raised = False
     185        try:
     186            r = repr(Experiment.objects.filter(end__lt=F('start')/self.deltas[0]))
     187        except TypeError:
     188            raised = True
     189        self.assertTrue(raised, "TypeError not raised on attempt to divide datetime by timedelta.")
     190
     191    def test_delta_invalid_op_mod(self):
     192        raised = False
     193        try:
     194            r = repr(Experiment.objects.filter(end__lt=F('start')%self.deltas[0]))
     195        except TypeError:
     196            raised = True
     197        self.assertTrue(raised, "TypeError not raised on attempt to modulo divide datetime by timedelta.")
     198
     199    def test_delta_invalid_op_and(self):
     200        raised = False
     201        try:
     202            r = repr(Experiment.objects.filter(end__lt=F('start')&self.deltas[0]))
     203        except TypeError:
     204            raised = True
     205        self.assertTrue(raised, "TypeError not raised on attempt to binary and a datetime with a timedelta.")
     206
     207    def test_delta_invalid_op_or(self):
     208        raised = False
     209        try:
     210            r = repr(Experiment.objects.filter(end__lt=F('start')|self.deltas[0]))
     211        except TypeError:
     212            raised = True
     213        self.assertTrue(raised, "TypeError not raised on attempt to binary or a datetime with a timedelta.")
     214
    1215"""
    2216Spanning tests for all the operations that F() expressions can perform.
    3217"""
  • tests/regressiontests/expressions_regress/models.py

     
    1010    def __unicode__(self):
    1111        return u'%i, %.3f' % (self.integer, self.float)
    1212
     13class Experiment(models.Model):
     14    name = models.CharField(max_length=24)
     15    assigned = models.DateField()
     16    completed = models.DateField()
     17    start = models.DateTimeField()
     18    end = models.DateTimeField()
     19     
     20    class Meta:
     21        ordering = ('name',)
     22 
     23    def duration(self):
     24        return self.end - self.start
     25
  • docs/topics/db/queries.txt

     
    3636        headline = models.CharField(max_length=255)
    3737        body_text = models.TextField()
    3838        pub_date = models.DateTimeField()
     39        mod_date = models.DateTimeField()
    3940        authors = models.ManyToManyField(Author)
    4041        n_comments = models.IntegerField()
    4142        n_pingbacks = models.IntegerField()
     
    538539
    539540    >>> Entry.objects.filter(authors__name=F('blog__name'))
    540541
     542For date fields, you can add or subtract a ``datetime.timedelta`` object.  The
     543following would return all entries that were modified more than 3 days after
     544they were published:
     545   
     546    >>> from datetime import timedelta
     547    >>> Entry.objects.filter(mod_date__gt=F('pub_date') + timedelta(days=3))
     548
    541549The pk lookup shortcut
    542550----------------------
    543551
Back to Top