From 965734a1eb1f2553985094ade9f28871016f652b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 29 Mar 2018 12:49:27 +0200 Subject: [PATCH] Limit the number of bytes to use to copy the content of a direct buffer to an Outputstream (#7813) Motivation: Currently copying a direct ByteBuf copies it fully into the heap before writing it to an output stream. The can result in huge memory usage on the heap. Modification: copy the bytebuf contents via an 8k buffer into the output stream Result: Fixes #7804 --- .../java/io/netty/buffer/ByteBufUtil.java | 41 +++++++++++++++++++ .../io/netty/buffer/PooledDirectByteBuf.java | 12 +----- .../netty/buffer/UnpooledDirectByteBuf.java | 16 +------- .../io/netty/buffer/UnsafeByteBufUtil.java | 32 +++++++++++---- 4 files changed, 67 insertions(+), 34 deletions(-) diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java b/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java index b0ecb35cac..768f558714 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java @@ -27,6 +27,8 @@ import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.CharBuffer; @@ -64,6 +66,7 @@ public final class ByteBufUtil { private static final int MAX_BYTES_PER_CHAR_UTF8 = (int) CharsetUtil.encoder(CharsetUtil.UTF_8).maxBytesPerChar(); + static final int WRITE_CHUNK_SIZE = 8192; static final ByteBufAllocator DEFAULT_ALLOCATOR; static { @@ -1392,5 +1395,43 @@ public final class ByteBufUtil { return true; } + /** + * Read bytes from the given {@link ByteBuffer} into the given {@link OutputStream} using the {@code position} and + * {@code length}. The position and limit of the given {@link ByteBuffer} may be adjusted. + */ + static void readBytes(ByteBufAllocator allocator, ByteBuffer buffer, int position, int length, OutputStream out) + throws IOException { + if (buffer.hasArray()) { + out.write(buffer.array(), position + buffer.arrayOffset(), length); + } else { + int chunkLen = Math.min(length, WRITE_CHUNK_SIZE); + buffer.clear().position(position); + + if (allocator.isDirectBufferPooled()) { + // if direct buffers are pooled chances are good that heap buffers are pooled as well. + ByteBuf tmpBuf = allocator.heapBuffer(chunkLen); + try { + byte[] tmp = tmpBuf.array(); + int offset = tmpBuf.arrayOffset(); + getBytes(buffer, tmp, offset, chunkLen, out, length); + } finally { + tmpBuf.release(); + } + } else { + getBytes(buffer, new byte[chunkLen], 0, chunkLen, out, length); + } + } + } + + private static void getBytes(ByteBuffer inBuffer, byte[] in, int inOffset, int inLen, OutputStream out, int outLen) + throws IOException { + do { + int len = Math.min(inLen, outLen); + inBuffer.get(in, inOffset, len); + out.write(in, inOffset, len); + outLen -= len; + } while (outLen > 0); + } + private ByteBufUtil() { } } diff --git a/buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java index 7c6192bf07..3c43509c05 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java @@ -190,17 +190,7 @@ final class PooledDirectByteBuf extends PooledByteBuf { if (length == 0) { return; } - - byte[] tmp = new byte[length]; - ByteBuffer tmpBuf; - if (internal) { - tmpBuf = internalNioBuffer(); - } else { - tmpBuf = memory.duplicate(); - } - tmpBuf.clear().position(idx(index)); - tmpBuf.get(tmp); - out.write(tmp); + ByteBufUtil.readBytes(alloc(), internal ? internalNioBuffer() : memory.duplicate(), idx(index), length, out); } @Override diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java index 3ce75d1d63..49960b3764 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java @@ -491,21 +491,7 @@ public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf { if (length == 0) { return; } - - if (buffer.hasArray()) { - out.write(buffer.array(), index + buffer.arrayOffset(), length); - } else { - byte[] tmp = new byte[length]; - ByteBuffer tmpBuf; - if (internal) { - tmpBuf = internalNioBuffer(); - } else { - tmpBuf = buffer.duplicate(); - } - tmpBuf.clear().position(index); - tmpBuf.get(tmp); - out.write(tmp); - } + ByteBufUtil.readBytes(alloc(), internal ? internalNioBuffer() : buffer.duplicate(), index, length, out); } @Override diff --git a/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java b/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java index 1016cc4ab9..7d866d5b43 100644 --- a/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java +++ b/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java @@ -583,18 +583,34 @@ final class UnsafeByteBufUtil { static void getBytes(AbstractByteBuf buf, long addr, int index, OutputStream out, int length) throws IOException { buf.checkIndex(index, length); if (length != 0) { - ByteBuf tmpBuf = buf.alloc().heapBuffer(length); - try { - byte[] tmp = tmpBuf.array(); - int offset = tmpBuf.arrayOffset(); - PlatformDependent.copyMemory(addr, tmp, offset, length); - out.write(tmp, offset, length); - } finally { - tmpBuf.release(); + int len = Math.min(length, ByteBufUtil.WRITE_CHUNK_SIZE); + if (buf.alloc().isDirectBufferPooled()) { + // if direct buffers are pooled chances are good that heap buffers are pooled as well. + ByteBuf tmpBuf = buf.alloc().heapBuffer(len); + try { + byte[] tmp = tmpBuf.array(); + int offset = tmpBuf.arrayOffset(); + getBytes(addr, tmp, offset, len, out, length); + } finally { + tmpBuf.release(); + } + } else { + getBytes(addr, new byte[len], 0, len, out, length); } } } + private static void getBytes(long inAddr, byte[] in, int inOffset, int inLen, OutputStream out, int outLen) + throws IOException { + do { + int len = Math.min(inLen, outLen); + PlatformDependent.copyMemory(inAddr, in, inOffset, len); + out.write(in, inOffset, len); + outLen -= len; + inAddr += len; + } while (outLen > 0); + } + static void setZero(long addr, int length) { if (length == 0) { return;