diff --git a/transport/src/main/java/io/netty/channel/CoalescingBufferQueue.java b/transport/src/main/java/io/netty/channel/CoalescingBufferQueue.java new file mode 100644 index 0000000000..6512103d52 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/CoalescingBufferQueue.java @@ -0,0 +1,214 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.ObjectUtil; + +import java.util.ArrayDeque; + +/** + * A FIFO queue of bytes where producers add bytes by repeatedly adding {@link ByteBuf} and consumers take bytes in + * arbitrary lengths. This allows producers to add lots of small buffers and the consumer to take all the bytes + * out in a single buffer. Conversely the producer may add larger buffers and the consumer could take the bytes in + * many small buffers. + * + *

Bytes are added and removed with promises. If the last byte of a buffer added with a promise is removed then + * that promise will complete when the promise passed to {@link #remove} completes. + * + *

This functionality is useful for aggregating or partitioning writes into fixed size buffers for framing protocols + * such as HTTP2. + */ +public final class CoalescingBufferQueue { + + private final Channel channel; + private final ArrayDeque bufAndListenerPairs = new ArrayDeque(); + private int readableBytes; + + public CoalescingBufferQueue(Channel channel) { + this.channel = ObjectUtil.checkNotNull(channel, "channel"); + } + + /** + * Add a buffer to the end of the queue. + */ + public void add(ByteBuf buf) { + add(buf, (ChannelFutureListener) null); + } + + /** + * Add a buffer to the end of the queue and associate a promise with it that should be completed when + * all the buffers bytes have been consumed from the queue and written. + * @param buf to add to the tail of the queue + * @param promise to complete when all the bytes have been consumed and written, can be void. + */ + public void add(ByteBuf buf, ChannelPromise promise) { + // buffers are added before promises so that we naturally 'consume' the entire buffer during removal + // before we complete it's promise. + ObjectUtil.checkNotNull(promise, "promise"); + add(buf, promise.isVoid() ? null : new ChannelPromiseNotifier(promise)); + } + + /** + * Add a buffer to the end of the queue and associate a listener with it that should be completed when + * all the buffers bytes have been consumed from the queue and written. + * @param buf to add to the tail of the queue + * @param listener to notify when all the bytes have been consumed and written, can be {@code null}. + */ + public void add(ByteBuf buf, ChannelFutureListener listener) { + // buffers are added before promises so that we naturally 'consume' the entire buffer during removal + // before we complete it's promise. + ObjectUtil.checkNotNull(buf, "buf"); + if (readableBytes > Integer.MAX_VALUE - buf.readableBytes()) { + throw new IllegalStateException("buffer queue length overflow: " + readableBytes + + " + " + buf.readableBytes()); + } + bufAndListenerPairs.add(buf); + if (listener != null) { + bufAndListenerPairs.add(listener); + } + readableBytes += buf.readableBytes(); + } + + /** + * Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are + * fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise} + * completes. + * + * @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater + * than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned. + * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers. + * @return a {@link ByteBuf} composed of the enqueued buffers. + */ + public ByteBuf remove(int bytes, ChannelPromise aggregatePromise) { + if (bytes < 0) { + throw new IllegalArgumentException("bytes (expected >= 0): " + bytes); + } + ObjectUtil.checkNotNull(aggregatePromise, "aggregatePromise"); + + // Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer. + if (bufAndListenerPairs.isEmpty()) { + return Unpooled.EMPTY_BUFFER; + } + bytes = Math.min(bytes, readableBytes); + + ByteBuf toReturn = null; + int originalBytes = bytes; + for (;;) { + Object entry = bufAndListenerPairs.poll(); + if (entry == null) { + break; + } + if (entry instanceof ChannelFutureListener) { + aggregatePromise.addListener((ChannelFutureListener) entry); + continue; + } + ByteBuf entryBuffer = (ByteBuf) entry; + if (entryBuffer.readableBytes() > bytes) { + // Add the buffer back to the queue as we can't consume all of it. + bufAndListenerPairs.addFirst(entryBuffer); + if (bytes > 0) { + // Take a slice of what we can consume and retain it. + toReturn = compose(toReturn, entryBuffer.readSlice(bytes).retain()); + bytes = 0; + } + break; + } else { + toReturn = compose(toReturn, entryBuffer); + bytes -= entryBuffer.readableBytes(); + } + } + readableBytes -= originalBytes - bytes; + assert readableBytes >= 0; + return toReturn; + } + + /** + * Compose the current buffer with another. + */ + private ByteBuf compose(ByteBuf current, ByteBuf next) { + if (current == null) { + return next; + } + if (current instanceof CompositeByteBuf) { + CompositeByteBuf composite = (CompositeByteBuf) current; + composite.addComponent(next); + composite.writerIndex(composite.writerIndex() + next.readableBytes()); + return composite; + } + // Create a composite buffer to accumulate this pair and potentially all the buffers + // in the queue. Using +2 as we have already dequeued current and next. + CompositeByteBuf composite = channel.alloc().compositeBuffer(bufAndListenerPairs.size() + 2); + composite.addComponent(current); + composite.addComponent(next); + return composite.writerIndex(current.readableBytes() + next.readableBytes()); + } + + /** + * The number of readable bytes. + */ + public int readableBytes() { + return readableBytes; + } + + /** + * Are there pending buffers in the queue. + */ + public boolean isEmpty() { + return bufAndListenerPairs.isEmpty(); + } + + /** + * Release all buffers in the queue and complete all listeners and promises. + */ + public void releaseAndFailAll(Throwable cause) { + releaseAndCompleteAll(channel.newFailedFuture(cause)); + } + + private void releaseAndCompleteAll(ChannelFuture future) { + readableBytes = 0; + Throwable pending = null; + for (;;) { + Object entry = bufAndListenerPairs.poll(); + if (entry == null) { + break; + } + try { + if (entry instanceof ByteBuf) { + ReferenceCountUtil.safeRelease(entry); + } else { + ((ChannelFutureListener) entry).operationComplete(future); + } + } catch (Throwable t) { + pending = t; + } + } + if (pending != null) { + throw new IllegalStateException(pending); + } + } + + /** + * Copy all pending entries in this queue into the destination queue. + * @param dest to copy pending buffers to. + */ + public void copyTo(CoalescingBufferQueue dest) { + dest.bufAndListenerPairs.addAll(bufAndListenerPairs); + dest.readableBytes += readableBytes; + } +} diff --git a/transport/src/test/java/io/netty/channel/CoalescingBufferQueueTest.java b/transport/src/test/java/io/netty/channel/CoalescingBufferQueueTest.java new file mode 100644 index 0000000000..33cd888ec6 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/CoalescingBufferQueueTest.java @@ -0,0 +1,218 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.ImmediateEventExecutor; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockitoAnnotations; + +import java.nio.charset.Charset; + +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link CoalescingBufferQueue}. + */ +public class CoalescingBufferQueueTest { + + private ByteBuf cat; + private ByteBuf mouse; + private ByteBuf empty; + + private ChannelPromise catPromise, emptyPromise; + private ChannelPromise voidPromise; + private ChannelFutureListener mouseListener; + + private boolean mouseDone; + private boolean mouseSuccess; + + private Channel channel = new EmbeddedChannel(); + + private CoalescingBufferQueue writeQueue = new CoalescingBufferQueue(channel); + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + catPromise = newPromise(); + mouseListener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + mouseDone = true; + mouseSuccess = future.isSuccess(); + } + }; + emptyPromise = newPromise(); + voidPromise = channel.voidPromise(); + + cat = Unpooled.wrappedBuffer("cat".getBytes(Charset.defaultCharset())); + mouse = Unpooled.wrappedBuffer("mouse".getBytes(Charset.defaultCharset())); + empty = Unpooled.buffer(0, 1); + } + + @After + public void tearDown() { + ReferenceCountUtil.safeRelease(cat); + ReferenceCountUtil.safeRelease(mouse); + ReferenceCountUtil.safeRelease(empty); + } + + @Test + public void testAggregateWithFullRead() { + writeQueue.add(cat, catPromise); + assertQueueSize(3, false); + writeQueue.add(mouse, mouseListener); + assertQueueSize(8, false); + DefaultChannelPromise aggregatePromise = newPromise(); + assertEquals("catmouse", writeQueue.remove(8, aggregatePromise).toString(Charset.defaultCharset())); + assertQueueSize(0, true); + assertFalse(catPromise.isSuccess()); + assertFalse(mouseDone); + aggregatePromise.trySuccess(); + assertTrue(catPromise.isSuccess()); + assertTrue(mouseSuccess); + } + + @Test + public void testWithVoidPromise() { + writeQueue.add(cat, voidPromise); + writeQueue.add(mouse, voidPromise); + writeQueue.add(empty, voidPromise); + assertQueueSize(8, false); + assertEquals("catm", writeQueue.remove(4, newPromise()).toString(Charset.defaultCharset())); + assertQueueSize(4, false); + assertEquals("ouse", writeQueue.remove(4, newPromise()).toString(Charset.defaultCharset())); + assertQueueSize(0, true); + } + + @Test + public void testAggregateWithPartialRead() { + writeQueue.add(cat, catPromise); + writeQueue.add(mouse, mouseListener); + DefaultChannelPromise aggregatePromise = newPromise(); + assertEquals("catm", writeQueue.remove(4, aggregatePromise).toString(Charset.defaultCharset())); + assertQueueSize(4, false); + assertFalse(catPromise.isSuccess()); + assertFalse(mouseDone); + aggregatePromise.trySuccess(); + assertTrue(catPromise.isSuccess()); + assertFalse(mouseDone); + + aggregatePromise = newPromise(); + assertEquals("ouse", writeQueue.remove(Integer.MAX_VALUE, aggregatePromise).toString(Charset.defaultCharset())); + assertQueueSize(0, true); + assertFalse(mouseDone); + aggregatePromise.trySuccess(); + assertTrue(mouseSuccess); + } + + @Test + public void testReadExactAddedBufferSizeReturnsOriginal() { + writeQueue.add(cat, catPromise); + writeQueue.add(mouse, mouseListener); + DefaultChannelPromise aggregatePromise = newPromise(); + assertSame(cat, writeQueue.remove(3, aggregatePromise)); + assertFalse(catPromise.isSuccess()); + aggregatePromise.trySuccess(); + assertTrue(catPromise.isSuccess()); + + aggregatePromise = newPromise(); + assertSame(mouse, writeQueue.remove(5, aggregatePromise)); + assertFalse(mouseDone); + aggregatePromise.trySuccess(); + assertTrue(mouseSuccess); + } + + @Test + public void testReadEmptyQueueReturnsEmptyBuffer() { + assertQueueSize(0, true); + DefaultChannelPromise aggregatePromise = newPromise(); + assertEquals(0, writeQueue.remove(Integer.MAX_VALUE, aggregatePromise).readableBytes()); + assertQueueSize(0, true); + } + + @Test + public void testReleaseAndFailAll() { + writeQueue.add(cat, catPromise); + writeQueue.add(mouse, mouseListener); + RuntimeException cause = new RuntimeException("ooops"); + writeQueue.releaseAndFailAll(cause); + DefaultChannelPromise aggregatePromise = newPromise(); + assertQueueSize(0, true); + assertEquals(0, cat.refCnt()); + assertEquals(0, mouse.refCnt()); + assertSame(cause, catPromise.cause()); + assertEquals(0, writeQueue.remove(Integer.MAX_VALUE, aggregatePromise).readableBytes()); + assertQueueSize(0, true); + } + + @Test + public void testEmptyBuffersAreCoalesced() { + assertQueueSize(0, true); + writeQueue.add(cat, catPromise); + writeQueue.add(empty, emptyPromise); + assertQueueSize(3, false); + DefaultChannelPromise aggregatePromise = newPromise(); + ByteBuf removed = writeQueue.remove(3, aggregatePromise); + assertQueueSize(0, true); + assertEquals("cat", removed.toString(Charset.defaultCharset())); + assertFalse(catPromise.isSuccess()); + assertFalse(emptyPromise.isSuccess()); + aggregatePromise.trySuccess(); + assertTrue(catPromise.isSuccess()); + assertTrue(emptyPromise.isSuccess()); + removed.release(); + assertEquals(0, cat.refCnt()); + assertEquals(0, empty.refCnt()); + } + + @Test + public void testMerge() { + writeQueue.add(cat, catPromise); + CoalescingBufferQueue otherQueue = new CoalescingBufferQueue(channel); + otherQueue.add(mouse, mouseListener); + otherQueue.copyTo(writeQueue); + assertQueueSize(8, false); + DefaultChannelPromise aggregatePromise = newPromise(); + assertEquals("catmouse", writeQueue.remove(8, aggregatePromise).toString(Charset.defaultCharset())); + assertQueueSize(0, true); + assertFalse(catPromise.isSuccess()); + assertFalse(mouseDone); + aggregatePromise.trySuccess(); + assertTrue(catPromise.isSuccess()); + assertTrue(mouseSuccess); + } + + private DefaultChannelPromise newPromise() { + return new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE); + } + + private void assertQueueSize(int size, boolean isEmpty) { + assertEquals(size, writeQueue.readableBytes()); + if (isEmpty) { + assertTrue(writeQueue.isEmpty()); + } else { + assertFalse(writeQueue.isEmpty()); + } + } +}