Ticket #11863: version_2.diff
File version_2.diff, 16.9 KB (added by , 15 years ago) |
---|
-
django/db/models/manager.py
diff --git a/django/db/models/manager.py b/django/db/models/manager.py index 7487fa0..d303682 100644
a b 1 1 import copy 2 from django.db.models.query import QuerySet, EmptyQuerySet, insert_query 2 from django.db.models.query import QuerySet, EmptyQuerySet, insert_query, RawQuerySet 3 3 from django.db.models import signals 4 4 from django.db.models.fields import FieldDoesNotExist 5 5 … … class Manager(object): 181 181 def _update(self, values, **kwargs): 182 182 return self.get_query_set()._update(values, **kwargs) 183 183 184 def raw(self, query, params=None, *args, **kwargs): 185 return RawQuerySet(model=self.model, query=query, params=params, *args, **kwargs) 186 184 187 class ManagerDescriptor(object): 185 188 # This class ensures managers aren't accessible via model instances. 186 189 # For example, Poll.objects works, but poll_obj.objects raises AttributeError. -
django/db/models/query.py
diff --git a/django/db/models/query.py b/django/db/models/query.py index 36949ac..8bf3fd3 100644
a b class QuerySet(object): 257 257 Returns a dictionary containing the calculations (aggregation) 258 258 over the current queryset 259 259 260 If args is present the expression is passed as a kwarg us sing260 If args is present the expression is passed as a kwarg using 261 261 the Aggregate object's default alias. 262 262 """ 263 263 for arg in args: … … def delete_objects(seen_objs): 1075 1075 if forced_managed: 1076 1076 transaction.leave_transaction_management() 1077 1077 1078 class InsuficientFields(Exception): 1079 """ 1080 The query passed to raw doesn't include all of the fields needed to build 1081 a model instance. 1082 """ 1083 pass 1084 1085 class RawQuerySet(object): 1086 """ 1087 Provides an iterator which converts the results of raw database 1088 queries into annotated model instances. 1089 """ 1090 def __init__(self, query, model=None, query_obj=None, params=None, translations=None): 1091 self.model = model 1092 self.raw_results = query_obj or sql.RawQuery(sql=query, connection=connection, params=params) 1093 self.query = query 1094 self.params = params or () 1095 # Figure out the column names 1096 self.columns = self.raw_results.get_columns() 1097 # Adjust any column names which don't match field names 1098 if translations is not None: 1099 for translation in translations: 1100 try: 1101 index = self.columns.index(translation[0]) 1102 self.columns[index] = translation[1] 1103 except ValueError: 1104 # Ignore transations for non-existant column names 1105 pass 1106 1107 # Build a list of column names known by the model 1108 self.model_fields = {} 1109 for field in model._meta.fields: 1110 name, column = field.get_attname_column() 1111 self.model_fields[column] = name 1112 1113 def __iter__(self): 1114 for row in self.raw_results: 1115 yield self.transform_results(row) 1116 1117 def __len__(self): 1118 return len(self.raw_results) 1119 1120 def __repr__(self): 1121 return self.query % self.params 1122 1123 def transform_results(self, values): 1124 kwargs = {} 1125 annotations = () 1126 1127 # Associate fields to values 1128 for pos, value in enumerate(values): 1129 column = self.columns[pos] 1130 1131 # Separate properties from annotations 1132 if column in self.model_fields.keys(): 1133 kwargs[self.model_fields[column]] = value 1134 else: 1135 annotations += (column, value), 1136 1137 if len(kwargs) < len(self.model_fields): 1138 missing = [column for column, field in self.model_fields.items() if field not in kwargs.keys()] 1139 raise InsuficientFields("They query passed doesn't contain all of the needed fields. The missing fields are: %s" % ', '.join(missing)) 1140 1141 # Construct model instance 1142 instance = self.model(**kwargs) 1143 1144 # Apply annotations 1145 for field, value in annotations: 1146 setattr(instance, field, value) 1147 1148 # make the kwargs and annotation metadata available to the unit tests 1149 return instance 1078 1150 1079 1151 def insert_query(model, values, return_id=False, raw_values=False): 1080 1152 """ -
django/db/models/sql/query.py
diff --git a/django/db/models/sql/query.py b/django/db/models/sql/query.py index 7bc45cb..1b8980e 100644
a b from django.core.exceptions import FieldError 23 23 from datastructures import EmptyResultSet, Empty, MultiJoin 24 24 from constants import * 25 25 26 __all__ = ['Query', 'BaseQuery'] 26 __all__ = ['Query', 'BaseQuery', 'RawQuery', 'InvalidQuery'] 27 28 class InvalidQuery(Exception): 29 """ 30 The query passed to raw isn't a safe query to use with raw. 31 """ 32 pass 33 34 class RawQuery(object): 35 """ 36 A single raw SQL query 37 """ 38 39 def __init__(self, sql, connection, params=None): 40 self.validate_sql(sql) 41 self.params = params or () 42 self.sql = sql 43 self.connection = connection 44 self.cursor = connection.cursor() 45 self.cursor.execute(sql, self.params) 46 self._cache = None 47 self._count = None 48 49 50 def get_columns(self): 51 return [column_meta[0] for column_meta in self.cursor.description] 52 53 def validate_sql(self, sql): 54 if not sql.lower().strip().startswith('select'): 55 raise InvalidQuery('Raw SQL are limited to SELECT queries. Use connection.cursor for any other query types.') 56 57 def __len__(self): 58 if self._count is None: 59 self._count = self.cursor.rowcount 60 # Handle sqlite's quirky rowcount behavior 61 if self._count == -1: 62 self._populate_cache() 63 self._count = len(self._cache) 64 65 return self._count 66 67 def __iter__(self): 68 self._populate_cache() 69 for value in self._cache: 70 yield value 71 72 73 def __str__(self): 74 return self.sql % self.params 75 76 def _populate_cache(self): 77 if self._cache is None: 78 self._cache = self.cursor.fetchall() 27 79 28 80 class BaseQuery(object): 29 81 """ -
new file tests/modeltests/raw_query/fixtures/initial_data.json
diff --git a/tests/modeltests/raw_query/__init__.py b/tests/modeltests/raw_query/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/modeltests/raw_query/fixtures/initial_data.json b/tests/modeltests/raw_query/fixtures/initial_data.json new file mode 100644 index 0000000..4bea9ad
- + 1 [ 2 { 3 "pk": 1, 4 "model": "raw_query.author", 5 "fields": { 6 "dob": "1950-09-20", 7 "first_name": "Joe", 8 "last_name": "Smith" 9 } 10 }, 11 { 12 "pk": 2, 13 "model": "raw_query.author", 14 "fields": { 15 "dob": "1920-04-02", 16 "first_name": "Jill", 17 "last_name": "Doe" 18 } 19 }, 20 { 21 "pk": 3, 22 "model": "raw_query.author", 23 "fields": { 24 "dob": "1986-01-25", 25 "first_name": "Bob", 26 "last_name": "Smith" 27 } 28 }, 29 { 30 "pk": 4, 31 "model": "raw_query.author", 32 "fields": { 33 "dob": "1932-05-10", 34 "first_name": "Bill", 35 "last_name": "Jones" 36 } 37 }, 38 { 39 "pk": 1, 40 "model": "raw_query.book", 41 "fields": { 42 "author": 1, 43 "title": "The awesome book" 44 } 45 }, 46 { 47 "pk": 2, 48 "model": "raw_query.book", 49 "fields": { 50 "author": 1, 51 "title": "The horrible book" 52 } 53 }, 54 { 55 "pk": 3, 56 "model": "raw_query.book", 57 "fields": { 58 "author": 1, 59 "title": "Another awesome book" 60 } 61 }, 62 { 63 "pk": 4, 64 "model": "raw_query.book", 65 "fields": { 66 "author": 3, 67 "title": "Some other book" 68 } 69 }, 70 { 71 "pk": 1, 72 "model": "raw_query.coffee", 73 "fields": { 74 "brand": "dunkin doughnuts" 75 } 76 }, 77 { 78 "pk": 2, 79 "model": "raw_query.coffee", 80 "fields": { 81 "brand": "starbucks" 82 } 83 }, 84 { 85 "pk": 1, 86 "model": "raw_query.reviewer", 87 "fields": { 88 "reviewed": [ 89 2, 90 3, 91 4 92 ] 93 } 94 }, 95 { 96 "pk": 2, 97 "model": "raw_query.reviewer", 98 "fields": { 99 "reviewed": [] 100 } 101 } 102 ] -
new file tests/modeltests/raw_query/models.py
diff --git a/tests/modeltests/raw_query/models.py b/tests/modeltests/raw_query/models.py new file mode 100644 index 0000000..23de97a
- + 1 from django.db import models 2 3 class Author(models.Model): 4 first_name = models.CharField(max_length=255) 5 last_name = models.CharField(max_length=255) 6 dob = models.DateField() 7 8 def __init__(self, *args, **kwargs): 9 super(Author, self).__init__(*args, **kwargs) 10 11 # Perform a check for missing params to protect against real fields 12 # being added as annotations. 13 if len(args) == 0: 14 expected_keywords = ['dob', 'first_name', 'last_name'] 15 for keyword in expected_keywords: 16 if keyword not in kwargs.keys(): 17 raise Exception('All fields are required. %s is missing' % 18 keyword) 19 20 21 22 class Book(models.Model): 23 title = models.CharField(max_length=255) 24 author = models.ForeignKey(Author) 25 26 class Coffee(models.Model): 27 brand = models.CharField(max_length=255, db_column="name") 28 29 class Reviewer(models.Model): 30 reviewed = models.ManyToManyField(Book) 31 No newline at end of file -
new file tests/modeltests/raw_query/tests.py
diff --git a/tests/modeltests/raw_query/tests.py b/tests/modeltests/raw_query/tests.py new file mode 100644 index 0000000..dc8ff5c
- + 1 from django.test import TestCase 2 from datetime import datetime 3 from models import Author, Book, Coffee, Reviewer 4 5 from django.db.models.query import InsuficientFields 6 from django.db.models.sql.query import InvalidQuery 7 8 class RawQueryTests(TestCase): 9 10 def assertSuccessfulRawQuery(self, model, query, expected_results, \ 11 expected_annotations=(), params=[], translations=None): 12 """ 13 Execute the passed query against the passed model and check the output 14 """ 15 results = model.objects.raw(query=query, params=params, \ 16 translations=translations) 17 self.assertProcessed(results, expected_results, expected_annotations) 18 self.assertAnnotations(results, expected_annotations) 19 20 def assertProcessed(self, results, orig, expected_annotations=()): 21 """ 22 Compare the results of a raw query against expected results 23 """ 24 self.assertEqual(len(results), len(orig)) 25 for index, item in enumerate(results): 26 orig_item = orig[index] 27 for annotation in expected_annotations: 28 setattr(orig_item, *annotation) 29 30 self.assertEqual(item.id, orig_item.id) 31 32 def assertNoAnnotations(self, results): 33 """ 34 Check that the results of a raw query contain no annotations 35 """ 36 self.assertAnnotations(results, ()) 37 38 def assertAnnotations(self, results, expected_annotations): 39 """ 40 Check that the passed raw query results contain the expected 41 annotations 42 """ 43 if expected_annotations: 44 for index, result in enumerate(results): 45 annotation, value = expected_annotations[index] 46 self.assertTrue(hasattr(result, annotation)) 47 self.assertEqual(getattr(result, annotation), value) 48 49 def testSimpleRawQuery(self): 50 """ 51 Basic test of raw query with a simple database query 52 """ 53 query = "SELECT * FROM raw_query_author" 54 authors = Author.objects.all() 55 self.assertSuccessfulRawQuery(Author, query, authors) 56 57 def testFkeyRawQuery(self): 58 """ 59 Test of a simple raw query against a model containing a foreign key 60 """ 61 query = "SELECT * FROM raw_query_book" 62 books = Book.objects.all() 63 self.assertSuccessfulRawQuery(Book, query, books) 64 65 def testDBColumnHandler(self): 66 """ 67 Test of a simple raw query against a model containing a field with 68 db_column defined. 69 """ 70 query = "SELECT * FROM raw_query_coffee" 71 coffees = Coffee.objects.all() 72 self.assertSuccessfulRawQuery(Coffee, query, coffees) 73 74 def testOrderHandler(self): 75 """ 76 Test of raw raw query's tolerance for columns being returned in any 77 order 78 """ 79 selects = ( 80 ('dob, last_name, first_name, id'), 81 ('last_name, dob, first_name, id'), 82 ('first_name, last_name, dob, id'), 83 ) 84 85 for select in selects: 86 query = "SELECT %s FROM raw_query_author" % select 87 authors = Author.objects.all() 88 self.assertSuccessfulRawQuery(Author, query, authors) 89 90 def testTranslations(self): 91 """ 92 Test of raw query's optional ability to translate unexpected result 93 column names to specific model fields 94 """ 95 query = "SELECT first_name AS first, last_name AS last, dob, id FROM raw_query_author" 96 translations = ( 97 ('first', 'first_name'), 98 ('last', 'last_name'), 99 ) 100 authors = Author.objects.all() 101 self.assertSuccessfulRawQuery(Author, query, authors, translations=translations) 102 103 def testParams(self): 104 """ 105 Test passing optional query parameters 106 """ 107 query = "SELECT * FROM raw_query_author WHERE first_name = %s" 108 author = Author.objects.all()[2] 109 params = [author.first_name] 110 results = Author.objects.raw(query=query, params=params) 111 self.assertProcessed(results, [author]) 112 self.assertNoAnnotations(results) 113 self.assertEqual(len(results), 1) 114 115 def testManyToMany(self): 116 """ 117 Test of a simple raw query against a model containing a m2m field 118 """ 119 query = "SELECT * FROM raw_query_reviewer" 120 reviewers = Reviewer.objects.all() 121 self.assertSuccessfulRawQuery(Reviewer, query, reviewers) 122 123 def testExtraConversions(self): 124 """ 125 Test to insure that extra translations are ignored. 126 """ 127 query = "SELECT * FROM raw_query_author" 128 translations = (('something', 'else'),) 129 authors = Author.objects.all() 130 self.assertSuccessfulRawQuery(Author, query, authors, translations=translations) 131 132 def testInsufficientColumns(self): 133 query = "SELECT first_name, dob FROM raw_query_author" 134 raised = False 135 try: 136 results = Author.objects.raw(query) 137 results_list = list(results) 138 except InsuficientFields: 139 raised = True 140 141 self.assertTrue(raised) 142 143 def testAnnotations(self): 144 query = "SELECT a.*, count(b.id) as book_count FROM raw_query_author a LEFT JOIN raw_query_book b ON a.id = b.author_id GROUP BY a.id, a.first_name, a.last_name, a.dob ORDER BY a.id" 145 expected_annotations = ( 146 ('book_count', 3), 147 ('book_count', 0), 148 ('book_count', 1), 149 ('book_count', 0), 150 ) 151 authors = Author.objects.all() 152 self.assertSuccessfulRawQuery(Author, query, authors, expected_annotations) 153 154 def testInvalidQuery(self): 155 query = "UPDATE raw_query_author SET first_name='thing' WHERE first_name='Joe'" 156 self.assertRaises(InvalidQuery, Author.objects.raw, query) 157 158 def testWhiteSpaceQuery(self): 159 query = " SELECT * FROM raw_query_author" 160 authors = Author.objects.all() 161 self.assertSuccessfulRawQuery(Author, query, authors) 162 163 def testMultipleIterations(self): 164 query = "SELECT * FROM raw_query_author" 165 normal_authors = Author.objects.all() 166 raw_authors = Author.objects.raw(query) 167 168 # First Iteration 169 first_iterations = 0 170 for index, raw_author in enumerate(raw_authors): 171 self.assertEqual(normal_authors[index], raw_author) 172 first_iterations += 1 173 174 # Second Iteration 175 second_iterations = 0 176 for index, raw_author in enumerate(raw_authors): 177 self.assertEqual(normal_authors[index], raw_author) 178 second_iterations += 1 179 180 self.assertEqual(first_iterations, second_iterations) 181 No newline at end of file