From 79ab756fa337dd034e97df715065e8440b600656 Mon Sep 17 00:00:00 2001 From: nmittler Date: Fri, 20 Nov 2015 10:44:38 -0800 Subject: [PATCH] Use a single queue in UniformStreamByteDistributor Motivation: The UniformStreamByteDistributor currently processes all zero-length frames, regardless of add order. This means that we would always send HEADERS for all streams, possibly taking away bandwidth for streams that actually have data. Modifications: Empty frames are now treated the same as any other frame except that the algorithm will pop off the any empty frames at the head of the queue. Result: Empty frames require no extra processing. --- .../http2/UniformStreamByteDistributor.java | 59 ++++--------------- .../UniformStreamByteDistributorTest.java | 22 ++++--- 2 files changed, 23 insertions(+), 58 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java index c1bb73a195..3e9212f639 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java @@ -34,7 +34,6 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor private final Http2Connection.PropertyKey stateKey; private final Deque queue = new ArrayDeque(4); - private Deque emptyFrameQueue; /** * The minimum number of bytes that we will attempt to allocate to a stream. This is to @@ -88,34 +87,24 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor public boolean distribute(int maxBytes, Writer writer) throws Http2Exception { checkNotNull(writer, "writer"); - // First, write out any empty frames. - if (emptyFrameQueue != null) { - while (!emptyFrameQueue.isEmpty()) { - State state = emptyFrameQueue.remove(); - state.enqueued = false; - if (state.streamableBytes > 0) { - // Bytes have been added since it was queued. Add it to the regular queue. - state.addToQueue(); - } else { - // Still an empty frame, just write it. - state.write(0, writer); - } - } - } - final int size = queue.size(); - if (size == 0 || maxBytes <= 0) { + if (size == 0) { return totalStreamableBytes > 0; } final int chunkSize = max(minAllocationChunk, maxBytes / size); - // Write until the queue is empty or we've exhausted maxBytes. We need to check queue.isEmpty() - // here since the Writer could call updateStreamableBytes, which may alter the queue. + State state = queue.pollFirst(); do { - // Remove the head of the queue. - State state = queue.remove(); state.enqueued = false; + if (state.streamableBytes > 0 && maxBytes == 0) { + // Stop at the first state that can't send. Add this state back to the head of + // the queue. Note that empty frames at the head of the queue will always be + // written. + queue.addFirst(state); + state.enqueued = true; + break; + } // Allocate as much data as we can for this stream. int chunk = min(chunkSize, min(maxBytes, state.streamableBytes)); @@ -123,7 +112,7 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor // Write the allocated bytes and enqueue as necessary. state.write(chunk, writer); - } while (maxBytes > 0 && !queue.isEmpty()); + } while ((state = queue.pollFirst()) != null); return totalStreamableBytes > 0; } @@ -139,7 +128,6 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor final Http2Stream stream; int streamableBytes; boolean enqueued; - boolean previouslyOnMainQueue; State(Http2Stream stream) { this.stream = stream; @@ -179,28 +167,14 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor void addToQueue() { if (!enqueued) { enqueued = true; - if (streamableBytes == 0) { - // Add empty frames to the empty frame queue. - getOrCreateEmptyFrameQueue().addLast(this); - return; - } - - if (!previouslyOnMainQueue) { - // Add to the head the first time it's on the main queue. - previouslyOnMainQueue = true; - queue.addFirst(this); - } else { - queue.addLast(this); - } + queue.addLast(this); } } void removeFromQueue() { if (enqueued) { enqueued = false; - if (emptyFrameQueue != null && !emptyFrameQueue.remove(this)) { - queue.remove(this); - } + queue.remove(this); } } @@ -211,12 +185,5 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor // Clear the streamable bytes. updateStreamableBytes(0, false); } - - private Deque getOrCreateEmptyFrameQueue() { - if (emptyFrameQueue == null) { - emptyFrameQueue = new ArrayDeque(2); - } - return emptyFrameQueue; - } } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/UniformStreamByteDistributorTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/UniformStreamByteDistributorTest.java index b2115bb398..44010e48ca 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/UniformStreamByteDistributorTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/UniformStreamByteDistributorTest.java @@ -131,13 +131,11 @@ public class UniformStreamByteDistributorTest { setPriority(STREAM_C, STREAM_A, (short) 100, false); setPriority(STREAM_D, STREAM_A, (short) 100, false); - // Update in reverse order. This will yield a queue in the order A, B, C, D. This - // is due to the fact that when a stream is enqueued the first time, it is added to the - // head of the queue. - updateStream(STREAM_D, CHUNK_SIZE, true); - updateStream(STREAM_C, CHUNK_SIZE, true); - updateStream(STREAM_B, CHUNK_SIZE, true); + // Update the streams. updateStream(STREAM_A, CHUNK_SIZE, true); + updateStream(STREAM_B, CHUNK_SIZE, true); + updateStream(STREAM_C, CHUNK_SIZE, true); + updateStream(STREAM_D, CHUNK_SIZE, true); // Only write 3 * chunkSize, so that we'll only write to the first 3 streams. int written = 3 * CHUNK_SIZE; @@ -174,16 +172,16 @@ public class UniformStreamByteDistributorTest { } @Test - public void emptyFrameIsWritten() throws Http2Exception { - updateStream(STREAM_B, 0, true); + public void emptyFrameAtHeadIsWritten() throws Http2Exception { updateStream(STREAM_A, 10, true); + updateStream(STREAM_B, 0, true); + updateStream(STREAM_C, 0, true); + updateStream(STREAM_D, 10, true); - assertFalse(write(10)); + assertTrue(write(10)); verifyWrite(STREAM_A, 10); verifyWrite(STREAM_B, 0); - verifyNoMoreInteractions(writer); - - write(10); + verifyWrite(STREAM_C, 0); verifyNoMoreInteractions(writer); }