Use a single queue in UniformStreamByteDistributor
Motivation: The UniformStreamByteDistributor currently processes all zero-length frames, regardless of add order. This means that we would always send HEADERS for all streams, possibly taking away bandwidth for streams that actually have data. Modifications: Empty frames are now treated the same as any other frame except that the algorithm will pop off the any empty frames at the head of the queue. Result: Empty frames require no extra processing.
This commit is contained in:
parent
9118f94648
commit
9df16a6c54
@ -34,7 +34,6 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
|||||||
|
|
||||||
private final Http2Connection.PropertyKey stateKey;
|
private final Http2Connection.PropertyKey stateKey;
|
||||||
private final Deque<State> queue = new ArrayDeque<State>(4);
|
private final Deque<State> queue = new ArrayDeque<State>(4);
|
||||||
private Deque<State> emptyFrameQueue;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The minimum number of bytes that we will attempt to allocate to a stream. This is to
|
* The minimum number of bytes that we will attempt to allocate to a stream. This is to
|
||||||
@ -88,34 +87,24 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
|||||||
public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
|
public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
|
||||||
checkNotNull(writer, "writer");
|
checkNotNull(writer, "writer");
|
||||||
|
|
||||||
// First, write out any empty frames.
|
|
||||||
if (emptyFrameQueue != null) {
|
|
||||||
while (!emptyFrameQueue.isEmpty()) {
|
|
||||||
State state = emptyFrameQueue.remove();
|
|
||||||
state.enqueued = false;
|
|
||||||
if (state.streamableBytes > 0) {
|
|
||||||
// Bytes have been added since it was queued. Add it to the regular queue.
|
|
||||||
state.addToQueue();
|
|
||||||
} else {
|
|
||||||
// Still an empty frame, just write it.
|
|
||||||
state.write(0, writer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final int size = queue.size();
|
final int size = queue.size();
|
||||||
if (size == 0 || maxBytes <= 0) {
|
if (size == 0) {
|
||||||
return totalStreamableBytes > 0;
|
return totalStreamableBytes > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
final int chunkSize = max(minAllocationChunk, maxBytes / size);
|
final int chunkSize = max(minAllocationChunk, maxBytes / size);
|
||||||
|
|
||||||
// Write until the queue is empty or we've exhausted maxBytes. We need to check queue.isEmpty()
|
State state = queue.pollFirst();
|
||||||
// here since the Writer could call updateStreamableBytes, which may alter the queue.
|
|
||||||
do {
|
do {
|
||||||
// Remove the head of the queue.
|
|
||||||
State state = queue.remove();
|
|
||||||
state.enqueued = false;
|
state.enqueued = false;
|
||||||
|
if (state.streamableBytes > 0 && maxBytes == 0) {
|
||||||
|
// Stop at the first state that can't send. Add this state back to the head of
|
||||||
|
// the queue. Note that empty frames at the head of the queue will always be
|
||||||
|
// written.
|
||||||
|
queue.addFirst(state);
|
||||||
|
state.enqueued = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// Allocate as much data as we can for this stream.
|
// Allocate as much data as we can for this stream.
|
||||||
int chunk = min(chunkSize, min(maxBytes, state.streamableBytes));
|
int chunk = min(chunkSize, min(maxBytes, state.streamableBytes));
|
||||||
@ -123,7 +112,7 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
|||||||
|
|
||||||
// Write the allocated bytes and enqueue as necessary.
|
// Write the allocated bytes and enqueue as necessary.
|
||||||
state.write(chunk, writer);
|
state.write(chunk, writer);
|
||||||
} while (maxBytes > 0 && !queue.isEmpty());
|
} while ((state = queue.pollFirst()) != null);
|
||||||
|
|
||||||
return totalStreamableBytes > 0;
|
return totalStreamableBytes > 0;
|
||||||
}
|
}
|
||||||
@ -139,7 +128,6 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
|||||||
final Http2Stream stream;
|
final Http2Stream stream;
|
||||||
int streamableBytes;
|
int streamableBytes;
|
||||||
boolean enqueued;
|
boolean enqueued;
|
||||||
boolean previouslyOnMainQueue;
|
|
||||||
|
|
||||||
State(Http2Stream stream) {
|
State(Http2Stream stream) {
|
||||||
this.stream = stream;
|
this.stream = stream;
|
||||||
@ -179,28 +167,14 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
|||||||
void addToQueue() {
|
void addToQueue() {
|
||||||
if (!enqueued) {
|
if (!enqueued) {
|
||||||
enqueued = true;
|
enqueued = true;
|
||||||
if (streamableBytes == 0) {
|
queue.addLast(this);
|
||||||
// Add empty frames to the empty frame queue.
|
|
||||||
getOrCreateEmptyFrameQueue().addLast(this);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!previouslyOnMainQueue) {
|
|
||||||
// Add to the head the first time it's on the main queue.
|
|
||||||
previouslyOnMainQueue = true;
|
|
||||||
queue.addFirst(this);
|
|
||||||
} else {
|
|
||||||
queue.addLast(this);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void removeFromQueue() {
|
void removeFromQueue() {
|
||||||
if (enqueued) {
|
if (enqueued) {
|
||||||
enqueued = false;
|
enqueued = false;
|
||||||
if (emptyFrameQueue != null && !emptyFrameQueue.remove(this)) {
|
queue.remove(this);
|
||||||
queue.remove(this);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -211,12 +185,5 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
|||||||
// Clear the streamable bytes.
|
// Clear the streamable bytes.
|
||||||
updateStreamableBytes(0, false);
|
updateStreamableBytes(0, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Deque<State> getOrCreateEmptyFrameQueue() {
|
|
||||||
if (emptyFrameQueue == null) {
|
|
||||||
emptyFrameQueue = new ArrayDeque<State>(2);
|
|
||||||
}
|
|
||||||
return emptyFrameQueue;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -131,13 +131,11 @@ public class UniformStreamByteDistributorTest {
|
|||||||
setPriority(STREAM_C, STREAM_A, (short) 100, false);
|
setPriority(STREAM_C, STREAM_A, (short) 100, false);
|
||||||
setPriority(STREAM_D, STREAM_A, (short) 100, false);
|
setPriority(STREAM_D, STREAM_A, (short) 100, false);
|
||||||
|
|
||||||
// Update in reverse order. This will yield a queue in the order A, B, C, D. This
|
// Update the streams.
|
||||||
// is due to the fact that when a stream is enqueued the first time, it is added to the
|
|
||||||
// head of the queue.
|
|
||||||
updateStream(STREAM_D, CHUNK_SIZE, true);
|
|
||||||
updateStream(STREAM_C, CHUNK_SIZE, true);
|
|
||||||
updateStream(STREAM_B, CHUNK_SIZE, true);
|
|
||||||
updateStream(STREAM_A, CHUNK_SIZE, true);
|
updateStream(STREAM_A, CHUNK_SIZE, true);
|
||||||
|
updateStream(STREAM_B, CHUNK_SIZE, true);
|
||||||
|
updateStream(STREAM_C, CHUNK_SIZE, true);
|
||||||
|
updateStream(STREAM_D, CHUNK_SIZE, true);
|
||||||
|
|
||||||
// Only write 3 * chunkSize, so that we'll only write to the first 3 streams.
|
// Only write 3 * chunkSize, so that we'll only write to the first 3 streams.
|
||||||
int written = 3 * CHUNK_SIZE;
|
int written = 3 * CHUNK_SIZE;
|
||||||
@ -174,16 +172,16 @@ public class UniformStreamByteDistributorTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void emptyFrameIsWritten() throws Http2Exception {
|
public void emptyFrameAtHeadIsWritten() throws Http2Exception {
|
||||||
updateStream(STREAM_B, 0, true);
|
|
||||||
updateStream(STREAM_A, 10, true);
|
updateStream(STREAM_A, 10, true);
|
||||||
|
updateStream(STREAM_B, 0, true);
|
||||||
|
updateStream(STREAM_C, 0, true);
|
||||||
|
updateStream(STREAM_D, 10, true);
|
||||||
|
|
||||||
assertFalse(write(10));
|
assertTrue(write(10));
|
||||||
verifyWrite(STREAM_A, 10);
|
verifyWrite(STREAM_A, 10);
|
||||||
verifyWrite(STREAM_B, 0);
|
verifyWrite(STREAM_B, 0);
|
||||||
verifyNoMoreInteractions(writer);
|
verifyWrite(STREAM_C, 0);
|
||||||
|
|
||||||
write(10);
|
|
||||||
verifyNoMoreInteractions(writer);
|
verifyNoMoreInteractions(writer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user