Commit 3cd7c6e6 authored by Olivier Grisel's avatar Olivier Grisel Committed by Serhiy Storchaka

bpo-31993: Do not allocate large temporary buffers in pickle dump. (#4353)

The picklers no longer allocate temporary memory when dumping large
bytes and str objects into a file object. Instead the data is
directly streamed into the underlying file object.

Previously the C implementation would buffer all content and issue a
single call to file.write() at the end of the dump. With protocol 4
this behavior has changed to issue one call to file.write() per frame.

The Python pickler with protocol 4 now dumps each frame's content as a
memoryview of a dedicated io.BytesIO instance that is never reused, and
the memoryview is no longer released after the call to write. This makes it
possible for the file object to delay access to the memoryview of
previous frames without forcing any additional memory copy as was
already possible with the C pickler.
parent 85ac726a
......@@ -201,14 +201,24 @@ class _Framer:
if self.current_frame:
f = self.current_frame
if f.tell() >= self._FRAME_SIZE_TARGET or force:
with f.getbuffer() as data:
n = len(data)
write = self.file_write
write(FRAME)
write(pack("<Q", n))
write(data)
f.seek(0)
f.truncate()
data = f.getbuffer()
write = self.file_write
# Issue a single call to the write method of the underlying
# file object for the frame opcode with the size of the
# frame. The concatenation is expected to be less expensive
# than issuing an additional call to write.
write(FRAME + pack("<Q", len(data)))
# Issue a separate call to write to append the frame
# contents without concatenation to the above to avoid a
# memory copy.
write(data)
# Start the new frame with a new io.BytesIO instance so that
# the file object can have delayed access to the previous frame
# contents via an unreleased memoryview of the previous
# io.BytesIO instance.
self.current_frame = io.BytesIO()
def write(self, data):
if self.current_frame:
......@@ -216,6 +226,21 @@ class _Framer:
else:
return self.file_write(data)
def write_large_bytes(self, header, payload):
    """Stream a large binary object straight to the underlying file.

    Any frame currently being accumulated is flushed first; the opcode
    header and the payload are then written with two separate calls so
    that the two byte strings never have to be concatenated into one
    large temporary object.
    """
    if self.current_frame:
        # Terminate the frame in progress so its bytes reach the file
        # before the out-of-frame data written below.
        self.commit_frame(force=True)
    # Two separate write calls: concatenating header and payload first
    # would allocate a large temporary bytes object, which is exactly
    # what this code path exists to avoid.
    # No protocol 4 FRAME opcode is emitted here on purpose, so that
    # the loader can optimize its file.read calls for large objects.
    file_write = self.file_write
    file_write(header)
    file_write(payload)
class _Unframer:
......@@ -379,6 +404,7 @@ class _Pickler:
raise TypeError("file must have a 'write' attribute")
self.framer = _Framer(self._file_write)
self.write = self.framer.write
self._write_large_bytes = self.framer.write_large_bytes
self.memo = {}
self.proto = int(protocol)
self.bin = protocol >= 1
......@@ -699,7 +725,9 @@ class _Pickler:
if n <= 0xff:
self.write(SHORT_BINBYTES + pack("<B", n) + obj)
elif n > 0xffffffff and self.proto >= 4:
self.write(BINBYTES8 + pack("<Q", n) + obj)
self._write_large_bytes(BINBYTES8 + pack("<Q", n), obj)
elif n >= self.framer._FRAME_SIZE_TARGET:
self._write_large_bytes(BINBYTES + pack("<I", n), obj)
else:
self.write(BINBYTES + pack("<I", n) + obj)
self.memoize(obj)
......@@ -712,7 +740,9 @@ class _Pickler:
if n <= 0xff and self.proto >= 4:
self.write(SHORT_BINUNICODE + pack("<B", n) + encoded)
elif n > 0xffffffff and self.proto >= 4:
self.write(BINUNICODE8 + pack("<Q", n) + encoded)
self._write_large_bytes(BINUNICODE8 + pack("<Q", n), encoded)
elif n >= self.framer._FRAME_SIZE_TARGET:
self._write_large_bytes(BINUNICODE + pack("<I", n), encoded)
else:
self.write(BINUNICODE + pack("<I", n) + encoded)
else:
......
......@@ -2279,7 +2279,7 @@ def optimize(p):
if arg > proto:
proto = arg
if pos == 0:
protoheader = p[pos: end_pos]
protoheader = p[pos:end_pos]
else:
opcodes.append((pos, end_pos))
else:
......@@ -2295,6 +2295,7 @@ def optimize(p):
pickler.framer.start_framing()
idx = 0
for op, arg in opcodes:
frameless = False
if op is put:
if arg not in newids:
continue
......@@ -2305,8 +2306,12 @@ def optimize(p):
data = pickler.get(newids[arg])
else:
data = p[op:arg]
pickler.framer.commit_frame()
pickler.write(data)
frameless = len(data) > pickler.framer._FRAME_SIZE_TARGET
pickler.framer.commit_frame(force=frameless)
if frameless:
pickler.framer.file_write(data)
else:
pickler.write(data)
pickler.framer.end_framing()
return out.getvalue()
......
......@@ -2042,21 +2042,40 @@ class AbstractPickleTests(unittest.TestCase):
def check_frame_opcodes(self, pickled):
    """
    Check the arguments of FRAME opcodes in a protocol 4+ pickle.

    Note that binary objects that are larger than FRAME_SIZE_TARGET are not
    framed by default and are therefore considered a frame by themselves in
    the following consistency check.
    """
    last_arg = last_pos = last_frame_opcode_size = None
    # Bytes occupied by each "frameless" binary opcode together with
    # its length prefix (1 opcode byte + 4- or 8-byte size argument).
    frameless_opcode_sizes = {
        'BINBYTES': 5,
        'BINUNICODE': 5,
        'BINBYTES8': 9,
        'BINUNICODE8': 9,
    }
    for op, arg, pos in pickletools.genops(pickled):
        if op.name in frameless_opcode_sizes:
            if len(arg) > self.FRAME_SIZE_TARGET:
                # A large binary object is written outside of any
                # frame and therefore acts as a frame of its own.
                frame_opcode_size = frameless_opcode_sizes[op.name]
                arg = len(arg)
            else:
                continue
        elif op.name == 'FRAME':
            frame_opcode_size = 9
        else:
            continue
        if last_pos is not None:
            # The previous frame's size should be equal to the number
            # of bytes up to the current frame.
            frame_size = pos - last_pos - last_frame_opcode_size
            self.assertEqual(frame_size, last_arg)
        last_arg, last_pos = arg, pos
        last_frame_opcode_size = frame_opcode_size
    # The last frame's size should be equal to the number of bytes up
    # to the pickle's end.
    frame_size = len(pickled) - last_pos - last_frame_opcode_size
    self.assertEqual(frame_size, last_arg)
def test_framing_many_objects(self):
......@@ -2076,15 +2095,36 @@ class AbstractPickleTests(unittest.TestCase):
def test_framing_large_objects(self):
    """Large bytes and str objects are dumped outside of frames while
    the surrounding small opcodes remain framed (protocol 4+)."""
    N = 1024 * 1024
    # One large str alongside the large bytes so both frameless code
    # paths (BINBYTES* and BINUNICODE*) are exercised.
    obj = [b'x' * N, b'y' * N, 'z' * N]
    for proto in range(4, pickle.HIGHEST_PROTOCOL + 1):
        for fast in [True, False]:
            with self.subTest(proto=proto, fast=fast):
                if hasattr(self, 'pickler'):
                    buf = io.BytesIO()
                    pickler = self.pickler(buf, protocol=proto)
                    pickler.fast = fast
                    pickler.dump(obj)
                    pickled = buf.getvalue()
                elif fast:
                    continue
                else:
                    # Fallback to self.dumps when fast=False and
                    # self.pickler is not available.
                    pickled = self.dumps(obj, proto)
                unpickled = self.loads(pickled)
                # More informative error message in case of failure.
                self.assertEqual([len(x) for x in obj],
                                 [len(x) for x in unpickled])
                # Perform full equality check if the lengths match.
                self.assertEqual(obj, unpickled)
                n_frames = count_opcode(pickle.FRAME, pickled)
                if not fast:
                    # One frame per memoize for each large object.
                    self.assertGreaterEqual(n_frames, len(obj))
                else:
                    # One frame at the beginning and one at the end.
                    self.assertGreaterEqual(n_frames, 2)
                self.check_frame_opcodes(pickled)
def test_optional_frames(self):
if pickle.HIGHEST_PROTOCOL < 4:
......@@ -2125,6 +2165,71 @@ class AbstractPickleTests(unittest.TestCase):
count_opcode(pickle.FRAME, pickled))
self.assertEqual(obj, self.loads(some_frames_pickle))
def test_framed_write_sizes_with_delayed_writer(self):
    """The pickler issues few, frame-sized calls to write and never
    releases the memoryviews it hands to the file object."""

    class _RecordingWriter:
        """File-like object recording every raw chunk it receives."""

        def __init__(self):
            self.chunks = []

        def write(self, chunk):
            self.chunks.append(chunk)

        def concatenate_chunks(self):
            # Some chunks can be memoryview instances; convert those
            # to bytes so that b"".join can accept them.
            return b"".join(c.tobytes() if hasattr(c, 'tobytes') else c
                            for c in self.chunks)

    small_objects = [(str(i).encode('ascii'), i % 42, {'i': str(i)})
                     for i in range(10000)]

    for proto in range(4, pickle.HIGHEST_PROTOCOL + 1):
        # Protocol 4 packs groups of small objects into frames and
        # issues one or two write calls per frame: the C pickler
        # writes header and contents together, while the Python
        # pickler issues one call for the frame header and one for
        # the frame binary contents.
        sink = _RecordingWriter()
        self.pickler(sink, proto).dump(small_objects)

        # Read the binary content of the chunks only after the call
        # to dump has returned: any memoryview passed to write must
        # not have been released, otherwise this delayed access would
        # not be possible.
        pickled = sink.concatenate_chunks()
        self.assertEqual(self.loads(pickled), small_objects)
        self.assertGreater(len(sink.chunks), 1)

        # Round up to get the number of frames the output spans.
        n_frames = -(-len(pickled) // self.FRAME_SIZE_TARGET)
        # There should be at least one call to write per frame ...
        self.assertGreaterEqual(len(sink.chunks), n_frames)
        # ... but not too many either: one for the proto opcode,
        # plus at most one per frame header and one per frame body.
        self.assertGreaterEqual(2 * n_frames + 1, len(sink.chunks))

        chunk_sizes = [len(c) for c in sink.chunks[:-1]]
        large_sizes = [s for s in chunk_sizes
                       if s >= self.FRAME_SIZE_TARGET]
        small_sizes = [s for s in chunk_sizes
                       if s < self.FRAME_SIZE_TARGET]

        # Large chunks should not be too large:
        for chunk_size in large_sizes:
            self.assertGreater(2 * self.FRAME_SIZE_TARGET, chunk_size)
        self.assertGreater(2 * self.FRAME_SIZE_TARGET,
                           len(sink.chunks[-1]))
        # Small chunks (if any) should be very small: only the proto
        # opcode and frame headers qualify.
        for chunk_size in small_sizes:
            self.assertGreaterEqual(9, chunk_size)
def test_nested_names(self):
global Nested
class Nested:
......
......@@ -15,6 +15,9 @@ class OptimizedPickleTests(AbstractPickleTests):
# Test relies on precise output of dumps()
test_pickle_to_2x = None
# Test relies on writing by chunks into a file object.
test_framed_write_sizes_with_delayed_writer = None
def test_optimize_long_binget(self):
data = [str(i) for i in range(257)]
data.append(data[-1])
......
The picklers no longer allocate temporary memory when dumping large
``bytes`` and ``str`` objects into a file object. Instead the data is
directly streamed into the underlying file object.
Previously the C implementation would buffer all content and issue a
single call to ``file.write`` at the end of the dump. With protocol 4
this behavior has changed to issue one call to ``file.write`` per frame.
The Python pickler with protocol 4 now dumps each frame's content as a
memoryview of a dedicated ``io.BytesIO`` instance that is never reused, and
the memoryview is no longer released after the call to write. This makes it
possible for the file object to delay access to the memoryview of
previous frames without forcing any additional memory copy as was
already possible with the C pickler.
......@@ -971,20 +971,6 @@ _Pickler_CommitFrame(PicklerObject *self)
return 0;
}
/* Called between opcodes.  Commits the frame currently being written
   once it has reached FRAME_SIZE_TARGET, so that emitted frames stay
   close to the target size.  Returns 0 on success, -1 on error. */
static int
_Pickler_OpcodeBoundary(PicklerObject *self)
{
Py_ssize_t frame_len;
/* Nothing to do when framing is off (protocol < 4) or when no frame
   has been started yet. */
if (!self->framing || self->frame_start == -1)
return 0;
/* Size of the current frame payload, excluding the reserved header. */
frame_len = self->output_len - self->frame_start - FRAME_HEADER_SIZE;
if (frame_len >= FRAME_SIZE_TARGET)
return _Pickler_CommitFrame(self);
else
return 0;
}
static PyObject *
_Pickler_GetString(PicklerObject *self)
{
......@@ -1019,6 +1005,38 @@ _Pickler_FlushToFile(PicklerObject *self)
return (result == NULL) ? -1 : 0;
}
/* Called between opcodes.  Commits the frame currently being written
   once it has reached FRAME_SIZE_TARGET and, when pickling to a file,
   flushes the committed bytes so the in-memory buffer stays small even
   while dumping large complex objects.  Returns 0 on success, -1 on
   error. */
static int
_Pickler_OpcodeBoundary(PicklerObject *self)
{
    Py_ssize_t frame_len;

    /* Nothing to do when framing is off (protocol < 4) or when no
       frame has been started yet. */
    if (!self->framing || self->frame_start == -1) {
        return 0;
    }
    /* Size of the current frame payload, excluding the reserved
       header bytes. */
    frame_len = self->output_len - self->frame_start - FRAME_HEADER_SIZE;
    if (frame_len >= FRAME_SIZE_TARGET) {
        if (_Pickler_CommitFrame(self)) {
            return -1;
        }
        /* Flush the content of the committed frame to the underlying
         * file and reuse the pickler buffer for the next frame so as
         * to limit memory usage when dumping large complex objects to
         * a file.
         *
         * self->write is NULL when called via dumps.
         */
        if (self->write != NULL) {
            if (_Pickler_FlushToFile(self) < 0) {
                return -1;
            }
            if (_Pickler_ClearBuffer(self) < 0) {
                return -1;
            }
        }
    }
    return 0;
}
static Py_ssize_t
_Pickler_Write(PicklerObject *self, const char *s, Py_ssize_t data_len)
{
......@@ -2124,6 +2142,51 @@ done:
return 0;
}
/* No-copy code-path to write large contiguous data directly into the
   underlying file object, bypassing the output_buffer of the Pickler.
   'header' is the opcode plus its size argument ('header_size' bytes),
   'payload' is the bytes-like object streamed to self->write without
   copying.  Returns 0 on success, -1 on error. */
static int
_Pickler_write_large_bytes(
PicklerObject *self, const char *header, Py_ssize_t header_size,
PyObject *payload)
{
assert(self->output_buffer != NULL);
/* Only meaningful when pickling to a file object. */
assert(self->write != NULL);
PyObject *result;
/* Commit the previous frame. */
if (_Pickler_CommitFrame(self)) {
return -1;
}
/* Disable framing temporarily so that neither the header nor the
   payload written below get wrapped in a new frame. */
self->framing = 0;
if (_Pickler_Write(self, header, header_size) < 0) {
return -1;
}
/* Dump the output buffer to the file. */
if (_Pickler_FlushToFile(self) < 0) {
return -1;
}
/* Stream write the payload into the file without going through the
output buffer. */
result = PyObject_CallFunctionObjArgs(self->write, payload, NULL);
if (result == NULL) {
return -1;
}
Py_DECREF(result);
/* Reinitialize the buffer for subsequent calls to _Pickler_Write. */
if (_Pickler_ClearBuffer(self) < 0) {
return -1;
}
/* Re-enable framing for subsequent calls to _Pickler_Write.
   NOTE(review): the error returns above leave self->framing == 0;
   presumably the pickler is not reused after an error — confirm. */
self->framing = 1;
return 0;
}
static int
save_bytes(PicklerObject *self, PyObject *obj)
{
......@@ -2202,11 +2265,21 @@ save_bytes(PicklerObject *self, PyObject *obj)
return -1; /* string too large */
}
if (_Pickler_Write(self, header, len) < 0)
return -1;
if (_Pickler_Write(self, PyBytes_AS_STRING(obj), size) < 0)
return -1;
if (size < FRAME_SIZE_TARGET || self->write == NULL) {
if (_Pickler_Write(self, header, len) < 0) {
return -1;
}
if (_Pickler_Write(self, PyBytes_AS_STRING(obj), size) < 0) {
return -1;
}
}
else {
/* Bypass the in-memory buffer to directly stream large data
into the underlying file object. */
if (_Pickler_write_large_bytes(self, header, len, obj) < 0) {
return -1;
}
}
if (memo_put(self, obj) < 0)
return -1;
......@@ -2291,6 +2364,7 @@ write_utf8(PicklerObject *self, const char *data, Py_ssize_t size)
{
char header[9];
Py_ssize_t len;
PyObject *mem;
assert(size >= 0);
if (size <= 0xff && self->proto >= 4) {
......@@ -2317,11 +2391,27 @@ write_utf8(PicklerObject *self, const char *data, Py_ssize_t size)
return -1;
}
if (_Pickler_Write(self, header, len) < 0)
return -1;
if (_Pickler_Write(self, data, size) < 0)
return -1;
if (size < FRAME_SIZE_TARGET || self->write == NULL) {
if (_Pickler_Write(self, header, len) < 0) {
return -1;
}
if (_Pickler_Write(self, data, size) < 0) {
return -1;
}
}
else {
/* Bypass the in-memory buffer to directly stream large data
into the underlying file object. */
mem = PyMemoryView_FromMemory((char *) data, size, PyBUF_READ);
if (mem == NULL) {
return -1;
}
if (_Pickler_write_large_bytes(self, header, len, mem) < 0) {
Py_DECREF(mem);
return -1;
}
Py_DECREF(mem);
}
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment