Ticket #18150: tests.py

File tests.py, 16.9 KB (added by supersteve9219, 11 years ago)
Line 
1#! -*- coding: utf-8 -*-
2from __future__ import absolute_import, unicode_literals
3
4import base64
5import errno
6import hashlib
7import json
8import os
9import shutil
10import tempfile as sys_tempfile
11
12from django.core.files import temp as tempfile
13from django.core.files.uploadedfile import SimpleUploadedFile
14from django.http.multipartparser import MultiPartParser
15from django.test import TestCase, client
16from django.test.utils import override_settings
17from django.utils.encoding import force_bytes
18from django.utils.six import StringIO
19from django.utils import unittest
20
21from . import uploadhandler
22from .models import FileModel
23
24
# File name mixing ASCII digits, CJK characters and an accented Latin
# character; used by the unicode-filename upload test.
UNICODE_FILENAME = 'test-0123456789_中文_Orléans.jpg'

# Scratch directory created once, at import time, and substituted for
# settings.MEDIA_ROOT by the test classes in this module.
MEDIA_ROOT = sys_tempfile.mkdtemp()

# Path inside MEDIA_ROOT that the directory-creation tests occupy with a
# plain file.  Presumably this mirrors FileModel's upload_to -- verify
# against the model definition.
UPLOAD_TO = os.path.join(MEDIA_ROOT, 'test_upload')
28
@override_settings(MEDIA_ROOT=MEDIA_ROOT)
class FileUploadTests(TestCase):
    """
    Upload tests that push multipart requests through the test client to
    the ``/file_uploads/...`` URLs, with MEDIA_ROOT pointed at the
    module-level scratch directory.
    """

    @classmethod
    def setUpClass(cls):
        """Make sure the scratch MEDIA_ROOT exists before any test runs."""
        super(FileUploadTests, cls).setUpClass()
        if not os.path.isdir(MEDIA_ROOT):
            os.makedirs(MEDIA_ROOT)

    @classmethod
    def tearDownClass(cls):
        """Remove the scratch MEDIA_ROOT once every test has run."""
        shutil.rmtree(MEDIA_ROOT)
        super(FileUploadTests, cls).tearDownClass()

    def _post_payload(self, path, payload):
        """
        POST ``payload`` (a ``client.FakePayload``) to ``path`` as a raw
        multipart request and return the response.
        """
        environ = {
            'CONTENT_LENGTH': len(payload),
            'CONTENT_TYPE': client.MULTIPART_CONTENT,
            'PATH_INFO': path,
            'REQUEST_METHOD': 'POST',
            'wsgi.input': payload,
        }
        return self.client.request(**environ)

    def _temp_file(self, size, **kwargs):
        """
        Return a temporary file holding ``size`` bytes of ``b'a'``, rewound
        to the start.  ``kwargs`` are passed to ``NamedTemporaryFile``.

        The file is closed during test cleanup so the handle is not leaked.
        """
        handle = tempfile.NamedTemporaryFile(**kwargs)
        self.addCleanup(handle.close)
        handle.write(b'a' * size)
        handle.seek(0)
        return handle

    def test_fail_backslash(self):
        """
        A file name ending with a backslash is handled without crashing
        (ticket #18150).
        """
        backslash_name = "backslash.jpg\\"
        payload = client.FakePayload()
        payload.write('\r\n'.join([
            '--' + client.BOUNDARY,
            'Content-Disposition: form-data; name="file1"; filename="%s"' % backslash_name,
            'Content-Type: application/octet-stream',
            '',
            '',
        ]))
        payload.write('\r\n--' + client.BOUNDARY + '--\r\n')

        response = self._post_payload("/file_uploads/echo/", payload)
        self.assertEqual(response.status_code, 200)

    def test_simple_upload(self):
        """Posting this very source file as a form field returns a 200."""
        with open(__file__, 'rb') as fp:
            post_data = {
                'name': 'Ringo',
                'file_field': fp,
            }
            response = self.client.post('/file_uploads/upload/', post_data)
        self.assertEqual(response.status_code, 200)

    def test_large_upload(self):
        """Two multi-megabyte files posted with their SHA-1 hashes return a 200."""
        tdir = tempfile.gettempdir()
        post_data = {
            'name': 'Ringo',
            'file_field1': self._temp_file(2 ** 21, suffix=".file1", dir=tdir),
            'file_field2': self._temp_file(10 * 2 ** 20, suffix=".file2", dir=tdir),
        }

        # Add a "<key>_hash" entry for every field.  Iterate over a snapshot
        # of the keys because the dict grows inside the loop.
        for key in list(post_data):
            try:
                post_data[key + '_hash'] = hashlib.sha1(post_data[key].read()).hexdigest()
                post_data[key].seek(0)
            except AttributeError:
                # Plain (non-file) values have no read(); hash their bytes.
                post_data[key + '_hash'] = hashlib.sha1(force_bytes(post_data[key])).hexdigest()

        response = self.client.post('/file_uploads/verify/', post_data)

        self.assertEqual(response.status_code, 200)

    def _test_base64_upload(self, content):
        """
        Upload ``content`` with a base64 Content-Transfer-Encoding and
        assert that the echoed file content equals the original text.
        """
        payload = client.FakePayload("\r\n".join([
            '--' + client.BOUNDARY,
            'Content-Disposition: form-data; name="file"; filename="test.txt"',
            'Content-Type: application/octet-stream',
            'Content-Transfer-Encoding: base64',
            '',
        ]))
        payload.write(b"\r\n" + base64.b64encode(force_bytes(content)) + b"\r\n")
        payload.write('--' + client.BOUNDARY + '--\r\n')

        response = self._post_payload("/file_uploads/echo_content/", payload)
        received = json.loads(response.content.decode('utf-8'))

        self.assertEqual(received['file'], content)

    def test_base64_upload(self):
        """A short base64-encoded upload round-trips."""
        self._test_base64_upload("This data will be transmitted base64-encoded.")

    def test_big_base64_upload(self):
        """A base64-encoded upload larger than 512Kb round-trips."""
        self._test_base64_upload("Big data" * 68000)  # > 512Kb

    def test_unicode_file_name(self):
        """A file whose name contains non-ASCII characters uploads with a 200."""
        tdir = sys_tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tdir, True)

        # This file contains Chinese symbols and an accented char in the name.
        with open(os.path.join(tdir, UNICODE_FILENAME), 'w+b') as file1:
            file1.write(b'b' * (2 ** 10))
            file1.seek(0)

            post_data = {
                'file_unicode': file1,
            }

            response = self.client.post('/file_uploads/unicode_name/', post_data)

        self.assertEqual(response.status_code, 200)

    def test_dangerous_file_names(self):
        """Uploaded file names should be sanitized before ever reaching the view."""
        # This test simulates possible directory traversal attacks by a
        # malicious uploader.  We have to do some monkey business here to
        # construct a malicious payload with an invalid file name (containing
        # os.sep or os.pardir).  This is similar to what an attacker would
        # need to do when trying such an attack.
        scary_file_names = [
            "/tmp/hax0rd.txt",          # Absolute path, *nix-style.
            "C:\\Windows\\hax0rd.txt",  # Absolute path, win-style.
            "C:/Windows/hax0rd.txt",    # Absolute path, broken-style.
            "\\tmp\\hax0rd.txt",        # Absolute path, broken in a different way.
            "/tmp\\hax0rd.txt",         # Absolute path, broken by mixing.
            "subdir/hax0rd.txt",        # Descendant path, *nix-style.
            "subdir\\hax0rd.txt",       # Descendant path, win-style.
            "sub/dir\\hax0rd.txt",      # Descendant path, mixed.
            "../../hax0rd.txt",         # Relative path, *nix-style.
            "..\\..\\hax0rd.txt",       # Relative path, win-style.
            "../..\\hax0rd.txt"         # Relative path, mixed.
        ]

        payload = client.FakePayload()
        for i, name in enumerate(scary_file_names):
            payload.write('\r\n'.join([
                '--' + client.BOUNDARY,
                'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name),
                'Content-Type: application/octet-stream',
                '',
                'You got pwnd.\r\n'
            ]))
        payload.write('\r\n--' + client.BOUNDARY + '--\r\n')

        response = self._post_payload("/file_uploads/echo/", payload)

        # The filenames should have been sanitized by the time they got to
        # the view.
        received = json.loads(response.content.decode('utf-8'))
        for i in range(len(scary_file_names)):
            self.assertEqual(received["file%s" % i], "hax0rd.txt")

    def test_filename_overflow(self):
        """File names over 256 characters (dangerous on some platforms) get fixed up."""
        name = "%s.txt" % ("f" * 500)
        payload = client.FakePayload("\r\n".join([
            '--' + client.BOUNDARY,
            'Content-Disposition: form-data; name="file"; filename="%s"' % name,
            'Content-Type: application/octet-stream',
            '',
            # The comma after the body matters: without it the body and the
            # closing boundary are concatenated into a single line.
            'Oops.',
            '--' + client.BOUNDARY + '--',
            '',
        ]))
        response = self._post_payload("/file_uploads/echo/", payload)
        got = json.loads(response.content.decode('utf-8'))
        self.assertLess(len(got['file']), 256,
                        "Got a long file name (%s characters)." % len(got['file']))

    def test_truncated_multipart_handled_gracefully(self):
        """
        If passed an incomplete multipart message, MultiPartParser does not
        attempt to read beyond the end of the stream, and simply will handle
        the part that can be parsed gracefully.
        """
        payload_str = "\r\n".join([
            '--' + client.BOUNDARY,
            'Content-Disposition: form-data; name="file"; filename="foo.txt"',
            'Content-Type: application/octet-stream',
            '',
            # Body and closing boundary are separate lines of the message.
            'file contents',
            '--' + client.BOUNDARY + '--',
            '',
        ])
        # Drop the trailing 10 characters so the closing boundary line is
        # incomplete.
        payload = client.FakePayload(payload_str[:-10])
        response = self._post_payload('/file_uploads/echo/', payload)
        got = json.loads(response.content.decode('utf-8'))
        self.assertEqual(got, {})

    def test_empty_multipart_handled_gracefully(self):
        """
        If passed an empty multipart message, MultiPartParser will return
        an empty QueryDict.
        """
        r = {
            'CONTENT_LENGTH': 0,
            'CONTENT_TYPE': client.MULTIPART_CONTENT,
            'PATH_INFO': '/file_uploads/echo/',
            'REQUEST_METHOD': 'POST',
            'wsgi.input': client.FakePayload(b''),
        }
        got = json.loads(self.client.request(**r).content.decode('utf-8'))
        self.assertEqual(got, {})

    def test_custom_upload_handler(self):
        """The quota view echoes a file under the quota but not one over it."""
        # A small file (under the 5M quota)
        smallfile = self._temp_file(2 ** 21)

        # A big file (over the quota)
        bigfile = self._temp_file(10 * 2 ** 20)

        # Small file posting should work.
        response = self.client.post('/file_uploads/quota/', {'f': smallfile})
        got = json.loads(response.content.decode('utf-8'))
        self.assertIn('f', got)

        # Large files don't go through.
        response = self.client.post("/file_uploads/quota/", {'f': bigfile})
        got = json.loads(response.content.decode('utf-8'))
        self.assertNotIn('f', got)

    def test_broken_custom_upload_handler(self):
        """Posting to the broken quota view raises AttributeError."""
        f = self._temp_file(2 ** 21)

        # AttributeError: You cannot alter upload handlers after the upload has been processed.
        self.assertRaises(
            AttributeError,
            self.client.post,
            '/file_uploads/quota/broken/',
            {'f': f}
        )

    def test_fileupload_getlist(self):
        """
        A field posted with one file is counted once and a field posted with
        two files is counted twice by the getlist_count view.
        """
        file1 = self._temp_file(2 ** 23)
        file2 = self._temp_file(2 * 2 ** 18)
        file2a = self._temp_file(5 * 2 ** 20)

        response = self.client.post('/file_uploads/getlist_count/', {
            'file1': file1,
            'field1': 'test',
            'field2': 'test3',
            'field3': 'test5',
            'field4': 'test6',
            'field5': 'test7',
            'file2': (file2, file2a)
        })
        got = json.loads(response.content.decode('utf-8'))

        self.assertEqual(got.get('file1'), 1)
        self.assertEqual(got.get('file2'), 2)

    def test_file_error_blocking(self):
        """
        The server should not block when there are upload errors (bug #8622).
        This can happen if something -- i.e. an exception handler -- tries to
        access POST while handling an error in parsing POST. This shouldn't
        cause an infinite loop!
        """
        class POSTAccessingHandler(client.ClientHandler):
            """A handler that'll access POST during an exception."""
            def handle_uncaught_exception(self, request, resolver, exc_info):
                ret = super(POSTAccessingHandler, self).handle_uncaught_exception(request, resolver, exc_info)
                # Only the attribute access matters here; the value is unused.
                request.POST
                return ret

        # Maybe this is a little more complicated than it needs to be; but if
        # the django.test.client.FakePayload.read() implementation changes then
        # this test would fail. So we need to know exactly what kind of error
        # it raises when there is an attempt to read more than the available bytes:
        try:
            client.FakePayload(b'a').read(2)
        except Exception as err:
            reference_error = err

        # Install the custom handler that tries to access request.POST
        self.client.handler = POSTAccessingHandler()

        with open(__file__, 'rb') as fp:
            post_data = {
                'name': 'Ringo',
                'file_field': fp,
            }
            try:
                self.client.post('/file_uploads/upload_errors/', post_data)
            except reference_error.__class__ as err:
                self.assertFalse(
                    str(err) == str(reference_error),
                    "Caught a repeated exception that'll cause an infinite loop in file uploads."
                )
            except Exception as err:
                # CustomUploadError is the error that should have been raised
                self.assertEqual(err.__class__, uploadhandler.CustomUploadError)

    def test_filename_case_preservation(self):
        """
        The storage backend shouldn't mess with the case of the filenames
        uploaded.
        """
        # Synthesize the contents of a file upload with a mixed case filename
        # so we don't have to carry such a file in the Django tests source code
        # tree.
        params = {'boundary': 'oUrBoUnDaRyStRiNg'}
        post_data = [
            '--%(boundary)s',
            # One header line, split across two literals for readability.
            'Content-Disposition: form-data; name="file_field"; '
            'filename="MiXeD_cAsE.txt"',
            'Content-Type: application/octet-stream',
            '',
            'file contents\n',
            '--%(boundary)s--\r\n',
        ]
        response = self.client.post(
            '/file_uploads/filename_case/',
            '\r\n'.join(post_data) % params,
            'multipart/form-data; boundary=%(boundary)s' % params
        )
        self.assertEqual(response.status_code, 200)
        # The view answers with the primary key of the stored object.
        pk = int(response.content)
        obj = FileModel.objects.get(pk=pk)
        # The name of the file uploaded and the file stored in the server-side
        # shouldn't differ.
        self.assertEqual(os.path.basename(obj.testfile.path), 'MiXeD_cAsE.txt')
387
@override_settings(MEDIA_ROOT=MEDIA_ROOT)
class DirectoryCreationTests(TestCase):
    """
    Tests for error handling during directory creation
    via _save_FIELD_file (ticket #6450)
    """

    @classmethod
    def setUpClass(cls):
        """Make sure the scratch MEDIA_ROOT exists before any test runs."""
        super(DirectoryCreationTests, cls).setUpClass()
        if not os.path.isdir(MEDIA_ROOT):
            os.makedirs(MEDIA_ROOT)

    @classmethod
    def tearDownClass(cls):
        """Remove the scratch MEDIA_ROOT once every test has run."""
        shutil.rmtree(MEDIA_ROOT)
        super(DirectoryCreationTests, cls).tearDownClass()

    def setUp(self):
        """Give each test a fresh, unsaved FileModel instance."""
        self.obj = FileModel()

    def test_readonly_root(self):
        """Permission errors are not swallowed"""
        # NOTE(review): superusers and Windows are not expected to honour the
        # 0o500 mode set below; no error can be provoked there, so the test
        # is skipped instead of passing vacuously -- confirm for your CI.
        if os.name == 'nt' or (hasattr(os, 'geteuid') and os.geteuid() == 0):
            self.skipTest("Directory permission bits are not enforced for this user/platform.")

        os.chmod(MEDIA_ROOT, 0o500)
        # Restore write access so later tests and tearDownClass can clean up.
        self.addCleanup(os.chmod, MEDIA_ROOT, 0o700)
        # assertRaises fails the test when nothing is raised at all, which a
        # bare try/except without an else branch would let through.
        with self.assertRaises(OSError) as exc_info:
            self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', b'x'))
        self.assertEqual(exc_info.exception.errno, errno.EACCES)

    def test_not_a_directory(self):
        """The correct IOError is raised when the upload directory name exists but isn't a directory"""
        # Create a file with the upload directory name
        open(UPLOAD_TO, 'wb').close()
        self.addCleanup(os.remove, UPLOAD_TO)
        with self.assertRaises(IOError) as exc_info:
            self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', b'x'))
        # The test needs to be done on a specific string as IOError
        # is raised even without the patch (just not early enough)
        self.assertEqual(exc_info.exception.args[0],
                         "%s exists and is not a directory." % UPLOAD_TO)
428
429
class MultiParserTests(unittest.TestCase):
    """Tests that construct MultiPartParser directly, without the test client."""

    def test_empty_upload_handlers(self):
        """MultiPartParser can be instantiated with an empty list of upload handlers."""
        # We're not actually parsing here; just checking if the parser properly
        # instantiates with empty upload handlers.  The instance itself is
        # not needed, so it is not bound to a name.
        MultiPartParser({
            'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
            'CONTENT_LENGTH': '1'
        }, StringIO('x'), [], 'utf-8')
Back to Top