Refactoring HTTP/2 Flow Control interfaces.

Motivation:

The terminology used with inbound/outbound is a little confusing since
it's not discussed in the spec. We should switch to using local/remote
instead. Also there is some asymmetry between the inbound/outbound
interfaces which could probably be cleaned up.

Modifications:

Changing the interface names and making a common Http2FlowController
interface for most of the methods.

Result:

The HTTP/2 flow control interfaces should be more clear.
This commit is contained in:
nmittler 2014-12-05 11:28:21 -08:00
parent ed09fb10bc
commit 124983afb5
26 changed files with 648 additions and 739 deletions

View File

@ -53,8 +53,8 @@ public class DefaultHttp2Connection implements Http2Connection {
private final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
private final ConnectionStream connectionStream = new ConnectionStream();
private final Set<Http2Stream> activeStreams = new LinkedHashSet<Http2Stream>();
private final DefaultEndpoint localEndpoint;
private final DefaultEndpoint remoteEndpoint;
private final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
private final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
private final Http2StreamRemovalPolicy removalPolicy;
/**
@ -78,8 +78,8 @@ public class DefaultHttp2Connection implements Http2Connection {
public DefaultHttp2Connection(boolean server, Http2StreamRemovalPolicy removalPolicy) {
this.removalPolicy = checkNotNull(removalPolicy, "removalPolicy");
localEndpoint = new DefaultEndpoint(server);
remoteEndpoint = new DefaultEndpoint(!server);
localEndpoint = new DefaultEndpoint<Http2LocalFlowController>(server);
remoteEndpoint = new DefaultEndpoint<Http2RemoteFlowController>(!server);
// Tell the removal policy how to remove a stream from this connection.
removalPolicy.setAction(new Action() {
@ -138,12 +138,12 @@ public class DefaultHttp2Connection implements Http2Connection {
}
@Override
public Endpoint local() {
public Endpoint<Http2LocalFlowController> local() {
return localEndpoint;
}
@Override
public Endpoint remote() {
public Endpoint<Http2RemoteFlowController> remote() {
return remoteEndpoint;
}
@ -219,9 +219,6 @@ public class DefaultHttp2Connection implements Http2Connection {
private boolean resetReceived;
private boolean endOfStreamSent;
private boolean endOfStreamReceived;
private Http2FlowState inboundFlow;
private Http2FlowState outboundFlow;
private Http2FlowControlWindowManager garbageCollector;
private PropertyMap data;
DefaultStream(int id) {
@ -303,36 +300,6 @@ public class DefaultHttp2Connection implements Http2Connection {
return data.remove(key);
}
@Override
public Http2FlowState inboundFlow() {
return inboundFlow;
}
@Override
public void inboundFlow(Http2FlowState state) {
inboundFlow = state;
}
@Override
public Http2FlowState outboundFlow() {
return outboundFlow;
}
@Override
public void outboundFlow(Http2FlowState state) {
outboundFlow = state;
}
@Override
public Http2FlowControlWindowManager garbageCollector() {
return garbageCollector;
}
@Override
public void garbageCollector(Http2FlowControlWindowManager collector) {
garbageCollector = collector;
}
@Override
public final boolean isRoot() {
return parent == null;
@ -511,7 +478,7 @@ public class DefaultHttp2Connection implements Http2Connection {
return state == HALF_CLOSED_REMOTE || state == OPEN || state == RESERVED_LOCAL;
}
final DefaultEndpoint createdBy() {
final DefaultEndpoint<? extends Http2FlowController> createdBy() {
return localEndpoint.createdStreamId(id) ? localEndpoint : remoteEndpoint;
}
@ -742,12 +709,13 @@ public class DefaultHttp2Connection implements Http2Connection {
/**
* Simple endpoint implementation.
*/
private final class DefaultEndpoint implements Endpoint {
private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
private final boolean server;
private int nextStreamId;
private int lastStreamCreated;
private int lastKnownStream = -1;
private boolean pushToAllowed = true;
private F flowController;
/**
* The maximum number of active streams allowed to be created by this endpoint.
@ -911,7 +879,17 @@ public class DefaultHttp2Connection implements Http2Connection {
}
@Override
public Endpoint opposite() {
public F flowController() {
return flowController;
}
@Override
public void flowController(F flowController) {
this.flowController = checkNotNull(flowController, "flowController");
}
@Override
public Endpoint<? extends Http2FlowController> opposite() {
return isLocal() ? remoteEndpoint : localEndpoint;
}

View File

@ -40,7 +40,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
private final Http2LifecycleManager lifecycleManager;
private final Http2ConnectionEncoder encoder;
private final Http2FrameReader frameReader;
private final Http2InboundFlowController inboundFlow;
private final Http2FrameListener listener;
private boolean prefaceReceived;
@ -52,7 +51,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
private Http2LifecycleManager lifecycleManager;
private Http2ConnectionEncoder encoder;
private Http2FrameReader frameReader;
private Http2InboundFlowController inboundFlow;
private Http2FrameListener listener;
@Override
@ -72,12 +70,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
return lifecycleManager;
}
@Override
public Builder inboundFlow(Http2InboundFlowController inboundFlow) {
this.inboundFlow = inboundFlow;
return this;
}
@Override
public Builder frameReader(Http2FrameReader frameReader) {
this.frameReader = frameReader;
@ -111,8 +103,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
frameReader = checkNotNull(builder.frameReader, "frameReader");
lifecycleManager = checkNotNull(builder.lifecycleManager, "lifecycleManager");
encoder = checkNotNull(builder.encoder, "encoder");
inboundFlow = checkNotNull(builder.inboundFlow, "inboundFlow");
listener = checkNotNull(builder.listener, "listener");
if (connection.local().flowController() == null) {
connection.local().flowController(
new DefaultHttp2LocalFlowController(connection, encoder.frameWriter()));
}
}
@Override
@ -120,6 +115,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
return connection;
}
@Override
public final Http2LocalFlowController flowController() {
return connection.local().flowController();
}
@Override
public Http2FrameListener listener() {
return listener;
@ -141,7 +141,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Http2FrameReader.Configuration config = frameReader.configuration();
Http2HeaderTable headerTable = config.headerTable();
Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
settings.initialWindowSize(inboundFlow.initialWindowSize());
settings.initialWindowSize(flowController().initialWindowSize());
settings.maxConcurrentStreams(connection.remote().maxStreams());
settings.headerTableSize(headerTable.maxHeaderTableSize());
settings.maxFrameSize(frameSizePolicy.maxFrameSize());
@ -189,7 +189,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Integer initialWindowSize = settings.initialWindowSize();
if (initialWindowSize != null) {
inboundFlow.initialWindowSize(initialWindowSize);
flowController().initialWindowSize(initialWindowSize);
}
}
@ -198,8 +198,8 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
frameReader.close();
}
private static int unprocessedBytes(Http2Stream stream) {
return stream.garbageCollector().unProcessedBytes();
private int unconsumedBytes(Http2Stream stream) {
return flowController().unconsumedBytes(stream);
}
/**
@ -248,13 +248,14 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
}
int bytesToReturn = data.readableBytes() + padding;
int unprocessedBytes = unprocessedBytes(stream);
int unconsumedBytes = unconsumedBytes(stream);
Http2LocalFlowController flowController = flowController();
try {
// If we should apply flow control, do so now.
if (shouldApplyFlowControl) {
inboundFlow.applyFlowControl(ctx, streamId, data, padding, endOfStream);
// Update the unprocessed bytes after flow control is applied.
unprocessedBytes = unprocessedBytes(stream);
flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream);
// Update the unconsumed bytes after flow control is applied.
unconsumedBytes = unconsumedBytes(stream);
}
// If we should ignore this frame, do so now.
@ -275,20 +276,20 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// If an exception happened during delivery, the listener may have returned part
// of the bytes before the error occurred. If that's the case, subtract that from
// the total processed bytes so that we don't return too many bytes.
int delta = unprocessedBytes - unprocessedBytes(stream);
int delta = unconsumedBytes - unconsumedBytes(stream);
bytesToReturn -= delta;
throw e;
} catch (RuntimeException e) {
// If an exception happened during delivery, the listener may have returned part
// of the bytes before the error occurred. If that's the case, subtract that from
// the total processed bytes so that we don't return too many bytes.
int delta = unprocessedBytes - unprocessedBytes(stream);
int delta = unconsumedBytes - unconsumedBytes(stream);
bytesToReturn -= delta;
throw e;
} finally {
// If appropriate, returned the processed bytes to the flow controller.
if (shouldApplyFlowControl && bytesToReturn > 0) {
stream.garbageCollector().returnProcessedBytes(ctx, bytesToReturn);
flowController.consumeBytes(ctx, stream, bytesToReturn);
}
if (endOfStream) {
@ -452,7 +453,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Integer initialWindowSize = settings.initialWindowSize();
if (initialWindowSize != null) {
inboundFlow.initialWindowSize(initialWindowSize);
flowController().initialWindowSize(initialWindowSize);
}
}
@ -530,7 +531,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
}
// Update the outbound flow controller.
encoder.updateOutboundWindowSize(streamId, windowSizeIncrement);
encoder.flowController().incrementWindowSize(ctx, stream, windowSizeIncrement);
listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
}

View File

@ -32,7 +32,6 @@ import java.util.ArrayDeque;
public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
private final Http2FrameWriter frameWriter;
private final Http2Connection connection;
private final Http2OutboundFlowController outboundFlow;
private final Http2LifecycleManager lifecycleManager;
// We prefer ArrayDeque to LinkedList because later will produce more GC.
// This initial capacity is plenty for SETTINGS traffic.
@ -44,7 +43,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
public static class Builder implements Http2ConnectionEncoder.Builder {
protected Http2FrameWriter frameWriter;
protected Http2Connection connection;
protected Http2OutboundFlowController outboundFlow;
protected Http2LifecycleManager lifecycleManager;
@Override
@ -73,13 +71,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
return this;
}
@Override
public Builder outboundFlow(
Http2OutboundFlowController outboundFlow) {
this.outboundFlow = outboundFlow;
return this;
}
@Override
public Http2ConnectionEncoder build() {
return new DefaultHttp2ConnectionEncoder(this);
@ -91,10 +82,12 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
protected DefaultHttp2ConnectionEncoder(Builder builder) {
frameWriter = checkNotNull(builder.frameWriter, "frameWriter");
connection = checkNotNull(builder.connection, "connection");
outboundFlow = checkNotNull(builder.outboundFlow, "outboundFlow");
frameWriter = checkNotNull(builder.frameWriter, "frameWriter");
lifecycleManager = checkNotNull(builder.lifecycleManager, "lifecycleManager");
if (connection.remote().flowController() == null) {
connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection, frameWriter));
}
}
@Override
@ -107,6 +100,11 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
return connection;
}
@Override
public final Http2RemoteFlowController flowController() {
return connection().remote().flowController();
}
@Override
public void remoteSettings(Http2Settings settings) throws Http2Exception {
Boolean pushEnabled = settings.pushEnabled();
@ -142,19 +140,20 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
Integer initialWindowSize = settings.initialWindowSize();
if (initialWindowSize != null) {
initialOutboundWindowSize(initialWindowSize);
flowController().initialWindowSize(initialWindowSize);
}
}
@Override
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
final boolean endOfStream, ChannelPromise promise) {
Http2Stream stream;
try {
if (connection.isGoAway()) {
throw new IllegalStateException("Sending data after connection going away.");
}
Http2Stream stream = connection.requireStream(streamId);
stream = connection.requireStream(streamId);
if (stream.isResetSent()) {
throw new IllegalStateException("Sending data after sending RST_STREAM.");
}
@ -184,7 +183,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
// Hand control of the frame to the flow controller.
ChannelFuture future =
outboundFlow.writeData(ctx, streamId, data, padding, endOfStream, promise);
flowController().sendFlowControlledFrame(ctx, stream, data, padding, endOfStream, promise);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -202,11 +201,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
return future;
}
@Override
public ChannelFuture lastWriteForStream(int streamId) {
return outboundFlow.lastWriteForStream(streamId);
}
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream, ChannelPromise promise) {
@ -219,7 +213,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
final boolean exclusive, final int padding, final boolean endOfStream,
final ChannelPromise promise) {
Http2Stream stream = connection.stream(streamId);
ChannelFuture lastDataWrite = lastWriteForStream(streamId);
ChannelFuture lastDataWrite = stream != null ? flowController().lastFlowControlledFrameSent(stream) : null;
try {
if (connection.isGoAway()) {
throw connectionError(PROTOCOL_ERROR, "Sending headers after connection going away.");
@ -472,19 +466,4 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
public Configuration configuration() {
return frameWriter.configuration();
}
@Override
public void initialOutboundWindowSize(int newWindowSize) throws Http2Exception {
outboundFlow.initialOutboundWindowSize(newWindowSize);
}
@Override
public int initialOutboundWindowSize() {
return outboundFlow.initialOutboundWindowSize();
}
@Override
public void updateOutboundWindowSize(int streamId, int deltaWindowSize) throws Http2Exception {
outboundFlow.updateOutboundWindowSize(streamId, deltaWindowSize);
}
}

View File

@ -21,7 +21,6 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZ
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
@ -33,9 +32,9 @@ import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
/**
* Basic implementation of {@link Http2InboundFlowController}.
* Basic implementation of {@link Http2LocalFlowController}.
*/
public class DefaultHttp2InboundFlowController implements Http2InboundFlowController {
public class DefaultHttp2LocalFlowController implements Http2LocalFlowController {
private static final int DEFAULT_COMPOSITE_EXCEPTION_SIZE = 4;
/**
* The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
@ -48,11 +47,11 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
private volatile float windowUpdateRatio;
private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE;
public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
public DefaultHttp2LocalFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO);
}
public DefaultHttp2InboundFlowController(Http2Connection connection,
public DefaultHttp2LocalFlowController(Http2Connection connection,
Http2FrameWriter frameWriter, float windowUpdateRatio) {
this.connection = checkNotNull(connection, "connection");
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
@ -60,17 +59,13 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
// Add a flow state for the connection.
final Http2Stream connectionStream = connection.connectionStream();
final FlowState connectionFlowState = new FlowState(connectionStream);
connectionStream.inboundFlow(connectionFlowState);
connectionStream.garbageCollector(connectionFlowState);
connectionStream.setProperty(FlowState.class, new FlowState(connectionStream));
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void streamAdded(Http2Stream stream) {
final FlowState flowState = new FlowState(stream);
stream.inboundFlow(flowState);
stream.garbageCollector(flowState);
stream.setProperty(FlowState.class, new FlowState(stream));
}
});
}
@ -99,27 +94,35 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
}
}
@Override
public void initialStreamWindowSize(ChannelHandlerContext ctx, int streamId, int newWindowSize)
throws Http2Exception {
checkNotNull(ctx, "ctx");
if (newWindowSize < MIN_INITIAL_WINDOW_SIZE || newWindowSize > MAX_INITIAL_WINDOW_SIZE) {
throw new IllegalArgumentException("Invalid newWindowSize: " + newWindowSize);
}
FlowState state = stateOrFail(streamId);
state.initialStreamWindowSize(newWindowSize);
state.writeWindowUpdateIfNeeded(ctx);
}
@Override
public int initialWindowSize() {
return initialWindowSize;
}
@Override
public int initialStreamWindowSize(int streamId) throws Http2Exception {
return stateOrFail(streamId).initialStreamWindowSize();
public int windowSize(Http2Stream stream) {
return state(stream).window();
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
checkNotNull(ctx, "ctx");
FlowState state = state(stream);
// Just add the delta to the stream-specific initial window size so that the next time the window
// expands it will grow to the new initial size.
state.incrementInitialStreamWindow(delta);
state.writeWindowUpdateIfNeeded(ctx);
}
@Override
public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
throws Http2Exception {
state(stream).consumeBytes(ctx, numBytes);
}
@Override
public int unconsumedBytes(Http2Stream stream) {
return state(stream).unconsumedBytes();
}
private static void checkValidRatio(float ratio) {
@ -159,14 +162,13 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
* be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE}
* was generated by this method before the initial {@code SETTINGS} frame is sent.
* @param ctx the context to use if a {@code WINDOW_UPDATE} is determined necessary.
* @param streamId the stream for which {@code ratio} applies to.
* @param stream the stream for which {@code ratio} applies to.
* @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary.
* @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames
*/
public void windowUpdateRatio(ChannelHandlerContext ctx, int streamId, float ratio) throws Http2Exception {
checkNotNull(ctx, "ctx");
public void windowUpdateRatio(ChannelHandlerContext ctx, Http2Stream stream, float ratio) throws Http2Exception {
checkValidRatio(ratio);
FlowState state = stateOrFail(streamId);
FlowState state = state(stream);
state.windowUpdateRatio(ratio);
state.writeWindowUpdateIfNeeded(ctx);
}
@ -177,52 +179,37 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
* be sent. This window update ratio will only be applied to {@code streamId}.
* @throws Http2Exception If no stream corresponding to {@code stream} could be found.
*/
public float windowUpdateRatio(int streamId) throws Http2Exception {
return stateOrFail(streamId).windowUpdateRatio();
public float windowUpdateRatio(Http2Stream stream) throws Http2Exception {
return state(stream).windowUpdateRatio();
}
@Override
public void applyFlowControl(ChannelHandlerContext ctx, int streamId, ByteBuf data,
public void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data,
int padding, boolean endOfStream) throws Http2Exception {
int dataLength = data.readableBytes() + padding;
// Apply the connection-level flow control
connectionState().applyFlowControl(dataLength);
connectionState().receiveFlowControlledFrame(dataLength);
// Apply the stream-level flow control
FlowState state = stateOrFail(streamId);
FlowState state = state(stream);
state.endOfStream(endOfStream);
state.applyFlowControl(dataLength);
state.receiveFlowControlledFrame(dataLength);
}
private FlowState connectionState() {
return state(connection.connectionStream());
}
private FlowState state(int streamId) {
Http2Stream stream = connection.stream(streamId);
return stream != null ? state(stream) : null;
}
private FlowState state(Http2Stream stream) {
return (FlowState) stream.inboundFlow();
}
/**
* Gets the window for the stream or raises a {@code PROTOCOL_ERROR} if not found.
*/
private FlowState stateOrFail(int streamId) throws Http2Exception {
FlowState state = state(streamId);
if (state == null) {
throw connectionError(PROTOCOL_ERROR, "Flow control window missing for stream: %d", streamId);
}
return state;
checkNotNull(stream, "stream");
return stream.getProperty(FlowState.class);
}
/**
* Flow control window state for an individual stream.
*/
private final class FlowState implements Http2FlowState, Http2FlowControlWindowManager {
private final class FlowState {
private final Http2Stream stream;
/**
@ -260,8 +247,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
streamWindowUpdateRatio = windowUpdateRatio;
}
@Override
public int window() {
int window() {
return window;
}
@ -277,14 +263,6 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
streamWindowUpdateRatio = ratio;
}
int initialStreamWindowSize() {
return initialStreamWindowSize;
}
void initialStreamWindowSize(int initialWindowSize) {
initialStreamWindowSize = initialWindowSize;
}
/**
* Increment the initial window size for this stream.
* @param delta The amount to increase the initial window size by.
@ -319,7 +297,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
* @param dataLength The amount of data to for which this stream is no longer eligible to use for flow control.
* @throws Http2Exception If too much data is used relative to how much is available.
*/
void applyFlowControl(int dataLength) throws Http2Exception {
void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
assert dataLength > 0;
// Apply the delta. Even if we throw an exception we want to have taken this delta into account.
@ -347,12 +325,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
processedWindow -= delta;
}
@Override
public void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
if (stream.id() == CONNECTION_STREAM_ID) {
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
}
checkNotNull(ctx, "ctx");
if (numBytes <= 0) {
throw new IllegalArgumentException("numBytes must be positive");
}
@ -367,16 +343,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
writeWindowUpdateIfNeeded(ctx);
}
@Override
public int unProcessedBytes() {
public int unconsumedBytes() {
return processedWindow - window;
}
@Override
public Http2Stream stream() {
return stream;
}
/**
* Updates the flow control window for this stream if it is appropriate.
*/

View File

@ -17,11 +17,9 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
@ -38,9 +36,9 @@ import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Basic implementation of {@link Http2OutboundFlowController}.
* Basic implementation of {@link Http2RemoteFlowController}.
*/
public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowController {
public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
/**
* A {@link Comparator} that sorts streams in ascending order the amount of streamable data.
@ -58,19 +56,19 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
private ChannelHandlerContext ctx;
private boolean frameSent;
public DefaultHttp2OutboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
public DefaultHttp2RemoteFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
this.connection = checkNotNull(connection, "connection");
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
// Add a flow state for the connection.
connection.connectionStream().outboundFlow(new OutboundFlowState(connection.connectionStream()));
connection.connectionStream().setProperty(FlowState.class, new FlowState(connection.connectionStream()));
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void streamAdded(Http2Stream stream) {
// Just add a new flow state to the stream.
stream.outboundFlow(new OutboundFlowState(stream));
stream.setProperty(FlowState.class, new FlowState(stream));
}
@Override
@ -108,7 +106,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
@Override
public void initialOutboundWindowSize(int newWindowSize) throws Http2Exception {
public void initialWindowSize(int newWindowSize) throws Http2Exception {
if (newWindowSize < 0) {
throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
}
@ -117,8 +115,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
initialWindowSize = newWindowSize;
for (Http2Stream stream : connection.activeStreams()) {
// Verify that the maximum value is not exceeded by this change.
OutboundFlowState state = state(stream);
state.incrementStreamWindow(delta);
state(stream).incrementStreamWindow(delta);
}
if (delta > 0) {
@ -128,19 +125,24 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
@Override
public int initialOutboundWindowSize() {
public int initialWindowSize() {
return initialWindowSize;
}
@Override
public void updateOutboundWindowSize(int streamId, int delta) throws Http2Exception {
if (streamId == CONNECTION_STREAM_ID) {
public int windowSize(Http2Stream stream) {
return state(stream).window();
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
if (stream.id() == CONNECTION_STREAM_ID) {
// Update the connection window and write any pending frames for all streams.
connectionState().incrementStreamWindow(delta);
writePendingBytes();
} else {
// Update the stream window and write any pending frames for the stream.
OutboundFlowState state = stateOrFail(streamId);
FlowState state = state(stream);
state.incrementStreamWindow(delta);
frameSent = false;
state.writeBytes(state.writableWindow());
@ -151,8 +153,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endStream, ChannelPromise promise) {
public ChannelFuture sendFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream,
ByteBuf data, int padding, boolean endStream, ChannelPromise promise) {
checkNotNull(ctx, "ctx");
checkNotNull(promise, "promise");
checkNotNull(data, "data");
@ -162,20 +164,17 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
if (padding < 0) {
throw new IllegalArgumentException("padding must be >= 0");
}
if (streamId <= 0) {
throw new IllegalArgumentException("streamId must be >= 0");
}
// Save the context. We'll use this later when we write pending bytes.
this.ctx = ctx;
try {
OutboundFlowState state = stateOrFail(streamId);
FlowState state = state(stream);
int window = state.writableWindow();
boolean framesAlreadyQueued = state.hasFrame();
OutboundFlowState.Frame frame = state.newFrame(promise, data, padding, endStream);
FlowState.Frame frame = state.newFrame(promise, data, padding, endStream);
if (!framesAlreadyQueued && window >= frame.size()) {
// Window size is large enough to send entire data frame
frame.write();
@ -194,41 +193,33 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// Create and send a partial frame up to the window size.
frame.split(window).write();
ctx.flush();
} catch (Http2Exception e) {
} catch (Throwable e) {
promise.setFailure(e);
}
return promise;
}
@Override
public ChannelFuture lastWriteForStream(int streamId) {
OutboundFlowState state = state(streamId);
public ChannelFuture lastFlowControlledFrameSent(Http2Stream stream) {
FlowState state = state(stream);
return state != null ? state.lastNewFrame() : null;
}
private static OutboundFlowState state(Http2Stream stream) {
return (OutboundFlowState) stream.outboundFlow();
}
private OutboundFlowState connectionState() {
return state(connection.connectionStream());
}
private OutboundFlowState state(int streamId) {
Http2Stream stream = connection.stream(streamId);
return stream != null ? state(stream) : null;
}
/**
* Attempts to get the {@link OutboundFlowState} for the given stream. If not available, raises a
* {@code PROTOCOL_ERROR}.
* For testing purposes only. Exposes the number of streamable bytes for the tree rooted at
* the given stream.
*/
private OutboundFlowState stateOrFail(int streamId) throws Http2Exception {
OutboundFlowState state = state(streamId);
if (state == null) {
throw connectionError(PROTOCOL_ERROR, "Missing flow control window for stream: %d", streamId);
int streamableBytesForTree(Http2Stream stream) {
return state(stream).streamableBytesForTree();
}
return state;
private static FlowState state(Http2Stream stream) {
checkNotNull(stream, "stream");
return stream.getProperty(FlowState.class);
}
private FlowState connectionState() {
return state(connection.connectionStream());
}
/**
@ -273,7 +264,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* @return An object summarizing the write and allocation results.
*/
private int writeChildren(Http2Stream parent, int connectionWindow) {
OutboundFlowState state = state(parent);
FlowState state = state(parent);
if (state.streamableBytesForTree() <= 0) {
return 0;
}
@ -353,7 +344,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
/**
* Write bytes allocated to {@code state}
*/
private static void writeChildNode(OutboundFlowState state) {
private static void writeChildNode(FlowState state) {
state.writeBytes(state.allocated());
state.resetAllocated();
}
@ -361,7 +352,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
/**
* The outbound flow control state for a single stream.
*/
final class OutboundFlowState implements Http2FlowState {
final class FlowState {
private final Queue<Frame> pendingWriteQueue;
private final Http2Stream stream;
private int window = initialWindowSize;
@ -370,12 +361,11 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
private int allocated;
private ChannelFuture lastNewFrame;
private OutboundFlowState(Http2Stream stream) {
private FlowState(Http2Stream stream) {
this.stream = stream;
pendingWriteQueue = new ArrayDeque<Frame>(2);
}
@Override
public int window() {
return window;
}

View File

@ -14,7 +14,6 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
@ -22,7 +21,9 @@ import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@ -40,7 +41,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
@Override
public void streamRemoved(Http2Stream stream) {
final Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class);
final Http2Decompressor decompressor = decompressor(stream);
if (decompressor != null) {
cleanup(stream, decompressor);
}
@ -49,6 +50,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
private final Http2Connection connection;
private final boolean strict;
private boolean flowControllerInitialized;
public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener) {
this(connection, listener, true);
@ -67,8 +69,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
final Http2Stream stream = connection.stream(streamId);
final Http2Decompressor decompressor = stream == null ? null :
(Http2Decompressor) stream.getProperty(Http2Decompressor.class);
final Http2Decompressor decompressor = decompressor(stream);
if (decompressor == null) {
// The decompressor may be null if no compatible encoding type was found in this stream's headers
return listener.onDataRead(ctx, streamId, data, padding, endOfStream);
@ -203,7 +204,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
return;
}
Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class);
Http2Decompressor decompressor = decompressor(stream);
if (decompressor == null && !endOfStream) {
// Determine the content encoding.
AsciiString contentEncoding = headers.get(CONTENT_ENCODING);
@ -214,7 +215,6 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
if (channel != null) {
decompressor = new Http2Decompressor(channel);
stream.setProperty(Http2Decompressor.class, decompressor);
stream.garbageCollector(new DecompressorGarbageCollector(stream.garbageCollector()));
// Decode the content and remove or replace the existing headers
// so that the message looks like a decoded message.
AsciiString targetContentEncoding = getTargetContentEncoding(contentEncoding);
@ -231,8 +231,19 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
// this content-length will not be correct. Instead of queuing messages or delaying sending
// header frames...just remove the content-length header
headers.remove(CONTENT_LENGTH);
// The first time that we initialize a decompressor, decorate the local flow controller to
// properly convert consumed bytes.
if (!flowControllerInitialized) {
flowControllerInitialized = true;
connection.local().flowController(new ConsumedBytesConverter(connection.local().flowController()));
}
}
}
private static Http2Decompressor decompressor(Http2Stream stream) {
return (Http2Decompressor) (stream == null? null : stream.getProperty(Http2Decompressor.class));
}
/**
* Release remaining content from the {@link EmbeddedChannel} and remove the decompressor
@ -253,10 +264,6 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
}
}
decompressor = stream.removeProperty(Http2Decompressor.class);
if (decompressor != null) {
DecompressorGarbageCollector gc = (DecompressorGarbageCollector) stream.garbageCollector();
stream.garbageCollector(gc.original());
}
}
/**
@ -281,47 +288,72 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
}
/**
* Garbage collector which translates post-decompression amounts the application knows about
* to pre-decompression amounts that flow control knows about.
* A decorator around the local flow controller that converts consumed bytes from uncompressed to compressed.
*/
private static final class DecompressorGarbageCollector implements Http2FlowControlWindowManager {
private final Http2FlowControlWindowManager original;
private final class ConsumedBytesConverter implements Http2LocalFlowController {
private final Http2LocalFlowController flowController;
DecompressorGarbageCollector(Http2FlowControlWindowManager original) {
this.original = original;
ConsumedBytesConverter(Http2LocalFlowController flowController) {
this.flowController = checkNotNull(flowController, "flowController");
}
@Override
public void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
final Http2Stream stream = stream();
final Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class);
public void initialWindowSize(int newWindowSize) throws Http2Exception {
flowController.initialWindowSize(newWindowSize);
}
// Make a copy before hand in case any exceptions occur we will roll back the state
Http2Decompressor copy = new Http2Decompressor(decompressor);
@Override
public int initialWindowSize() {
return flowController.initialWindowSize();
}
@Override
public int windowSize(Http2Stream stream) {
return flowController.windowSize(stream);
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
throws Http2Exception {
flowController.incrementWindowSize(ctx, stream, delta);
}
@Override
public void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream,
ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream);
}
@Override
public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
throws Http2Exception {
Http2Decompressor decompressor = decompressor(stream);
Http2Decompressor copy = null;
try {
original.returnProcessedBytes(ctx, decompressor.consumeProcessedBytes(numBytes));
if (decompressor != null) {
// Make a copy before hand in case any exceptions occur we will roll back the state
copy = new Http2Decompressor(decompressor);
// Convert the uncompressed consumed bytes to compressed (on the wire) bytes.
numBytes = decompressor.consumeProcessedBytes(numBytes);
}
flowController.consumeBytes(ctx, stream, numBytes);
} catch (Http2Exception e) {
if (copy != null) {
stream.setProperty(Http2Decompressor.class, copy);
}
throw e;
} catch (Throwable t) {
if (copy != null) {
stream.setProperty(Http2Decompressor.class, copy);
}
throw new Http2Exception(INTERNAL_ERROR,
"Error while returning bytes to flow control window", t);
}
}
Http2FlowControlWindowManager original() {
return original;
}
@Override
public int unProcessedBytes() {
return original.unProcessedBytes();
}
@Override
public Http2Stream stream() {
return original.stream();
public int unconsumedBytes(Http2Stream stream) {
return flowController.unconsumedBytes(stream);
}
}

View File

@ -91,7 +91,7 @@ public interface Http2Connection {
/**
* A view of the connection from one endpoint (local or remote).
*/
interface Endpoint {
interface Endpoint<F extends Http2FlowController> {
/**
* Returns the next valid streamId for this endpoint. If negative, the stream IDs are
@ -186,10 +186,20 @@ public interface Http2Connection {
*/
int lastKnownStream();
/**
* Gets the flow controller for this endpoint.
*/
F flowController();
/**
* Sets the flow controller for this endpoint.
*/
void flowController(F flowController);
/**
* Gets the {@link Endpoint} opposite this one.
*/
Endpoint opposite();
Endpoint<? extends Http2FlowController> opposite();
}
/**
@ -237,7 +247,7 @@ public interface Http2Connection {
/**
* Gets a view of this connection from the local {@link Endpoint}.
*/
Endpoint local();
Endpoint<Http2LocalFlowController> local();
/**
* Creates a new stream initiated by the local endpoint. See {@link Endpoint#createStream(int, boolean)}.
@ -247,7 +257,7 @@ public interface Http2Connection {
/**
* Gets a view of this connection from the remote {@link Endpoint}.
*/
Endpoint remote();
Endpoint<Http2RemoteFlowController> remote();
/**
* Creates a new stream initiated by the remote endpoint. See {@link Endpoint#createStream(int, boolean)}.

View File

@ -48,11 +48,6 @@ public interface Http2ConnectionDecoder extends Closeable {
*/
Http2LifecycleManager lifecycleManager();
/**
* Sets the {@link Http2InboundFlowController} to be used when building the decoder.
*/
Builder inboundFlow(Http2InboundFlowController inboundFlow);
/**
* Sets the {@link Http2FrameReader} to be used when building the decoder.
*/
@ -79,6 +74,11 @@ public interface Http2ConnectionDecoder extends Closeable {
*/
Http2Connection connection();
/**
* Provides the local flow controller for managing inbound traffic.
*/
Http2LocalFlowController flowController();
/**
* Provides direct access to the underlying frame listener.
*/

View File

@ -23,7 +23,7 @@ import io.netty.channel.ChannelPromise;
/**
* Handler for outbound HTTP/2 traffic.
*/
public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundFlowController {
public interface Http2ConnectionEncoder extends Http2FrameWriter {
/**
* Builder for new instances of {@link Http2ConnectionEncoder}.
@ -50,11 +50,6 @@ public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundF
*/
Builder frameWriter(Http2FrameWriter frameWriter);
/**
* Sets the {@link Http2OutboundFlowController} to be used when building the encoder.
*/
Builder outboundFlow(Http2OutboundFlowController outboundFlow);
/**
* Creates a new encoder instance.
*/
@ -66,6 +61,11 @@ public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundF
*/
Http2Connection connection();
/**
* Provides the remote flow controller for managing outbound traffic.
*/
Http2RemoteFlowController flowController();
/**
* Provides direct access to the underlying frame writer object.
*/

View File

@ -60,18 +60,10 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2FrameListener listener) {
this(connection, frameReader, frameWriter, new DefaultHttp2InboundFlowController(
connection, frameWriter), new DefaultHttp2OutboundFlowController(connection,
frameWriter), listener);
}
public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow, Http2FrameListener listener) {
this(DefaultHttp2ConnectionDecoder.newBuilder().connection(connection)
.frameReader(frameReader).inboundFlow(inboundFlow).listener(listener),
.frameReader(frameReader).listener(listener),
DefaultHttp2ConnectionEncoder.newBuilder().connection(connection)
.frameWriter(frameWriter).outboundFlow(outboundFlow));
.frameWriter(frameWriter));
}
/**

View File

@ -1,44 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.ChannelHandlerContext;
/**
* Allows data to be returned to the flow control window.
*/
public interface Http2FlowControlWindowManager {
/**
* Used by applications that participate in application-level inbound flow control. Allows the
* application to return a number of bytes that has been processed and thereby enabling the
* {@link Http2InboundFlowController} to send a {@code WINDOW_UPDATE} to restore at least part
* of the flow control window.
*
* @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if
* appropriate
* @param numBytes the number of bytes to be returned to the flow control window.
*/
void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception;
/**
* The number of bytes that are outstanding and have not yet been returned to the flow controller.
*/
int unProcessedBytes();
/**
* Get the stream that is being managed
*/
Http2Stream stream();
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.ChannelHandlerContext;
/**
* Base interface for all HTTP/2 flow controllers.
*/
public interface Http2FlowController {
/**
* Sets the initial flow control window and updates all stream windows (but not the connection
* window) by the delta.
* <p>
* This method is used to apply the {@code SETTINGS_INITIAL_WINDOW_SIZE} value for an
* {@code SETTINGS} frame.
*
* @param newWindowSize the new initial window size.
* @throws Http2Exception thrown if any protocol-related error occurred.
*/
void initialWindowSize(int newWindowSize) throws Http2Exception;
/**
* Gets the initial flow control window size that is used as the basis for new stream flow
* control windows.
*/
int initialWindowSize();
/**
* Gets the number of bytes remaining in the flow control window size for the given stream.
*
* @param stream The subject stream. Use {@link Http2Connection#connectionStream()} for
* requesting the size of the connection window.
* @return the current size of the flow control window.
* @throws IllegalArgumentException if the given stream does not exist.
*/
int windowSize(Http2Stream stream);
/**
* Increments the size of the stream's flow control window by the given delta.
* <p>
* In the case of a {@link Http2RemoteFlowController} this is called upon receipt of a
* {@code WINDOW_UPDATE} frame from the remote endpoint to mirror the changes to the window
* size.
* <p>
* For a {@link Http2LocalFlowController} this can be called to request the expansion of the
* window size published by this endpoint. It is up to the implementation, however, as to when a
* {@code WINDOW_UPDATE} is actually sent.
*
* @param ctx The context for the calling handler
* @param stream The subject stream. Use {@link Http2Connection#connectionStream()} for
* requesting the size of the connection window.
* @param delta the change in size of the flow control window.
* @throws Http2Exception thrown if a protocol-related error occurred.
*/
void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception;
}

View File

@ -1,27 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
/**
* Base interface for flow-control state for a particular stream.
*/
public interface Http2FlowState {
/**
* Returns the current remaining flow control window (in bytes) for the stream. Depending on the
* flow control implementation, this value may be negative.
*/
int window();
}

View File

@ -1,77 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
/**
* Controls the inbound flow of data frames from the remote endpoint.
*/
public interface Http2InboundFlowController {
/**
* Applies inbound flow control to the given {@code DATA} frame.
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
* @param data payload buffer for the frame.
* @param padding the number of padding bytes found at the end of the frame.
* @param endOfStream Indicates whether this is the last frame to be sent from the remote
* endpoint for this stream.
*/
void applyFlowControl(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception;
/**
* Sets the global inbound flow control window size and updates all stream window sizes by the delta.
* <p>
* This method is used to apply the {@code SETTINGS_INITIAL_WINDOW_SIZE} value for an
* outbound {@code SETTINGS} frame.
* <p>
* The connection stream windows will not be modified as a result of this call.
* @param newWindowSize the new initial window size.
* @throws Http2Exception thrown if any protocol-related error occurred.
*/
void initialWindowSize(int newWindowSize) throws Http2Exception;
/**
* Gets the initial window size used as the basis for new stream flow control windows.
*/
int initialWindowSize();
/**
* Sets the initial inbound flow control window size for a specific stream.
* <p>
* Note it is the responsibly of the caller to ensure that the the
* initial {@code SETTINGS} frame is sent before this is called. It would
* be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE}
* was generated by this method before the initial {@code SETTINGS} frame is sent.
* @param ctx the context to use if a {@code WINDOW_UPDATE} is determined necessary.
* @param streamId The stream to update.
* @param newWindowSize the window size to apply to {@code streamId}
* @throws Http2Exception thrown if any protocol-related error occurred.
*/
void initialStreamWindowSize(ChannelHandlerContext ctx, int streamId, int newWindowSize) throws Http2Exception;
/**
* Obtain the initial window size for a specific stream.
* @param streamId The stream id to get the initial window size for.
* @return The initial window size for {@code streamId}.
* @throws Http2Exception If no stream corresponding to {@code stream} could be found.
*/
int initialStreamWindowSize(int streamId) throws Http2Exception;
}

View File

@ -0,0 +1,69 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
/**
* A {@link Http2FlowController} for controlling the inbound flow of {@code DATA} frames from the remote
* endpoint.
*/
public interface Http2LocalFlowController extends Http2FlowController {
/**
* Receives an inbound {@code DATA} frame from the remote endpoint and applies flow control
* policies to it for both the {@code stream} as well as the connection. If any flow control
* policies have been violated, an exception is raised immediately, otherwise the frame is
* considered to have "passed" flow control.
*
* @param ctx the context from the handler where the frame was read.
* @param stream the subject stream for the received frame. The connection stream object must
* not be used.
* @param data payload buffer for the frame.
* @param padding the number of padding bytes found at the end of the frame.
* @param endOfStream Indicates whether this is the last frame to be sent from the remote
* endpoint for this stream.
* @throws Http2Exception if any flow control errors are encountered.
*/
void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception;
/**
* Indicates that the application has consumed a number of bytes for the given stream and is
* therefore ready to receive more data from the remote endpoint. The application must consume
* any bytes that it receives or the flow control window will collapse. Consuming bytes enables
* the flow controller to send {@code WINDOW_UPDATE} to restore a portion of the flow control
* window for the stream.
*
* @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if
* appropriate
* @param stream the stream for which window space should be freed. The connection stream object
* must not be used.
* @param numBytes the number of bytes to be returned to the flow control window.
* @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes()}
* for the stream.
*/
void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception;
/**
* The number of bytes for the given stream that have been received but not yet consumed by the
* application.
*
* @param stream the stream for which window space should be freed.
* @return the number of unconsumed bytes for the stream.
*/
int unconsumedBytes(Http2Stream stream);
}

View File

@ -1,71 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
/**
* Controls the outbound flow of data frames to the remote endpoint.
*/
public interface Http2OutboundFlowController extends Http2DataWriter {
/**
* Controls the flow-controlled writing of a DATA frame to the remote endpoint. There is no
* guarantee when the data will be written or whether it will be split into multiple frames
* before sending. The returned future will only be completed once all of the data has been
* successfully written to the remote endpoint.
* <p>
* Manually flushing the {@link ChannelHandlerContext} is not required, since the flow
* controller will flush as appropriate.
*/
@Override
ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endStream, ChannelPromise promise);
/**
* Returns the {@link ChannelFuture} for the most recent write for the given
* stream. If no previous write for the stream has occurred, returns {@code null}.
*/
ChannelFuture lastWriteForStream(int streamId);
/**
* Sets the initial size of the connection's outbound flow control window. The outbound flow
* control windows for all streams are updated by the delta in the initial window size. This is
* called as part of the processing of a SETTINGS frame received from the remote endpoint.
*
* @param newWindowSize the new initial window size.
*/
void initialOutboundWindowSize(int newWindowSize) throws Http2Exception;
/**
* Gets the initial size of the connection's outbound flow control window.
*/
int initialOutboundWindowSize();
/**
* Updates the size of the stream's outbound flow control window. This is called upon receiving
* a WINDOW_UPDATE frame from the remote endpoint.
*
* @param streamId the ID of the stream, or zero if the window is for the entire connection.
* @param deltaWindowSize the change in size of the outbound flow control window.
* @throws Http2Exception thrown if a protocol-related error occurred.
*/
void updateOutboundWindowSize(int streamId, int deltaWindowSize) throws Http2Exception;
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
/**
* A {@link Http2FlowController} for controlling the flow of outbound {@code DATA} frames to the remote
* endpoint.
*/
public interface Http2RemoteFlowController extends Http2FlowController {
/**
* Writes or queues a {@code DATA} frame for transmission to the remote endpoint. There is no
* guarantee when the data will be written or whether it will be split into multiple frames
* before sending. The returned future will only be completed once all of the data has been
* successfully written to the remote endpoint.
* <p>
* Manually flushing the {@link ChannelHandlerContext} is not required, since the flow
* controller will flush as appropriate.
*
* @param ctx the context from the handler.
* @param stream the subject stream. Must not be the connection stream object.
* @param data payload buffer for the frame.
* @param padding the number of padding bytes to be added at the end of the frame.
* @param endOfStream Indicates whether this is the last frame to be sent to the remote endpoint
* for this stream.
* @param promise the promise to be completed when the data has been successfully written or a
* failure occurs.
* @return a future that is completed when the frame is sent to the remote endpoint.
*/
ChannelFuture sendFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream,
ByteBuf data, int padding, boolean endStream, ChannelPromise promise);
/**
* Gets the {@link ChannelFuture} for the most recent frame that was sent for the given stream
* via a call to {@link #sendFlowControlledFrame()}. This is useful for cases such as ensuring
* that {@code HEADERS} frames maintain send order with {@code DATA} frames.
*
* @param stream the subject stream. Must not be the connection stream object.
* @return the most recent sent frame, or {@code null} if no frame has been sent for the stream.
*/
ChannelFuture lastFlowControlledFrameSent(Http2Stream stream);
}

View File

@ -147,36 +147,6 @@ public interface Http2Stream {
*/
<V> V removeProperty(Object key);
/**
* Gets the in-bound flow control state for this stream.
*/
Http2FlowState inboundFlow();
/**
* Sets the in-bound flow control state for this stream.
*/
void inboundFlow(Http2FlowState state);
/**
* Gets the out-bound flow control window for this stream.
*/
Http2FlowState outboundFlow();
/**
* Sets the out-bound flow control window for this stream.
*/
void outboundFlow(Http2FlowState state);
/**
* Gets the interface which allows bytes to be returned to the flow controller
*/
Http2FlowControlWindowManager garbageCollector();
/**
* Sets the interface which allows bytes to be returned to the flow controller
*/
void garbageCollector(Http2FlowControlWindowManager collector);
/**
* Updates an priority for this stream. Calling this method may affect the straucture of the
* priority tree.

View File

@ -40,10 +40,9 @@ public class HttpToHttp2ConnectionHandler extends Http2ConnectionHandler {
super(connection, frameReader, frameWriter, listener);
}
public HttpToHttp2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow, Http2FrameListener listener) {
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, listener);
public HttpToHttp2ConnectionHandler(Http2ConnectionDecoder.Builder decoderBuilder,
Http2ConnectionEncoder.Builder encoderBuilder) {
super(decoderBuilder, encoderBuilder);
}
/**

View File

@ -153,7 +153,7 @@ public class DataCompressionHttp2Test {
}
});
awaitServer();
assertEquals(0, stream.garbageCollector().unProcessedBytes());
assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream));
assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name()));
} finally {
data.release();
@ -182,7 +182,7 @@ public class DataCompressionHttp2Test {
}
});
awaitServer();
assertEquals(0, stream.garbageCollector().unProcessedBytes());
assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream));
assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name()));
} finally {
data.release();
@ -214,7 +214,7 @@ public class DataCompressionHttp2Test {
}
});
awaitServer();
assertEquals(0, stream.garbageCollector().unProcessedBytes());
assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream));
assertEquals(new StringBuilder(text1).append(text2).toString(),
serverOut.toString(CharsetUtil.UTF_8.name()));
} finally {
@ -247,7 +247,7 @@ public class DataCompressionHttp2Test {
}
});
awaitServer();
assertEquals(0, stream.garbageCollector().unProcessedBytes());
assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream));
assertEquals(data.resetReaderIndex().toString(CharsetUtil.UTF_8),
serverOut.toString(CharsetUtil.UTF_8.name()));
} finally {
@ -294,14 +294,15 @@ public class DataCompressionHttp2Test {
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
Http2FrameWriter writer = new DefaultHttp2FrameWriter();
Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler(
new DefaultHttp2ConnectionDecoder.Builder()
Http2ConnectionHandler connectionHandler =
new Http2ConnectionHandler(new DefaultHttp2ConnectionDecoder.Builder()
.connection(serverConnection)
.frameReader(new DefaultHttp2FrameReader())
.inboundFlow(new DefaultHttp2InboundFlowController(serverConnection, writer))
.listener(new DelegatingDecompressorFrameListener(serverConnection, serverListener)),
new CompressorHttp2ConnectionEncoder.Builder().connection(serverConnection).frameWriter(writer)
.outboundFlow(new DefaultHttp2OutboundFlowController(serverConnection, writer)));
.listener(
new DelegatingDecompressorFrameListener(serverConnection,
serverListener)),
new CompressorHttp2ConnectionEncoder.Builder().connection(
serverConnection).frameWriter(writer));
p.addLast(connectionHandler);
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
serverChannelLatch.countDown();
@ -316,14 +317,15 @@ public class DataCompressionHttp2Test {
ChannelPipeline p = ch.pipeline();
FrameCountDown clientFrameCountDown = new FrameCountDown(clientListener, clientLatch);
Http2FrameWriter writer = new DefaultHttp2FrameWriter();
Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler(
new DefaultHttp2ConnectionDecoder.Builder()
Http2ConnectionHandler connectionHandler =
new Http2ConnectionHandler(new DefaultHttp2ConnectionDecoder.Builder()
.connection(clientConnection)
.frameReader(new DefaultHttp2FrameReader())
.inboundFlow(new DefaultHttp2InboundFlowController(clientConnection, writer))
.listener(new DelegatingDecompressorFrameListener(clientConnection, clientFrameCountDown)),
new CompressorHttp2ConnectionEncoder.Builder().connection(clientConnection).frameWriter(writer)
.outboundFlow(new DefaultHttp2OutboundFlowController(clientConnection, writer)));
.listener(
new DelegatingDecompressorFrameListener(clientConnection,
clientFrameCountDown)),
new CompressorHttp2ConnectionEncoder.Builder().connection(
clientConnection).frameWriter(writer));
clientEncoder = connectionHandler.encoder();
p.addLast(connectionHandler);
p.addLast(Http2CodecUtil.ignoreSettingsHandler());

View File

@ -70,13 +70,16 @@ public class DefaultHttp2ConnectionDecoderTest {
private Http2Connection connection;
@Mock
private Http2Connection.Endpoint remote;
private Http2Connection.Endpoint<Http2RemoteFlowController> remote;
@Mock
private Http2Connection.Endpoint local;
private Http2Connection.Endpoint<Http2LocalFlowController> local;
@Mock
private Http2InboundFlowController inboundFlow;
private Http2LocalFlowController localFlow;
@Mock
private Http2RemoteFlowController remoteFlow;
@Mock
private ChannelHandlerContext ctx;
@ -104,9 +107,6 @@ public class DefaultHttp2ConnectionDecoderTest {
@Mock
private Http2ConnectionEncoder encoder;
@Mock
private Http2FlowControlWindowManager inFlowState;
@Mock
private Http2LifecycleManager lifecycleManager;
@ -119,12 +119,13 @@ public class DefaultHttp2ConnectionDecoderTest {
when(channel.isActive()).thenReturn(true);
when(stream.id()).thenReturn(STREAM_ID);
when(stream.state()).thenReturn(OPEN);
when(stream.garbageCollector()).thenReturn(inFlowState);
when(pushStream.id()).thenReturn(PUSH_STREAM_ID);
when(connection.activeStreams()).thenReturn(Collections.singletonList(stream));
when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
when(connection.local()).thenReturn(local);
when(local.flowController()).thenReturn(localFlow);
when(encoder.flowController()).thenReturn(remoteFlow);
when(connection.remote()).thenReturn(remote);
doAnswer(new Answer<Http2Stream>() {
@Override
@ -151,7 +152,7 @@ public class DefaultHttp2ConnectionDecoderTest {
when(ctx.write(any())).thenReturn(future);
decoder = DefaultHttp2ConnectionDecoder.newBuilder().connection(connection)
.frameReader(reader).inboundFlow(inboundFlow).encoder(encoder)
.frameReader(reader).encoder(encoder)
.listener(listener).lifecycleManager(lifecycleManager).build();
// Simulate receiving the initial settings from the remote endpoint.
@ -173,8 +174,8 @@ public class DefaultHttp2ConnectionDecoderTest {
mockFlowControl(processedBytes);
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true));
verify(inFlowState).returnProcessedBytes(eq(ctx), eq(processedBytes));
verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(padding), eq(true));
verify(localFlow).consumeBytes(eq(ctx), eq(stream), eq(processedBytes));
// Verify that the event was absorbed and not propagated to the oberver.
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
@ -203,7 +204,7 @@ public class DefaultHttp2ConnectionDecoderTest {
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow, never()).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
verify(localFlow, never()).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true));
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
@ -218,7 +219,7 @@ public class DefaultHttp2ConnectionDecoderTest {
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true));
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
@ -230,7 +231,7 @@ public class DefaultHttp2ConnectionDecoderTest {
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true));
verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future));
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
} finally {
@ -248,23 +249,23 @@ public class DefaultHttp2ConnectionDecoderTest {
public Integer answer(InvocationOnMock in) throws Throwable {
return unprocessed.get();
}
}).when(inFlowState).unProcessedBytes();
}).when(localFlow).unconsumedBytes(eq(stream));
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) throws Throwable {
int delta = (Integer) in.getArguments()[1];
int delta = (Integer) in.getArguments()[2];
int newValue = unprocessed.addAndGet(-delta);
if (newValue < 0) {
throw new RuntimeException("Returned too many bytes");
}
return null;
}
}).when(inFlowState).returnProcessedBytes(eq(ctx), anyInt());
}).when(localFlow).consumeBytes(eq(ctx), eq(stream), anyInt());
// When the listener callback is called, process a few bytes and then throw.
doAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock in) throws Throwable {
inFlowState.returnProcessedBytes(ctx, 4);
localFlow.consumeBytes(ctx, stream, 4);
throw new RuntimeException("Fake Exception");
}
}).when(listener).onDataRead(eq(ctx), eq(STREAM_ID), any(ByteBuf.class), eq(10), eq(true));
@ -272,11 +273,11 @@ public class DefaultHttp2ConnectionDecoderTest {
decode().onDataRead(ctx, STREAM_ID, data, padding, true);
fail("Expected exception");
} catch (RuntimeException cause) {
verify(inboundFlow)
.applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true));
verify(localFlow)
.receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(padding), eq(true));
verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future));
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true));
assertEquals(0, inFlowState.unProcessedBytes());
assertEquals(0, localFlow.unconsumedBytes(stream));
} finally {
data.release();
}
@ -365,7 +366,7 @@ public class DefaultHttp2ConnectionDecoderTest {
public void windowUpdateReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.goAwaySent()).thenReturn(true);
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(encoder, never()).updateOutboundWindowSize(anyInt(), anyInt());
verify(remoteFlow, never()).incrementWindowSize(eq(ctx), any(Http2Stream.class), anyInt());
verify(listener, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt());
}
@ -378,7 +379,7 @@ public class DefaultHttp2ConnectionDecoderTest {
@Test
public void windowUpdateReadShouldSucceed() throws Exception {
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(encoder).updateOutboundWindowSize(eq(STREAM_ID), eq(10));
verify(remoteFlow).incrementWindowSize(eq(ctx), eq(stream), eq(10));
verify(listener).onWindowUpdateRead(eq(ctx), eq(STREAM_ID), eq(10));
}

View File

@ -67,13 +67,13 @@ public class DefaultHttp2ConnectionEncoderTest {
private Http2Connection connection;
@Mock
private Http2Connection.Endpoint remote;
private Http2Connection.Endpoint<Http2RemoteFlowController> remote;
@Mock
private Http2Connection.Endpoint local;
private Http2Connection.Endpoint<Http2LocalFlowController> local;
@Mock
private Http2OutboundFlowController outboundFlow;
private Http2RemoteFlowController remoteFlow;
@Mock
private ChannelHandlerContext ctx;
@ -116,6 +116,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
when(connection.local()).thenReturn(local);
when(connection.remote()).thenReturn(remote);
when(remote.flowController()).thenReturn(remoteFlow);
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock invocation) throws Throwable {
@ -135,9 +136,10 @@ public class DefaultHttp2ConnectionEncoderTest {
when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream);
when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future);
when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))).thenReturn(future);
when(outboundFlow.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), eq(promise)))
when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise)))
.thenReturn(future);
when(remoteFlow.sendFlowControlledFrame(eq(ctx), any(Http2Stream.class),
any(ByteBuf.class), anyInt(), anyBoolean(), eq(promise))).thenReturn(future);
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.channel()).thenReturn(channel);
when(ctx.newSucceededFuture()).thenReturn(future);
@ -145,8 +147,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(ctx.write(any())).thenReturn(future);
encoder = DefaultHttp2ConnectionEncoder.newBuilder().connection(connection)
.frameWriter(writer).outboundFlow(outboundFlow)
.lifecycleManager(lifecycleManager).build();
.frameWriter(writer).lifecycleManager(lifecycleManager).build();
}
@Test
@ -168,7 +169,7 @@ public class DefaultHttp2ConnectionEncoderTest {
final ByteBuf data = dummyData();
try {
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(false), eq(promise));
verify(remoteFlow).sendFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(0), eq(false), eq(promise));
} finally {
data.release();
}
@ -180,7 +181,7 @@ public class DefaultHttp2ConnectionEncoderTest {
final ByteBuf data = dummyData();
try {
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(true), eq(promise));
verify(remoteFlow).sendFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(0), eq(true), eq(promise));
// Invoke the listener callback indicating that the write completed successfully.
ArgumentCaptor<ChannelFutureListener> captor = ArgumentCaptor.forClass(ChannelFutureListener.class);
@ -240,7 +241,7 @@ public class DefaultHttp2ConnectionEncoderTest {
}).when(future).addListener(any(ChannelFutureListener.class));
// Indicate that there was a previous data write operation that the headers must wait for.
when(outboundFlow.lastWriteForStream(anyInt())).thenReturn(future);
when(remoteFlow.lastFlowControlledFrameSent(any(Http2Stream.class))).thenReturn(future);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise);
verify(writer, never()).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));

View File

@ -36,12 +36,12 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
* Tests for {@link DefaultHttp2InboundFlowController}.
* Tests for {@link DefaultHttp2LocalFlowController}.
*/
public class DefaultHttp2InboundFlowControllerTest {
public class DefaultHttp2LocalFlowControllerTest {
private static final int STREAM_ID = 1;
private DefaultHttp2InboundFlowController controller;
private DefaultHttp2LocalFlowController controller;
@Mock
private ByteBuf buffer;
@ -66,28 +66,28 @@ public class DefaultHttp2InboundFlowControllerTest {
when(ctx.newPromise()).thenReturn(promise);
connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2InboundFlowController(connection, frameWriter, updateRatio);
controller = new DefaultHttp2LocalFlowController(connection, frameWriter, updateRatio);
connection.local().createStream(STREAM_ID, false);
}
@Test
public void dataFrameShouldBeAccepted() throws Http2Exception {
applyFlowControl(STREAM_ID, 10, 0, false);
receiveFlowControlledFrame(STREAM_ID, 10, 0, false);
verifyWindowUpdateNotSent();
}
@Test
public void windowUpdateShouldSendOnceBytesReturned() throws Http2Exception {
int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1;
applyFlowControl(STREAM_ID, dataSize, 0, false);
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false);
// Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent.
returnProcessedBytes(STREAM_ID, 10);
consumeBytes(STREAM_ID, 10);
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
// Return the rest and verify the WINDOW_UPDATE is sent.
returnProcessedBytes(STREAM_ID, dataSize - 10);
consumeBytes(STREAM_ID, dataSize - 10);
verifyWindowUpdateSent(STREAM_ID, dataSize);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
}
@ -95,7 +95,7 @@ public class DefaultHttp2InboundFlowControllerTest {
@Test(expected = Http2Exception.class)
public void connectionFlowControlExceededShouldThrow() throws Http2Exception {
// Window exceeded because of the padding.
applyFlowControl(STREAM_ID, DEFAULT_WINDOW_SIZE, 1, true);
receiveFlowControlledFrame(STREAM_ID, DEFAULT_WINDOW_SIZE, 1, true);
}
@Test
@ -103,11 +103,11 @@ public class DefaultHttp2InboundFlowControllerTest {
int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1;
// Set end-of-stream on the frame, so no window update will be sent for the stream.
applyFlowControl(STREAM_ID, dataSize, 0, true);
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, true);
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
verifyWindowUpdateNotSent(STREAM_ID);
returnProcessedBytes(STREAM_ID, dataSize);
consumeBytes(STREAM_ID, dataSize);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
verifyWindowUpdateNotSent(STREAM_ID);
}
@ -119,8 +119,8 @@ public class DefaultHttp2InboundFlowControllerTest {
int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize);
// Don't set end-of-stream so we'll get a window update for the stream as well.
applyFlowControl(STREAM_ID, dataSize, 0, false);
returnProcessedBytes(STREAM_ID, dataSize);
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false);
consumeBytes(STREAM_ID, dataSize);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
verifyWindowUpdateSent(STREAM_ID, windowDelta);
}
@ -129,10 +129,10 @@ public class DefaultHttp2InboundFlowControllerTest {
public void initialWindowUpdateShouldAllowMoreFrames() throws Http2Exception {
// Send a frame that takes up the entire window.
int initialWindowSize = DEFAULT_WINDOW_SIZE;
applyFlowControl(STREAM_ID, initialWindowSize, 0, false);
receiveFlowControlledFrame(STREAM_ID, initialWindowSize, 0, false);
assertEquals(0, window(STREAM_ID));
assertEquals(0, window(CONNECTION_STREAM_ID));
returnProcessedBytes(STREAM_ID, initialWindowSize);
consumeBytes(STREAM_ID, initialWindowSize);
assertEquals(initialWindowSize, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
@ -146,8 +146,8 @@ public class DefaultHttp2InboundFlowControllerTest {
reset(frameWriter);
// Send the next frame and verify that the expected window updates were sent.
applyFlowControl(STREAM_ID, initialWindowSize, 0, false);
returnProcessedBytes(STREAM_ID, initialWindowSize);
receiveFlowControlledFrame(STREAM_ID, initialWindowSize, 0, false);
consumeBytes(STREAM_ID, initialWindowSize);
int delta = newInitialWindowSize - initialWindowSize;
verifyWindowUpdateSent(STREAM_ID, delta);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta);
@ -164,12 +164,12 @@ public class DefaultHttp2InboundFlowControllerTest {
// Test that both stream and connection window are updated (or not updated) together
int data1 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1;
applyFlowControl(STREAM_ID, data1, 0, false);
receiveFlowControlledFrame(STREAM_ID, data1, 0, false);
verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(CONNECTION_STREAM_ID));
returnProcessedBytes(STREAM_ID, data1);
consumeBytes(STREAM_ID, data1);
verifyWindowUpdateSent(STREAM_ID, data1);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1);
@ -180,16 +180,16 @@ public class DefaultHttp2InboundFlowControllerTest {
// a window update for the connection stream.
--data1;
int data2 = data1 >> 1;
applyFlowControl(STREAM_ID, data1, 0, false);
applyFlowControl(newStreamId, data1, 0, false);
receiveFlowControlledFrame(STREAM_ID, data1, 0, false);
receiveFlowControlledFrame(newStreamId, data1, 0, false);
verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateNotSent(newStreamId);
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(newStreamId));
assertEquals(DEFAULT_WINDOW_SIZE - (data1 << 1), window(CONNECTION_STREAM_ID));
returnProcessedBytes(STREAM_ID, data1);
returnProcessedBytes(newStreamId, data2);
consumeBytes(STREAM_ID, data1);
consumeBytes(newStreamId, data2);
verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateNotSent(newStreamId);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2);
@ -216,26 +216,27 @@ public class DefaultHttp2InboundFlowControllerTest {
private void testRatio(float ratio, int newDefaultWindowSize, int newStreamId, boolean setStreamRatio)
throws Http2Exception {
controller.initialStreamWindowSize(ctx, 0, newDefaultWindowSize);
connection.local().createStream(newStreamId, false);
int delta = newDefaultWindowSize - DEFAULT_WINDOW_SIZE;
controller.incrementWindowSize(ctx, stream(0), delta);
Http2Stream stream = connection.local().createStream(newStreamId, false);
if (setStreamRatio) {
controller.windowUpdateRatio(ctx, newStreamId, ratio);
controller.windowUpdateRatio(ctx, stream, ratio);
}
controller.initialStreamWindowSize(ctx, newStreamId, newDefaultWindowSize);
controller.incrementWindowSize(ctx, stream, delta);
reset(frameWriter);
try {
int data1 = (int) (newDefaultWindowSize * ratio) + 1;
int data2 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) >> 1;
applyFlowControl(STREAM_ID, data2, 0, false);
applyFlowControl(newStreamId, data1, 0, false);
receiveFlowControlledFrame(STREAM_ID, data2, 0, false);
receiveFlowControlledFrame(newStreamId, data1, 0, false);
verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateNotSent(newStreamId);
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
assertEquals(DEFAULT_WINDOW_SIZE - data2, window(STREAM_ID));
assertEquals(newDefaultWindowSize - data1, window(newStreamId));
assertEquals(newDefaultWindowSize - data2 - data1, window(CONNECTION_STREAM_ID));
returnProcessedBytes(STREAM_ID, data2);
returnProcessedBytes(newStreamId, data1);
consumeBytes(STREAM_ID, data2);
consumeBytes(newStreamId, data1);
verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateSent(newStreamId, data1);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2);
@ -252,10 +253,11 @@ public class DefaultHttp2InboundFlowControllerTest {
return initialSize - newWindowSize;
}
private void applyFlowControl(int streamId, int dataSize, int padding, boolean endOfStream) throws Http2Exception {
private void receiveFlowControlledFrame(int streamId, int dataSize, int padding,
boolean endOfStream) throws Http2Exception {
final ByteBuf buf = dummyData(dataSize);
try {
controller.applyFlowControl(ctx, streamId, buf, padding, endOfStream);
controller.receiveFlowControlledFrame(ctx, stream(streamId), buf, padding, endOfStream);
} finally {
buf.release();
}
@ -267,8 +269,8 @@ public class DefaultHttp2InboundFlowControllerTest {
return buffer;
}
private void returnProcessedBytes(int streamId, int processedBytes) throws Http2Exception {
connection.requireStream(streamId).garbageCollector().returnProcessedBytes(ctx, processedBytes);
private void consumeBytes(int streamId, int numBytes) throws Http2Exception {
controller.consumeBytes(ctx, stream(streamId), numBytes);
}
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) throws Http2Exception {
@ -284,7 +286,11 @@ public class DefaultHttp2InboundFlowControllerTest {
any(ChannelPromise.class));
}
private int window(int streamId) {
return connection.stream(streamId).inboundFlow().window();
private int window(int streamId) throws Http2Exception {
return controller.windowSize(stream(streamId));
}
private Http2Stream stream(int streamId) throws Http2Exception {
return connection.requireStream(streamId);
}
}

View File

@ -19,8 +19,8 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
@ -33,7 +33,6 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController.OutboundFlowState;
import io.netty.handler.codec.http2.Http2FrameWriter.Configuration;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
@ -49,16 +48,16 @@ import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/**
* Tests for {@link DefaultHttp2OutboundFlowController}.
* Tests for {@link DefaultHttp2RemoteFlowController}.
*/
public class DefaultHttp2OutboundFlowControllerTest {
public class DefaultHttp2RemoteFlowControllerTest {
private static final int STREAM_A = 1;
private static final int STREAM_B = 3;
private static final int STREAM_C = 5;
private static final int STREAM_D = 7;
private static final int STREAM_E = 9;
private DefaultHttp2OutboundFlowController controller;
private DefaultHttp2RemoteFlowController controller;
@Mock
private ByteBuf buffer;
@ -87,7 +86,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
when(ctx.newPromise()).thenReturn(promise);
connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2OutboundFlowController(connection, frameWriter);
controller = new DefaultHttp2RemoteFlowController(connection, frameWriter);
connection.local().createStream(STREAM_A, false);
connection.local().createStream(STREAM_B, false);
@ -101,7 +100,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void initialWindowSizeShouldOnlyChangeStreams() throws Http2Exception {
controller.initialOutboundWindowSize(0);
controller.initialWindowSize(0);
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(0, window(STREAM_B));
@ -111,7 +110,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void windowUpdateShouldChangeConnectionWindow() throws Http2Exception {
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 100);
incrementWindowSize(CONNECTION_STREAM_ID, 100);
assertEquals(DEFAULT_WINDOW_SIZE + 100, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -121,7 +120,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void windowUpdateShouldChangeStreamWindow() throws Http2Exception {
controller.updateOutboundWindowSize(STREAM_A, 100);
incrementWindowSize(STREAM_A, 100);
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE + 100, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -147,13 +146,14 @@ public class DefaultHttp2OutboundFlowControllerTest {
final ByteBuf data = dummyData(5, 5);
try {
// Write one frame.
ChannelFuture future1 = controller.writeData(ctx, STREAM_A, data, 0, false, promise);
assertEquals(future1, controller.lastWriteForStream(STREAM_A));
Http2Stream stream = stream(STREAM_A);
ChannelFuture future1 = controller.sendFlowControlledFrame(ctx, stream, data, 0, false, promise);
assertEquals(future1, controller.lastFlowControlledFrameSent(stream));
// Now write another and verify that the last write is updated.
ChannelFuture future2 = controller.writeData(ctx, STREAM_A, data, 0, false, promise2);
ChannelFuture future2 = controller.sendFlowControlledFrame(ctx, stream, data, 0, false, promise2);
assertNotSame(future1, future2);
assertEquals(future2, controller.lastWriteForStream(STREAM_A));
assertEquals(future2, controller.lastFlowControlledFrameSent(stream));
} finally {
manualSafeRelease(data);
}
@ -200,7 +200,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void stalledStreamShouldQueueFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0);
controller.initialWindowSize(0);
final ByteBuf data = dummyData(10, 5);
try {
@ -214,7 +214,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void frameShouldSplit() throws Http2Exception {
controller.initialOutboundWindowSize(5);
controller.initialWindowSize(5);
final ByteBuf data = dummyData(5, 5);
try {
@ -236,7 +236,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void frameShouldSplitPadding() throws Http2Exception {
controller.initialOutboundWindowSize(5);
controller.initialWindowSize(5);
final ByteBuf data = dummyData(3, 7);
try {
@ -257,7 +257,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void emptyFrameShouldSplitPadding() throws Http2Exception {
controller.initialOutboundWindowSize(5);
controller.initialWindowSize(5);
final ByteBuf data = dummyData(0, 10);
try {
@ -277,7 +277,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void windowUpdateShouldSendFrame() throws Http2Exception {
controller.initialOutboundWindowSize(10);
controller.initialWindowSize(10);
final ByteBuf data = dummyData(10, 10);
try {
@ -285,7 +285,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyWrite(STREAM_A, data.slice(0, 10), 0);
// Update the window and verify that the rest of the frame is written.
controller.updateOutboundWindowSize(STREAM_A, 10);
incrementWindowSize(STREAM_A, 10);
verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 10);
assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
@ -299,7 +299,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void initialWindowUpdateShouldSendFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0);
controller.initialWindowSize(0);
final ByteBuf data = dummyData(10, 0);
try {
@ -307,7 +307,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_A);
// Verify that the entire frame was sent.
controller.initialOutboundWindowSize(10);
controller.initialWindowSize(10);
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 0, false);
final ByteBuf writtenBuf = argument.getValue();
@ -321,7 +321,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void successiveSendsShouldNotInteract() throws Http2Exception {
// Collapse the connection window to force queueing.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -window(CONNECTION_STREAM_ID));
incrementWindowSize(CONNECTION_STREAM_ID, -window(CONNECTION_STREAM_ID));
assertEquals(0, window(CONNECTION_STREAM_ID));
ByteBuf data = dummyData(5, 5);
@ -331,7 +331,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
send(STREAM_A, dataOnly.slice(), 5);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 8);
incrementWindowSize(CONNECTION_STREAM_ID, 8);
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 3, false);
ByteBuf writtenBuf = argument.getValue();
@ -345,7 +345,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
send(STREAM_B, dataOnly.slice(), 5);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 12);
incrementWindowSize(CONNECTION_STREAM_ID, 12);
assertEquals(0, window(CONNECTION_STREAM_ID));
// Verify the rest of A is written.
@ -368,7 +368,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
public void negativeWindowShouldNotThrowException() throws Http2Exception {
final int initWindow = 20;
final int secondWindowSize = 10;
controller.initialOutboundWindowSize(initWindow);
controller.initialWindowSize(initWindow);
Http2Stream streamA = connection.stream(STREAM_A);
final ByteBuf data = dummyData(initWindow, 0);
@ -379,8 +379,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyWrite(STREAM_A, data.slice(0, initWindow), 0);
// Make the window size for stream A negative
controller.initialOutboundWindowSize(initWindow - secondWindowSize);
assertEquals(-secondWindowSize, streamA.outboundFlow().window());
controller.initialWindowSize(initWindow - secondWindowSize);
assertEquals(-secondWindowSize, controller.windowSize(streamA));
// Queue up a write. It should not be written now because the window is negative
resetFrameWriter();
@ -388,18 +388,18 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_A);
// Open the window size back up a bit (no send should happen)
controller.updateOutboundWindowSize(STREAM_A, 5);
assertEquals(-5, streamA.outboundFlow().window());
incrementWindowSize(STREAM_A, 5);
assertEquals(-5, controller.windowSize(streamA));
verifyNoWrite(STREAM_A);
// Open the window size back up a bit (no send should happen)
controller.updateOutboundWindowSize(STREAM_A, 5);
assertEquals(0, streamA.outboundFlow().window());
incrementWindowSize(STREAM_A, 5);
assertEquals(0, controller.windowSize(streamA));
verifyNoWrite(STREAM_A);
// Open the window size back up and allow the write to happen
controller.updateOutboundWindowSize(STREAM_A, 5);
assertEquals(0, streamA.outboundFlow().window());
incrementWindowSize(STREAM_A, 5);
assertEquals(0, controller.windowSize(streamA));
// Verify that the entire frame was sent.
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
@ -415,7 +415,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void initialWindowUpdateShouldSendEmptyFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0);
controller.initialWindowSize(0);
// First send a frame that will get buffered.
final ByteBuf data = dummyData(10, 0);
@ -428,7 +428,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_A);
// Re-expand the window and verify that both frames were sent.
controller.initialOutboundWindowSize(10);
controller.initialWindowSize(10);
verifyWrite(STREAM_A, data.slice(), 0);
verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 0);
@ -439,7 +439,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void initialWindowUpdateShouldSendPartialFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0);
controller.initialWindowSize(0);
final ByteBuf data = dummyData(10, 0);
try {
@ -447,7 +447,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_A);
// Verify that a partial frame of 5 was sent.
controller.initialOutboundWindowSize(5);
controller.initialWindowSize(5);
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue();
@ -471,7 +471,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_A);
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
incrementWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -499,7 +499,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_A);
// Verify that a partial frame of 5 was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 5);
incrementWindowSize(CONNECTION_STREAM_ID, 5);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -529,7 +529,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_A);
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(STREAM_A, 10);
incrementWindowSize(STREAM_A, 10);
assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -557,7 +557,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_A);
// Verify that a partial frame of 5 was sent.
controller.updateOutboundWindowSize(STREAM_A, 5);
incrementWindowSize(STREAM_A, 5);
assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -599,7 +599,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_B);
// Open up the connection window.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
incrementWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B));
@ -653,7 +653,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_D);
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
incrementWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B), 2);
@ -713,7 +713,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_D);
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
incrementWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 10, window(STREAM_A));
assertEquals(0, window(STREAM_B));
@ -766,7 +766,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_D);
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
incrementWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
assertEquals(0, window(STREAM_B));
@ -840,7 +840,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
incrementWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A), 2);
assertEquals(0, window(STREAM_B));
@ -895,7 +895,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_D);
// Allow 1000 bytes to be sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 1000);
incrementWindowSize(CONNECTION_STREAM_ID, 1000);
final ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, captor, 0, false);
@ -973,7 +973,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(0, captor.getValue().readableBytes());
// Allow 1000 bytes to be sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 999);
incrementWindowSize(CONNECTION_STREAM_ID, 999);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_A), 50);
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_B), 50);
@ -1038,18 +1038,17 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_C);
verifyNoWrite(STREAM_D);
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
state.streamableBytesForTree());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_C, STREAM_D)),
state.streamableBytesForTree());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree());
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)),
streamableBytesForTree(streamD));
} finally {
manualSafeRelease(bufs);
}
@ -1108,19 +1107,18 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_D);
streamB.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
state.streamableBytesForTree());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
state.streamableBytesForTree());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
state.streamableBytesForTree());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree());
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)),
streamableBytesForTree(streamD));
} finally {
manualSafeRelease(bufs);
}
@ -1185,23 +1183,20 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_D);
verifyNoWrite(STREAM_E);
OutboundFlowState state = state(stream0);
assertEquals(
calculateStreamSizeSum(streamSizes,
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)),
state.streamableBytesForTree());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)),
state.streamableBytesForTree());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree());
state = state(streamE);
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)),
streamableBytesForTree(streamD));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_E, STREAM_C, STREAM_D)),
state.streamableBytesForTree());
streamableBytesForTree(streamE));
} finally {
manualSafeRelease(bufs);
}
@ -1257,26 +1252,20 @@ public class DefaultHttp2OutboundFlowControllerTest {
streamA.close();
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
state.streamableBytesForTree());
state = state(streamA);
assertEquals(0, state.streamableBytesForTree());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree());
streamableBytesForTree(stream0));
assertEquals(0, streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)),
streamableBytesForTree(streamD));
} finally {
manualSafeRelease(bufs);
}
}
private static OutboundFlowState state(Http2Stream stream) {
return (OutboundFlowState) stream.outboundFlow();
}
private static int calculateStreamSizeSum(IntObjectMap<Integer> streamSizes, List<Integer> streamIds) {
int sum = 0;
for (int i = 0; i < streamIds.size(); ++i) {
@ -1288,9 +1277,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
return sum;
}
private void send(int streamId, ByteBuf data, int padding) {
ChannelFuture future = controller.writeData(ctx, streamId, data, padding, false, promise);
assertEquals(future, controller.lastWriteForStream(streamId));
private void send(int streamId, ByteBuf data, int padding) throws Http2Exception {
Http2Stream stream = stream(streamId);
ChannelFuture future = controller.sendFlowControlledFrame(ctx, stream, data, padding, false, promise);
assertEquals(future, controller.lastFlowControlledFrameSent(stream));
}
private void verifyWrite(int streamId, ByteBuf data, int padding) {
@ -1302,8 +1292,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
eq(promise));
}
private void captureWrite(int streamId, ArgumentCaptor<ByteBuf> captor, int padding, boolean endStream) {
verify(frameWriter).writeData(eq(ctx), eq(streamId), captor.capture(), eq(padding), eq(endStream), eq(promise));
private void captureWrite(int streamId, ArgumentCaptor<ByteBuf> captor, int padding,
boolean endStream) {
verify(frameWriter).writeData(eq(ctx), eq(streamId), captor.capture(), eq(padding),
eq(endStream), eq(promise));
}
private void setPriority(int stream, int parent, int weight, boolean exclusive) throws Http2Exception {
@ -1311,7 +1303,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
private void exhaustStreamWindow(int streamId) throws Http2Exception {
controller.updateOutboundWindowSize(streamId, -window(streamId));
incrementWindowSize(streamId, -window(streamId));
}
private void resetFrameWriter() {
@ -1321,8 +1313,20 @@ public class DefaultHttp2OutboundFlowControllerTest {
when(frameWriterSizePolicy.maxFrameSize()).thenReturn(Integer.MAX_VALUE);
}
private int window(int streamId) {
return connection.stream(streamId).outboundFlow().window();
private int window(int streamId) throws Http2Exception {
return controller.windowSize(stream(streamId));
}
private void incrementWindowSize(int streamId, int delta) throws Http2Exception {
controller.incrementWindowSize(ctx, stream(streamId), delta);
}
private int streamableBytesForTree(Http2Stream stream) throws Http2Exception {
return controller.streamableBytesForTree(stream);
}
private Http2Stream stream(int streamId) throws Http2Exception {
return connection.requireStream(streamId);
}
private static ByteBuf dummyData(int size, int padding) {

View File

@ -28,8 +28,6 @@ import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
import io.netty.handler.codec.http2.Http2Connection;
@ -68,8 +66,6 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
connectionHandler = new HttpToHttp2ConnectionHandler(connection,
frameReader(),
frameWriter,
new DefaultHttp2InboundFlowController(connection, frameWriter),
new DefaultHttp2OutboundFlowController(connection, frameWriter),
new DelegatingDecompressorFrameListener(connection,
new InboundHttp2ToHttpAdapter.Builder(connection)
.maxContentLength(maxContentLength)

View File

@ -23,7 +23,6 @@ import static io.netty.util.internal.logging.InternalLogLevel.INFO;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;