A variant of https://djangosnippets.org/snippets/2850/ that allows expensive calls to be deferred to an asynchronous process.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | from datetime import timedelta
from django.core.cache import cache
from hashlib import md5
from importlib import import_module
from random import randint
from redis import Redis
from rq import Queue
redis_conn = Redis()
redis_queue = Queue(connection=redis_conn)
class CacheWarmingUp(Exception):
# Based on http://djangosnippets.org/snippets/564/
# Modified to stagger cache results (+/- 20% to cache durations)
# Modified to prevent cache stampeding (on cache miss, inserts a placeholder while regenerating a hit)
# Modified to allow correct storage of None results without triggering cache expiry
# Modified to provide asynchronous cache regeneration through python-rq
# Cached objects should customize their __repr__ response to include updated timestamps and primary keys
def cache_result(*decorator_args, **decorator_kwargs):
# Extract the static parameters for this decorator
decorator_cache_key = decorator_kwargs.pop("cache_key_override", None)
duration = timedelta(*decorator_args, **decorator_kwargs)
seconds = int((duration.microseconds + (duration.seconds + duration.days * 24 * 3600) * 1e6) / 1e6)
prevent_asynchronous_deferral = decorator_kwargs.pop("cache_prevent_async", seconds < 600)
def doCache(original_function):
# If caching for 0 seconds, don't even bother
if not seconds:
return original_function
def wrapped_function(*function_args, **function_kwargs):
# Generate the key from the function name and given arguments
skip_cache_read = function_kwargs.pop("skip_cache_read", False)
function_cache_key = function_kwargs.pop("cache_key_override", None)
if function_cache_key or decorator_cache_key:
unhashed_key = function_cache_key or decorator_cache_key
key = unhashed_key
unhashed_key = unicode([
key = md5(unhashed_key).hexdigest()
flag = key + "flag"
# Determine what's already stored in cache
cached_flag = cache.get(flag) if not skip_cache_read else None
cached_result = cache.get(key) if not skip_cache_read else None
if cached_flag:
# If a result exists, return it.
if cached_result:
return cached_result[0]
# If the flag is set but we have no result then the cache is warming up
raise CacheWarmingUp(unhashed_key)
# Protect against cache stampeding
cache.set(flag, True, 10 + pow(seconds, 0.5))
# Enqueue a task to regenerate the result, returning a stale result in the interim
if cached_result and not prevent_asynchronous_deferral:
return cached_result[0]
# If we have no flag and no result, generate it right now (aka warm the cache)
result = original_function(*function_args, **function_kwargs)
store_cache_result(key, flag, seconds, result)
return result
return wrapped_function
return doCache
# A common way of storing both a result and a flag into the cache
def store_cache_result(key, flag, seconds, result):
cache.set(flag, True, seconds + randint(
-int(seconds * EXPIRY_VARIANCE),
+int(seconds * EXPIRY_VARIANCE),
cache.set(key, (result, ), seconds * 100)
# The function by which our job queue will process deferred calls
def refresh_cache(key, flag, seconds, function_module, function_name, function_args, function_kwargs):
function_kwargs["skip_cache_read"] = True
function = getattr(import_module(function_module), function_name)
except AttributeError:
function = getattr(function_args[0], function_name)
function_args = function_args[1:]
function_kwargs["cache_key_override"] = key
function(*function_args, **function_kwargs)
