Take memory overhead of ChannelOutboundBuffer / PendingWriteQueue into account

Motivation:

To guard against the case that a user will enqueue a lot of empty or small buffers and so raise an OOME we need to also take the overhead of the ChannelOutboundBuffer / PendingWriteQueue into account when detect if a Channel is writable or not. This is related to #5856.

Modifications:

When calculate the memory for an message that is enqueued also add some extra bytes depending on the implementation.

Result:

Better guard against OOME.
This commit is contained in:
Norman Maurer 2016-10-13 14:50:09 +02:00 committed by Norman Maurer
parent a7662db470
commit e9607cc1c6
3 changed files with 36 additions and 10 deletions

View File

@ -26,6 +26,7 @@ import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.PromiseNotificationUtil; import io.netty.util.internal.PromiseNotificationUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -48,6 +49,15 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
* </p> * </p>
*/ */
public final class ChannelOutboundBuffer { public final class ChannelOutboundBuffer {
// Assuming a 64-bit JVM:
// - 16 bytes object header
// - 8 reference fields
// - 2 long fields
// - 2 int fields
// - 1 boolean field
// - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
@ -128,7 +138,7 @@ public final class ChannelOutboundBuffer {
// increment pending bytes after adding message to the unflushed arrays. // increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619 // See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size, false); incrementPendingOutboundBytes(entry.pendingSize, false);
} }
/** /**
@ -783,7 +793,7 @@ public final class ChannelOutboundBuffer {
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
Entry entry = RECYCLER.get(); Entry entry = RECYCLER.get();
entry.msg = msg; entry.msg = msg;
entry.pendingSize = size; entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
entry.total = total; entry.total = total;
entry.promise = promise; entry.promise = promise;
return entry; return entry;

View File

@ -18,6 +18,7 @@ package io.netty.channel;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseCombiner; import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -28,6 +29,12 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
*/ */
public final class PendingWriteQueue { public final class PendingWriteQueue {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
// Assuming a 64-bit JVM:
// - 16 bytes object header
// - 4 reference fields
// - 1 long fields
private static final int PENDING_WRITE_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
private final ChannelHandlerContext ctx; private final ChannelHandlerContext ctx;
private final ChannelOutboundBuffer buffer; private final ChannelOutboundBuffer buffer;
@ -73,6 +80,17 @@ public final class PendingWriteQueue {
return bytes; return bytes;
} }
private int size(Object msg) {
// It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
// we should add them to the queue and let removeAndFailAll() fail them later.
int messageSize = estimatorHandle.size(msg);
if (messageSize < 0) {
// Size may be unknow so just use 0
messageSize = 0;
}
return messageSize + PENDING_WRITE_OVERHEAD;
}
/** /**
* Add the given {@code msg} and {@link ChannelPromise}. * Add the given {@code msg} and {@link ChannelPromise}.
*/ */
@ -86,11 +104,8 @@ public final class PendingWriteQueue {
} }
// It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering, // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
// we should add them to the queue and let removeAndFailAll() fail them later. // we should add them to the queue and let removeAndFailAll() fail them later.
int messageSize = estimatorHandle.size(msg); int messageSize = size(msg);
if (messageSize < 0) {
// Size may be unknow so just use 0
messageSize = 0;
}
PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise); PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
PendingWrite currentTail = tail; PendingWrite currentTail = tail;
if (currentTail == null) { if (currentTail == null) {

View File

@ -217,8 +217,8 @@ public class ChannelOutboundBufferTest {
} }
}); });
ch.config().setWriteBufferLowWaterMark(128); ch.config().setWriteBufferLowWaterMark(128 + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD);
ch.config().setWriteBufferHighWaterMark(256); ch.config().setWriteBufferHighWaterMark(256 + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD);
ch.write(buffer().writeZero(128)); ch.write(buffer().writeZero(128));
// Ensure exceeding the low watermark does not make channel unwritable. // Ensure exceeding the low watermark does not make channel unwritable.
@ -234,7 +234,8 @@ public class ChannelOutboundBufferTest {
// Ensure going down to the low watermark makes channel writable again by flushing the first write. // Ensure going down to the low watermark makes channel writable again by flushing the first write.
assertThat(ch.unsafe().outboundBuffer().remove(), is(true)); assertThat(ch.unsafe().outboundBuffer().remove(), is(true));
assertThat(ch.unsafe().outboundBuffer().remove(), is(true)); assertThat(ch.unsafe().outboundBuffer().remove(), is(true));
assertThat(ch.unsafe().outboundBuffer().totalPendingWriteBytes(), is(127L)); assertThat(ch.unsafe().outboundBuffer().totalPendingWriteBytes(),
is(127L + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD));
assertThat(buf.toString(), is("false true ")); assertThat(buf.toString(), is("false true "));
safeClose(ch); safeClose(ch);