Limit the number of bytes to use to copy the content of a direct buffer to an Outputstream (#7813)
Motivation: Currently copying a direct ByteBuf copies it fully into the heap before writing it to an output stream. The can result in huge memory usage on the heap. Modification: copy the bytebuf contents via an 8k buffer into the output stream Result: Fixes #7804
This commit is contained in:
parent
ed0668384b
commit
965734a1eb
@ -27,6 +27,8 @@ import io.netty.util.internal.SystemPropertyUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.CharBuffer;
|
||||
@ -64,6 +66,7 @@ public final class ByteBufUtil {
|
||||
private static final int MAX_BYTES_PER_CHAR_UTF8 =
|
||||
(int) CharsetUtil.encoder(CharsetUtil.UTF_8).maxBytesPerChar();
|
||||
|
||||
static final int WRITE_CHUNK_SIZE = 8192;
|
||||
static final ByteBufAllocator DEFAULT_ALLOCATOR;
|
||||
|
||||
static {
|
||||
@ -1392,5 +1395,43 @@ public final class ByteBufUtil {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read bytes from the given {@link ByteBuffer} into the given {@link OutputStream} using the {@code position} and
|
||||
* {@code length}. The position and limit of the given {@link ByteBuffer} may be adjusted.
|
||||
*/
|
||||
static void readBytes(ByteBufAllocator allocator, ByteBuffer buffer, int position, int length, OutputStream out)
|
||||
throws IOException {
|
||||
if (buffer.hasArray()) {
|
||||
out.write(buffer.array(), position + buffer.arrayOffset(), length);
|
||||
} else {
|
||||
int chunkLen = Math.min(length, WRITE_CHUNK_SIZE);
|
||||
buffer.clear().position(position);
|
||||
|
||||
if (allocator.isDirectBufferPooled()) {
|
||||
// if direct buffers are pooled chances are good that heap buffers are pooled as well.
|
||||
ByteBuf tmpBuf = allocator.heapBuffer(chunkLen);
|
||||
try {
|
||||
byte[] tmp = tmpBuf.array();
|
||||
int offset = tmpBuf.arrayOffset();
|
||||
getBytes(buffer, tmp, offset, chunkLen, out, length);
|
||||
} finally {
|
||||
tmpBuf.release();
|
||||
}
|
||||
} else {
|
||||
getBytes(buffer, new byte[chunkLen], 0, chunkLen, out, length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void getBytes(ByteBuffer inBuffer, byte[] in, int inOffset, int inLen, OutputStream out, int outLen)
|
||||
throws IOException {
|
||||
do {
|
||||
int len = Math.min(inLen, outLen);
|
||||
inBuffer.get(in, inOffset, len);
|
||||
out.write(in, inOffset, len);
|
||||
outLen -= len;
|
||||
} while (outLen > 0);
|
||||
}
|
||||
|
||||
private ByteBufUtil() { }
|
||||
}
|
||||
|
@ -190,17 +190,7 @@ final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
|
||||
if (length == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] tmp = new byte[length];
|
||||
ByteBuffer tmpBuf;
|
||||
if (internal) {
|
||||
tmpBuf = internalNioBuffer();
|
||||
} else {
|
||||
tmpBuf = memory.duplicate();
|
||||
}
|
||||
tmpBuf.clear().position(idx(index));
|
||||
tmpBuf.get(tmp);
|
||||
out.write(tmp);
|
||||
ByteBufUtil.readBytes(alloc(), internal ? internalNioBuffer() : memory.duplicate(), idx(index), length, out);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -491,21 +491,7 @@ public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
|
||||
if (length == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (buffer.hasArray()) {
|
||||
out.write(buffer.array(), index + buffer.arrayOffset(), length);
|
||||
} else {
|
||||
byte[] tmp = new byte[length];
|
||||
ByteBuffer tmpBuf;
|
||||
if (internal) {
|
||||
tmpBuf = internalNioBuffer();
|
||||
} else {
|
||||
tmpBuf = buffer.duplicate();
|
||||
}
|
||||
tmpBuf.clear().position(index);
|
||||
tmpBuf.get(tmp);
|
||||
out.write(tmp);
|
||||
}
|
||||
ByteBufUtil.readBytes(alloc(), internal ? internalNioBuffer() : buffer.duplicate(), index, length, out);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -583,18 +583,34 @@ final class UnsafeByteBufUtil {
|
||||
static void getBytes(AbstractByteBuf buf, long addr, int index, OutputStream out, int length) throws IOException {
|
||||
buf.checkIndex(index, length);
|
||||
if (length != 0) {
|
||||
ByteBuf tmpBuf = buf.alloc().heapBuffer(length);
|
||||
try {
|
||||
byte[] tmp = tmpBuf.array();
|
||||
int offset = tmpBuf.arrayOffset();
|
||||
PlatformDependent.copyMemory(addr, tmp, offset, length);
|
||||
out.write(tmp, offset, length);
|
||||
} finally {
|
||||
tmpBuf.release();
|
||||
int len = Math.min(length, ByteBufUtil.WRITE_CHUNK_SIZE);
|
||||
if (buf.alloc().isDirectBufferPooled()) {
|
||||
// if direct buffers are pooled chances are good that heap buffers are pooled as well.
|
||||
ByteBuf tmpBuf = buf.alloc().heapBuffer(len);
|
||||
try {
|
||||
byte[] tmp = tmpBuf.array();
|
||||
int offset = tmpBuf.arrayOffset();
|
||||
getBytes(addr, tmp, offset, len, out, length);
|
||||
} finally {
|
||||
tmpBuf.release();
|
||||
}
|
||||
} else {
|
||||
getBytes(addr, new byte[len], 0, len, out, length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void getBytes(long inAddr, byte[] in, int inOffset, int inLen, OutputStream out, int outLen)
|
||||
throws IOException {
|
||||
do {
|
||||
int len = Math.min(inLen, outLen);
|
||||
PlatformDependent.copyMemory(inAddr, in, inOffset, len);
|
||||
out.write(in, inOffset, len);
|
||||
outLen -= len;
|
||||
inAddr += len;
|
||||
} while (outLen > 0);
|
||||
}
|
||||
|
||||
static void setZero(long addr, int length) {
|
||||
if (length == 0) {
|
||||
return;
|
||||
|
Loading…
Reference in New Issue
Block a user