HTTP/2 support pending data larger than Integer.MAX_VALUE

Motivation:
Currently the remote flow controller limits the maximum amount of pending data to Integer.MAX_VALUE. The overflow handling is also not very graceful in that it may lead to infinite loops, or otherwise no progress being made.

Modifications:
- StreamByteDistributor and RemoteFlowController should support pending bytes of type long.

Result:
Fixes https://github.com/netty/netty/issues/4283
This commit is contained in:
Scott Mitchell 2017-12-20 08:55:15 -08:00 committed by GitHub
parent 0c5014b105
commit 144716f668
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 195 additions and 187 deletions

View File

@ -16,7 +16,6 @@ package io.netty.handler.codec.http2;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.StreamByteDistributor.Writer;
import io.netty.util.BooleanSupplier;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -274,7 +273,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
private final Http2Stream stream;
private final Deque<FlowControlled> pendingWriteQueue;
private int window;
private int pendingBytes;
private long pendingBytes;
private boolean markedWritable;
/**
@ -285,12 +284,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* Set to true if cancel() was called.
*/
private boolean cancelled;
private BooleanSupplier isWritableSupplier = new BooleanSupplier() {
@Override
public boolean get() throws Exception {
return windowSize() > pendingBytes();
}
};
FlowState(Http2Stream stream) {
this.stream = stream;
@ -302,11 +295,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* @return {@code true} if the stream associated with this object is writable.
*/
boolean isWritable() {
try {
return isWritableSupplier.get();
} catch (Throwable cause) {
throw new Error("isWritableSupplier should never throw!", cause);
}
return windowSize() > pendingBytes() && !cancelled;
}
/**
@ -432,7 +421,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
@Override
public int pendingBytes() {
public long pendingBytes() {
return pendingBytes;
}
@ -502,7 +491,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
streamByteDistributor.updateStreamableBytes(this);
isWritableSupplier = BooleanSupplier.FALSE_SUPPLIER;
monitor.stateCancelled(this);
}

View File

@ -223,7 +223,7 @@ public final class Http2CodecUtil {
* Calculate the amount of bytes that can be sent by {@code state}. The lower bound is {@code 0}.
*/
public static int streamableBytes(StreamByteDistributor.StreamState state) {
return max(0, min(state.pendingBytes(), state.windowSize()));
return max(0, (int) min(state.pendingBytes(), state.windowSize()));
}
/**

View File

@ -40,7 +40,7 @@ public interface StreamByteDistributor {
* @return The amount of bytes this stream has pending to send.
* @see Http2CodecUtil#streamableBytes(StreamState)
*/
int pendingBytes();
long pendingBytes();
/**
* Indicates whether or not there are frames pending for this stream.

View File

@ -129,13 +129,6 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
return checkNotNull(stream, "stream").getProperty(stateKey);
}
/**
* For testing only!
*/
int streamableBytes0(Http2Stream stream) {
return state(stream).streamableBytes;
}
/**
* The remote flow control state for a single stream.
*/

View File

@ -352,13 +352,6 @@ public final class WeightedFairQueueByteDistributor implements StreamByteDistrib
return stream != null ? state(stream) : stateOnlyMap.get(streamId);
}
/**
* For testing only!
*/
int streamableBytes0(Http2Stream stream) {
return state(stream).streamableBytes;
}
/**
* For testing only!
*/

View File

@ -14,6 +14,9 @@
*/
package io.netty.handler.codec.http2;
import io.netty.handler.codec.http2.Http2TestUtil.TestStreamByteDistributorStreamState;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -21,6 +24,8 @@ import org.mockito.stubbing.Answer;
abstract class AbstractWeightedFairQueueByteDistributorDependencyTest {
Http2Connection connection;
WeightedFairQueueByteDistributor distributor;
private IntObjectMap<TestStreamByteDistributorStreamState> stateMap =
new IntObjectHashMap<TestStreamByteDistributorStreamState>();
@Mock
StreamByteDistributor.Writer writer;
@ -35,45 +40,30 @@ abstract class AbstractWeightedFairQueueByteDistributorDependencyTest {
public Void answer(InvocationOnMock in) throws Throwable {
Http2Stream stream = in.getArgument(0);
int numBytes = in.getArgument(1);
int streamableBytes = distributor.streamableBytes0(stream) - numBytes;
boolean hasFrame = streamableBytes > 0;
updateStream(stream.id(), streamableBytes, hasFrame, hasFrame, closeIfNoFrame);
TestStreamByteDistributorStreamState state = stateMap.get(stream.id());
state.pendingBytes -= numBytes;
state.hasFrame = state.pendingBytes > 0;
state.isWriteAllowed = state.hasFrame;
if (closeIfNoFrame && !state.hasFrame) {
stream.close();
}
distributor.updateStreamableBytes(state);
return null;
}
};
}
void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame) {
updateStream(streamId, streamableBytes, hasFrame, hasFrame, false);
void initState(final int streamId, final long streamableBytes, final boolean hasFrame) {
initState(streamId, streamableBytes, hasFrame, hasFrame);
}
void updateStream(final int streamId, final int pendingBytes, final boolean hasFrame,
final boolean isWriteAllowed, boolean closeIfNoFrame) {
void initState(final int streamId, final long pendingBytes, final boolean hasFrame,
final boolean isWriteAllowed) {
final Http2Stream stream = stream(streamId);
if (closeIfNoFrame && !hasFrame) {
stream(streamId).close();
}
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
@Override
public Http2Stream stream() {
return stream;
}
@Override
public int pendingBytes() {
return pendingBytes;
}
@Override
public boolean hasFrame() {
return hasFrame;
}
@Override
public int windowSize() {
return isWriteAllowed ? pendingBytes : -1;
}
});
TestStreamByteDistributorStreamState state = new TestStreamByteDistributorStreamState(stream, pendingBytes,
hasFrame, isWriteAllowed);
stateMap.put(streamId, state);
distributor.updateStreamableBytes(state);
}
void setPriority(int streamId, int parent, int weight, boolean exclusive) throws Http2Exception {

View File

@ -35,6 +35,7 @@ import java.util.concurrent.CountDownLatch;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_HEADER_LIST_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_HEADER_TABLE_SIZE;
import static java.lang.Math.min;
/**
* Utilities for the integration tests.
@ -470,4 +471,39 @@ public final class Http2TestUtil {
}
};
}
static final class TestStreamByteDistributorStreamState implements StreamByteDistributor.StreamState {
private final Http2Stream stream;
boolean isWriteAllowed;
long pendingBytes;
boolean hasFrame;
TestStreamByteDistributorStreamState(Http2Stream stream, long pendingBytes, boolean hasFrame,
boolean isWriteAllowed) {
this.stream = stream;
this.isWriteAllowed = isWriteAllowed;
this.pendingBytes = pendingBytes;
this.hasFrame = hasFrame;
}
@Override
public Http2Stream stream() {
return stream;
}
@Override
public long pendingBytes() {
return pendingBytes;
}
@Override
public boolean hasFrame() {
return hasFrame;
}
@Override
public int windowSize() {
return isWriteAllowed ? (int) min(pendingBytes, Integer.MAX_VALUE) : -1;
}
}
}

View File

@ -14,6 +14,9 @@
*/
package io.netty.handler.codec.http2;
import io.netty.handler.codec.http2.Http2TestUtil.TestStreamByteDistributorStreamState;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@ -32,13 +35,13 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -55,6 +58,7 @@ public class UniformStreamByteDistributorTest {
private Http2Connection connection;
private UniformStreamByteDistributor distributor;
private IntObjectMap<TestStreamByteDistributorStreamState> stateMap;
@Mock
private StreamByteDistributor.Writer writer;
@ -63,6 +67,7 @@ public class UniformStreamByteDistributorTest {
public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this);
stateMap = new IntObjectHashMap<TestStreamByteDistributorStreamState>();
connection = new DefaultHttp2Connection(false);
distributor = new UniformStreamByteDistributor(connection);
@ -83,8 +88,10 @@ public class UniformStreamByteDistributorTest {
public Void answer(InvocationOnMock in) throws Throwable {
Http2Stream stream = in.getArgument(0);
int numBytes = in.getArgument(1);
int streamableBytes = distributor.streamableBytes0(stream) - numBytes;
updateStream(stream.id(), streamableBytes, streamableBytes > 0);
TestStreamByteDistributorStreamState state = stateMap.get(stream.id());
state.pendingBytes -= numBytes;
state.hasFrame = state.pendingBytes > 0;
distributor.updateStreamableBytes(state);
return null;
}
};
@ -97,10 +104,10 @@ public class UniformStreamByteDistributorTest {
@Test
public void bytesUnassignedAfterProcessing() throws Http2Exception {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
initState(STREAM_A, 1, true);
initState(STREAM_B, 2, true);
initState(STREAM_C, 3, true);
initState(STREAM_D, 4, true);
assertFalse(write(10));
verifyWrite(STREAM_A, 1);
@ -115,10 +122,10 @@ public class UniformStreamByteDistributorTest {
@Test
public void connectionErrorForWriterException() throws Http2Exception {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
initState(STREAM_A, 1, true);
initState(STREAM_B, 2, true);
initState(STREAM_C, 3, true);
initState(STREAM_D, 4, true);
Exception fakeException = new RuntimeException("Fake exception");
doThrow(fakeException).when(writer).write(same(stream(STREAM_C)), eq(3));
@ -158,10 +165,10 @@ public class UniformStreamByteDistributorTest {
setPriority(STREAM_D, STREAM_A, (short) 100, false);
// Update the streams.
updateStream(STREAM_A, CHUNK_SIZE, true);
updateStream(STREAM_B, CHUNK_SIZE, true);
updateStream(STREAM_C, CHUNK_SIZE, true);
updateStream(STREAM_D, CHUNK_SIZE, true);
initState(STREAM_A, CHUNK_SIZE, true);
initState(STREAM_B, CHUNK_SIZE, true);
initState(STREAM_C, CHUNK_SIZE, true);
initState(STREAM_D, CHUNK_SIZE, true);
// Only write 3 * chunkSize, so that we'll only write to the first 3 streams.
int written = 3 * CHUNK_SIZE;
@ -182,7 +189,7 @@ public class UniformStreamByteDistributorTest {
@Test
public void streamWithMoreDataShouldBeEnqueuedAfterWrite() throws Http2Exception {
// Give the stream a bunch of data.
updateStream(STREAM_A, 2 * CHUNK_SIZE, true);
initState(STREAM_A, 2 * CHUNK_SIZE, true);
// Write only part of the data.
assertTrue(write(CHUNK_SIZE));
@ -199,10 +206,10 @@ public class UniformStreamByteDistributorTest {
@Test
public void emptyFrameAtHeadIsWritten() throws Http2Exception {
updateStream(STREAM_A, 10, true);
updateStream(STREAM_B, 0, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 10, true);
initState(STREAM_A, 10, true);
initState(STREAM_B, 0, true);
initState(STREAM_C, 0, true);
initState(STREAM_D, 10, true);
assertTrue(write(10));
verifyWrite(STREAM_A, 10);
@ -213,10 +220,10 @@ public class UniformStreamByteDistributorTest {
@Test
public void streamWindowExhaustedDoesNotWrite() throws Http2Exception {
updateStream(STREAM_A, 0, true, false);
updateStream(STREAM_B, 0, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 0, true, false);
initState(STREAM_A, 0, true, false);
initState(STREAM_B, 0, true);
initState(STREAM_C, 0, true);
initState(STREAM_D, 0, true, false);
assertFalse(write(10));
verifyWrite(STREAM_B, 0);
@ -224,38 +231,30 @@ public class UniformStreamByteDistributorTest {
verifyNoMoreInteractions(writer);
}
@Test
public void streamWindowLargerThanIntDoesNotInfiniteLoop() throws Http2Exception {
initState(STREAM_A, Integer.MAX_VALUE + 1L, true, true);
assertTrue(write(Integer.MAX_VALUE));
verifyWrite(STREAM_A, Integer.MAX_VALUE);
assertFalse(write(1));
verifyWrite(STREAM_A, 1);
}
private Http2Stream stream(int streamId) {
return connection.stream(streamId);
}
private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame) {
updateStream(streamId, streamableBytes, hasFrame, hasFrame);
private void initState(final int streamId, final long streamableBytes, final boolean hasFrame) {
initState(streamId, streamableBytes, hasFrame, hasFrame);
}
private void updateStream(final int streamId, final int pendingBytes, final boolean hasFrame,
private void initState(final int streamId, final long pendingBytes, final boolean hasFrame,
final boolean isWriteAllowed) {
final Http2Stream stream = stream(streamId);
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
@Override
public Http2Stream stream() {
return stream;
}
@Override
public int pendingBytes() {
return pendingBytes;
}
@Override
public boolean hasFrame() {
return hasFrame;
}
@Override
public int windowSize() {
return isWriteAllowed ? pendingBytes : -1;
}
});
TestStreamByteDistributorStreamState state = new TestStreamByteDistributorStreamState(stream, pendingBytes,
hasFrame, isWriteAllowed);
stateMap.put(streamId, state);
distributor.updateStreamableBytes(state);
}
private void setPriority(int streamId, int parent, int weight, boolean exclusive) {

View File

@ -95,16 +95,16 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
*/
@Test
public void writeWithNonActiveStreamShouldNotDobuleAddToPriorityQueue() throws Http2Exception {
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 600, true);
updateStream(STREAM_D, 700, true);
initState(STREAM_A, 400, true);
initState(STREAM_B, 500, true);
initState(STREAM_C, 600, true);
initState(STREAM_D, 700, true);
setPriority(STREAM_B, STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
setPriority(STREAM_D, STREAM_C, DEFAULT_PRIORITY_WEIGHT, true);
// Block B, but it should still remain in the queue/tree structure.
updateStream(STREAM_B, 0, false);
initState(STREAM_B, 0, false);
// Get the streams before the write, because they may be be closed.
Http2Stream streamA = stream(STREAM_A);
@ -124,10 +124,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
@Test
public void bytesUnassignedAfterProcessing() throws Http2Exception {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
initState(STREAM_A, 1, true);
initState(STREAM_B, 2, true);
initState(STREAM_C, 3, true);
initState(STREAM_D, 4, true);
assertFalse(write(10));
verifyWrite(STREAM_A, 1);
@ -144,10 +144,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
@Test
public void connectionErrorForWriterException() throws Http2Exception {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
initState(STREAM_A, 1, true);
initState(STREAM_B, 2, true);
initState(STREAM_C, 3, true);
initState(STREAM_D, 4, true);
Exception fakeException = new RuntimeException("Fake exception");
doThrow(fakeException).when(writer).write(same(stream(STREAM_C)), eq(3));
@ -187,10 +187,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
setPriority(STREAM_D, STREAM_A, (short) 100, false);
// Update the streams.
updateStream(STREAM_A, ALLOCATION_QUANTUM, true);
updateStream(STREAM_B, ALLOCATION_QUANTUM, true);
updateStream(STREAM_C, ALLOCATION_QUANTUM, true);
updateStream(STREAM_D, ALLOCATION_QUANTUM, true);
initState(STREAM_A, ALLOCATION_QUANTUM, true);
initState(STREAM_B, ALLOCATION_QUANTUM, true);
initState(STREAM_C, ALLOCATION_QUANTUM, true);
initState(STREAM_D, ALLOCATION_QUANTUM, true);
// Only write 3 * chunkSize, so that we'll only write to the first 3 streams.
int written = 3 * ALLOCATION_QUANTUM;
@ -234,10 +234,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
*/
@Test
public void emptyFrameAtHeadIsWritten() throws Http2Exception {
updateStream(STREAM_A, 0, true);
updateStream(STREAM_B, 0, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 10, true);
initState(STREAM_A, 0, true);
initState(STREAM_B, 0, true);
initState(STREAM_C, 0, true);
initState(STREAM_D, 10, true);
setPriority(STREAM_B, STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
@ -280,7 +280,7 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
@Test
public void blockedStreamWithDataAndNotAllowedToSendShouldSpreadDataToChildren() throws Http2Exception {
// A cannot stream.
updateStream(STREAM_A, 0, true, false, false);
initState(STREAM_A, 0, true, false);
blockedStreamShouldSpreadDataToChildren(false);
}
@ -299,23 +299,23 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
*/
@Test
public void streamWithZeroFlowControlWindowAndDataShouldWriteOnlyOnce() throws Http2Exception {
updateStream(STREAM_A, 0, true, true, false);
initState(STREAM_A, 0, true, true);
blockedStreamShouldSpreadDataToChildren(true);
// Make sure if we call update stream again, A should write 1 more time.
updateStream(STREAM_A, 0, true, true, false);
initState(STREAM_A, 0, true, true);
assertFalse(write(1));
verifyWrite(times(2), STREAM_A, 0);
// Try to write again, but since no updateStream A should not write again
// Try to write again, but since no initState A should not write again
assertFalse(write(1));
verifyWrite(times(2), STREAM_A, 0);
}
private void blockedStreamShouldSpreadDataToChildren(boolean streamAShouldWriteZero) throws Http2Exception {
updateStream(STREAM_B, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
initState(STREAM_B, 10, true);
initState(STREAM_C, 10, true);
initState(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
@ -375,9 +375,9 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
@Test
public void childrenShouldNotSendDataUntilParentBlocked() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
initState(STREAM_A, 10, true);
initState(STREAM_C, 10, true);
initState(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
@ -404,9 +404,9 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
@Test
public void parentShouldWaterFallDataToChildren() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 5, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
initState(STREAM_A, 5, true);
initState(STREAM_C, 10, true);
initState(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
@ -449,9 +449,9 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
@Test
public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
initState(STREAM_A, 10, true);
initState(STREAM_C, 10, true);
initState(STREAM_D, 10, true);
// Re-prioritize D as a direct child of the connection.
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
@ -494,7 +494,7 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
// Send enough so it can not be completely written out
final int expectedUnsentAmount = 1;
updateStream(STREAM_D, writableBytes + expectedUnsentAmount, true);
initState(STREAM_D, writableBytes + expectedUnsentAmount, true);
assertTrue(write(writableBytes));
verifyWrite(STREAM_D, writableBytes);
@ -521,10 +521,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
setPriority(STREAM_C, 0, (short) 100, false);
setPriority(STREAM_D, 0, (short) 100, false);
updateStream(STREAM_A, 1000, true);
updateStream(STREAM_B, 1000, true);
updateStream(STREAM_C, 1000, true);
updateStream(STREAM_D, 1000, true);
initState(STREAM_A, 1000, true);
initState(STREAM_B, 1000, true);
initState(STREAM_C, 1000, true);
initState(STREAM_D, 1000, true);
// Set allocation quantum to 1 so it is easier to see the ratio of total bytes written between each stream.
distributor.allocationQuantum(1);
@ -556,10 +556,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
setPriority(STREAM_C, 0, (short) 100, false);
setPriority(STREAM_D, 0, (short) 100, false);
updateStream(STREAM_A, 1000, true);
updateStream(STREAM_B, 1000, true);
updateStream(STREAM_C, 1000, false);
updateStream(STREAM_D, 1000, false);
initState(STREAM_A, 1000, true);
initState(STREAM_B, 1000, true);
initState(STREAM_C, 1000, false);
initState(STREAM_D, 1000, false);
// Set allocation quantum to 1 so it is easier to see the ratio of total bytes written between each stream.
distributor.allocationQuantum(1);
@ -615,10 +615,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
setPriority(STREAM_C, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 700, true);
initState(STREAM_A, 400, true);
initState(STREAM_B, 500, true);
initState(STREAM_C, 0, true);
initState(STREAM_D, 700, true);
// Set allocation quantum to 1 so it is easier to see the ratio of total bytes written between each stream.
distributor.allocationQuantum(1);
@ -655,10 +655,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
*/
@Test
public void zeroDistributeShouldWriteAllZeroFrames() throws Http2Exception {
updateStream(STREAM_A, 400, false);
updateStream(STREAM_B, 0, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 0, true);
initState(STREAM_A, 400, false);
initState(STREAM_B, 0, true);
initState(STREAM_C, 0, true);
initState(STREAM_D, 0, true);
setPriority(STREAM_B, STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
@ -698,10 +698,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
*/
@Test
public void nonZeroDistributeShouldWriteAllZeroFramesIfAllEligibleDataIsWritten() throws Http2Exception {
updateStream(STREAM_A, 400, false);
updateStream(STREAM_B, 100, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 0, true);
initState(STREAM_A, 400, false);
initState(STREAM_B, 100, true);
initState(STREAM_C, 0, true);
initState(STREAM_D, 0, true);
setPriority(STREAM_B, STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
@ -740,10 +740,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
*/
@Test
public void bytesDistributedWithRestructureShouldBeCorrect() throws Http2Exception {
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 600, true);
updateStream(STREAM_D, 700, true);
initState(STREAM_A, 400, true);
initState(STREAM_B, 500, true);
initState(STREAM_C, 600, true);
initState(STREAM_D, 700, true);
setPriority(STREAM_B, STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
@ -795,11 +795,11 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
setPriority(streamE.id(), STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
// Send a bunch of data on each stream.
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 600, true);
updateStream(STREAM_D, 700, true);
updateStream(STREAM_E, 900, true);
initState(STREAM_A, 400, true);
initState(STREAM_B, 500, true);
initState(STREAM_C, 600, true);
initState(STREAM_D, 700, true);
initState(STREAM_E, 900, true);
assertTrue(write(900));
assertEquals(400, captureWrites(STREAM_A));
@ -843,10 +843,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
*/
@Test
public void bytesDistributedShouldBeCorrectWithInternalStreamClose() throws Http2Exception {
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 600, true);
updateStream(STREAM_D, 700, true);
initState(STREAM_A, 400, true);
initState(STREAM_B, 500, true);
initState(STREAM_C, 600, true);
initState(STREAM_D, 700, true);
stream(STREAM_A).close();
@ -883,10 +883,10 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
*/
@Test
public void bytesDistributedShouldBeCorrectWithLeafStreamClose() throws Http2Exception {
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 600, true);
updateStream(STREAM_D, 700, true);
initState(STREAM_A, 400, true);
initState(STREAM_B, 500, true);
initState(STREAM_C, 600, true);
initState(STREAM_D, 700, true);
stream(STREAM_C).close();
@ -906,13 +906,22 @@ public class WeightedFairQueueByteDistributorTest extends AbstractWeightedFairQu
@Test
public void activeStreamDependentOnNewNonActiveStreamGetsQuantum() throws Http2Exception {
setup(0);
updateStream(STREAM_D, 700, true);
initState(STREAM_D, 700, true);
setPriority(STREAM_D, STREAM_E, DEFAULT_PRIORITY_WEIGHT, true);
assertFalse(write(700));
assertEquals(700, captureWrites(STREAM_D));
}
@Test
public void streamWindowLargerThanIntDoesNotInfiniteLoop() throws Http2Exception {
initState(STREAM_A, Integer.MAX_VALUE + 1L, true, true);
assertTrue(write(Integer.MAX_VALUE));
verifyWrite(STREAM_A, Integer.MAX_VALUE);
assertFalse(write(1));
verifyWrite(STREAM_A, 1);
}
private boolean write(int numBytes) throws Http2Exception {
return distributor.distribute(numBytes, writer);
}