Login

Overwriting file storage

Author:
wolever
Posted:
August 24, 2010
Language:
Python
Version:
Not specified
Score:
0 (after 0 ratings)

The title says it all — a subclass of FileSystemStorage which will overwrite files.

Note that saves which fail part way though will leave the original file intact (see test_upload_fails).

Based roughly on http://djangosnippets.org/snippets/2044/ .

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
###
# Implementation
###

import tempfile
import os
import errno                                     

from django.core.files import locks
from django.core.files.move import file_move_safe
from django.utils.text import get_valid_filename
from django.core.files.storage import FileSystemStorage, Storage

class OverwritingStorage(FileSystemStorage):
    """ 
    File storage that allows overwriting of stored files.
    """ 
        
    def get_available_name(self, name):
        return name
            
    def _save(self, name, content):
        """
        Lifted partially from django/core/files/storage.py
        """ 
        full_path = self.path(name)
            
        directory = os.path.dirname(full_path)
        if not os.path.exists(directory):        
            os.makedirs(directory)
        elif not os.path.isdir(directory):
            raise IOError("%s exists and is not a directory." % directory)
                
        # This file has a file path that we can move.
        if hasattr(content, 'temporary_file_path'):
            temp_data_location = content.temporary_file_path()
        else:   
            tmp_prefix = "tmp_%s" %(get_valid_filename(name), )
            temp_data_location = tempfile.mktemp(prefix=tmp_prefix,
                                                 dir=self.location)
            try:
                # This is a normal uploadedfile that we can stream.
                # This fun binary flag incantation makes os.open throw an
                # OSError if the file already exists before we open it.
                fd = os.open(temp_data_location,
                             os.O_WRONLY | os.O_CREAT |
                             os.O_EXCL | getattr(os, 'O_BINARY', 0))
                locks.lock(fd, locks.LOCK_EX)
                for chunk in content.chunks():
                    os.write(fd, chunk)
                locks.unlock(fd)
                os.close(fd)
            except Exception, e:
                if os.path.exists(temp_data_location):
                    os.remove(temp_data_location)
                raise

        file_move_safe(temp_data_location, full_path)
        content.close()
                
        if settings.FILE_UPLOAD_PERMISSIONS is not None:
            os.chmod(full_path, settings.FILE_UPLOAD_PERMISSIONS)
                
        return name

###
# Tests (to be run with django-nose, although they could easily be adapted to work with unittest)
###

import os
import shutil
import tempfile

from django.core.files.base import ContentFile as C
from django.core.files import File
from django.conf import settings
from nose.tools import assert_equal

from .storage import OverwritingStorage

class TestOverwritingDefaultStorage(object):
    def setup(self):
        self.location = tempfile.mktemp(prefix="overwriting_storage_test")
        self.storage = OverwritingDefaultStorage(location=self.location)

    def teardown(self):
        shutil.rmtree(self.location)

    def test_new_file(self):
        s = self.storage
        assert not s.exists("foo")
        s.save("foo", C("new"))
        assert_equal(s.open("foo").read(), "new")

    def test_overwriting_existing_file_with_string(self):
        s = self.storage
    
        s.save("foo", C("old"))
        name = s.save("foo", C("new"))           
        assert_equal(s.open("foo").read(), "new")
        assert_equal(name, "foo")

    def test_overwrite_with_file(self):
        s = self.storage

        input_file = s.location + "/input_file"
        with open(input_file, "w") as input:
            input.write("new")
        
        s.save("foo", C("old"))
        name = s.save("foo", File(open(input_file)))

        assert_equal(s.open("foo").read(), "new")
        assert_equal(name, "foo")
        
    def test_upload_fails(self):
        s = self.storage
            
        class Explosion(Exception):
            pass
            
        class ExplodingContentFile(C):
            def __init__(self):
                super(ExplodingContentFile, self).__init__("")
        
            def chunks(self):
                yield "bad chunk"
                raise Explosion("explode!")      
            
        s.save("foo", C("old"))
                
        try:    
            s.save("foo", ExplodingContentFile())
            raise Exception("Oh no! ExplodingContentFile didn't explode.")
        except Explosion:    
            pass
                
        assert_equal(s.open("foo").read(), "old")

More like this

  1. Template tag - list punctuation for a list of items by shapiromatron 1 year ago
  2. JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 1 year ago
  3. Serializer factory with Django Rest Framework by julio 1 year, 7 months ago
  4. Image compression before saving the new model / work with JPG, PNG by Schleidens 1 year, 8 months ago
  5. Help text hyperlinks by sa2812 1 year, 8 months ago

Comments

banks (on November 7, 2013):

On line 58, the call to file_move_safe should include the parameter allow_overwrite=True. This snipped functioned correctly in most cases prior to Django 1.6 because file_move_safe did not correctly check the value of allow_overwrite.

#

Please login first before commenting.