Changes between Version 1 and Version 2 of AuditTrail
- Timestamp:
- Aug 27, 2007, 8:42:32 AM (17 years ago)
Legend:
- Unmodified
- Added
- Removed
- Modified
-
AuditTrail
v1 v2 25 25 }}} 26 26 27 This simple addition will do the rest, allowing you to run `syncdb` and install the audit model. Once it's installed, the following code will work as shown below. As you will see, `Person.history` becomes a manager that's used to access the audit trail for a particular object. 27 This simple addition will do the rest, allowing you to run `syncdb` and install the audit model. Once it's installed, the following code will work as shown below. As you will see, `Person.history` becomes a manager that's used to access the audit trail for a particular object. The type of manager available depends on how you access the audit trail. From an instance, the audit trail will automatically be filtered to only return results related to that instance. From the class itself, the results will not be filtered in any way; this is the likely approach for reporting across several audited items. 28 28 29 29 {{{ … … 43 43 John Public as of 2007-08-14 20:31:21.852000: 65000 44 44 John Public as of 2007-08-14 20:30:58.959000: 50000 45 >>> person2 = Person.objects.create(first_name='Tom', last_name='Smith', salary=25000) 46 >>> person2 47 <Person: Tom Smith> 48 >>> person.history.count() 49 2 50 >>> person.history.count() 51 2L 52 >>> person2.history.count() 53 1L 54 >>> Person.history.count() 55 3L 45 56 }}} 46 57 … … 63 74 {{{ 64 75 #!python 65 import re66 67 76 from django.dispatch import dispatcher 68 77 from django.db import models 69 78 from django.core.exceptions import ImproperlyConfigured 79 import re 80 import types 81 try: 82 import settings_audit 83 except ImportError: 84 settings_audit = None 70 85 71 86 value_error_re = re.compile("^.+'(.+)'$") 72 87 73 88 class AuditTrail(object): 74 def __init__(self, show_in_admin=False): 75 self.show_in_admin = show_in_admin 89 def __init__(self, show_in_admin=False, save_change_type=True, audit_deletes=True, 90 track_fields=None): 91 self.opts = {} 92 self.opts['show_in_admin'] = show_in_admin 93 
self.opts['save_change_type'] = save_change_type 94 self.opts['audit_deletes'] = audit_deletes 95 if track_fields: 96 self.opts['track_fields'] = track_fields 97 else: 98 self.opts['track_fields'] = [] 76 99 77 100 def contribute_to_class(self, cls, name): 78 101 # This should only get added once the class is otherwise complete 79 80 102 def _contribute(sender): 81 model = create_audit_model(sender, self.show_in_admin)103 model = create_audit_model(sender, **self.opts) 82 104 descriptor = AuditTrailDescriptor(model._default_manager, sender._meta.pk.attname) 83 105 setattr(sender, name, descriptor) 84 106 107 def _audit_track(instance, field_arr): 108 field_name = field_arr[0] 109 try: 110 return getattr(instance, field_name) 111 except: 112 if len(field_arr) > 2: 113 if callable(field_arr[2]): 114 fn = field_arr[2] 115 return fn(instance) 116 else: 117 return field_arr[2] 118 85 119 def _audit(sender, instance): 86 # Write model changes to the audit model 120 # Write model changes to the audit model. 121 # instance is the current (non-audit) model. 
87 122 kwargs = {} 88 123 for field in sender._meta.fields: 89 kwargs[field.attname] = getattr(instance, field.attname) 124 #kwargs[field.attname] = getattr(instance, field.attname) 125 kwargs[field.name] = getattr(instance, field.name) 126 if self.opts['save_change_type']: 127 kwargs['_audit_change_type'] = 'U' 128 for field_arr in model._audit_track: 129 kwargs[field_arr[0]] = _audit_track(instance, field_arr) 90 130 model._default_manager.create(**kwargs) 91 131 dispatcher.connect(_audit, signal=models.signals.post_save, sender=cls, weak=False) 132 133 if self.opts['audit_deletes']: 134 def _audit_delete(sender, instance): 135 # Write model changes to the audit model 136 kwargs = {} 137 for field in sender._meta.fields: 138 kwargs[field.name] = getattr(instance, field.name) 139 if self.opts['save_change_type']: 140 kwargs['_audit_change_type'] = 'D' 141 for field_arr in model._audit_track: 142 kwargs[field_arr[0]] = _audit_track(instance, field_arr) 143 model._default_manager.create(**kwargs) 144 dispatcher.connect(_audit_delete, signal=models.signals.pre_delete, sender=cls, weak=False) 92 145 93 146 dispatcher.connect(_contribute, signal=models.signals.class_prepared, sender=cls, weak=False) … … 100 153 def __get__(self, instance=None, owner=None): 101 154 if instance == None: 102 raise AttributeError, "Audit trail is only accessible via %s instances." % type.__name__ 103 return create_audit_manager(self.manager, self.pk_attribute, instance._get_pk_val()) 155 #raise AttributeError, "Audit trail is only accessible via %s instances." % type.__name__ 156 return create_audit_manager_class(self.manager) 157 else: 158 return create_audit_manager_with_pk(self.manager, self.pk_attribute, instance._get_pk_val()) 104 159 105 160 def __set__(self, instance, value): 106 161 raise AttributeError, "Audit trail may not be edited in this manner." 
107 162 108 def create_audit_manager(manager, pk_attribute, pk): 163 def create_audit_manager_with_pk(manager, pk_attribute, pk): 164 """Create an audit trail manager based on the current object""" 165 class AuditTrailWithPkManager(manager.__class__): 166 def __init__(self): 167 self.model = manager.model 168 169 def get_query_set(self): 170 return super(AuditTrailWithPkManager, self).get_query_set().filter(**{pk_attribute: pk}) 171 return AuditTrailWithPkManager() 172 173 def create_audit_manager_class(manager): 109 174 """Create an audit trail manager based on the current object""" 110 175 class AuditTrailManager(manager.__class__): 111 176 def __init__(self): 112 177 self.model = manager.model 113 114 def get_query_set(self):115 return super(AuditTrailManager, self).get_query_set().filter(**{pk_attribute: pk})116 178 return AuditTrailManager() 117 179 118 def create_audit_model(cls, show_in_admin):180 def create_audit_model(cls, **kwargs): 119 181 """Create an audit model for the specific class""" 120 182 name = cls.__name__ + 'Audit' … … 133 195 '_audit__str__': cls.__str__.im_func, 134 196 '__str__': lambda self: '%s as of %s' % (self._audit__str__(), self._audit_timestamp), 197 '_audit_track': _track_fields(track_fields=kwargs['track_fields'], unprocessed=True) 135 198 } 136 199 137 if show_in_admin: 200 if 'save_change_type' in kwargs and kwargs['save_change_type']: 201 attrs['_audit_change_type'] = models.CharField(maxlength=1) 202 203 if 'show_in_admin' in kwargs and kwargs['show_in_admin']: 138 204 # Enable admin integration 139 205 class Admin: … … 143 209 # Copy the fields from the existing model to the audit model 144 210 for field in cls._meta.fields: 145 if field.attname in attrs: 211 #if field.attname in attrs: 212 if field.name in attrs: 146 213 raise ImproperlyConfigured, "%s cannot use %s as it is needed by AuditTrail." 
% (cls.__name__, field.attname) 147 attrs[field.attname] = copy_field(field) 148 214 #attrs[field.attname] = copy_field(field) 215 attrs[field.name] = copy_field(field) 216 217 for track_field in _track_fields(kwargs['track_fields']): 218 if track_field['name'] in attrs: 219 raise NameError('Field named "%s" already exists in audit version of %s' % (track_field['name'], cls.__name__)) 220 attrs[track_field['name']] = copy_field(track_field['field']) 221 149 222 return type(name, (models.Model,), attrs) 150 223 … … 169 242 while copied_field is None: 170 243 try: 171 copied_field = cls(**kwargs) 244 if isinstance(field, models.ForeignKey): 245 copied_field = cls(field.rel.to, **kwargs) 246 elif isinstance(field, models.OneToOneField): 247 copied_field = models.ForeignKey(field.rel.to, **kwargs) 248 else: 249 copied_field = cls(**kwargs) 172 250 except (TypeError, ValueError), e: 173 251 # Some attributes, like creation_counter, aren't valid arguments … … 180 258 181 259 return copied_field 260 261 def _build_track_field(track_item): 262 track = {} 263 track['name'] = track_item[0] 264 if isinstance(track_item[1], models.Field): 265 track['field'] = track_item[1] 266 elif issubclass(track_item[1], models.Model): 267 track['field'] = models.ForeignKey(track_item[1]) 268 else: 269 raise TypeError('Track fields only support items that are Fields or Models.') 270 return track 271 272 def _track_fields(track_fields=None, unprocessed=False): 273 # Add in the fields from the Audit class "track" attribute. 
274 tracks_found = [] 275 276 if settings_audit: 277 global_track_fields = getattr(settings_audit, 'GLOBAL_TRACK_FIELDS', []) 278 for track_item in global_track_fields: 279 if unprocessed: 280 tracks_found.append(track_item) 281 else: 282 tracks_found.append(_build_track_field(track_item)) 283 284 if track_fields: 285 for track_item in track_fields: 286 if unprocessed: 287 tracks_found.append(track_item) 288 else: 289 tracks_found.append(_build_track_field(track_item)) 290 return tracks_found 182 291 }}}