AbstractCoalescingBufferQueue addFirst void promise handling

Motivation:
AbstractCoalescingBufferQueue#add accounts for void promises, but AbstractCoalescingBufferQueue#addFirst does not. These methods should be consistent.

Modifications:
- AbstractCoalescingBufferQueue#addFirst should account for void promises and share code with AbstractCoalescingBufferQueue#add

Result:
More correct void promise handling in AbstractCoalescingBufferQueue.
This commit is contained in:
Scott Mitchell 2017-11-07 09:54:17 -08:00
parent 8c5eeb581e
commit 7511c15187
2 changed files with 59 additions and 8 deletions

View File

@ -48,15 +48,20 @@ public abstract class AbstractCoalescingBufferQueue {
}
/**
* Add a buffer to the front of the queue.
* Add a buffer to the front of the queue and associate a promise with it that should be completed when
* all the buffer's bytes have been consumed from the queue and written.
* @param buf to add to the head of the queue
* @param promise to complete when all the bytes have been consumed and written, can be void.
*/
public final void addFirst(ByteBuf buf, ChannelPromise promise) {
// Listener would be added here, but since it is null there is no need. The assumption is there is already a
// listener at the front of the queue, or there is a buffer at the front of the queue, which was spliced from
// buf via remove().
bufAndListenerPairs.addFirst(new DelegatingChannelPromiseNotifier(promise));
bufAndListenerPairs.addFirst(buf);
addFirst(buf, toChannelFutureListener(promise));
}
private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
if (listener != null) {
bufAndListenerPairs.addFirst(listener);
}
bufAndListenerPairs.addFirst(buf);
incrementReadableBytes(buf.readableBytes());
}
@ -69,14 +74,14 @@ public abstract class AbstractCoalescingBufferQueue {
/**
* Add a buffer to the end of the queue and associate a promise with it that should be completed when
* all the buffers bytes have been consumed from the queue and written.
* all the buffer's bytes have been consumed from the queue and written.
* @param buf to add to the tail of the queue
* @param promise to complete when all the bytes have been consumed and written, can be void.
*/
public final void add(ByteBuf buf, ChannelPromise promise) {
// buffers are added before promises so that we naturally 'consume' the entire buffer during removal
// before we complete it's promise.
add(buf, promise.isVoid() ? null : (ChannelFutureListener) new DelegatingChannelPromiseNotifier(promise));
add(buf, toChannelFutureListener(promise));
}
/**
@ -343,4 +348,8 @@ public abstract class AbstractCoalescingBufferQueue {
tracker.decrementPendingOutboundBytes(decrement);
}
}
private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) {
return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise);
}
}

View File

@ -72,6 +72,48 @@ public class CoalescingBufferQueueTest {
assertFalse(channel.finish());
}
@Test
public void testAddFirstPromiseRetained() {
writeQueue.add(cat, catPromise);
assertQueueSize(3, false);
writeQueue.add(mouse, mouseListener);
assertQueueSize(8, false);
ChannelPromise aggregatePromise = newPromise();
assertEquals("catmous", dequeue(7, aggregatePromise));
ByteBuf remainder = Unpooled.wrappedBuffer("mous".getBytes(CharsetUtil.US_ASCII));
writeQueue.addFirst(remainder, aggregatePromise);
ChannelPromise aggregatePromise2 = newPromise();
assertEquals("mouse", dequeue(5, aggregatePromise2));
aggregatePromise2.setSuccess();
assertTrue(catPromise.isSuccess());
assertTrue(mouseSuccess);
assertEquals(0, cat.refCnt());
assertEquals(0, mouse.refCnt());
}
@Test
public void testAddFirstVoidPromise() {
writeQueue.add(cat, catPromise);
assertQueueSize(3, false);
writeQueue.add(mouse, mouseListener);
assertQueueSize(8, false);
ChannelPromise aggregatePromise = newPromise();
assertEquals("catmous", dequeue(7, aggregatePromise));
ByteBuf remainder = Unpooled.wrappedBuffer("mous".getBytes(CharsetUtil.US_ASCII));
writeQueue.addFirst(remainder, voidPromise);
ChannelPromise aggregatePromise2 = newPromise();
assertEquals("mouse", dequeue(5, aggregatePromise2));
aggregatePromise2.setSuccess();
// Because we used a void promise above, we shouldn't complete catPromise until aggregatePromise is completed.
assertFalse(catPromise.isSuccess());
assertTrue(mouseSuccess);
aggregatePromise.setSuccess();
assertTrue(catPromise.isSuccess());
assertTrue(mouseSuccess);
assertEquals(0, cat.refCnt());
assertEquals(0, mouse.refCnt());
}
@Test
public void testAggregateWithFullRead() {
writeQueue.add(cat, catPromise);