# Ticket #35513: base.py
#
# File base.py, 4.7 KB (added by Fournet Enzo, 3 months ago)

import os
from io import BytesIO, StringIO, UnsupportedOperation

from django.core.files.utils import FileProxyMixin
from django.utils.functional import cached_property
6
7
class File(FileProxyMixin):
    """
    Wrap a file-like object and add a name, size detection, chunked reading
    and line-by-line iteration on top of it.

    ``seek``, ``read`` and ``closed`` are used on ``self`` but not defined
    here; presumably they are supplied by ``FileProxyMixin`` -- confirm
    against that mixin.
    """

    # 64 * 1024: the size passed to ``read()`` by ``chunks()`` and compared
    # against by ``multiple_chunks()`` when no explicit chunk size is given.
    DEFAULT_CHUNK_SIZE = 64 * 2**10

    def __init__(self, file, name=None):
        """
        Wrap ``file``.

        When ``name`` is None, fall back to ``file.name`` if the wrapped
        object has one (otherwise the name stays None). ``mode`` is copied
        from the wrapped object only when it exposes such an attribute.
        """
        self.file = file
        if name is None:
            name = getattr(file, "name", None)
        self.name = name
        if hasattr(file, "mode"):
            self.mode = file.mode

    def __str__(self):
        """Return the file's name, or an empty string when it has none."""
        return self.name or ""

    def __repr__(self):
        """Return ``<ClassName: name>``, or ``<ClassName: None>`` if falsy."""
        # ``self or "None"`` goes through __bool__; a truthy instance is then
        # rendered by ``%s`` via __str__.
        return "<%s: %s>" % (self.__class__.__name__, self or "None")

    def __bool__(self):
        """A file is truthy only when it has a non-empty name."""
        return bool(self.name)

    def __len__(self):
        """Return the file's size (see ``size``)."""
        return self.size

    @cached_property
    def size(self):
        """
        Return the size of the wrapped file.

        Strategies are tried in order:

        1. the wrapped object's own ``size`` attribute;
        2. ``os.path.getsize()`` on the wrapped object's ``name``;
        3. seeking to the end and reading the position, restoring the
           original position afterwards.

        Raise AttributeError if none of them applies.
        """
        if hasattr(self.file, "size"):
            return self.file.size
        if hasattr(self.file, "name"):
            try:
                return os.path.getsize(self.file.name)
            except (OSError, TypeError):
                # The name is not usable as a filesystem path; fall through
                # to the seek/tell strategy.
                pass
        if hasattr(self.file, "tell") and hasattr(self.file, "seek"):
            pos = self.file.tell()
            self.file.seek(0, os.SEEK_END)
            size = self.file.tell()
            self.file.seek(pos)
            return size
        raise AttributeError("Unable to determine the file's size.")

    def chunks(self, chunk_size=None):
        """
        Read the file and yield chunks of ``chunk_size`` bytes (defaults to
        ``File.DEFAULT_CHUNK_SIZE``).
        """
        chunk_size = chunk_size or self.DEFAULT_CHUNK_SIZE
        try:
            # Start from the beginning when the underlying object allows it.
            self.seek(0)
        except (AttributeError, UnsupportedOperation):
            # Not seekable: read from the current position instead.
            pass

        while True:
            data = self.read(chunk_size)
            if not data:
                break
            yield data

    def multiple_chunks(self, chunk_size=None):
        """
        Return ``True`` if you can expect multiple chunks.

        NB: If a particular file representation is in memory, subclasses should
        always return ``False`` -- there's no good reason to read from memory in
        chunks.
        """
        return self.size > (chunk_size or self.DEFAULT_CHUNK_SIZE)

    def __iter__(self):
        """
        Yield the file's content line by line, keeping line endings.

        Lines are reassembled across chunk boundaries, including a ``\\r\\n``
        pair that was split between two chunks.
        """
        # Iterate over this file-like object by newlines
        buffer_ = None
        for chunk in self.chunks():
            for line in chunk.splitlines(True):
                if buffer_:
                    if endswith_cr(buffer_) and not equals_lf(line):
                        # Line split after a \r newline; yield buffer_.
                        yield buffer_
                        # Continue with line.
                    else:
                        # Line either split without a newline (line
                        # continues after buffer_) or with \r\n
                        # newline (line == b'\n').
                        line = buffer_ + line
                    # buffer_ handled, clear it.
                    buffer_ = None

                # If this is the end of a \n or \r\n line, yield.
                if endswith_lf(line):
                    yield line
                else:
                    buffer_ = line

        # Flush whatever is left once all chunks are consumed.
        if buffer_ is not None:
            yield buffer_

    def __enter__(self):
        """Return ``self`` so the file can be used as a context manager."""
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Close the file on leaving the ``with`` block."""
        self.close()

    def open(self, mode=None, *args, **kwargs):
        """
        Open or reopen the file and return ``self``.

        If the file is not closed, just seek back to the start. Otherwise,
        if ``name`` refers to an existing path, reopen it with the built-in
        ``open()`` using ``mode`` (or ``self.mode`` when ``mode`` is falsy),
        forwarding ``*args`` and ``**kwargs``. Raise ValueError when the file
        cannot be reopened.
        """
        if not self.closed:
            self.seek(0)
        elif self.name and os.path.exists(self.name):
            # NOTE(review): ``self.mode`` is only set in __init__ when the
            # wrapped object had a ``mode`` attribute.
            self.file = open(self.name, mode or self.mode, *args, **kwargs)
        else:
            raise ValueError("The file cannot be reopened.")
        return self

    def close(self):
        """Close the wrapped file object."""
        self.file.close()
120
class ContentFile(File):
    """
    A File-like object that takes just raw content, rather than an actual file.
    """

    def __init__(self, content, name=None):
        """
        Wrap ``content`` in an in-memory stream: ``StringIO`` for ``str``,
        ``BytesIO`` for anything else.
        """
        stream_class = StringIO if isinstance(content, str) else BytesIO
        super().__init__(stream_class(content), name=name)
        # Record the size up front as ``len(content)``.
        self.size = len(content)

    def __str__(self):
        """Return a fixed label instead of a file name."""
        return "Raw content"

    def __bool__(self):
        """Always truthy, whether or not a name was given."""
        return True

    def open(self, mode=None):
        """Seek back to the start and return ``self``; ``mode`` is unused."""
        self.seek(0)
        return self

    def close(self):
        """Do nothing; the in-memory stream is deliberately left open."""
        pass

    def write(self, data):
        """Write ``data`` to the underlying stream and return its result."""
        self.__dict__.pop("size", None)  # Clear the computed size.
        return self.file.write(data)
147
148
def endswith_cr(line):
    r"""
    Return True if line (a text or bytestring) ends with '\r'.

    The suffix is a ``str`` when ``line`` is a ``str`` and ``bytes``
    otherwise, so the comparison never mixes text and bytes.
    """
    return line.endswith("\r" if isinstance(line, str) else b"\r")
152
153
def endswith_lf(line):
    r"""
    Return True if line (a text or bytestring) ends with '\n'.

    The suffix is a ``str`` when ``line`` is a ``str`` and ``bytes``
    otherwise, so the comparison never mixes text and bytes.
    """
    return line.endswith("\n" if isinstance(line, str) else b"\n")
157
158
def equals_lf(line):
    r"""
    Return True if line (a text or bytestring) equals '\n'.

    The comparison value is a ``str`` when ``line`` is a ``str`` and
    ``bytes`` otherwise.
    """
    return line == ("\n" if isinstance(line, str) else b"\n")
# Back to Top