1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144 | ### DYNAMIC ###
# Utilities for creating dynamic models, primarily to support using the same
# model in many databases.
#################
from django.db import connections
from django.db import models
from django.db.models import fields
from django.db.models.fields.related import RelatedField
from django.db.models import loading
from django.db.models.manager import ManagerDescriptor
def create_model(name, fields=None, app_label='', module='', options=None, admin=None):
    """Create a model class dynamically at run-time.

    The majority of this is taken from code.djangoproject.com/wiki/DynamicModels

    Arguments:
    name      -- name of the class to create.
    fields    -- optional dict of attribute name -> value that is merged into
                 the class namespace (fields, methods, managers, ...).
    app_label -- if non-empty, set as ``app_label`` on the inner Meta class.
    module    -- value stored as the new class's ``__module__``.
    options   -- optional dict of additional attributes to set on Meta.
    admin     -- optional admin options, given either as a dict or as an
                 iterable of (key, value) pairs; each one is set as an
                 attribute on an inner Admin class.

    Returns the new class, built with ``models.Model`` as its only base.
    """
    class Meta:
        # Using type('Meta', ...) gives a dictproxy error during model creation
        pass

    if app_label:
        # app_label must be set using the Meta inner class
        setattr(Meta, 'app_label', app_label)

    # Update Meta with any options that were provided
    if options is not None:
        for key, value in options.items():
            setattr(Meta, key, value)

    # Set up a dictionary to simulate declarations within a class
    attrs = {'__module__': module, 'Meta': Meta}

    # Add in any fields that were provided
    if fields:
        attrs.update(fields)

    # Create an Admin inner class if admin options were provided
    if admin is not None:
        class Admin:
            pass

        # Iterating a dict directly yields only its keys, so unpacking into
        # (key, value) would fail or silently split two-character keys.
        # Accept a mapping (like ``options``) as well as a sequence of pairs.
        if hasattr(admin, 'items'):
            admin_items = admin.items()
        else:
            admin_items = admin
        for key, value in admin_items:
            setattr(Admin, key, value)
        attrs['Admin'] = Admin

    # Create the class, which automatically triggers ModelBase processing
    return type(name, (models.Model,), attrs)
def copy_field(field):
    """Return a new instance of ``field``'s class built from its current values.

    The keyword arguments are gathered by looking up, on ``field``, an
    attribute for every variable name recorded on ``fields.Field.__init__``'s
    code object; names with no matching attribute get the string '_null'.
    For related fields the same is done against ``field.rel`` and the results
    are merged in; for a ForeignKey the rel's ``field_name`` is passed on as
    ``to_field``.

    NOTE: relies on ``im_func`` / ``func_code``, which are Python 2 attribute
    names.
    """
    kwargs = {}
    for arg_name in fields.Field.__init__.im_func.func_code.co_varnames:
        kwargs[arg_name] = getattr(field, arg_name, '_null')

    if isinstance(field, fields.related.RelatedField):
        relation = kwargs.get('rel')
        relation_kwargs = {}
        for arg_name in relation.__init__.im_func.func_code.co_varnames:
            relation_kwargs[arg_name] = getattr(relation, arg_name, '_null')
        if isinstance(field, fields.related.ForeignKey):
            # The rel object calls it field_name; it is handed over as to_field.
            kwargs['to_field'] = relation_kwargs.pop('field_name')
        kwargs.update(relation_kwargs)

    # 'self' is one of the collected variable names but is not a real keyword.
    kwargs.pop('self')
    return field.__class__(**kwargs)
def get_names(model, other_db):
    """Return ``(name, app_label)`` for the copy of ``model`` tied to ``other_db``.

    The name is ``<model.__name__>_<other_db>`` and the app label (also used
    as the dummy module the new class is stored under) is
    ``<other_db>.<model._meta.app_label>``.
    """
    return (
        "%s_%s" % (model.__name__, other_db),
        "%s.%s" % (other_db, model._meta.app_label),
    )
def duplicate_model(model, other_db):
    """Build a copy of ``model`` whose default manager uses another connection.

    ``other_db`` is used as a key into ``connections``. The new class is named
    and labelled by ``get_names(model, other_db)`` and receives copies of the
    original's plain functions, properties, manager descriptors and fields.
    Related fields are re-pointed at duplicated targets (or at the new class
    itself for self-references).

    Returns a tuple ``(new_cls, rev_related)`` where ``rev_related`` is the
    list of models holding a reverse relation to ``model`` whose duplicate
    name is not yet present in ``loading._app_models[module]``.
    """
    # Meta options carried over verbatim from the original model
    meta_opts = ['db_table','db_tablespace','get_latest_by','order_with_respect_to',\
        'ordering','unique_together','verbose_name','verbose_name_plural']
    options = dict([(k, getattr(model._meta, k)) for k in meta_opts])
    name, app_label = get_names(model, other_db)
    # The dummy module name is the same string as the app label
    module = app_label
    # Copy the functions, properties and any defined managers
    items = model.__dict__.items()
    # Luckily all of the functions that django adds are called _curried
    # NOTE(review): concatenating the filter() result below assumes it is a
    # list (Python 2 behaviour) -- confirm before running on Python 3
    funcs = filter(lambda x: x[1].func_name != '_curried', \
        [i for i in items if i[1].__class__.__name__ == 'function'])
    props = [i for i in items if i[1].__class__ == property]
    managers = [i for i in items if isinstance(i[1], ManagerDescriptor)]
    fields = dict(funcs + props + managers)
    # None of the items in fields are *really* fields, they will be dealt with next
    new_cls = create_model(name, fields, app_label, module, options)
    setattr(new_cls, 'db', other_db)
    # Reset _meta.pk to None to allow correct functioning of Options.add_field
    new_cls._meta.pk, new_cls._meta.has_auto_field = None, False
    # Reset fields to empty to get rid of id field created during model creation
    new_cls._meta.fields = []
    new_cls._default_manager.db = connections[other_db]
    # From here on ``fields`` holds real field copies keyed by field name
    fields = dict([(f.name, copy_field(f)) for f in model._meta.fields + model._meta.many_to_many])
    for fld_name, f in fields.items():
        if isinstance(f, RelatedField):
            to = f.rel.to
            if to == model:
                # Self-referential relation: point at the copy being built
                new_to = new_cls
            else:
                # Recursively duplicate the target; only the class is needed
                new_to = duplicate_model(to, other_db)[0]
            f.rel.to = new_to
        new_cls.add_to_class(fld_name, f)
    # Make sure all of the reverse relationships work too.
    rev_related = []
    for ro in model._meta.get_all_related_objects() \
            + model._meta.get_all_related_many_to_many_objects():
        rel_model_name = get_names(ro.model, other_db)[0]
        # Is it already created?
        # NOTE(review): the lookup uses this model's ``module`` key even when
        # ro.model may belong to a different app -- confirm this is intended
        if not loading._app_models[module].has_key(rel_model_name.lower()):
            rev_related.append(ro.model)
    return new_cls, rev_related
def duplicate_model_and_rels(model,other_db):
    """Duplicate ``model`` plus every model that relates back to it.

    The reverse-related models reported by ``duplicate_model`` are duplicated
    here, outside of ``duplicate_model`` itself; this is necessary to prevent
    infinite recursion within ``duplicate_model``. Only the duplicate of
    ``model`` is returned.
    """
    duplicated, pending_models = duplicate_model(model, other_db)
    for related_model in pending_models:
        duplicate_model(related_model, other_db)
    return duplicated
def migrate_table_structure(model, new_db_key):
    """Create ``model``'s table on the connection stored under ``new_db_key``.

    The CREATE TABLE statement is produced by the builder of the target
    connection's own creation module (so, per the original author, in the new
    db's sql dialect) and executed on a cursor of that connection.

    If the target already lists a table with the same name (compared
    case-insensitively), nothing is executed and the function returns None.
    """
    new_db = connections[new_db_key]
    builder = new_db.get_creation_module().builder
    cursor = new_db.connection.cursor()
    new_introspection_mod = new_db.get_introspection_module()

    # Explicit membership test rather than assert/except AssertionError:
    # asserts are stripped under ``python -O``, which would remove this guard
    # and run CREATE TABLE against an existing table.
    table_name = model._meta.db_table.upper()
    existing_tables = [t.upper() for t in new_introspection_mod.get_table_list(cursor)]
    if table_name in existing_tables:
        return  # Skip it if this table already exists

    # A BoundStatement's __str__() is the SQL itself; only the first statement
    # of the first entry returned by the builder is executed.
    create_table = builder.get_create_table(model)
    cursor.execute(create_table[0][0].__str__())
|
Comments