1 | #! -*- coding: utf-8 -*-
|
---|
2 | from __future__ import absolute_import, unicode_literals
|
---|
3 |
|
---|
4 | import base64
|
---|
5 | import errno
|
---|
6 | import hashlib
|
---|
7 | import json
|
---|
8 | import os
|
---|
9 | import shutil
|
---|
10 | import tempfile as sys_tempfile
|
---|
11 |
|
---|
12 | from django.core.files import temp as tempfile
|
---|
13 | from django.core.files.uploadedfile import SimpleUploadedFile
|
---|
14 | from django.http.multipartparser import MultiPartParser
|
---|
15 | from django.test import TestCase, client
|
---|
16 | from django.test.utils import override_settings
|
---|
17 | from django.utils.encoding import force_bytes
|
---|
18 | from django.utils.six import StringIO
|
---|
19 | from django.utils import unittest
|
---|
20 |
|
---|
21 | from . import uploadhandler
|
---|
22 | from .models import FileModel
|
---|
23 |
|
---|
24 |
|
---|
25 | UNICODE_FILENAME = 'test-0123456789_中文_Orléans.jpg'
|
---|
26 | MEDIA_ROOT = sys_tempfile.mkdtemp()
|
---|
27 | UPLOAD_TO = os.path.join(MEDIA_ROOT, 'test_upload')
|
---|
28 |
|
---|
29 | @override_settings(MEDIA_ROOT=MEDIA_ROOT)
|
---|
30 | class FileUploadTests(TestCase):
|
---|
31 | @classmethod
|
---|
32 | def setUpClass(cls):
|
---|
33 | if not os.path.isdir(MEDIA_ROOT):
|
---|
34 | os.makedirs(MEDIA_ROOT)
|
---|
35 |
|
---|
36 | @classmethod
|
---|
37 | def tearDownClass(cls):
|
---|
38 | shutil.rmtree(MEDIA_ROOT)
|
---|
39 |
|
---|
40 | def test_fail_backslash(self):
|
---|
41 | """Tests filename ending with a backslash, issue #18150 reports crashes when a filename ends with a backslash"""
|
---|
42 | backSlashName = "backslash.jpg\\"
|
---|
43 | payload = client.FakePayload()
|
---|
44 | payload.write('\r\n'.join([
|
---|
45 | '--' + client.BOUNDARY,
|
---|
46 | 'Content-Disposition: form-data; name="file1"; filename="%s"' % backSlashName,
|
---|
47 | 'Content-Type: application/octet-stream',
|
---|
48 | '',
|
---|
49 | ''
|
---|
50 | ]))
|
---|
51 | payload.write('\r\n--' + client.BOUNDARY + '--\r\n')
|
---|
52 |
|
---|
53 | r = {
|
---|
54 | 'CONTENT_LENGTH': len(payload),
|
---|
55 | 'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
---|
56 | 'PATH_INFO': "/file_uploads/echo/",
|
---|
57 | 'REQUEST_METHOD': 'POST',
|
---|
58 | 'wsgi.input': payload,
|
---|
59 | }
|
---|
60 | response = self.client.request(**r)
|
---|
61 | self.assertEqual(response.status_code, 200)
|
---|
62 |
|
---|
63 | def test_simple_upload(self):
|
---|
64 | with open(__file__, 'rb') as fp:
|
---|
65 | post_data = {
|
---|
66 | 'name': 'Ringo',
|
---|
67 | 'file_field': fp,
|
---|
68 | }
|
---|
69 | response = self.client.post('/file_uploads/upload/', post_data)
|
---|
70 | self.assertEqual(response.status_code, 200)
|
---|
71 |
|
---|
72 | def test_large_upload(self):
|
---|
73 | tdir = tempfile.gettempdir()
|
---|
74 |
|
---|
75 | file1 = tempfile.NamedTemporaryFile(suffix=".file1", dir=tdir)
|
---|
76 | file1.write(b'a' * (2 ** 21))
|
---|
77 | file1.seek(0)
|
---|
78 |
|
---|
79 | file2 = tempfile.NamedTemporaryFile(suffix=".file2", dir=tdir)
|
---|
80 | file2.write(b'a' * (10 * 2 ** 20))
|
---|
81 | file2.seek(0)
|
---|
82 |
|
---|
83 | post_data = {
|
---|
84 | 'name': 'Ringo',
|
---|
85 | 'file_field1': file1,
|
---|
86 | 'file_field2': file2,
|
---|
87 | }
|
---|
88 |
|
---|
89 | for key in list(post_data):
|
---|
90 | try:
|
---|
91 | post_data[key + '_hash'] = hashlib.sha1(post_data[key].read()).hexdigest()
|
---|
92 | post_data[key].seek(0)
|
---|
93 | except AttributeError:
|
---|
94 | post_data[key + '_hash'] = hashlib.sha1(force_bytes(post_data[key])).hexdigest()
|
---|
95 |
|
---|
96 | response = self.client.post('/file_uploads/verify/', post_data)
|
---|
97 |
|
---|
98 | self.assertEqual(response.status_code, 200)
|
---|
99 |
|
---|
100 | def _test_base64_upload(self, content):
|
---|
101 | payload = client.FakePayload("\r\n".join([
|
---|
102 | '--' + client.BOUNDARY,
|
---|
103 | 'Content-Disposition: form-data; name="file"; filename="test.txt"',
|
---|
104 | 'Content-Type: application/octet-stream',
|
---|
105 | 'Content-Transfer-Encoding: base64',
|
---|
106 | '',]))
|
---|
107 | payload.write(b"\r\n" + base64.b64encode(force_bytes(content)) + b"\r\n")
|
---|
108 | payload.write('--' + client.BOUNDARY + '--\r\n')
|
---|
109 | r = {
|
---|
110 | 'CONTENT_LENGTH': len(payload),
|
---|
111 | 'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
---|
112 | 'PATH_INFO': "/file_uploads/echo_content/",
|
---|
113 | 'REQUEST_METHOD': 'POST',
|
---|
114 | 'wsgi.input': payload,
|
---|
115 | }
|
---|
116 | response = self.client.request(**r)
|
---|
117 | received = json.loads(response.content.decode('utf-8'))
|
---|
118 |
|
---|
119 | self.assertEqual(received['file'], content)
|
---|
120 |
|
---|
121 | def test_base64_upload(self):
|
---|
122 | self._test_base64_upload("This data will be transmitted base64-encoded.")
|
---|
123 |
|
---|
124 | def test_big_base64_upload(self):
|
---|
125 | self._test_base64_upload("Big data" * 68000) # > 512Kb
|
---|
126 |
|
---|
127 | def test_unicode_file_name(self):
|
---|
128 | tdir = sys_tempfile.mkdtemp()
|
---|
129 | self.addCleanup(shutil.rmtree, tdir, True)
|
---|
130 |
|
---|
131 | # This file contains chinese symbols and an accented char in the name.
|
---|
132 | with open(os.path.join(tdir, UNICODE_FILENAME), 'w+b') as file1:
|
---|
133 | file1.write(b'b' * (2 ** 10))
|
---|
134 | file1.seek(0)
|
---|
135 |
|
---|
136 | post_data = {
|
---|
137 | 'file_unicode': file1,
|
---|
138 | }
|
---|
139 |
|
---|
140 | response = self.client.post('/file_uploads/unicode_name/', post_data)
|
---|
141 |
|
---|
142 | self.assertEqual(response.status_code, 200)
|
---|
143 |
|
---|
144 | def test_dangerous_file_names(self):
|
---|
145 | """Uploaded file names should be sanitized before ever reaching the view."""
|
---|
146 | # This test simulates possible directory traversal attacks by a
|
---|
147 | # malicious uploader We have to do some monkeybusiness here to construct
|
---|
148 | # a malicious payload with an invalid file name (containing os.sep or
|
---|
149 | # os.pardir). This similar to what an attacker would need to do when
|
---|
150 | # trying such an attack.
|
---|
151 | scary_file_names = [
|
---|
152 | "/tmp/hax0rd.txt", # Absolute path, *nix-style.
|
---|
153 | "C:\\Windows\\hax0rd.txt", # Absolute path, win-syle.
|
---|
154 | "C:/Windows/hax0rd.txt", # Absolute path, broken-style.
|
---|
155 | "\\tmp\\hax0rd.txt", # Absolute path, broken in a different way.
|
---|
156 | "/tmp\\hax0rd.txt", # Absolute path, broken by mixing.
|
---|
157 | "subdir/hax0rd.txt", # Descendant path, *nix-style.
|
---|
158 | "subdir\\hax0rd.txt", # Descendant path, win-style.
|
---|
159 | "sub/dir\\hax0rd.txt", # Descendant path, mixed.
|
---|
160 | "../../hax0rd.txt", # Relative path, *nix-style.
|
---|
161 | "..\\..\\hax0rd.txt", # Relative path, win-style.
|
---|
162 | "../..\\hax0rd.txt" # Relative path, mixed.
|
---|
163 | ]
|
---|
164 |
|
---|
165 | payload = client.FakePayload()
|
---|
166 | for i, name in enumerate(scary_file_names):
|
---|
167 | payload.write('\r\n'.join([
|
---|
168 | '--' + client.BOUNDARY,
|
---|
169 | 'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name),
|
---|
170 | 'Content-Type: application/octet-stream',
|
---|
171 | '',
|
---|
172 | 'You got pwnd.\r\n'
|
---|
173 | ]))
|
---|
174 | payload.write('\r\n--' + client.BOUNDARY + '--\r\n')
|
---|
175 |
|
---|
176 | r = {
|
---|
177 | 'CONTENT_LENGTH': len(payload),
|
---|
178 | 'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
---|
179 | 'PATH_INFO': "/file_uploads/echo/",
|
---|
180 | 'REQUEST_METHOD': 'POST',
|
---|
181 | 'wsgi.input': payload,
|
---|
182 | }
|
---|
183 | response = self.client.request(**r)
|
---|
184 |
|
---|
185 | # The filenames should have been sanitized by the time it got to the view.
|
---|
186 | recieved = json.loads(response.content.decode('utf-8'))
|
---|
187 | for i, name in enumerate(scary_file_names):
|
---|
188 | got = recieved["file%s" % i]
|
---|
189 | self.assertEqual(got, "hax0rd.txt")
|
---|
190 |
|
---|
191 | def test_filename_overflow(self):
|
---|
192 | """File names over 256 characters (dangerous on some platforms) get fixed up."""
|
---|
193 | name = "%s.txt" % ("f"*500)
|
---|
194 | payload = client.FakePayload("\r\n".join([
|
---|
195 | '--' + client.BOUNDARY,
|
---|
196 | 'Content-Disposition: form-data; name="file"; filename="%s"' % name,
|
---|
197 | 'Content-Type: application/octet-stream',
|
---|
198 | '',
|
---|
199 | 'Oops.'
|
---|
200 | '--' + client.BOUNDARY + '--',
|
---|
201 | '',
|
---|
202 | ]))
|
---|
203 | r = {
|
---|
204 | 'CONTENT_LENGTH': len(payload),
|
---|
205 | 'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
---|
206 | 'PATH_INFO': "/file_uploads/echo/",
|
---|
207 | 'REQUEST_METHOD': 'POST',
|
---|
208 | 'wsgi.input': payload,
|
---|
209 | }
|
---|
210 | got = json.loads(self.client.request(**r).content.decode('utf-8'))
|
---|
211 | self.assertTrue(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file']))
|
---|
212 |
|
---|
213 | def test_truncated_multipart_handled_gracefully(self):
|
---|
214 | """
|
---|
215 | If passed an incomplete multipart message, MultiPartParser does not
|
---|
216 | attempt to read beyond the end of the stream, and simply will handle
|
---|
217 | the part that can be parsed gracefully.
|
---|
218 | """
|
---|
219 | payload_str = "\r\n".join([
|
---|
220 | '--' + client.BOUNDARY,
|
---|
221 | 'Content-Disposition: form-data; name="file"; filename="foo.txt"',
|
---|
222 | 'Content-Type: application/octet-stream',
|
---|
223 | '',
|
---|
224 | 'file contents'
|
---|
225 | '--' + client.BOUNDARY + '--',
|
---|
226 | '',
|
---|
227 | ])
|
---|
228 | payload = client.FakePayload(payload_str[:-10])
|
---|
229 | r = {
|
---|
230 | 'CONTENT_LENGTH': len(payload),
|
---|
231 | 'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
---|
232 | 'PATH_INFO': '/file_uploads/echo/',
|
---|
233 | 'REQUEST_METHOD': 'POST',
|
---|
234 | 'wsgi.input': payload,
|
---|
235 | }
|
---|
236 | got = json.loads(self.client.request(**r).content.decode('utf-8'))
|
---|
237 | self.assertEqual(got, {})
|
---|
238 |
|
---|
239 | def test_empty_multipart_handled_gracefully(self):
|
---|
240 | """
|
---|
241 | If passed an empty multipart message, MultiPartParser will return
|
---|
242 | an empty QueryDict.
|
---|
243 | """
|
---|
244 | r = {
|
---|
245 | 'CONTENT_LENGTH': 0,
|
---|
246 | 'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
---|
247 | 'PATH_INFO': '/file_uploads/echo/',
|
---|
248 | 'REQUEST_METHOD': 'POST',
|
---|
249 | 'wsgi.input': client.FakePayload(b''),
|
---|
250 | }
|
---|
251 | got = json.loads(self.client.request(**r).content.decode('utf-8'))
|
---|
252 | self.assertEqual(got, {})
|
---|
253 |
|
---|
254 | def test_custom_upload_handler(self):
|
---|
255 | # A small file (under the 5M quota)
|
---|
256 | smallfile = tempfile.NamedTemporaryFile()
|
---|
257 | smallfile.write(b'a' * (2 ** 21))
|
---|
258 | smallfile.seek(0)
|
---|
259 |
|
---|
260 | # A big file (over the quota)
|
---|
261 | bigfile = tempfile.NamedTemporaryFile()
|
---|
262 | bigfile.write(b'a' * (10 * 2 ** 20))
|
---|
263 | bigfile.seek(0)
|
---|
264 |
|
---|
265 | # Small file posting should work.
|
---|
266 | response = self.client.post('/file_uploads/quota/', {'f': smallfile})
|
---|
267 | got = json.loads(response.content.decode('utf-8'))
|
---|
268 | self.assertTrue('f' in got)
|
---|
269 |
|
---|
270 | # Large files don't go through.
|
---|
271 | response = self.client.post("/file_uploads/quota/", {'f': bigfile})
|
---|
272 | got = json.loads(response.content.decode('utf-8'))
|
---|
273 | self.assertTrue('f' not in got)
|
---|
274 |
|
---|
275 | def test_broken_custom_upload_handler(self):
|
---|
276 | f = tempfile.NamedTemporaryFile()
|
---|
277 | f.write(b'a' * (2 ** 21))
|
---|
278 | f.seek(0)
|
---|
279 |
|
---|
280 | # AttributeError: You cannot alter upload handlers after the upload has been processed.
|
---|
281 | self.assertRaises(
|
---|
282 | AttributeError,
|
---|
283 | self.client.post,
|
---|
284 | '/file_uploads/quota/broken/',
|
---|
285 | {'f': f}
|
---|
286 | )
|
---|
287 |
|
---|
288 | def test_fileupload_getlist(self):
|
---|
289 | file1 = tempfile.NamedTemporaryFile()
|
---|
290 | file1.write(b'a' * (2 ** 23))
|
---|
291 | file1.seek(0)
|
---|
292 |
|
---|
293 | file2 = tempfile.NamedTemporaryFile()
|
---|
294 | file2.write(b'a' * (2 * 2 ** 18))
|
---|
295 | file2.seek(0)
|
---|
296 |
|
---|
297 | file2a = tempfile.NamedTemporaryFile()
|
---|
298 | file2a.write(b'a' * (5 * 2 ** 20))
|
---|
299 | file2a.seek(0)
|
---|
300 |
|
---|
301 | response = self.client.post('/file_uploads/getlist_count/', {
|
---|
302 | 'file1': file1,
|
---|
303 | 'field1': 'test',
|
---|
304 | 'field2': 'test3',
|
---|
305 | 'field3': 'test5',
|
---|
306 | 'field4': 'test6',
|
---|
307 | 'field5': 'test7',
|
---|
308 | 'file2': (file2, file2a)
|
---|
309 | })
|
---|
310 | got = json.loads(response.content.decode('utf-8'))
|
---|
311 |
|
---|
312 | self.assertEqual(got.get('file1'), 1)
|
---|
313 | self.assertEqual(got.get('file2'), 2)
|
---|
314 |
|
---|
315 | def test_file_error_blocking(self):
|
---|
316 | """
|
---|
317 | The server should not block when there are upload errors (bug #8622).
|
---|
318 | This can happen if something -- i.e. an exception handler -- tries to
|
---|
319 | access POST while handling an error in parsing POST. This shouldn't
|
---|
320 | cause an infinite loop!
|
---|
321 | """
|
---|
322 | class POSTAccessingHandler(client.ClientHandler):
|
---|
323 | """A handler that'll access POST during an exception."""
|
---|
324 | def handle_uncaught_exception(self, request, resolver, exc_info):
|
---|
325 | ret = super(POSTAccessingHandler, self).handle_uncaught_exception(request, resolver, exc_info)
|
---|
326 | p = request.POST
|
---|
327 | return ret
|
---|
328 |
|
---|
329 | # Maybe this is a little more complicated that it needs to be; but if
|
---|
330 | # the django.test.client.FakePayload.read() implementation changes then
|
---|
331 | # this test would fail. So we need to know exactly what kind of error
|
---|
332 | # it raises when there is an attempt to read more than the available bytes:
|
---|
333 | try:
|
---|
334 | client.FakePayload(b'a').read(2)
|
---|
335 | except Exception as err:
|
---|
336 | reference_error = err
|
---|
337 |
|
---|
338 | # install the custom handler that tries to access request.POST
|
---|
339 | self.client.handler = POSTAccessingHandler()
|
---|
340 |
|
---|
341 | with open(__file__, 'rb') as fp:
|
---|
342 | post_data = {
|
---|
343 | 'name': 'Ringo',
|
---|
344 | 'file_field': fp,
|
---|
345 | }
|
---|
346 | try:
|
---|
347 | response = self.client.post('/file_uploads/upload_errors/', post_data)
|
---|
348 | except reference_error.__class__ as err:
|
---|
349 | self.assertFalse(
|
---|
350 | str(err) == str(reference_error),
|
---|
351 | "Caught a repeated exception that'll cause an infinite loop in file uploads."
|
---|
352 | )
|
---|
353 | except Exception as err:
|
---|
354 | # CustomUploadError is the error that should have been raised
|
---|
355 | self.assertEqual(err.__class__, uploadhandler.CustomUploadError)
|
---|
356 |
|
---|
357 | def test_filename_case_preservation(self):
|
---|
358 | """
|
---|
359 | The storage backend shouldn't mess with the case of the filenames
|
---|
360 | uploaded.
|
---|
361 | """
|
---|
362 | # Synthesize the contents of a file upload with a mixed case filename
|
---|
363 | # so we don't have to carry such a file in the Django tests source code
|
---|
364 | # tree.
|
---|
365 | vars = {'boundary': 'oUrBoUnDaRyStRiNg'}
|
---|
366 | post_data = [
|
---|
367 | '--%(boundary)s',
|
---|
368 | 'Content-Disposition: form-data; name="file_field"; '
|
---|
369 | 'filename="MiXeD_cAsE.txt"',
|
---|
370 | 'Content-Type: application/octet-stream',
|
---|
371 | '',
|
---|
372 | 'file contents\n'
|
---|
373 | '',
|
---|
374 | '--%(boundary)s--\r\n',
|
---|
375 | ]
|
---|
376 | response = self.client.post(
|
---|
377 | '/file_uploads/filename_case/',
|
---|
378 | '\r\n'.join(post_data) % vars,
|
---|
379 | 'multipart/form-data; boundary=%(boundary)s' % vars
|
---|
380 | )
|
---|
381 | self.assertEqual(response.status_code, 200)
|
---|
382 | id = int(response.content)
|
---|
383 | obj = FileModel.objects.get(pk=id)
|
---|
384 | # The name of the file uploaded and the file stored in the server-side
|
---|
385 | # shouldn't differ.
|
---|
386 | self.assertEqual(os.path.basename(obj.testfile.path), 'MiXeD_cAsE.txt')
|
---|
387 |
|
---|
388 | @override_settings(MEDIA_ROOT=MEDIA_ROOT)
|
---|
389 | class DirectoryCreationTests(TestCase):
|
---|
390 | """
|
---|
391 | Tests for error handling during directory creation
|
---|
392 | via _save_FIELD_file (ticket #6450)
|
---|
393 | """
|
---|
394 | @classmethod
|
---|
395 | def setUpClass(cls):
|
---|
396 | if not os.path.isdir(MEDIA_ROOT):
|
---|
397 | os.makedirs(MEDIA_ROOT)
|
---|
398 |
|
---|
399 | @classmethod
|
---|
400 | def tearDownClass(cls):
|
---|
401 | shutil.rmtree(MEDIA_ROOT)
|
---|
402 |
|
---|
403 | def setUp(self):
|
---|
404 | self.obj = FileModel()
|
---|
405 |
|
---|
406 | def test_readonly_root(self):
|
---|
407 | """Permission errors are not swallowed"""
|
---|
408 | os.chmod(MEDIA_ROOT, 0o500)
|
---|
409 | self.addCleanup(os.chmod, MEDIA_ROOT, 0o700)
|
---|
410 | try:
|
---|
411 | self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', b'x'))
|
---|
412 | except OSError as err:
|
---|
413 | self.assertEqual(err.errno, errno.EACCES)
|
---|
414 | except Exception:
|
---|
415 | self.fail("OSError [Errno %s] not raised." % errno.EACCES)
|
---|
416 |
|
---|
417 | def test_not_a_directory(self):
|
---|
418 | """The correct IOError is raised when the upload directory name exists but isn't a directory"""
|
---|
419 | # Create a file with the upload directory name
|
---|
420 | open(UPLOAD_TO, 'wb').close()
|
---|
421 | self.addCleanup(os.remove, UPLOAD_TO)
|
---|
422 | with self.assertRaises(IOError) as exc_info:
|
---|
423 | self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', b'x'))
|
---|
424 | # The test needs to be done on a specific string as IOError
|
---|
425 | # is raised even without the patch (just not early enough)
|
---|
426 | self.assertEqual(exc_info.exception.args[0],
|
---|
427 | "%s exists and is not a directory." % UPLOAD_TO)
|
---|
428 |
|
---|
429 |
|
---|
430 | class MultiParserTests(unittest.TestCase):
|
---|
431 |
|
---|
432 | def test_empty_upload_handlers(self):
|
---|
433 | # We're not actually parsing here; just checking if the parser properly
|
---|
434 | # instantiates with empty upload handlers.
|
---|
435 | parser = MultiPartParser({
|
---|
436 | 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
|
---|
437 | 'CONTENT_LENGTH': '1'
|
---|
438 | }, StringIO('x'), [], 'utf-8')
|
---|