Add generic utility for enqueuing buffers with promises and dequeueing them in arbitrary byte ranges.
Motivation: Simplifies writing code that needs to merge or slice a sequence of buffer & promise pairs into chunks of arbitrary sizes. For example in HTTP2 we merge or split buffers across fixed-size DATA frame boundaries. Modifications: Add new utility class CoalescingBufferQueue Result: Following this change HTTP2 code will switch to use it instead of CompositeByteBuffer for DATA frame coalescing.
This commit is contained in:
parent
8a16081a93
commit
319d745a13
@ -0,0 +1,214 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2015 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||||
|
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||||
|
* or implied. See the License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.CompositeByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
import io.netty.util.internal.ObjectUtil;
|
||||||
|
|
||||||
|
import java.util.ArrayDeque;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A FIFO queue of bytes where producers add bytes by repeatedly adding {@link ByteBuf} and consumers take bytes in
|
||||||
|
* arbitrary lengths. This allows producers to add lots of small buffers and the consumer to take all the bytes
|
||||||
|
* out in a single buffer. Conversely the producer may add larger buffers and the consumer could take the bytes in
|
||||||
|
* many small buffers.
|
||||||
|
*
|
||||||
|
* <p>Bytes are added and removed with promises. If the last byte of a buffer added with a promise is removed then
|
||||||
|
* that promise will complete when the promise passed to {@link #remove} completes.
|
||||||
|
*
|
||||||
|
* <p>This functionality is useful for aggregating or partitioning writes into fixed size buffers for framing protocols
|
||||||
|
* such as HTTP2.
|
||||||
|
*/
|
||||||
|
public final class CoalescingBufferQueue {
|
||||||
|
|
||||||
|
private final Channel channel;
|
||||||
|
private final ArrayDeque<Object> bufAndListenerPairs = new ArrayDeque<Object>();
|
||||||
|
private int readableBytes;
|
||||||
|
|
||||||
|
public CoalescingBufferQueue(Channel channel) {
|
||||||
|
this.channel = ObjectUtil.checkNotNull(channel, "channel");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a buffer to the end of the queue.
|
||||||
|
*/
|
||||||
|
public void add(ByteBuf buf) {
|
||||||
|
add(buf, (ChannelFutureListener) null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a buffer to the end of the queue and associate a promise with it that should be completed when
|
||||||
|
* all the buffers bytes have been consumed from the queue and written.
|
||||||
|
* @param buf to add to the tail of the queue
|
||||||
|
* @param promise to complete when all the bytes have been consumed and written, can be void.
|
||||||
|
*/
|
||||||
|
public void add(ByteBuf buf, ChannelPromise promise) {
|
||||||
|
// buffers are added before promises so that we naturally 'consume' the entire buffer during removal
|
||||||
|
// before we complete it's promise.
|
||||||
|
ObjectUtil.checkNotNull(promise, "promise");
|
||||||
|
add(buf, promise.isVoid() ? null : new ChannelPromiseNotifier(promise));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a buffer to the end of the queue and associate a listener with it that should be completed when
|
||||||
|
* all the buffers bytes have been consumed from the queue and written.
|
||||||
|
* @param buf to add to the tail of the queue
|
||||||
|
* @param listener to notify when all the bytes have been consumed and written, can be {@code null}.
|
||||||
|
*/
|
||||||
|
public void add(ByteBuf buf, ChannelFutureListener listener) {
|
||||||
|
// buffers are added before promises so that we naturally 'consume' the entire buffer during removal
|
||||||
|
// before we complete it's promise.
|
||||||
|
ObjectUtil.checkNotNull(buf, "buf");
|
||||||
|
if (readableBytes > Integer.MAX_VALUE - buf.readableBytes()) {
|
||||||
|
throw new IllegalStateException("buffer queue length overflow: " + readableBytes
|
||||||
|
+ " + " + buf.readableBytes());
|
||||||
|
}
|
||||||
|
bufAndListenerPairs.add(buf);
|
||||||
|
if (listener != null) {
|
||||||
|
bufAndListenerPairs.add(listener);
|
||||||
|
}
|
||||||
|
readableBytes += buf.readableBytes();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are
|
||||||
|
* fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise}
|
||||||
|
* completes.
|
||||||
|
*
|
||||||
|
* @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater
|
||||||
|
* than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned.
|
||||||
|
* @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
|
||||||
|
* @return a {@link ByteBuf} composed of the enqueued buffers.
|
||||||
|
*/
|
||||||
|
public ByteBuf remove(int bytes, ChannelPromise aggregatePromise) {
|
||||||
|
if (bytes < 0) {
|
||||||
|
throw new IllegalArgumentException("bytes (expected >= 0): " + bytes);
|
||||||
|
}
|
||||||
|
ObjectUtil.checkNotNull(aggregatePromise, "aggregatePromise");
|
||||||
|
|
||||||
|
// Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer.
|
||||||
|
if (bufAndListenerPairs.isEmpty()) {
|
||||||
|
return Unpooled.EMPTY_BUFFER;
|
||||||
|
}
|
||||||
|
bytes = Math.min(bytes, readableBytes);
|
||||||
|
|
||||||
|
ByteBuf toReturn = null;
|
||||||
|
int originalBytes = bytes;
|
||||||
|
for (;;) {
|
||||||
|
Object entry = bufAndListenerPairs.poll();
|
||||||
|
if (entry == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (entry instanceof ChannelFutureListener) {
|
||||||
|
aggregatePromise.addListener((ChannelFutureListener) entry);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
ByteBuf entryBuffer = (ByteBuf) entry;
|
||||||
|
if (entryBuffer.readableBytes() > bytes) {
|
||||||
|
// Add the buffer back to the queue as we can't consume all of it.
|
||||||
|
bufAndListenerPairs.addFirst(entryBuffer);
|
||||||
|
if (bytes > 0) {
|
||||||
|
// Take a slice of what we can consume and retain it.
|
||||||
|
toReturn = compose(toReturn, entryBuffer.readSlice(bytes).retain());
|
||||||
|
bytes = 0;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
toReturn = compose(toReturn, entryBuffer);
|
||||||
|
bytes -= entryBuffer.readableBytes();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
readableBytes -= originalBytes - bytes;
|
||||||
|
assert readableBytes >= 0;
|
||||||
|
return toReturn;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compose the current buffer with another.
|
||||||
|
*/
|
||||||
|
private ByteBuf compose(ByteBuf current, ByteBuf next) {
|
||||||
|
if (current == null) {
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
if (current instanceof CompositeByteBuf) {
|
||||||
|
CompositeByteBuf composite = (CompositeByteBuf) current;
|
||||||
|
composite.addComponent(next);
|
||||||
|
composite.writerIndex(composite.writerIndex() + next.readableBytes());
|
||||||
|
return composite;
|
||||||
|
}
|
||||||
|
// Create a composite buffer to accumulate this pair and potentially all the buffers
|
||||||
|
// in the queue. Using +2 as we have already dequeued current and next.
|
||||||
|
CompositeByteBuf composite = channel.alloc().compositeBuffer(bufAndListenerPairs.size() + 2);
|
||||||
|
composite.addComponent(current);
|
||||||
|
composite.addComponent(next);
|
||||||
|
return composite.writerIndex(current.readableBytes() + next.readableBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of readable bytes.
|
||||||
|
*/
|
||||||
|
public int readableBytes() {
|
||||||
|
return readableBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Are there pending buffers in the queue.
|
||||||
|
*/
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return bufAndListenerPairs.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release all buffers in the queue and complete all listeners and promises.
|
||||||
|
*/
|
||||||
|
public void releaseAndFailAll(Throwable cause) {
|
||||||
|
releaseAndCompleteAll(channel.newFailedFuture(cause));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void releaseAndCompleteAll(ChannelFuture future) {
|
||||||
|
readableBytes = 0;
|
||||||
|
Throwable pending = null;
|
||||||
|
for (;;) {
|
||||||
|
Object entry = bufAndListenerPairs.poll();
|
||||||
|
if (entry == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
if (entry instanceof ByteBuf) {
|
||||||
|
ReferenceCountUtil.safeRelease(entry);
|
||||||
|
} else {
|
||||||
|
((ChannelFutureListener) entry).operationComplete(future);
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
pending = t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pending != null) {
|
||||||
|
throw new IllegalStateException(pending);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copy all pending entries in this queue into the destination queue.
|
||||||
|
* @param dest to copy pending buffers to.
|
||||||
|
*/
|
||||||
|
public void copyTo(CoalescingBufferQueue dest) {
|
||||||
|
dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
|
||||||
|
dest.readableBytes += readableBytes;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,218 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2015 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||||
|
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||||
|
* or implied. See the License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.MockitoAnnotations;
|
||||||
|
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for {@link CoalescingBufferQueue}.
|
||||||
|
*/
|
||||||
|
public class CoalescingBufferQueueTest {
|
||||||
|
|
||||||
|
private ByteBuf cat;
|
||||||
|
private ByteBuf mouse;
|
||||||
|
private ByteBuf empty;
|
||||||
|
|
||||||
|
private ChannelPromise catPromise, emptyPromise;
|
||||||
|
private ChannelPromise voidPromise;
|
||||||
|
private ChannelFutureListener mouseListener;
|
||||||
|
|
||||||
|
private boolean mouseDone;
|
||||||
|
private boolean mouseSuccess;
|
||||||
|
|
||||||
|
private Channel channel = new EmbeddedChannel();
|
||||||
|
|
||||||
|
private CoalescingBufferQueue writeQueue = new CoalescingBufferQueue(channel);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
MockitoAnnotations.initMocks(this);
|
||||||
|
catPromise = newPromise();
|
||||||
|
mouseListener = new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
mouseDone = true;
|
||||||
|
mouseSuccess = future.isSuccess();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
emptyPromise = newPromise();
|
||||||
|
voidPromise = channel.voidPromise();
|
||||||
|
|
||||||
|
cat = Unpooled.wrappedBuffer("cat".getBytes(Charset.defaultCharset()));
|
||||||
|
mouse = Unpooled.wrappedBuffer("mouse".getBytes(Charset.defaultCharset()));
|
||||||
|
empty = Unpooled.buffer(0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
ReferenceCountUtil.safeRelease(cat);
|
||||||
|
ReferenceCountUtil.safeRelease(mouse);
|
||||||
|
ReferenceCountUtil.safeRelease(empty);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAggregateWithFullRead() {
|
||||||
|
writeQueue.add(cat, catPromise);
|
||||||
|
assertQueueSize(3, false);
|
||||||
|
writeQueue.add(mouse, mouseListener);
|
||||||
|
assertQueueSize(8, false);
|
||||||
|
DefaultChannelPromise aggregatePromise = newPromise();
|
||||||
|
assertEquals("catmouse", writeQueue.remove(8, aggregatePromise).toString(Charset.defaultCharset()));
|
||||||
|
assertQueueSize(0, true);
|
||||||
|
assertFalse(catPromise.isSuccess());
|
||||||
|
assertFalse(mouseDone);
|
||||||
|
aggregatePromise.trySuccess();
|
||||||
|
assertTrue(catPromise.isSuccess());
|
||||||
|
assertTrue(mouseSuccess);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithVoidPromise() {
|
||||||
|
writeQueue.add(cat, voidPromise);
|
||||||
|
writeQueue.add(mouse, voidPromise);
|
||||||
|
writeQueue.add(empty, voidPromise);
|
||||||
|
assertQueueSize(8, false);
|
||||||
|
assertEquals("catm", writeQueue.remove(4, newPromise()).toString(Charset.defaultCharset()));
|
||||||
|
assertQueueSize(4, false);
|
||||||
|
assertEquals("ouse", writeQueue.remove(4, newPromise()).toString(Charset.defaultCharset()));
|
||||||
|
assertQueueSize(0, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAggregateWithPartialRead() {
|
||||||
|
writeQueue.add(cat, catPromise);
|
||||||
|
writeQueue.add(mouse, mouseListener);
|
||||||
|
DefaultChannelPromise aggregatePromise = newPromise();
|
||||||
|
assertEquals("catm", writeQueue.remove(4, aggregatePromise).toString(Charset.defaultCharset()));
|
||||||
|
assertQueueSize(4, false);
|
||||||
|
assertFalse(catPromise.isSuccess());
|
||||||
|
assertFalse(mouseDone);
|
||||||
|
aggregatePromise.trySuccess();
|
||||||
|
assertTrue(catPromise.isSuccess());
|
||||||
|
assertFalse(mouseDone);
|
||||||
|
|
||||||
|
aggregatePromise = newPromise();
|
||||||
|
assertEquals("ouse", writeQueue.remove(Integer.MAX_VALUE, aggregatePromise).toString(Charset.defaultCharset()));
|
||||||
|
assertQueueSize(0, true);
|
||||||
|
assertFalse(mouseDone);
|
||||||
|
aggregatePromise.trySuccess();
|
||||||
|
assertTrue(mouseSuccess);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadExactAddedBufferSizeReturnsOriginal() {
|
||||||
|
writeQueue.add(cat, catPromise);
|
||||||
|
writeQueue.add(mouse, mouseListener);
|
||||||
|
DefaultChannelPromise aggregatePromise = newPromise();
|
||||||
|
assertSame(cat, writeQueue.remove(3, aggregatePromise));
|
||||||
|
assertFalse(catPromise.isSuccess());
|
||||||
|
aggregatePromise.trySuccess();
|
||||||
|
assertTrue(catPromise.isSuccess());
|
||||||
|
|
||||||
|
aggregatePromise = newPromise();
|
||||||
|
assertSame(mouse, writeQueue.remove(5, aggregatePromise));
|
||||||
|
assertFalse(mouseDone);
|
||||||
|
aggregatePromise.trySuccess();
|
||||||
|
assertTrue(mouseSuccess);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadEmptyQueueReturnsEmptyBuffer() {
|
||||||
|
assertQueueSize(0, true);
|
||||||
|
DefaultChannelPromise aggregatePromise = newPromise();
|
||||||
|
assertEquals(0, writeQueue.remove(Integer.MAX_VALUE, aggregatePromise).readableBytes());
|
||||||
|
assertQueueSize(0, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReleaseAndFailAll() {
|
||||||
|
writeQueue.add(cat, catPromise);
|
||||||
|
writeQueue.add(mouse, mouseListener);
|
||||||
|
RuntimeException cause = new RuntimeException("ooops");
|
||||||
|
writeQueue.releaseAndFailAll(cause);
|
||||||
|
DefaultChannelPromise aggregatePromise = newPromise();
|
||||||
|
assertQueueSize(0, true);
|
||||||
|
assertEquals(0, cat.refCnt());
|
||||||
|
assertEquals(0, mouse.refCnt());
|
||||||
|
assertSame(cause, catPromise.cause());
|
||||||
|
assertEquals(0, writeQueue.remove(Integer.MAX_VALUE, aggregatePromise).readableBytes());
|
||||||
|
assertQueueSize(0, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyBuffersAreCoalesced() {
|
||||||
|
assertQueueSize(0, true);
|
||||||
|
writeQueue.add(cat, catPromise);
|
||||||
|
writeQueue.add(empty, emptyPromise);
|
||||||
|
assertQueueSize(3, false);
|
||||||
|
DefaultChannelPromise aggregatePromise = newPromise();
|
||||||
|
ByteBuf removed = writeQueue.remove(3, aggregatePromise);
|
||||||
|
assertQueueSize(0, true);
|
||||||
|
assertEquals("cat", removed.toString(Charset.defaultCharset()));
|
||||||
|
assertFalse(catPromise.isSuccess());
|
||||||
|
assertFalse(emptyPromise.isSuccess());
|
||||||
|
aggregatePromise.trySuccess();
|
||||||
|
assertTrue(catPromise.isSuccess());
|
||||||
|
assertTrue(emptyPromise.isSuccess());
|
||||||
|
removed.release();
|
||||||
|
assertEquals(0, cat.refCnt());
|
||||||
|
assertEquals(0, empty.refCnt());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMerge() {
|
||||||
|
writeQueue.add(cat, catPromise);
|
||||||
|
CoalescingBufferQueue otherQueue = new CoalescingBufferQueue(channel);
|
||||||
|
otherQueue.add(mouse, mouseListener);
|
||||||
|
otherQueue.copyTo(writeQueue);
|
||||||
|
assertQueueSize(8, false);
|
||||||
|
DefaultChannelPromise aggregatePromise = newPromise();
|
||||||
|
assertEquals("catmouse", writeQueue.remove(8, aggregatePromise).toString(Charset.defaultCharset()));
|
||||||
|
assertQueueSize(0, true);
|
||||||
|
assertFalse(catPromise.isSuccess());
|
||||||
|
assertFalse(mouseDone);
|
||||||
|
aggregatePromise.trySuccess();
|
||||||
|
assertTrue(catPromise.isSuccess());
|
||||||
|
assertTrue(mouseSuccess);
|
||||||
|
}
|
||||||
|
|
||||||
|
private DefaultChannelPromise newPromise() {
|
||||||
|
return new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertQueueSize(int size, boolean isEmpty) {
|
||||||
|
assertEquals(size, writeQueue.readableBytes());
|
||||||
|
if (isEmpty) {
|
||||||
|
assertTrue(writeQueue.isEmpty());
|
||||||
|
} else {
|
||||||
|
assertFalse(writeQueue.isEmpty());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user