diff --git a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java index de9839fa61..c15e9d5e39 100644 --- a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java +++ b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java @@ -48,15 +48,20 @@ public abstract class AbstractCoalescingBufferQueue { } /** - * Add a buffer to the front of the queue. + * Add a buffer to the front of the queue and associate a promise with it that should be completed when + * all the buffer's bytes have been consumed from the queue and written. + * @param buf to add to the head of the queue + * @param promise to complete when all the bytes have been consumed and written, can be void. */ public final void addFirst(ByteBuf buf, ChannelPromise promise) { - // Listener would be added here, but since it is null there is no need. The assumption is there is already a - // listener at the front of the queue, or there is a buffer at the front of the queue, which was spliced from - // buf via remove(). - bufAndListenerPairs.addFirst(new DelegatingChannelPromiseNotifier(promise)); - bufAndListenerPairs.addFirst(buf); + addFirst(buf, toChannelFutureListener(promise)); + } + private void addFirst(ByteBuf buf, ChannelFutureListener listener) { + if (listener != null) { + bufAndListenerPairs.addFirst(listener); + } + bufAndListenerPairs.addFirst(buf); incrementReadableBytes(buf.readableBytes()); } @@ -69,14 +74,14 @@ public abstract class AbstractCoalescingBufferQueue { /** * Add a buffer to the end of the queue and associate a promise with it that should be completed when - * all the buffers bytes have been consumed from the queue and written. + * all the buffer's bytes have been consumed from the queue and written. * @param buf to add to the tail of the queue * @param promise to complete when all the bytes have been consumed and written, can be void. */ public final void add(ByteBuf buf, ChannelPromise promise) { // buffers are added before promises so that we naturally 'consume' the entire buffer during removal // before we complete it's promise. - add(buf, promise.isVoid() ? null : (ChannelFutureListener) new DelegatingChannelPromiseNotifier(promise)); + add(buf, toChannelFutureListener(promise)); } /** @@ -343,4 +348,8 @@ public abstract class AbstractCoalescingBufferQueue { tracker.decrementPendingOutboundBytes(decrement); } } + + private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) { + return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise); + } } diff --git a/transport/src/test/java/io/netty/channel/CoalescingBufferQueueTest.java b/transport/src/test/java/io/netty/channel/CoalescingBufferQueueTest.java index cb02e888a1..2a5749b3ab 100644 --- a/transport/src/test/java/io/netty/channel/CoalescingBufferQueueTest.java +++ b/transport/src/test/java/io/netty/channel/CoalescingBufferQueueTest.java @@ -72,6 +72,48 @@ public class CoalescingBufferQueueTest { assertFalse(channel.finish()); } + @Test + public void testAddFirstPromiseRetained() { + writeQueue.add(cat, catPromise); + assertQueueSize(3, false); + writeQueue.add(mouse, mouseListener); + assertQueueSize(8, false); + ChannelPromise aggregatePromise = newPromise(); + assertEquals("catmous", dequeue(7, aggregatePromise)); + ByteBuf remainder = Unpooled.wrappedBuffer("mous".getBytes(CharsetUtil.US_ASCII)); + writeQueue.addFirst(remainder, aggregatePromise); + ChannelPromise aggregatePromise2 = newPromise(); + assertEquals("mouse", dequeue(5, aggregatePromise2)); + aggregatePromise2.setSuccess(); + assertTrue(catPromise.isSuccess()); + assertTrue(mouseSuccess); + assertEquals(0, cat.refCnt()); + assertEquals(0, mouse.refCnt()); + } + + @Test + public void testAddFirstVoidPromise() { + writeQueue.add(cat, catPromise); + assertQueueSize(3, false); + writeQueue.add(mouse, mouseListener); + assertQueueSize(8, false); + ChannelPromise aggregatePromise = newPromise(); + assertEquals("catmous", dequeue(7, aggregatePromise)); + ByteBuf remainder = Unpooled.wrappedBuffer("mous".getBytes(CharsetUtil.US_ASCII)); + writeQueue.addFirst(remainder, voidPromise); + ChannelPromise aggregatePromise2 = newPromise(); + assertEquals("mouse", dequeue(5, aggregatePromise2)); + aggregatePromise2.setSuccess(); + // Because we used a void promise above, we shouldn't complete catPromise until aggregatePromise is completed. + assertFalse(catPromise.isSuccess()); + assertTrue(mouseSuccess); + aggregatePromise.setSuccess(); + assertTrue(catPromise.isSuccess()); + assertTrue(mouseSuccess); + assertEquals(0, cat.refCnt()); + assertEquals(0, mouse.refCnt()); + } + @Test public void testAggregateWithFullRead() { writeQueue.add(cat, catPromise);