1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 | import time, traceback, logging, sys
class Status(object):
    """Progress bookkeeping for a job that walks a fixed number of items.

    Attributes initialised here:
        num_successful: count of items processed without raising.
        failed_ids: ids of the items whose processing raised.
        done: True once ``finished()`` has been called.
        cur_idx: number of items accounted for so far.

    ``total`` (the overall item count) is not set by ``__init__``; the
    caller assigns it.  ``start_time`` and ``end_time`` are set by
    ``start()`` and ``finished()`` respectively.
    """

    def __init__(self):
        self.num_successful = 0
        self.failed_ids = []
        self.done = False
        self.cur_idx = 0

    def __repr__(self):
        # getattr with a '-' placeholder: ``total`` does not exist until
        # the caller assigns it.
        return u'<Status: %s/%s, %s failed>' % (
            getattr(self, 'cur_idx', '-'),
            getattr(self, 'total', '-'),
            self.num_failed)

    @property
    def num_failed(self):
        """Number of ids recorded in ``failed_ids``."""
        return len(self.failed_ids)

    def start(self):
        """Record the wall-clock start time."""
        self.start_time = time.time()

    def finished(self):
        """Mark the job complete and record the wall-clock end time."""
        self.cur_idx = self.total
        self.done = True
        self.end_time = time.time()

    @property
    def rate(self):
        """Items handled per second since ``start()``.

        Measured up to ``end_time`` once done, otherwise up to now.
        Returns 0.0 when no time has elapsed, instead of dividing by zero.
        """
        end_time = self.end_time if self.done else time.time()
        elapsed = end_time - self.start_time
        if elapsed <= 0:
            return 0.0
        return self.cur_idx / elapsed

    @property
    def time_left(self):
        """Estimated seconds remaining at the current rate (0 if rate is 0)."""
        # Sample the rate once so the zero check and the division agree.
        rate = self.rate
        if rate == 0:
            return 0
        return (self.total - self.cur_idx) / rate
def progress_callback(status):
    """Write a one-line progress report for ``status`` to stderr.

    The report ends with a carriage return rather than a newline; a
    newline is written only once ``status.done`` is true.
    """
    fields = (status.cur_idx, status.total, status.num_failed,
              status.rate, status.time_left)
    out = sys.stderr
    out.write('%d/%d failed=%d, rate~%.2f per second, left~%.2f sec \r' % fields)
    if status.done:
        out.write('\n')
    out.flush()
def queryset_foreach(queryset, f, batch_size=1000,
                     progress_callback=progress_callback, transaction=True):
    '''
    Call a function for each element in a queryset.

    Features:
    * loads objects one batch at a time (ids are paged with a Django
      Paginator and each page is fetched with in_bulk)
    * progress indicators via progress_callback
    * optionally wraps each batch in a transaction
    * accepts whatever django.shortcuts._get_queryset accepts
      (e.g., Assertion.objects)
    * warns on stderr when settings.DEBUG is on
    * a failure on a single item is logged (traceback) and recorded,
      without aborting the run
    * works from a snapshot of the primary keys taken at the start;
      ids that in_bulk no longer returns are counted neither as
      successful nor as failed

    Parameters:
    queryset: queryset (or manager/model) to iterate over
    f: callable invoked as f(obj) for every object
    batch_size: number of ids per page/batch
    progress_callback: called with the Status object before the first
        batch, at the start of every batch and once at the end
    transaction: if true, each batch is wrapped with
        django.db.transaction.commit_on_success

    Returns a Status object, with the following interesting attributes
    total: number of items in the queryset
    num_successful: count of successful items
    failed_ids: list of ids of items that failed
    '''
    from django.conf import settings
    if settings.DEBUG:
        # sys.stderr.write instead of the Python-2-only ``print >>`` form.
        sys.stderr.write('Warning: DEBUG is on. django.db.connection.queries '
                         'may use up a lot of memory.\n')

    # Get querysets corresponding to managers
    from django.shortcuts import _get_queryset
    queryset = _get_queryset(queryset)

    # Get a snapshot of all the ids that match the query
    logging.info('qs4e: Getting list of objects')
    ids = list(queryset.values_list(queryset.model._meta.pk.name, flat=True))

    # Initialize status
    status = Status()
    status.total = len(ids)

    def do_all_objects(objects):
        # ``objects`` is the {pk: instance} mapping returned by in_bulk.
        for pk, obj in objects.items():
            try:
                f(obj)
                status.num_successful += 1
            except Exception:  # doesn't catch KeyboardInterrupt or SystemExit
                traceback.print_exc()
                status.failed_ids.append(pk)

    if transaction:
        # Wrap each batch in a transaction.  The module is imported under
        # an alias so it does not rebind the ``transaction`` flag parameter.
        from django.db import transaction as db_transaction
        do_all_objects = db_transaction.commit_on_success(do_all_objects)

    from django.core.paginator import Paginator
    paginator = Paginator(ids, batch_size)

    status.start()
    progress_callback(status)
    for page_num in paginator.page_range:
        status.page = page = paginator.page(page_num)
        # start_index() is 1-based and is 0 for an empty paginator; clamp
        # so cur_idx never goes negative when there are no ids.
        status.cur_idx = max(page.start_index() - 1, 0)
        progress_callback(status)
        objects = queryset.in_bulk(page.object_list)
        do_all_objects(objects)
    status.finished()
    progress_callback(status)
    return status
|
Comments