Usage:

    class MyModel(ModelWithHistory):
        class History:
            model = True  # save model changes into the admin's LogEntry table
            fields = ('f1', 'f2')  # save the history of these fields to the AttributeLogEntry table

        f1 = CharField(max_length=100)
        f2 = IntegerField()
For threadlocals, see http://code.djangoproject.com/wiki/CookBookThreadlocalsAndUser
Beware: not thoroughly tested yet. May cause problems when loading fixtures.
#core/history.py
from django.contrib.auth.models import User
from middleware import threadlocals
from django.contrib.contenttypes.models import ContentType
from django.contrib.admin.models import ADDITION
from django.contrib.admin.models import CHANGE
from django.contrib.admin.models import LogEntry
from django.db import models
from django.contrib.admin.models import DELETION
from django.db.models import signals
from django.db.models.base import ModelBase
import datetime
def prepare_fields(instance):
    """Return a text snapshot of the history-tracked fields of ``instance``.

    Every name listed in ``instance._history['fields']`` is looked up among
    the model's ``_meta.fields``; the current value is read through that
    field's ``attname`` and converted with ``unicode``.  ``None`` is
    normalised to the empty string so that "no value" compares equal to the
    blank baseline used for unsaved objects.

    The function only reads from ``instance``; it does not modify it.

    Returns:
        dict mapping each tracked field name to its value as unicode text.

    Raises:
        KeyError: if a tracked name is not one of the model's fields.
    """
    # Index the model fields by name once instead of scanning per lookup.
    fields_by_name = dict((f.name, f) for f in instance.__class__._meta.fields)
    snapshot = {}
    for field_name in instance._history['fields']:
        value = getattr(instance, fields_by_name[field_name].attname)
        if value is None:
            value = ''
        snapshot[field_name] = unicode(value)
    return snapshot
def add_signals(cls):
    """Connect the history-recording signal handlers for model class ``cls``.

    Three handlers are registered with ``cls`` as sender:

    * ``pre_save`` stores a snapshot of the tracked field values on the
      instance (``_history_fields``) and the pending admin action flag
      (``_history_action``).
    * ``post_save`` compares the snapshot with the saved values, creating one
      field log entry per changed field, and then the model-level log entry.
    * ``post_delete`` creates a DELETION log entry.

    Which parts run is controlled by ``instance._history``: the ``'fields'``
    entry enables per-field logging, the ``'model'`` entry enables
    model-level logging.
    """

    def post_delete(instance, **_kwargs):
        """Log the deletion when model-level history is enabled."""
        if instance._history.get('model', False):
            instance._create_log_entry(DELETION)

    def pre_save(instance, **_kwargs):
        """Record the pre-save field values and the action about to happen."""
        is_new = instance.pk is None
        if instance._history.get('fields', []):
            if is_new:
                # Nothing is stored yet: every tracked field starts blank.
                instance._history_fields = {}
                for field_name in instance._history['fields']:
                    instance._history_fields[field_name] = ''
            else:
                try:
                    db_instance = instance.__class__.objects.get(pk=instance.pk)
                except instance.__class__.DoesNotExist:
                    # A pk was assigned but no row exists; fall back to the
                    # instance itself, so no field counts as changed.
                    db_instance = instance
                instance._history_fields = prepare_fields(db_instance)
        if instance._history.get('model', False):
            if is_new:
                instance._history_action = ADDITION
            else:
                instance._history_action = CHANGE

    def post_save(instance, **_kwargs):
        """Write log entries for changed fields and for the save itself."""
        if instance._history.get('fields', []):
            # Take the pre-save snapshot before computing the current one.
            before_fields = instance._history_fields
            after_fields = prepare_fields(instance)
            for name, after in after_fields.items():
                if before_fields[name] != after:
                    instance._create_field_log_entry(name, after)
        if instance._history.get('model', False):
            instance._create_log_entry(instance._history_action)

    # The handlers are local closures, so they must be connected with strong
    # references; otherwise nothing would keep them alive after this returns.
    signals.pre_save.connect(pre_save, sender=cls, weak=False)
    signals.post_save.connect(post_save, sender=cls, weak=False)
    signals.post_delete.connect(post_delete, sender=cls, weak=False)
class ModelWithHistoryBase(ModelBase):
    """Metaclass that turns a model's inner ``History`` class into options.

    After the model class is built, the attributes of its ``History`` class
    (if any) are copied into a dict stored as ``Model._history``.  The keys
    ``'model'`` (default ``False``) and ``'fields'`` (default ``[]``) are
    always present.  A model without a ``History`` class is allowed and gets
    just those defaults.  Finally the history signal handlers are connected
    for the new class.
    """

    def __new__(cls, name, bases, attrs):
        Model = ModelBase.__new__(cls, name, bases, attrs)
        history_options = getattr(Model, 'History', None)
        if history_options:
            # Copy rather than write into ``__dict__`` directly: for
            # new-style classes ``__dict__`` is a read-only proxy and item
            # assignment on it raises TypeError.  Copying also leaves the
            # user's ``History`` class untouched.
            history = dict(history_options.__dict__)
        else:
            history = {}
        history.setdefault('model', False)
        history.setdefault('fields', [])
        Model._history = history
        add_signals(Model)
        return Model
class ModelWithHistory(models.Model):
    """Abstract base model whose saves and deletes are written to log tables.

    Model-level actions go to the admin's ``LogEntry`` table; per-field
    changes go to ``core.models.AttributeLogEntry``.  The acting user is
    taken from ``threadlocals.get_current_user()``.
    """
    __metaclass__ = ModelWithHistoryBase

    class Meta:
        abstract = True

    def _history_user(self):
        """Return the ``User`` a new log entry should be attributed to.

        Anonymous users are mapped to the ``User`` with primary key 0.  A
        missing user (``None``) is treated the same way instead of failing
        with AttributeError.
        NOTE(review): ``None`` is presumably what the threadlocals helper
        returns outside of a request -- confirm against the middleware.

        Raises:
            User.DoesNotExist: if the fallback user (pk=0) does not exist.
        """
        user = threadlocals.get_current_user()
        if user is None or user.is_anonymous():
            return User.objects.get(pk=0)
        return user

    def _history_repr(self):
        """Return ``repr(self)``, or ``"(unknown)"`` if computing it fails."""
        try:
            return repr(self)
        except Exception:
            # Best effort: a broken __repr__ must not prevent logging.
            return "(unknown)"

    def _create_log_entry(self, action):
        """Save a ``LogEntry`` for this object with admin flag ``action``."""
        history = LogEntry(user=self._history_user(), object_id=self.pk,
                           action_flag=action,
                           content_type=ContentType.objects.get_for_model(self))
        history.object_repr = self._history_repr()
        history.save()

    def _create_field_log_entry(self, name, value):
        """Save an ``AttributeLogEntry`` recording ``value`` for field ``name``."""
        user = self._history_user()
        # Imported lazily; presumably to avoid a circular import between
        # this module and core.models -- TODO confirm.
        from core.models import AttributeLogEntry
        history = AttributeLogEntry(user=user, object_id=self.pk,
                                    field_name=name, field_value=value,
                                    content_type=ContentType.objects.get_for_model(self))
        # NOTE(review): kept from the original code; verify that
        # AttributeLogEntry actually stores ``object_repr``.
        history.object_repr = self._history_repr()
        history.save()

    def get_history(self):
        """Return the queryset of ``LogEntry`` rows that refer to this object."""
        content_type = ContentType.objects.get_for_model(self)
        return LogEntry.objects.filter(object_id=self.pk, content_type=content_type)

    def has_history(self):
        """Return True if model-level history is enabled for this class."""
        return bool(self.__class__._history.get('model', False))

    def last_edited_at(self):
        """Return the ``action_time`` of the first entry of ``get_history()``.

        Relies on the queryset's default ordering to put the most recent
        entry first.  Without any history, the placeholder date 2000-01-01
        00:00:00 is returned.
        """
        history = list(self.get_history()[:1])
        if not history:
            return datetime.datetime(2000, 1, 1, 0, 0, 0)
        return history[0].action_time

    def last_edited_by(self):
        """Return the user of the first entry of ``get_history()``.

        Without any history, the ``User`` with primary key 1 is returned.
        """
        history = list(self.get_history()[:1])
        if not history:
            return User.objects.get(pk=1)
        return history[0].user
#core/models.py
from django.db import models
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.utils.safestring import mark_safe
from django.contrib.admin.util import quote
from django.utils.encoding import smart_unicode
from django.utils.translation import ugettext_lazy as _
from django.utils.datetime_safe import strftime
import datetime
#almost dupe of LogEntry model
class AttributeLogEntry(models.Model):
    """Log row recording the value a single field of some object was set to.

    Each row stores who made the change (``user``), which object it concerns
    (``content_type`` plus ``object_id``), the field's name and its value.
    Rows are ordered newest first.
    """

    class Meta:
        verbose_name = _('attribute log entry')
        verbose_name_plural = _('attribute log entries')
        db_table = 'django_attribute_log'
        ordering = ('-action_time',)

    action_time = models.DateTimeField(_('action time'), auto_now=True)
    user = models.ForeignKey(User)
    content_type = models.ForeignKey(ContentType, blank=True, null=True)
    object_id = models.IntegerField(_('object id'), blank=True, null=True)
    field_name = models.CharField(_('field name'), max_length=200, blank=True, null=True)
    field_value = models.TextField(_('field value'), null=True, blank=True)

    def __repr__(self):
        """Represent the entry by its action time."""
        return smart_unicode(self.action_time)

    @staticmethod
    def get_history(obj, field):
        """Return the queryset of entries logged for ``field`` of ``obj``."""
        obj_type = ContentType.objects.get_for_model(obj)
        return AttributeLogEntry.objects.filter(
            object_id=obj.pk, field_name=field, content_type=obj_type)

    def get_edited_object(self):
        """Fetch the object this entry refers to via its content type."""
        return self.content_type.get_object_for_this_type(pk=self.object_id)

    def get_admin_url(self):
        """Return the admin URL for editing the object of this entry.

        The URL is relative to the Django admin index page.
        """
        obj_type = self.content_type
        relative_url = u"%s/%s/%s/" % (
            obj_type.app_label, obj_type.model, quote(self.object_id))
        return mark_safe(relative_url)

    @staticmethod
    def last_edited_at(obj, field):
        """Return when ``field`` of ``obj`` was last logged, or None.

        The time is formatted as ``YYYY-MM-DD HH:MM``.
        """
        for entry in AttributeLogEntry.get_history(obj, field)[:1]:
            return strftime(entry.action_time, "%Y-%m-%d %H:%M")
        return None

    @staticmethod
    def last_edited_by(obj, field):
        """Return the user of the latest entry for ``field`` of ``obj``, or None."""
        for entry in AttributeLogEntry.get_history(obj, field)[:1]:
            return entry.user
        return None
|
More like this
- Template tag - list punctuation for a list of items by shapiromatron 12 months ago
- JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 1 year ago
- Serializer factory with Django Rest Framework by julio 1 year, 7 months ago
- Image compression before saving the new model / work with JPG, PNG by Schleidens 1 year, 7 months ago
- Help text hyperlinks by sa2812 1 year, 8 months ago
Comments
Cool I hadn't known about threadlocals. Crazy hacky!
#
Please login first before commenting.