Ticket #11863: version_1.diff
File version_1.diff, 14.0 KB (added by , 15 years ago) |
---|
-
django/db/models/manager.py
diff --git a/django/db/models/manager.py b/django/db/models/manager.py index 52612d8..1f35a0a 100644
a b 1 1 import copy 2 2 3 from django.db.models.query import QuerySet, EmptyQuerySet, insert_query 3 from django.db.models.query import QuerySet, EmptyQuerySet, insert_query, RawQuerySet 4 4 from django.db.models import signals 5 5 from django.db.models.fields import FieldDoesNotExist 6 6 … … class Manager(object): 178 178 179 179 def _update(self, values, **kwargs): 180 180 return self.get_query_set()._update(values, **kwargs) 181 182 def raw(self, *args, **kwargs): 183 return RawQuerySet(model=self.model, *args, **kwargs) 181 184 182 185 class ManagerDescriptor(object): 183 186 # This class ensures managers aren't accessible via model instances. -
django/db/models/query.py
diff --git a/django/db/models/query.py b/django/db/models/query.py index 46a86fc..78d4c82 100644
a b def delete_objects(seen_objs): 1075 1075 if forced_managed: 1076 1076 transaction.leave_transaction_management() 1077 1077 1078 class InsuficientFieldsException(Exception): 1079 """ 1080 The query passed to raw doesn't include all of the fields needed to build 1081 a model instance. 1082 """ 1083 pass 1084 1085 class RawQuerySet(object): 1086 """ 1087 Provides an iterator which converts the results of raw database 1088 queries into annotated model instances. 1089 """ 1090 def __init__(self, query, model=None, query_obj=None, params=None, translations=None): 1091 self.model = model 1092 self.raw_results = query_obj or sql.RawQuery(sql=query, connection=connection, params=params) 1093 self.query = query 1094 self.params = params or () 1095 self._kwargs = {} 1096 self._annotations = () 1097 # Figure out the column names 1098 self.columns = self.raw_results.get_columns() 1099 # Adjust any column names which don't match field names 1100 if translations is not None: 1101 for translation in translations: 1102 try: 1103 index = self.columns.index(translation[0]) 1104 self.columns[index] = translation[1] 1105 except ValueError: 1106 # Ignore transations for non-existant column names 1107 pass 1108 1109 # Build a list of column names known by the model 1110 self.model_fields = {} 1111 for field in model._meta.fields: 1112 name, column = field.get_attname_column() 1113 self.model_fields[column] = name 1114 1115 def __iter__(self): 1116 for row in self.raw_results: 1117 yield self.transform_results(row) 1118 1119 def __len__(self): 1120 return len(self.raw_results) 1121 1122 def __repr__(self): 1123 return self.query % self.params 1124 1125 def transform_results(self, values): 1126 kwargs = self._kwargs 1127 annotations = self._annotations 1128 1129 # Associate fields to values 1130 for pos, value in enumerate(values): 1131 column = self.columns[pos] 1132 1133 # Separate properties from annotations 1134 if column in self.model_fields.keys(): 1135 kwargs[self.model_fields[column]] = value 1136 else: 1137 annotations += (column, value), 1138 1139 if len(kwargs) < len(self.model_fields): 1140 missing = [column for column, field in self.model_fields.items() if field not in kwargs.keys()] 1141 raise InsuficientFieldsException("They query passed doesn't contain all of the needed fields. The missing fields are: %s" % ', '.join(missing)) 1142 1143 # Construct model instance 1144 instance = self.model(**kwargs) 1145 1146 # Apply annotations 1147 for field, value in annotations: 1148 setattr(instance, field, value) 1149 1150 # make the kwargs and annotation metadata available to the unit tests 1151 self._kwargs = kwargs 1152 self._annotations = annotations 1153 return instance 1078 1154 1079 1155 def insert_query(model, values, return_id=False, raw_values=False): 1080 1156 """ -
django/db/models/sql/query.py
diff --git a/django/db/models/sql/query.py b/django/db/models/sql/query.py index 23f99e4..64f7836 100644
a b try: 29 29 except NameError: 30 30 from sets import Set as set # Python 2.3 fallback 31 31 32 __all__ = ['Query', 'BaseQuery'] 32 __all__ = ['Query', 'BaseQuery', 'RawQuery', 'InvalidQueryException'] 33 34 class InvalidQueryException(Exception): 35 """ 36 The query passed to raw isn't a safe query to use with raw. 37 """ 38 pass 39 40 class RawQuery(object): 41 """ 42 A single raw SQL query 43 """ 44 45 def __init__(self, sql, connection, params=None): 46 self.validate_sql(sql) 47 self.params = params or () 48 self.sql = sql 49 self.connection = connection 50 self.cursor = connection.cursor() 51 self.cursor.execute(sql, params) 52 53 54 def get_columns(self): 55 return [column_meta[0] for column_meta in self.cursor.description] 56 57 def validate_sql(self, sql): 58 if not sql.lower().startswith('select'): 59 raise InvalidQueryException('Raw SQL are limited to SELECT queries. Use connection.cursor for any other query types.') 60 61 def __len__(self): 62 return self.cursor.rowcount 63 64 def __iter__(self): 65 values = self.cursor.fetchone() 66 while values: 67 yield values 68 values = self.cursor.fetchone() 69 70 def __str__(self): 71 return self.sql % self.params 72 33 73 34 74 class BaseQuery(object): 35 75 """ -
new file tests/modeltests/raw_query/models.py
diff --git a/tests/modeltests/raw_query/__init__.py b/tests/modeltests/raw_query/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/modeltests/raw_query/models.py b/tests/modeltests/raw_query/models.py new file mode 100644 index 0000000..b20efa4
- + 1 from django.db import models 2 3 class Author(models.Model): 4 first_name = models.CharField(max_length=255) 5 last_name = models.CharField(max_length=255) 6 dob = models.DateField() 7 8 class Book(models.Model): 9 title = models.CharField(max_length=255) 10 author = models.ForeignKey(Author) 11 12 class Coffee(models.Model): 13 brand = models.CharField(max_length=255, db_column="name") 14 15 class Reviewer(models.Model): 16 reviewed = models.ManyToManyField(Book) 17 No newline at end of file -
new file tests/modeltests/raw_query/tests.py
diff --git a/tests/modeltests/raw_query/tests.py b/tests/modeltests/raw_query/tests.py new file mode 100644 index 0000000..52f9b51
- + 1 from django.test import TestCase 2 from datetime import datetime 3 from models import Author, Book, Coffee, Reviewer 4 5 from django.db.models.query import InsuficientFieldsException 6 from django.db.models.sql.query import InvalidQueryException 7 8 class RawQueryTests(TestCase): 9 10 def setUp(self): 11 self.authors = [] 12 self.books = [] 13 self.coffees = [] 14 self.reviewers = [] 15 16 self.authors.append(Author.objects.create( 17 first_name="Joe", 18 last_name="Smith", 19 dob=datetime(year=1950, month=9, day=20), 20 )) 21 self.authors.append(Author.objects.create( 22 first_name="Jill", 23 last_name="Doe", 24 dob=datetime(year=1920, month=4, day=2), 25 )) 26 self.authors.append(Author.objects.create( 27 first_name="Bob", 28 last_name="Smith", 29 dob=datetime(year=1986, month=1, day=25), 30 )) 31 self.authors.append(Author.objects.create( 32 first_name="Bill", 33 last_name="Jones", 34 dob=datetime(year=1932, month=5, day=10), 35 )) 36 37 self.books.append(Book.objects.create( 38 title = 'The awesome book', 39 author = self.authors[0], 40 )) 41 42 self.books.append(Book.objects.create( 43 title = 'The horrible book', 44 author = self.authors[0], 45 )) 46 47 self.books.append(Book.objects.create( 48 title = 'Another awesome book', 49 author = self.authors[0], 50 )) 51 52 self.books.append(Book.objects.create( 53 title = 'Some other book', 54 author = self.authors[2], 55 )) 56 57 self.coffees.append(Coffee.objects.create(brand="dunkin doughnuts")) 58 self.coffees.append(Coffee.objects.create(brand="starbucks")) 59 60 self.reviewers.append(Reviewer.objects.create()) 61 self.reviewers.append(Reviewer.objects.create()) 62 self.reviewers[0].reviewed.add(self.books[3]) 63 self.reviewers[0].reviewed.add(self.books[1]) 64 self.reviewers[0].reviewed.add(self.books[2]) 65 66 def assertSuccessfulRawQuery(self, model, query, expected_results, \ 67 expected_annotations=(), params=[], translations=None): 68 """ 69 Execute the passed query against the passed model and check the output 70 """ 71 results = model.objects.raw(query=query, params=params, \ 72 translations=translations) 73 self.assertProcessed(results, expected_results, expected_annotations) 74 self.assertAnnotations(results, expected_annotations) 75 76 def assertProcessed(self, results, orig, expected_annotations=()): 77 """ 78 Compare the results of a raw query against expected results 79 """ 80 self.assertEqual(len(results), len(orig)) 81 for index, item in enumerate(results): 82 orig_item = orig[index] 83 for annotation in expected_annotations: 84 setattr(orig_item, *annotation) 85 86 self.assertEqual(item.id, orig_item.id) 87 88 def assertNoAnnotations(self, results): 89 """ 90 Check that the results of a raw query contain no annotations 91 """ 92 self.assertAnnotations(results, ()) 93 94 def assertAnnotations(self, results, expected_annotations): 95 """ 96 Check that the passed raw query results contain the expected 97 annotations 98 """ 99 self.assertEqual(results._annotations, expected_annotations) 100 101 def testSimpleRawQuery(self): 102 """ 103 Basic test of raw query with a simple database query 104 """ 105 query = "SELECT * FROM raw_query_author" 106 self.assertSuccessfulRawQuery(Author, query, self.authors) 107 108 def testFkeyRawQuery(self): 109 """ 110 Test of a simple raw query against a model containing a foreign key 111 """ 112 query = "SELECT * FROM raw_query_book" 113 self.assertSuccessfulRawQuery(Book, query, self.books) 114 115 def testDBColumnHandler(self): 116 """ 117 Test of a simple raw query against a model containing a field with 118 db_column defined. 119 """ 120 query = "SELECT * FROM raw_query_coffee" 121 self.assertSuccessfulRawQuery(Coffee, query, self.coffees) 122 123 def testOrderHandler(self): 124 """ 125 Test of raw raw query's tolerance for columns being returned in any 126 order 127 """ 128 selects = ( 129 ('dob, last_name, first_name, id'), 130 ('last_name, dob, first_name, id'), 131 ('first_name, last_name, dob, id'), 132 ) 133 134 for select in selects: 135 query = "SELECT %s FROM raw_query_author" % select 136 self.assertSuccessfulRawQuery(Author, query, self.authors) 137 138 def testTranslations(self): 139 """ 140 Test of raw query's optional ability to translate unexpected result 141 column names to specific model fields 142 """ 143 query = "SELECT first_name AS first, last_name AS last, dob, id FROM raw_query_author" 144 translations = ( 145 ('first', 'first_name'), 146 ('last', 'last_name'), 147 ) 148 self.assertSuccessfulRawQuery(Author, query, self.authors, translations=translations) 149 150 def testParams(self): 151 """ 152 Test passing optional query parameters 153 """ 154 query = "SELECT * FROM raw_query_author WHERE first_name = %s" 155 params = [self.authors[2].first_name] 156 results = Author.objects.raw(query=query, params=params) 157 self.assertProcessed(results, [self.authors[2]]) 158 self.assertNoAnnotations(results) 159 self.assertEqual(len(results), 1) 160 161 def testManyToMany(self): 162 """ 163 Test of a simple raw query against a model containing a m2m field 164 """ 165 query = "SELECT * FROM raw_query_reviewer" 166 self.assertSuccessfulRawQuery(Reviewer, query, self.reviewers) 167 168 def testExtraConversions(self): 169 """ 170 Test to insure that extra tranlations are ignored. 171 """ 172 query = "SELECT * FROM raw_query_author" 173 translations = (('something', 'else'),) 174 self.assertSuccessfulRawQuery(Author, query, self.authors, translations=translations) 175 176 def testInsufficientColumns(self): 177 query = "SELECT first_name, dob FROM raw_query_author" 178 raised = False 179 try: 180 results = Author.objects.raw(query) 181 results_list = list(results) 182 except InsuficientFieldsException: 183 raised = True 184 185 self.assertTrue(raised) 186 187 def testAnnotations(self): 188 query = "SELECT a.*, count(b.id) as book_count FROM raw_query_author a LEFT JOIN raw_query_book b ON a.id = b.author_id GROUP BY a.id, a.first_name, a.last_name, a.dob ORDER BY a.id" 189 expected_annotations = ( 190 ('book_count', 3), 191 ('book_count', 0), 192 ('book_count', 1), 193 ('book_count', 0), 194 ) 195 self.assertSuccessfulRawQuery(Author, query, self.authors, expected_annotations) 196 197 def testInvalidQuery(self): 198 query = "UPDATE raw_query_author SET first_name='thing' WHERE first_name='Joe'" 199 self.assertRaises(InvalidQueryException, Author.objects.raw, query) 200 No newline at end of file