[#1832] - Channel writability change notifications sometimes fail to fire

This commit is contained in:
Bill Gallagher 2013-09-30 11:33:12 -04:00 committed by Norman Maurer
parent 6d09e57be7
commit 013ac44d3a
3 changed files with 14 additions and 13 deletions

View File

@ -119,7 +119,7 @@ public final class ChannelOutboundBuffer {
// increment pending bytes after adding message to the unflushed arrays. // increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619 // See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size, true); incrementPendingOutboundBytes(size);
} }
private void addCapacity() { private void addCapacity() {
@ -154,7 +154,7 @@ public final class ChannelOutboundBuffer {
* Increment the pending bytes which will be written at some point. * Increment the pending bytes which will be written at some point.
* This method is thread-safe! * This method is thread-safe!
*/ */
void incrementPendingOutboundBytes(int size, boolean fireEvent) { void incrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
// recycled while process this method. // recycled while process this method.
Channel channel = this.channel; Channel channel = this.channel;
@ -173,18 +173,16 @@ public final class ChannelOutboundBuffer {
if (newWriteBufferSize > highWaterMark) { if (newWriteBufferSize > highWaterMark) {
if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) { if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) {
if (fireEvent) {
channel.pipeline().fireChannelWritabilityChanged(); channel.pipeline().fireChannelWritabilityChanged();
} }
} }
} }
}
/** /**
* Decrement the pending bytes which will be written at some point. * Decrement the pending bytes which will be written at some point.
* This method is thread-safe! * This method is thread-safe!
*/ */
void decrementPendingOutboundBytes(int size, boolean fireEvent) { void decrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
// recycled while process this method. // recycled while process this method.
Channel channel = this.channel; Channel channel = this.channel;
@ -203,12 +201,10 @@ public final class ChannelOutboundBuffer {
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) { if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
if (fireEvent) {
channel.pipeline().fireChannelWritabilityChanged(); channel.pipeline().fireChannelWritabilityChanged();
} }
} }
} }
}
private static long total(Object msg) { private static long total(Object msg) {
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {
@ -272,7 +268,7 @@ public final class ChannelOutboundBuffer {
safeRelease(msg); safeRelease(msg);
promise.trySuccess(); promise.trySuccess();
decrementPendingOutboundBytes(size, true); decrementPendingOutboundBytes(size);
return true; return true;
} }
@ -298,7 +294,7 @@ public final class ChannelOutboundBuffer {
safeRelease(msg); safeRelease(msg);
safeFail(promise, cause); safeFail(promise, cause);
decrementPendingOutboundBytes(size, true); decrementPendingOutboundBytes(size);
return true; return true;
} }

View File

@ -705,7 +705,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already // Check for null as it may be set to null if the channel is closed already
if (buffer != null) { if (buffer != null) {
buffer.incrementPendingOutboundBytes(size, false); buffer.incrementPendingOutboundBytes(size);
} }
} }
executor.execute(WriteTask.newInstance(next, msg, size, flush, promise)); executor.execute(WriteTask.newInstance(next, msg, size, flush, promise));
@ -885,7 +885,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer(); ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already // Check for null as it may be set to null if the channel is closed already
if (buffer != null) { if (buffer != null) {
buffer.decrementPendingOutboundBytes(size, false); buffer.decrementPendingOutboundBytes(size);
} }
} }
ctx.invokeWrite(msg, promise); ctx.invokeWrite(msg, promise);

View File

@ -48,6 +48,8 @@ public class ReentrantChannelTest extends BaseChannelTest {
clientChannel.close().sync(); clientChannel.close().sync();
assertLog( assertLog(
"WRITABILITY: writable=false\n" +
"WRITABILITY: writable=true\n" +
"WRITE\n" + "WRITE\n" +
"WRITABILITY: writable=false\n" + "WRITABILITY: writable=false\n" +
"FLUSH\n" + "FLUSH\n" +
@ -85,6 +87,9 @@ public class ReentrantChannelTest extends BaseChannelTest {
clientChannel.close().sync(); clientChannel.close().sync();
assertLog( assertLog(
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITABILITY: writable=true\n" +
"WRITE\n" + "WRITE\n" +
"WRITABILITY: writable=false\n" + "WRITABILITY: writable=false\n" +
"FLUSH\n" + "FLUSH\n" +