HTTP/2 Weighted Fair Queue Byte Distributor

Motivation:
PriorityStreamByteDistributor uses a homegrown algorithm which distributes bytes to nodes in the priority tree. PriorityStreamByteDistributor has no concept of goodput which may result in poor utilization of network resources. PriorityStreamByteDistributor also has performance issues related to the tree traversal approach and number of nodes that must be visited. There also exists some more proven algorithms from the resource scheduling domain which PriorityStreamByteDistributor does not employ.

Modifications:
- Introduce a new ByteDistributor which uses elements from weighted fair queue schedulers

Result:
StreamByteDistributor which is sensitive to priority and uses a more familiar distribution concept.
Fixes https://github.com/netty/netty/issues/4462
This commit is contained in:
Scott Mitchell 2015-12-02 12:43:26 -08:00
parent 8d4db050f3
commit 904e70a4d4
16 changed files with 1928 additions and 331 deletions

View File

@ -604,6 +604,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
this.markedWritable = isWritable;
}
@Override
public final boolean isWriteAllowed() {
return windowSize() >= 0;
}
abstract int windowSize();
abstract int initialWindowSize();

View File

@ -44,6 +44,13 @@ public interface StreamByteDistributor {
* Indicates whether or not there are frames pending for this stream.
*/
boolean hasFrame();
/**
* Determine if a write operation is allowed for this stream. This will typically take into account the
* stream's flow controller being non-negative.
* @return {@code true} if a write is allowed on this stream. {@code false} otherwise.
*/
boolean isWriteAllowed();
}
/**

View File

@ -43,8 +43,6 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
private long totalStreamableBytes;
public UniformStreamByteDistributor(Http2Connection connection) {
checkNotNull(connection, "connection");
// Add a state for the connection.
stateKey = connection.newKey();
Http2Stream connectionStream = connection.connectionStream();

View File

@ -0,0 +1,339 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.PriorityQueue;
import io.netty.util.internal.PriorityQueueNode;
import java.util.Queue;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.min;
/**
* A {@link StreamByteDistributor} that is sensitive to stream priority and uses
* <a href="https://en.wikipedia.org/wiki/Weighted_fair_queueing">Weighted Fair Queueing</a> approach for distributing
* bytes.
* <p>
* Inspiration for this distributor was taken from Linux's
* <a href="https://git.kernel.org/cgit/linux/kernel/git/stable/linux-stable.git/tree/Documentation/scheduler
* /sched-design-CFS.txt">Completely Fair Scheduler</a>
* to model the distribution of bytes to simulate an "ideal multi-tasking CPU", but in this case we are simulating
* an "ideal multi-tasking NIC".
* <p>
* Each write operation will use the {@link #allocationQuantum(int)} to know how many more bytes should be allocated
* relative to the next stream which wants to write. This is to balance fairness while also considering goodput.
*/
public final class WeightedFairQueueByteDistributor implements StreamByteDistributor {
private final Http2Connection.PropertyKey stateKey;
private final State connectionState;
/**
* The minimum number of bytes that we will attempt to allocate to a stream. This is to
* help improve goodput on a per-stream basis.
*/
private int allocationQuantum = 1024;
public WeightedFairQueueByteDistributor(Http2Connection connection) {
stateKey = connection.newKey();
Http2Stream connectionStream = connection.connectionStream();
connectionStream.setProperty(stateKey, connectionState = new State(connectionStream, 16));
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void onStreamAdded(Http2Stream stream) {
stream.setProperty(stateKey, new State(stream));
}
@Override
public void onWeightChanged(Http2Stream stream, short oldWeight) {
Http2Stream parent;
if (state(stream).activeCountForTree != 0 && (parent = stream.parent()) != null) {
state(parent).totalQueuedWeights += stream.weight() - oldWeight;
}
}
@Override
public void onStreamClosed(Http2Stream stream) {
state(stream).close();
}
@Override
public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
State state = state(stream);
if (state.activeCountForTree != 0) {
State pState = state(parent);
pState.offerAndInitializePseudoTime(state);
pState.isActiveCountChangeForTree(state.activeCountForTree);
}
}
}
@Override
public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
State state = state(stream);
if (state.activeCountForTree != 0) {
State pState = state(parent);
pState.remove(state);
pState.isActiveCountChangeForTree(-state.activeCountForTree);
}
}
}
});
}
@Override
public void updateStreamableBytes(StreamState state) {
state(state.stream()).updateStreamableBytes(state.streamableBytes(),
state.hasFrame() && state.isWriteAllowed());
}
@Override
public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
checkNotNull(writer, "writer");
// As long as there is some active frame we should write at least 1 time.
if (connectionState.activeCountForTree == 0) {
return false;
}
// The goal is to write until we write all the allocated bytes or are no longer making progress.
// We still attempt to write even after the number of allocated bytes has been exhausted to allow empty frames
// to be sent. Making progress means the active streams rooted at the connection stream has changed.
int oldIsActiveCountForTree;
do {
oldIsActiveCountForTree = connectionState.activeCountForTree;
// connectionState will never be active, so go right to its children.
maxBytes -= distributeToChildren(maxBytes, writer, connectionState);
} while (connectionState.activeCountForTree != 0 &&
(maxBytes > 0 || oldIsActiveCountForTree != connectionState.activeCountForTree));
return connectionState.activeCountForTree != 0;
}
/**
* Sets the amount of bytes that will be allocated to each stream. Defaults to 1KiB.
* @param minAllocationChunk the amount of bytes that will be allocated to each stream. Must be > 0.
*/
public void allocationQuantum(int allocationQuantum) {
if (allocationQuantum <= 0) {
throw new IllegalArgumentException("allocationQuantum must be > 0");
}
this.allocationQuantum = allocationQuantum;
}
private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
if (state.active) {
int nsent = min(maxBytes, state.streamableBytes);
state.write(nsent, writer);
if (nsent == 0 && maxBytes != 0) {
// If a stream sends zero bytes, then we gave it a chance to write empty frames and it is now
// considered inactive until the next call to updateStreamableBytes. This allows descendant streams to
// be allocated bytes when the parent stream can't utilize them. This may be as a result of the
// stream's flow control window being 0.
state.updateStreamableBytes(state.streamableBytes, false);
}
return nsent;
}
return distributeToChildren(maxBytes, writer, state);
}
/**
* It is a pre-condition that {@code state.poll()} returns a non-{@code null} value. This is a result of the way
* the allocation algorithm is structured and can be explained in the following cases:
* <h3>For the recursive case</h3>
* If a stream has no children (in the allocation tree) than that node must be active or it will not be in the
* allocation tree. If a node is active then it will not delegate to children and recursion ends.
* <h3>For the initial case</h3>
* We check connectionState.activeCountForTree == 0 before any allocation is done. So if the connection stream
* has no active children we don't get into this method.
*/
private int distributeToChildren(int maxBytes, Writer writer, State state) throws Http2Exception {
long oldTotalQueuedWeights = state.totalQueuedWeights;
State childState = state.poll();
State nextChildState = state.peek();
try {
assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
"nextChildState.pseudoTime(" + nextChildState.pseudoTimeToWrite + ") < " + " childState.pseudoTime(" +
childState.pseudoTimeToWrite + ")";
int nsent = distribute(nextChildState == null ? maxBytes :
min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
childState.stream.weight() / oldTotalQueuedWeights + allocationQuantum,
Integer.MAX_VALUE)
),
writer,
childState);
state.pseudoTime += nsent;
childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
return nsent;
} finally {
// Do in finally to ensure the internal state is not corrupted if an exception is thrown.
// The offer operation is delayed until we unroll up the recursive stack, so we don't have to remove from
// the priority queue due to a write operation.
if (childState.activeCountForTree != 0) {
state.offer(childState);
}
}
}
private State state(Http2Stream stream) {
return stream.getProperty(stateKey);
}
/**
* For testing only!
*/
int streamableBytes(Http2Stream stream) {
return state(stream).streamableBytes;
}
/**
* The remote flow control state for a single stream.
*/
private final class State implements PriorityQueueNode<State> {
final Http2Stream stream;
private final Queue<State> queue;
int streamableBytes;
/**
* Count of nodes rooted at this sub tree with {@link #active} equal to {@code true}.
*/
int activeCountForTree;
private int priorityQueueIndex = INDEX_NOT_IN_QUEUE;
/**
* An estimate of when this node should be given the opportunity to write data.
*/
long pseudoTimeToWrite;
/**
* A pseudo time maintained for immediate children to base their {@link pseudoTimeToSend} off of.
*/
long pseudoTime;
long totalQueuedWeights;
boolean active;
State(Http2Stream stream) {
this(stream, 0);
}
State(Http2Stream stream, int initialSize) {
this.stream = stream;
queue = new PriorityQueue<State>(initialSize);
}
void write(int numBytes, Writer writer) throws Http2Exception {
try {
writer.write(stream, numBytes);
} catch (Throwable t) {
throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
}
}
void isActiveCountChangeForTree(int increment) {
assert activeCountForTree + increment >= 0;
activeCountForTree += increment;
if (!stream.isRoot()) {
State pState = state(stream.parent());
if (activeCountForTree == 0) {
pState.remove(this);
} else if (activeCountForTree - increment == 0) { // if frame count was 0 but is now not, then queue.
pState.offerAndInitializePseudoTime(this);
}
pState.isActiveCountChangeForTree(increment);
}
}
void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
if (this.active != isActive) {
isActiveCountChangeForTree(isActive ? 1 : -1);
this.active = isActive;
}
streamableBytes = newStreamableBytes;
}
/**
* Assumes the parents {@link #totalQueuedWeights} includes this node's weight.
*/
void updatePseudoTime(State parentState, int nsent, long totalQueuedWeights) {
assert stream.id() != CONNECTION_STREAM_ID && nsent >= 0;
// If the current pseudoTimeToSend is greater than parentState.pseudoTime then we previously over accounted
// and should use parentState.pseudoTime.
pseudoTimeToWrite = min(pseudoTimeToWrite, parentState.pseudoTime) +
nsent * totalQueuedWeights / stream.weight();
}
/**
* The concept of pseudoTime can be influenced by priority tree manipulations or if a stream goes from "active"
* to "non-active". This method accounts for that by initializing the {@link #pseudoTimeToWrite} for
* {@code state} to {@link #pseudoTime} of this node and then calls {@link #offer(State)}.
*/
void offerAndInitializePseudoTime(State state) {
state.pseudoTimeToWrite = pseudoTime;
offer(state);
}
void offer(State state) {
queue.offer(state);
totalQueuedWeights += state.stream.weight();
}
/**
* Must only be called if the queue is non-empty!
*/
State poll() {
State state = queue.poll();
// This method is only ever called if the queue is non-empty.
totalQueuedWeights -= state.stream.weight();
return state;
}
void remove(State state) {
if (queue.remove(state)) {
totalQueuedWeights -= state.stream.weight();
}
}
State peek() {
return queue.peek();
}
void close() {
updateStreamableBytes(0, false);
}
@Override
public int compareTo(State o) {
return MathUtil.compare(pseudoTimeToWrite, o.pseudoTimeToWrite);
}
@Override
public int priorityQueueIndex() {
return priorityQueueIndex;
}
@Override
public void priorityQueueIndex(int i) {
priorityQueueIndex = i;
}
}
}

View File

@ -656,6 +656,11 @@ public class PriorityStreamByteDistributorTest {
public boolean hasFrame() {
return hasFrame;
}
@Override
public boolean isWriteAllowed() {
return hasFrame;
}
});
}

View File

@ -190,6 +190,11 @@ public class UniformStreamByteDistributorTest {
}
private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame) {
updateStream(streamId, streamableBytes, hasFrame, hasFrame);
}
private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame,
final boolean isWriteAllowed) {
final Http2Stream stream = stream(streamId);
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
@Override
@ -206,6 +211,11 @@ public class UniformStreamByteDistributorTest {
public boolean hasFrame() {
return hasFrame;
}
@Override
public boolean isWriteAllowed() {
return isWriteAllowed;
}
});
}

View File

@ -0,0 +1,941 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class WeightedFairQueueByteDistributorTest {
private static final int STREAM_A = 1;
private static final int STREAM_B = 3;
private static final int STREAM_C = 5;
private static final int STREAM_D = 7;
private static final int STREAM_E = 9;
private static final int ALLOCATION_QUANTUM = 100;
private Http2Connection connection;
private WeightedFairQueueByteDistributor distributor;
@Mock
private StreamByteDistributor.Writer writer;
@Before
public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this);
connection = new DefaultHttp2Connection(false);
distributor = new WeightedFairQueueByteDistributor(connection);
distributor.allocationQuantum(ALLOCATION_QUANTUM);
// Assume we always write all the allocated bytes.
doAnswer(writeAnswer()).when(writer).write(any(Http2Stream.class), anyInt());
connection.local().createStream(STREAM_A, false);
connection.local().createStream(STREAM_B, false);
Http2Stream streamC = connection.local().createStream(STREAM_C, false);
Http2Stream streamD = connection.local().createStream(STREAM_D, false);
streamC.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
}
private Answer<Void> writeAnswer() {
return new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) throws Throwable {
Http2Stream stream = in.getArgumentAt(0, Http2Stream.class);
int numBytes = in.getArgumentAt(1, Integer.class);
int streamableBytes = distributor.streamableBytes(stream) - numBytes;
updateStream(stream.id(), streamableBytes, streamableBytes > 0);
return null;
}
};
}
@Test
public void bytesUnassignedAfterProcessing() throws Http2Exception {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
assertFalse(write(10));
verifyWrite(STREAM_A, 1);
verifyWrite(STREAM_B, 2);
verifyWrite(STREAM_C, 3);
verifyWrite(STREAM_D, 4);
assertFalse(write(10));
verifyAnyWrite(STREAM_A, 1);
verifyAnyWrite(STREAM_B, 1);
verifyAnyWrite(STREAM_C, 1);
verifyAnyWrite(STREAM_D, 1);
}
@Test
public void connectionErrorForWriterException() throws Http2Exception {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
Exception fakeException = new RuntimeException("Fake exception");
doThrow(fakeException).when(writer).write(same(stream(STREAM_C)), eq(3));
try {
write(10);
fail("Expected an exception");
} catch (Http2Exception e) {
assertFalse(Http2Exception.isStreamError(e));
assertEquals(Http2Error.INTERNAL_ERROR, e.error());
assertSame(fakeException, e.getCause());
}
verifyWrite(atMost(1), STREAM_A, 1);
verifyWrite(atMost(1), STREAM_B, 2);
verifyWrite(STREAM_C, 3);
verifyWrite(atMost(1), STREAM_D, 4);
doAnswer(writeAnswer()).when(writer).write(same(stream(STREAM_C)), eq(3));
assertFalse(write(10));
verifyWrite(STREAM_A, 1);
verifyWrite(STREAM_B, 2);
verifyWrite(times(2), STREAM_C, 3);
verifyWrite(STREAM_D, 4);
}
/**
* In this test, we verify that each stream is allocated a minimum chunk size. When bytes
* run out, the remaining streams will be next in line for the next iteration.
*/
@Test
public void minChunkShouldBeAllocatedPerStream() throws Http2Exception {
// Re-assign weights.
setPriority(STREAM_A, 0, (short) 50, false);
setPriority(STREAM_B, 0, (short) 200, false);
setPriority(STREAM_C, STREAM_A, (short) 100, false);
setPriority(STREAM_D, STREAM_A, (short) 100, false);
// Update the streams.
updateStream(STREAM_A, ALLOCATION_QUANTUM, true);
updateStream(STREAM_B, ALLOCATION_QUANTUM, true);
updateStream(STREAM_C, ALLOCATION_QUANTUM, true);
updateStream(STREAM_D, ALLOCATION_QUANTUM, true);
// Only write 3 * chunkSize, so that we'll only write to the first 3 streams.
int written = 3 * ALLOCATION_QUANTUM;
assertTrue(write(written));
assertEquals(ALLOCATION_QUANTUM, captureWrites(STREAM_A));
assertEquals(ALLOCATION_QUANTUM, captureWrites(STREAM_B));
assertEquals(ALLOCATION_QUANTUM, captureWrites(STREAM_C));
verifyWrite(atMost(1), STREAM_D, 0);
// Now write again and verify that the last stream is written to.
assertFalse(write(ALLOCATION_QUANTUM));
assertEquals(ALLOCATION_QUANTUM, captureWrites(STREAM_A));
assertEquals(ALLOCATION_QUANTUM, captureWrites(STREAM_B));
assertEquals(ALLOCATION_QUANTUM, captureWrites(STREAM_C));
assertEquals(ALLOCATION_QUANTUM, captureWrites(STREAM_D));
}
/**
* In this test, we verify that the highest priority frame which has 0 bytes to send, but an empty frame is able
* to send that empty frame.
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
*
* <pre>
* 0
* |
* A
* |
* B
* / \
* C D
* </pre>
*/
@Test
public void emptyFrameAtHeadIsWritten() throws Http2Exception {
updateStream(STREAM_A, 0, true);
updateStream(STREAM_B, 0, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 10, true);
stream(STREAM_B).setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
assertFalse(write(10));
verifyWrite(STREAM_A, 0);
verifyWrite(STREAM_B, 0);
verifyWrite(STREAM_C, 0);
verifyWrite(STREAM_D, 10);
}
/**
* In this test, we block A which allows bytes to be written by C and D. Here's a view of the tree (stream A is
* blocked).
*
* <pre>
* 0
* / \
* [A] B
* / \
* C D
* </pre>
*/
@Test
public void blockedStreamNoDataShouldSpreadDataToChildren() throws Http2Exception {
blockedStreamShouldSpreadDataToChildren(false);
}
/**
* In this test, we block A and also give it an empty data frame to send.
* All bytes should be delegated to by C and D. Here's a view of the tree (stream A is blocked).
*
* <pre>
* 0
* / \
* [A](0) B
* / \
* C D
* </pre>
*/
@Test
public void blockedStreamWithDataAndNotAllowedToSendShouldSpreadDataToChildren() throws Http2Exception {
// A cannot stream.
updateStream(STREAM_A, 0, true, false);
blockedStreamShouldSpreadDataToChildren(false);
}
/**
* In this test, we allow A to send, but expect the flow controller will only write to the stream 1 time.
* This is because we give the stream a chance to write its empty frame 1 time, and the stream will not
* be written to again until a update stream is called.
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*/
@Test
public void streamWithZeroFlowControlWindowAndDataShouldWriteOnlyOnce() throws Http2Exception {
updateStream(STREAM_A, 0, true, true);
blockedStreamShouldSpreadDataToChildren(true);
// Make sure if we call update stream again, A should write 1 more time.
updateStream(STREAM_A, 0, true, true);
assertFalse(write(1));
verifyWrite(times(2), STREAM_A, 0);
// Try to write again, but since no updateStream A should not write again
assertFalse(write(1));
verifyWrite(times(2), STREAM_A, 0);
}
private void blockedStreamShouldSpreadDataToChildren(boolean streamAShouldWriteZero) throws Http2Exception {
updateStream(STREAM_B, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
if (streamAShouldWriteZero) {
verifyWrite(STREAM_A, 0);
} else {
verifyNeverWrite(STREAM_A);
}
verifyWrite(atMost(1), STREAM_C, 0);
verifyWrite(atMost(1), STREAM_D, 0);
// B is entirely written
verifyWrite(STREAM_B, 10);
// Now test that writes get delegated from A (which is blocked) to its children
assertTrue(write(5));
if (streamAShouldWriteZero) {
verifyWrite(times(1), STREAM_A, 0);
} else {
verifyNeverWrite(STREAM_A);
}
verifyWrite(STREAM_D, 5);
verifyWrite(atMost(1), STREAM_C, 0);
assertTrue(write(5));
if (streamAShouldWriteZero) {
verifyWrite(times(1), STREAM_A, 0);
} else {
verifyNeverWrite(STREAM_A);
}
assertEquals(10, captureWrites(STREAM_C) + captureWrites(STREAM_D));
assertTrue(write(5));
assertFalse(write(5));
if (streamAShouldWriteZero) {
verifyWrite(times(1), STREAM_A, 0);
} else {
verifyNeverWrite(STREAM_A);
}
verifyWrite(times(2), STREAM_C, 5);
verifyWrite(times(2), STREAM_D, 5);
}
/**
* In this test, we block B which allows all bytes to be written by A. A should not share the data with its children
* since it's not blocked.
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*/
@Test
public void childrenShouldNotSendDataUntilParentBlocked() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
// A is assigned all of the bytes.
verifyWrite(STREAM_A, 10);
verifyNeverWrite(STREAM_B);
verifyWrite(atMost(1), STREAM_C, 0);
verifyWrite(atMost(1), STREAM_D, 0);
}
/**
* In this test, we block B which allows all bytes to be written by A. Once A is complete, it will spill over the
* remaining of its portion to its children.
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*/
@Test
public void parentShouldWaterFallDataToChildren() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 5, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
verifyWrite(STREAM_A, 5);
verifyNeverWrite(STREAM_B);
verifyWrite(STREAM_C, 5);
verifyNeverWrite(STREAM_D);
assertFalse(write(15));
verifyAnyWrite(STREAM_A, 1);
verifyNeverWrite(STREAM_B);
verifyWrite(times(2), STREAM_C, 5);
verifyWrite(STREAM_D, 10);
}
/**
* In this test, we verify re-prioritizing a stream. We start out with B blocked:
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*
* We then re-prioritize D so that it's directly off of the connection and verify that A and D split the written
* bytes between them.
*
* <pre>
* 0
* /|\
* / | \
* A [B] D
* /
* C
* </pre>
*/
@Test
public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Re-prioritize D as a direct child of the connection.
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
assertTrue(write(10));
verifyWrite(STREAM_A, 10);
verifyNeverWrite(STREAM_B);
verifyNeverWrite(STREAM_C);
verifyWrite(atMost(1), STREAM_D, 0);
assertFalse(write(20));
verifyAnyWrite(STREAM_A, 1);
verifyNeverWrite(STREAM_B);
verifyWrite(STREAM_C, 10);
verifyWrite(STREAM_D, 10);
}
/**
* Test that the maximum allowed amount the flow controller allows to be sent is always fully allocated if
* the streams have at least this much data to send. See https://github.com/netty/netty/issues/4266.
* <pre>
* 0
* / | \
* / | \
* A(0) B(0) C(0)
* /
* D(> allowed to send in 1 allocation attempt)
* </pre>
*/
@Test
public void unstreamableParentsShouldFeedHungryChildren() throws Http2Exception {
// Setup the priority tree.
setPriority(STREAM_A, 0, (short) 32, false);
setPriority(STREAM_B, 0, (short) 16, false);
setPriority(STREAM_C, 0, (short) 16, false);
setPriority(STREAM_D, STREAM_A, (short) 16, false);
final int writableBytes = 100;
// Send enough so it can not be completely written out
final int expectedUnsentAmount = 1;
updateStream(STREAM_D, writableBytes + expectedUnsentAmount, true);
assertTrue(write(writableBytes));
verifyWrite(STREAM_D, writableBytes);
assertFalse(write(expectedUnsentAmount));
verifyWrite(STREAM_D, expectedUnsentAmount);
}
/**
* In this test, we root all streams at the connection, and then verify that data is split appropriately based on
* weight (all available data is the same).
*
* <pre>
* 0
* / / \ \
* A B C D
* </pre>
*/
@Test
public void writeShouldPreferHighestWeight() throws Http2Exception {
// Root the streams at the connection and assign weights.
setPriority(STREAM_A, 0, (short) 50, false);
setPriority(STREAM_B, 0, (short) 200, false);
setPriority(STREAM_C, 0, (short) 100, false);
setPriority(STREAM_D, 0, (short) 100, false);
updateStream(STREAM_A, 1000, true);
updateStream(STREAM_B, 1000, true);
updateStream(STREAM_C, 1000, true);
updateStream(STREAM_D, 1000, true);
// Set allocation quantum to 1 so it is easier to see the ratio of total bytes written between each stream.
distributor.allocationQuantum(1);
assertTrue(write(1000));
assertEquals(100, captureWrites(STREAM_A));
assertEquals(450, captureWrites(STREAM_B));
assertEquals(225, captureWrites(STREAM_C));
assertEquals(225, captureWrites(STREAM_D));
}
/**
* In this test, we root all streams at the connection, block streams C and D, and then verify that data is
* prioritized toward stream B which has a higher weight than stream A.
* <p>
* We also verify that the amount that is written is not uniform, and not always the allocation quantum.
*
* <pre>
* 0
* / / \ \
* A B [C] [D]
* </pre>
*/
@Test
public void writeShouldFavorPriority() throws Http2Exception {
// Root the streams at the connection and assign weights.
setPriority(STREAM_A, 0, (short) 50, false);
setPriority(STREAM_B, 0, (short) 200, false);
setPriority(STREAM_C, 0, (short) 100, false);
setPriority(STREAM_D, 0, (short) 100, false);
updateStream(STREAM_A, 1000, true);
updateStream(STREAM_B, 1000, true);
updateStream(STREAM_C, 1000, false);
updateStream(STREAM_D, 1000, false);
// Set allocation quantum to 1 so it is easier to see the ratio of total bytes written between each stream.
distributor.allocationQuantum(1);
assertTrue(write(100));
assertEquals(20, captureWrites(STREAM_A));
verifyWrite(times(20), STREAM_A, 1);
assertEquals(80, captureWrites(STREAM_B));
verifyWrite(times(0), STREAM_B, 1);
verifyNeverWrite(STREAM_C);
verifyNeverWrite(STREAM_D);
assertTrue(write(100));
assertEquals(40, captureWrites(STREAM_A));
verifyWrite(times(40), STREAM_A, 1);
assertEquals(160, captureWrites(STREAM_B));
verifyWrite(atMost(1), STREAM_B, 1);
verifyNeverWrite(STREAM_C);
verifyNeverWrite(STREAM_D);
assertTrue(write(1050));
assertEquals(250, captureWrites(STREAM_A));
verifyWrite(times(250), STREAM_A, 1);
assertEquals(1000, captureWrites(STREAM_B));
verifyWrite(atMost(2), STREAM_B, 1);
verifyNeverWrite(STREAM_C);
verifyNeverWrite(STREAM_D);
assertFalse(write(750));
assertEquals(1000, captureWrites(STREAM_A));
verifyWrite(times(1), STREAM_A, 750);
assertEquals(1000, captureWrites(STREAM_B));
verifyWrite(times(0), STREAM_B, 0);
verifyNeverWrite(STREAM_C);
verifyNeverWrite(STREAM_D);
}
/**
* In this test, we root all streams at the connection, and then verify that data is split equally among the stream,
* since they all have the same weight.
*
* <pre>
* 0
* / / \ \
* A B C D
* </pre>
*/
@Test
public void samePriorityShouldDistributeBasedOnData() throws Http2Exception {
// Root the streams at the connection with the same weights.
setPriority(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_B, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_C, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 700, true);
// Set allocation quantum to 1 so it is easier to see the ratio of total bytes written between each stream.
distributor.allocationQuantum(1);
assertTrue(write(999));
assertEquals(333, captureWrites(STREAM_A));
assertEquals(333, captureWrites(STREAM_B));
verifyWrite(times(1), STREAM_C, 0);
assertEquals(333, captureWrites(STREAM_D));
}
/**
* In this test, we call distribute with 0 bytes and verify that all streams with 0 bytes are written.
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
*
* <pre>
* 0
* |
* [A]
* |
* B
* / \
* C D
* </pre>
*/
@Test
public void zeroDistributeShouldWriteAllZeroFrames() throws Http2Exception {
updateStream(STREAM_A, 400, false);
updateStream(STREAM_B, 0, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 0, true);
stream(STREAM_B).setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
assertFalse(write(0));
verifyNeverWrite(STREAM_A);
verifyWrite(STREAM_B, 0);
verifyAnyWrite(STREAM_B, 1);
verifyWrite(STREAM_C, 0);
verifyAnyWrite(STREAM_C, 1);
verifyWrite(STREAM_D, 0);
verifyAnyWrite(STREAM_D, 1);
}
/**
* In this test, we call distribute with 100 bytes which is the total amount eligible to be written, and also have
* streams with 0 bytes to write. All of these streams should be written with a single call to distribute.
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
*
* <pre>
* 0
* |
* [A]
* |
* B
* / \
* C D
* </pre>
*/
@Test
public void nonZeroDistributeShouldWriteAllZeroFramesIfAllEligibleDataIsWritten() throws Http2Exception {
updateStream(STREAM_A, 400, false);
updateStream(STREAM_B, 100, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 0, true);
stream(STREAM_B).setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
assertFalse(write(100));
verifyNeverWrite(STREAM_A);
verifyWrite(STREAM_B, 100);
verifyAnyWrite(STREAM_B, 1);
verifyWrite(STREAM_C, 0);
verifyAnyWrite(STREAM_C, 1);
verifyWrite(STREAM_D, 0);
verifyAnyWrite(STREAM_D, 1);
}
/**
* In this test, we shift the priority tree and verify priority bytes for each subtree are correct
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
*
* <pre>
* 0
* |
* A
* |
* B
* / \
* C D
* </pre>
*/
@Test
public void bytesDistributedWithRestructureShouldBeCorrect() throws Http2Exception {
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 600, true);
updateStream(STREAM_D, 700, true);
stream(STREAM_B).setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
assertTrue(write(500));
assertEquals(400, captureWrites(STREAM_A));
verifyWrite(STREAM_B, 100);
verifyNeverWrite(STREAM_C);
verifyNeverWrite(STREAM_D);
assertTrue(write(400));
assertEquals(400, captureWrites(STREAM_A));
assertEquals(500, captureWrites(STREAM_B));
verifyWrite(atMost(1), STREAM_C, 0);
verifyWrite(atMost(1), STREAM_D, 0);
assertFalse(write(1300));
assertEquals(400, captureWrites(STREAM_A));
assertEquals(500, captureWrites(STREAM_B));
assertEquals(600, captureWrites(STREAM_C));
assertEquals(700, captureWrites(STREAM_D));
}
/**
* In this test, we add a node to the priority tree and verify
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
*
* <pre>
* 0
* / \
* A B
* |
* E
* / \
* C D
* </pre>
*/
@Test
public void bytesDistributedWithAdditionShouldBeCorrect() throws Http2Exception {
Http2Stream streamE = connection.local().createStream(STREAM_E, false);
streamE.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
streamSizes.put(STREAM_E, (Integer) 900);
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 600, true);
updateStream(STREAM_D, 700, true);
updateStream(STREAM_E, 900, true);
assertTrue(write(900));
assertEquals(400, captureWrites(STREAM_A));
assertEquals(500, captureWrites(STREAM_B));
verifyNeverWrite(STREAM_C);
verifyNeverWrite(STREAM_D);
verifyWrite(atMost(1), STREAM_E, 0);
assertTrue(write(900));
assertEquals(400, captureWrites(STREAM_A));
assertEquals(500, captureWrites(STREAM_B));
verifyWrite(atMost(1), STREAM_C, 0);
verifyWrite(atMost(1), STREAM_D, 0);
assertEquals(900, captureWrites(STREAM_E));
assertFalse(write(1301));
assertEquals(400, captureWrites(STREAM_A));
assertEquals(500, captureWrites(STREAM_B));
assertEquals(600, captureWrites(STREAM_C));
assertEquals(700, captureWrites(STREAM_D));
assertEquals(900, captureWrites(STREAM_E));
}
/**
* In this test, we close an internal stream in the priority tree but tree should not change.
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*/
@Test
public void bytesDistributedShouldBeCorrectWithInternalStreamClose() throws Http2Exception {
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 600, true);
updateStream(STREAM_D, 700, true);
stream(STREAM_A).close();
assertTrue(write(500));
verifyNeverWrite(STREAM_A);
assertEquals(200, captureWrites(STREAM_B));
assertEquals(300, captureWrites(STREAM_C) + captureWrites(STREAM_D));
assertFalse(write(1300));
verifyNeverWrite(STREAM_A);
assertEquals(500, captureWrites(STREAM_B));
assertEquals(600, captureWrites(STREAM_C));
assertEquals(700, captureWrites(STREAM_D));
}
/**
* In this test, we close a leaf stream in the priority tree and verify distribution.
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the close:
* <pre>
* 0
* / \
* A B
* |
* D
* </pre>
*/
@Test
public void bytesDistributedShouldBeCorrectWithLeafStreamClose() throws Http2Exception {
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 600, true);
updateStream(STREAM_D, 700, true);
stream(STREAM_C).close();
assertTrue(write(900));
assertEquals(400, captureWrites(STREAM_A));
assertEquals(500, captureWrites(STREAM_B));
verifyNeverWrite(STREAM_C);
verifyWrite(atMost(1), STREAM_D, 0);
assertFalse(write(700));
assertEquals(400, captureWrites(STREAM_A));
assertEquals(500, captureWrites(STREAM_B));
verifyNeverWrite(STREAM_C);
assertEquals(700, captureWrites(STREAM_D));
}
private boolean write(int numBytes) throws Http2Exception {
return distributor.distribute(numBytes, writer);
}
private void verifyWrite(int streamId, int numBytes) {
verify(writer).write(same(stream(streamId)), eq(numBytes));
}
private void verifyWrite(VerificationMode mode, int streamId, int numBytes) {
verify(writer, mode).write(same(stream(streamId)), eq(numBytes));
}
private void verifyAnyWrite(int streamId, int times) {
verify(writer, times(times)).write(same(stream(streamId)), anyInt());
}
private void verifyNeverWrite(int streamId) {
verify(writer, never()).write(same(stream(streamId)), anyInt());
}
private void setPriority(int streamId, int parent, int weight, boolean exclusive) throws Http2Exception {
stream(streamId).setPriority(parent, (short) weight, exclusive);
}
private Http2Stream stream(int streamId) {
return connection.stream(streamId);
}
private int captureWrites(int streamId) {
ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
verify(writer, atLeastOnce()).write(same(stream(streamId)), captor.capture());
int total = 0;
for (Integer x : captor.getAllValues()) {
total += x;
}
return total;
}
private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame) {
updateStream(streamId, streamableBytes, hasFrame, hasFrame);
}
private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame,
final boolean isWriteAllowed) {
final Http2Stream stream = stream(streamId);
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
@Override
public Http2Stream stream() {
return stream;
}
@Override
public int streamableBytes() {
return streamableBytes;
}
@Override
public boolean hasFrame() {
return hasFrame;
}
@Override
public boolean isWriteAllowed() {
return isWriteAllowed;
}
});
}
}

View File

@ -47,4 +47,22 @@ public final class MathUtil {
public static boolean isOutOfBounds(int index, int length, int capacity) {
return (index | length | (index + length) | (capacity - (index + length))) < 0;
}
/**
* Compare to {@code long} values.
* @param x the first {@code long} to compare.
* @param y the second {@code long} to compare.
* @return
* <ul>
* <li>0 if {@code x == y}</li>
* <li>{@code > 0} if {@code x > y}</li>
* <li>{@code < 0} if {@code x < y}</li>
* </ul>
*/
public static int compare(long x, long y) {
if (PlatformDependent.javaVersion() < 7) {
return (x < y) ? -1 : (x > y) ? 1 : 0;
}
return Long.compare(x, y);
}
}

View File

@ -0,0 +1,255 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static io.netty.util.internal.PriorityQueueNode.INDEX_NOT_IN_QUEUE;
/**
* A priority queue which uses natural ordering of elements. Elements are also required to be of type
* {@link PriorityQueueNode} for the purpose of maintaining the index in the priority queue.
* @param <T> The object that is maintained in the queue.
*/
public final class PriorityQueue<T extends PriorityQueueNode<T>> extends AbstractQueue<T> implements Queue<T> {
@SuppressWarnings("rawtypes")
private static final PriorityQueueNode[] EMPTY_QUEUE = new PriorityQueueNode[0];
private T[] queue;
private int size;
public PriorityQueue() {
this(8);
}
@SuppressWarnings("unchecked")
public PriorityQueue(int initialSize) {
queue = (T[]) (initialSize != 0 ? new PriorityQueueNode[initialSize] : EMPTY_QUEUE);
}
@Override
public int size() {
return size;
}
@Override
public boolean isEmpty() {
return size == 0;
}
@Override
public boolean contains(Object o) {
if (!(o instanceof PriorityQueueNode)) {
return false;
}
PriorityQueueNode<?> node = (PriorityQueueNode<?>) o;
int i = node.priorityQueueIndex();
return i >= 0 && i < size && node.equals(queue[i]);
}
@Override
public void clear() {
for (int i = 0; i < size; ++i) {
T node = queue[i];
if (node != null) {
node.priorityQueueIndex(INDEX_NOT_IN_QUEUE);
queue[i] = null;
}
}
size = 0;
}
@Override
public boolean offer(T e) {
checkNotNull(e, "e");
if (e.priorityQueueIndex() != INDEX_NOT_IN_QUEUE) {
throw new IllegalArgumentException("e.priorityQueueIndex(): " + e.priorityQueueIndex() +
" (expected: " + INDEX_NOT_IN_QUEUE + ")");
}
// Check that the array capacity is enough to hold values by doubling capacity.
if (size >= queue.length) {
// Use a policy which allows for a 0 initial capacity. Same policy as JDK's priority queue, double when
// "small", then grow by 50% when "large".
queue = Arrays.copyOf(queue, queue.length + ((queue.length < 64) ?
(queue.length + 2) :
(queue.length >>> 1)));
}
bubbleUp(size++, e);
return true;
}
@Override
public T poll() {
if (size == 0) {
return null;
}
T result = queue[0];
result.priorityQueueIndex(INDEX_NOT_IN_QUEUE);
T last = queue[--size];
queue[size] = null;
if (size != 0) { // Make sure we don't add the last element back.
bubbleDown(0, last);
}
return result;
}
@Override
public T peek() {
return (size == 0) ? null : queue[0];
}
@Override
public boolean remove(Object o) {
if (!contains(o)) {
return false;
}
@SuppressWarnings("unchecked")
T node = (T) o;
int i = node.priorityQueueIndex();
node.priorityQueueIndex(INDEX_NOT_IN_QUEUE);
if (--size == 0 || size == i) {
// If there are no node left, or this is the last node in the array just remove and return.
queue[i] = null;
return true;
}
// Move the last element where node currently lives in the array.
T moved = queue[i] = queue[size];
queue[size] = null;
// priorityQueueIndex will be updated below in bubbleUp or bubbleDown
// Make sure the moved node still preserves the min-heap properties.
if (node.compareTo(moved) < 0) {
bubbleDown(i, moved);
} else {
bubbleUp(i, moved);
}
return true;
}
@Override
public Object[] toArray() {
return Arrays.copyOf(queue, size);
}
@SuppressWarnings("unchecked")
@Override
public <X> X[] toArray(X[] a) {
if (a.length < size) {
return (X[]) Arrays.copyOf(queue, size, a.getClass());
}
System.arraycopy(queue, 0, a, 0, size);
if (a.length > size) {
a[size] = null;
}
return a;
}
/**
* This iterator does not return elements in any particular order.
*/
@Override
public Iterator<T> iterator() {
return new PriorityQueueIterator();
}
private final class PriorityQueueIterator implements Iterator<T> {
private int index;
@Override
public boolean hasNext() {
return index < size;
}
@Override
public T next() {
if (index >= size) {
throw new NoSuchElementException();
}
return queue[index++];
}
@Override
public void remove() {
throw new UnsupportedOperationException("remove");
}
}
private void bubbleDown(int k, T node) {
final int half = size >>> 1;
while (k < half) {
// Compare node to the children of index k.
int iChild = (k << 1) + 1;
T child = queue[iChild];
// Make sure we get the smallest child to compare against.
int rightChild = iChild + 1;
if (rightChild < size && child.compareTo(queue[rightChild]) > 0) {
child = queue[iChild = rightChild];
}
// If the bubbleDown node is less than or equal to the smallest child then we will preserve the min-heap
// property by inserting the bubbleDown node here.
if (node.compareTo(child) <= 0) {
break;
}
// Bubble the child up.
queue[k] = child;
child.priorityQueueIndex(k);
// Move down k down the tree for the next iteration.
k = iChild;
}
// We have found where node should live and still satisfy the min-heap property, so put it in the queue.
queue[k] = node;
node.priorityQueueIndex(k);
}
private void bubbleUp(int k, T node) {
while (k > 0) {
int iParent = (k - 1) >>> 1;
T parent = queue[iParent];
// If the bubbleUp node is less than the parent, then we have found a spot to insert and still maintain
// min-heap properties.
if (node.compareTo(parent) >= 0) {
break;
}
// Bubble the parent down.
queue[k] = parent;
parent.priorityQueueIndex(k);
// Move k up the tree for the next iteration.
k = iParent;
}
// We have found where node should live and still satisfy the min-heap property, so put it in the queue.
queue[k] = node;
node.priorityQueueIndex(k);
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
/**
* Provides methods for {@link PriorityQueue} to maintain internal state. These methods should generally not be used
* outside the scope of {@link PriorityQueue}.
* @param <T> The type which will be queued in {@link PriorityQueue}.
*/
public interface PriorityQueueNode<T> extends Comparable<T> {
/**
* This should be used to initialize the storage returned by {@link #priorityQueueIndex()}.
*/
int INDEX_NOT_IN_QUEUE = -1;
/**
* Get the last value set by {@link #priorityQueueIndex(int)}.
* <p>
* Throwing exceptions from this method will result in undefined behavior.
*/
int priorityQueueIndex();
/**
* Used by {@link PriorityQueue} to maintain state for an element in the queue.
* <p>
* Throwing exceptions from this method will result in undefined behavior.
* @param i The index as used by {@link PriorityQueue}.
*/
void priorityQueueIndex(int i);
}

View File

@ -0,0 +1,218 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class PriorityQueueTest {
@Test
public void testPoll() {
PriorityQueue<TestElement> queue = new PriorityQueue<TestElement>(0);
assertEmptyQueue(queue);
TestElement a = new TestElement(5);
TestElement b = new TestElement(10);
TestElement c = new TestElement(2);
TestElement d = new TestElement(7);
TestElement e = new TestElement(6);
assertOffer(queue, a);
assertOffer(queue, b);
assertOffer(queue, c);
assertOffer(queue, d);
// Remove the first element
assertSame(c, queue.peek());
assertSame(c, queue.poll());
assertEquals(3, queue.size());
// Test that offering another element preserves the priority queue semantics.
assertOffer(queue, e);
assertEquals(4, queue.size());
assertSame(a, queue.peek());
assertSame(a, queue.poll());
assertEquals(3, queue.size());
// Keep removing the remaining elements
assertSame(e, queue.peek());
assertSame(e, queue.poll());
assertEquals(2, queue.size());
assertSame(d, queue.peek());
assertSame(d, queue.poll());
assertEquals(1, queue.size());
assertSame(b, queue.peek());
assertSame(b, queue.poll());
assertEmptyQueue(queue);
}
@Test
public void testClear() {
PriorityQueue<TestElement> queue = new PriorityQueue<TestElement>(0);
assertEmptyQueue(queue);
TestElement a = new TestElement(5);
TestElement b = new TestElement(10);
TestElement c = new TestElement(2);
TestElement d = new TestElement(6);
assertOffer(queue, a);
assertOffer(queue, b);
assertOffer(queue, c);
assertOffer(queue, d);
queue.clear();
assertEmptyQueue(queue);
// Test that elements can be re-inserted after the clear operation
assertOffer(queue, a);
assertSame(a, queue.peek());
assertOffer(queue, b);
assertSame(a, queue.peek());
assertOffer(queue, c);
assertSame(c, queue.peek());
assertOffer(queue, d);
assertSame(c, queue.peek());
}
@Test
public void testRemoval() {
PriorityQueue<TestElement> queue = new PriorityQueue<TestElement>(4);
assertEmptyQueue(queue);
TestElement a = new TestElement(5);
TestElement b = new TestElement(10);
TestElement c = new TestElement(2);
TestElement d = new TestElement(6);
TestElement notInQueue = new TestElement(-1);
assertOffer(queue, a);
assertOffer(queue, b);
assertOffer(queue, c);
assertOffer(queue, d);
// Remove an element that isn't in the queue.
assertFalse(queue.remove(notInQueue));
assertSame(c, queue.peek());
assertEquals(4, queue.size());
// Remove the last element in the array, when the array is non-empty.
assertTrue(queue.remove(b));
assertSame(c, queue.peek());
assertEquals(3, queue.size());
// Re-insert the element after removal
assertOffer(queue, b);
assertSame(c, queue.peek());
assertEquals(4, queue.size());
// Repeat remove the last element in the array, when the array is non-empty.
assertTrue(queue.remove(b));
assertSame(c, queue.peek());
assertEquals(3, queue.size());
// Remove the head of the queue.
assertTrue(queue.remove(c));
assertSame(a, queue.peek());
assertEquals(2, queue.size());
assertTrue(queue.remove(a));
assertSame(d, queue.peek());
assertEquals(1, queue.size());
assertTrue(queue.remove(d));
assertEmptyQueue(queue);
}
@Test
public void testZeroInitialSize() {
PriorityQueue<TestElement> queue = new PriorityQueue<TestElement>(0);
assertEmptyQueue(queue);
TestElement e = new TestElement(1);
assertOffer(queue, e);
assertSame(e, queue.peek());
assertEquals(1, queue.size());
assertFalse(queue.isEmpty());
assertSame(e, queue.poll());
assertEmptyQueue(queue);
}
private static void assertOffer(PriorityQueue<TestElement> queue, TestElement a) {
assertTrue(queue.offer(a));
assertTrue(queue.contains(a));
try { // An element can not be inserted more than 1 time.
queue.offer(a);
fail();
} catch (IllegalArgumentException ignored) {
// ignored
}
}
private static void assertEmptyQueue(PriorityQueue<TestElement> queue) {
assertNull(queue.peek());
assertNull(queue.poll());
assertEquals(0, queue.size());
assertTrue(queue.isEmpty());
}
private static final class TestElement implements Comparable<TestElement>, PriorityQueueNode<TestElement> {
int value;
private int priorityQueueIndex = INDEX_NOT_IN_QUEUE;
public TestElement(int value) {
this.value = value;
}
@Override
public int compareTo(TestElement o) {
return value - o.value;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof TestElement)) {
return false;
}
return ((TestElement) o).value == value;
}
@Override
public int hashCode() {
return value;
}
@Override
public int priorityQueueIndex() {
return priorityQueueIndex;
}
@Override
public void priorityQueueIndex(int i) {
priorityQueueIndex = i;
}
}
}

View File

@ -14,54 +14,44 @@
*/
package io.netty.microbench.http2;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2RemoteFlowController;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamVisitor;
import io.netty.handler.codec.http2.PriorityStreamByteDistributor;
import io.netty.handler.codec.http2.StreamByteDistributor;
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
import io.netty.microbench.channel.EmbeddedChannelWriteReleaseHandlerContext;
import io.netty.microbench.util.AbstractMicrobenchmark;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import java.net.SocketAddress;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
/**
* Benchmark to compare stream byte distribution algorithms when priorities are identical for
* all streams.
*/
@Fork(1)
@Threads(1)
@State(Scope.Benchmark)
public class NoPriorityByteDistributionBenchmark extends AbstractMicrobenchmark {
public enum Algorithm {
PRIORITY,
WFQ,
UNIFORM
}
@ -75,14 +65,16 @@ public class NoPriorityByteDistributionBenchmark extends AbstractMicrobenchmark
private Algorithm algorithm;
private Http2Connection connection;
private Http2Connection.PropertyKey dataRefresherKey;
private Http2RemoteFlowController controller;
private StreamByteDistributor distributor;
private AdditionalCounters counters;
private Http2ConnectionHandler handler;
private ChannelHandlerContext ctx;
public NoPriorityByteDistributionBenchmark() {
super(true);
}
/**
* Additional counters for a single iteration.
@ -90,11 +82,11 @@ public class NoPriorityByteDistributionBenchmark extends AbstractMicrobenchmark
@AuxCounters
@State(Scope.Thread)
public static class AdditionalCounters {
private int minWriteSize;
private int maxWriteSize;
private long totalBytes;
private long numWrites;
private int invocations;
int minWriteSize;
int maxWriteSize;
long totalBytes;
long numWrites;
int invocations;
public int minWriteSize() {
return minWriteSize;
@ -121,8 +113,13 @@ public class NoPriorityByteDistributionBenchmark extends AbstractMicrobenchmark
}
};
@TearDown(Level.Trial)
public void tearDownTrial() throws Exception {
ctx.close();
}
@Setup(Level.Trial)
public void setupTrial() throws Http2Exception {
public void setupTrial() throws Exception {
connection = new DefaultHttp2Connection(false);
dataRefresherKey = connection.newKey();
@ -131,12 +128,28 @@ public class NoPriorityByteDistributionBenchmark extends AbstractMicrobenchmark
case PRIORITY:
distributor = new PriorityStreamByteDistributor(connection);
break;
case WFQ:
distributor = new WeightedFairQueueByteDistributor(connection);
break;
case UNIFORM:
distributor = new UniformStreamByteDistributor(connection);
break;
}
controller = new DefaultHttp2RemoteFlowController(connection, new ByteCounter(distributor));
controller.channelHandlerContext(new TestContext());
connection.remote().flowController(controller);
handler = new Http2ConnectionHandler.Builder()
.encoderEnforceMaxConcurrentStreams(false).validateHeaders(false)
.frameListener(new Http2FrameAdapter())
.build(connection);
ctx = new EmbeddedChannelWriteReleaseHandlerContext(
PooledByteBufAllocator.DEFAULT, handler) {
@Override
protected void handleException(Throwable t) {
handleUnexpectedException(t);
}
};
handler.handlerAdded(ctx);
handler.channelActive(ctx);
// Create the streams, each initialized with MAX_INT bytes.
for (int i = 0; i < numStreams; ++i) {
@ -282,297 +295,4 @@ public class NoPriorityByteDistributionBenchmark extends AbstractMicrobenchmark
}
}
}
private static class TestContext implements ChannelHandlerContext {
private Channel channel = new TestChannel();
@Override
public Channel channel() {
return channel;
}
@Override
public EventExecutor executor() {
return channel.eventLoop();
}
@Override
public String name() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandler handler() {
throw new UnsupportedOperationException();
}
@Override
public boolean isRemoved() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelActive() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelInactive() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireUserEventTriggered(Object event) {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelReadComplete() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture disconnect() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture close() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture deregister() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture close(ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext read() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture write(Object msg) {
return channel.newSucceededFuture();
}
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
return promise;
}
@Override
public ChannelHandlerContext flush() {
return this;
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return promise;
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return null;
}
@Override
public ChannelPipeline pipeline() {
return channel.pipeline();
}
@Override
public ByteBufAllocator alloc() {
return channel.alloc();
}
@Override
public ChannelPromise newPromise() {
return channel.newPromise();
}
@Override
public ChannelProgressivePromise newProgressivePromise() {
return channel.newProgressivePromise();
}
@Override
public ChannelFuture newSucceededFuture() {
return channel.newSucceededFuture();
}
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return channel.newFailedFuture(cause);
}
@Override
public ChannelPromise voidPromise() {
return channel.voidPromise();
}
@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
return channel.attr(key);
}
@Override
public <T> boolean hasAttr(AttributeKey<T> key) {
return channel.hasAttr(key);
}
}
private static class TestChannel extends AbstractChannel {
private static final ChannelMetadata TEST_METADATA = new ChannelMetadata(false);
private DefaultChannelConfig config = new DefaultChannelConfig(this);
private class TestUnsafe extends AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
}
}
public TestChannel() {
super(null);
config.setWriteBufferHighWaterMark(Integer.MAX_VALUE);
config.setWriteBufferLowWaterMark(Integer.MAX_VALUE);
}
@Override
public long bytesBeforeUnwritable() {
return Long.MAX_VALUE;
}
@Override
public boolean isWritable() {
return true;
}
@Override
public ChannelConfig config() {
return new DefaultChannelConfig(this);
}
@Override
public boolean isOpen() {
return true;
}
@Override
public boolean isActive() {
return true;
}
@Override
public ChannelMetadata metadata() {
return TEST_METADATA;
}
@Override
protected AbstractUnsafe newUnsafe() {
return new TestUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return true;
}
@Override
protected SocketAddress localAddress0() {
return null;
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception { }
@Override
protected void doDisconnect() throws Exception { }
@Override
protected void doClose() throws Exception { }
@Override
protected void doBeginRead() throws Exception { }
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { }
}
}

View File

@ -52,11 +52,30 @@ public class AbstractMicrobenchmark extends AbstractMicrobenchmarkBase {
}
}
@Override
protected String[] jvmArgs() {
return JVM_ARGS;
private final boolean disableAssertions;
private String[] jvmArgsWithNoAssertions;
public AbstractMicrobenchmark() {
this(false);
}
public AbstractMicrobenchmark(boolean disableAssertions) {
this.disableAssertions = disableAssertions;
}
@Override
protected String[] jvmArgs() {
if (!disableAssertions) {
return JVM_ARGS;
}
if (jvmArgsWithNoAssertions == null) {
jvmArgsWithNoAssertions = removeAssertions(JVM_ARGS);
}
return jvmArgsWithNoAssertions;
}
@Override
protected ChainedOptionsBuilder newOptionsBuilder() throws Exception {
ChainedOptionsBuilder runnerOptions = super.newOptionsBuilder();
if (getForks() > 0) {

View File

@ -20,6 +20,8 @@ import io.netty.util.ResourceLeakDetector;
import io.netty.util.internal.SystemPropertyUtil;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import org.openjdk.jmh.annotations.Measurement;
@ -83,6 +85,20 @@ public abstract class AbstractMicrobenchmarkBase {
protected abstract String[] jvmArgs();
protected static String[] removeAssertions(String[] jvmArgs) {
List<String> customArgs = new ArrayList<String>(jvmArgs.length);
for (String arg : jvmArgs) {
if (!arg.startsWith("-ea")) {
customArgs.add(arg);
}
}
if (jvmArgs.length != customArgs.size()) {
jvmArgs = new String[customArgs.size()];
customArgs.toArray(jvmArgs);
}
return jvmArgs;
}
@Test
public void run() throws Exception {
new Runner(newOptionsBuilder().build()).run();

View File

@ -38,8 +38,8 @@ public class AbstractSharedExecutorMicrobenchmark extends AbstractMicrobenchmark
static {
final String[] customArgs = {
"-Xms2g", "-Xmx2g", "-XX:MaxDirectMemorySize=2g", "-Dharness.executor=CUSTOM",
"-Dharness.executor.class=AbstractSharedExecutorMicrobenchmark$DelegateHarnessExecutor" };
"-Xms2g", "-Xmx2g", "-XX:MaxDirectMemorySize=2g", "-Djmh.executor=CUSTOM",
"-Djmh.executor.class=io.netty.microbench.util.AbstractSharedExecutorMicrobenchmark$DelegateHarnessExecutor" };
JVM_ARGS = new String[BASE_JVM_ARGS.length + customArgs.length];
System.arraycopy(BASE_JVM_ARGS, 0, JVM_ARGS, 0, BASE_JVM_ARGS.length);

View File

@ -1092,6 +1092,9 @@
<ignore>java.security.AlgorithmConstraints</ignore>
<ignore>java.util.concurrent.ConcurrentLinkedDeque</ignore>
<!-- Used in internal utilities (protected by conditional) -->
<ignore>java.lang.Long</ignore>
</ignores>
</configuration>
<executions>