Login

django_bulk_save.py - defer saving of django models to bulk SQL commits

Author:
preetkukreti
Posted:
February 8, 2010
Language:
Python
Version:
1.1
Score:
1 (after 1 ratings)

When called, this module dynamically alters the behaviour of model.save() on a list of models so that the SQL is returned and aggregated for a bulk commit later on. This is much faster than performing bulk writing operations using the standard model.save().

To use, simply save the code as django_bulk_save.py and replace this idiom:

for m in model_list:
    # modify m ...
    m.save()   # ouch

with this one:

from django_bulk_save import DeferredBucket
deferred = DeferredBucket()
for m in model_list:
    # modify m ...
    deferred.append(m)
deferred.bulk_save()

Notes:

    • After performing a bulk_save(), the id's of the models do not get automatically updated, so code that depends on models having a pk (e.g. m2m assignments) will need to reload the objects via a queryset.
    • post-save signal is not sent. see above.
    • This code has not been thoroughly tested, and is not guaranteed (or recommended) for production use.
    • It may stop working in newer django versions, or whenever django's model.save() related code gets updated in the future.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# django_bulk_save.py - defer saving of django models to bulk SQL commits.
# -------------------------------------------------------------------------
# Version: 1.0
# Date: 08/02/10 (8 Jan)
# Compatibility: Django 1.1, 1.1.1
# Author: Preet Kukreti
# -------------------------------------------------------------------------
# The general idea is to replace the inefficient idiom:
#
#   for m in model_list:
#       # modify m ...
#       m.save()   # ouch
# 
# with this one:
#   
#   from django_bulk_save import DeferredBucket
#   deferred = DeferredBucket()
#   for m in model_list:
#       # modify m ...
#       deferred.append(m)
#   deferred.bulk_save()
#
# DeferredBucket.bulk_save takes keyword arguments:
#   split_interval - number of SQL operations per commit (default=100)
#   verbose - print progress info (default=False)
# see also the save_sqlx and execute_sqlx functions for more granular control


# -------------------------------------------------------------------------
# The following section dynamically monkey patches some model and queryset
# django code to return SQL queries from model.save() instead of actually
# executing those queries. Searching for 'sqlx' will highlight the main changes

from django.db import connection, transaction, DatabaseError
from django.db.models import signals, sql
from django.db.models.fields import AutoField, FieldDoesNotExist
from django.db.models import query

def _update_sqlx(qs, values):
    """overrides QuerySet._update()"""
    assert qs.query.can_filter(), \
            "Cannot update a query once a slice has been taken."
    query = qs.query.clone(sql.UpdateQuery)
    query.add_update_fields(values)
    qs._result_cache = None
    return query.as_sql() # return SQL tuple


def _insert_sqlx(model, func, values, **kwargs):
    """overrides models.Manager._insert()"""
    return func(model, values, **kwargs)


def insert_query_sqlx(model, values, return_id=False, raw_values=False):
    """overrides insert_query() from models.query module"""
    query = sql.InsertQuery(model, connection)
    query.insert_values(values, raw_values)
    return query.as_sql() # return SQL tuple


def save_sqlx(obj, force_insert=False, force_update=False):
    """overrides model.save()"""
    if force_insert and force_update:
        raise ValueError("Cannot force both insert and updating in "
                "model saving.")
    return save_base_sqlx(obj, force_insert=force_insert, force_update=force_update)


def save_base_sqlx(obj, raw=False, cls=None, origin=None,
            force_insert=False, force_update=False):
    """overrides model.save_base()"""
    assert not (force_insert and force_update)
    if cls is None:
        cls = obj.__class__
        meta = cls._meta
        if not meta.proxy:
            origin = cls
    else:
        meta = cls._meta

    if origin:
        signals.pre_save.send(sender=origin, instance=obj, raw=raw)
        
    if not raw or meta.proxy:
        if meta.proxy:
            org = cls
        else:
            org = None
        for parent, field in meta.parents.items():

            if field and getattr(obj, parent._meta.pk.attname) is None and getattr(obj, field.attname) is not None:
                setattr(obj, parent._meta.pk.attname, getattr(obj, field.attname))

            obj.save_base(cls=parent, origin=org)

            if field:
                setattr(obj, field.attname, obj._get_pk_val(parent._meta))
        if meta.proxy:
            return

    if not meta.proxy:
        non_pks = [f for f in meta.local_fields if not f.primary_key]

        # First, try an UPDATE. If that doesn't update anything, do an INSERT.
        pk_val = obj._get_pk_val(meta)
        pk_set = pk_val is not None
        record_exists = True
        manager = cls._base_manager
        if pk_set:
            # Determine whether a record with the primary key already exists.
            if (force_update or (not force_insert and
                    manager.filter(pk=pk_val).extra(select={'a': 1}).values('a').order_by())):
                # It does already exist, so do an UPDATE.
                if force_update or non_pks:
                    values = [(f, None, (raw and getattr(obj, f.attname) or f.pre_save(obj, False))) for f in non_pks]
                    row_qs = manager.filter(pk=pk_val)
                    result = _update_sqlx(row_qs, values)
                    return result   # return SQL tuple
            else:
                record_exists = False
        if not pk_set or not record_exists:
            if not pk_set:
                if force_update:
                    raise ValueError("Cannot force an update in save() with no primary key.")
                values = [(f, f.get_db_prep_save(raw and getattr(obj, f.attname) or f.pre_save(obj, True))) for f in meta.local_fields if not isinstance(f, AutoField)]
            else:
                values = [(f, f.get_db_prep_save(raw and getattr(obj, f.attname) or f.pre_save(obj, True))) for f in meta.local_fields]

            if meta.order_with_respect_to:
                field = meta.order_with_respect_to
                values.append((meta.get_field_by_name('_order')[0], manager.filter(**{field.name: getattr(obj, field.attname)}).count()))
            record_exists = False

            update_pk = bool(meta.has_auto_field and not pk_set)
            manager._insert = _insert_sqlx
            if values:
                # Create a new record.
                result = manager._insert(obj, insert_query_sqlx, values, return_id=update_pk)
                return result   # return SQL tuple
            else:
                # Create a new record with defaults for everything.
                result = manager._insert(obj, insert_query_sqlx, [(meta.pk, connection.ops.pk_default_value())], return_id=update_pk, raw_values=True)
                return result   # return SQL tuple
            if update_pk:
                setattr(obj, meta.pk.attname, result)
        transaction.commit_unless_managed()

    if origin:
        signals.post_save.send(sender=origin, instance=obj,
            created=(not record_exists), raw=raw)
save_base_sqlx.alters_data = True

# --------------------------------------------------------------------------
import sys
def execute_sqlx(queries, split_interval=100, verbose=False, ostream=sys.stdout):
    '''
    executes <split_interval> queries at a time
    
    queries -- a list of query tuples (field_template, values)
    split_interval -- how many queries to commit at once
    verbose -- print progress info
    '''
    assert type(split_interval) == int
    if split_interval < 1:
        return
    from django.db import connection, transaction
    cursor = connection.cursor()
    current_pass = 1
    qlen = len(queries)
    if qlen == 0:
        if verbose:
            ostream.write('no queries\n')
        return
    more = True
    while more:
        low = (current_pass - 1) * split_interval
        high = (current_pass * split_interval) - 1
        if high >= qlen: # last pass, clamp high, break out
            high = qlen - 1
            more = False
        if verbose:
            ostream.write('executing SQL query %d to %d of %d ...' % (low + 1, high + 1, qlen)) 
        [cursor.execute(f, v) for f, v in queries[low:high+1]]
        transaction.commit_unless_managed()
        current_pass += 1
        if verbose:
            ostream.write('committed\n')
    return

# -------------------------------------------------------------------------
class DeferredBucket(list):
    '''
    helper for deferred saving. just append model instances that
    need deferred saving. When you want to finally save, call bulk_save.
    bulk_save takes the same keyword arguments as execute_sqlx
    '''
    def bulk_save(self, **kwargs):
        execute_sqlx([save_sqlx(i) for i in self], **kwargs)

# -------------------------------------------------------------------------

More like this

  1. Template tag - list punctuation for a list of items by shapiromatron 10 months, 3 weeks ago
  2. JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 11 months ago
  3. Serializer factory with Django Rest Framework by julio 1 year, 5 months ago
  4. Image compression before saving the new model / work with JPG, PNG by Schleidens 1 year, 6 months ago
  5. Help text hyperlinks by sa2812 1 year, 7 months ago

Comments

udfalkso (on February 8, 2010):

Nice idea. I'm wondering if there might be a sneakier way to do this by abusing the pre/post save signals...

#

kunitoki (on January 17, 2012):

I'm getting this in Django-1.3.1:



Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/home/kunitoki/Projects/xxx/apps/base/models.py", line 1326, in import_rete_actv
    deferred.bulk_save()
  File "home/kunitoki/Projects/xxx/apps/common/database/bulk.py", line 191, in bulk_save
    execute_sqlx([save_sqlx(i) for i in self], **kwargs)
  File "home/kunitoki/Projects/xxx/apps/common/database/bulk.py", line 59, in save_sqlx
    return save_base_sqlx(obj, force_insert=force_insert, force_update=force_update)
  File "home/kunitoki/Projects/xxx/apps/common/database/bulk.py", line 131, in save_base_sqlx
    result = manager._insert(obj, insert_query_sqlx, values, return_id=update_pk)
  File "home/kunitoki/Projects/xxx/apps/common/database/bulk.py", line 45, in _insert_sqlx
    return func(model, values, **kwargs)
  File "home/kunitoki/Projects/xxx/apps/common/database/bulk.py", line 50, in insert_query_sqlx
    query = sql.InsertQuery(model, connection)
  File "/usr/lib64/python2.7/site-packages/django/db/models/sql/subqueries.py", line 140, in __init__
    super(InsertQuery, self).__init__(*args, **kwargs)
  File "/usr/lib64/python2.7/site-packages/django/db/models/sql/query.py", line 121, in __init__
    self.where = where()
TypeError: 'DatabaseWrapper' object is not callable

any ideas how to fix this ?

#

kunitoki (on January 17, 2012):
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/home/kunitoki/xxx/apps/base/models.py", line 1326, in import_batch
    deferred.bulk_save()
  File "/home/kunitoki/xxx/apps/common/database/bulk.py", line 191, in bulk_save
    execute_sqlx([save_sqlx(i) for i in self], **kwargs)
  File "/home/kunitoki/xxx/apps/common/database/bulk.py", line 59, in save_sqlx
    return save_base_sqlx(obj, force_insert=force_insert, force_update=force_update)
  File "/home/kunitoki/xxx/apps/common/database/bulk.py", line 131, in save_base_sqlx
    result = manager._insert(obj, insert_query_sqlx, values, return_id=update_pk)
  File "/home/kunitoki/xxx/apps/common/database/bulk.py", line 45, in _insert_sqlx
    return func(model, values, **kwargs)
  File "/home/kunitoki/xxx/apps/common/database/bulk.py", line 50, in insert_query_sqlx
    query = sql.InsertQuery(model, connection)
  File "/usr/lib64/python2.7/site-packages/django/db/models/sql/subqueries.py", line 140, in __init__
    super(InsertQuery, self).__init__(*args, **kwargs)
  File "/usr/lib64/python2.7/site-packages/django/db/models/sql/query.py", line 121, in __init__
    self.where = where()
TypeError: 'DatabaseWrapper' object is not callable

#

Please login first before commenting.