Add HTTP/2 local flow control option for auto refill
Motivation: For many HTTP/2 applications (such as gRPC) it is necessary to autorefill the connection window in order to prevent application-level deadlocking. Consider an application with 2 streams, A and B. A receives a stream of messages and the application pops off one message at a time and makes a request on stream B. However, if receiving of data on A has caused the connection window to collapse, B will not be able to receive any data and the application will deadlock. The only way (currently) to get around this is 1) use multiple connections, or 2) manually refill the connection window. Both are undesirable and could needlessly complicate the application code. Modifications: Add a configuration option to DefaultHttp2LocalFlowController, allowing it to autorefill the connection window. Result: Applications can configure HTTP/2 to avoid inter-stream deadlocking.
This commit is contained in:
parent
a4f3e72e71
commit
6504d52b94
@ -62,9 +62,9 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
this.encoder = checkNotNull(encoder, "encoder");
|
||||
this.requestVerifier = checkNotNull(requestVerifier, "requestVerifier");
|
||||
if (connection.local().flowController() == null) {
|
||||
connection.local().flowController(
|
||||
new DefaultHttp2LocalFlowController(connection, encoder.frameWriter()));
|
||||
connection.local().flowController(new DefaultHttp2LocalFlowController(connection));
|
||||
}
|
||||
connection.local().flowController().frameWriter(encoder.frameWriter());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -198,8 +198,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
return promise.setFailure(t);
|
||||
}
|
||||
|
||||
ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
|
||||
return future;
|
||||
return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -222,20 +221,17 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
return promise.setFailure(e);
|
||||
}
|
||||
|
||||
ChannelFuture future = frameWriter.writeSettings(ctx, settings, promise);
|
||||
return future;
|
||||
return frameWriter.writeSettings(ctx, settings, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
|
||||
ChannelFuture future = frameWriter.writeSettingsAck(ctx, promise);
|
||||
return future;
|
||||
return frameWriter.writeSettingsAck(ctx, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) {
|
||||
ChannelFuture future = frameWriter.writePing(ctx, ack, data, promise);
|
||||
return future;
|
||||
return frameWriter.writePing(ctx, ack, data, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -253,8 +249,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
return promise.setFailure(e);
|
||||
}
|
||||
|
||||
ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
|
||||
return future;
|
||||
return frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -46,26 +46,41 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
|
||||
|
||||
private final Http2Connection connection;
|
||||
private final Http2FrameWriter frameWriter;
|
||||
private final Http2Connection.PropertyKey stateKey;
|
||||
private Http2FrameWriter frameWriter;
|
||||
private ChannelHandlerContext ctx;
|
||||
private float windowUpdateRatio;
|
||||
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
|
||||
|
||||
public DefaultHttp2LocalFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
|
||||
this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO);
|
||||
public DefaultHttp2LocalFlowController(Http2Connection connection) {
|
||||
this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a controller with the given settings.
|
||||
*
|
||||
* @param connection the connection state.
|
||||
* @param windowUpdateRatio the window percentage below which to send a {@code WINDOW_UPDATE}.
|
||||
* @param autoRefillConnectionWindow if {@code true}, effectively disables the connection window
|
||||
* in the flow control algorithm as they will always refill automatically without requiring the
|
||||
* application to consume the bytes. When enabled, the maximum bytes you must be prepared to
|
||||
* queue is proportional to {@code maximum number of concurrent streams * the initial window
|
||||
* size per stream}
|
||||
* (<a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_MAX_CONCURRENT_STREAMS</a>
|
||||
* <a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_INITIAL_WINDOW_SIZE</a>).
|
||||
*/
|
||||
public DefaultHttp2LocalFlowController(Http2Connection connection,
|
||||
Http2FrameWriter frameWriter, float windowUpdateRatio) {
|
||||
float windowUpdateRatio,
|
||||
boolean autoRefillConnectionWindow) {
|
||||
this.connection = checkNotNull(connection, "connection");
|
||||
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
|
||||
windowUpdateRatio(windowUpdateRatio);
|
||||
|
||||
// Add a flow state for the connection.
|
||||
stateKey = connection.newKey();
|
||||
connection.connectionStream()
|
||||
.setProperty(stateKey, new DefaultState(connection.connectionStream(), initialWindowSize));
|
||||
FlowState connectionState = autoRefillConnectionWindow ?
|
||||
new AutoRefillState(connection.connectionStream(), initialWindowSize) :
|
||||
new DefaultState(connection.connectionStream(), initialWindowSize);
|
||||
connection.connectionStream().setProperty(stateKey, connectionState);
|
||||
|
||||
// Register for notification of new streams.
|
||||
connection.addListener(new Http2ConnectionAdapter() {
|
||||
@ -106,9 +121,15 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public DefaultHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
|
||||
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelHandlerContext(ChannelHandlerContext ctx) {
|
||||
this.ctx = ctx;
|
||||
this.ctx = checkNotNull(ctx, "ctx");
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -269,10 +290,33 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
return stream.state() == Http2Stream.State.CLOSED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flow control state that does autorefill of the flow control window when the data is
|
||||
* received.
|
||||
*/
|
||||
private final class AutoRefillState extends DefaultState {
|
||||
public AutoRefillState(Http2Stream stream, int initialWindowSize) {
|
||||
super(stream, initialWindowSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
|
||||
super.receiveFlowControlledFrame(dataLength);
|
||||
// Need to call the super to consume the bytes, since this.consumeBytes does nothing.
|
||||
super.consumeBytes(dataLength);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean consumeBytes(int numBytes) throws Http2Exception {
|
||||
// Do nothing, since the bytes are already consumed upon receiving the data.
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flow control window state for an individual stream.
|
||||
*/
|
||||
private final class DefaultState implements FlowState {
|
||||
private class DefaultState implements FlowState {
|
||||
private final Http2Stream stream;
|
||||
|
||||
/**
|
||||
|
@ -14,14 +14,6 @@
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http2.StreamByteDistributor.Writer;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
|
||||
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
|
||||
@ -32,6 +24,14 @@ import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http2.StreamByteDistributor.Writer;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
|
||||
/**
|
||||
* Basic implementation of {@link Http2RemoteFlowController}.
|
||||
* <p>
|
||||
@ -149,7 +149,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
*/
|
||||
@Override
|
||||
public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
|
||||
this.ctx = ctx;
|
||||
this.ctx = checkNotNull(ctx, "ctx");
|
||||
|
||||
// Writing the pending bytes will not check writability change and instead a writability change notification
|
||||
// to be provided by an explicit call.
|
||||
@ -652,7 +652,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstract class which provides common functionality for {@link WritabilityMonitorfoo} implementations.
|
||||
* Abstract class which provides common functionality for writability monitor implementations.
|
||||
*/
|
||||
private abstract class WritabilityMonitor {
|
||||
private long totalPendingBytes;
|
||||
|
@ -294,6 +294,11 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
|
||||
this.flowController = checkNotNull(flowController, "flowController");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
|
||||
return flowController.frameWriter(frameWriter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
|
||||
flowController.channelHandlerContext(ctx);
|
||||
|
@ -772,7 +772,8 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
||||
* @param cause the exception that was caught
|
||||
* @param http2Ex the {@link StreamException} that is embedded in the causality chain.
|
||||
*/
|
||||
protected void onStreamError(ChannelHandlerContext ctx, Throwable cause, StreamException http2Ex) {
|
||||
protected void onStreamError(ChannelHandlerContext ctx, @SuppressWarnings("unused") Throwable cause,
|
||||
StreamException http2Ex) {
|
||||
resetStream(ctx, http2Ex.streamId(), http2Ex.error().code(), ctx.newPromise());
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,13 @@ import io.netty.buffer.ByteBuf;
|
||||
* A {@link Http2FlowController} for controlling the inbound flow of {@code DATA} frames from the remote endpoint.
|
||||
*/
|
||||
public interface Http2LocalFlowController extends Http2FlowController {
|
||||
/**
|
||||
* Sets the writer to be use for sending {@code WINDOW_UPDATE} frames. This must be called before any flow
|
||||
* controlled data is received.
|
||||
*
|
||||
* @param frameWriter the HTTP/2 frame writer.
|
||||
*/
|
||||
Http2LocalFlowController frameWriter(Http2FrameWriter frameWriter);
|
||||
|
||||
/**
|
||||
* Receives an inbound {@code DATA} frame from the remote endpoint and applies flow control policies to it for both
|
||||
@ -29,7 +36,6 @@ public interface Http2LocalFlowController extends Http2FlowController {
|
||||
* If {@code stream} is {@code null} or closed, flow control should only be applied to the connection window and the
|
||||
* bytes are immediately consumed.
|
||||
*
|
||||
* @param ctx the context from the handler where the frame was read.
|
||||
* @param stream the subject stream for the received frame. The connection stream object must not be used. If {@code
|
||||
* stream} is {@code null} or closed, flow control should only be applied to the connection window and the bytes are
|
||||
* immediately consumed.
|
||||
|
@ -90,8 +90,8 @@ public interface Http2RemoteFlowController extends Http2FlowController {
|
||||
/**
|
||||
* Called to indicate that an error occurred before this object could be completely written.
|
||||
* <p>
|
||||
* The {@link Http2RemoteFlowController} will make exactly one call to either {@link #error(Throwable)} or
|
||||
* {@link #writeComplete()}.
|
||||
* The {@link Http2RemoteFlowController} will make exactly one call to either
|
||||
* this method or {@link #writeComplete()}.
|
||||
* </p>
|
||||
*
|
||||
* @param ctx The context to use if any communication needs to occur as a result of the error.
|
||||
@ -103,8 +103,8 @@ public interface Http2RemoteFlowController extends Http2FlowController {
|
||||
/**
|
||||
* Called after this object has been successfully written.
|
||||
* <p>
|
||||
* The {@link Http2RemoteFlowController} will make exactly one call to either {@link #error(Throwable)} or
|
||||
* {@link #writeComplete()}.
|
||||
* The {@link Http2RemoteFlowController} will make exactly one call to either
|
||||
* this method or {@link #error(ChannelHandlerContext, Throwable)}.
|
||||
* </p>
|
||||
*/
|
||||
void writeComplete();
|
||||
@ -116,7 +116,7 @@ public interface Http2RemoteFlowController extends Http2FlowController {
|
||||
* the payload is fully written, i.e it's size after the write is 0.
|
||||
* <p>
|
||||
* When an exception is thrown the {@link Http2RemoteFlowController} will make a call to
|
||||
* {@link #error(Throwable)}.
|
||||
* {@link #error(ChannelHandlerContext, Throwable)}.
|
||||
* </p>
|
||||
*
|
||||
* @param ctx The context to use for writing.
|
||||
|
@ -296,8 +296,13 @@ public class DataCompressionHttp2Test {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
ChannelPipeline p = ch.pipeline();
|
||||
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
|
||||
serverConnection.remote().flowController(
|
||||
new DefaultHttp2RemoteFlowController(serverConnection));
|
||||
serverConnection.local().flowController(
|
||||
new DefaultHttp2LocalFlowController(serverConnection).frameWriter(frameWriter));
|
||||
Http2ConnectionEncoder encoder = new CompressorHttp2ConnectionEncoder(
|
||||
new DefaultHttp2ConnectionEncoder(serverConnection, new DefaultHttp2FrameWriter()));
|
||||
new DefaultHttp2ConnectionEncoder(serverConnection, frameWriter));
|
||||
Http2ConnectionDecoder decoder =
|
||||
new DefaultHttp2ConnectionDecoder(serverConnection, encoder, new DefaultHttp2FrameReader());
|
||||
Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler.Builder()
|
||||
@ -314,8 +319,14 @@ public class DataCompressionHttp2Test {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
ChannelPipeline p = ch.pipeline();
|
||||
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
|
||||
clientConnection.remote().flowController(
|
||||
new DefaultHttp2RemoteFlowController(clientConnection));
|
||||
clientConnection.local().flowController(
|
||||
new DefaultHttp2LocalFlowController(clientConnection).frameWriter(frameWriter));
|
||||
clientEncoder = new CompressorHttp2ConnectionEncoder(
|
||||
new DefaultHttp2ConnectionEncoder(clientConnection, new DefaultHttp2FrameWriter()));
|
||||
new DefaultHttp2ConnectionEncoder(clientConnection, frameWriter));
|
||||
|
||||
Http2ConnectionDecoder decoder =
|
||||
new DefaultHttp2ConnectionDecoder(clientConnection, clientEncoder,
|
||||
new DefaultHttp2FrameReader());
|
||||
|
@ -221,6 +221,7 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
verify(localFlow)
|
||||
.receiveFlowControlledFrame(eq((Http2Stream) null), eq(data), eq(padding), eq(true));
|
||||
verify(localFlow).consumeBytes(eq((Http2Stream) null), eq(processedBytes));
|
||||
verify(localFlow).frameWriter(any(Http2FrameWriter.class));
|
||||
verifyNoMoreInteractions(localFlow);
|
||||
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
|
||||
} finally {
|
||||
@ -240,6 +241,7 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
verify(localFlow)
|
||||
.receiveFlowControlledFrame(eq((Http2Stream) null), eq(data), eq(padding), eq(true));
|
||||
verify(localFlow).consumeBytes(eq((Http2Stream) null), eq(processedBytes));
|
||||
verify(localFlow).frameWriter(any(Http2FrameWriter.class));
|
||||
verifyNoMoreInteractions(localFlow);
|
||||
|
||||
// Verify that the event was absorbed and not propagated to the observer.
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
@ -26,14 +27,15 @@ import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import junit.framework.AssertionFailedError;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mock;
|
||||
@ -64,23 +66,16 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
|
||||
private DefaultHttp2Connection connection;
|
||||
|
||||
private static float updateRatio = 0.5f;
|
||||
|
||||
@Before
|
||||
public void setup() throws Http2Exception {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
|
||||
when(ctx.newPromise()).thenReturn(promise);
|
||||
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
|
||||
|
||||
connection = new DefaultHttp2Connection(false);
|
||||
controller = new DefaultHttp2LocalFlowController(connection, frameWriter, updateRatio);
|
||||
connection.local().flowController(controller);
|
||||
|
||||
connection.local().createStream(STREAM_ID, false);
|
||||
controller.channelHandlerContext(ctx);
|
||||
when(ctx.executor()).thenReturn(executor);
|
||||
when(executor.inEventLoop()).thenReturn(true);
|
||||
|
||||
initController(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -91,17 +86,39 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
|
||||
@Test
|
||||
public void windowUpdateShouldSendOnceBytesReturned() throws Http2Exception {
|
||||
int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1;
|
||||
int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1;
|
||||
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false);
|
||||
|
||||
// Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent.
|
||||
assertFalse(consumeBytes(STREAM_ID, 10));
|
||||
verifyWindowUpdateNotSent(STREAM_ID);
|
||||
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
|
||||
|
||||
// Return the rest and verify the WINDOW_UPDATE is sent.
|
||||
assertTrue(consumeBytes(STREAM_ID, dataSize - 10));
|
||||
verifyWindowUpdateSent(STREAM_ID, dataSize);
|
||||
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
|
||||
verifyNoMoreInteractions(frameWriter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void connectionWindowShouldAutoRefillWhenDataReceived() throws Http2Exception {
|
||||
// Reconfigure controller to auto-refill the connection window.
|
||||
initController(true);
|
||||
|
||||
int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1;
|
||||
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false);
|
||||
// Verify that we immediately refill the connection window.
|
||||
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
|
||||
|
||||
// Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent for the stream.
|
||||
assertFalse(consumeBytes(STREAM_ID, 10));
|
||||
verifyWindowUpdateNotSent(STREAM_ID);
|
||||
|
||||
// Return the rest and verify the WINDOW_UPDATE is sent for the stream.
|
||||
assertTrue(consumeBytes(STREAM_ID, dataSize - 10));
|
||||
verifyWindowUpdateSent(STREAM_ID, dataSize);
|
||||
verifyNoMoreInteractions(frameWriter);
|
||||
}
|
||||
|
||||
@Test(expected = Http2Exception.class)
|
||||
@ -112,7 +129,7 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
|
||||
@Test
|
||||
public void windowUpdateShouldNotBeSentAfterEndOfStream() throws Http2Exception {
|
||||
int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1;
|
||||
int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1;
|
||||
|
||||
// Set end-of-stream on the frame, so no window update will be sent for the stream.
|
||||
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, true);
|
||||
@ -126,7 +143,7 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
|
||||
@Test
|
||||
public void halfWindowRemainingShouldUpdateAllWindows() throws Http2Exception {
|
||||
int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1;
|
||||
int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1;
|
||||
int initialWindowSize = DEFAULT_WINDOW_SIZE;
|
||||
int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize);
|
||||
|
||||
@ -175,7 +192,7 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
|
||||
|
||||
// Test that both stream and connection window are updated (or not updated) together
|
||||
int data1 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1;
|
||||
int data1 = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1;
|
||||
receiveFlowControlledFrame(STREAM_ID, data1, 0, false);
|
||||
verifyWindowUpdateNotSent(STREAM_ID);
|
||||
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
|
||||
@ -221,6 +238,17 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void closeShouldNotConsumeConnectionWindowWhenAutoRefilled() throws Http2Exception {
|
||||
// Reconfigure controller to auto-refill the connection window.
|
||||
initController(true);
|
||||
|
||||
receiveFlowControlledFrame(STREAM_ID, 10, 0, false);
|
||||
assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
|
||||
stream(STREAM_ID).close();
|
||||
assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataReceivedForClosedStreamShouldImmediatelyConsumeBytes() throws Http2Exception {
|
||||
Http2Stream stream = stream(STREAM_ID);
|
||||
@ -276,7 +304,7 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
reset(frameWriter);
|
||||
try {
|
||||
int data1 = (int) (newDefaultWindowSize * ratio) + 1;
|
||||
int data2 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) >> 1;
|
||||
int data2 = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) >> 1;
|
||||
receiveFlowControlledFrame(STREAM_ID, data2, 0, false);
|
||||
receiveFlowControlledFrame(newStreamId, data1, 0, false);
|
||||
verifyWindowUpdateNotSent(STREAM_ID);
|
||||
@ -348,4 +376,13 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
private Http2Stream stream(int streamId) {
|
||||
return connection.stream(streamId);
|
||||
}
|
||||
|
||||
private void initController(boolean autoRefillConnectionWindow) throws Http2Exception {
|
||||
connection = new DefaultHttp2Connection(false);
|
||||
controller = new DefaultHttp2LocalFlowController(connection,
|
||||
DEFAULT_WINDOW_UPDATE_RATIO, autoRefillConnectionWindow).frameWriter(frameWriter);
|
||||
connection.local().flowController(controller);
|
||||
connection.local().createStream(STREAM_ID, false);
|
||||
controller.channelHandlerContext(ctx);
|
||||
}
|
||||
}
|
||||
|
@ -105,6 +105,8 @@ public class StreamBufferingEncoderTest {
|
||||
.thenAnswer(successAnswer());
|
||||
|
||||
connection = new DefaultHttp2Connection(false);
|
||||
connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
|
||||
connection.local().flowController(new DefaultHttp2LocalFlowController(connection).frameWriter(writer));
|
||||
|
||||
DefaultHttp2ConnectionEncoder defaultEncoder =
|
||||
new DefaultHttp2ConnectionEncoder(connection, writer);
|
||||
|
@ -19,6 +19,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZ
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http2.Http2Exception;
|
||||
import io.netty.handler.codec.http2.Http2FrameWriter;
|
||||
import io.netty.handler.codec.http2.Http2LocalFlowController;
|
||||
import io.netty.handler.codec.http2.Http2Stream;
|
||||
|
||||
@ -68,4 +69,9 @@ public final class NoopHttp2LocalFlowController implements Http2LocalFlowControl
|
||||
@Override
|
||||
public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user