Login

Multiple querysets

Author:
t_rybik
Posted:
February 20, 2010
Language:
Python
Version:
1.2
Score:
2 (after 2 ratings)

This is an upgrade of snippet 1103.

Exemplary usage:

class Blog(models.Model):
    name = models.CharField(max_length=100)
    def __unicode__(self):
        return self.name

class Post(models.Model):
    title = models.CharField(max_length=50)
    blog = models.ForeignKey(Blog)
    def __unicode__(self):
        return self.title
    class Meta:
        abstract=True

class Article(Post):
    text = models.TextField()

class Link(Post):
    url = models.URLField()

blog = Blog(name="Exemplary blog")
blog.save()
Article(title="#1", text="Exemplary article 1", blog=blog).save()
Article(title="#2", text="Exemplary article 2", blog=blog).save()
Link(title="#3", url="http://exemplary.link.com/", blog=blog).save()

qs1 = Article.objects.all()
qs2 = Link.objects.all()
qsseq = QuerySetSequence(qs1, qs2)

# those all work also on IableSequence
len(qsseq)
len(QuerySetSequence(qs2, qs2))
qsseq[len(qs1)].title

# this is QuerySetSequence specific
qsseq.order_by('blog.name','-title')
excluded_homo = qsseq.exclude(title__contains="3")
# homogenic results - returns QuerySet
type(excluded_homo)
excluded_hetero = qsseq.exclude(title="#2")
# heterogenic results - returns QuerySetSequence
type(excluded_hetero)
excluded_hetero.exists()

You can implement more QuerySet API methods if needed. If full API is implemented it makes sense to also subclass the QuerySet class.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
from itertools import chain, dropwhile
from operator import mul, attrgetter, __not__

from django.db.models.query import REPR_OUTPUT_SIZE, EmptyQuerySet



def mul_it(it1, it2):
    '''
    Element-wise iterables multiplications.
    '''
    assert len(it1) == len(it2),\
           "Can not element-wise multiply iterables of different length."
    return map(mul, it1, it2)



def chain_sing(*iterables_or_items):
    '''
    As itertools.chain except that if an argument is not iterable then chain it
    as a singleton.
    '''
    for iter_or_item in iterables_or_items:
        if hasattr(iter_or_item, '__iter__'):
            for item in iter_or_item:
                yield item
        else:
            yield iter_or_item





class IableSequence(object):
    '''
    Wrapper for sequence of iterable and indexable by non-negative integers
    objects. That is a sequence of objects which implement __iter__, __len__ and
    __getitem__ for slices, ints and longs.

    Note: not a Django-specific class.
    '''
    def __init__(self, *args, **kwargs):
        self.iables = args # wrapped sequence
        self._len = None # length cache
        self._collapsed = [] # collapsed elements cache

    def __len__(self):
        if not self._len:
            self._len = sum(len(iable) for iable in self.iables)
        return self._len


    def __iter__(self):
        return chain(*self.iables)

    def __nonzero__(self):
        try:
            iter(self).next()
        except StopIteration:
            return False
        return True


    def _collect(self, start=0, stop=None, step=1):
        if not stop:
            stop = len(self)
        sub_iables = []
        # collect sub sets
        it = self.iables.__iter__()
        try:
            while stop>start:
                i = it.next()
                i_len = len(i)
                if i_len > start:
                    # no problem with 'stop' being too big
                    sub_iables.append(i[start:stop:step])
                start = max(0, start-i_len)
                stop -= i_len
        except StopIteration:
            pass
        return sub_iables

    def __getitem__(self, key):
        '''
        Preserves wrapped indexable sequences.
        Does not support negative indices.
        '''
        # params validation
        if not isinstance(key, (slice, int, long)):
            raise TypeError
        assert ((not isinstance(key, slice) and (key >= 0))
                or (isinstance(key, slice) and (key.start is None or key.start >= 0)
                    and (key.stop is None or key.stop >= 0))), \
                "Negative indexing is not supported."
        # initialization
        if isinstance(key, slice):
            start, stop, step = key.indices(len(self))
            ret_item=False
        else: # isinstance(key, (int,long))
            start, stop, step = key, key+1, 1
            ret_item=True
        # collect sub sets
        ret_iables = self._collect(start, stop, step)
        # return the simplest possible answer
        if not len(ret_iables):
            if ret_item:
                raise IndexError("'%s' index out of range" % self.__class__.__name__)
            return ()
        if ret_item:
            # we have exactly one query set with exactly one item
            assert len(ret_iables) == 1 and len(ret_iables[0]) == 1
            return ret_iables[0][0]
        # otherwise we have more then one item in at least one query set
        if len(ret_iables) == 1:
            return ret_iables[0]
        # Note: this can't be self.__class__ instead of IableSequence; exemplary
        # cause is that indexing over query sets returns lists so we can not
        # return QuerySetSequence by default. Some type checking enhancement can
        # be implemented in subclasses.
        return IableSequence(*ret_iables)


    def collapse(self, stop=None):
        '''
        Collapses sequence into a list.

        Try to do it effectively with caching.
        '''
        if not stop:
            stop = len(self)
        # if we already calculated sufficient collapse then return it
        if len(self._collapsed) >= stop:
            return self._collapsed[:stop]
        # otherwise collapse only the missing part
        items = self._collapsed
        sub_iables = self._collect(len(self._collapsed), stop)
        for sub_iable in sub_iables:
            items+=sub_iable
        # cache new collapsed items
        self._collapsed = items
        return self._collapsed

    def __repr__(self):
        # get +1 element for the truncation msg if applicable
        items = self.collapse(stop=REPR_OUTPUT_SIZE+1)
        if len(items) > REPR_OUTPUT_SIZE:
            items[-1] = "...(remaining elements truncated)..."
        return repr(items)



class QuerySetSequence(IableSequence):
    '''
    Wrapper for the query sets sequence without the restriction on the identity
    of the base models.
    '''
    def count(self):
        if not self._len:
            self._len = sum(qs.count() for qs in self.iables)
        return self._len

    def __len__(self):
        # override: use DB effective count's instead of len()
        return self.count()

    def order_by(self, *field_names):
        '''
        Returns a list of the QuerySetSequence items with the ordering changed.
        '''
        # construct a comparator function based on the field names prefixes
        reverses = [1] * len(field_names)
        field_names = list(field_names)
        for i in range(len(field_names)):
            field_name = field_names[i]
            if field_name[0] == '-':
                reverses[i] = -1
                field_names[i] = field_name[1:]
        # wanna iterable and attrgetter returns single item if 1 arg supplied
        fields_getter = lambda i: chain_sing(attrgetter(*field_names)(i))
        # comparator gets the first non-zero value of the field comparison
        # results taking into account reverse order for fields prefixed with '-'
        comparator = lambda i1, i2:\
            dropwhile(__not__,
              mul_it(map(cmp, fields_getter(i1), fields_getter(i2)), reverses)
            ).next()
        # return new sorted list
        return sorted(self.collapse(), cmp=comparator)

    def filter(self, *args, **kwargs):
        """
        Returns a new QuerySetSequence or instance with the args ANDed to the
        existing set.

        QuerySetSequence is simplified thus result actually can be one of:
        QuerySetSequence, QuerySet, EmptyQuerySet.
        """
        return self._filter_or_exclude(False, *args, **kwargs)

    def exclude(self, *args, **kwargs):
        """
        Returns a new QuerySetSequence instance with NOT (args) ANDed to the
        existing set.

        QuerySetSequence is simplified thus result actually can be one of:
        QuerySetSequence, QuerySet, EmptyQuerySet.
        """
        return self._filter_or_exclude(True, *args, **kwargs)

    def _simplify(self, qss=None):
        '''
        Returns QuerySetSequence, QuerySet or EmptyQuerySet depending on the
        contents of items, i.e. at least two non empty QuerySets, exactly one
        non empty QuerySet and all empty QuerySets respectively.

        Does not modify original QuerySetSequence.
        '''
        not_empty_qss = filter(None, qss if qss else self.iables)
        if not len(not_empty_qss):
            return EmptyQuerySet()
        if len(not_empty_qss) == 1:
            return not_empty_qss[0]
        return QuerySetSequence(*not_empty_qss)

    def _filter_or_exclude(self, negate, *args, **kwargs):
        '''
        Maps _filter_or_exclude over QuerySet items and simplifies the result.
        '''
        # each Query set is cloned separately
        return self._simplify(*map(lambda qs:
            qs._filter_or_exclude(negate, *args, **kwargs), self.iables))

    def exists(self):
        for qs in self.iables:
            if qs.exists():
                return True
        return False

More like this

  1. Template tag - list punctuation for a list of items by shapiromatron 11 months, 2 weeks ago
  2. JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 11 months, 3 weeks ago
  3. Serializer factory with Django Rest Framework by julio 1 year, 6 months ago
  4. Image compression before saving the new model / work with JPG, PNG by Schleidens 1 year, 7 months ago
  5. Help text hyperlinks by sa2812 1 year, 8 months ago

Comments

Ojaybee (on August 1, 2015):

Would it be possible to add annotate() and distinct() functionality to this?

#

Please login first before commenting.