diff --git a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java index a1a0e56a8c..6e757b0dfd 100644 --- a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java +++ b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java @@ -217,11 +217,11 @@ public final class PendingWriteQueue { } private void recycle(PendingWrite write) { - PendingWrite next = write.next; + final PendingWrite next = write.next; + final long writeSize = write.size; - buffer.decrementPendingOutboundBytes(write.size); - write.recycle(); size --; + if (next == null) { // Handled last PendingWrite so rest head and tail head = tail = null; @@ -230,6 +230,9 @@ public final class PendingWriteQueue { head = next; assert size > 0; } + + write.recycle(); + buffer.decrementPendingOutboundBytes(writeSize); } private static void safeFail(ChannelPromise promise, Throwable cause) { diff --git a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java index c3fdc1434e..f4229612fc 100644 --- a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java +++ b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java @@ -23,6 +23,11 @@ import io.netty.util.CharsetUtil; import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + public class PendingWriteQueueTest { @Test @@ -86,6 +91,55 @@ public class PendingWriteQueueTest { }, 3); } + @Test + public void shouldFireChannelWritabilityChangedAfterRemoval() { + final AtomicReference ctxRef = new AtomicReference(); + final AtomicReference queueRef = new AtomicReference(); + final ByteBuf msg = Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII); + + final EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + ctxRef.set(ctx); + queueRef.set(new PendingWriteQueue(ctx)); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + final PendingWriteQueue queue = queueRef.get(); + + final ByteBuf msg = (ByteBuf) queue.current(); + if (msg == null) { + return; + } + + assertThat(msg.refCnt(), is(1)); + + // This call will trigger another channelWritabilityChanged() event because the number of + // pending bytes will go below the low watermark. + // + // If PendingWriteQueue.remove() did not remove the current entry before triggering + // channelWritabilityChanged() event, we will end up with attempting to remove the same + // element twice, resulting in the double release. + queue.remove(); + + assertThat(msg.refCnt(), is(0)); + } + }); + + channel.config().setWriteBufferLowWaterMark(1); + channel.config().setWriteBufferHighWaterMark(3); + + final PendingWriteQueue queue = queueRef.get(); + + // Trigger channelWritabilityChanged() by adding a message that's larger than the high watermark. + queue.add(msg, channel.newPromise()); + + channel.finish(); + + assertThat(msg.refCnt(), is(0)); + } + private static void assertWrite(ChannelHandler handler, int count) { final ByteBuf buffer = Unpooled.copiedBuffer("Test", CharsetUtil.US_ASCII); final EmbeddedChannel channel = new EmbeddedChannel(handler);