Version 5 (modified by George Vilches <gav@…>, 17 years ago)

Automating an audit trail

As raised in a recent discussion on django-developers, this code is one solution for creating an audit trail for a given model. This is very much incomplete, and is intended to be used as a base to work from. See Caveats below for more information.

Usage

Copy the code at the bottom of this article into a location of your choice. It's just a one-file utility, so it doesn't require an app directory or anything. The examples below assume it's called audit.py and is somewhere on your PYTHONPATH.

In your models file, there are only a couple of things to do. First, import your audit file, or just import AuditTrail from it. Then add an AuditTrail to the model of your choice, assigning it to whatever name you like. That's all that is necessary to set up the audit trail and get Python-level access to it. If you need to view the audit information in the admin interface, add show_in_admin=True as an argument to AuditTrail.

from django.db import models
import audit

class Person(models.Model):
    first_name = models.CharField(maxlength=255)
    last_name = models.CharField(maxlength=255)
    salary = models.PositiveIntegerField()

    history = audit.AuditTrail()

    def __str__(self):
        return "%s %s" % (self.first_name, self.last_name)

This simple addition does the rest, allowing you to run syncdb and install the audit model. Once it's installed, the following code will work as shown below. As you will see, Person.history becomes a manager that's used to access the audit trail. The type of manager available depends on how you access it. From an instance, the audit trail is automatically filtered to return only results related to that instance. From the model class itself, the results are not filtered in any way; this is the likely approach for reporting across several audited items.

>>> from myapp.models import Person
>>> person = Person.objects.create(first_name='John', last_name='Public', salary=50000)
>>> person
<Person: John Public>
>>> person.history.count()
1
>>> person.salary = 65000
>>> person.save()
>>> person.history.count()
2
>>> for item in person.history.all():
...     print "%s: %s" % (item, item.salary)
...
John Public as of 2007-08-14 20:31:21.852000: 65000
John Public as of 2007-08-14 20:30:58.959000: 50000
>>> person2 = Person.objects.create(first_name='Tom', last_name='Smith', salary=25000)
>>> person2
<Person: Tom Smith>
>>> person.history.count()
2
>>> person2.history.count()
1
>>> Person.history.count()
3

As you can see, the audit trail is listed with the most recent state first. Each entry also includes a timestamp recording when the change was saved.
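The difference between instance access and class access comes from Python's descriptor protocol. The following is a minimal, Django-free sketch of that idea, assuming a plain in-memory list in place of the audit table. ToyTrail, ToyPerson, and record are hypothetical names used only for illustration and are not part of audit.py; the sketch targets a current Python 3 interpreter rather than the Python 2 of the session above.

```python
class ToyTrail(object):
    """Hypothetical stand-in for AuditTrailDescriptor, backed by a list."""

    def __init__(self, pk_attribute):
        self.pk_attribute = pk_attribute
        self._entries = []  # newest entry first, like the audit ordering

    def record(self, instance, change_type):
        # Snapshot the instance's current attributes plus the change type
        entry = dict(vars(instance), _audit_change_type=change_type)
        self._entries.insert(0, entry)

    def __get__(self, instance, owner=None):
        if instance is None:
            # Accessed on the class: return every entry, unfiltered
            return list(self._entries)
        # Accessed on an instance: keep only that instance's entries
        pk = getattr(instance, self.pk_attribute)
        return [e for e in self._entries if e[self.pk_attribute] == pk]

    def __set__(self, instance, value):
        raise AttributeError("Audit trail may not be edited in this manner.")


class ToyPerson(object):
    history = ToyTrail('id')

    def __init__(self, id, salary):
        self.id = id
        self.salary = salary
        self.save()

    def save(self):
        # Reach the descriptor object itself, bypassing __get__
        ToyPerson.__dict__['history'].record(self, 'U')


person = ToyPerson(1, 50000)
person.salary = 65000
person.save()
person2 = ToyPerson(2, 25000)
```

With this sketch, len(person.history) is 2, len(person2.history) is 1, and len(ToyPerson.history) is 3, mirroring the counts in the session above.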

Saves and deletes are both tracked, and (provided save_change_type is left at its default of True) can be filtered via Person.history.filter(_audit_change_type=X), where X is 'U' for inserts/updates or 'D' for deletes. If #4879 lands, it will become possible to distinguish between inserts and updates, and at that point the audit trail will use 'I' for inserts and 'U' for updates.

ForeignKeys and OneToOneFields are now supported, both for saving and for accessing the audit data. However, the audit trail does not archive the related rows of the ForeignKey's target table at the same time, and it will fail if the object that a given audit entry's ForeignKey points to is deleted. This holds even if you are auditing the ForeignKey table as well, because there is no way to link the two audit tables together.

Caveats

This needs testing! It has only been used in a few cases, so there's plenty of room for strangeness. It has specifically not been tested for things like safe (de-)serialization.

In order to copy the fields from the original model to the audit model, it uses some hackery I'm not particularly proud of. It seems to work for all the cases I would have hoped it would, but it relies on the arguments passed to the Field class being named the same as the attributes stored on the Field object after it's created. If there's ever a time that's not the case, it will fail completely on that Field type.
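To make that assumption concrete, here is a small Django-free sketch. It is not the approach taken by copy_field in the Code section, which retries construction and strips the argument named in each error message. Instead it uses inspect.signature (Python 3) to keep only those attributes that are also constructor arguments. ToyField and copy_toy_field are hypothetical names invented for this illustration.

```python
import inspect


class ToyField(object):
    """Hypothetical stand-in for a Django Field; not part of audit.py."""
    _counter = 0

    def __init__(self, max_length=None, db_index=False, primary_key=False):
        # Each argument is stored under the same name it was passed as
        self.max_length = max_length
        self.db_index = db_index
        self.primary_key = primary_key
        # Bookkeeping attribute that is NOT a constructor argument
        ToyField._counter += 1
        self.creation_counter = ToyField._counter


def copy_toy_field(field):
    """Rebuild a field from its attributes, assuming attribute == argument name."""
    kwargs = dict(vars(field))
    # Swap primary keys for ordinary indexes, as the audit copy does
    if kwargs.get('primary_key'):
        kwargs['db_index'] = True
        kwargs['primary_key'] = False
    # Keep only the attributes the constructor actually accepts
    accepted = inspect.signature(type(field).__init__).parameters
    kwargs = {k: v for k, v in kwargs.items() if k in accepted}
    return type(field)(**kwargs)


original = ToyField(max_length=255, primary_key=True)
copied = copy_toy_field(original)
```

If a field class stored an argument under a different attribute name, that value would be silently dropped here. The retry loop would likewise fail to carry it over.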

It fails completely on ManyToManyFields, something I've yet to remedy. That's definitely a must-have, but I haven't worked out the best way to go about it. And since this whole thing isn't something I'm particularly interested in, I'm probably going to leave that up to somebody else to work out.

It currently copies and overrides the model's __str__ method, so that it can helpfully describe each entry in the audit history. This means, however, that if your __str__ method relies on any other methods (such as get_full_name or similar), it won't work and will need to be adjusted.
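The failure mode can be reproduced without Django. The sketch below builds a second class with type() and borrows only __str__ from the original, so a helper method such as get_full_name is missing on the copy. Employee, EmployeeAudit, and build_copy are hypothetical names, and copying the helper across is just one possible adjustment, shown here as an assumption rather than something audit.py does.

```python
class Employee(object):
    def __init__(self, first_name, last_name):
        self.first_name = first_name
        self.last_name = last_name

    def get_full_name(self):
        return "%s %s" % (self.first_name, self.last_name)

    def __str__(self):
        # Relies on another method of the same class
        return self.get_full_name()


def build_copy(extra_methods=()):
    """Build a class that borrows __str__ (plus any named extras) from Employee."""
    attrs = {
        '__init__': Employee.__dict__['__init__'],
        '_audit__str__': Employee.__dict__['__str__'],
    }
    for method_name in extra_methods:
        attrs[method_name] = Employee.__dict__[method_name]
    return type('EmployeeAudit', (object,), attrs)


# Without the helper, the borrowed __str__ cannot find get_full_name
broken = build_copy()('John', 'Public')
try:
    broken._audit__str__()
    error = None
except AttributeError as exc:
    error = exc

# Copying the helper as well makes the borrowed method work again
fixed = build_copy(['get_full_name'])('John', 'Public')
description = fixed._audit__str__()
```

The other obvious adjustment is to write __str__ so that it depends only on field values, as the Person example in the Usage section does.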

Code

Hopefully there are enough comments to make sense of what's going on. More information can be found here.

from django.dispatch import dispatcher
from django.db import models
from django.core.exceptions import ImproperlyConfigured
import re
import types
try:
    import settings_audit
except ImportError:
    settings_audit = None

value_error_re = re.compile("^.+'(.+)'$")

class AuditTrail(object):
    def __init__(self, show_in_admin=False, save_change_type=True, audit_deletes=True,
                 track_fields=None):
        self.opts = {}
        self.opts['show_in_admin'] = show_in_admin
        self.opts['save_change_type'] = save_change_type
        self.opts['audit_deletes'] = audit_deletes
        if track_fields:
            self.opts['track_fields'] = track_fields
        else:
            self.opts['track_fields'] = []

    def contribute_to_class(self, cls, name):
        # This should only get added once the class is otherwise complete
        def _contribute(sender):
            model = create_audit_model(sender, **self.opts)
            descriptor = AuditTrailDescriptor(model._default_manager, sender._meta.pk.attname)
            setattr(sender, name, descriptor)

            def _audit_track(instance, field_arr):
                field_name = field_arr[0]
                try:
                    return getattr(instance, field_name)
                except:
                    if len(field_arr) > 2:
                        if callable(field_arr[2]):
                            fn = field_arr[2]
                            return fn(instance)
                        else:
                            return field_arr[2]

            def _audit(sender, instance):
                # Write model changes to the audit model.
                # instance is the current (non-audit) model.
                kwargs = {}
                for field in sender._meta.fields:
                    #kwargs[field.attname] = getattr(instance, field.attname)
                    kwargs[field.name] = getattr(instance, field.name)
                if self.opts['save_change_type']:
                    kwargs['_audit_change_type'] = 'U'
                for field_arr in model._audit_track:
                    kwargs[field_arr[0]] = _audit_track(instance, field_arr)
                model._default_manager.create(**kwargs)
            dispatcher.connect(_audit, signal=models.signals.post_save, sender=cls, weak=False)

            if self.opts['audit_deletes']:
                def _audit_delete(sender, instance):
                    # Write model changes to the audit model
                    kwargs = {}
                    for field in sender._meta.fields:
                        kwargs[field.name] = getattr(instance, field.name)
                    if self.opts['save_change_type']:
                        kwargs['_audit_change_type'] = 'D'
                    for field_arr in model._audit_track:
                        kwargs[field_arr[0]] = _audit_track(instance, field_arr)
                    model._default_manager.create(**kwargs)
                dispatcher.connect(_audit_delete, signal=models.signals.pre_delete, sender=cls, weak=False)

        dispatcher.connect(_contribute, signal=models.signals.class_prepared, sender=cls, weak=False)

class AuditTrailDescriptor(object):
    def __init__(self, manager, pk_attribute):
        self.manager = manager
        self.pk_attribute = pk_attribute

    def __get__(self, instance=None, owner=None):
        if instance is None:
            #raise AttributeError, "Audit trail is only accessible via %s instances." % type.__name__
            return create_audit_manager_class(self.manager)
        else:
            return create_audit_manager_with_pk(self.manager, self.pk_attribute, instance._get_pk_val())

    def __set__(self, instance, value):
        raise AttributeError, "Audit trail may not be edited in this manner."

def create_audit_manager_with_pk(manager, pk_attribute, pk):
    """Create an audit trail manager based on the current object"""
    class AuditTrailWithPkManager(manager.__class__):
        def __init__(self):
            self.model = manager.model

        def get_query_set(self):
            return super(AuditTrailWithPkManager, self).get_query_set().filter(**{pk_attribute: pk})
    return AuditTrailWithPkManager()

def create_audit_manager_class(manager):
    """Create an unfiltered audit trail manager for access via the model class"""
    class AuditTrailManager(manager.__class__):
        def __init__(self):
            self.model = manager.model
    return AuditTrailManager()

def create_audit_model(cls, **kwargs):
    """Create an audit model for the specific class"""
    name = cls.__name__ + 'Audit'

    class Meta:
        db_table = '%s_audit' % cls._meta.db_table
        verbose_name_plural = '%s audit trail' % cls._meta.verbose_name
        ordering = ['-_audit_timestamp']

    # Set up a dictionary to simulate declarations within a class
    attrs = {
        '__module__': cls.__module__,
        'Meta': Meta,
        '_audit_id': models.AutoField(primary_key=True),
        '_audit_timestamp': models.DateTimeField(auto_now_add=True),
        '_audit__str__': cls.__str__.im_func,
        '__str__': lambda self: '%s as of %s' % (self._audit__str__(), self._audit_timestamp),
        '_audit_track': _track_fields(track_fields=kwargs['track_fields'], unprocessed=True)
    }

    if 'save_change_type' in kwargs and kwargs['save_change_type']:
        attrs['_audit_change_type'] = models.CharField(maxlength=1)

    if 'show_in_admin' in kwargs and kwargs['show_in_admin']:
        # Enable admin integration
        class Admin:
            pass
        attrs['Admin'] = Admin

    # Copy the fields from the existing model to the audit model
    for field in cls._meta.fields:
        #if field.attname in attrs:
        if field.name in attrs:
            raise ImproperlyConfigured, "%s cannot use %s as it is needed by AuditTrail." % (cls.__name__, field.name)
        #attrs[field.attname] = copy_field(field)
        attrs[field.name] = copy_field(field)

    for track_field in _track_fields(kwargs['track_fields']):
        if track_field['name'] in attrs:
            raise NameError('Field named "%s" already exists in audit version of %s' % (track_field['name'], cls.__name__))
        attrs[track_field['name']] = copy_field(track_field['field'])
    
    return type(name, (models.Model,), attrs)

def copy_field(field):
    """Copy an instantiated field to a new instantiated field"""
    if isinstance(field, models.AutoField):
        # Audit models have a separate AutoField
        return models.IntegerField(db_index=True, editable=False)

    copied_field = None
    cls = field.__class__

    # Use the field's attributes to start with
    kwargs = field.__dict__.copy()

    # Swap primary keys for ordinary indexes
    if field.primary_key:
        kwargs['db_index'] = True
        del kwargs['primary_key']

    # Some hackery to copy the field
    while copied_field is None:
        try:
            if isinstance(field, models.ForeignKey):
                copied_field = cls(field.rel.to, **kwargs)
            elif isinstance(field, models.OneToOneField):
                copied_field = models.ForeignKey(field.rel.to, **kwargs)
            else:
                copied_field = cls(**kwargs)
        except (TypeError, ValueError), e:
            # Some attributes, like creation_counter, aren't valid arguments
            # So try to remove that argument so the field can try again
            try:
                del kwargs[value_error_re.match(str(e)).group(1)]
            except:
                # The attribute was already removed, and something's still going wrong
                raise e

    return copied_field

def _build_track_field(track_item):
    track = {}
    track['name'] = track_item[0]
    if isinstance(track_item[1], models.Field):
        track['field'] = track_item[1]
    elif issubclass(track_item[1], models.Model):
        track['field'] = models.ForeignKey(track_item[1])
    else:
        raise TypeError('Track fields only support items that are Fields or Models.')
    return track

def _track_fields(track_fields=None, unprocessed=False):
    # Add in the fields from the Audit class "track" attribute.
    tracks_found = []
    
    if settings_audit:
        global_track_fields = getattr(settings_audit, 'GLOBAL_TRACK_FIELDS', [])
        for track_item in global_track_fields:
            if unprocessed:
                tracks_found.append(track_item)
            else:
                tracks_found.append(_build_track_field(track_item))
    
    if track_fields:
        for track_item in track_fields:
            if unprocessed:
                tracks_found.append(track_item)
            else:
                tracks_found.append(_build_track_field(track_item))
    return tracks_found