Django

Code

root/django/branches/schema-evolution/django/db/backends/postgresql/introspection.py

Revision 5785, 6.9 kB (checked in by danderson, 1 year ago)

schema-evolution:
added new "pk_requires_unique" option to the backend, because sqlite3 requires "UNIQUE" when creating PKs in order to
_actually_ create the constraint.
fixed "get_known_column_flags" introspection for sqlite3
implemented "get_drop_column_sql" for sqlite3 to work around sqlite's lack of DROP COLUMN support
added a partial set of the sqlite3 unit tests

Line 
1 from django.db.backends.postgresql.base import quote_name
2
def get_table_list(cursor):
    """
    Returns a list of table names in the current database.

    Relations living in the pg_catalog and pg_toast namespaces are left out,
    as are relations that are not visible on the current search path.
    """
    query = """
        SELECT c.relname
        FROM pg_catalog.pg_class c
        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        WHERE c.relkind IN ('r', 'v', '')
            AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
            AND pg_catalog.pg_table_is_visible(c.oid)"""
    cursor.execute(query)
    table_names = []
    for record in cursor.fetchall():
        # The relation name is the only selected column.
        table_names.append(record[0])
    return table_names
13
def get_table_description(cursor, table_name):
    """
    Returns the DB-API cursor.description sequence for the given table.
    """
    # Running a one-row SELECT makes the driver populate cursor.description;
    # the fetched data itself is never read.
    quoted_table = quote_name(table_name)
    cursor.execute("SELECT * FROM %s LIMIT 1" % quoted_table)
    return cursor.description
18
def get_relations(cursor, table_name):
    """
    Returns a dictionary of {field_index: (field_index_other_table, other_table)}
    describing the foreign keys declared on the given table. All field
    indexes are 0-based.
    """
    cursor.execute("""
        SELECT con.conkey, con.confkey, c2.relname
        FROM pg_constraint con, pg_class c1, pg_class c2
        WHERE c1.oid = con.conrelid
            AND c2.oid = con.confrelid
            AND c1.relname = %s
            AND con.contype = 'f'""", [table_name])
    relations = {}
    for row in cursor.fetchall():
        local_key, remote_key, remote_table = row[0], row[1], row[2]
        try:
            # The keys look like "{2}": drop the braces and convert the
            # 1-based column number into a 0-based index.
            local_index = int(local_key[1:-1]) - 1
            remote_index = int(remote_key[1:-1]) - 1
        except ValueError:
            # Keys such as "{1,2}" (several columns) do not parse as a single
            # integer; those constraints are skipped.
            continue
        relations[local_index] = (remote_index, remote_table)
    return relations
39
def get_indexes(cursor, table_name):
    """
    Returns a dictionary mapping field names of the given table to an
    infodict of the form:
        {'primary_key': boolean representing whether it's the primary key,
         'unique': boolean representing whether it's a unique index}
    Only single-field indexes are reported.
    """
    # One row per index on the table, joined to the name of the first
    # field the index covers.
    cursor.execute("""
        SELECT attr.attname, idx.indkey, idx.indisunique, idx.indisprimary
        FROM pg_catalog.pg_class c, pg_catalog.pg_class c2,
            pg_catalog.pg_index idx, pg_catalog.pg_attribute attr
        WHERE c.oid = idx.indrelid
            AND idx.indexrelid = c2.oid
            AND attr.attrelid = c.oid
            AND attr.attnum = idx.indkey[0]
            AND c.relname = %s""", [table_name])
    indexes = {}
    for row in cursor.fetchall():
        field_name, index_key, is_unique, is_primary = row[0], row[1], row[2], row[3]
        # idx.indkey is an array in the DB and arrives here as a string of
        # space-separated, 1-based field numbers. A space therefore means
        # the index spans several fields, and such indexes are ignored.
        if ' ' not in index_key:
            indexes[field_name] = {'primary_key': is_primary, 'unique': is_unique}
    return indexes
68
def get_columns(cursor, table_name):
    """
    Returns a list of the column names of the given table, ordered by
    column number (attnum). Dropped columns are not included.

    This is best-effort: if the query raises, an empty list is returned.
    """
    try:
        # The table name is passed as a query parameter (like get_relations
        # and get_indexes do) rather than being interpolated into the SQL,
        # so names containing quotes or regex metacharacters are matched
        # literally and cannot alter the statement.
        cursor.execute("""
            SELECT a.attname
            FROM pg_catalog.pg_attribute a
            WHERE a.attrelid = (SELECT c.oid FROM pg_catalog.pg_class c
                                WHERE c.relname = %s)
                AND a.attnum > 0
                AND NOT a.attisdropped
            ORDER BY a.attnum""", [table_name])
        return [row[0] for row in cursor.fetchall()]
    except Exception:
        # Deliberately swallow query errors, but let KeyboardInterrupt and
        # SystemExit through.
        return []
75    
def get_known_column_flags(cursor, table_name, column_name):
    """
    Returns a dictionary of flags describing one column of the given table:

        {'primary_key': True if the column is part of a primary key constraint,
         'foreign_key': True if the column is part of a foreign key constraint,
         'unique': True if a unique constraint covers this column alone,
         'default': the column default as a string ('' if none was found or
                    if it is a nextval(...) sequence default),
         'allow_null': True if the column is not declared NOT NULL,
         'maxlength': the declared length, as a string (only present for
                      "character varying" columns)}

    The table name is always passed to the database as a query parameter.
    """
    flags = {
        'primary_key': False,
        'foreign_key': False,
        'unique': False,
        'default': '',
        'allow_null': False,
    }

    # Column type and NOT NULL flag.
    cursor.execute("""
        SELECT a.attname, pg_catalog.format_type(a.atttypid, a.atttypmod), a.attnotnull
        FROM pg_catalog.pg_attribute a
        WHERE a.attrelid = (SELECT c.oid FROM pg_catalog.pg_class c
                            WHERE c.relname = %s)
            AND a.attnum > 0
            AND NOT a.attisdropped
        ORDER BY a.attnum""", [table_name])
    for attname, type_name, not_null in cursor.fetchall():
        if attname != column_name:
            continue
        if type_name.startswith('character varying'):
            # type_name looks like "character varying(100)": keep what sits
            # between the parentheses.
            flags['maxlength'] = type_name[18:-1]
        flags['allow_null'] = not not_null

    # Primary key, foreign key and unique constraints. There is one row per
    # (constraint, column) pair, so a multi-column constraint shows up once
    # for each column it covers.
    own_unique = set()     # unique constraints that include this column
    shared_unique = set()  # unique constraints that include another column
    cursor.execute("""
        SELECT con.conname, con.contype, attr.attname
        FROM pg_constraint con, pg_attribute attr, pg_class c
        WHERE con.conrelid = c.oid
            AND con.conrelid = attr.attrelid
            AND attr.attnum = ANY(con.conkey)
            AND c.relname = %s""", [table_name])
    for conname, contype, attname in cursor.fetchall():
        if attname == column_name:
            if contype == 'p':
                flags['primary_key'] = True
            elif contype == 'f':
                flags['foreign_key'] = True
            elif contype == 'u':
                own_unique.add(conname)
        elif contype == 'u':
            shared_unique.add(conname)
    # The column is unique on its own only if at least one of its unique
    # constraints covers no other column.
    if own_unique - shared_unique:
        flags['unique'] = True

    # Default value.
    cursor.execute("""
        SELECT attr.attname, d.adsrc
        FROM pg_attrdef d, pg_attribute attr
        WHERE d.adrelid = attr.attrelid
            AND attr.attnum = d.adnum
            AND d.adrelid = (SELECT c.oid FROM pg_catalog.pg_class c
                             WHERE c.relname = %s)""", [table_name])
    for attname, default_src in cursor.fetchall():
        if attname != column_name:
            continue
        if default_src.startswith('nextval'):
            # Sequence-backed defaults are not reported.
            continue
        # Quoted defaults look like "'abc'::character varying": keep the text
        # up to the next quote. Expressions without any such quote (e.g. "0"
        # or "true") are returned as they are instead of raising ValueError.
        closing_quote = default_src.find("'", 1)
        if closing_quote == -1:
            flags['default'] = default_src
        else:
            flags['default'] = default_src[1:closing_quote]

    return flags
121
# Maps the type codes reported in cursor.description to Django Field types.
# The trailing comments give the PostgreSQL built-in type each code is
# assumed to stand for (per the pg_type catalog) -- verify before relying
# on them.
DATA_TYPES_REVERSE = {
    16: 'BooleanField',         # bool
    21: 'SmallIntegerField',    # int2
    23: 'IntegerField',         # int4
    25: 'TextField',            # text
    701: 'FloatField',          # float8
    869: 'IPAddressField',      # inet
    1043: 'CharField',          # varchar
    1082: 'DateField',          # date
    1083: 'TimeField',          # time
    1114: 'DateTimeField',      # timestamp
    1184: 'DateTimeField',      # timestamptz
    1266: 'TimeField',          # timetz
    1700: 'DecimalField',       # numeric
}
Note: See TracBrowser for help on using the browser.