Module 15Watt_Wsgi.multipart
Parser for multipart/form-data
This module provides a parser for the multipart/form-data format. It can read from a file, a socket or a WSGI environment. The parser can be used to replace cgi.FieldStorage to work around its limitations.
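A minimal usage sketch (the import name and field names below are assumptions for illustration, not taken from this page): a WSGI application calls parse_form_data() on the environ it receives, much as it would have used cgi.FieldStorage.

from multipart import parse_form_data  # assumed import path; adjust to your installation

def simple_app(environ, start_response):
    # With strict=False (the default) parse errors are swallowed and the
    # dictionaries simply come back empty.
    forms, files = parse_form_data(environ)
    name = forms.get("name", "anonymous")  # text field, already decoded to str
    upload = files.get("upload")           # MultipartPart instance, or None
    message = "Hello %s, %s file received." % (name, "one" if upload else "no")
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [message.encode("utf8")]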
Source code
# -*- coding: utf-8 -*-
"""
Parser for multipart/form-data
------------------------------
This module provides a parser for the multipart/form-data format. It can read
from a file, a socket or a WSGI environment. The parser can be used to replace
cgi.FieldStorage to work around its limitations.
"""
__author__ = "Marcel Hellkamp"
__version__ = "0.2.4"
__license__ = "MIT"
__all__ = ["MultipartError", "MultipartParser", "MultipartPart", "parse_form_data"]
import re
import sys
from io import BytesIO
from tempfile import TemporaryFile
from urllib.parse import parse_qs
from wsgiref.headers import Headers
from collections.abc import MutableMapping as DictMixin
##############################################################################
################################ Helper & Misc ###############################
##############################################################################
# Some of these were copied from bottle: https://bottlepy.org
# ---------
# MultiDict
# ---------
class MultiDict(DictMixin):
""" A dict that remembers old values for each key.
HTTP headers may repeat with differing values,
such as Set-Cookie. We need to remember all
values.
"""
def __init__(self, *args, **kwargs):
self.dict = dict()
for k, v in dict(*args, **kwargs).items():
self[k] = v
def __len__(self):
return len(self.dict)
def __iter__(self):
return iter(self.dict)
def __contains__(self, key):
return key in self.dict
def __delitem__(self, key):
del self.dict[key]
def keys(self):
return self.dict.keys()
def __getitem__(self, key):
return self.get(key, KeyError, -1)
def __setitem__(self, key, value):
self.append(key, value)
def append(self, key, value):
self.dict.setdefault(key, []).append(value)
def replace(self, key, value):
self.dict[key] = [value]
def getall(self, key):
return self.dict.get(key) or []
def get(self, key, default=None, index=-1):
if key not in self.dict and default != KeyError:
return [default][index]
return self.dict[key][index]
def iterallitems(self):
for key, values in self.dict.items():
for value in values:
yield key, value
def to_bytes(data, enc="utf8"):
if isinstance(data, str):
data = data.encode(enc)
return data
def copy_file(stream, target, maxread=-1, buffer_size=2 ** 16):
""" Read from :stream and write to :target until :maxread or EOF. """
size, read = 0, stream.read
while True:
to_read = buffer_size if maxread < 0 else min(buffer_size, maxread - size)
part = read(to_read)
if not part:
return size
target.write(part)
size += len(part)
# -------------
# Header Parser
# -------------
_special = re.escape('()<>@,;:"\\/[]?={} \t')
_re_special = re.compile(r'[%s]' % _special)
_quoted_string = r'"(?:\\.|[^"])*"' # Quoted string
_value = r'(?:[^%s]+|%s)' % (_special, _quoted_string)  # Safe token or quoted string
_option = r'(?:;|^)\s*([^%s]+)\s*=\s*(%s)' % (_special, _value)
_re_option = re.compile(_option)  # key=value part of a Content-Type-like header
def header_quote(val):
if not _re_special.search(val):
return val
return '"' + val.replace("\\", "\\\\").replace('"', '\\"') + '"'
def header_unquote(val, filename=False):
if val[0] == val[-1] == '"':
val = val[1:-1]
if val[1:3] == ":\\" or val[:2] == "\\\\":
val = val.split("\\")[-1] # fix ie6 bug: full path --> filename
return val.replace("\\\\", "\\").replace('\\"', '"')
return val
def parse_options_header(header, options=None):
if ";" not in header:
return header.lower().strip(), {}
content_type, tail = header.split(";", 1)
options = options or {}
for match in _re_option.finditer(tail):
key = match.group(1).lower()
value = header_unquote(match.group(2), key == "filename")
options[key] = value
return content_type, options
##############################################################################
################################## Multipart #################################
##############################################################################
class MultipartError(ValueError):
pass
class MultipartParser(object):
def __init__(
self,
stream,
boundary,
content_length=-1,
disk_limit=2 ** 30,
mem_limit=2 ** 20,
memfile_limit=2 ** 18,
buffer_size=2 ** 16,
charset="latin1",
):
""" Parse a multipart/form-data byte stream. This object is an iterator
over the parts of the message.
:param stream: A file-like stream. Must implement ``.read(size)``.
:param boundary: The multipart boundary as a byte string.
:param content_length: The maximum number of bytes to read.
"""
self.stream = stream
self.boundary = boundary
self.content_length = content_length
self.disk_limit = disk_limit
self.memfile_limit = memfile_limit
self.mem_limit = min(mem_limit, self.disk_limit)
self.buffer_size = min(buffer_size, self.mem_limit)
self.charset = charset
if self.buffer_size - 6 < len(boundary): # "--boundary--\r\n"
raise MultipartError("Boundary does not fit into buffer_size.")
self._done = []
self._part_iter = None
def __iter__(self):
""" Iterate over the parts of the multipart message. """
if not self._part_iter:
self._part_iter = self._iterparse()
for part in self._done:
yield part
for part in self._part_iter:
self._done.append(part)
yield part
def parts(self):
""" Returns a list with all parts of the multipart message. """
return list(self)
def get(self, name, default=None):
""" Return the first part with that name or a default value (None). """
for part in self:
if name == part.name:
return part
return default
def get_all(self, name):
""" Return a list of parts with that name. """
return [p for p in self if p.name == name]
def _lineiter(self):
""" Iterate over a binary file-like object line by line. Each line is
returned as a (line, line_ending) tuple. If the line does not fit
into self.buffer_size, line_ending is empty and the rest of the line
is returned with the next iteration.
"""
read = self.stream.read
maxread, maxbuf = self.content_length, self.buffer_size
buffer = b"" # buffer for the last (partial) line
while True:
data = read(maxbuf if maxread < 0 else min(maxbuf, maxread))
maxread -= len(data)
lines = (buffer + data).splitlines(True)
len_first_line = len(lines[0])
# be sure that the first line does not become too big
if len_first_line > self.buffer_size:
# at the same time don't split a '\r\n' accidentally
if len_first_line == self.buffer_size + 1 and lines[0].endswith(b"\r\n"):
splitpos = self.buffer_size - 1
else:
splitpos = self.buffer_size
lines[:1] = [lines[0][:splitpos], lines[0][splitpos:]]
if data:
buffer = lines[-1]
lines = lines[:-1]
for line in lines:
if line.endswith(b"\r\n"):
yield line[:-2], b"\r\n"
elif line.endswith(b"\n"):
yield line[:-1], b"\n"
elif line.endswith(b"\r"):
yield line[:-1], b"\r"
else:
yield line, b""
if not data:
break
def _iterparse(self):
lines, line = self._lineiter(), ""
separator = b"--" + to_bytes(self.boundary)
terminator = b"--" + to_bytes(self.boundary) + b"--"
# Consume first boundary. Ignore any preamble, as required by RFC
# 2046, section 5.1.1.
for line, nl in lines:
if line in (separator, terminator):
break
else:
raise MultipartError("Stream does not contain boundary")
# Check for empty data
if line == terminator:
for _ in lines:
raise MultipartError("Data after end of stream")
return
# For each part in stream...
mem_used, disk_used = 0, 0 # Track used resources to prevent DoS
        is_tail = False  # True if the last line was incomplete (cut off)
opts = {
"buffer_size": self.buffer_size,
"memfile_limit": self.memfile_limit,
"charset": self.charset,
}
part = MultipartPart(**opts)
for line, nl in lines:
if line == terminator and not is_tail:
part.file.seek(0)
yield part
break
elif line == separator and not is_tail:
if part.is_buffered():
mem_used += part.size
else:
disk_used += part.size
part.file.seek(0)
yield part
part = MultipartPart(**opts)
else:
is_tail = not nl # The next line continues this one
try:
part.feed(line, nl)
if part.is_buffered():
if part.size + mem_used > self.mem_limit:
raise MultipartError("Memory limit reached.")
elif part.size + disk_used > self.disk_limit:
raise MultipartError("Disk limit reached.")
except MultipartError:
part.close()
raise
else:
# If we run off the end of the loop, the current MultipartPart
# will not have been yielded, so it's our responsibility to
# close it.
part.close()
if line != terminator:
raise MultipartError("Unexpected end of multipart stream.")
class MultipartPart(object):
def __init__(self, buffer_size=2 ** 16, memfile_limit=2 ** 18, charset="latin1"):
self.headerlist = []
self.headers = None
self.file = False
self.size = 0
self._buf = b""
self.disposition = None
self.name = None
self.filename = None
self.content_type = None
self.charset = charset
self.memfile_limit = memfile_limit
self.buffer_size = buffer_size
def feed(self, line, nl=""):
if self.file:
return self.write_body(line, nl)
return self.write_header(line, nl)
def write_header(self, line, nl):
line = line.decode(self.charset)
if not nl:
raise MultipartError("Unexpected end of line in header.")
if not line.strip(): # blank line -> end of header segment
self.finish_header()
elif line[0] in " \t" and self.headerlist:
name, value = self.headerlist.pop()
self.headerlist.append((name, value + line.strip()))
else:
if ":" not in line:
raise MultipartError("Syntax error in header: No colon.")
name, value = line.split(":", 1)
self.headerlist.append((name.strip(), value.strip()))
def write_body(self, line, nl):
if not line and not nl:
return # This does not even flush the buffer
self.size += len(line) + len(self._buf)
self.file.write(self._buf + line)
self._buf = nl
if self.content_length > 0 and self.size > self.content_length:
raise MultipartError("Size of body exceeds Content-Length header.")
if self.size > self.memfile_limit and isinstance(self.file, BytesIO):
# TODO: What about non-file uploads that exceed the memfile_limit?
self.file, old = TemporaryFile(mode="w+b"), self.file
old.seek(0)
copy_file(old, self.file, self.size, self.buffer_size)
def finish_header(self):
self.file = BytesIO()
self.headers = Headers(self.headerlist)
content_disposition = self.headers.get("Content-Disposition", "")
content_type = self.headers.get("Content-Type", "")
if not content_disposition:
raise MultipartError("Content-Disposition header is missing.")
self.disposition, self.options = parse_options_header(content_disposition)
self.name = self.options.get("name")
self.filename = self.options.get("filename")
self.content_type, options = parse_options_header(content_type)
self.charset = options.get("charset") or self.charset
self.content_length = int(self.headers.get("Content-Length", "-1"))
def is_buffered(self):
""" Return true if the data is fully buffered in memory."""
return isinstance(self.file, BytesIO)
@property
def value(self):
""" Data decoded with the specified charset """
return self.raw.decode(self.charset)
@property
def raw(self):
""" Data without decoding """
pos = self.file.tell()
self.file.seek(0)
try:
val = self.file.read()
except IOError:
raise
finally:
self.file.seek(pos)
return val
def save_as(self, path):
with open(path, "wb") as fp:
pos = self.file.tell()
try:
self.file.seek(0)
size = copy_file(self.file, fp)
finally:
self.file.seek(pos)
return size
def close(self):
if self.file:
self.file.close()
self.file = False
##############################################################################
#################################### WSGI ####################################
##############################################################################
def parse_form_data(environ, charset="utf8", strict=False, **kwargs):
""" Parse form data from an environ dict and return a (forms, files) tuple.
Both tuple values are dictionaries with the form-field name as a key
(unicode) and lists as values (multiple values per key are possible).
The forms-dictionary contains form-field values as unicode strings.
The files-dictionary contains :class:`MultipartPart` instances, either
because the form-field was a file-upload or the value is too big to fit
into memory limits.
    :param environ: A WSGI environment dict.
:param charset: The charset to use if unsure. (default: utf8)
:param strict: If True, raise :exc:`MultipartError` on any parsing
errors. These are silently ignored by default.
"""
forms, files = MultiDict(), MultiDict()
try:
if environ.get("REQUEST_METHOD", "GET").upper() not in ("POST", "PUT"):
raise MultipartError("Request method other than POST or PUT.")
content_length = int(environ.get("CONTENT_LENGTH", "-1"))
content_type = environ.get("CONTENT_TYPE", "")
if not content_type:
raise MultipartError("Missing Content-Type header.")
content_type, options = parse_options_header(content_type)
stream = environ.get("wsgi.input") or BytesIO()
kwargs["charset"] = charset = options.get("charset", charset)
if content_type == "multipart/form-data":
boundary = options.get("boundary", "")
if not boundary:
raise MultipartError("No boundary for multipart/form-data.")
for part in MultipartParser(stream, boundary, content_length, **kwargs):
if part.filename or not part.is_buffered():
files[part.name] = part
else: # TODO: Big form-fields are in the files dict. really?
forms[part.name] = part.value
elif content_type in (
"application/x-www-form-urlencoded",
"application/x-url-encoded",
):
mem_limit = kwargs.get("mem_limit", 2 ** 20)
if content_length > mem_limit:
raise MultipartError("Request too big. Increase MAXMEM.")
data = stream.read(mem_limit).decode(charset)
            if stream.read(1):  # There is more data that does not fit mem_limit
raise MultipartError("Request too big. Increase MAXMEM.")
data = parse_qs(data, keep_blank_values=True, encoding=charset)
for key, values in data.items():
for value in values:
forms[key] = value
else:
raise MultipartError("Unsupported content type.")
except MultipartError:
if strict:
for part in files.values():
part.close()
raise
return forms, files
Functions
def parse_form_data(environ, charset='utf8', strict=False, **kwargs)
-
Parse form data from an environ dict and return a (forms, files) tuple. Both tuple values are dictionaries with the form-field name as a key (unicode) and lists as values (multiple values per key are possible). The forms-dictionary contains form-field values as unicode strings. The files-dictionary contains MultipartPart instances, either because the form-field was a file-upload or the value is too big to fit into memory limits.
:param environ: A WSGI environment dict.
:param charset: The charset to use if unsure. (default: utf8)
:param strict: If True, raise MultipartError on any parsing errors. These are silently ignored by default.
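As a rough, self-contained sketch of the environ keys the function inspects, the request below is built by hand; the boundary and field names are invented for the example, and parse_form_data and BytesIO are assumed to be in scope as in the source above.

from io import BytesIO

body = (b'--frontier\r\n'
        b'Content-Disposition: form-data; name="title"\r\n'
        b'\r\n'
        b'hello\r\n'
        b'--frontier\r\n'
        b'Content-Disposition: form-data; name="upload"; filename="a.txt"\r\n'
        b'Content-Type: text/plain\r\n'
        b'\r\n'
        b'file content\r\n'
        b'--frontier--\r\n')

environ = {
    "REQUEST_METHOD": "POST",
    "CONTENT_TYPE": "multipart/form-data; boundary=frontier",
    "CONTENT_LENGTH": str(len(body)),
    "wsgi.input": BytesIO(body),
}

forms, files = parse_form_data(environ)
forms["title"]            # 'hello' -- small text field ends up in forms
files["upload"].filename  # 'a.txt' -- parts with a filename end up in files
files["upload"].value     # 'file content'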
Classes
class MultipartError (*args, **kwargs)
-
Raised on malformed multipart streams or when a size limit is exceeded. Inherits from ValueError.
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
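With strict=True the errors that are otherwise swallowed surface as MultipartError, so a caller can reject the request explicitly. A short sketch (environ is the dict handed in by the WSGI server):

try:
    forms, files = parse_form_data(environ, strict=True)
except MultipartError as err:
    # err carries messages such as "Missing Content-Type header."
    # or "Memory limit reached."
    print("rejecting request:", err)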
class MultipartParser (stream, boundary, content_length=-1, disk_limit=1073741824, mem_limit=1048576, memfile_limit=262144, buffer_size=65536, charset='latin1')
-
Parse a multipart/form-data byte stream. This object is an iterator over the parts of the message.
:param stream: A file-like stream. Must implement .read(size).
:param boundary: The multipart boundary as a byte string.
:param content_length: The maximum number of bytes to read.
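The parser does not require a WSGI environ; it accepts any readable byte stream plus the boundary. A minimal sketch with an invented boundary (the default limits apply: mem_limit 1 MiB, disk_limit 1 GiB, memfile_limit 256 KiB):

from io import BytesIO

payload = (b'--sep\r\n'
           b'Content-Disposition: form-data; name="greeting"\r\n'
           b'\r\n'
           b'hi there\r\n'
           b'--sep--\r\n')

for part in MultipartParser(BytesIO(payload), boundary="sep"):
    print(part.name, repr(part.value))  # greeting 'hi there'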
Methods
def get(self, name, default=None)
-
Return the first part with that name or a default value (None).
def get_all(self, name)
-
Return a list of parts with that name.
def parts(self)
-
Returns a list with all parts of the multipart message.
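Because __iter__ caches already-yielded parts in self._done, a parser instance can be iterated and queried more than once. When a field name repeats, get() returns the first matching part and get_all() returns all of them; a small sketch reusing the payload from the example above, with an invented field name:

parser = MultipartParser(BytesIO(payload), boundary="sep")
first = parser.get("tag")      # first part named "tag", or None if absent
every = parser.get_all("tag")  # list of all parts named "tag" (possibly empty)
total = len(parser.parts())    # number of parts in the whole message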
class MultipartPart (buffer_size=65536, memfile_limit=262144, charset='latin1')
-
Instance variables
var raw
-
Data without decoding
var value
-
Data decoded with the specified charset
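raw returns the body bytes exactly as received, while value decodes them with the part's charset (taken from the part's own Content-Type parameters if present, otherwise the parser default). Both read the backing file without disturbing its current position. A short sketch, reusing the payload from the MultipartParser example:

for part in MultipartParser(BytesIO(payload), boundary="sep"):
    data = part.raw    # bytes, e.g. b'hi there'
    text = part.value  # str, the same bytes decoded with part.charset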
Methods
def close(self)
-
def feed(self, line, nl='')
-
def finish_header(self)
-
def is_buffered(self)
-
Return true if the data is fully buffered in memory.
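A part is kept in a BytesIO buffer until its body grows past memfile_limit, at which point it is copied into a TemporaryFile and is_buffered() starts returning False. A sketch of how a caller might branch on this; stream and boundary are placeholders, and handle_small_value is a hypothetical helper:

for part in MultipartParser(stream, boundary, memfile_limit=2 ** 18):
    if part.is_buffered():
        handle_small_value(part.value)   # hypothetical helper; body is in memory
    else:
        part.save_as("/tmp/upload.bin")  # illustrative path; body lives on disk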
def save_as(self, path)
-
def write_body(self, line, nl)
-
def write_header(self, line, nl)
-