import logging

from django.db.models.manager import BaseManager
from django.db.models.query import QuerySet

logger = logging.getLogger("CacheInDictManager")


class CacheInDictQuerySet(QuerySet):
    """
    QuerySet whose get() consults the owning manager's in-memory cache first.

    The cache is only used when the queryset was built with a ``manager``
    (as CacheInDictManager.get_queryset() does). Querysets built without one
    fall back to the regular QuerySet.get().
    """

    def __init__(
        self,
        model=None,
        query=None,
        using=None,
        hints=None,
        manager=None,
        debug=None,
    ):
        super().__init__(model, query, using, hints)
        self.manager = manager
        self.debug = debug

    def get(self, *args, **kwargs):
        """
        Return the single matching instance, from the cache when possible.

        On a cache miss the instance is fetched with QuerySet.get() (which
        raises as usual when zero or several rows match) and then stored in
        the manager's cache.
        """
        manager = self.manager
        if manager is None:
            # No manager attached (the constructor default). Chained
            # querysets are expected to land here because Django re-creates
            # them without our extra constructor arguments; bypassing the
            # cache is also the only safe choice for them, since the cache
            # lookup knows nothing about previously applied filters.
            return super().get(*args, **kwargs)

        instance = manager.get_instance_from_cache(*args, **kwargs)
        if instance is not None:
            return instance

        instance = super().get(*args, **kwargs)
        if self.debug:
            logger.info(f"CacheInDictQuerySet.get() instance fetched:{instance}")
        manager.fill_cache_for_instance(instance)
        return instance


class CacheInDictManager(BaseManager.from_queryset(CacheInDictQuerySet)):
    """
    A manager that caches get() request with some keys.
    Each key can be made of one or more fields of the model.

    Cached instances are kept in plain dicts on the manager and are never
    invalidated by this class: a cached instance is returned as-is even if
    the underlying row has changed since it was fetched.

    Keyword arguments (both optional, removed before calling the parent
    constructor):
        additional_custom_cache_keys_definitions: mapping of key identifier
            to a tuple of field names. The identifier must be the field
            names joined with "_" and the field names must be ordered
            lexicographically, e.g. {"aaa_id_bbb_id": ("aaa_id", "bbb_id")}.
        debug: when true, log cache activity at INFO level. Defaults to
            False; the ``debug`` attribute can also be set afterwards.
    """

    def __init__(self, *args, **kwargs):
        additional_custom_cache_keys_definitions = kwargs.pop(
            "additional_custom_cache_keys_definitions", None
        )
        debug = kwargs.pop("debug", False)
        super().__init__(*args, **kwargs)
        self.custom_cache_keys_definitions = {
            # each cache key definition must be ordered lexicographically
            # example : ("aaa_id", "bbb_id", "bca_id"),
            "pk": ("pk",),
            "id": ("id",),
        }
        if additional_custom_cache_keys_definitions is not None:
            self.custom_cache_keys_definitions.update(
                additional_custom_cache_keys_definitions
            )
        # One dict per key identifier, mapping key value -> cached instance.
        self.custom_cache = {
            key_identifier: {} for key_identifier in self.custom_cache_keys_definitions
        }
        self.debug = debug
        if self.debug:
            logger.info(
                f"CacheInDictManager created id:{id(self)}"
                f" custom_cache_keys_definitions:{self.custom_cache_keys_definitions}"
            )

    def get_queryset(self):
        """
        Return a new QuerySet object. Subclasses can override this method to
        customize the behavior of the Manager.
        """
        return self._queryset_class(
            model=self.model,
            using=self._db,
            hints=self._hints,
            manager=self,
            debug=self.debug,
        )

    @staticmethod
    def get_key_identifier(key):
        """Return the identifier of a key: its field names joined with "_"."""
        return "_".join(key)

    @staticmethod
    def get_key_value_for_instance(key_definition, instance):
        """
        Return the cache key value of ``instance`` for ``key_definition``.

        The value is the "_"-joined string form of each field of the
        definition. Returns None when the key cannot be built: a field is
        missing on the instance, is None, is not an int, or is not strictly
        positive.
        """
        sub_key_values = []
        for sub_key in key_definition:
            sub_key_value = getattr(instance, sub_key, None)
            if not isinstance(sub_key_value, int) or sub_key_value <= 0:
                # Covers missing attribute, None, non-int and non-positive.
                return None
            sub_key_values.append(str(sub_key_value))
        return "_".join(sub_key_values)

    def fill_cache_for_instance(self, instance):
        """Store ``instance`` under every configured key it has a value for."""
        if self.debug:
            logger.info(
                f"CacheInDictManager.fill_cache_for_instance() instance:{instance}"
            )
        for (
            key_identifier,
            key_definition,
        ) in self.custom_cache_keys_definitions.items():
            key_value = self.get_key_value_for_instance(key_definition, instance)
            if self.debug:
                logger.info(
                    "CacheInDictManager.fill_cache_for_instance()"
                    f" key_identifier:{key_identifier} key_value:{key_value}"
                )
            if key_value is None:
                # No usable key for this definition: storing under None would
                # only create an entry that lookups can never hit.
                continue
            # setdefault tolerates definitions registered after __init__.
            self.custom_cache.setdefault(key_identifier, {})[key_value] = instance

    def get_instance_from_cache(self, *args, **kwargs):
        """
        Return the cached instance matching the get() arguments, or None.

        The cache is only consulted when the lookup consists solely of
        keyword arguments whose names match a configured key definition.
        Positional conditions cannot be checked against the cache, so any
        positional argument results in a miss.
        """
        if args:
            return None

        field_names = sorted(kwargs)
        key_identifier = self.get_key_identifier(field_names)
        if self.debug:
            logger.info(
                f"CacheInDictManager.get_instance_from_cache() computed key_identifier:{key_identifier}"
            )

        key_definition = self.custom_cache_keys_definitions.get(key_identifier)
        if key_definition is None:
            return None
        if tuple(key_definition) != tuple(field_names):
            # The identifier matches but the fields (or their order) do not:
            # the stored key values would not line up with the lookup values,
            # so a hit could be a different row. Treat as a miss.
            return None
        if self.debug:
            logger.info(
                "CacheInDictManager.get_instance_from_cache() key_identifier exists"
            )

        key_value = "_".join(str(kwargs[name]) for name in field_names)
        if self.debug:
            logger.info(
                f"CacheInDictManager.get_instance_from_cache() computed key_value:{key_value}"
            )

        instance = self.custom_cache.get(key_identifier, {}).get(key_value)
        if instance is not None and self.debug:
            logger.info("CacheInDictManager.get_instance_from_cache() cache hit")
        return instance