Login

keeping Model and Field History (everywhere)

Author:
buriy
Posted:
September 12, 2008
Language:
Python
Version:
1.0
Score:
3 (after 3 ratings)

Usage:

class MyModel(ModelWithHistory):
    # Per-model history options; read by the ModelWithHistory metaclass.
    class History:
        model = True # log additions, changes and deletions of the instance to the admin's LogEntry table
        fields = ('f1', 'f2') # log every changed value of these fields to the AttributeLogEntry table
    f1 = CharField(max_length=100)
    f2 = IntegerField()

For the threadlocals middleware used to look up the current user, see http://code.djangoproject.com/wiki/CookBookThreadlocalsAndUser

Beware! This has not been thoroughly tested yet and may cause problems when loading fixtures.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#core/history.py
from django.contrib.auth.models import User

from middleware import threadlocals
from django.contrib.contenttypes.models import ContentType
from django.contrib.admin.models import ADDITION
from django.contrib.admin.models import CHANGE
from django.contrib.admin.models import LogEntry
from django.db import models
from django.contrib.admin.models import DELETION
from django.db.models import signals
from django.db.models.base import ModelBase
import datetime

def prepare_fields(instance):
    """Snapshot the history-tracked fields of ``instance`` as unicode text.

    For every name listed in ``instance._history['fields']`` the current
    value is read from the instance through the matching model field's
    ``attname`` and converted with ``unicode()``; ``None`` is stored as the
    empty string.

    Side effect: ``instance._history_fields`` is reset to an empty dict.

    Returns a dict mapping field name to the unicode value.
    Raises ``KeyError`` when a tracked name is not one of the model's fields.
    """
    model_fields = instance.__class__._meta.fields
    instance._history_fields = {}
    fields_by_name = dict((field.name, field) for field in model_fields)

    snapshot = {}
    for tracked_name in instance._history['fields']:
        current = getattr(instance, fields_by_name[tracked_name].attname)
        if current is None:
            current = ''
        snapshot[tracked_name] = unicode(current)
    return snapshot

def add_signals(cls):
    """Connect the history-recording signal receivers for model class ``cls``.

    Three receivers are registered with ``sender=cls``.  They are connected
    with ``weak=False`` because they are closures local to this call and
    would otherwise be garbage-collected as soon as it returns.

    * ``pre_save`` stores a "before" snapshot of the tracked fields in
      ``instance._history_fields`` and a provisional action flag in
      ``instance._history_action``.
    * ``post_save`` compares the snapshot with the saved values, writes one
      field log entry per changed field and then the model log entry.
    * ``post_delete`` writes a DELETION model log entry.

    What gets recorded is driven by ``instance._history``: ``'fields'`` lists
    the tracked field names and ``'model'`` switches model-level entries on.
    """
    def blank_snapshot(instance):
        # "Before" state of a row that does not exist yet: every tracked
        # field is empty, which is also how prepare_fields() renders None.
        return dict((field_name, '')
                    for field_name in instance._history['fields'])

    def post_delete(instance, **_kwargs):
        if instance._history.get('model', False):
            instance._create_log_entry(DELETION)

    def pre_save(instance, **_kwargs):
        if instance._history.get('fields', []):
            if instance.pk is None:
                instance._history_fields = blank_snapshot(instance)
            else:
                try:
                    db_instance = instance.__class__.objects.get(pk=instance.pk)
                except instance.__class__.DoesNotExist:
                    # The pk was assigned by hand and there is no stored row
                    # yet, so this save is really an insert.  Snapshotting
                    # the instance itself would make every field look
                    # unchanged and silently drop its initial history.
                    instance._history_fields = blank_snapshot(instance)
                else:
                    instance._history_fields = prepare_fields(db_instance)
        if instance._history.get('model', False):
            # Provisional guess; post_save overrides it when the signal
            # reports whether a row was actually created.
            if instance.pk is None:
                instance._history_action = ADDITION
            else:
                instance._history_action = CHANGE

    def post_save(instance, created=None, **_kwargs):
        if instance._history.get('fields', []):
            # Grab the snapshot first: prepare_fields() resets
            # instance._history_fields as a side effect.
            pre_fields = instance._history_fields
            post_fields = prepare_fields(instance)
            for name, after in post_fields.items():
                if pre_fields[name] != after:
                    instance._create_field_log_entry(name, after)

        if instance._history.get('model', False):
            if created is None:
                # Signal sent without the flag: fall back to the pre_save guess.
                action = instance._history_action
            elif created:
                action = ADDITION
            else:
                action = CHANGE
            instance._create_log_entry(action)

    signals.pre_save.connect(pre_save, sender=cls, weak=False)
    signals.post_save.connect(post_save, sender=cls, weak=False)
    signals.post_delete.connect(post_delete, sender=cls, weak=False)

class ModelWithHistoryBase(ModelBase):
    """Metaclass that turns a model's inner ``History`` class into settings.

    After the model class is built, the namespace of its ``History``
    attribute (if any) is copied into ``Model._history``, a plain dict that
    always contains:

    * ``'model'``  -- defaults to ``False``
    * ``'fields'`` -- defaults to ``[]``

    The history signal receivers are then connected for the new class.
    A model without a ``History`` class is allowed; it simply gets the
    defaults, so nothing is recorded for it.
    """
    def __new__(cls, name, bases, attrs):
        Model = ModelBase.__new__(cls, name, bases, attrs)
        options = getattr(Model, 'History', None)
        if options:
            # Work on a copy.  Writing the defaults straight into the class
            # namespace would modify the user's History class (which may be
            # shared with other models through inheritance) and fails
            # outright when that namespace is read-only.
            history = dict(options.__dict__)
        else:
            history = {}
        history.setdefault('model', False)
        history.setdefault('fields', [])
        Model._history = history
        add_signals(Model)
        return Model

class ModelWithHistory(models.Model):
    """Abstract base model that records its own change history.

    Model-level events are written to the admin's ``LogEntry`` table and
    per-field value changes to ``AttributeLogEntry``.  What is recorded is
    configured through an inner ``History`` class on the concrete model,
    which the metaclass turns into the ``_history`` dict.
    """
    __metaclass__ = ModelWithHistoryBase

    class Meta:
        abstract = True

    def _history_user(self):
        """Return the user a log entry should be attributed to.

        Falls back to the ``User`` with pk 0 when the current user is
        anonymous or when there is no current user at all.
        """
        user = threadlocals.get_current_user()
        # NOTE(review): the threadlocals recipe is expected to return None
        # when no request is being processed (shell, fixture loading);
        # without this check that case raised AttributeError.
        if user is None or user.is_anonymous():
            return User.objects.get(pk=0)
        return user

    def _history_repr(self):
        """Return ``repr(self)``, or ``"(unknown)"`` if computing it fails."""
        try:
            return repr(self)
        except Exception:
            # Best effort only: a broken __repr__ must not block logging.
            return "(unknown)"

    def _create_log_entry(self, action):
        """Save a ``LogEntry`` for this instance with the given action flag."""
        user = self._history_user()
        history = LogEntry(user=user, object_id = self.pk, action_flag = action,
                            content_type = ContentType.objects.get_for_model(self))
        history.object_repr = self._history_repr()
        history.save()

    def _create_field_log_entry(self, name, value):
        """Save an ``AttributeLogEntry`` recording ``value`` for field ``name``."""
        user = self._history_user()
        # Imported here rather than at module level, presumably to avoid a
        # circular import between core.history and core.models.
        from core.models import AttributeLogEntry
        history = AttributeLogEntry(user=user, object_id = self.pk, field_name=name, field_value = value,
                            content_type = ContentType.objects.get_for_model(self))
        history.object_repr = self._history_repr()
        history.save()

    def get_history(self):
        """Return the queryset of ``LogEntry`` rows for this instance."""
        content_type = ContentType.objects.get_for_model(self)
        return LogEntry.objects.filter(object_id=self.pk, content_type=content_type)

    def has_history(self):
        """Return True when model-level logging is enabled for this class."""
        return bool(self.__class__._history.get('model', False))

    def last_edited_at(self):
        """Return the action time of the first entry of ``get_history()``.

        Assumes ``LogEntry``'s default ordering is newest-first.  When there
        is no entry, 2000-01-01 00:00:00 is returned as a placeholder.
        """
        history = list(self.get_history()[:1])
        if not history:
            return datetime.datetime(2000, 1, 1, 0, 0, 0)
        return history[0].action_time

    def last_edited_by(self):
        """Return the user of the first entry of ``get_history()``.

        Assumes ``LogEntry``'s default ordering is newest-first.  When there
        is no entry, the ``User`` with pk 1 is returned as a placeholder.
        """
        history = list(self.get_history()[:1])
        if not history:
            return User.objects.get(pk=1)
        return history[0].user

#core/models.py
from django.db import models
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.utils.safestring import mark_safe
from django.contrib.admin.util import quote
from django.utils.encoding import smart_unicode
from django.utils.translation import ugettext_lazy as _
from django.utils.datetime_safe import strftime
import datetime

#almost dupe of LogEntry model
class AttributeLogEntry(models.Model):
    """One logged value of a single field of some model instance.

    Each row stores who made the change (``user``), which object it belongs
    to (``content_type`` + ``object_id``), the field name and the value.
    Rows are ordered newest first by default.
    """
    class Meta:
        verbose_name = _('attribute log entry')
        verbose_name_plural = _('attribute log entries')
        db_table = 'django_attribute_log'
        ordering = ('-action_time',)

    action_time = models.DateTimeField(_('action time'), auto_now=True)
    user = models.ForeignKey(User)
    content_type = models.ForeignKey(ContentType, blank=True, null=True)
    object_id = models.IntegerField(_('object id'), blank=True, null=True)
    field_name = models.CharField(_('field name'), max_length=200, blank=True, null=True)
    field_value = models.TextField(_('field value'), null=True, blank=True)

    def __repr__(self):
        # An entry is represented by its timestamp.
        timestamp = self.action_time
        return smart_unicode(timestamp)

    @staticmethod
    def get_history(obj, field):
        """Return the queryset of entries logged for ``field`` of ``obj``."""
        obj_type = ContentType.objects.get_for_model(obj)
        entries = AttributeLogEntry.objects.filter(
            object_id=obj.pk, field_name=field, content_type=obj_type)
        return entries

    def get_edited_object(self):
        """Return the object this log entry refers to."""
        target_type = self.content_type
        return target_type.get_object_for_this_type(pk=self.object_id)

    def get_admin_url(self):
        """
        Return the admin URL for editing the object this entry refers to.
        The URL is relative to the Django admin index page.
        """
        parts = (self.content_type.app_label, self.content_type.model, quote(self.object_id))
        return mark_safe(u"%s/%s/%s/" % parts)

    @staticmethod
    def last_edited_at(obj, field):
        """Return when ``field`` of ``obj`` was last logged, or None.

        The time is formatted as ``YYYY-MM-DD HH:MM``.
        """
        latest = list(AttributeLogEntry.get_history(obj, field)[:1])
        if latest:
            return strftime(latest[0].action_time, "%Y-%m-%d %H:%M")
        return None

    @staticmethod
    def last_edited_by(obj, field):
        """Return the user of the latest entry for ``field`` of ``obj``, or None."""
        latest = list(AttributeLogEntry.get_history(obj, field)[:1])
        if latest:
            return latest[0].user
        return None

More like this

  1. Template tag - list punctuation for a list of items by shapiromatron 12 months ago
  2. JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 1 year ago
  3. Serializer factory with Django Rest Framework by julio 1 year, 7 months ago
  4. Image compression before saving the new model / work with JPG, PNG by Schleidens 1 year, 7 months ago
  5. Help text hyperlinks by sa2812 1 year, 8 months ago

Comments

gsf0 (on October 29, 2009):

Cool I hadn't known about threadlocals. Crazy hacky!

#

Please login first before commenting.