HTTP/2 DefaultHttp2RemoteFlowController Stream writability notification broken
Motivation: DefaultHttp2RemoteFlowController.ListenerWritabilityMonitor no longer reliably detects when a stream's writability change occurs. Modifications: - Ensure writiability is reliabily reported by DefaultHttp2RemoteFlowController.ListenerWritabilityMonitor - Fix infinite loop issue (https://github.com/netty/netty/issues/4588) detected when consolidating unit tests Result: Reliable stream writability change notification, and 1 less infinite loop in UniformStreamByteDistributor. Fixes https://github.com/netty/netty/issues/4587
This commit is contained in:
parent
c22f1aa4ac
commit
9ac430f16f
@ -299,7 +299,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
int windowSize() {
|
||||
public int windowSize() {
|
||||
return window;
|
||||
}
|
||||
|
||||
@ -389,11 +389,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
return window;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int streamableBytes() {
|
||||
return max(0, min(pendingBytes, window));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the maximum writable window (minimum of the stream and connection windows).
|
||||
*/
|
||||
@ -402,7 +397,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
int pendingBytes() {
|
||||
public int pendingBytes() {
|
||||
return pendingBytes;
|
||||
}
|
||||
|
||||
@ -514,7 +509,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
int windowSize() {
|
||||
public int windowSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -524,12 +519,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
public int streamableBytes() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
int pendingBytes() {
|
||||
public int pendingBytes() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -604,13 +594,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
this.markedWritable = isWritable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isWriteAllowed() {
|
||||
return windowSize() >= 0;
|
||||
}
|
||||
|
||||
abstract int windowSize();
|
||||
|
||||
abstract int initialWindowSize();
|
||||
|
||||
/**
|
||||
@ -620,11 +603,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
*/
|
||||
abstract int writeAllocatedBytes(int allocated);
|
||||
|
||||
/**
|
||||
* Get the number of bytes pending to be written.
|
||||
*/
|
||||
abstract int pendingBytes();
|
||||
|
||||
/**
|
||||
* Any operations that may be pending are cleared and the status of these operations is failed.
|
||||
*/
|
||||
@ -651,20 +629,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
*/
|
||||
private abstract class WritabilityMonitor {
|
||||
private long totalPendingBytes;
|
||||
private final Writer writer;
|
||||
|
||||
/**
|
||||
* Increment all windows by {@code newWindowSize} amount, and write data if streams change from not writable
|
||||
* to writable.
|
||||
* @param newWindowSize The new window size.
|
||||
* @throws Http2Exception If an overflow occurs or an exception on write occurs.
|
||||
*/
|
||||
public abstract void initialWindowSize(int newWindowSize) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Attempt to allocate bytes to streams which have frames queued.
|
||||
* @throws Http2Exception If a write occurs and an exception happens in the write operation.
|
||||
*/
|
||||
public abstract void writePendingBytes() throws Http2Exception;
|
||||
protected WritabilityMonitor(Writer writer) {
|
||||
this.writer = writer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the writability of the underlying channel changes.
|
||||
@ -719,7 +688,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
return isWritableConnection() && state.windowSize() - state.pendingBytes() > 0;
|
||||
}
|
||||
|
||||
protected final void writePendingBytes(Writer writer) throws Http2Exception {
|
||||
protected final void writePendingBytes() throws Http2Exception {
|
||||
int bytesToWrite = writableBytes();
|
||||
|
||||
// Make sure we always write at least once, regardless if we have bytesToWrite or not.
|
||||
@ -733,7 +702,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
}
|
||||
|
||||
protected final boolean initialWindowSize(int newWindowSize, Writer writer) throws Http2Exception {
|
||||
protected void initialWindowSize(int newWindowSize) throws Http2Exception {
|
||||
if (newWindowSize < 0) {
|
||||
throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
|
||||
}
|
||||
@ -750,10 +719,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
|
||||
if (delta > 0) {
|
||||
// The window size increased, send any pending frames for all streams.
|
||||
writePendingBytes(writer);
|
||||
return false;
|
||||
writePendingBytes();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
protected final boolean isWritableConnection() {
|
||||
@ -765,21 +732,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
* Provides no notification or tracking of writablity changes.
|
||||
*/
|
||||
private final class DefaultWritabilityMonitor extends WritabilityMonitor {
|
||||
private final Writer writer = new StreamByteDistributor.Writer() {
|
||||
@Override
|
||||
public void write(Http2Stream stream, int numBytes) {
|
||||
state(stream).writeAllocatedBytes(numBytes);
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public void writePendingBytes() throws Http2Exception {
|
||||
writePendingBytes(writer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialWindowSize(int newWindowSize) throws Http2Exception {
|
||||
initialWindowSize(newWindowSize, writer);
|
||||
DefaultWritabilityMonitor() {
|
||||
super(new StreamByteDistributor.Writer() {
|
||||
@Override
|
||||
public void write(Http2Stream stream, int numBytes) {
|
||||
state(stream).writeAllocatedBytes(numBytes);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -803,32 +762,21 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
return true;
|
||||
}
|
||||
};
|
||||
private final Writer initialWindowSizeWriter = new StreamByteDistributor.Writer() {
|
||||
@Override
|
||||
public void write(Http2Stream stream, int numBytes) {
|
||||
AbstractState state = state(stream);
|
||||
writeAllocatedBytes(state, numBytes);
|
||||
if (isWritable(state) != state.markWritability()) {
|
||||
notifyWritabilityChanged(state);
|
||||
|
||||
ListenerWritabilityMonitor(final Listener listener) {
|
||||
super(new StreamByteDistributor.Writer() {
|
||||
@Override
|
||||
public void write(Http2Stream stream, int numBytes) {
|
||||
AbstractState state = state(stream);
|
||||
int written = state.writeAllocatedBytes(numBytes);
|
||||
if (written != -1) {
|
||||
listener.streamWritten(state.stream(), written);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
private final Writer writeAllocatedBytesWriter = new StreamByteDistributor.Writer() {
|
||||
@Override
|
||||
public void write(Http2Stream stream, int numBytes) {
|
||||
writeAllocatedBytes(state(stream), numBytes);
|
||||
}
|
||||
};
|
||||
|
||||
ListenerWritabilityMonitor(Listener listener) {
|
||||
});
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writePendingBytes() throws Http2Exception {
|
||||
writePendingBytes(writeAllocatedBytesWriter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementWindowSize(AbstractState state, int delta) throws Http2Exception {
|
||||
super.incrementWindowSize(state, delta);
|
||||
@ -842,13 +790,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialWindowSize(int newWindowSize) throws Http2Exception {
|
||||
if (initialWindowSize(newWindowSize, initialWindowSizeWriter)) {
|
||||
if (isWritableConnection()) {
|
||||
// If the write operation does not occur we still need to check all streams because they
|
||||
// may have transitioned from writable to not writable.
|
||||
checkAllWritabilityChanged();
|
||||
}
|
||||
protected void initialWindowSize(int newWindowSize) throws Http2Exception {
|
||||
super.initialWindowSize(newWindowSize);
|
||||
if (isWritableConnection()) {
|
||||
// If the write operation does not occur we still need to check all streams because they
|
||||
// may have transitioned from writable to not writable.
|
||||
checkAllWritabilityChanged();
|
||||
}
|
||||
}
|
||||
|
||||
@ -897,12 +844,5 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
connectionState.markWritability(isWritableConnection());
|
||||
connection.forEachActiveStream(checkStreamWritabilityVisitor);
|
||||
}
|
||||
|
||||
private void writeAllocatedBytes(AbstractState state, int numBytes) {
|
||||
int written = state.writeAllocatedBytes(numBytes);
|
||||
if (written != -1) {
|
||||
listener.streamWritten(state.stream(), written);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,8 @@ import static io.netty.buffer.Unpooled.directBuffer;
|
||||
import static io.netty.buffer.Unpooled.unmodifiableBuffer;
|
||||
import static io.netty.buffer.Unpooled.unreleasableBuffer;
|
||||
import static io.netty.util.CharsetUtil.UTF_8;
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
|
||||
/**
|
||||
* Constants and utility method used for encoding/decoding HTTP2 frames.
|
||||
@ -189,6 +191,13 @@ public final class Http2CodecUtil {
|
||||
writeFrameHeaderInternal(out, payloadLength, type, flags, streamId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the amount of bytes that can be sent by {@code state}. The lower bound is {@code 0}.
|
||||
*/
|
||||
public static int streamableBytes(StreamByteDistributor.StreamState state) {
|
||||
return max(0, min(state.pendingBytes(), state.windowSize()));
|
||||
}
|
||||
|
||||
static void writeFrameHeaderInternal(ByteBuf out, int payloadLength, byte type,
|
||||
Http2Flags flags, int streamId) {
|
||||
out.writeMedium(payloadLength);
|
||||
|
@ -17,6 +17,7 @@ package io.netty.handler.codec.http2;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
|
||||
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
@ -78,7 +79,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo
|
||||
|
||||
@Override
|
||||
public void updateStreamableBytes(StreamState streamState) {
|
||||
state(streamState.stream()).updateStreamableBytes(streamState.streamableBytes(),
|
||||
state(streamState.stream()).updateStreamableBytes(streamableBytes(streamState),
|
||||
streamState.hasFrame());
|
||||
}
|
||||
|
||||
|
@ -32,13 +32,12 @@ public interface StreamByteDistributor {
|
||||
Http2Stream stream();
|
||||
|
||||
/**
|
||||
* Returns the number of pending bytes for this node that will fit within the stream flow
|
||||
* control window. This is used for the priority algorithm to determine the aggregate number
|
||||
* of bytes that can be written at each node. Each node only takes into account its stream
|
||||
* window so that when a change occurs to the connection window, these values need not
|
||||
* change (i.e. no tree traversal is required).
|
||||
* Get the amount of bytes this stream has pending to send. The actual amount written must not exceed
|
||||
* {@link #windowSize()}!
|
||||
* @return The amount of bytes this stream has pending to send.
|
||||
* @see {@link #io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes(StreamState)}
|
||||
*/
|
||||
int streamableBytes();
|
||||
int pendingBytes();
|
||||
|
||||
/**
|
||||
* Indicates whether or not there are frames pending for this stream.
|
||||
@ -46,11 +45,15 @@ public interface StreamByteDistributor {
|
||||
boolean hasFrame();
|
||||
|
||||
/**
|
||||
* Determine if a write operation is allowed for this stream. This will typically take into account the
|
||||
* stream's flow controller being non-negative.
|
||||
* @return {@code true} if a write is allowed on this stream. {@code false} otherwise.
|
||||
* The size (in bytes) of the stream's flow control window. The amount written must not exceed this amount!
|
||||
* <p>A {@link StreamByteDistributor} needs to know the stream's window size in order to avoid allocating bytes
|
||||
* if the window size is negative. The window size being {@code 0} may also be significant to determine when if
|
||||
* an stream has been given a chance to write an empty frame, and also enables optimizations like not writing
|
||||
* empty frames in some situations (don't write headers until data can also be written).
|
||||
* @return the size of the stream's flow control window.
|
||||
* @see {@link #io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes(StreamState)}
|
||||
*/
|
||||
boolean isWriteAllowed();
|
||||
int windowSize();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -14,15 +14,16 @@
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
|
||||
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
|
||||
/**
|
||||
* A {@link StreamByteDistributor} that ignores stream priority and uniformly allocates bytes to all
|
||||
* streams. This class uses a minimum chunk size that will be allocated to each stream. While
|
||||
@ -77,8 +78,9 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
||||
|
||||
@Override
|
||||
public void updateStreamableBytes(StreamState streamState) {
|
||||
State state = state(streamState.stream());
|
||||
state.updateStreamableBytes(streamState.streamableBytes(), streamState.hasFrame());
|
||||
state(streamState.stream()).updateStreamableBytes(streamableBytes(streamState),
|
||||
streamState.hasFrame(),
|
||||
streamState.windowSize());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -119,6 +121,13 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
||||
return checkNotNull(stream, "stream").getProperty(stateKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* For testing only!
|
||||
*/
|
||||
int streamableBytes0(Http2Stream stream) {
|
||||
return state(stream).streamableBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* The remote flow control state for a single stream.
|
||||
*/
|
||||
@ -126,12 +135,13 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
||||
final Http2Stream stream;
|
||||
int streamableBytes;
|
||||
boolean enqueued;
|
||||
boolean writing;
|
||||
|
||||
State(Http2Stream stream) {
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
void updateStreamableBytes(int newStreamableBytes, boolean hasFrame) {
|
||||
void updateStreamableBytes(int newStreamableBytes, boolean hasFrame, int windowSize) {
|
||||
assert hasFrame || newStreamableBytes == 0;
|
||||
|
||||
int delta = newStreamableBytes - streamableBytes;
|
||||
@ -139,7 +149,11 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
||||
streamableBytes = newStreamableBytes;
|
||||
totalStreamableBytes += delta;
|
||||
}
|
||||
if (hasFrame) {
|
||||
// We should queue this state if there is a frame. We don't want to queue this frame if the window
|
||||
// size is <= 0 and we are writing this state. The rational being we already gave this state the chance to
|
||||
// write, and if there were empty frames the expectation is they would have been sent. At this point there
|
||||
// must be a call to updateStreamableBytes for this state to be able to write again.
|
||||
if (hasFrame && (!writing || windowSize > 0)) {
|
||||
// It's not in the queue but has data to send, add it.
|
||||
addToQueue();
|
||||
}
|
||||
@ -150,15 +164,14 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
||||
* assuming all of the bytes will be written.
|
||||
*/
|
||||
void write(int numBytes, Writer writer) throws Http2Exception {
|
||||
// Update the streamable bytes, assuming that all the bytes will be written.
|
||||
int newStreamableBytes = streamableBytes - numBytes;
|
||||
updateStreamableBytes(newStreamableBytes, newStreamableBytes > 0);
|
||||
|
||||
writing = true;
|
||||
try {
|
||||
// Write the allocated bytes.
|
||||
writer.write(stream, numBytes);
|
||||
} catch (Throwable t) {
|
||||
throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
|
||||
} finally {
|
||||
writing = false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -181,7 +194,7 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
|
||||
removeFromQueue();
|
||||
|
||||
// Clear the streamable bytes.
|
||||
updateStreamableBytes(0, false);
|
||||
updateStreamableBytes(0, false, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import io.netty.util.internal.PriorityQueueNode;
|
||||
import java.util.Queue;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
|
||||
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
@ -104,8 +105,8 @@ public final class WeightedFairQueueByteDistributor implements StreamByteDistrib
|
||||
|
||||
@Override
|
||||
public void updateStreamableBytes(StreamState state) {
|
||||
state(state.stream()).updateStreamableBytes(state.streamableBytes(),
|
||||
state.hasFrame() && state.isWriteAllowed());
|
||||
state(state.stream()).updateStreamableBytes(streamableBytes(state),
|
||||
state.hasFrame() && state.windowSize() >= 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -204,7 +205,7 @@ public final class WeightedFairQueueByteDistributor implements StreamByteDistrib
|
||||
/**
|
||||
* For testing only!
|
||||
*/
|
||||
int streamableBytes(Http2Stream stream) {
|
||||
int streamableBytes0(Http2Stream stream) {
|
||||
return state(stream).streamableBytes;
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
/**
|
||||
* Tests for {@link DefaultHttp2RemoteFlowController}.
|
||||
*/
|
||||
public class DefaultHttp2RemoteFlowControllerTest {
|
||||
public abstract class DefaultHttp2RemoteFlowControllerTest {
|
||||
private static final int STREAM_A = 1;
|
||||
private static final int STREAM_B = 3;
|
||||
private static final int STREAM_C = 5;
|
||||
@ -114,9 +114,11 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
reset(listener);
|
||||
}
|
||||
|
||||
protected abstract StreamByteDistributor newDistributor(Http2Connection connection);
|
||||
|
||||
private void initConnectionAndController() throws Http2Exception {
|
||||
connection = new DefaultHttp2Connection(false);
|
||||
controller = new DefaultHttp2RemoteFlowController(connection, listener);
|
||||
controller = new DefaultHttp2RemoteFlowController(connection, newDistributor(connection), listener);
|
||||
connection.remote().flowController(controller);
|
||||
|
||||
connection.local().createStream(STREAM_A, false);
|
||||
@ -926,7 +928,6 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
mock(Http2RemoteFlowController.FlowControlled.class);
|
||||
when(flowControlled.size()).thenReturn(100);
|
||||
doAnswer(new Answer<Void>() {
|
||||
private int invocationCount;
|
||||
@Override
|
||||
public Void answer(InvocationOnMock in) throws Throwable {
|
||||
// Write most of the bytes and then fail
|
||||
|
@ -639,7 +639,7 @@ public class PriorityStreamByteDistributorTest {
|
||||
return connection.stream(streamId);
|
||||
}
|
||||
|
||||
private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame) {
|
||||
private void updateStream(final int streamId, final int pendingBytes, final boolean hasFrame) {
|
||||
final Http2Stream stream = stream(streamId);
|
||||
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
|
||||
@Override
|
||||
@ -648,8 +648,8 @@ public class PriorityStreamByteDistributorTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int streamableBytes() {
|
||||
return streamableBytes;
|
||||
public int pendingBytes() {
|
||||
return pendingBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -658,8 +658,8 @@ public class PriorityStreamByteDistributorTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWriteAllowed() {
|
||||
return hasFrame;
|
||||
public int windowSize() {
|
||||
return pendingBytes;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
public class UniformStreamByteDistributorFlowControllerTest extends DefaultHttp2RemoteFlowControllerTest {
|
||||
@Override
|
||||
protected StreamByteDistributor newDistributor(Http2Connection connection) {
|
||||
return new UniformStreamByteDistributor(connection);
|
||||
}
|
||||
}
|
@ -21,9 +21,12 @@ import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.same;
|
||||
import static org.mockito.Mockito.atMost;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.reset;
|
||||
@ -35,6 +38,8 @@ import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.mockito.verification.VerificationMode;
|
||||
|
||||
/**
|
||||
@ -61,6 +66,9 @@ public class UniformStreamByteDistributorTest {
|
||||
connection = new DefaultHttp2Connection(false);
|
||||
distributor = new UniformStreamByteDistributor(connection);
|
||||
|
||||
// Assume we always write all the allocated bytes.
|
||||
resetWriter();
|
||||
|
||||
connection.local().createStream(STREAM_A, false);
|
||||
connection.local().createStream(STREAM_B, false);
|
||||
Http2Stream streamC = connection.local().createStream(STREAM_C, false);
|
||||
@ -69,6 +77,24 @@ public class UniformStreamByteDistributorTest {
|
||||
streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
|
||||
}
|
||||
|
||||
private Answer<Void> writeAnswer() {
|
||||
return new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock in) throws Throwable {
|
||||
Http2Stream stream = in.getArgumentAt(0, Http2Stream.class);
|
||||
int numBytes = in.getArgumentAt(1, Integer.class);
|
||||
int streamableBytes = distributor.streamableBytes0(stream) - numBytes;
|
||||
updateStream(stream.id(), streamableBytes, streamableBytes > 0);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void resetWriter() {
|
||||
reset(writer);
|
||||
doAnswer(writeAnswer()).when(writer).write(any(Http2Stream.class), anyInt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bytesUnassignedAfterProcessing() throws Http2Exception {
|
||||
updateStream(STREAM_A, 1, true);
|
||||
@ -145,7 +171,7 @@ public class UniformStreamByteDistributorTest {
|
||||
assertEquals(CHUNK_SIZE, captureWrite(STREAM_C));
|
||||
verifyNoMoreInteractions(writer);
|
||||
|
||||
reset(writer);
|
||||
resetWriter();
|
||||
|
||||
// Now write again and verify that the last stream is written to.
|
||||
assertFalse(write(CHUNK_SIZE));
|
||||
@ -163,7 +189,7 @@ public class UniformStreamByteDistributorTest {
|
||||
assertEquals(CHUNK_SIZE, captureWrite(STREAM_A));
|
||||
verifyNoMoreInteractions(writer);
|
||||
|
||||
reset(writer);
|
||||
resetWriter();
|
||||
|
||||
// Now write the rest of the data.
|
||||
assertFalse(write(CHUNK_SIZE));
|
||||
@ -193,7 +219,7 @@ public class UniformStreamByteDistributorTest {
|
||||
updateStream(streamId, streamableBytes, hasFrame, hasFrame);
|
||||
}
|
||||
|
||||
private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame,
|
||||
private void updateStream(final int streamId, final int pendingBytes, final boolean hasFrame,
|
||||
final boolean isWriteAllowed) {
|
||||
final Http2Stream stream = stream(streamId);
|
||||
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
|
||||
@ -203,8 +229,8 @@ public class UniformStreamByteDistributorTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int streamableBytes() {
|
||||
return streamableBytes;
|
||||
public int pendingBytes() {
|
||||
return pendingBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -213,8 +239,8 @@ public class UniformStreamByteDistributorTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWriteAllowed() {
|
||||
return isWriteAllowed;
|
||||
public int windowSize() {
|
||||
return isWriteAllowed ? pendingBytes : -1;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ public class WeightedFairQueueByteDistributorTest {
|
||||
public Void answer(InvocationOnMock in) throws Throwable {
|
||||
Http2Stream stream = in.getArgumentAt(0, Http2Stream.class);
|
||||
int numBytes = in.getArgumentAt(1, Integer.class);
|
||||
int streamableBytes = distributor.streamableBytes(stream) - numBytes;
|
||||
int streamableBytes = distributor.streamableBytes0(stream) - numBytes;
|
||||
updateStream(stream.id(), streamableBytes, streamableBytes > 0);
|
||||
return null;
|
||||
}
|
||||
@ -913,7 +913,7 @@ public class WeightedFairQueueByteDistributorTest {
|
||||
updateStream(streamId, streamableBytes, hasFrame, hasFrame);
|
||||
}
|
||||
|
||||
private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame,
|
||||
private void updateStream(final int streamId, final int pendingBytes, final boolean hasFrame,
|
||||
final boolean isWriteAllowed) {
|
||||
final Http2Stream stream = stream(streamId);
|
||||
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
|
||||
@ -923,8 +923,8 @@ public class WeightedFairQueueByteDistributorTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int streamableBytes() {
|
||||
return streamableBytes;
|
||||
public int pendingBytes() {
|
||||
return pendingBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -933,8 +933,8 @@ public class WeightedFairQueueByteDistributorTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWriteAllowed() {
|
||||
return isWriteAllowed;
|
||||
public int windowSize() {
|
||||
return isWriteAllowed ? pendingBytes : -1;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
public class WeightedFairQueueRemoteFlowControllerTest extends DefaultHttp2RemoteFlowControllerTest {
|
||||
@Override
|
||||
protected StreamByteDistributor newDistributor(Http2Connection connection) {
|
||||
return new WeightedFairQueueByteDistributor(connection);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user