1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138 | ###
# Implementation
###
import tempfile
import os
import errno
from django.core.files import locks
from django.core.files.move import file_move_safe
from django.utils.text import get_valid_filename
from django.core.files.storage import FileSystemStorage, Storage
class OverwritingStorage(FileSystemStorage):
"""
File storage that allows overwriting of stored files.
"""
def get_available_name(self, name):
return name
def _save(self, name, content):
"""
Lifted partially from django/core/files/storage.py
"""
full_path = self.path(name)
directory = os.path.dirname(full_path)
if not os.path.exists(directory):
os.makedirs(directory)
elif not os.path.isdir(directory):
raise IOError("%s exists and is not a directory." % directory)
# This file has a file path that we can move.
if hasattr(content, 'temporary_file_path'):
temp_data_location = content.temporary_file_path()
else:
tmp_prefix = "tmp_%s" %(get_valid_filename(name), )
temp_data_location = tempfile.mktemp(prefix=tmp_prefix,
dir=self.location)
try:
# This is a normal uploadedfile that we can stream.
# This fun binary flag incantation makes os.open throw an
# OSError if the file already exists before we open it.
fd = os.open(temp_data_location,
os.O_WRONLY | os.O_CREAT |
os.O_EXCL | getattr(os, 'O_BINARY', 0))
locks.lock(fd, locks.LOCK_EX)
for chunk in content.chunks():
os.write(fd, chunk)
locks.unlock(fd)
os.close(fd)
except Exception, e:
if os.path.exists(temp_data_location):
os.remove(temp_data_location)
raise
file_move_safe(temp_data_location, full_path)
content.close()
if settings.FILE_UPLOAD_PERMISSIONS is not None:
os.chmod(full_path, settings.FILE_UPLOAD_PERMISSIONS)
return name
###
# Tests (to be run with django-nose, although they could easily be adapted to work with unittest)
###
import os
import shutil
import tempfile
from django.core.files.base import ContentFile as C
from django.core.files import File
from django.conf import settings
from nose.tools import assert_equal
from .storage import OverwritingStorage
class TestOverwritingDefaultStorage(object):
def setup(self):
self.location = tempfile.mktemp(prefix="overwriting_storage_test")
self.storage = OverwritingDefaultStorage(location=self.location)
def teardown(self):
shutil.rmtree(self.location)
def test_new_file(self):
s = self.storage
assert not s.exists("foo")
s.save("foo", C("new"))
assert_equal(s.open("foo").read(), "new")
def test_overwriting_existing_file_with_string(self):
s = self.storage
s.save("foo", C("old"))
name = s.save("foo", C("new"))
assert_equal(s.open("foo").read(), "new")
assert_equal(name, "foo")
def test_overwrite_with_file(self):
s = self.storage
input_file = s.location + "/input_file"
with open(input_file, "w") as input:
input.write("new")
s.save("foo", C("old"))
name = s.save("foo", File(open(input_file)))
assert_equal(s.open("foo").read(), "new")
assert_equal(name, "foo")
def test_upload_fails(self):
s = self.storage
class Explosion(Exception):
pass
class ExplodingContentFile(C):
def __init__(self):
super(ExplodingContentFile, self).__init__("")
def chunks(self):
yield "bad chunk"
raise Explosion("explode!")
s.save("foo", C("old"))
try:
s.save("foo", ExplodingContentFile())
raise Exception("Oh no! ExplodingContentFile didn't explode.")
except Explosion:
pass
assert_equal(s.open("foo").read(), "old")
|
Comments