diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 8c7839f2d6..29995bbf55 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -26,6 +26,7 @@ import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PromiseNotificationUtil; +import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -48,6 +49,15 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; *

*/ public final class ChannelOutboundBuffer { + // Assuming a 64-bit JVM: + // - 16 bytes object header + // - 8 reference fields + // - 2 long fields + // - 2 int fields + // - 1 boolean field + // - padding + static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD = + SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96); private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class); @@ -128,7 +138,7 @@ public final class ChannelOutboundBuffer { // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 - incrementPendingOutboundBytes(size, false); + incrementPendingOutboundBytes(entry.pendingSize, false); } /** @@ -783,7 +793,7 @@ public final class ChannelOutboundBuffer { static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; - entry.pendingSize = size; + entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD; entry.total = total; entry.promise = promise; return entry; diff --git a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java index 72dbbbc30d..fbe48ad121 100644 --- a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java +++ b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java @@ -18,6 +18,7 @@ package io.netty.channel; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.PromiseCombiner; +import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -28,6 +29,12 @@ import io.netty.util.internal.logging.InternalLoggerFactory; */ public final class PendingWriteQueue { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class); + // Assuming a 64-bit JVM: + // - 16 bytes object header + // - 4 reference fields + // - 1 long fields + private static final int PENDING_WRITE_OVERHEAD = + SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64); private final ChannelHandlerContext ctx; private final ChannelOutboundBuffer buffer; @@ -73,6 +80,17 @@ public final class PendingWriteQueue { return bytes; } + private int size(Object msg) { + // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering, + // we should add them to the queue and let removeAndFailAll() fail them later. + int messageSize = estimatorHandle.size(msg); + if (messageSize < 0) { + // Size may be unknow so just use 0 + messageSize = 0; + } + return messageSize + PENDING_WRITE_OVERHEAD; + } + /** * Add the given {@code msg} and {@link ChannelPromise}. */ @@ -86,11 +104,8 @@ public final class PendingWriteQueue { } // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering, // we should add them to the queue and let removeAndFailAll() fail them later. - int messageSize = estimatorHandle.size(msg); - if (messageSize < 0) { - // Size may be unknow so just use 0 - messageSize = 0; - } + int messageSize = size(msg); + PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise); PendingWrite currentTail = tail; if (currentTail == null) { diff --git a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java index e65dbd4599..42048f191d 100644 --- a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java @@ -217,8 +217,8 @@ public class ChannelOutboundBufferTest { } }); - ch.config().setWriteBufferLowWaterMark(128); - ch.config().setWriteBufferHighWaterMark(256); + ch.config().setWriteBufferLowWaterMark(128 + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD); + ch.config().setWriteBufferHighWaterMark(256 + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD); ch.write(buffer().writeZero(128)); // Ensure exceeding the low watermark does not make channel unwritable. @@ -234,7 +234,8 @@ public class ChannelOutboundBufferTest { // Ensure going down to the low watermark makes channel writable again by flushing the first write. assertThat(ch.unsafe().outboundBuffer().remove(), is(true)); assertThat(ch.unsafe().outboundBuffer().remove(), is(true)); - assertThat(ch.unsafe().outboundBuffer().totalPendingWriteBytes(), is(127L)); + assertThat(ch.unsafe().outboundBuffer().totalPendingWriteBytes(), + is(127L + ChannelOutboundBuffer.CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD)); assertThat(buf.toString(), is("false true ")); safeClose(ch);