Usage:
class MyModel(ModelWithHistory):
class History:
model = True # save model changes into admin's LogEntry table
fields = ('f1', 'f2') # save these fields history to AttributeLogEntry table
f1 = CharField(max_length=100)
f2 = IntegerField()
For threadlocals, see http://code.djangoproject.com/wiki/CookBookThreadlocalsAndUser
Beware! Not thoroughly tested yet. May cause problems when loading fixtures.
| #core/history.py
from django.contrib.auth.models import User
from middleware import threadlocals
from django.contrib.contenttypes.models import ContentType
from django.contrib.admin.models import ADDITION
from django.contrib.admin.models import CHANGE
from django.contrib.admin.models import LogEntry
from django.db import models
from django.contrib.admin.models import DELETION
from django.db.models import signals
from django.db.models.base import ModelBase
import datetime
def prepare_fields(instance):
    """Snapshot the history-tracked fields of ``instance`` as text.

    Every name listed in ``instance._history['fields']`` is looked up among
    ``instance.__class__._meta.fields`` and its current value is read from
    the instance through the field's ``attname``. ``None`` is recorded as an
    empty string; any other value is converted with ``unicode``.

    Side effect: ``instance._history_fields`` is reset to an empty dict.

    Returns a dict mapping field name -> unicode value.
    Raises ``KeyError`` when a tracked name is not one of the model's fields.
    """
    model_fields = instance.__class__._meta.fields
    instance._history_fields = {}

    # Index the model's fields by name for direct lookup below.
    fields_by_name = {}
    for model_field in model_fields:
        fields_by_name[model_field.name] = model_field

    snapshot = {}
    for tracked_name in instance._history['fields']:
        raw_value = getattr(instance, fields_by_name[tracked_name].attname)
        if raw_value is None:
            raw_value = ''
        snapshot[tracked_name] = unicode(raw_value)
    return snapshot
def add_signals(cls):
    """Connect the history-recording signal handlers to model class ``cls``.

    Three receivers are registered with ``sender=cls`` and ``weak=False``
    (the handlers are closures, so strong references keep them alive):

    * ``pre_save``   -- stores the pre-save values of the tracked fields in
      ``instance._history_fields`` and the pending admin action flag in
      ``instance._history_action``.
    * ``post_save``  -- writes one field log entry per tracked field whose
      text value changed, plus one model-level log entry.
    * ``post_delete`` -- writes a DELETION model-level log entry.

    What is tracked is read from ``instance._history`` (keys ``'model'`` and
    ``'fields'``).
    """

    def post_delete(instance, **_kwargs):
        """Log the deletion when model-level history is enabled."""
        if instance._history.get('model', False):
            instance._create_log_entry(DELETION)

    def pre_save(instance, **_kwargs):
        """Capture the field baseline and the action flag before saving."""
        # Raw saves (fixture loading) must not query or modify other
        # tables, so history is not recorded for them.
        if _kwargs.get('raw', False):
            return

        is_new = instance.pk is None
        tracked = instance._history.get('fields', [])
        if tracked:
            baseline = None
            if not is_new:
                try:
                    db_instance = instance.__class__.objects.get(pk=instance.pk)
                except instance.__class__.DoesNotExist:
                    # A pk was assigned by hand but no row exists yet:
                    # treat it like any other new object instead of
                    # comparing the instance against itself.
                    is_new = True
                else:
                    baseline = prepare_fields(db_instance)
            if baseline is None:
                # New objects start from empty values, so every non-empty
                # tracked field is logged on creation.
                baseline = {}
                for field_name in tracked:
                    baseline[field_name] = ''
            instance._history_fields = baseline

        if instance._history.get('model', False):
            if is_new:
                instance._history_action = ADDITION
            else:
                instance._history_action = CHANGE

    def post_save(instance, **_kwargs):
        """Write log entries for whatever changed in the save."""
        # Mirror pre_save: nothing was captured for raw saves.
        if _kwargs.get('raw', False):
            return

        if instance._history.get('fields', []):
            # Keep a reference to the baseline first: prepare_fields()
            # rebinds instance._history_fields to a fresh dict.
            pre_fields = instance._history_fields
            post_fields = prepare_fields(instance)
            for name, after in post_fields.items():
                if pre_fields[name] != after:
                    instance._create_field_log_entry(name, after)

        if instance._history.get('model', False):
            # Prefer the signal's own ``created`` flag when it is supplied;
            # it is also correct for objects saved with an explicit pk.
            created = _kwargs.get('created')
            if created is None:
                action = instance._history_action
            elif created:
                action = ADDITION
            else:
                action = CHANGE
            instance._create_log_entry(action)

    signals.pre_save.connect(pre_save, sender=cls, weak=False)
    signals.post_save.connect(post_save, sender=cls, weak=False)
    signals.post_delete.connect(post_delete, sender=cls, weak=False)
class ModelWithHistoryBase(ModelBase):
    """Metaclass that turns a model's inner ``History`` class into settings.

    After the model class is built, the attributes of its ``History`` class
    (if any) are copied into the dict ``Model._history`` with the defaults
    ``'model': False`` and ``'fields': []`` filled in, and the history
    signal handlers are connected for the new class via ``add_signals``.
    A model without a ``History`` class gets both defaults, i.e. nothing
    is tracked.
    """

    def __new__(cls, name, bases, attrs):
        Model = ModelBase.__new__(cls, name, bases, attrs)

        declared = getattr(Model, 'History', None)
        if declared:
            # Work on a copy: the __dict__ of a new-style class is a
            # read-only proxy (item assignment raises TypeError), and for
            # an old-style class writing to it would alter the user's
            # History class itself.
            history = dict(declared.__dict__)
        else:
            history = {}
        history.setdefault('model', False)
        history.setdefault('fields', [])

        Model._history = history
        add_signals(Model)
        return Model
class ModelWithHistory(models.Model):
    """Abstract base model that records change history.

    Model-level actions are stored as admin ``LogEntry`` rows; values of
    individual tracked fields are stored as ``AttributeLogEntry`` rows.
    What is tracked is configured through an inner ``History`` class, which
    the metaclass converts into ``_history``.
    """
    __metaclass__ = ModelWithHistoryBase

    class Meta:
        abstract = True

    def _history_user(self):
        """Return the user to attribute a history entry to.

        The current user comes from ``threadlocals``. When there is no
        current user or it is anonymous, the user with pk=0 is used
        instead; ``User.DoesNotExist`` propagates if that row is missing.
        """
        current = threadlocals.get_current_user()
        # NOTE(review): the threadlocals helper is not visible here; the
        # cookbook version returns None outside a request -- confirm.
        if current is None or current.is_anonymous():
            return User.objects.get(pk=0)
        return current

    def _create_log_entry(self, action):
        """Save a ``LogEntry`` for this object with the given action flag."""
        history = LogEntry(
            user=self._history_user(),
            object_id=self.pk,
            action_flag=action,
            content_type=ContentType.objects.get_for_model(self),
        )
        try:
            history.object_repr = repr(self)
        except Exception:
            # A broken __repr__ must not prevent the entry being written.
            history.object_repr = "(unknown)"
        history.save()

    def _create_field_log_entry(self, name, value):
        """Save an ``AttributeLogEntry`` holding field ``name``'s new value."""
        user = self._history_user()
        # Imported here rather than at module level, presumably to avoid
        # a circular import with core.models.
        from core.models import AttributeLogEntry
        history = AttributeLogEntry(
            user=user,
            object_id=self.pk,
            field_name=name,
            field_value=value,
            content_type=ContentType.objects.get_for_model(self),
        )
        try:
            history.object_repr = repr(self)
        except Exception:
            history.object_repr = "(unknown)"
        history.save()

    def get_history(self):
        """Return the queryset of ``LogEntry`` rows recorded for this object."""
        content_type = ContentType.objects.get_for_model(self)
        return LogEntry.objects.filter(object_id=self.pk, content_type=content_type)

    def has_history(self):
        """Return True when model-level history is enabled for this class."""
        return bool(self.__class__._history.get('model', False))

    def last_edited_at(self):
        """Return the ``action_time`` of the first entry in ``get_history()``.

        Falls back to 2000-01-01 00:00:00 when there are no entries.
        """
        history = list(self.get_history()[:1])
        if not history:
            return datetime.datetime(2000, 1, 1, 0, 0, 0)
        return history[0].action_time

    def last_edited_by(self):
        """Return the user of the first entry in ``get_history()``.

        Falls back to the user with pk=1 when there are no entries.
        """
        history = list(self.get_history()[:1])
        if not history:
            return User.objects.get(pk=1)
        return history[0].user
#core/models.py
from django.db import models
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.utils.safestring import mark_safe
from django.contrib.admin.util import quote
from django.utils.encoding import smart_unicode
from django.utils.translation import ugettext_lazy as _
from django.utils.datetime_safe import strftime
import datetime
#almost dupe of LogEntry model
class AttributeLogEntry(models.Model):
    """A recorded value of one named field on some model instance.

    Each row stores who made the change (``user``), which object it
    concerns (``content_type`` + ``object_id``), the field's name and its
    value as text. Rows are ordered by ``action_time`` descending.
    """

    class Meta:
        verbose_name = _('attribute log entry')
        verbose_name_plural = _('attribute log entries')
        db_table = 'django_attribute_log'
        ordering = ('-action_time',)

    action_time = models.DateTimeField(_('action time'), auto_now=True)
    user = models.ForeignKey(User)
    content_type = models.ForeignKey(ContentType, blank=True, null=True)
    object_id = models.IntegerField(_('object id'), blank=True, null=True)
    field_name = models.CharField(_('field name'), max_length=200, blank=True, null=True)
    field_value = models.TextField(_('field value'), null=True, blank=True)

    def __repr__(self):
        """Represent the entry by its action time."""
        timestamp = self.action_time
        return smart_unicode(timestamp)

    @staticmethod
    def get_history(obj, field):
        """Return the queryset of entries recorded for ``field`` of ``obj``."""
        lookup = {
            'content_type': ContentType.objects.get_for_model(obj),
            'object_id': obj.pk,
            'field_name': field,
        }
        return AttributeLogEntry.objects.filter(**lookup)

    def get_edited_object(self):
        """Fetch the object this log entry refers to."""
        target_type = self.content_type
        return target_type.get_object_for_this_type(pk=self.object_id)

    def get_admin_url(self):
        """
        Build the admin URL for editing the object this entry refers to.
        The URL is relative to the Django admin index page.
        """
        target_type = self.content_type
        url_parts = (target_type.app_label, target_type.model, quote(self.object_id))
        return mark_safe(u"%s/%s/%s/" % url_parts)

    @staticmethod
    def last_edited_at(obj, field):
        """Return the first entry's time as "YYYY-MM-DD HH:MM", or None."""
        latest = list(AttributeLogEntry.get_history(obj, field)[:1])
        if latest:
            return strftime(latest[0].action_time, "%Y-%m-%d %H:%M")
        return None

    @staticmethod
    def last_edited_by(obj, field):
        """Return the first entry's user, or None if there are no entries."""
        latest = list(AttributeLogEntry.get_history(obj, field)[:1])
        if latest:
            return latest[0].user
        return None
|
More like this
- Template tag - list punctuation for a list of items by shapiromatron 10 months, 2 weeks ago
- JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 10 months, 3 weeks ago
- Serializer factory with Django Rest Framework by julio 1 year, 5 months ago
- Image compression before saving the new model / work with JPG, PNG by Schleidens 1 year, 6 months ago
- Help text hyperlinks by sa2812 1 year, 7 months ago
Comments
Cool I hadn't known about threadlocals. Crazy hacky!
#
Please login first before commenting.