From 013ac44d3a64d53ab9e131cb43124fcbc0873caf Mon Sep 17 00:00:00 2001 From: Bill Gallagher Date: Mon, 30 Sep 2013 11:33:12 -0400 Subject: [PATCH] [#1832] - Channel writability change notifications sometimes fail to fire --- .../netty/channel/ChannelOutboundBuffer.java | 18 +++++++----------- .../channel/DefaultChannelHandlerContext.java | 4 ++-- .../io/netty/channel/ReentrantChannelTest.java | 5 +++++ 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index eb4ff0ff03..809b769f5d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -119,7 +119,7 @@ public final class ChannelOutboundBuffer { // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 - incrementPendingOutboundBytes(size, true); + incrementPendingOutboundBytes(size); } private void addCapacity() { @@ -154,7 +154,7 @@ public final class ChannelOutboundBuffer { * Increment the pending bytes which will be written at some point. * This method is thread-safe! */ - void incrementPendingOutboundBytes(int size, boolean fireEvent) { + void incrementPendingOutboundBytes(int size) { // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets // recycled while process this method. Channel channel = this.channel; @@ -173,9 +173,7 @@ public final class ChannelOutboundBuffer { if (newWriteBufferSize > highWaterMark) { if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) { - if (fireEvent) { - channel.pipeline().fireChannelWritabilityChanged(); - } + channel.pipeline().fireChannelWritabilityChanged(); } } } @@ -184,7 +182,7 @@ public final class ChannelOutboundBuffer { * Decrement the pending bytes which will be written at some point. * This method is thread-safe! */ - void decrementPendingOutboundBytes(int size, boolean fireEvent) { + void decrementPendingOutboundBytes(int size) { // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets // recycled while process this method. Channel channel = this.channel; @@ -203,9 +201,7 @@ public final class ChannelOutboundBuffer { if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) { - if (fireEvent) { - channel.pipeline().fireChannelWritabilityChanged(); - } + channel.pipeline().fireChannelWritabilityChanged(); } } } @@ -272,7 +268,7 @@ public final class ChannelOutboundBuffer { safeRelease(msg); promise.trySuccess(); - decrementPendingOutboundBytes(size, true); + decrementPendingOutboundBytes(size); return true; } @@ -298,7 +294,7 @@ public final class ChannelOutboundBuffer { safeRelease(msg); safeFail(promise, cause); - decrementPendingOutboundBytes(size, true); + decrementPendingOutboundBytes(size); return true; } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 6c4460833d..db8c2dd59d 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -705,7 +705,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); // Check for null as it may be set to null if the channel is closed already if (buffer != null) { - buffer.incrementPendingOutboundBytes(size, false); + buffer.incrementPendingOutboundBytes(size); } } executor.execute(WriteTask.newInstance(next, msg, size, flush, promise)); @@ -885,7 +885,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer(); // Check for null as it may be set to null if the channel is closed already if (buffer != null) { - buffer.decrementPendingOutboundBytes(size, false); + buffer.decrementPendingOutboundBytes(size); } } ctx.invokeWrite(msg, promise); diff --git a/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java b/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java index eb35ef8ee4..3f75017e3d 100644 --- a/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java +++ b/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java @@ -48,6 +48,8 @@ public class ReentrantChannelTest extends BaseChannelTest { clientChannel.close().sync(); assertLog( + "WRITABILITY: writable=false\n" + + "WRITABILITY: writable=true\n" + "WRITE\n" + "WRITABILITY: writable=false\n" + "FLUSH\n" + @@ -85,6 +87,9 @@ public class ReentrantChannelTest extends BaseChannelTest { clientChannel.close().sync(); assertLog( + "WRITABILITY: writable=false\n" + + "FLUSH\n" + + "WRITABILITY: writable=true\n" + "WRITE\n" + "WRITABILITY: writable=false\n" + "FLUSH\n" +