Version 2 (modified by 17 years ago) ( diff ) | ,
---|
Automating an audit trail
As raised in a recent discussion on django-developers, this code is one solution for creating an audit trail for a given model. This is very much incomplete, and is intended to be used as a base to work from. See Caveats below for more information.
Usage
Copy the code at the bottom of this article into a location of your choice. It's just a one-file utility, so it doesn't require an app directory or anything. The examples below assume it's called audit.py
and is somewhere on your PYTHONPATH.
In your models file, there are only a couple things to do. First, obviously you'll need to import your audit file, or possibly just get AuditTrail
from within it. Then, add an AuditTrail
to the model of your choice, assigning it to whatever name you like. That's the only thing necessary to set up the audit trail and get Python-level acecss to it. If you need to view the audit information in the admin interface, simply add show_in_admin=True
as an argument to AuditTrail
.
from django.db import models import audit class Person(models.Model): first_name = models.CharField(maxlength=255) last_name = models.CharField(maxlength=255) salary = models.PositiveIntegerField() history = audit.AuditTrail() def __str__(self): return "%s %s" % (self.first_name, self.last_name)
This simple addition will do the rest, allowing you to run syncdb
and install the audit model. Once it's installed, the following code will work as shown below. As you will see, Person.history
becomes a manager that's used to access the audit trail for a particular object. The type of manager available depends on how you access the audit trail. From an instance, the audit trail will automatically be filtered to only return results related to that instance. From the class itself, the results will not be filtered in any way, and is the likely approach for doing reporting across several audited items.
>>> from myapp.models import Person >>> person = Person.objects.create(first_name='John', last_name='Public', salary=50000) >>> <Person: John Public> >>> person.history.count() 1 >>> person.salary = 65000 >>> person.save() >>> person.history.count() 2 >>> for item in person.history.all(): ... print "%s: %s" % (item, item.salary) John Public as of 2007-08-14 20:31:21.852000: 65000 John Public as of 2007-08-14 20:30:58.959000: 50000 >>> person2 = Person.objects.create(first_name='Tom', last_name='Smith', salary=25000) >>> person2 <Person: Tom Smith> >>> person.history.count() 2 >>> person.history.count() 2L >>> person2.history.count() 1L >>> Person.history.count() 3L
As you can see, the audit trail is listed with the most recent state first. Each entry also inclues a timestamp when the edit took place.
Caveats
For one thing, only post_save
is used right now, so if you have need for post_delete
, you'll need to put that in whatever way you'd like.
Also, in order to copy the fields from the original model to the audit model, it uses some hackery I'm not particularly proud of. It seems to work for all the cases I would have hoped it would, but it relies on the arguments passed to the Field class being named the same as the attributes stored on the Field object after it's created. If there's ever a time that's not the case, it will fail completely on that Field type.
Speaking of which, it fails completely on ForeignKey
s and ManyToManyField
s, something I've yet to remedy. That's definitely a must-have, but I haven't worked out the best way to go about it. And since this whole things isn't something I'm particularly interested in, I'm probably going to leave that up to somebody else to work out.
It currently copies and overrides the model's __str__
method, so that it can helpfully describe each entry in the audit history. This means, however, that if your __str__
method relies on any other methods (such as get_full_name
or similar), it won't work and will need to be adjusted.
Code
Hopefully there are enough comments to make sense of what's going on. More information can be found here.
from django.dispatch import dispatcher from django.db import models from django.core.exceptions import ImproperlyConfigured import re import types try: import settings_audit except ImportError: settings_audit = None value_error_re = re.compile("^.+'(.+)'$") class AuditTrail(object): def __init__(self, show_in_admin=False, save_change_type=True, audit_deletes=True, track_fields=None): self.opts = {} self.opts['show_in_admin'] = show_in_admin self.opts['save_change_type'] = save_change_type self.opts['audit_deletes'] = audit_deletes if track_fields: self.opts['track_fields'] = track_fields else: self.opts['track_fields'] = [] def contribute_to_class(self, cls, name): # This should only get added once the class is otherwise complete def _contribute(sender): model = create_audit_model(sender, **self.opts) descriptor = AuditTrailDescriptor(model._default_manager, sender._meta.pk.attname) setattr(sender, name, descriptor) def _audit_track(instance, field_arr): field_name = field_arr[0] try: return getattr(instance, field_name) except: if len(field_arr) > 2: if callable(field_arr[2]): fn = field_arr[2] return fn(instance) else: return field_arr[2] def _audit(sender, instance): # Write model changes to the audit model. # instance is the current (non-audit) model. kwargs = {} for field in sender._meta.fields: #kwargs[field.attname] = getattr(instance, field.attname) kwargs[field.name] = getattr(instance, field.name) if self.opts['save_change_type']: kwargs['_audit_change_type'] = 'U' for field_arr in model._audit_track: kwargs[field_arr[0]] = _audit_track(instance, field_arr) model._default_manager.create(**kwargs) dispatcher.connect(_audit, signal=models.signals.post_save, sender=cls, weak=False) if self.opts['audit_deletes']: def _audit_delete(sender, instance): # Write model changes to the audit model kwargs = {} for field in sender._meta.fields: kwargs[field.name] = getattr(instance, field.name) if self.opts['save_change_type']: kwargs['_audit_change_type'] = 'D' for field_arr in model._audit_track: kwargs[field_arr[0]] = _audit_track(instance, field_arr) model._default_manager.create(**kwargs) dispatcher.connect(_audit_delete, signal=models.signals.pre_delete, sender=cls, weak=False) dispatcher.connect(_contribute, signal=models.signals.class_prepared, sender=cls, weak=False) class AuditTrailDescriptor(object): def __init__(self, manager, pk_attribute): self.manager = manager self.pk_attribute = pk_attribute def __get__(self, instance=None, owner=None): if instance == None: #raise AttributeError, "Audit trail is only accessible via %s instances." % type.__name__ return create_audit_manager_class(self.manager) else: return create_audit_manager_with_pk(self.manager, self.pk_attribute, instance._get_pk_val()) def __set__(self, instance, value): raise AttributeError, "Audit trail may not be edited in this manner." def create_audit_manager_with_pk(manager, pk_attribute, pk): """Create an audit trail manager based on the current object""" class AuditTrailWithPkManager(manager.__class__): def __init__(self): self.model = manager.model def get_query_set(self): return super(AuditTrailWithPkManager, self).get_query_set().filter(**{pk_attribute: pk}) return AuditTrailWithPkManager() def create_audit_manager_class(manager): """Create an audit trail manager based on the current object""" class AuditTrailManager(manager.__class__): def __init__(self): self.model = manager.model return AuditTrailManager() def create_audit_model(cls, **kwargs): """Create an audit model for the specific class""" name = cls.__name__ + 'Audit' class Meta: db_table = '%s_audit' % cls._meta.db_table verbose_name_plural = '%s audit trail' % cls._meta.verbose_name ordering = ['-_audit_timestamp'] # Set up a dictionary to simulate declarations within a class attrs = { '__module__': cls.__module__, 'Meta': Meta, '_audit_id': models.AutoField(primary_key=True), '_audit_timestamp': models.DateTimeField(auto_now_add=True), '_audit__str__': cls.__str__.im_func, '__str__': lambda self: '%s as of %s' % (self._audit__str__(), self._audit_timestamp), '_audit_track': _track_fields(track_fields=kwargs['track_fields'], unprocessed=True) } if 'save_change_type' in kwargs and kwargs['save_change_type']: attrs['_audit_change_type'] = models.CharField(maxlength=1) if 'show_in_admin' in kwargs and kwargs['show_in_admin']: # Enable admin integration class Admin: pass attrs['Admin'] = Admin # Copy the fields from the existing model to the audit model for field in cls._meta.fields: #if field.attname in attrs: if field.name in attrs: raise ImproperlyConfigured, "%s cannot use %s as it is needed by AuditTrail." % (cls.__name__, field.attname) #attrs[field.attname] = copy_field(field) attrs[field.name] = copy_field(field) for track_field in _track_fields(kwargs['track_fields']): if track_field['name'] in attrs: raise NameError('Field named "%s" already exists in audit version of %s' % (track_field['name'], cls.__name__)) attrs[track_field['name']] = copy_field(track_field['field']) return type(name, (models.Model,), attrs) def copy_field(field): """Copy an instantiated field to a new instantiated field""" if isinstance(field, models.AutoField): # Audit models have a separate AutoField return models.IntegerField(db_index=True, editable=False) copied_field = None cls = field.__class__ # Use the field's attributes to start with kwargs = field.__dict__.copy() # Swap primary keys for ordinary indexes if field.primary_key: kwargs['db_index'] = True del kwargs['primary_key'] # Some hackery to copy the field while copied_field is None: try: if isinstance(field, models.ForeignKey): copied_field = cls(field.rel.to, **kwargs) elif isinstance(field, models.OneToOneField): copied_field = models.ForeignKey(field.rel.to, **kwargs) else: copied_field = cls(**kwargs) except (TypeError, ValueError), e: # Some attributes, like creation_counter, aren't valid arguments # So try to remove that argument so the field can try again try: del kwargs[value_error_re.match(str(e)).group(1)] except: # The attribute was already removed, and something's still going wrong raise e return copied_field def _build_track_field(track_item): track = {} track['name'] = track_item[0] if isinstance(track_item[1], models.Field): track['field'] = track_item[1] elif issubclass(track_item[1], models.Model): track['field'] = models.ForeignKey(track_item[1]) else: raise TypeError('Track fields only support items that are Fields or Models.') return track def _track_fields(track_fields=None, unprocessed=False): # Add in the fields from the Audit class "track" attribute. tracks_found = [] if settings_audit: global_track_fields = getattr(settings_audit, 'GLOBAL_TRACK_FIELDS', []) for track_item in global_track_fields: if unprocessed: tracks_found.append(track_item) else: tracks_found.append(_build_track_field(track_item)) if track_fields: for track_item in track_fields: if unprocessed: tracks_found.append(track_item) else: tracks_found.append(_build_track_field(track_item)) return tracks_found