diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 48a8c31aa7..393aad8246 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -129,7 +129,7 @@ public final class ChannelOutboundBuffer { // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 - incrementPendingOutboundBytes(size, true); + incrementPendingOutboundBytes(size); } private void addCapacity() { @@ -164,7 +164,7 @@ public final class ChannelOutboundBuffer { * Increment the pending bytes which will be written at some point. * This method is thread-safe! */ - void incrementPendingOutboundBytes(int size, boolean fireEvent) { + void incrementPendingOutboundBytes(int size) { // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets // recycled while process this method. Channel channel = this.channel; @@ -183,9 +183,7 @@ public final class ChannelOutboundBuffer { if (newWriteBufferSize > highWaterMark) { if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) { - if (fireEvent) { - channel.pipeline().fireChannelWritabilityChanged(); - } + channel.pipeline().fireChannelWritabilityChanged(); } } } @@ -194,7 +192,7 @@ public final class ChannelOutboundBuffer { * Decrement the pending bytes which will be written at some point. * This method is thread-safe! */ - void decrementPendingOutboundBytes(int size, boolean fireEvent) { + void decrementPendingOutboundBytes(int size) { // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets // recycled while process this method. Channel channel = this.channel; @@ -213,9 +211,7 @@ public final class ChannelOutboundBuffer { if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) { - if (fireEvent) { - channel.pipeline().fireChannelWritabilityChanged(); - } + channel.pipeline().fireChannelWritabilityChanged(); } } } @@ -316,7 +312,7 @@ public final class ChannelOutboundBuffer { safeRelease(msg); promise.trySuccess(); - decrementPendingOutboundBytes(size, true); + decrementPendingOutboundBytes(size); return true; } @@ -342,7 +338,7 @@ public final class ChannelOutboundBuffer { safeRelease(msg); safeFail(promise, cause); - decrementPendingOutboundBytes(size, true); + decrementPendingOutboundBytes(size); return true; } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 38eab41398..352aa6426d 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -642,7 +642,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); // Check for null as it may be set to null if the channel is closed already if (buffer != null) { - buffer.incrementPendingOutboundBytes(size, false); + buffer.incrementPendingOutboundBytes(size); } } executor.execute(WriteTask.newInstance(next, msg, size, flush, promise)); @@ -822,7 +822,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer(); // Check for null as it may be set to null if the channel is closed already if (buffer != null) { - buffer.decrementPendingOutboundBytes(size, false); + buffer.decrementPendingOutboundBytes(size); } } ctx.invokeWrite(msg, promise); diff --git a/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java b/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java index 67cd5fbe17..f0bf9bb308 100644 --- a/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java +++ b/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java @@ -54,6 +54,8 @@ public class ReentrantChannelTest extends BaseChannelTest { clientChannel.close().sync(); assertLog( + "WRITABILITY: writable=false\n" + + "WRITABILITY: writable=true\n" + "WRITE\n" + "WRITABILITY: writable=false\n" + "FLUSH\n" + @@ -91,6 +93,9 @@ public class ReentrantChannelTest extends BaseChannelTest { clientChannel.close().sync(); assertLog( + "WRITABILITY: writable=false\n" + + "FLUSH\n" + + "WRITABILITY: writable=true\n" + "WRITE\n" + "WRITABILITY: writable=false\n" + "FLUSH\n" +