Take memory overhead of ChannelOutboundBuffer / PendingWriteQueue into account

Motivation:

To guard against the case that a user will enqueue a lot of empty or small buffers and so raise an OOME we need to also take the overhead of the ChannelOutboundBuffer / PendingWriteQueue into account when detect if a Channel is writable or not. This is related to #5856.

Modifications:

When calculate the memory for an message that is enqueued also add some extra bytes depending on the implementation.

Result:

Better guard against OOME.
This commit is contained in:
Norman Maurer 2016-10-13 14:50:09 +02:00 committed by Norman Maurer
parent 4ee361e302
commit e3cb9935c0
3 changed files with 36 additions and 10 deletions

View File

@ -26,6 +26,7 @@ import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.PromiseNotificationUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -48,6 +49,15 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
* </p>
*/
public final class ChannelOutboundBuffer {
// Assuming a 64-bit JVM:
// - 16 bytes object header
// - 8 reference fields
// - 2 long fields
// - 2 int fields
// - 1 boolean field
// - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
@ -128,7 +138,7 @@ public final class ChannelOutboundBuffer {
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size, false);
incrementPendingOutboundBytes(entry.pendingSize, false);
}
/**
@ -783,7 +793,7 @@ public final class ChannelOutboundBuffer {
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
Entry entry = RECYCLER.get();
entry.msg = msg;
entry.pendingSize = size;
entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
entry.total = total;
entry.promise = promise;
return entry;

View File

@ -18,6 +18,7 @@ package io.netty.channel;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -28,6 +29,12 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
*/
public final class PendingWriteQueue {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
// Assuming a 64-bit JVM:
// - 16 bytes object header
// - 4 reference fields
// - 1 long fields
private static final int PENDING_WRITE_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
private final ChannelHandlerContext ctx;
private final ChannelOutboundBuffer buffer;
@ -73,6 +80,17 @@ public final class PendingWriteQueue {
return bytes;
}
private int size(Object msg) {
// It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
// we should add them to the queue and let removeAndFailAll() fail them later.
int messageSize = estimatorHandle.size(msg);
if (messageSize < 0) {
// Size may be unknow so just use 0
messageSize = 0;
}
return messageSize + PENDING_WRITE_OVERHEAD;
}
/**
* Add the given {@code msg} and {@link ChannelPromise}.
*/
@ -86,11 +104,8 @@ public final class PendingWriteQueue {
}
// It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
// we should add them to the queue and let removeAndFailAll() fail them later.
int messageSize = estimatorHandle.size(msg);
if (messageSize < 0) {
// Size may be unknow so just use 0
messageSize = 0;
}
int messageSize = size(msg);
PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
PendingWrite currentTail = tail;
if (currentTail == null) {

View File

@ -218,8 +218,8 @@ public class ChannelOutboundBufferTest {
}
});
ch.config().setWriteBufferLowWaterMark(128);
ch.config().setWriteBufferHighWaterMark(256);
ch.config().setWriteBufferLowWaterMark(128 + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD);
ch.config().setWriteBufferHighWaterMark(256 + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD);
ch.write(buffer().writeZero(128));
// Ensure exceeding the low watermark does not make channel unwritable.
@ -235,7 +235,8 @@ public class ChannelOutboundBufferTest {
// Ensure going down to the low watermark makes channel writable again by flushing the first write.
assertThat(ch.unsafe().outboundBuffer().remove(), is(true));
assertThat(ch.unsafe().outboundBuffer().remove(), is(true));
assertThat(ch.unsafe().outboundBuffer().totalPendingWriteBytes(), is(127L));
assertThat(ch.unsafe().outboundBuffer().totalPendingWriteBytes(),
is(127L + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD));
assertThat(buf.toString(), is("false true "));
safeClose(ch);