Use a single queue in UniformStreamByteDistributor

Motivation:

The UniformStreamByteDistributor currently processes all zero-length frames, regardless of add order. This means that we would always send HEADERS for all streams, possibly taking away bandwidth for streams that actually have data.

Modifications:

Empty frames are now treated the same as any other frame except that the algorithm will pop off the any empty frames at the head of the queue.

Result:

Empty frames require no extra processing.
This commit is contained in:
nmittler 2015-11-20 10:44:38 -08:00
parent cfcee5798d
commit 79ab756fa3
2 changed files with 23 additions and 58 deletions

View File

@ -34,7 +34,6 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
private final Http2Connection.PropertyKey stateKey;
private final Deque<State> queue = new ArrayDeque<State>(4);
private Deque<State> emptyFrameQueue;
/**
* The minimum number of bytes that we will attempt to allocate to a stream. This is to
@ -88,34 +87,24 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
checkNotNull(writer, "writer");
// First, write out any empty frames.
if (emptyFrameQueue != null) {
while (!emptyFrameQueue.isEmpty()) {
State state = emptyFrameQueue.remove();
state.enqueued = false;
if (state.streamableBytes > 0) {
// Bytes have been added since it was queued. Add it to the regular queue.
state.addToQueue();
} else {
// Still an empty frame, just write it.
state.write(0, writer);
}
}
}
final int size = queue.size();
if (size == 0 || maxBytes <= 0) {
if (size == 0) {
return totalStreamableBytes > 0;
}
final int chunkSize = max(minAllocationChunk, maxBytes / size);
// Write until the queue is empty or we've exhausted maxBytes. We need to check queue.isEmpty()
// here since the Writer could call updateStreamableBytes, which may alter the queue.
State state = queue.pollFirst();
do {
// Remove the head of the queue.
State state = queue.remove();
state.enqueued = false;
if (state.streamableBytes > 0 && maxBytes == 0) {
// Stop at the first state that can't send. Add this state back to the head of
// the queue. Note that empty frames at the head of the queue will always be
// written.
queue.addFirst(state);
state.enqueued = true;
break;
}
// Allocate as much data as we can for this stream.
int chunk = min(chunkSize, min(maxBytes, state.streamableBytes));
@ -123,7 +112,7 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
// Write the allocated bytes and enqueue as necessary.
state.write(chunk, writer);
} while (maxBytes > 0 && !queue.isEmpty());
} while ((state = queue.pollFirst()) != null);
return totalStreamableBytes > 0;
}
@ -139,7 +128,6 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
final Http2Stream stream;
int streamableBytes;
boolean enqueued;
boolean previouslyOnMainQueue;
State(Http2Stream stream) {
this.stream = stream;
@ -179,30 +167,16 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
void addToQueue() {
if (!enqueued) {
enqueued = true;
if (streamableBytes == 0) {
// Add empty frames to the empty frame queue.
getOrCreateEmptyFrameQueue().addLast(this);
return;
}
if (!previouslyOnMainQueue) {
// Add to the head the first time it's on the main queue.
previouslyOnMainQueue = true;
queue.addFirst(this);
} else {
queue.addLast(this);
}
}
}
void removeFromQueue() {
if (enqueued) {
enqueued = false;
if (emptyFrameQueue != null && !emptyFrameQueue.remove(this)) {
queue.remove(this);
}
}
}
void close() {
// Remove this state from the queue.
@ -211,12 +185,5 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
// Clear the streamable bytes.
updateStreamableBytes(0, false);
}
private Deque<State> getOrCreateEmptyFrameQueue() {
if (emptyFrameQueue == null) {
emptyFrameQueue = new ArrayDeque<State>(2);
}
return emptyFrameQueue;
}
}
}

View File

@ -131,13 +131,11 @@ public class UniformStreamByteDistributorTest {
setPriority(STREAM_C, STREAM_A, (short) 100, false);
setPriority(STREAM_D, STREAM_A, (short) 100, false);
// Update in reverse order. This will yield a queue in the order A, B, C, D. This
// is due to the fact that when a stream is enqueued the first time, it is added to the
// head of the queue.
updateStream(STREAM_D, CHUNK_SIZE, true);
updateStream(STREAM_C, CHUNK_SIZE, true);
updateStream(STREAM_B, CHUNK_SIZE, true);
// Update the streams.
updateStream(STREAM_A, CHUNK_SIZE, true);
updateStream(STREAM_B, CHUNK_SIZE, true);
updateStream(STREAM_C, CHUNK_SIZE, true);
updateStream(STREAM_D, CHUNK_SIZE, true);
// Only write 3 * chunkSize, so that we'll only write to the first 3 streams.
int written = 3 * CHUNK_SIZE;
@ -174,16 +172,16 @@ public class UniformStreamByteDistributorTest {
}
@Test
public void emptyFrameIsWritten() throws Http2Exception {
updateStream(STREAM_B, 0, true);
public void emptyFrameAtHeadIsWritten() throws Http2Exception {
updateStream(STREAM_A, 10, true);
updateStream(STREAM_B, 0, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 10, true);
assertFalse(write(10));
assertTrue(write(10));
verifyWrite(STREAM_A, 10);
verifyWrite(STREAM_B, 0);
verifyNoMoreInteractions(writer);
write(10);
verifyWrite(STREAM_C, 0);
verifyNoMoreInteractions(writer);
}