Automating an audit trail

As raised in a recent discussion on django-developers, this code is one solution for creating an audit trail for a given model. It is running on multiple production sites, but is still incomplete; see Caveats below for more information. The code below requires a Django SVN checkout at r8223 or later.

Usage

Copy the code at the bottom of this article into a location of your choice. It's just a one-file utility, so it doesn't require an app directory or anything. The examples below assume it's called audit.py and is somewhere on your PYTHONPATH.

In your models file, there are only a couple things to do. First, obviously you'll need to import your audit file, or possibly just get AuditTrail from within it. Then, add an AuditTrail to the model of your choice, assigning it to whatever name you like. That's the only thing necessary to set up the audit trail and get Python-level access to it. If you need to view the audit information in the admin interface, simply add show_in_admin=True as an argument to AuditTrail.

from django.db import models
import audit

class Person(models.Model):
    first_name = models.CharField(max_length=255)
    last_name = models.CharField(max_length=255)
    salary = models.PositiveIntegerField()

    history = audit.AuditTrail()

    def __str__(self):
        return "%s %s" % (self.first_name, self.last_name)

This one attribute is all that's needed; you can now run syncdb to install the audit model. Once it's installed, the following code will work as shown below. As you will see, Person.history becomes a manager that's used to access the audit trail. The type of manager available depends on how you access it. From an instance, the audit trail is automatically filtered to return only results related to that instance. From the model class itself, the results are not filtered in any way, which makes it the likely approach for reporting across several audited items.

>>> from myapp.models import Person
>>> person = Person.objects.create(first_name='John', last_name='Public', salary=50000)
>>> person
<Person: John Public>
>>> person.history.count()
1
>>> person.salary = 65000
>>> person.save()
>>> person.history.count()
2
>>> for item in person.history.all():
...     print "%s: %s" % (item, item.salary)
John Public as of 2007-08-14 20:31:21.852000: 65000
John Public as of 2007-08-14 20:30:58.959000: 50000
>>> person2 = Person.objects.create(first_name='Tom', last_name='Smith', salary=25000)
>>> person2
<Person: Tom Smith>
>>> person.history.count()
2
>>> person2.history.count()
1
>>> Person.history.count()
3

As you can see, the audit trail is listed with the most recent state first. Each entry also includes a timestamp of when the edit took place.
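The difference between instance access and class access comes from Python's descriptor protocol. What follows is a minimal, Django-free sketch of that idea, not the code from this page: TrailDescriptor, Record and the list-based storage are hypothetical stand-ins for AuditTrailDescriptor, the audited model and the audit model's manager. Unlike the real audit trail, the sketch keeps entries oldest first.

```python
class TrailDescriptor(object):
    """Hypothetical stand-in for AuditTrailDescriptor, backed by a plain list."""

    def __init__(self):
        self.entries = []  # (pk, snapshot) tuples, oldest first

    def __get__(self, instance, owner=None):
        if instance is None:
            # Class access (Record.history): every entry, unfiltered.
            return list(self.entries)
        # Instance access (record.history): only this object's entries.
        return [e for e in self.entries if e[0] == instance.pk]

    def __set__(self, instance, value):
        raise AttributeError("Audit trail may not be edited in this manner.")


class Record(object):
    history = TrailDescriptor()

    def __init__(self, pk, salary):
        self.pk = pk
        self.salary = salary
        self.save()

    def save(self):
        # Record a snapshot on every save, as a post_save handler would.
        # Reading the class __dict__ returns the descriptor itself,
        # bypassing __get__.
        Record.__dict__['history'].entries.append((self.pk, self.salary))


john = Record(pk=1, salary=50000)
john.salary = 65000
john.save()
tom = Record(pk=2, salary=25000)

print(len(john.history))    # entries for john only
print(len(tom.history))     # entries for tom only
print(len(Record.history))  # entries for all records
```

The three counts correspond to person.history.count(), person2.history.count() and Person.history.count() in the session above.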

Saves (inserts and updates) and deletes are all tracked, and can be filtered via Person.history.filter(_audit_change_type=X), where X is 'I' for inserts, 'U' for updates, or 'D' for deletes.
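For reporting, the single-letter codes are easier to read once mapped to labels. The helper below is only a sketch: summarize_changes and CHANGE_TYPE_LABELS are hypothetical names, and the input list stands in for codes read from an audit table. Any code other than 'I', 'U' or 'D' is rejected.

```python
from collections import Counter

# Single-letter codes stored in _audit_change_type.
CHANGE_TYPE_LABELS = {'I': 'insert', 'U': 'update', 'D': 'delete'}


def summarize_changes(codes):
    """Count audit entries per change type, using readable labels."""
    counts = Counter(codes)
    unknown = set(counts) - set(CHANGE_TYPE_LABELS)
    if unknown:
        raise ValueError("Unknown change type(s): %s" % ", ".join(sorted(unknown)))
    return dict((CHANGE_TYPE_LABELS[code], n) for code, n in counts.items())


# Hypothetical data: codes as they might be read from an audit table.
codes = ['I', 'U', 'U', 'D']
print(summarize_changes(codes))
```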

ForeignKeys and OneToOneFields are now supported, both for saving and for accessing the audit data. However, the contents of the related table are not archived alongside each audit entry, and access will fail if the row a given audit entry points to is deleted. This holds even if you are auditing the related table as well, because there is no way to link the two audit tables together.

Tracking Extra Information

Sometimes you need to track more information than is available in just the model. For instance, you may want to know who is performing the change on a particular entry, or track some sort of state information about the system. AuditTrail now supports this through the concept of "track fields". These can be specified on a per-model basis or a global basis, and the per-model options will stack with the global ones (but per-model options cannot override global ones currently). Here's an example:

import random

def some_callback(instance):
    return str(random.randrange(1, 99)) + 'trackable_val'

class Person(models.Model):
    first_name = models.CharField(max_length=255)
    last_name = models.CharField(max_length=255)
    salary = models.PositiveIntegerField()
    history = audit.AuditTrail(track_fields=(
        ('extra_1', models.CharField(max_length=50), 'hardcoded_value'),
        ('extra_2', models.CharField(max_length=50), some_callback),
    ))

    def __str__(self):
        return "%s %s" % (self.first_name, self.last_name)

The track_fields option is a tuple of 3-tuples, each structured as (field_name, type_of_field, value). type_of_field can be any currently functioning field instance, or a model class, in which case a ForeignKey to that model is created; see the Caveats for issues related to ForeignKeys. value can be either a static value or a callback function, which is called with the instance at the time of the save or delete. This means that if you want to do something involving threadlocals at runtime (for getting things out of the request, for instance), you can do it via this callback.
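As an illustration of the threadlocals idea, here is a minimal sketch that runs without Django. Everything in it is an assumption made for the example: set_current_user would normally be called by a middleware of your own, and resolve_track_value is a simplified version of what _audit_track in the code below does (the real function first looks for an attribute of the same name on the instance).

```python
import threading

# Hypothetical per-thread storage; a middleware would normally fill it in
# at the start of each request and clear it at the end.
_state = threading.local()


def set_current_user(username):
    _state.username = username


def current_username(instance):
    """Track-field callback: receives the instance being saved or deleted."""
    return getattr(_state, 'username', 'anonymous')


def resolve_track_value(value, instance):
    """Return value(instance) for callbacks, or the value itself if static."""
    if callable(value):
        return value(instance)
    return value


set_current_user('jpublic')
print(resolve_track_value(current_username, None))   # callback result
print(resolve_track_value('hardcoded_value', None))  # static value
```

In a model, such a callback would be passed as the third item of a track field, for example ('changed_by', models.CharField(max_length=50), current_username), where changed_by is a name chosen for this example.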

Assume we ran the example above with this new model. You could then do the following:

>>> p_hist = person.history.all()
>>> p_hist
[<PersonAudit: John Public as of 2007-08-27 09:29:14>, <PersonAudit: John Public as of 2007-08-27 09:28:57>]
>>> p_hist[0].extra_1
'hardcoded_value'
>>> p_hist[0].extra_2
'27trackable_val'

Currently, you cannot filter on these trackable columns, though this should be fixable.

Global Track Fields

What if you have a field you want tracked on every model that supports history? No problem! In the root of your project, create a file called settings_audit.py, and put something like this in it:

from django.db import models

def callback_func_ptr2(original_instance):
    import random
    return str(random.randrange(1, 99)) + 'hardcoded_global_2'

# Populate the fields that every Audit model in this app will use.
GLOBAL_TRACK_FIELDS = (
    ('global_1', models.CharField(max_length=50), 'hardcoded_global_1'),
    ('global_2', models.CharField(max_length=20), callback_func_ptr2),
)

GLOBAL_TRACK_FIELDS is set up exactly the same way as the track_fields option passed into AuditTrail, and has the same uses and limitations.
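The way global and per-model track fields stack can be sketched as follows. This is a standalone approximation of what _track_fields and create_audit_model do in the code below, not a replacement for them: merge_track_fields is a hypothetical name, and strings stand in for Django field instances.

```python
def merge_track_fields(global_fields, model_fields):
    """Combine track fields in stacking order: global first, then per-model.

    Returns the resulting field names in order.
    """
    seen = set()
    order = []
    for name, field, value in list(global_fields) + list(model_fields or []):
        if name in seen:
            # A per-model entry cannot replace a global one of the same name.
            raise NameError('Field named "%s" already exists' % name)
        seen.add(name)
        order.append(name)
    return order


# Strings stand in for Django field instances to keep the sketch standalone.
GLOBAL_TRACK_FIELDS = (
    ('global_1', 'CharField(50)', 'hardcoded_global_1'),
)
person_track_fields = (
    ('extra_1', 'CharField(50)', 'hardcoded_value'),
)

print(merge_track_fields(GLOBAL_TRACK_FIELDS, person_track_fields))
```

Reusing a global name in a per-model track_fields tuple raises NameError rather than overriding the global definition.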

Caveats

This needs testing! It has only been used in a few cases, so there's plenty of room for strangeness. It has specifically not been tested for things like safe (de-)serialization.

In order to copy the fields from the original model to the audit model, it uses some hackery I'm not particularly proud of. It seems to work for all the cases I would have hoped it would, but it relies on the arguments passed to the Field class being named the same as the attributes stored on the Field object after it's created. If there's ever a time that's not the case, it will fail completely on that Field type.

It fails completely on ManyToManyFields, something I've yet to remedy. That's definitely a must-have, but I haven't worked out the best way to go about it. And since this whole thing isn't something I'm particularly interested in, I'm probably going to leave that up to somebody else to work out.

Likewise, it fails when there are multiple ForeignKeys pointing to the same Model, as it doesn't support / compensate for related_name.

It currently copies and overrides the model's __str__ method, so that it can helpfully describe each entry in the audit history. This means, however, that if your __str__ method relies on any other methods (such as get_full_name or similar), it won't work and will need to be adjusted.
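The __str__ caveat can be reproduced without Django. The sketch below is written for Python 3, where Person.__str__ is a plain function (the code on this page targets Python 2 and uses im_func instead). Person and PersonAudit here are plain classes standing in for the real models, and timestamp is a hypothetical attribute used in place of _audit_timestamp.

```python
class Person(object):
    def __init__(self, first_name, last_name):
        self.first_name = first_name
        self.last_name = last_name

    def get_full_name(self):
        return "%s %s" % (self.first_name, self.last_name)

    def __str__(self):
        # Depends on another method of Person.
        return self.get_full_name()


# Build a stand-in audit class the way create_audit_model does: the original
# __str__ function is borrowed, but get_full_name is not copied along with it.
PersonAudit = type('PersonAudit', (object,), {
    '_audit__str__': Person.__str__,
    '__str__': lambda self: '%s as of %s' % (self._audit__str__(), self.timestamp),
})

entry = PersonAudit()
entry.first_name, entry.last_name = 'John', 'Public'
entry.timestamp = '2007-08-14 20:31:21'

try:
    print(str(entry))
except AttributeError as exc:
    # The borrowed __str__ calls get_full_name, which PersonAudit lacks.
    print("AttributeError: %s" % exc)
```

Writing __str__ in terms of fields only, as in the Person examples earlier on this page, avoids the problem.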

Code

Hopefully there are enough comments to make sense of what's going on. More information can be found here.

from django.dispatch import dispatcher
from django.db import models
from django.core.exceptions import ImproperlyConfigured
from django.contrib import admin
import copy
import re
import types
try:
    import settings_audit
except ImportError:
    settings_audit = None
value_error_re = re.compile("^.+'(.+)'$")

class AuditTrail(object):
    def __init__(self, show_in_admin=False, save_change_type=True, audit_deletes=True,
                 track_fields=None):
        self.opts = {}
        self.opts['show_in_admin'] = show_in_admin
        self.opts['save_change_type'] = save_change_type
        self.opts['audit_deletes'] = audit_deletes
        if track_fields:
            self.opts['track_fields'] = track_fields
        else:
            self.opts['track_fields'] = []

    def contribute_to_class(self, cls, name):
        # This should only get added once the class is otherwise complete
        def _contribute(sender, **kwargs):
            model = create_audit_model(sender, **self.opts)
            if self.opts['show_in_admin']:
                # Enable admin integration
                # If ModelAdmin needs options or different base class, find
                # some way to make the commented code work
                #   cls_admin_name = cls.__name__ + 'Admin'
                #   clsAdmin = type(cls_admin_name, (admin.ModelAdmin,),{})
                #   admin.site.register(cls, clsAdmin)
                # Otherwise, register class with default ModelAdmin
                admin.site.register(model)
            descriptor = AuditTrailDescriptor(model._default_manager, sender._meta.pk.attname)
            setattr(sender, name, descriptor)

            def _audit_track(instance, field_arr, **kwargs):
                field_name = field_arr[0]
                try:
                    return getattr(instance, field_name)
                except:
                    if len(field_arr) > 2:
                        if callable(field_arr[2]):
                            fn = field_arr[2]
                            return fn(instance)
                        else:
                            return field_arr[2]

            def _audit(sender, instance, created, **kwargs):
                # Write model changes to the audit model.
                # instance is the current (non-audit) model.
                kwargs = {}
                for field in sender._meta.fields:
                    #kwargs[field.attname] = getattr(instance, field.attname)
                    kwargs[field.name] = getattr(instance, field.name)
                if self.opts['save_change_type']:
                    if created:
                        kwargs['_audit_change_type'] = 'I'
                    else:
                        kwargs['_audit_change_type'] = 'U'
                for field_arr in model._audit_track:
                    kwargs[field_arr[0]] = _audit_track(instance, field_arr)
                model._default_manager.create(**kwargs)
            ## Uncomment this line for pre r8223 Django builds
            #dispatcher.connect(_audit, signal=models.signals.post_save, sender=cls, weak=False)
            ## Comment this line for pre r8223 Django builds
            models.signals.post_save.connect(_audit, sender=cls, weak=False)

            if self.opts['audit_deletes']:
                def _audit_delete(sender, instance, **kwargs):
                    # Write model changes to the audit model
                    kwargs = {}
                    for field in sender._meta.fields:
                        kwargs[field.name] = getattr(instance, field.name)
                    if self.opts['save_change_type']:
                        kwargs['_audit_change_type'] = 'D'
                    for field_arr in model._audit_track:
                        kwargs[field_arr[0]] = _audit_track(instance, field_arr)
                    model._default_manager.create(**kwargs)
                ## Uncomment this line for pre r8223 Django builds
                #dispatcher.connect(_audit_delete, signal=models.signals.pre_delete, sender=cls, weak=False)
                ## Comment this line for pre r8223 Django builds
                models.signals.pre_delete.connect(_audit_delete, sender=cls, weak=False)
        
        ## Uncomment this line for pre r8223 Django builds
        #dispatcher.connect(_contribute, signal=models.signals.class_prepared, sender=cls, weak=False)
        ## Comment this line for pre r8223 Django builds
        models.signals.class_prepared.connect(_contribute, sender=cls, weak=False)

class AuditTrailDescriptor(object):
    def __init__(self, manager, pk_attribute):
        self.manager = manager
        self.pk_attribute = pk_attribute

    def __get__(self, instance=None, owner=None):
        if instance is None:
            #raise AttributeError, "Audit trail is only accessible via %s instances." % type.__name__
            return create_audit_manager_class(self.manager)
        else:
            return create_audit_manager_with_pk(self.manager, self.pk_attribute, instance._get_pk_val())

    def __set__(self, instance, value):
        raise AttributeError("Audit trail may not be edited in this manner.")

def create_audit_manager_with_pk(manager, pk_attribute, pk):
    """Create an audit trail manager based on the current object"""
    class AuditTrailWithPkManager(manager.__class__):
        def __init__(self, *arg, **kw):
            super(AuditTrailWithPkManager, self).__init__(*arg, **kw)
            self.model = manager.model

        def get_query_set(self):
            qs = super(AuditTrailWithPkManager, self).get_query_set().filter(**{pk_attribute: pk})
            if self._db is not None:
                qs = qs.using(self._db)
            return qs
    return AuditTrailWithPkManager()

def create_audit_manager_class(manager):
    """Create an unfiltered audit trail manager for access via the model class"""
    class AuditTrailManager(manager.__class__):
        def __init__(self, *arg, **kw):
            super(AuditTrailManager, self).__init__(*arg, **kw)
            self.model = manager.model
    return AuditTrailManager()

def create_audit_model(cls, **kwargs):
    """Create an audit model for the specific class"""
    name = cls.__name__ + 'Audit'

    class Meta:
        db_table = '%s_audit' % cls._meta.db_table
        app_label = cls._meta.app_label
        verbose_name_plural = '%s audit trail' % cls._meta.verbose_name
        ordering = ['-_audit_timestamp']

    # Set up a dictionary to simulate declarations within a class
    attrs = {
        '__module__': cls.__module__,
        'Meta': Meta,
        '_audit_id': models.AutoField(primary_key=True),
        '_audit_timestamp': models.DateTimeField(auto_now_add=True, db_index=True),
        '_audit__str__': cls.__str__.im_func,
        '__str__': lambda self: '%s as of %s' % (self._audit__str__(), self._audit_timestamp),
        '_audit_track': _track_fields(track_fields=kwargs['track_fields'], unprocessed=True)
    }

    if 'save_change_type' in kwargs and kwargs['save_change_type']:
        attrs['_audit_change_type'] = models.CharField(max_length=1)

    # Copy the fields from the existing model to the audit model
    for field in cls._meta.fields:
        #if field.attname in attrs:
        if field.name in attrs:
            raise ImproperlyConfigured("%s cannot use %s as it is needed by AuditTrail." % (cls.__name__, field.attname))
        if isinstance(field, models.AutoField):
            # Audit models have a separate AutoField
            attrs[field.name] = models.IntegerField(db_index=True, editable=False)
        else:
            attrs[field.name] = copy.copy(field)
            # If 'unique' is in there, we need to remove it, otherwise the index
            # is created and multiple audit entries for one item fail.
            attrs[field.name]._unique = False
            # If a model has primary_key = True, a second primary key would be
            # created in the audit model. Set primary_key to false.
            attrs[field.name].primary_key = False

            # Rebuild and replace the 'rel' object to avoid foreign key clashes.
            # Borrowed from the Basie project 
            # Basie is MIT and GPL dual licensed.
            if isinstance(field, models.ForeignKey):
                rel = copy.copy(field.rel)
                rel.related_name = '_audit_' + field.related_query_name()
                attrs[field.name].rel = rel

    for track_field in _track_fields(kwargs['track_fields']):
        if track_field['name'] in attrs:
            raise NameError('Field named "%s" already exists in audit version of %s' % (track_field['name'], cls.__name__))
        attrs[track_field['name']] = copy.copy(track_field['field'])
    
    return type(name, (models.Model,), attrs)

def _build_track_field(track_item):
    track = {}
    track['name'] = track_item[0]
    if isinstance(track_item[1], models.Field):
        track['field'] = track_item[1]
    elif issubclass(track_item[1], models.Model):
        track['field'] = models.ForeignKey(track_item[1])
    else:
        raise TypeError('Track fields only support items that are Fields or Models.')
    return track

def _track_fields(track_fields=None, unprocessed=False):
    # Add in the fields from the Audit class "track" attribute.
    tracks_found = []
    
    if settings_audit:
        global_track_fields = getattr(settings_audit, 'GLOBAL_TRACK_FIELDS', [])
        for track_item in global_track_fields:
            if unprocessed:
                tracks_found.append(track_item)
            else:
                tracks_found.append(_build_track_field(track_item))
    
    if track_fields:
        for track_item in track_fields:
            if unprocessed:
                tracks_found.append(track_item)
            else:
                tracks_found.append(_build_track_field(track_item))
    return tracks_found

Last modified on Jun 3, 2011, 10:33:49 AM