diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 594d711b9b..41a49b74e7 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -53,8 +53,8 @@ public class DefaultHttp2Connection implements Http2Connection { private final IntObjectMap streamMap = new IntObjectHashMap(); private final ConnectionStream connectionStream = new ConnectionStream(); private final Set activeStreams = new LinkedHashSet(); - private final DefaultEndpoint localEndpoint; - private final DefaultEndpoint remoteEndpoint; + private final DefaultEndpoint localEndpoint; + private final DefaultEndpoint remoteEndpoint; private final Http2StreamRemovalPolicy removalPolicy; /** @@ -78,8 +78,8 @@ public class DefaultHttp2Connection implements Http2Connection { public DefaultHttp2Connection(boolean server, Http2StreamRemovalPolicy removalPolicy) { this.removalPolicy = checkNotNull(removalPolicy, "removalPolicy"); - localEndpoint = new DefaultEndpoint(server); - remoteEndpoint = new DefaultEndpoint(!server); + localEndpoint = new DefaultEndpoint(server); + remoteEndpoint = new DefaultEndpoint(!server); // Tell the removal policy how to remove a stream from this connection. removalPolicy.setAction(new Action() { @@ -138,12 +138,12 @@ public class DefaultHttp2Connection implements Http2Connection { } @Override - public Endpoint local() { + public Endpoint local() { return localEndpoint; } @Override - public Endpoint remote() { + public Endpoint remote() { return remoteEndpoint; } @@ -219,9 +219,6 @@ public class DefaultHttp2Connection implements Http2Connection { private boolean resetReceived; private boolean endOfStreamSent; private boolean endOfStreamReceived; - private Http2FlowState inboundFlow; - private Http2FlowState outboundFlow; - private Http2FlowControlWindowManager garbageCollector; private PropertyMap data; DefaultStream(int id) { @@ -303,36 +300,6 @@ public class DefaultHttp2Connection implements Http2Connection { return data.remove(key); } - @Override - public Http2FlowState inboundFlow() { - return inboundFlow; - } - - @Override - public void inboundFlow(Http2FlowState state) { - inboundFlow = state; - } - - @Override - public Http2FlowState outboundFlow() { - return outboundFlow; - } - - @Override - public void outboundFlow(Http2FlowState state) { - outboundFlow = state; - } - - @Override - public Http2FlowControlWindowManager garbageCollector() { - return garbageCollector; - } - - @Override - public void garbageCollector(Http2FlowControlWindowManager collector) { - garbageCollector = collector; - } - @Override public final boolean isRoot() { return parent == null; @@ -511,7 +478,7 @@ public class DefaultHttp2Connection implements Http2Connection { return state == HALF_CLOSED_REMOTE || state == OPEN || state == RESERVED_LOCAL; } - final DefaultEndpoint createdBy() { + final DefaultEndpoint createdBy() { return localEndpoint.createdStreamId(id) ? localEndpoint : remoteEndpoint; } @@ -742,12 +709,13 @@ public class DefaultHttp2Connection implements Http2Connection { /** * Simple endpoint implementation. */ - private final class DefaultEndpoint implements Endpoint { + private final class DefaultEndpoint implements Endpoint { private final boolean server; private int nextStreamId; private int lastStreamCreated; private int lastKnownStream = -1; private boolean pushToAllowed = true; + private F flowController; /** * The maximum number of active streams allowed to be created by this endpoint. @@ -911,7 +879,17 @@ public class DefaultHttp2Connection implements Http2Connection { } @Override - public Endpoint opposite() { + public F flowController() { + return flowController; + } + + @Override + public void flowController(F flowController) { + this.flowController = checkNotNull(flowController, "flowController"); + } + + @Override + public Endpoint opposite() { return isLocal() ? remoteEndpoint : localEndpoint; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index fd46926d59..08cd7524d6 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -40,7 +40,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { private final Http2LifecycleManager lifecycleManager; private final Http2ConnectionEncoder encoder; private final Http2FrameReader frameReader; - private final Http2InboundFlowController inboundFlow; private final Http2FrameListener listener; private boolean prefaceReceived; @@ -52,7 +51,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { private Http2LifecycleManager lifecycleManager; private Http2ConnectionEncoder encoder; private Http2FrameReader frameReader; - private Http2InboundFlowController inboundFlow; private Http2FrameListener listener; @Override @@ -72,12 +70,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { return lifecycleManager; } - @Override - public Builder inboundFlow(Http2InboundFlowController inboundFlow) { - this.inboundFlow = inboundFlow; - return this; - } - @Override public Builder frameReader(Http2FrameReader frameReader) { this.frameReader = frameReader; @@ -111,8 +103,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { frameReader = checkNotNull(builder.frameReader, "frameReader"); lifecycleManager = checkNotNull(builder.lifecycleManager, "lifecycleManager"); encoder = checkNotNull(builder.encoder, "encoder"); - inboundFlow = checkNotNull(builder.inboundFlow, "inboundFlow"); listener = checkNotNull(builder.listener, "listener"); + if (connection.local().flowController() == null) { + connection.local().flowController( + new DefaultHttp2LocalFlowController(connection, encoder.frameWriter())); + } } @Override @@ -120,6 +115,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { return connection; } + @Override + public final Http2LocalFlowController flowController() { + return connection.local().flowController(); + } + @Override public Http2FrameListener listener() { return listener; @@ -141,7 +141,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { Http2FrameReader.Configuration config = frameReader.configuration(); Http2HeaderTable headerTable = config.headerTable(); Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy(); - settings.initialWindowSize(inboundFlow.initialWindowSize()); + settings.initialWindowSize(flowController().initialWindowSize()); settings.maxConcurrentStreams(connection.remote().maxStreams()); settings.headerTableSize(headerTable.maxHeaderTableSize()); settings.maxFrameSize(frameSizePolicy.maxFrameSize()); @@ -189,7 +189,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { Integer initialWindowSize = settings.initialWindowSize(); if (initialWindowSize != null) { - inboundFlow.initialWindowSize(initialWindowSize); + flowController().initialWindowSize(initialWindowSize); } } @@ -198,8 +198,8 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { frameReader.close(); } - private static int unprocessedBytes(Http2Stream stream) { - return stream.garbageCollector().unProcessedBytes(); + private int unconsumedBytes(Http2Stream stream) { + return flowController().unconsumedBytes(stream); } /** @@ -248,13 +248,14 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { } int bytesToReturn = data.readableBytes() + padding; - int unprocessedBytes = unprocessedBytes(stream); + int unconsumedBytes = unconsumedBytes(stream); + Http2LocalFlowController flowController = flowController(); try { // If we should apply flow control, do so now. if (shouldApplyFlowControl) { - inboundFlow.applyFlowControl(ctx, streamId, data, padding, endOfStream); - // Update the unprocessed bytes after flow control is applied. - unprocessedBytes = unprocessedBytes(stream); + flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream); + // Update the unconsumed bytes after flow control is applied. + unconsumedBytes = unconsumedBytes(stream); } // If we should ignore this frame, do so now. @@ -275,20 +276,20 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // If an exception happened during delivery, the listener may have returned part // of the bytes before the error occurred. If that's the case, subtract that from // the total processed bytes so that we don't return too many bytes. - int delta = unprocessedBytes - unprocessedBytes(stream); + int delta = unconsumedBytes - unconsumedBytes(stream); bytesToReturn -= delta; throw e; } catch (RuntimeException e) { // If an exception happened during delivery, the listener may have returned part // of the bytes before the error occurred. If that's the case, subtract that from // the total processed bytes so that we don't return too many bytes. - int delta = unprocessedBytes - unprocessedBytes(stream); + int delta = unconsumedBytes - unconsumedBytes(stream); bytesToReturn -= delta; throw e; } finally { // If appropriate, returned the processed bytes to the flow controller. if (shouldApplyFlowControl && bytesToReturn > 0) { - stream.garbageCollector().returnProcessedBytes(ctx, bytesToReturn); + flowController.consumeBytes(ctx, stream, bytesToReturn); } if (endOfStream) { @@ -452,7 +453,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { Integer initialWindowSize = settings.initialWindowSize(); if (initialWindowSize != null) { - inboundFlow.initialWindowSize(initialWindowSize); + flowController().initialWindowSize(initialWindowSize); } } @@ -530,7 +531,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { } // Update the outbound flow controller. - encoder.updateOutboundWindowSize(streamId, windowSizeIncrement); + encoder.flowController().incrementWindowSize(ctx, stream, windowSizeIncrement); listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 5874c23bd0..dc410115d2 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -32,7 +32,6 @@ import java.util.ArrayDeque; public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { private final Http2FrameWriter frameWriter; private final Http2Connection connection; - private final Http2OutboundFlowController outboundFlow; private final Http2LifecycleManager lifecycleManager; // We prefer ArrayDeque to LinkedList because later will produce more GC. // This initial capacity is plenty for SETTINGS traffic. @@ -44,7 +43,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { public static class Builder implements Http2ConnectionEncoder.Builder { protected Http2FrameWriter frameWriter; protected Http2Connection connection; - protected Http2OutboundFlowController outboundFlow; protected Http2LifecycleManager lifecycleManager; @Override @@ -73,13 +71,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { return this; } - @Override - public Builder outboundFlow( - Http2OutboundFlowController outboundFlow) { - this.outboundFlow = outboundFlow; - return this; - } - @Override public Http2ConnectionEncoder build() { return new DefaultHttp2ConnectionEncoder(this); @@ -91,10 +82,12 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } protected DefaultHttp2ConnectionEncoder(Builder builder) { - frameWriter = checkNotNull(builder.frameWriter, "frameWriter"); connection = checkNotNull(builder.connection, "connection"); - outboundFlow = checkNotNull(builder.outboundFlow, "outboundFlow"); + frameWriter = checkNotNull(builder.frameWriter, "frameWriter"); lifecycleManager = checkNotNull(builder.lifecycleManager, "lifecycleManager"); + if (connection.remote().flowController() == null) { + connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection, frameWriter)); + } } @Override @@ -107,6 +100,11 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { return connection; } + @Override + public final Http2RemoteFlowController flowController() { + return connection().remote().flowController(); + } + @Override public void remoteSettings(Http2Settings settings) throws Http2Exception { Boolean pushEnabled = settings.pushEnabled(); @@ -142,19 +140,20 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { Integer initialWindowSize = settings.initialWindowSize(); if (initialWindowSize != null) { - initialOutboundWindowSize(initialWindowSize); + flowController().initialWindowSize(initialWindowSize); } } @Override public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding, final boolean endOfStream, ChannelPromise promise) { + Http2Stream stream; try { if (connection.isGoAway()) { throw new IllegalStateException("Sending data after connection going away."); } - Http2Stream stream = connection.requireStream(streamId); + stream = connection.requireStream(streamId); if (stream.isResetSent()) { throw new IllegalStateException("Sending data after sending RST_STREAM."); } @@ -184,7 +183,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { // Hand control of the frame to the flow controller. ChannelFuture future = - outboundFlow.writeData(ctx, streamId, data, padding, endOfStream, promise); + flowController().sendFlowControlledFrame(ctx, stream, data, padding, endOfStream, promise); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -202,11 +201,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { return future; } - @Override - public ChannelFuture lastWriteForStream(int streamId) { - return outboundFlow.lastWriteForStream(streamId); - } - @Override public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) { @@ -219,7 +213,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { final boolean exclusive, final int padding, final boolean endOfStream, final ChannelPromise promise) { Http2Stream stream = connection.stream(streamId); - ChannelFuture lastDataWrite = lastWriteForStream(streamId); + ChannelFuture lastDataWrite = stream != null ? flowController().lastFlowControlledFrameSent(stream) : null; try { if (connection.isGoAway()) { throw connectionError(PROTOCOL_ERROR, "Sending headers after connection going away."); @@ -472,19 +466,4 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { public Configuration configuration() { return frameWriter.configuration(); } - - @Override - public void initialOutboundWindowSize(int newWindowSize) throws Http2Exception { - outboundFlow.initialOutboundWindowSize(newWindowSize); - } - - @Override - public int initialOutboundWindowSize() { - return outboundFlow.initialOutboundWindowSize(); - } - - @Override - public void updateOutboundWindowSize(int streamId, int deltaWindowSize) throws Http2Exception { - outboundFlow.updateOutboundWindowSize(streamId, deltaWindowSize); - } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java similarity index 81% rename from codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java rename to codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java index bdeaa3d6b3..98e2b9efb5 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java @@ -21,7 +21,6 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZ import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; -import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.util.internal.ObjectUtil.checkNotNull; @@ -33,9 +32,9 @@ import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException; /** - * Basic implementation of {@link Http2InboundFlowController}. + * Basic implementation of {@link Http2LocalFlowController}. */ -public class DefaultHttp2InboundFlowController implements Http2InboundFlowController { +public class DefaultHttp2LocalFlowController implements Http2LocalFlowController { private static final int DEFAULT_COMPOSITE_EXCEPTION_SIZE = 4; /** * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE} @@ -48,11 +47,11 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro private volatile float windowUpdateRatio; private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE; - public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { + public DefaultHttp2LocalFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO); } - public DefaultHttp2InboundFlowController(Http2Connection connection, + public DefaultHttp2LocalFlowController(Http2Connection connection, Http2FrameWriter frameWriter, float windowUpdateRatio) { this.connection = checkNotNull(connection, "connection"); this.frameWriter = checkNotNull(frameWriter, "frameWriter"); @@ -60,17 +59,13 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro // Add a flow state for the connection. final Http2Stream connectionStream = connection.connectionStream(); - final FlowState connectionFlowState = new FlowState(connectionStream); - connectionStream.inboundFlow(connectionFlowState); - connectionStream.garbageCollector(connectionFlowState); + connectionStream.setProperty(FlowState.class, new FlowState(connectionStream)); // Register for notification of new streams. connection.addListener(new Http2ConnectionAdapter() { @Override public void streamAdded(Http2Stream stream) { - final FlowState flowState = new FlowState(stream); - stream.inboundFlow(flowState); - stream.garbageCollector(flowState); + stream.setProperty(FlowState.class, new FlowState(stream)); } }); } @@ -99,27 +94,35 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro } } - @Override - public void initialStreamWindowSize(ChannelHandlerContext ctx, int streamId, int newWindowSize) - throws Http2Exception { - checkNotNull(ctx, "ctx"); - if (newWindowSize < MIN_INITIAL_WINDOW_SIZE || newWindowSize > MAX_INITIAL_WINDOW_SIZE) { - throw new IllegalArgumentException("Invalid newWindowSize: " + newWindowSize); - } - - FlowState state = stateOrFail(streamId); - state.initialStreamWindowSize(newWindowSize); - state.writeWindowUpdateIfNeeded(ctx); - } - @Override public int initialWindowSize() { return initialWindowSize; } @Override - public int initialStreamWindowSize(int streamId) throws Http2Exception { - return stateOrFail(streamId).initialStreamWindowSize(); + public int windowSize(Http2Stream stream) { + return state(stream).window(); + } + + @Override + public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception { + checkNotNull(ctx, "ctx"); + FlowState state = state(stream); + // Just add the delta to the stream-specific initial window size so that the next time the window + // expands it will grow to the new initial size. + state.incrementInitialStreamWindow(delta); + state.writeWindowUpdateIfNeeded(ctx); + } + + @Override + public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) + throws Http2Exception { + state(stream).consumeBytes(ctx, numBytes); + } + + @Override + public int unconsumedBytes(Http2Stream stream) { + return state(stream).unconsumedBytes(); } private static void checkValidRatio(float ratio) { @@ -159,14 +162,13 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro * be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE} * was generated by this method before the initial {@code SETTINGS} frame is sent. * @param ctx the context to use if a {@code WINDOW_UPDATE} is determined necessary. - * @param streamId the stream for which {@code ratio} applies to. + * @param stream the stream for which {@code ratio} applies to. * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary. * @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames */ - public void windowUpdateRatio(ChannelHandlerContext ctx, int streamId, float ratio) throws Http2Exception { - checkNotNull(ctx, "ctx"); + public void windowUpdateRatio(ChannelHandlerContext ctx, Http2Stream stream, float ratio) throws Http2Exception { checkValidRatio(ratio); - FlowState state = stateOrFail(streamId); + FlowState state = state(stream); state.windowUpdateRatio(ratio); state.writeWindowUpdateIfNeeded(ctx); } @@ -177,52 +179,37 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro * be sent. This window update ratio will only be applied to {@code streamId}. * @throws Http2Exception If no stream corresponding to {@code stream} could be found. */ - public float windowUpdateRatio(int streamId) throws Http2Exception { - return stateOrFail(streamId).windowUpdateRatio(); + public float windowUpdateRatio(Http2Stream stream) throws Http2Exception { + return state(stream).windowUpdateRatio(); } @Override - public void applyFlowControl(ChannelHandlerContext ctx, int streamId, ByteBuf data, + public void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { int dataLength = data.readableBytes() + padding; // Apply the connection-level flow control - connectionState().applyFlowControl(dataLength); + connectionState().receiveFlowControlledFrame(dataLength); // Apply the stream-level flow control - FlowState state = stateOrFail(streamId); + FlowState state = state(stream); state.endOfStream(endOfStream); - state.applyFlowControl(dataLength); + state.receiveFlowControlledFrame(dataLength); } private FlowState connectionState() { return state(connection.connectionStream()); } - private FlowState state(int streamId) { - Http2Stream stream = connection.stream(streamId); - return stream != null ? state(stream) : null; - } - private FlowState state(Http2Stream stream) { - return (FlowState) stream.inboundFlow(); - } - - /** - * Gets the window for the stream or raises a {@code PROTOCOL_ERROR} if not found. - */ - private FlowState stateOrFail(int streamId) throws Http2Exception { - FlowState state = state(streamId); - if (state == null) { - throw connectionError(PROTOCOL_ERROR, "Flow control window missing for stream: %d", streamId); - } - return state; + checkNotNull(stream, "stream"); + return stream.getProperty(FlowState.class); } /** * Flow control window state for an individual stream. */ - private final class FlowState implements Http2FlowState, Http2FlowControlWindowManager { + private final class FlowState { private final Http2Stream stream; /** @@ -260,8 +247,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro streamWindowUpdateRatio = windowUpdateRatio; } - @Override - public int window() { + int window() { return window; } @@ -277,14 +263,6 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro streamWindowUpdateRatio = ratio; } - int initialStreamWindowSize() { - return initialStreamWindowSize; - } - - void initialStreamWindowSize(int initialWindowSize) { - initialStreamWindowSize = initialWindowSize; - } - /** * Increment the initial window size for this stream. * @param delta The amount to increase the initial window size by. @@ -319,7 +297,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro * @param dataLength The amount of data to for which this stream is no longer eligible to use for flow control. * @throws Http2Exception If too much data is used relative to how much is available. */ - void applyFlowControl(int dataLength) throws Http2Exception { + void receiveFlowControlledFrame(int dataLength) throws Http2Exception { assert dataLength > 0; // Apply the delta. Even if we throw an exception we want to have taken this delta into account. @@ -347,12 +325,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro processedWindow -= delta; } - @Override - public void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { + void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { if (stream.id() == CONNECTION_STREAM_ID) { throw new UnsupportedOperationException("Returning bytes for the connection window is not supported"); } - checkNotNull(ctx, "ctx"); if (numBytes <= 0) { throw new IllegalArgumentException("numBytes must be positive"); } @@ -367,16 +343,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro writeWindowUpdateIfNeeded(ctx); } - @Override - public int unProcessedBytes() { + public int unconsumedBytes() { return processedWindow - window; } - @Override - public Http2Stream stream() { - return stream; - } - /** * Updates the flow control window for this stream if it is appropriate. */ diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java similarity index 90% rename from codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java rename to codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index 15eda65d82..dfc5483c3e 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -17,11 +17,9 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; -import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.streamError; -import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Math.max; import static java.lang.Math.min; @@ -38,9 +36,9 @@ import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; /** - * Basic implementation of {@link Http2OutboundFlowController}. + * Basic implementation of {@link Http2RemoteFlowController}. */ -public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowController { +public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController { /** * A {@link Comparator} that sorts streams in ascending order the amount of streamable data. @@ -58,19 +56,19 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont private ChannelHandlerContext ctx; private boolean frameSent; - public DefaultHttp2OutboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { + public DefaultHttp2RemoteFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { this.connection = checkNotNull(connection, "connection"); this.frameWriter = checkNotNull(frameWriter, "frameWriter"); // Add a flow state for the connection. - connection.connectionStream().outboundFlow(new OutboundFlowState(connection.connectionStream())); + connection.connectionStream().setProperty(FlowState.class, new FlowState(connection.connectionStream())); // Register for notification of new streams. connection.addListener(new Http2ConnectionAdapter() { @Override public void streamAdded(Http2Stream stream) { // Just add a new flow state to the stream. - stream.outboundFlow(new OutboundFlowState(stream)); + stream.setProperty(FlowState.class, new FlowState(stream)); } @Override @@ -108,7 +106,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont } @Override - public void initialOutboundWindowSize(int newWindowSize) throws Http2Exception { + public void initialWindowSize(int newWindowSize) throws Http2Exception { if (newWindowSize < 0) { throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize); } @@ -117,8 +115,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont initialWindowSize = newWindowSize; for (Http2Stream stream : connection.activeStreams()) { // Verify that the maximum value is not exceeded by this change. - OutboundFlowState state = state(stream); - state.incrementStreamWindow(delta); + state(stream).incrementStreamWindow(delta); } if (delta > 0) { @@ -128,19 +125,24 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont } @Override - public int initialOutboundWindowSize() { + public int initialWindowSize() { return initialWindowSize; } @Override - public void updateOutboundWindowSize(int streamId, int delta) throws Http2Exception { - if (streamId == CONNECTION_STREAM_ID) { + public int windowSize(Http2Stream stream) { + return state(stream).window(); + } + + @Override + public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception { + if (stream.id() == CONNECTION_STREAM_ID) { // Update the connection window and write any pending frames for all streams. connectionState().incrementStreamWindow(delta); writePendingBytes(); } else { // Update the stream window and write any pending frames for the stream. - OutboundFlowState state = stateOrFail(streamId); + FlowState state = state(stream); state.incrementStreamWindow(delta); frameSent = false; state.writeBytes(state.writableWindow()); @@ -151,8 +153,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont } @Override - public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endStream, ChannelPromise promise) { + public ChannelFuture sendFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, + ByteBuf data, int padding, boolean endStream, ChannelPromise promise) { checkNotNull(ctx, "ctx"); checkNotNull(promise, "promise"); checkNotNull(data, "data"); @@ -162,20 +164,17 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont if (padding < 0) { throw new IllegalArgumentException("padding must be >= 0"); } - if (streamId <= 0) { - throw new IllegalArgumentException("streamId must be >= 0"); - } // Save the context. We'll use this later when we write pending bytes. this.ctx = ctx; try { - OutboundFlowState state = stateOrFail(streamId); + FlowState state = state(stream); int window = state.writableWindow(); boolean framesAlreadyQueued = state.hasFrame(); - OutboundFlowState.Frame frame = state.newFrame(promise, data, padding, endStream); + FlowState.Frame frame = state.newFrame(promise, data, padding, endStream); if (!framesAlreadyQueued && window >= frame.size()) { // Window size is large enough to send entire data frame frame.write(); @@ -194,41 +193,33 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont // Create and send a partial frame up to the window size. frame.split(window).write(); ctx.flush(); - } catch (Http2Exception e) { + } catch (Throwable e) { promise.setFailure(e); } return promise; } @Override - public ChannelFuture lastWriteForStream(int streamId) { - OutboundFlowState state = state(streamId); + public ChannelFuture lastFlowControlledFrameSent(Http2Stream stream) { + FlowState state = state(stream); return state != null ? state.lastNewFrame() : null; } - private static OutboundFlowState state(Http2Stream stream) { - return (OutboundFlowState) stream.outboundFlow(); - } - - private OutboundFlowState connectionState() { - return state(connection.connectionStream()); - } - - private OutboundFlowState state(int streamId) { - Http2Stream stream = connection.stream(streamId); - return stream != null ? state(stream) : null; - } - /** - * Attempts to get the {@link OutboundFlowState} for the given stream. If not available, raises a - * {@code PROTOCOL_ERROR}. + * For testing purposes only. Exposes the number of streamable bytes for the tree rooted at + * the given stream. */ - private OutboundFlowState stateOrFail(int streamId) throws Http2Exception { - OutboundFlowState state = state(streamId); - if (state == null) { - throw connectionError(PROTOCOL_ERROR, "Missing flow control window for stream: %d", streamId); - } - return state; + int streamableBytesForTree(Http2Stream stream) { + return state(stream).streamableBytesForTree(); + } + + private static FlowState state(Http2Stream stream) { + checkNotNull(stream, "stream"); + return stream.getProperty(FlowState.class); + } + + private FlowState connectionState() { + return state(connection.connectionStream()); } /** @@ -273,7 +264,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont * @return An object summarizing the write and allocation results. */ private int writeChildren(Http2Stream parent, int connectionWindow) { - OutboundFlowState state = state(parent); + FlowState state = state(parent); if (state.streamableBytesForTree() <= 0) { return 0; } @@ -353,7 +344,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont /** * Write bytes allocated to {@code state} */ - private static void writeChildNode(OutboundFlowState state) { + private static void writeChildNode(FlowState state) { state.writeBytes(state.allocated()); state.resetAllocated(); } @@ -361,7 +352,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont /** * The outbound flow control state for a single stream. */ - final class OutboundFlowState implements Http2FlowState { + final class FlowState { private final Queue pendingWriteQueue; private final Http2Stream stream; private int window = initialWindowSize; @@ -370,12 +361,11 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont private int allocated; private ChannelFuture lastNewFrame; - private OutboundFlowState(Http2Stream stream) { + private FlowState(Http2Stream stream) { this.stream = stream; pendingWriteQueue = new ArrayDeque(2); } - @Override public int window() { return window; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java index 3f4fa2e3da..2477fd9e20 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java @@ -14,7 +14,6 @@ */ package io.netty.handler.codec.http2; -import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING; import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE; @@ -22,7 +21,9 @@ import static io.netty.handler.codec.http.HttpHeaderValues.GZIP; import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY; import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE; import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP; +import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.streamError; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; @@ -40,7 +41,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() { @Override public void streamRemoved(Http2Stream stream) { - final Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class); + final Http2Decompressor decompressor = decompressor(stream); if (decompressor != null) { cleanup(stream, decompressor); } @@ -49,6 +50,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor private final Http2Connection connection; private final boolean strict; + private boolean flowControllerInitialized; public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener) { this(connection, listener, true); @@ -67,8 +69,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { final Http2Stream stream = connection.stream(streamId); - final Http2Decompressor decompressor = stream == null ? null : - (Http2Decompressor) stream.getProperty(Http2Decompressor.class); + final Http2Decompressor decompressor = decompressor(stream); if (decompressor == null) { // The decompressor may be null if no compatible encoding type was found in this stream's headers return listener.onDataRead(ctx, streamId, data, padding, endOfStream); @@ -203,7 +204,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor return; } - Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class); + Http2Decompressor decompressor = decompressor(stream); if (decompressor == null && !endOfStream) { // Determine the content encoding. AsciiString contentEncoding = headers.get(CONTENT_ENCODING); @@ -214,7 +215,6 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor if (channel != null) { decompressor = new Http2Decompressor(channel); stream.setProperty(Http2Decompressor.class, decompressor); - stream.garbageCollector(new DecompressorGarbageCollector(stream.garbageCollector())); // Decode the content and remove or replace the existing headers // so that the message looks like a decoded message. AsciiString targetContentEncoding = getTargetContentEncoding(contentEncoding); @@ -231,9 +231,20 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor // this content-length will not be correct. Instead of queuing messages or delaying sending // header frames...just remove the content-length header headers.remove(CONTENT_LENGTH); + + // The first time that we initialize a decompressor, decorate the local flow controller to + // properly convert consumed bytes. + if (!flowControllerInitialized) { + flowControllerInitialized = true; + connection.local().flowController(new ConsumedBytesConverter(connection.local().flowController())); + } } } + private static Http2Decompressor decompressor(Http2Stream stream) { + return (Http2Decompressor) (stream == null? null : stream.getProperty(Http2Decompressor.class)); + } + /** * Release remaining content from the {@link EmbeddedChannel} and remove the decompressor * from the {@link Http2Stream}. @@ -253,10 +264,6 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor } } decompressor = stream.removeProperty(Http2Decompressor.class); - if (decompressor != null) { - DecompressorGarbageCollector gc = (DecompressorGarbageCollector) stream.garbageCollector(); - stream.garbageCollector(gc.original()); - } } /** @@ -281,47 +288,72 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor } /** - * Garbage collector which translates post-decompression amounts the application knows about - * to pre-decompression amounts that flow control knows about. + * A decorator around the local flow controller that converts consumed bytes from uncompressed to compressed. */ - private static final class DecompressorGarbageCollector implements Http2FlowControlWindowManager { - private final Http2FlowControlWindowManager original; + private final class ConsumedBytesConverter implements Http2LocalFlowController { + private final Http2LocalFlowController flowController; - DecompressorGarbageCollector(Http2FlowControlWindowManager original) { - this.original = original; + ConsumedBytesConverter(Http2LocalFlowController flowController) { + this.flowController = checkNotNull(flowController, "flowController"); } @Override - public void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { - final Http2Stream stream = stream(); - final Http2Decompressor decompressor = stream.getProperty(Http2Decompressor.class); + public void initialWindowSize(int newWindowSize) throws Http2Exception { + flowController.initialWindowSize(newWindowSize); + } - // Make a copy before hand in case any exceptions occur we will roll back the state - Http2Decompressor copy = new Http2Decompressor(decompressor); + @Override + public int initialWindowSize() { + return flowController.initialWindowSize(); + } + + @Override + public int windowSize(Http2Stream stream) { + return flowController.windowSize(stream); + } + + @Override + public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) + throws Http2Exception { + flowController.incrementWindowSize(ctx, stream, delta); + } + + @Override + public void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, + ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { + flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream); + } + + @Override + public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) + throws Http2Exception { + Http2Decompressor decompressor = decompressor(stream); + Http2Decompressor copy = null; try { - original.returnProcessedBytes(ctx, decompressor.consumeProcessedBytes(numBytes)); + if (decompressor != null) { + // Make a copy before hand in case any exceptions occur we will roll back the state + copy = new Http2Decompressor(decompressor); + // Convert the uncompressed consumed bytes to compressed (on the wire) bytes. + numBytes = decompressor.consumeProcessedBytes(numBytes); + } + flowController.consumeBytes(ctx, stream, numBytes); } catch (Http2Exception e) { - stream.setProperty(Http2Decompressor.class, copy); + if (copy != null) { + stream.setProperty(Http2Decompressor.class, copy); + } throw e; } catch (Throwable t) { - stream.setProperty(Http2Decompressor.class, copy); + if (copy != null) { + stream.setProperty(Http2Decompressor.class, copy); + } throw new Http2Exception(INTERNAL_ERROR, "Error while returning bytes to flow control window", t); } } - Http2FlowControlWindowManager original() { - return original; - } - @Override - public int unProcessedBytes() { - return original.unProcessedBytes(); - } - - @Override - public Http2Stream stream() { - return original.stream(); + public int unconsumedBytes(Http2Stream stream) { + return flowController.unconsumedBytes(stream); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java index 45985304db..9824550c9d 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java @@ -91,7 +91,7 @@ public interface Http2Connection { /** * A view of the connection from one endpoint (local or remote). */ - interface Endpoint { + interface Endpoint { /** * Returns the next valid streamId for this endpoint. If negative, the stream IDs are @@ -186,10 +186,20 @@ public interface Http2Connection { */ int lastKnownStream(); + /** + * Gets the flow controller for this endpoint. + */ + F flowController(); + + /** + * Sets the flow controller for this endpoint. + */ + void flowController(F flowController); + /** * Gets the {@link Endpoint} opposite this one. */ - Endpoint opposite(); + Endpoint opposite(); } /** @@ -237,7 +247,7 @@ public interface Http2Connection { /** * Gets a view of this connection from the local {@link Endpoint}. */ - Endpoint local(); + Endpoint local(); /** * Creates a new stream initiated by the local endpoint. See {@link Endpoint#createStream(int, boolean)}. @@ -247,7 +257,7 @@ public interface Http2Connection { /** * Gets a view of this connection from the remote {@link Endpoint}. */ - Endpoint remote(); + Endpoint remote(); /** * Creates a new stream initiated by the remote endpoint. See {@link Endpoint#createStream(int, boolean)}. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java index 606591b97e..9741c50aa4 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java @@ -48,11 +48,6 @@ public interface Http2ConnectionDecoder extends Closeable { */ Http2LifecycleManager lifecycleManager(); - /** - * Sets the {@link Http2InboundFlowController} to be used when building the decoder. - */ - Builder inboundFlow(Http2InboundFlowController inboundFlow); - /** * Sets the {@link Http2FrameReader} to be used when building the decoder. */ @@ -79,6 +74,11 @@ public interface Http2ConnectionDecoder extends Closeable { */ Http2Connection connection(); + /** + * Provides the local flow controller for managing inbound traffic. + */ + Http2LocalFlowController flowController(); + /** * Provides direct access to the underlying frame listener. */ diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java index 56f8f2015a..6403fd1fa6 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java @@ -23,7 +23,7 @@ import io.netty.channel.ChannelPromise; /** * Handler for outbound HTTP/2 traffic. */ -public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundFlowController { +public interface Http2ConnectionEncoder extends Http2FrameWriter { /** * Builder for new instances of {@link Http2ConnectionEncoder}. @@ -50,11 +50,6 @@ public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundF */ Builder frameWriter(Http2FrameWriter frameWriter); - /** - * Sets the {@link Http2OutboundFlowController} to be used when building the encoder. - */ - Builder outboundFlow(Http2OutboundFlowController outboundFlow); - /** * Creates a new encoder instance. */ @@ -66,6 +61,11 @@ public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundF */ Http2Connection connection(); + /** + * Provides the remote flow controller for managing outbound traffic. + */ + Http2RemoteFlowController flowController(); + /** * Provides direct access to the underlying frame writer object. */ diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 282c2c9029..7ac109aeb3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -60,18 +60,10 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader, Http2FrameWriter frameWriter, Http2FrameListener listener) { - this(connection, frameReader, frameWriter, new DefaultHttp2InboundFlowController( - connection, frameWriter), new DefaultHttp2OutboundFlowController(connection, - frameWriter), listener); - } - - public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader, - Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow, - Http2OutboundFlowController outboundFlow, Http2FrameListener listener) { this(DefaultHttp2ConnectionDecoder.newBuilder().connection(connection) - .frameReader(frameReader).inboundFlow(inboundFlow).listener(listener), + .frameReader(frameReader).listener(listener), DefaultHttp2ConnectionEncoder.newBuilder().connection(connection) - .frameWriter(frameWriter).outboundFlow(outboundFlow)); + .frameWriter(frameWriter)); } /** diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowControlWindowManager.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowControlWindowManager.java deleted file mode 100644 index 17e82fe74f..0000000000 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowControlWindowManager.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package io.netty.handler.codec.http2; - -import io.netty.channel.ChannelHandlerContext; - -/** - * Allows data to be returned to the flow control window. - */ -public interface Http2FlowControlWindowManager { - /** - * Used by applications that participate in application-level inbound flow control. Allows the - * application to return a number of bytes that has been processed and thereby enabling the - * {@link Http2InboundFlowController} to send a {@code WINDOW_UPDATE} to restore at least part - * of the flow control window. - * - * @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if - * appropriate - * @param numBytes the number of bytes to be returned to the flow control window. - */ - void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception; - - /** - * The number of bytes that are outstanding and have not yet been returned to the flow controller. - */ - int unProcessedBytes(); - - /** - * Get the stream that is being managed - */ - Http2Stream stream(); -} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowController.java new file mode 100644 index 0000000000..54f1f0f46c --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowController.java @@ -0,0 +1,70 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.channel.ChannelHandlerContext; + +/** + * Base interface for all HTTP/2 flow controllers. + */ +public interface Http2FlowController { + + /** + * Sets the initial flow control window and updates all stream windows (but not the connection + * window) by the delta. + *

+ * This method is used to apply the {@code SETTINGS_INITIAL_WINDOW_SIZE} value for an + * {@code SETTINGS} frame. + * + * @param newWindowSize the new initial window size. + * @throws Http2Exception thrown if any protocol-related error occurred. + */ + void initialWindowSize(int newWindowSize) throws Http2Exception; + + /** + * Gets the initial flow control window size that is used as the basis for new stream flow + * control windows. + */ + int initialWindowSize(); + + /** + * Gets the number of bytes remaining in the flow control window size for the given stream. + * + * @param stream The subject stream. Use {@link Http2Connection#connectionStream()} for + * requesting the size of the connection window. + * @return the current size of the flow control window. + * @throws IllegalArgumentException if the given stream does not exist. + */ + int windowSize(Http2Stream stream); + + /** + * Increments the size of the stream's flow control window by the given delta. + *

+ * In the case of a {@link Http2RemoteFlowController} this is called upon receipt of a + * {@code WINDOW_UPDATE} frame from the remote endpoint to mirror the changes to the window + * size. + *

+ * For a {@link Http2LocalFlowController} this can be called to request the expansion of the + * window size published by this endpoint. It is up to the implementation, however, as to when a + * {@code WINDOW_UPDATE} is actually sent. + * + * @param ctx The context for the calling handler + * @param stream The subject stream. Use {@link Http2Connection#connectionStream()} for + * requesting the size of the connection window. + * @param delta the change in size of the flow control window. + * @throws Http2Exception thrown if a protocol-related error occurred. + */ + void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception; +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowState.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowState.java deleted file mode 100644 index a6972a4fdf..0000000000 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowState.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package io.netty.handler.codec.http2; - -/** - * Base interface for flow-control state for a particular stream. - */ -public interface Http2FlowState { - - /** - * Returns the current remaining flow control window (in bytes) for the stream. Depending on the - * flow control implementation, this value may be negative. - */ - int window(); -} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java deleted file mode 100644 index 1f34b2797e..0000000000 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package io.netty.handler.codec.http2; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; - -/** - * Controls the inbound flow of data frames from the remote endpoint. - */ -public interface Http2InboundFlowController { - - /** - * Applies inbound flow control to the given {@code DATA} frame. - * - * @param ctx the context from the handler where the frame was read. - * @param streamId the subject stream for the frame. - * @param data payload buffer for the frame. - * @param padding the number of padding bytes found at the end of the frame. - * @param endOfStream Indicates whether this is the last frame to be sent from the remote - * endpoint for this stream. - */ - void applyFlowControl(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endOfStream) throws Http2Exception; - - /** - * Sets the global inbound flow control window size and updates all stream window sizes by the delta. - *

- * This method is used to apply the {@code SETTINGS_INITIAL_WINDOW_SIZE} value for an - * outbound {@code SETTINGS} frame. - *

- * The connection stream windows will not be modified as a result of this call. - * @param newWindowSize the new initial window size. - * @throws Http2Exception thrown if any protocol-related error occurred. - */ - void initialWindowSize(int newWindowSize) throws Http2Exception; - - /** - * Gets the initial window size used as the basis for new stream flow control windows. - */ - int initialWindowSize(); - - /** - * Sets the initial inbound flow control window size for a specific stream. - *

- * Note it is the responsibly of the caller to ensure that the the - * initial {@code SETTINGS} frame is sent before this is called. It would - * be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE} - * was generated by this method before the initial {@code SETTINGS} frame is sent. - * @param ctx the context to use if a {@code WINDOW_UPDATE} is determined necessary. - * @param streamId The stream to update. - * @param newWindowSize the window size to apply to {@code streamId} - * @throws Http2Exception thrown if any protocol-related error occurred. - */ - void initialStreamWindowSize(ChannelHandlerContext ctx, int streamId, int newWindowSize) throws Http2Exception; - - /** - * Obtain the initial window size for a specific stream. - * @param streamId The stream id to get the initial window size for. - * @return The initial window size for {@code streamId}. - * @throws Http2Exception If no stream corresponding to {@code stream} could be found. - */ - int initialStreamWindowSize(int streamId) throws Http2Exception; -} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java new file mode 100644 index 0000000000..188353a8e8 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java @@ -0,0 +1,69 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +/** + * A {@link Http2FlowController} for controlling the inbound flow of {@code DATA} frames from the remote + * endpoint. + */ +public interface Http2LocalFlowController extends Http2FlowController { + + /** + * Receives an inbound {@code DATA} frame from the remote endpoint and applies flow control + * policies to it for both the {@code stream} as well as the connection. If any flow control + * policies have been violated, an exception is raised immediately, otherwise the frame is + * considered to have "passed" flow control. + * + * @param ctx the context from the handler where the frame was read. + * @param stream the subject stream for the received frame. The connection stream object must + * not be used. + * @param data payload buffer for the frame. + * @param padding the number of padding bytes found at the end of the frame. + * @param endOfStream Indicates whether this is the last frame to be sent from the remote + * endpoint for this stream. + * @throws Http2Exception if any flow control errors are encountered. + */ + void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception; + + /** + * Indicates that the application has consumed a number of bytes for the given stream and is + * therefore ready to receive more data from the remote endpoint. The application must consume + * any bytes that it receives or the flow control window will collapse. Consuming bytes enables + * the flow controller to send {@code WINDOW_UPDATE} to restore a portion of the flow control + * window for the stream. + * + * @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if + * appropriate + * @param stream the stream for which window space should be freed. The connection stream object + * must not be used. + * @param numBytes the number of bytes to be returned to the flow control window. + * @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes()} + * for the stream. + */ + void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception; + + /** + * The number of bytes for the given stream that have been received but not yet consumed by the + * application. + * + * @param stream the stream for which window space should be freed. + * @return the number of unconsumed bytes for the stream. + */ + int unconsumedBytes(Http2Stream stream); +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFlowController.java deleted file mode 100644 index a7e4d21c37..0000000000 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFlowController.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package io.netty.handler.codec.http2; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; - - -/** - * Controls the outbound flow of data frames to the remote endpoint. - */ -public interface Http2OutboundFlowController extends Http2DataWriter { - - /** - * Controls the flow-controlled writing of a DATA frame to the remote endpoint. There is no - * guarantee when the data will be written or whether it will be split into multiple frames - * before sending. The returned future will only be completed once all of the data has been - * successfully written to the remote endpoint. - *

- * Manually flushing the {@link ChannelHandlerContext} is not required, since the flow - * controller will flush as appropriate. - */ - @Override - ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endStream, ChannelPromise promise); - - /** - * Returns the {@link ChannelFuture} for the most recent write for the given - * stream. If no previous write for the stream has occurred, returns {@code null}. - */ - ChannelFuture lastWriteForStream(int streamId); - - /** - * Sets the initial size of the connection's outbound flow control window. The outbound flow - * control windows for all streams are updated by the delta in the initial window size. This is - * called as part of the processing of a SETTINGS frame received from the remote endpoint. - * - * @param newWindowSize the new initial window size. - */ - void initialOutboundWindowSize(int newWindowSize) throws Http2Exception; - - /** - * Gets the initial size of the connection's outbound flow control window. - */ - int initialOutboundWindowSize(); - - /** - * Updates the size of the stream's outbound flow control window. This is called upon receiving - * a WINDOW_UPDATE frame from the remote endpoint. - * - * @param streamId the ID of the stream, or zero if the window is for the entire connection. - * @param deltaWindowSize the change in size of the outbound flow control window. - * @throws Http2Exception thrown if a protocol-related error occurred. - */ - void updateOutboundWindowSize(int streamId, int deltaWindowSize) throws Http2Exception; -} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java new file mode 100644 index 0000000000..b529c8216c --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java @@ -0,0 +1,59 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; + +/** + * A {@link Http2FlowController} for controlling the flow of outbound {@code DATA} frames to the remote + * endpoint. + */ +public interface Http2RemoteFlowController extends Http2FlowController { + + /** + * Writes or queues a {@code DATA} frame for transmission to the remote endpoint. There is no + * guarantee when the data will be written or whether it will be split into multiple frames + * before sending. The returned future will only be completed once all of the data has been + * successfully written to the remote endpoint. + *

+ * Manually flushing the {@link ChannelHandlerContext} is not required, since the flow + * controller will flush as appropriate. + * + * @param ctx the context from the handler. + * @param stream the subject stream. Must not be the connection stream object. + * @param data payload buffer for the frame. + * @param padding the number of padding bytes to be added at the end of the frame. + * @param endOfStream Indicates whether this is the last frame to be sent to the remote endpoint + * for this stream. + * @param promise the promise to be completed when the data has been successfully written or a + * failure occurs. + * @return a future that is completed when the frame is sent to the remote endpoint. + */ + ChannelFuture sendFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, + ByteBuf data, int padding, boolean endStream, ChannelPromise promise); + + /** + * Gets the {@link ChannelFuture} for the most recent frame that was sent for the given stream + * via a call to {@link #sendFlowControlledFrame()}. This is useful for cases such as ensuring + * that {@code HEADERS} frames maintain send order with {@code DATA} frames. + * + * @param stream the subject stream. Must not be the connection stream object. + * @return the most recent sent frame, or {@code null} if no frame has been sent for the stream. + */ + ChannelFuture lastFlowControlledFrameSent(Http2Stream stream); +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java index 8f894227b1..670e7ce67e 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java @@ -147,36 +147,6 @@ public interface Http2Stream { */ V removeProperty(Object key); - /** - * Gets the in-bound flow control state for this stream. - */ - Http2FlowState inboundFlow(); - - /** - * Sets the in-bound flow control state for this stream. - */ - void inboundFlow(Http2FlowState state); - - /** - * Gets the out-bound flow control window for this stream. - */ - Http2FlowState outboundFlow(); - - /** - * Sets the out-bound flow control window for this stream. - */ - void outboundFlow(Http2FlowState state); - - /** - * Gets the interface which allows bytes to be returned to the flow controller - */ - Http2FlowControlWindowManager garbageCollector(); - - /** - * Sets the interface which allows bytes to be returned to the flow controller - */ - void garbageCollector(Http2FlowControlWindowManager collector); - /** * Updates an priority for this stream. Calling this method may affect the straucture of the * priority tree. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandler.java index b006fb1c6a..035f4250bf 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandler.java @@ -40,10 +40,9 @@ public class HttpToHttp2ConnectionHandler extends Http2ConnectionHandler { super(connection, frameReader, frameWriter, listener); } - public HttpToHttp2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader, - Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow, - Http2OutboundFlowController outboundFlow, Http2FrameListener listener) { - super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, listener); + public HttpToHttp2ConnectionHandler(Http2ConnectionDecoder.Builder decoderBuilder, + Http2ConnectionEncoder.Builder encoderBuilder) { + super(decoderBuilder, encoderBuilder); } /** diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java index 47b4defbaf..f111d627c1 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java @@ -153,7 +153,7 @@ public class DataCompressionHttp2Test { } }); awaitServer(); - assertEquals(0, stream.garbageCollector().unProcessedBytes()); + assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream)); assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name())); } finally { data.release(); @@ -182,7 +182,7 @@ public class DataCompressionHttp2Test { } }); awaitServer(); - assertEquals(0, stream.garbageCollector().unProcessedBytes()); + assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream)); assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name())); } finally { data.release(); @@ -214,7 +214,7 @@ public class DataCompressionHttp2Test { } }); awaitServer(); - assertEquals(0, stream.garbageCollector().unProcessedBytes()); + assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream)); assertEquals(new StringBuilder(text1).append(text2).toString(), serverOut.toString(CharsetUtil.UTF_8.name())); } finally { @@ -247,7 +247,7 @@ public class DataCompressionHttp2Test { } }); awaitServer(); - assertEquals(0, stream.garbageCollector().unProcessedBytes()); + assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream)); assertEquals(data.resetReaderIndex().toString(CharsetUtil.UTF_8), serverOut.toString(CharsetUtil.UTF_8.name())); } finally { @@ -294,14 +294,15 @@ public class DataCompressionHttp2Test { protected void initChannel(Channel ch) throws Exception { ChannelPipeline p = ch.pipeline(); Http2FrameWriter writer = new DefaultHttp2FrameWriter(); - Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler( - new DefaultHttp2ConnectionDecoder.Builder() - .connection(serverConnection) - .frameReader(new DefaultHttp2FrameReader()) - .inboundFlow(new DefaultHttp2InboundFlowController(serverConnection, writer)) - .listener(new DelegatingDecompressorFrameListener(serverConnection, serverListener)), - new CompressorHttp2ConnectionEncoder.Builder().connection(serverConnection).frameWriter(writer) - .outboundFlow(new DefaultHttp2OutboundFlowController(serverConnection, writer))); + Http2ConnectionHandler connectionHandler = + new Http2ConnectionHandler(new DefaultHttp2ConnectionDecoder.Builder() + .connection(serverConnection) + .frameReader(new DefaultHttp2FrameReader()) + .listener( + new DelegatingDecompressorFrameListener(serverConnection, + serverListener)), + new CompressorHttp2ConnectionEncoder.Builder().connection( + serverConnection).frameWriter(writer)); p.addLast(connectionHandler); p.addLast(Http2CodecUtil.ignoreSettingsHandler()); serverChannelLatch.countDown(); @@ -316,14 +317,15 @@ public class DataCompressionHttp2Test { ChannelPipeline p = ch.pipeline(); FrameCountDown clientFrameCountDown = new FrameCountDown(clientListener, clientLatch); Http2FrameWriter writer = new DefaultHttp2FrameWriter(); - Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler( - new DefaultHttp2ConnectionDecoder.Builder() - .connection(clientConnection) - .frameReader(new DefaultHttp2FrameReader()) - .inboundFlow(new DefaultHttp2InboundFlowController(clientConnection, writer)) - .listener(new DelegatingDecompressorFrameListener(clientConnection, clientFrameCountDown)), - new CompressorHttp2ConnectionEncoder.Builder().connection(clientConnection).frameWriter(writer) - .outboundFlow(new DefaultHttp2OutboundFlowController(clientConnection, writer))); + Http2ConnectionHandler connectionHandler = + new Http2ConnectionHandler(new DefaultHttp2ConnectionDecoder.Builder() + .connection(clientConnection) + .frameReader(new DefaultHttp2FrameReader()) + .listener( + new DelegatingDecompressorFrameListener(clientConnection, + clientFrameCountDown)), + new CompressorHttp2ConnectionEncoder.Builder().connection( + clientConnection).frameWriter(writer)); clientEncoder = connectionHandler.encoder(); p.addLast(connectionHandler); p.addLast(Http2CodecUtil.ignoreSettingsHandler()); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java index d96c828220..542a0f65fa 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -70,13 +70,16 @@ public class DefaultHttp2ConnectionDecoderTest { private Http2Connection connection; @Mock - private Http2Connection.Endpoint remote; + private Http2Connection.Endpoint remote; @Mock - private Http2Connection.Endpoint local; + private Http2Connection.Endpoint local; @Mock - private Http2InboundFlowController inboundFlow; + private Http2LocalFlowController localFlow; + + @Mock + private Http2RemoteFlowController remoteFlow; @Mock private ChannelHandlerContext ctx; @@ -104,9 +107,6 @@ public class DefaultHttp2ConnectionDecoderTest { @Mock private Http2ConnectionEncoder encoder; - @Mock - private Http2FlowControlWindowManager inFlowState; - @Mock private Http2LifecycleManager lifecycleManager; @@ -119,12 +119,13 @@ public class DefaultHttp2ConnectionDecoderTest { when(channel.isActive()).thenReturn(true); when(stream.id()).thenReturn(STREAM_ID); when(stream.state()).thenReturn(OPEN); - when(stream.garbageCollector()).thenReturn(inFlowState); when(pushStream.id()).thenReturn(PUSH_STREAM_ID); when(connection.activeStreams()).thenReturn(Collections.singletonList(stream)); when(connection.stream(STREAM_ID)).thenReturn(stream); when(connection.requireStream(STREAM_ID)).thenReturn(stream); when(connection.local()).thenReturn(local); + when(local.flowController()).thenReturn(localFlow); + when(encoder.flowController()).thenReturn(remoteFlow); when(connection.remote()).thenReturn(remote); doAnswer(new Answer() { @Override @@ -151,7 +152,7 @@ public class DefaultHttp2ConnectionDecoderTest { when(ctx.write(any())).thenReturn(future); decoder = DefaultHttp2ConnectionDecoder.newBuilder().connection(connection) - .frameReader(reader).inboundFlow(inboundFlow).encoder(encoder) + .frameReader(reader).encoder(encoder) .listener(listener).lifecycleManager(lifecycleManager).build(); // Simulate receiving the initial settings from the remote endpoint. @@ -173,8 +174,8 @@ public class DefaultHttp2ConnectionDecoderTest { mockFlowControl(processedBytes); try { decode().onDataRead(ctx, STREAM_ID, data, 10, true); - verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true)); - verify(inFlowState).returnProcessedBytes(eq(ctx), eq(processedBytes)); + verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(padding), eq(true)); + verify(localFlow).consumeBytes(eq(ctx), eq(stream), eq(processedBytes)); // Verify that the event was absorbed and not propagated to the oberver. verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); @@ -203,7 +204,7 @@ public class DefaultHttp2ConnectionDecoderTest { final ByteBuf data = dummyData(); try { decode().onDataRead(ctx, STREAM_ID, data, 10, true); - verify(inboundFlow, never()).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + verify(localFlow, never()).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true)); verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); } finally { data.release(); @@ -218,7 +219,7 @@ public class DefaultHttp2ConnectionDecoderTest { final ByteBuf data = dummyData(); try { decode().onDataRead(ctx, STREAM_ID, data, 10, true); - verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true)); verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); } finally { data.release(); @@ -230,7 +231,7 @@ public class DefaultHttp2ConnectionDecoderTest { final ByteBuf data = dummyData(); try { decode().onDataRead(ctx, STREAM_ID, data, 10, true); - verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true)); verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future)); verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); } finally { @@ -248,23 +249,23 @@ public class DefaultHttp2ConnectionDecoderTest { public Integer answer(InvocationOnMock in) throws Throwable { return unprocessed.get(); } - }).when(inFlowState).unProcessedBytes(); + }).when(localFlow).unconsumedBytes(eq(stream)); doAnswer(new Answer() { @Override public Void answer(InvocationOnMock in) throws Throwable { - int delta = (Integer) in.getArguments()[1]; + int delta = (Integer) in.getArguments()[2]; int newValue = unprocessed.addAndGet(-delta); if (newValue < 0) { throw new RuntimeException("Returned too many bytes"); } return null; } - }).when(inFlowState).returnProcessedBytes(eq(ctx), anyInt()); + }).when(localFlow).consumeBytes(eq(ctx), eq(stream), anyInt()); // When the listener callback is called, process a few bytes and then throw. doAnswer(new Answer() { @Override public Integer answer(InvocationOnMock in) throws Throwable { - inFlowState.returnProcessedBytes(ctx, 4); + localFlow.consumeBytes(ctx, stream, 4); throw new RuntimeException("Fake Exception"); } }).when(listener).onDataRead(eq(ctx), eq(STREAM_ID), any(ByteBuf.class), eq(10), eq(true)); @@ -272,11 +273,11 @@ public class DefaultHttp2ConnectionDecoderTest { decode().onDataRead(ctx, STREAM_ID, data, padding, true); fail("Expected exception"); } catch (RuntimeException cause) { - verify(inboundFlow) - .applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true)); + verify(localFlow) + .receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(padding), eq(true)); verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future)); verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true)); - assertEquals(0, inFlowState.unProcessedBytes()); + assertEquals(0, localFlow.unconsumedBytes(stream)); } finally { data.release(); } @@ -365,7 +366,7 @@ public class DefaultHttp2ConnectionDecoderTest { public void windowUpdateReadAfterGoAwayShouldBeIgnored() throws Exception { when(connection.goAwaySent()).thenReturn(true); decode().onWindowUpdateRead(ctx, STREAM_ID, 10); - verify(encoder, never()).updateOutboundWindowSize(anyInt(), anyInt()); + verify(remoteFlow, never()).incrementWindowSize(eq(ctx), any(Http2Stream.class), anyInt()); verify(listener, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt()); } @@ -378,7 +379,7 @@ public class DefaultHttp2ConnectionDecoderTest { @Test public void windowUpdateReadShouldSucceed() throws Exception { decode().onWindowUpdateRead(ctx, STREAM_ID, 10); - verify(encoder).updateOutboundWindowSize(eq(STREAM_ID), eq(10)); + verify(remoteFlow).incrementWindowSize(eq(ctx), eq(stream), eq(10)); verify(listener).onWindowUpdateRead(eq(ctx), eq(STREAM_ID), eq(10)); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java index 91ab79e818..aef89d03cb 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java @@ -67,13 +67,13 @@ public class DefaultHttp2ConnectionEncoderTest { private Http2Connection connection; @Mock - private Http2Connection.Endpoint remote; + private Http2Connection.Endpoint remote; @Mock - private Http2Connection.Endpoint local; + private Http2Connection.Endpoint local; @Mock - private Http2OutboundFlowController outboundFlow; + private Http2RemoteFlowController remoteFlow; @Mock private ChannelHandlerContext ctx; @@ -116,6 +116,7 @@ public class DefaultHttp2ConnectionEncoderTest { when(connection.requireStream(STREAM_ID)).thenReturn(stream); when(connection.local()).thenReturn(local); when(connection.remote()).thenReturn(remote); + when(remote.flowController()).thenReturn(remoteFlow); doAnswer(new Answer() { @Override public Http2Stream answer(InvocationOnMock invocation) throws Throwable { @@ -135,9 +136,10 @@ public class DefaultHttp2ConnectionEncoderTest { when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream); when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future); - when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))).thenReturn(future); - when(outboundFlow.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), eq(promise))) + when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))) .thenReturn(future); + when(remoteFlow.sendFlowControlledFrame(eq(ctx), any(Http2Stream.class), + any(ByteBuf.class), anyInt(), anyBoolean(), eq(promise))).thenReturn(future); when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); when(ctx.channel()).thenReturn(channel); when(ctx.newSucceededFuture()).thenReturn(future); @@ -145,8 +147,7 @@ public class DefaultHttp2ConnectionEncoderTest { when(ctx.write(any())).thenReturn(future); encoder = DefaultHttp2ConnectionEncoder.newBuilder().connection(connection) - .frameWriter(writer).outboundFlow(outboundFlow) - .lifecycleManager(lifecycleManager).build(); + .frameWriter(writer).lifecycleManager(lifecycleManager).build(); } @Test @@ -168,7 +169,7 @@ public class DefaultHttp2ConnectionEncoderTest { final ByteBuf data = dummyData(); try { encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); - verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(false), eq(promise)); + verify(remoteFlow).sendFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(0), eq(false), eq(promise)); } finally { data.release(); } @@ -180,7 +181,7 @@ public class DefaultHttp2ConnectionEncoderTest { final ByteBuf data = dummyData(); try { encoder.writeData(ctx, STREAM_ID, data, 0, true, promise); - verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(true), eq(promise)); + verify(remoteFlow).sendFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(0), eq(true), eq(promise)); // Invoke the listener callback indicating that the write completed successfully. ArgumentCaptor captor = ArgumentCaptor.forClass(ChannelFutureListener.class); @@ -240,7 +241,7 @@ public class DefaultHttp2ConnectionEncoderTest { }).when(future).addListener(any(ChannelFutureListener.class)); // Indicate that there was a previous data write operation that the headers must wait for. - when(outboundFlow.lastWriteForStream(anyInt())).thenReturn(future); + when(remoteFlow.lastFlowControlledFrameSent(any(Http2Stream.class))).thenReturn(future); encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise); verify(writer, never()).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise)); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java similarity index 79% rename from codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java rename to codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java index b556e628e4..adb37ce5f7 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java @@ -36,12 +36,12 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; /** - * Tests for {@link DefaultHttp2InboundFlowController}. + * Tests for {@link DefaultHttp2LocalFlowController}. */ -public class DefaultHttp2InboundFlowControllerTest { +public class DefaultHttp2LocalFlowControllerTest { private static final int STREAM_ID = 1; - private DefaultHttp2InboundFlowController controller; + private DefaultHttp2LocalFlowController controller; @Mock private ByteBuf buffer; @@ -66,28 +66,28 @@ public class DefaultHttp2InboundFlowControllerTest { when(ctx.newPromise()).thenReturn(promise); connection = new DefaultHttp2Connection(false); - controller = new DefaultHttp2InboundFlowController(connection, frameWriter, updateRatio); + controller = new DefaultHttp2LocalFlowController(connection, frameWriter, updateRatio); connection.local().createStream(STREAM_ID, false); } @Test public void dataFrameShouldBeAccepted() throws Http2Exception { - applyFlowControl(STREAM_ID, 10, 0, false); + receiveFlowControlledFrame(STREAM_ID, 10, 0, false); verifyWindowUpdateNotSent(); } @Test public void windowUpdateShouldSendOnceBytesReturned() throws Http2Exception { int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1; - applyFlowControl(STREAM_ID, dataSize, 0, false); + receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false); // Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent. - returnProcessedBytes(STREAM_ID, 10); + consumeBytes(STREAM_ID, 10); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); // Return the rest and verify the WINDOW_UPDATE is sent. - returnProcessedBytes(STREAM_ID, dataSize - 10); + consumeBytes(STREAM_ID, dataSize - 10); verifyWindowUpdateSent(STREAM_ID, dataSize); verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); } @@ -95,7 +95,7 @@ public class DefaultHttp2InboundFlowControllerTest { @Test(expected = Http2Exception.class) public void connectionFlowControlExceededShouldThrow() throws Http2Exception { // Window exceeded because of the padding. - applyFlowControl(STREAM_ID, DEFAULT_WINDOW_SIZE, 1, true); + receiveFlowControlledFrame(STREAM_ID, DEFAULT_WINDOW_SIZE, 1, true); } @Test @@ -103,11 +103,11 @@ public class DefaultHttp2InboundFlowControllerTest { int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1; // Set end-of-stream on the frame, so no window update will be sent for the stream. - applyFlowControl(STREAM_ID, dataSize, 0, true); + receiveFlowControlledFrame(STREAM_ID, dataSize, 0, true); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); verifyWindowUpdateNotSent(STREAM_ID); - returnProcessedBytes(STREAM_ID, dataSize); + consumeBytes(STREAM_ID, dataSize); verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); verifyWindowUpdateNotSent(STREAM_ID); } @@ -119,8 +119,8 @@ public class DefaultHttp2InboundFlowControllerTest { int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize); // Don't set end-of-stream so we'll get a window update for the stream as well. - applyFlowControl(STREAM_ID, dataSize, 0, false); - returnProcessedBytes(STREAM_ID, dataSize); + receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false); + consumeBytes(STREAM_ID, dataSize); verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); verifyWindowUpdateSent(STREAM_ID, windowDelta); } @@ -129,10 +129,10 @@ public class DefaultHttp2InboundFlowControllerTest { public void initialWindowUpdateShouldAllowMoreFrames() throws Http2Exception { // Send a frame that takes up the entire window. int initialWindowSize = DEFAULT_WINDOW_SIZE; - applyFlowControl(STREAM_ID, initialWindowSize, 0, false); + receiveFlowControlledFrame(STREAM_ID, initialWindowSize, 0, false); assertEquals(0, window(STREAM_ID)); assertEquals(0, window(CONNECTION_STREAM_ID)); - returnProcessedBytes(STREAM_ID, initialWindowSize); + consumeBytes(STREAM_ID, initialWindowSize); assertEquals(initialWindowSize, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); @@ -146,8 +146,8 @@ public class DefaultHttp2InboundFlowControllerTest { reset(frameWriter); // Send the next frame and verify that the expected window updates were sent. - applyFlowControl(STREAM_ID, initialWindowSize, 0, false); - returnProcessedBytes(STREAM_ID, initialWindowSize); + receiveFlowControlledFrame(STREAM_ID, initialWindowSize, 0, false); + consumeBytes(STREAM_ID, initialWindowSize); int delta = newInitialWindowSize - initialWindowSize; verifyWindowUpdateSent(STREAM_ID, delta); verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta); @@ -164,12 +164,12 @@ public class DefaultHttp2InboundFlowControllerTest { // Test that both stream and connection window are updated (or not updated) together int data1 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1; - applyFlowControl(STREAM_ID, data1, 0, false); + receiveFlowControlledFrame(STREAM_ID, data1, 0, false); verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(CONNECTION_STREAM_ID)); - returnProcessedBytes(STREAM_ID, data1); + consumeBytes(STREAM_ID, data1); verifyWindowUpdateSent(STREAM_ID, data1); verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1); @@ -180,16 +180,16 @@ public class DefaultHttp2InboundFlowControllerTest { // a window update for the connection stream. --data1; int data2 = data1 >> 1; - applyFlowControl(STREAM_ID, data1, 0, false); - applyFlowControl(newStreamId, data1, 0, false); + receiveFlowControlledFrame(STREAM_ID, data1, 0, false); + receiveFlowControlledFrame(newStreamId, data1, 0, false); verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(newStreamId); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(newStreamId)); assertEquals(DEFAULT_WINDOW_SIZE - (data1 << 1), window(CONNECTION_STREAM_ID)); - returnProcessedBytes(STREAM_ID, data1); - returnProcessedBytes(newStreamId, data2); + consumeBytes(STREAM_ID, data1); + consumeBytes(newStreamId, data2); verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(newStreamId); verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2); @@ -216,26 +216,27 @@ public class DefaultHttp2InboundFlowControllerTest { private void testRatio(float ratio, int newDefaultWindowSize, int newStreamId, boolean setStreamRatio) throws Http2Exception { - controller.initialStreamWindowSize(ctx, 0, newDefaultWindowSize); - connection.local().createStream(newStreamId, false); + int delta = newDefaultWindowSize - DEFAULT_WINDOW_SIZE; + controller.incrementWindowSize(ctx, stream(0), delta); + Http2Stream stream = connection.local().createStream(newStreamId, false); if (setStreamRatio) { - controller.windowUpdateRatio(ctx, newStreamId, ratio); + controller.windowUpdateRatio(ctx, stream, ratio); } - controller.initialStreamWindowSize(ctx, newStreamId, newDefaultWindowSize); + controller.incrementWindowSize(ctx, stream, delta); reset(frameWriter); try { int data1 = (int) (newDefaultWindowSize * ratio) + 1; int data2 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) >> 1; - applyFlowControl(STREAM_ID, data2, 0, false); - applyFlowControl(newStreamId, data1, 0, false); + receiveFlowControlledFrame(STREAM_ID, data2, 0, false); + receiveFlowControlledFrame(newStreamId, data1, 0, false); verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(newStreamId); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); assertEquals(DEFAULT_WINDOW_SIZE - data2, window(STREAM_ID)); assertEquals(newDefaultWindowSize - data1, window(newStreamId)); assertEquals(newDefaultWindowSize - data2 - data1, window(CONNECTION_STREAM_ID)); - returnProcessedBytes(STREAM_ID, data2); - returnProcessedBytes(newStreamId, data1); + consumeBytes(STREAM_ID, data2); + consumeBytes(newStreamId, data1); verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateSent(newStreamId, data1); verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2); @@ -252,10 +253,11 @@ public class DefaultHttp2InboundFlowControllerTest { return initialSize - newWindowSize; } - private void applyFlowControl(int streamId, int dataSize, int padding, boolean endOfStream) throws Http2Exception { + private void receiveFlowControlledFrame(int streamId, int dataSize, int padding, + boolean endOfStream) throws Http2Exception { final ByteBuf buf = dummyData(dataSize); try { - controller.applyFlowControl(ctx, streamId, buf, padding, endOfStream); + controller.receiveFlowControlledFrame(ctx, stream(streamId), buf, padding, endOfStream); } finally { buf.release(); } @@ -267,8 +269,8 @@ public class DefaultHttp2InboundFlowControllerTest { return buffer; } - private void returnProcessedBytes(int streamId, int processedBytes) throws Http2Exception { - connection.requireStream(streamId).garbageCollector().returnProcessedBytes(ctx, processedBytes); + private void consumeBytes(int streamId, int numBytes) throws Http2Exception { + controller.consumeBytes(ctx, stream(streamId), numBytes); } private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) throws Http2Exception { @@ -284,7 +286,11 @@ public class DefaultHttp2InboundFlowControllerTest { any(ChannelPromise.class)); } - private int window(int streamId) { - return connection.stream(streamId).inboundFlow().window(); + private int window(int streamId) throws Http2Exception { + return controller.windowSize(stream(streamId)); + } + + private Http2Stream stream(int streamId) throws Http2Exception { + return connection.requireStream(streamId); } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java similarity index 88% rename from codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowControllerTest.java rename to codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java index 5ce7c0bf0a..c60070e71b 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java @@ -19,8 +19,8 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; @@ -33,7 +33,6 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController.OutboundFlowState; import io.netty.handler.codec.http2.Http2FrameWriter.Configuration; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; @@ -49,16 +48,16 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; /** - * Tests for {@link DefaultHttp2OutboundFlowController}. + * Tests for {@link DefaultHttp2RemoteFlowController}. */ -public class DefaultHttp2OutboundFlowControllerTest { +public class DefaultHttp2RemoteFlowControllerTest { private static final int STREAM_A = 1; private static final int STREAM_B = 3; private static final int STREAM_C = 5; private static final int STREAM_D = 7; private static final int STREAM_E = 9; - private DefaultHttp2OutboundFlowController controller; + private DefaultHttp2RemoteFlowController controller; @Mock private ByteBuf buffer; @@ -87,7 +86,7 @@ public class DefaultHttp2OutboundFlowControllerTest { when(ctx.newPromise()).thenReturn(promise); connection = new DefaultHttp2Connection(false); - controller = new DefaultHttp2OutboundFlowController(connection, frameWriter); + controller = new DefaultHttp2RemoteFlowController(connection, frameWriter); connection.local().createStream(STREAM_A, false); connection.local().createStream(STREAM_B, false); @@ -101,7 +100,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void initialWindowSizeShouldOnlyChangeStreams() throws Http2Exception { - controller.initialOutboundWindowSize(0); + controller.initialWindowSize(0); assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); assertEquals(0, window(STREAM_A)); assertEquals(0, window(STREAM_B)); @@ -111,7 +110,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void windowUpdateShouldChangeConnectionWindow() throws Http2Exception { - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 100); + incrementWindowSize(CONNECTION_STREAM_ID, 100); assertEquals(DEFAULT_WINDOW_SIZE + 100, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_A)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B)); @@ -121,7 +120,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void windowUpdateShouldChangeStreamWindow() throws Http2Exception { - controller.updateOutboundWindowSize(STREAM_A, 100); + incrementWindowSize(STREAM_A, 100); assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE + 100, window(STREAM_A)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B)); @@ -147,13 +146,14 @@ public class DefaultHttp2OutboundFlowControllerTest { final ByteBuf data = dummyData(5, 5); try { // Write one frame. - ChannelFuture future1 = controller.writeData(ctx, STREAM_A, data, 0, false, promise); - assertEquals(future1, controller.lastWriteForStream(STREAM_A)); + Http2Stream stream = stream(STREAM_A); + ChannelFuture future1 = controller.sendFlowControlledFrame(ctx, stream, data, 0, false, promise); + assertEquals(future1, controller.lastFlowControlledFrameSent(stream)); // Now write another and verify that the last write is updated. - ChannelFuture future2 = controller.writeData(ctx, STREAM_A, data, 0, false, promise2); + ChannelFuture future2 = controller.sendFlowControlledFrame(ctx, stream, data, 0, false, promise2); assertNotSame(future1, future2); - assertEquals(future2, controller.lastWriteForStream(STREAM_A)); + assertEquals(future2, controller.lastFlowControlledFrameSent(stream)); } finally { manualSafeRelease(data); } @@ -200,7 +200,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void stalledStreamShouldQueueFrame() throws Http2Exception { - controller.initialOutboundWindowSize(0); + controller.initialWindowSize(0); final ByteBuf data = dummyData(10, 5); try { @@ -214,7 +214,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void frameShouldSplit() throws Http2Exception { - controller.initialOutboundWindowSize(5); + controller.initialWindowSize(5); final ByteBuf data = dummyData(5, 5); try { @@ -236,7 +236,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void frameShouldSplitPadding() throws Http2Exception { - controller.initialOutboundWindowSize(5); + controller.initialWindowSize(5); final ByteBuf data = dummyData(3, 7); try { @@ -257,7 +257,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void emptyFrameShouldSplitPadding() throws Http2Exception { - controller.initialOutboundWindowSize(5); + controller.initialWindowSize(5); final ByteBuf data = dummyData(0, 10); try { @@ -277,7 +277,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void windowUpdateShouldSendFrame() throws Http2Exception { - controller.initialOutboundWindowSize(10); + controller.initialWindowSize(10); final ByteBuf data = dummyData(10, 10); try { @@ -285,7 +285,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyWrite(STREAM_A, data.slice(0, 10), 0); // Update the window and verify that the rest of the frame is written. - controller.updateOutboundWindowSize(STREAM_A, 10); + incrementWindowSize(STREAM_A, 10); verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 10); assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(CONNECTION_STREAM_ID)); assertEquals(0, window(STREAM_A)); @@ -299,7 +299,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void initialWindowUpdateShouldSendFrame() throws Http2Exception { - controller.initialOutboundWindowSize(0); + controller.initialWindowSize(0); final ByteBuf data = dummyData(10, 0); try { @@ -307,7 +307,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_A); // Verify that the entire frame was sent. - controller.initialOutboundWindowSize(10); + controller.initialWindowSize(10); ArgumentCaptor argument = ArgumentCaptor.forClass(ByteBuf.class); captureWrite(STREAM_A, argument, 0, false); final ByteBuf writtenBuf = argument.getValue(); @@ -321,7 +321,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void successiveSendsShouldNotInteract() throws Http2Exception { // Collapse the connection window to force queueing. - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -window(CONNECTION_STREAM_ID)); + incrementWindowSize(CONNECTION_STREAM_ID, -window(CONNECTION_STREAM_ID)); assertEquals(0, window(CONNECTION_STREAM_ID)); ByteBuf data = dummyData(5, 5); @@ -331,7 +331,7 @@ public class DefaultHttp2OutboundFlowControllerTest { send(STREAM_A, dataOnly.slice(), 5); verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_B); - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 8); + incrementWindowSize(CONNECTION_STREAM_ID, 8); ArgumentCaptor argument = ArgumentCaptor.forClass(ByteBuf.class); captureWrite(STREAM_A, argument, 3, false); ByteBuf writtenBuf = argument.getValue(); @@ -345,7 +345,7 @@ public class DefaultHttp2OutboundFlowControllerTest { send(STREAM_B, dataOnly.slice(), 5); verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_B); - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 12); + incrementWindowSize(CONNECTION_STREAM_ID, 12); assertEquals(0, window(CONNECTION_STREAM_ID)); // Verify the rest of A is written. @@ -368,7 +368,7 @@ public class DefaultHttp2OutboundFlowControllerTest { public void negativeWindowShouldNotThrowException() throws Http2Exception { final int initWindow = 20; final int secondWindowSize = 10; - controller.initialOutboundWindowSize(initWindow); + controller.initialWindowSize(initWindow); Http2Stream streamA = connection.stream(STREAM_A); final ByteBuf data = dummyData(initWindow, 0); @@ -379,8 +379,8 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyWrite(STREAM_A, data.slice(0, initWindow), 0); // Make the window size for stream A negative - controller.initialOutboundWindowSize(initWindow - secondWindowSize); - assertEquals(-secondWindowSize, streamA.outboundFlow().window()); + controller.initialWindowSize(initWindow - secondWindowSize); + assertEquals(-secondWindowSize, controller.windowSize(streamA)); // Queue up a write. It should not be written now because the window is negative resetFrameWriter(); @@ -388,18 +388,18 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_A); // Open the window size back up a bit (no send should happen) - controller.updateOutboundWindowSize(STREAM_A, 5); - assertEquals(-5, streamA.outboundFlow().window()); + incrementWindowSize(STREAM_A, 5); + assertEquals(-5, controller.windowSize(streamA)); verifyNoWrite(STREAM_A); // Open the window size back up a bit (no send should happen) - controller.updateOutboundWindowSize(STREAM_A, 5); - assertEquals(0, streamA.outboundFlow().window()); + incrementWindowSize(STREAM_A, 5); + assertEquals(0, controller.windowSize(streamA)); verifyNoWrite(STREAM_A); // Open the window size back up and allow the write to happen - controller.updateOutboundWindowSize(STREAM_A, 5); - assertEquals(0, streamA.outboundFlow().window()); + incrementWindowSize(STREAM_A, 5); + assertEquals(0, controller.windowSize(streamA)); // Verify that the entire frame was sent. ArgumentCaptor argument = ArgumentCaptor.forClass(ByteBuf.class); @@ -415,7 +415,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void initialWindowUpdateShouldSendEmptyFrame() throws Http2Exception { - controller.initialOutboundWindowSize(0); + controller.initialWindowSize(0); // First send a frame that will get buffered. final ByteBuf data = dummyData(10, 0); @@ -428,7 +428,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_A); // Re-expand the window and verify that both frames were sent. - controller.initialOutboundWindowSize(10); + controller.initialWindowSize(10); verifyWrite(STREAM_A, data.slice(), 0); verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 0); @@ -439,7 +439,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void initialWindowUpdateShouldSendPartialFrame() throws Http2Exception { - controller.initialOutboundWindowSize(0); + controller.initialWindowSize(0); final ByteBuf data = dummyData(10, 0); try { @@ -447,7 +447,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_A); // Verify that a partial frame of 5 was sent. - controller.initialOutboundWindowSize(5); + controller.initialWindowSize(5); ArgumentCaptor argument = ArgumentCaptor.forClass(ByteBuf.class); captureWrite(STREAM_A, argument, 0, false); ByteBuf writtenBuf = argument.getValue(); @@ -471,7 +471,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_A); // Verify that the entire frame was sent. - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10); + incrementWindowSize(CONNECTION_STREAM_ID, 10); assertEquals(0, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(STREAM_A)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B)); @@ -499,7 +499,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_A); // Verify that a partial frame of 5 was sent. - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 5); + incrementWindowSize(CONNECTION_STREAM_ID, 5); assertEquals(0, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(STREAM_A)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B)); @@ -529,7 +529,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_A); // Verify that the entire frame was sent. - controller.updateOutboundWindowSize(STREAM_A, 10); + incrementWindowSize(STREAM_A, 10); assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(CONNECTION_STREAM_ID)); assertEquals(0, window(STREAM_A)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B)); @@ -557,7 +557,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_A); // Verify that a partial frame of 5 was sent. - controller.updateOutboundWindowSize(STREAM_A, 5); + incrementWindowSize(STREAM_A, 5); assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(CONNECTION_STREAM_ID)); assertEquals(0, window(STREAM_A)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B)); @@ -599,7 +599,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_B); // Open up the connection window. - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10); + incrementWindowSize(CONNECTION_STREAM_ID, 10); assertEquals(0, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A)); assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B)); @@ -653,7 +653,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_D); // Verify that the entire frame was sent. - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10); + incrementWindowSize(CONNECTION_STREAM_ID, 10); assertEquals(0, window(CONNECTION_STREAM_ID)); assertEquals(0, window(STREAM_A)); assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B), 2); @@ -713,7 +713,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_D); // Verify that the entire frame was sent. - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10); + incrementWindowSize(CONNECTION_STREAM_ID, 10); assertEquals(0, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - 10, window(STREAM_A)); assertEquals(0, window(STREAM_B)); @@ -766,7 +766,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_D); // Verify that the entire frame was sent. - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10); + incrementWindowSize(CONNECTION_STREAM_ID, 10); assertEquals(0, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A)); assertEquals(0, window(STREAM_B)); @@ -840,7 +840,7 @@ public class DefaultHttp2OutboundFlowControllerTest { setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false); // Verify that the entire frame was sent. - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10); + incrementWindowSize(CONNECTION_STREAM_ID, 10); assertEquals(0, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A), 2); assertEquals(0, window(STREAM_B)); @@ -895,7 +895,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_D); // Allow 1000 bytes to be sent. - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 1000); + incrementWindowSize(CONNECTION_STREAM_ID, 1000); final ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuf.class); captureWrite(STREAM_A, captor, 0, false); @@ -973,7 +973,7 @@ public class DefaultHttp2OutboundFlowControllerTest { assertEquals(0, captor.getValue().readableBytes()); // Allow 1000 bytes to be sent. - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 999); + incrementWindowSize(CONNECTION_STREAM_ID, 999); assertEquals(0, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_A), 50); assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_B), 50); @@ -1038,18 +1038,17 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_D); - OutboundFlowState state = state(stream0); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), - state.streamableBytesForTree()); - state = state(streamA); + assertEquals(calculateStreamSizeSum(streamSizes, + Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), + streamableBytesForTree(stream0)); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_C, STREAM_D)), - state.streamableBytesForTree()); - state = state(streamB); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree()); - state = state(streamC); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree()); - state = state(streamD); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree()); + streamableBytesForTree(streamA)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), + streamableBytesForTree(streamB)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), + streamableBytesForTree(streamC)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), + streamableBytesForTree(streamD)); } finally { manualSafeRelease(bufs); } @@ -1108,19 +1107,18 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_D); streamB.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true); - OutboundFlowState state = state(stream0); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), - state.streamableBytesForTree()); - state = state(streamA); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), - state.streamableBytesForTree()); - state = state(streamB); + assertEquals(calculateStreamSizeSum(streamSizes, + Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), + streamableBytesForTree(stream0)); + assertEquals(calculateStreamSizeSum(streamSizes, + Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), + streamableBytesForTree(streamA)); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), - state.streamableBytesForTree()); - state = state(streamC); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree()); - state = state(streamD); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree()); + streamableBytesForTree(streamB)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), + streamableBytesForTree(streamC)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), + streamableBytesForTree(streamD)); } finally { manualSafeRelease(bufs); } @@ -1185,23 +1183,20 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_D); verifyNoWrite(STREAM_E); - OutboundFlowState state = state(stream0); - assertEquals( - calculateStreamSizeSum(streamSizes, + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)), - state.streamableBytesForTree()); - state = state(streamA); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)), - state.streamableBytesForTree()); - state = state(streamB); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree()); - state = state(streamC); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree()); - state = state(streamD); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree()); - state = state(streamE); + streamableBytesForTree(stream0)); + assertEquals(calculateStreamSizeSum(streamSizes, + Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)), + streamableBytesForTree(streamA)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), + streamableBytesForTree(streamB)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), + streamableBytesForTree(streamC)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), + streamableBytesForTree(streamD)); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_E, STREAM_C, STREAM_D)), - state.streamableBytesForTree()); + streamableBytesForTree(streamE)); } finally { manualSafeRelease(bufs); } @@ -1257,26 +1252,20 @@ public class DefaultHttp2OutboundFlowControllerTest { streamA.close(); - OutboundFlowState state = state(stream0); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), - state.streamableBytesForTree()); - state = state(streamA); - assertEquals(0, state.streamableBytesForTree()); - state = state(streamB); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree()); - state = state(streamC); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree()); - state = state(streamD); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree()); + streamableBytesForTree(stream0)); + assertEquals(0, streamableBytesForTree(streamA)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), + streamableBytesForTree(streamB)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), + streamableBytesForTree(streamC)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), + streamableBytesForTree(streamD)); } finally { manualSafeRelease(bufs); } } - private static OutboundFlowState state(Http2Stream stream) { - return (OutboundFlowState) stream.outboundFlow(); - } - private static int calculateStreamSizeSum(IntObjectMap streamSizes, List streamIds) { int sum = 0; for (int i = 0; i < streamIds.size(); ++i) { @@ -1288,9 +1277,10 @@ public class DefaultHttp2OutboundFlowControllerTest { return sum; } - private void send(int streamId, ByteBuf data, int padding) { - ChannelFuture future = controller.writeData(ctx, streamId, data, padding, false, promise); - assertEquals(future, controller.lastWriteForStream(streamId)); + private void send(int streamId, ByteBuf data, int padding) throws Http2Exception { + Http2Stream stream = stream(streamId); + ChannelFuture future = controller.sendFlowControlledFrame(ctx, stream, data, padding, false, promise); + assertEquals(future, controller.lastFlowControlledFrameSent(stream)); } private void verifyWrite(int streamId, ByteBuf data, int padding) { @@ -1302,8 +1292,10 @@ public class DefaultHttp2OutboundFlowControllerTest { eq(promise)); } - private void captureWrite(int streamId, ArgumentCaptor captor, int padding, boolean endStream) { - verify(frameWriter).writeData(eq(ctx), eq(streamId), captor.capture(), eq(padding), eq(endStream), eq(promise)); + private void captureWrite(int streamId, ArgumentCaptor captor, int padding, + boolean endStream) { + verify(frameWriter).writeData(eq(ctx), eq(streamId), captor.capture(), eq(padding), + eq(endStream), eq(promise)); } private void setPriority(int stream, int parent, int weight, boolean exclusive) throws Http2Exception { @@ -1311,7 +1303,7 @@ public class DefaultHttp2OutboundFlowControllerTest { } private void exhaustStreamWindow(int streamId) throws Http2Exception { - controller.updateOutboundWindowSize(streamId, -window(streamId)); + incrementWindowSize(streamId, -window(streamId)); } private void resetFrameWriter() { @@ -1321,8 +1313,20 @@ public class DefaultHttp2OutboundFlowControllerTest { when(frameWriterSizePolicy.maxFrameSize()).thenReturn(Integer.MAX_VALUE); } - private int window(int streamId) { - return connection.stream(streamId).outboundFlow().window(); + private int window(int streamId) throws Http2Exception { + return controller.windowSize(stream(streamId)); + } + + private void incrementWindowSize(int streamId, int delta) throws Http2Exception { + controller.incrementWindowSize(ctx, stream(streamId), delta); + } + + private int streamableBytesForTree(Http2Stream stream) throws Http2Exception { + return controller.streamableBytesForTree(stream); + } + + private Http2Stream stream(int streamId) throws Http2Exception { + return connection.requireStream(streamId); } private static ByteBuf dummyData(int size, int padding) { diff --git a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java index 96ac5b7588..c7c35d1cb2 100644 --- a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java +++ b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java @@ -28,8 +28,6 @@ import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; -import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; -import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener; import io.netty.handler.codec.http2.Http2ClientUpgradeCodec; import io.netty.handler.codec.http2.Http2Connection; @@ -68,8 +66,6 @@ public class Http2ClientInitializer extends ChannelInitializer { connectionHandler = new HttpToHttp2ConnectionHandler(connection, frameReader(), frameWriter, - new DefaultHttp2InboundFlowController(connection, frameWriter), - new DefaultHttp2OutboundFlowController(connection, frameWriter), new DelegatingDecompressorFrameListener(connection, new InboundHttp2ToHttpAdapter.Builder(connection) .maxContentLength(maxContentLength) diff --git a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java index 11fc26f712..caf8142f1c 100644 --- a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java +++ b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java @@ -23,7 +23,6 @@ import static io.netty.util.internal.logging.InternalLogLevel.INFO; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.AsciiString; -import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpServerUpgradeHandler; import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2FrameReader;