[#1832] - Channel writability change notifications sometimes fail to fire

This commit is contained in:
Bill Gallagher 2013-09-30 11:33:12 -04:00 committed by Norman Maurer
parent 512908f993
commit 20d5361403
3 changed files with 14 additions and 13 deletions

View File

@ -129,7 +129,7 @@ public final class ChannelOutboundBuffer {
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size, true);
incrementPendingOutboundBytes(size);
}
private void addCapacity() {
@ -164,7 +164,7 @@ public final class ChannelOutboundBuffer {
* Increment the pending bytes which will be written at some point.
* This method is thread-safe!
*/
void incrementPendingOutboundBytes(int size, boolean fireEvent) {
void incrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
// recycled while process this method.
Channel channel = this.channel;
@ -183,9 +183,7 @@ public final class ChannelOutboundBuffer {
if (newWriteBufferSize > highWaterMark) {
if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) {
if (fireEvent) {
channel.pipeline().fireChannelWritabilityChanged();
}
channel.pipeline().fireChannelWritabilityChanged();
}
}
}
@ -194,7 +192,7 @@ public final class ChannelOutboundBuffer {
* Decrement the pending bytes which will be written at some point.
* This method is thread-safe!
*/
void decrementPendingOutboundBytes(int size, boolean fireEvent) {
void decrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
// recycled while process this method.
Channel channel = this.channel;
@ -213,9 +211,7 @@ public final class ChannelOutboundBuffer {
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
if (fireEvent) {
channel.pipeline().fireChannelWritabilityChanged();
}
channel.pipeline().fireChannelWritabilityChanged();
}
}
}
@ -316,7 +312,7 @@ public final class ChannelOutboundBuffer {
safeRelease(msg);
promise.trySuccess();
decrementPendingOutboundBytes(size, true);
decrementPendingOutboundBytes(size);
return true;
}
@ -342,7 +338,7 @@ public final class ChannelOutboundBuffer {
safeRelease(msg);
safeFail(promise, cause);
decrementPendingOutboundBytes(size, true);
decrementPendingOutboundBytes(size);
return true;
}

View File

@ -642,7 +642,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size, false);
buffer.incrementPendingOutboundBytes(size);
}
}
executor.execute(WriteTask.newInstance(next, msg, size, flush, promise));
@ -822,7 +822,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size, false);
buffer.decrementPendingOutboundBytes(size);
}
}
ctx.invokeWrite(msg, promise);

View File

@ -54,6 +54,8 @@ public class ReentrantChannelTest extends BaseChannelTest {
clientChannel.close().sync();
assertLog(
"WRITABILITY: writable=false\n" +
"WRITABILITY: writable=true\n" +
"WRITE\n" +
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
@ -91,6 +93,9 @@ public class ReentrantChannelTest extends BaseChannelTest {
clientChannel.close().sync();
assertLog(
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITABILITY: writable=true\n" +
"WRITE\n" +
"WRITABILITY: writable=false\n" +
"FLUSH\n" +