Login

Queryset Foreach

Author:
kcarnold
Posted:
March 29, 2009
Language:
Python
Version:
1.0
Tags:
queryset progress callback map foreach iterator
Score:
1 (after 1 ratings)

Call a function for each element in a queryset (actually, any list).

Features:

  • stable memory usage (thanks to Django paginators)
  • progress indicators
  • wraps batches in transactions
  • can take managers or even models (e.g., Assertion.objects)
  • warns about DEBUG.
  • handles failures of single items without dying in general.
  • stable even if items are added or removed during processing (gets a list of ids at the start)

Returns a Status object, with the following interesting attributes

  • total: number of items in the queryset
  • num_successful: count of successful items
  • failed_ids: list of ids of items that failed
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import time, traceback, logging, sys
class Status(object):
    def __init__(self):
        self.num_successful = 0
        self.failed_ids = []
        self.done = False
        self.cur_idx = 0

    def __repr__(self):
        return u'<Status: %s/%s, %s failed>' % (
            getattr(self, 'cur_idx', '-'),
            getattr(self, 'total', '-'),
            self.num_failed)

    @property
    def num_failed(self): return len(self.failed_ids)

    def start(self):
        self.start_time = time.time()

    def finished(self):
        self.cur_idx = self.total
        self.done = True
        self.end_time = time.time()

    @property
    def rate(self):
        if self.done:
            end_time = self.end_time
        else:
            end_time = time.time()
        return self.cur_idx / (end_time - self.start_time)

    @property
    def time_left(self):
        rate = self.rate
        if rate == 0: return 0
        return (self.total - self.cur_idx) / self.rate

def progress_callback(status):
    sys.stderr.write('%d/%d failed=%d, rate~%.2f per second, left~%.2f sec    \r' % (
            status.cur_idx, status.total, status.num_failed, status.rate, status.time_left))
    if status.done: sys.stderr.write('\n')
    sys.stderr.flush()


def queryset_foreach(queryset, f, batch_size=1000,
                     progress_callback=progress_callback, transaction=True):
    '''
    Call a function for each element in a queryset (actually, any list).

    Features:
    * stable memory usage (thanks to Django paginators)
    * progress indicators
    * wraps batches in transactions
    * can take managers or even models (e.g., Assertion.objects)
    * warns about DEBUG.
    * handles failures of single items without dying in general.
    * stable even if items are added or removed during processing
    (gets a list of ids at the start)

    Returns a Status object, with the following interesting attributes
      total: number of items in the queryset
      num_successful: count of successful items
      failed_ids: list of ids of items that failed
    '''
    
    from django.conf import settings
    if settings.DEBUG:
        print >> sys.stderr, 'Warning: DEBUG is on. django.db.connection.queries may use up a lot of memory.'
    
    # Get querysets corresponding to managers
    from django.shortcuts import _get_queryset
    queryset = _get_queryset(queryset)
    
    # Get a snapshot of all the ids that match the query
    logging.info('qs4e: Getting list of objects')
    ids = list(queryset.values_list(queryset.model._meta.pk.name, flat=True))

    # Initialize status
    status = Status()
    status.total = len(ids)

    def do_all_objects(objects):
        for id, obj in objects.iteritems():
            try:
                f(obj)
                status.num_successful += 1
            except Exception: # python 2.5+: doesn't catch KeyboardInterrupt or SystemExit
                traceback.print_exc()
                status.failed_ids.append(id)
        
    if transaction:
        # Wrap each batch in a transaction
        from django.db import transaction
        do_all_objects = transaction.commit_on_success(do_all_objects)
    
    from django.core.paginator import Paginator
    paginator = Paginator(ids, batch_size)

    status.start()
    progress_callback(status)
    for page_num in paginator.page_range:
        status.page = page = paginator.page(page_num)
        status.cur_idx = page.start_index()-1
        progress_callback(status)
        objects = queryset.in_bulk(page.object_list)
        do_all_objects(objects)
    status.finished()

    progress_callback(status)

    return status

More like this

  1. RelatedMixin for Details and Updates with Related Object Lists by christhekeele 2 years, 10 months ago
  2. Template filter that divides a list into exact columns by davmuz 3 years, 2 months ago
  3. django_bulk_save.py - defer saving of django models to bulk SQL commits by preetkukreti 5 years, 1 month ago
  4. Ajax progress bar by ebartels 6 years, 12 months ago
  5. Generic views with row-level permission handling by mwicat 3 years, 11 months ago

Comments

Please login first before commenting.