OracleBranch: introspection_without_debug.py

File introspection_without_debug.py, 3.7 KB (added by Marc Mengel <mengel@…>, 16 years ago)

Improved database introspection code for Oracle which does primary keys, foreign keys, etc.

Line 
1
2import re
3
# Matches a backtick-quoted "CONSTRAINT ... FOREIGN KEY (...) REFERENCES ... (...)"
# clause and captures (local column, referenced table, referenced column).
# NOTE(review): backtick-quoted identifiers are not Oracle syntax and nothing
# in the visible code uses this pattern -- presumably carried over from
# another backend; confirm before removing.
foreign_key_re = re.compile(r"\sCONSTRAINT `[^`]*` FOREIGN KEY \(`([^`]*)`\) REFERENCES `([^`]*)` \(`([^`]*)`\)")
5
6from cx_Oracle import NUMBER, ROWID, LONG_STRING, STRING, FIXED_CHAR, Timestamp, LOB, BLOB, CLOB, BINARY
7
8
def get_table_list(cursor):
    """
    Returns a list of table names in the current database.

    The names are read from the USER_TABLES view through the given DB-API
    cursor, in whatever order the database returns them.
    """
    cursor.execute("SELECT TABLE_NAME FROM USER_TABLES")
    return [row[0] for row in cursor.fetchall()]
13
# NOTE(review): nothing in the visible code reads or writes this cache; it is
# kept because other modules may import it.
table_description_cache = {}


def get_table_description(cursor, table_name):
    """
    Returns a description of the table, with the DB-API cursor.description interface.

    A query limited by "rownum < 2" is executed against the table so that the
    cursor's description gets populated; the fetched data itself is not used.
    """
    # An identifier cannot be passed as a bind variable, so the table name is
    # interpolated (double-quoted, which makes the lookup case-sensitive).
    cursor.execute("SELECT * FROM \"%s\" where rownum < 2" % table_name)
    return cursor.description
20
# Per-process memo of {table_name: {field_name: field_index}}.
# NOTE(review): keyed by table name only, so it is shared by every cursor
# passed in -- confirm that a single schema is introspected per process.
_name_to_index_cache = {}


def _name_to_index(cursor, table_name):
    """
    Returns a dictionary of {field_name: field_index} for the given table.
    Indexes are 0-based.

    The mapping is built from get_table_description() on first use and then
    served from _name_to_index_cache.
    """
    # Membership test rather than truthiness: an empty mapping is a valid
    # cached result and must not trigger another query on every call.
    if table_name not in _name_to_index_cache:
        description = get_table_description(cursor, table_name)
        _name_to_index_cache[table_name] = dict(
            (column[0], index) for index, column in enumerate(description)
        )
    return _name_to_index_cache[table_name]
31
32
def columnum(cursor, table_name, column_name):
    """
    Returns the 0-based index of column_name within table_name.

    Raises KeyError if the table's field mapping has no such column name.
    """
    return _name_to_index(cursor, table_name)[column_name]
36
def get_relations(cursor, table_name):
    """
    Returns a dictionary of {field_index: (field_index_other_table, other_table)}
    representing all relationships to the given table. Indexes are 0-based.

    A foreign-key column whose name equals the table's primary key column is
    skipped, and so is any relation whose column indexes cannot be resolved.
    """
    # The table name is passed as a bind variable instead of being pasted
    # into the SQL text, so a name containing an apostrophe cannot break
    # (or alter) the statement.
    binds = {'tname': table_name}

    cursor.execute(
        "select col.column_name, con.constraint_type "
        "from all_cons_columns col, all_constraints con "
        "where col.constraint_name = con.constraint_name "
        "and con.constraint_type = 'P' "
        "and col.table_name = :tname",
        binds)
    rows = cursor.fetchall()
    # NOTE(review): only the first primary-key column returned is considered;
    # the remaining columns of a composite key are not excluded below.
    if rows:
        primary_key = rows[0][0]
    else:
        primary_key = None

    # Referential ('R') constraints on this table joined to the columns of
    # the constraint they reference; pairs that share both position and
    # table name are filtered out by the query itself.
    query = """
        select col.column_name, col.table_name, col.constraint_name,
               c2.table_name, c2.column_name
          from all_cons_columns col, all_constraints con, all_cons_columns c2
         where con.constraint_type = 'R'
           and con.r_constraint_name = c2.constraint_name
           and con.constraint_name = col.constraint_name
           and not (col.position = c2.position and
                    col.table_name = c2.table_name)
           and col.table_name = :tname
    """
    cursor.execute(query, binds)
    rows = cursor.fetchall()

    relations = {}
    for row in rows:
        column_name, other_table, other_column = row[0], row[3], row[4]
        if column_name == primary_key:
            continue
        try:
            relations[columnum(cursor, table_name, column_name)] = (
                columnum(cursor, other_table, other_column),
                other_table,
            )
        except Exception:
            # Deliberately best effort: a column that cannot be resolved
            # (unknown name, or a table that cannot be queried) drops this
            # one relation only. Unlike the previous bare "except:", this
            # no longer swallows KeyboardInterrupt / SystemExit.
            continue
    return relations
77
def get_indexes(cursor, table_name):
    """
    Returns a dictionary of fieldname -> infodict for the given table,
    where each infodict is in the format:
        {'primary_key': boolean representing whether it's the primary key,
         'unique': boolean representing whether it's a unique index}

    Only columns that take part in a primary key ('P') or unique ('U')
    constraint appear in the result.
    """
    # The table name is passed as a bind variable instead of being pasted
    # into the SQL text, so a name containing an apostrophe cannot break
    # (or alter) the statement.
    cursor.execute(
        "select col.column_name, con.constraint_type "
        "from all_cons_columns col, all_constraints con "
        "where col.constraint_name = con.constraint_name "
        "and con.constraint_type in ('P','U') "
        "and col.table_name = :tname",
        {'tname': table_name})

    indexes = {}
    for row in cursor.fetchall():
        column_name, constraint_type = row[0], row[1]
        # A column can show up once per constraint; accumulate the flags in
        # a single infodict per column.
        info = indexes.setdefault(
            column_name, {'primary_key': False, 'unique': False})
        if constraint_type == 'P':
            info['primary_key'] = True
        elif constraint_type == 'U':
            info['unique'] = True
    return indexes
98
# Maps type codes to Django Field types.
# The keys are the type objects imported from cx_Oracle at the top of this
# module; the values are Django field class names.
# NOTE(review): every NUMBER maps to IntegerField and the binary/LOB types
# map to TextField -- confirm this is precise enough for the schemas being
# introspected.
DATA_TYPES_REVERSE = {
    NUMBER: 'IntegerField',
    ROWID: 'IntegerField',
    LONG_STRING: 'TextField',
    STRING: 'TextField',
    FIXED_CHAR: 'CharField',
    Timestamp: 'DateTimeField',
    LOB: 'TextField',
    BLOB: 'TextField',
    CLOB: 'TextField',
    BINARY: 'TextField',
}
Back to Top