From ee9233d8fa99e53408cba61d97d9707881fda1e0 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Wed, 22 Apr 2015 14:35:31 -0700 Subject: [PATCH] HTTP/2 Flow Controller required memory reduction Motivation: Currently we allocate the full amount of state for each stream as soon as the stream is created, and keep that state until the stream is GC. The full set of state is only needed when the stream can support flow controlled frames. There is an opportunity to reduce the required amount of memory, and make memory eligible for GC sooner by only allocating what is necessary for flow control stream state. Modifications: Introduce objects which require 'less' state for local/remote flow control stream state. Use these new objects when streams have been created but will not transition out of idle AND when streams are no longer eligible for flow controlled frame transfer but still must persist in the priority tree. Result: Memory allocations are reduced to what is actually needed, and memory is made eligible for GC potentially sooner. --- .../codec/http2/DefaultHttp2Connection.java | 110 +++--- .../http2/DefaultHttp2ConnectionDecoder.java | 5 +- .../http2/DefaultHttp2ConnectionEncoder.java | 31 +- .../DefaultHttp2LocalFlowController.java | 219 ++++++++--- .../DefaultHttp2RemoteFlowController.java | 347 +++++++++++++----- .../handler/codec/http2/Http2Connection.java | 24 +- .../codec/http2/Http2ConnectionHandler.java | 4 +- .../DefaultHttp2ConnectionDecoderTest.java | 24 +- .../DefaultHttp2ConnectionEncoderTest.java | 23 +- .../http2/DefaultHttp2ConnectionTest.java | 176 ++++----- .../DefaultHttp2LocalFlowControllerTest.java | 6 +- .../DefaultHttp2RemoteFlowControllerTest.java | 10 +- .../http2/Http2ConnectionRoundtripTest.java | 13 +- .../handler/codec/http2/Http2TestUtil.java | 13 +- 14 files changed, 667 insertions(+), 338 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 1322ace09a..257471307b 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -33,6 +33,7 @@ import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http2.Http2Stream.State; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.PrimitiveCollections; @@ -229,12 +230,47 @@ public class DefaultHttp2Connection implements Http2Connection { } } + static State activeState(int streamId, State initialState, boolean isLocal, boolean halfClosed) + throws Http2Exception { + switch (initialState) { + case IDLE: + return halfClosed ? isLocal ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN; + case RESERVED_LOCAL: + return HALF_CLOSED_REMOTE; + case RESERVED_REMOTE: + return HALF_CLOSED_LOCAL; + default: + throw streamError(streamId, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: " + + initialState); + } + } + + void notifyHalfClosed(Http2Stream stream) { + for (int i = 0; i < listeners.size(); i++) { + try { + listeners.get(i).onStreamHalfClosed(stream); + } catch (RuntimeException e) { + logger.error("Caught RuntimeException from listener onStreamHalfClosed.", e); + } + } + } + + void notifyClosed(Http2Stream stream) { + for (int i = 0; i < listeners.size(); i++) { + try { + listeners.get(i).onStreamClosed(stream); + } catch (RuntimeException e) { + logger.error("Caught RuntimeException from listener onStreamClosed.", e); + } + } + } + /** * Simple stream implementation. Streams can be compared to each other by priority. */ private class DefaultStream implements Http2Stream { private final int id; - private State state = IDLE; + private State state; private short weight = DEFAULT_PRIORITY_WEIGHT; private DefaultStream parent; private IntObjectMap children = PrimitiveCollections.emptyIntObjectMap(); @@ -245,8 +281,9 @@ public class DefaultHttp2Connection implements Http2Connection { private FlowControlState localFlowState; private FlowControlState remoteFlowState; - DefaultStream(int id) { + DefaultStream(int id, State state) { this.id = id; + this.state = state; data = new LazyPropertyMap(this); } @@ -375,7 +412,7 @@ public class DefaultHttp2Connection implements Http2Connection { if (newParent == null) { // Streams can depend on other streams in the IDLE state. We must ensure // the stream has been "created" in order to use it in the priority tree. - newParent = createdBy().createStream(parentStreamId); + newParent = createdBy().createIdleStream(parentStreamId); } else if (this == newParent) { throw new IllegalArgumentException("A stream cannot depend on itself"); } @@ -400,24 +437,18 @@ public class DefaultHttp2Connection implements Http2Connection { @Override public Http2Stream open(boolean halfClosed) throws Http2Exception { - switch (state) { - case IDLE: - state = halfClosed ? isLocal() ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN; - break; - case RESERVED_LOCAL: - state = HALF_CLOSED_REMOTE; - break; - case RESERVED_REMOTE: - state = HALF_CLOSED_LOCAL; - break; - default: - throw streamError(id, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: " + state); + state = activeState(id, state, isLocal(), halfClosed); + activate(); + if (halfClosed) { + notifyHalfClosed(this); } - - activeStreams.activate(this); return this; } + void activate() { + activeStreams.activate(this); + } + @Override public Http2Stream close() { if (state == CLOSED) { @@ -516,16 +547,6 @@ public class DefaultHttp2Connection implements Http2Connection { return state != CLOSED; } - private void notifyHalfClosed(Http2Stream stream) { - for (int i = 0; i < listeners.size(); i++) { - try { - listeners.get(i).onStreamHalfClosed(stream); - } catch (RuntimeException e) { - logger.error("Caught RuntimeException from listener onStreamHalfClosed.", e); - } - } - } - private void initChildrenIfEmpty() { if (children == PrimitiveCollections.emptyIntObjectMap()) { initChildren(); @@ -802,7 +823,7 @@ public class DefaultHttp2Connection implements Http2Connection { */ private final class ConnectionStream extends DefaultStream { ConnectionStream() { - super(CONNECTION_STREAM_ID); + super(CONNECTION_STREAM_ID, IDLE); } @Override @@ -891,12 +912,11 @@ public class DefaultHttp2Connection implements Http2Connection { return nextStreamId() > 0 && numActiveStreams + 1 <= maxActiveStreams; } - @Override - public DefaultStream createStream(int streamId) throws Http2Exception { + private DefaultStream createStream(int streamId, State state) throws Http2Exception { checkNewStreamAllowed(streamId); // Create and initialize the stream. - DefaultStream stream = new DefaultStream(streamId); + DefaultStream stream = new DefaultStream(streamId, state); // Update the next and last stream IDs. nextStreamId = streamId + 2; @@ -906,6 +926,21 @@ public class DefaultHttp2Connection implements Http2Connection { return stream; } + @Override + public DefaultStream createIdleStream(int streamId) throws Http2Exception { + return createStream(streamId, IDLE); + } + + @Override + public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception { + DefaultStream stream = createStream(streamId, activeState(streamId, IDLE, isLocal(), halfClosed)); + stream.activate(); + if (halfClosed) { + notifyHalfClosed(stream); + } + return stream; + } + @Override public boolean isServer() { return server; @@ -925,8 +960,7 @@ public class DefaultHttp2Connection implements Http2Connection { checkNewStreamAllowed(streamId); // Create and initialize the stream. - DefaultStream stream = new DefaultStream(streamId); - stream.state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE; + DefaultStream stream = new DefaultStream(streamId, isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE); // Update the next and last stream IDs. nextStreamId = streamId + 2; @@ -1152,16 +1186,6 @@ public class DefaultHttp2Connection implements Http2Connection { removeStream(stream); } - private void notifyClosed(DefaultStream stream) { - for (int i = 0; i < listeners.size(); i++) { - try { - listeners.get(i).onStreamClosed(stream); - } catch (RuntimeException e) { - logger.error("Caught RuntimeException from listener onStreamClosed.", e); - } - } - } - private boolean allowModifications() { return pendingIterations == 0; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index 27608b569f..3819e159a9 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -271,7 +271,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { Http2Stream stream = connection.stream(streamId); boolean allowHalfClosedRemote = false; if (stream == null && !connection.streamMayHaveExisted(streamId)) { - stream = connection.remote().createStream(streamId).open(endOfStream); + stream = connection.remote().createStream(streamId, endOfStream); // Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state. allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE; } @@ -283,7 +283,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { switch (stream.state()) { case RESERVED_REMOTE: - case IDLE: stream.open(endOfStream); break; case OPEN: @@ -336,7 +335,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the // first frame to be received for a stream that we must create the stream. - stream = connection.remote().createStream(streamId); + stream = connection.remote().createIdleStream(streamId); } else if (streamCreatedAfterGoAwaySent(streamId)) { // Ignore this frame. return; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 1d71e291de..d64045583f 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -149,21 +149,20 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { try { Http2Stream stream = connection.stream(streamId); if (stream == null) { - stream = connection.local().createStream(streamId); - } - - switch (stream.state()) { - case RESERVED_LOCAL: - case IDLE: - stream.open(endOfStream); - break; - case OPEN: - case HALF_CLOSED_REMOTE: - // Allowed sending headers in these states. - break; - default: - throw new IllegalStateException(String.format( - "Stream %d in unexpected state: %s", stream.id(), stream.state())); + stream = connection.local().createStream(streamId, endOfStream); + } else { + switch (stream.state()) { + case RESERVED_LOCAL: + stream.open(endOfStream); + break; + case OPEN: + case HALF_CLOSED_REMOTE: + // Allowed sending headers in these states. + break; + default: + throw new IllegalStateException(String.format( + "Stream %d in unexpected state: %s", stream.id(), stream.state())); + } } // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames. @@ -186,7 +185,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { // Update the priority on this stream. Http2Stream stream = connection.stream(streamId); if (stream == null) { - stream = connection.local().createStream(streamId); + stream = connection.local().createIdleStream(streamId); } // The set priority operation must be done before sending the frame. The parent may not yet exist diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java index c1a7d495b3..b2fbb37037 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java @@ -61,20 +61,22 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController // Add a flow state for the connection. connection.connectionStream().localFlowState( - new DefaultFlowState(connection.connectionStream(), initialWindowSize)); + new DefaultState(connection.connectionStream(), initialWindowSize)); // Register for notification of new streams. connection.addListener(new Http2ConnectionAdapter() { @Override public void onStreamAdded(Http2Stream stream) { - stream.localFlowState(new DefaultFlowState(stream, 0)); + // Unconditionally used the reduced flow control state because it requires no object allocation + // and the DefaultFlowState will be allocated in onStreamActive. + stream.localFlowState(REDUCED_FLOW_STATE); } @Override public void onStreamActive(Http2Stream stream) { // Need to be sure the stream's initial window is adjusted for SETTINGS // frames which may have been exchanged while it was in IDLE - state(stream).window(initialWindowSize); + stream.localFlowState(new DefaultState(stream, initialWindowSize)); } @Override @@ -82,7 +84,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController try { // When a stream is closed, consume any remaining bytes so that they // are restored to the connection window. - DefaultFlowState state = state(stream); + FlowState state = state(stream); int unconsumedBytes = state.unconsumedBytes(); if (ctx != null && unconsumedBytes > 0) { connectionState().consumeBytes(ctx, unconsumedBytes); @@ -90,6 +92,11 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController } } catch (Http2Exception e) { PlatformDependent.throwException(e); + } finally { + // Unconditionally reduce the amount of memory required for flow control because there is no + // object allocation costs associated with doing so and the stream will not have any more + // local flow control state to keep track of anymore. + stream.localFlowState(REDUCED_FLOW_STATE); } } }); @@ -113,7 +120,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController @Override public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception { checkNotNull(ctx, "ctx"); - DefaultFlowState state = state(stream); + FlowState state = state(stream); // Just add the delta to the stream-specific initial window size so that the next time the window // expands it will grow to the new initial size. state.incrementInitialStreamWindow(delta); @@ -186,7 +193,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController */ public void windowUpdateRatio(ChannelHandlerContext ctx, Http2Stream stream, float ratio) throws Http2Exception { checkValidRatio(ratio); - DefaultFlowState state = state(stream); + FlowState state = state(stream); state.windowUpdateRatio(ratio); state.writeWindowUpdateIfNeeded(ctx); } @@ -208,12 +215,12 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController int dataLength = data.readableBytes() + padding; // Apply the connection-level flow control - DefaultFlowState connectionState = connectionState(); + FlowState connectionState = connectionState(); connectionState.receiveFlowControlledFrame(dataLength); if (stream != null && !isClosed(stream)) { // Apply the stream-level flow control - DefaultFlowState state = state(stream); + FlowState state = state(stream); state.endOfStream(endOfStream); state.receiveFlowControlledFrame(dataLength); } else if (dataLength > 0) { @@ -222,12 +229,12 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController } } - private DefaultFlowState connectionState() { - return (DefaultFlowState) connection.connectionStream().localFlowState(); + private FlowState connectionState() { + return (FlowState) connection.connectionStream().localFlowState(); } - private static DefaultFlowState state(Http2Stream stream) { - return (DefaultFlowState) checkNotNull(stream, "stream").localFlowState(); + private static FlowState state(Http2Stream stream) { + return (FlowState) checkNotNull(stream, "stream").localFlowState(); } private static boolean isClosed(Http2Stream stream) { @@ -237,7 +244,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController /** * Flow control window state for an individual stream. */ - private final class DefaultFlowState implements FlowControlState { + private final class DefaultState implements FlowState { private final Http2Stream stream; /** @@ -269,12 +276,17 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController private int lowerBound; private boolean endOfStream; - DefaultFlowState(Http2Stream stream, int initialWindowSize) { + public DefaultState(Http2Stream stream, int initialWindowSize) { this.stream = stream; window(initialWindowSize); streamWindowUpdateRatio = windowUpdateRatio; } + @Override + public void window(int initialWindowSize) { + window = processedWindow = initialStreamWindowSize = initialWindowSize; + } + @Override public int windowSize() { return window; @@ -285,27 +297,23 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController return initialStreamWindowSize; } - void window(int initialWindowSize) { - window = processedWindow = initialStreamWindowSize = initialWindowSize; - } - - void endOfStream(boolean endOfStream) { + @Override + public void endOfStream(boolean endOfStream) { this.endOfStream = endOfStream; } - float windowUpdateRatio() { + @Override + public float windowUpdateRatio() { return streamWindowUpdateRatio; } - void windowUpdateRatio(float ratio) { + @Override + public void windowUpdateRatio(float ratio) { streamWindowUpdateRatio = ratio; } - /** - * Increment the initial window size for this stream. - * @param delta The amount to increase the initial window size by. - */ - void incrementInitialStreamWindow(int delta) { + @Override + public void incrementInitialStreamWindow(int delta) { // Clip the delta so that the resulting initialStreamWindowSize falls within the allowed range. int newValue = (int) min(MAX_INITIAL_WINDOW_SIZE, max(MIN_INITIAL_WINDOW_SIZE, initialStreamWindowSize + (long) delta)); @@ -314,12 +322,8 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController initialStreamWindowSize += delta; } - /** - * Increment the windows which are used to determine many bytes have been processed. - * @param delta The amount to increment the window by. - * @throws Http2Exception if integer overflow occurs on the window. - */ - void incrementFlowControlWindows(int delta) throws Http2Exception { + @Override + public void incrementFlowControlWindows(int delta) throws Http2Exception { if (delta > 0 && window > MAX_INITIAL_WINDOW_SIZE - delta) { throw streamError(stream.id(), FLOW_CONTROL_ERROR, "Flow control window overflowed for stream: %d", stream.id()); @@ -330,12 +334,8 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController lowerBound = delta < 0 ? delta : 0; } - /** - * A flow control event has occurred and we should decrement the amount of available bytes for this stream. - * @param dataLength The amount of data to for which this stream is no longer eligible to use for flow control. - * @throws Http2Exception If too much data is used relative to how much is available. - */ - void receiveFlowControlledFrame(int dataLength) throws Http2Exception { + @Override + public void receiveFlowControlledFrame(int dataLength) throws Http2Exception { assert dataLength >= 0; // Apply the delta. Even if we throw an exception we want to have taken this delta into account. @@ -352,10 +352,8 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController } } - /** - * Returns the processed bytes for this stream. - */ - void returnProcessedBytes(int delta) throws Http2Exception { + @Override + public void returnProcessedBytes(int delta) throws Http2Exception { if (processedWindow - delta < window) { throw streamError(stream.id(), INTERNAL_ERROR, "Attempting to return too many bytes for stream %d", stream.id()); @@ -363,21 +361,21 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController processedWindow -= delta; } - void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { + @Override + public void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { // Return the bytes processed and update the window. returnProcessedBytes(numBytes); writeWindowUpdateIfNeeded(ctx); } - int unconsumedBytes() { + @Override + public int unconsumedBytes() { return processedWindow - window; } - /** - * Updates the flow control window for this stream if it is appropriate. - */ - void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception { - if (endOfStream || initialStreamWindowSize <= 0 || isClosed(stream)) { + @Override + public void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception { + if (endOfStream || initialStreamWindowSize <= 0) { return; } @@ -391,7 +389,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController * Called to perform a window update for this stream (or connection). Updates the window size back * to the size of the initial window and sends a window update frame to the remote endpoint. */ - void writeWindowUpdate(ChannelHandlerContext ctx) throws Http2Exception { + private void writeWindowUpdate(ChannelHandlerContext ctx) throws Http2Exception { // Expand the window for this stream back to the size of the initial window. int deltaWindowSize = initialStreamWindowSize - processedWindow; try { @@ -407,6 +405,125 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController } } + /** + * The local flow control state for a single stream that is not in a state where flow controlled frames cannot + * be exchanged. + */ + private static final FlowState REDUCED_FLOW_STATE = new FlowState() { + @Override + public int windowSize() { + return 0; + } + + @Override + public int initialWindowSize() { + return 0; + } + + @Override + public void window(int initialWindowSize) { + throw new UnsupportedOperationException(); + } + + @Override + public void incrementInitialStreamWindow(int delta) { + // This operation needs to be supported during the initial settings exchange when + // the peer has not yet acknowledged this peer being activated. + } + + @Override + public void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { + } + + @Override + public int unconsumedBytes() { + return 0; + } + + @Override + public float windowUpdateRatio() { + throw new UnsupportedOperationException(); + } + + @Override + public void windowUpdateRatio(float ratio) { + throw new UnsupportedOperationException(); + } + + @Override + public void receiveFlowControlledFrame(int dataLength) throws Http2Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void incrementFlowControlWindows(int delta) throws Http2Exception { + // This operation needs to be supported during the initial settings exchange when + // the peer has not yet acknowledged this peer being activated. + } + + @Override + public void returnProcessedBytes(int delta) throws Http2Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void endOfStream(boolean endOfStream) { + throw new UnsupportedOperationException(); + } + }; + + /** + * An abstraction around {@link FlowControlState} which provides specific extensions used by local flow control. + */ + private interface FlowState extends FlowControlState { + void window(int initialWindowSize); + + /** + * Increment the initial window size for this stream. + * @param delta The amount to increase the initial window size by. + */ + void incrementInitialStreamWindow(int delta); + + /** + * Updates the flow control window for this stream if it is appropriate. + */ + void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception; + + void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception; + + int unconsumedBytes(); + + float windowUpdateRatio(); + + void windowUpdateRatio(float ratio); + + /** + * A flow control event has occurred and we should decrement the amount of available bytes for this stream. + * @param dataLength The amount of data to for which this stream is no longer eligible to use for flow control. + * @throws Http2Exception If too much data is used relative to how much is available. + */ + void receiveFlowControlledFrame(int dataLength) throws Http2Exception; + + /** + * Increment the windows which are used to determine many bytes have been processed. + * @param delta The amount to increment the window by. + * @throws Http2Exception if integer overflow occurs on the window. + */ + void incrementFlowControlWindows(int delta) throws Http2Exception; + + /** + * Returns the processed bytes for this stream. + */ + void returnProcessedBytes(int delta) throws Http2Exception; + + void endOfStream(boolean endOfStream); + } + /** * Provides a means to iterate over all active streams and increment the flow control windows. */ @@ -422,7 +539,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController public boolean visit(Http2Stream stream) throws Http2Exception { try { // Increment flow control window first so state will be consistent if overflow is detected. - DefaultFlowState state = state(stream); + FlowState state = state(stream); state.incrementFlowControlWindows(delta); state.incrementInitialStreamWindow(delta); } catch (StreamException e) { diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index 6347654f22..b9eebe859d 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -18,6 +18,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; +import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Math.max; @@ -51,28 +52,43 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // Add a flow state for the connection. connection.connectionStream().remoteFlowState( - new DefaultFlowState(connection.connectionStream(), initialWindowSize)); + new DefaultState(connection.connectionStream(), initialWindowSize)); // Register for notification of new streams. connection.addListener(new Http2ConnectionAdapter() { @Override public void onStreamAdded(Http2Stream stream) { - // Just add a new flow state to the stream. - stream.remoteFlowState(new DefaultFlowState(stream, 0)); + // If the stream state is not open then the stream is not yet eligible for flow controlled frames and + // only requires the ReducedFlowState. Otherwise the full amount of memory is required. + stream.remoteFlowState(stream.state() == IDLE ? + new ReducedState(stream) : + new DefaultState(stream, 0)); } @Override public void onStreamActive(Http2Stream stream) { - // Need to be sure the stream's initial window is adjusted for SETTINGS - // frames which may have been exchanged while it was in IDLE - state(stream).window(initialWindowSize); + // If the object was previously created, but later activated then we have to ensure + // the full state is allocated and the proper initialWindowSize is used. + if (stream.remoteFlowState().getClass() == DefaultState.class) { + state(stream).window(initialWindowSize); + } else { + stream.remoteFlowState(new DefaultState(state(stream), initialWindowSize)); + } } @Override public void onStreamClosed(Http2Stream stream) { // Any pending frames can never be written, cancel and // write errors for any pending frames. - state(stream).cancel(); + AbstractState state = state(stream); + state.cancel(); + + // If the stream is now eligible for removal, but will persist in the priority tree then we can + // decrease the amount of memory required for this stream because no flow controlled frames can + // be exchanged on this stream + if (stream.prioritizableForTree() != 0) { + stream.remoteFlowState(new ReducedState(state)); + } } @Override @@ -89,7 +105,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * * This is to cancel any such illegal writes. */ - state(stream).cancel(); + state(stream).cancel(); } } @@ -153,7 +169,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll writePendingBytes(); } else { // Update the stream window and write any pending frames for the stream. - DefaultFlowState state = state(stream); + AbstractState state = state(stream); state.incrementStreamWindow(delta); state.writeBytes(state.writableWindow()); flush(); @@ -169,7 +185,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } // Save the context. We'll use this later when we write pending bytes. this.ctx = ctx; - final DefaultFlowState state; + final AbstractState state; try { state = state(stream); state.enqueueFrame(frame); @@ -193,12 +209,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return state(stream).streamableBytesForTree(); } - private static DefaultFlowState state(Http2Stream stream) { - return (DefaultFlowState) checkNotNull(stream, "stream").remoteFlowState(); + private static AbstractState state(Http2Stream stream) { + return (AbstractState) checkNotNull(stream, "stream").remoteFlowState(); } - private DefaultFlowState connectionState() { - return (DefaultFlowState) connection.connectionStream().remoteFlowState(); + private AbstractState connectionState() { + return (AbstractState) connection.connectionStream().remoteFlowState(); } /** @@ -246,7 +262,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * @return An object summarizing the write and allocation results. */ static int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) throws Http2Exception { - DefaultFlowState state = state(parent); + AbstractState state = state(parent); if (state.streamableBytesForTree() <= 0) { return 0; } @@ -297,7 +313,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll int connectionWindowChunk = max(1, (int) (connectionWindow * (child.weight() / (double) totalWeight))); int bytesForTree = min(nextConnectionWindow, connectionWindowChunk); - DefaultFlowState state = state(child); + AbstractState state = state(child); int bytesForChild = min(state.streamableBytes(), bytesForTree); // Allocate the bytes to this child. @@ -393,7 +409,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll @Override public boolean visit(Http2Stream child) throws Http2Exception { - DefaultFlowState childState = state(child); + AbstractState childState = state(child); int bytesForChild = childState.streamableBytes(); if (bytesForChild > 0 || childState.hasFrame()) { @@ -411,20 +427,24 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll /** * The remote flow control state for a single stream. */ - private final class DefaultFlowState implements FlowControlState { + private final class DefaultState extends AbstractState { private final Deque pendingWriteQueue; - private final Http2Stream stream; private int window; private int pendingBytes; - private int streamableBytesForTree; private int allocated; // Set to true while a frame is being written, false otherwise. private boolean writing; // Set to true if cancel() was called. private boolean cancelled; - DefaultFlowState(Http2Stream stream, int initialWindowSize) { - this.stream = stream; + public DefaultState(Http2Stream stream, int initialWindowSize) { + super(stream); + window(initialWindowSize); + pendingWriteQueue = new ArrayDeque(2); + } + + public DefaultState(AbstractState existingState, int initialWindowSize) { + super(existingState); window(initialWindowSize); pendingWriteQueue = new ArrayDeque(2); } @@ -439,14 +459,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return initialWindowSize; } - void window(int initialWindowSize) { + @Override + public void window(int initialWindowSize) { window = initialWindowSize; } - /** - * Increment the number of bytes allocated to this stream by the priority algorithm - */ - void allocate(int bytes) { + @Override + public void allocate(int bytes) { allocated += bytes; // Also artificially reduce the streamable bytes for this tree to give the appearance // that the data has been written. This will be restored before the allocated bytes are @@ -454,10 +473,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll incrementStreamableBytesForTree(-bytes); } - /** - * Write bytes allocated bytes for this stream. - */ - void writeAllocatedBytes() { + @Override + public void writeAllocatedBytes() { int numBytes = allocated; // Restore the number of streamable bytes to this branch. @@ -471,14 +488,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll /** * Reset the number of bytes that have been allocated to this stream by the priority algorithm. */ - void resetAllocated() { + private void resetAllocated() { allocated = 0; } - /** - * Increments the flow control window for this stream by the given delta and returns the new value. - */ - int incrementStreamWindow(int delta) throws Http2Exception { + @Override + public int incrementStreamWindow(int delta) throws Http2Exception { if (delta > 0 && Integer.MAX_VALUE - delta < window) { throw streamError(stream.id(), FLOW_CONTROL_ERROR, "Window size overflow for stream: %d", stream.id()); @@ -494,67 +509,51 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return window; } - /** - * Returns the maximum writable window (minimum of the stream and connection windows). - */ - int writableWindow() { + @Override + public int writableWindow() { return min(window, connectionWindowSize()); } - /** - * Returns the number of pending bytes for this node that will fit within the - * {@link #window}. This is used for the priority algorithm to determine the aggregate - * number of bytes that can be written at each node. Each node only takes into account its - * stream window so that when a change occurs to the connection window, these values need - * not change (i.e. no tree traversal is required). - */ - int streamableBytes() { + @Override + public int streamableBytes() { return max(0, min(pendingBytes - allocated, window)); } - int streamableBytesForTree() { + @Override + public int streamableBytesForTree() { return streamableBytesForTree; } - /** - * Adds the {@code frame} to the pending queue and increments the pending - * byte count. - */ - void enqueueFrame(FlowControlled frame) { + @Override + public void enqueueFrame(FlowControlled frame) { incrementPendingBytes(frame.size()); pendingWriteQueue.offer(frame); } - /** - * Indicates whether or not there are frames in the pending queue. - */ - boolean hasFrame() { + @Override + public boolean hasFrame() { return !pendingWriteQueue.isEmpty(); } /** * Returns the the head of the pending queue, or {@code null} if empty. */ - FlowControlled peek() { + private FlowControlled peek() { return pendingWriteQueue.peek(); } - /** - * Clears the pending queue and writes errors for each remaining frame. - */ - void cancel() { + @Override + public void cancel() { cancel(null); } /** * Clears the pending queue and writes errors for each remaining frame. - * * @param cause the {@link Throwable} that caused this method to be invoked. */ - void cancel(Throwable cause) { + private void cancel(Throwable cause) { cancelled = true; - // Ensure that the queue can't be modified while - // we are writing. + // Ensure that the queue can't be modified while we are writing. if (writing) { return; } @@ -568,12 +567,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } } - /** - * Writes up to the number of bytes from the pending queue. May write less if limited by the writable window, by - * the number of pending writes available, or because a frame does not support splitting on arbitrary - * boundaries. - */ - int writeBytes(int bytes) { + @Override + public int writeBytes(int bytes) { int bytesAttempted = 0; while (hasFrame()) { int maxBytes = min(bytes - bytesAttempted, writableWindow()); @@ -600,11 +595,10 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * Note: this does not flush the {@link ChannelHandlerContext}. *

*/ - int write(FlowControlled frame, int allowedBytes) { + private int write(FlowControlled frame, int allowedBytes) { int before = frame.size(); int writtenBytes = 0; - // In case an exception is thrown we want to - // remember it and pass it to cancel(Throwable). + // In case an exception is thrown we want to remember it and pass it to cancel(Throwable). Throwable cause = null; try { assert !writing; @@ -613,10 +607,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll writing = true; needFlush |= frame.write(max(0, allowedBytes)); if (!cancelled && frame.size() == 0) { - // This frame has been fully written, remove this frame - // and notify it. Since we remove this frame - // first, we're guaranteed that its error method will not - // be called when we call cancel. + // This frame has been fully written, remove this frame and notify it. Since we remove this frame + // first, we're guaranteed that its error method will not be called when we call cancel. pendingWriteQueue.remove(); frame.writeComplete(); } @@ -641,23 +633,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return writtenBytes; } - /** - * Recursively increments the streamable bytes for this branch in the priority tree starting at the current - * node. - */ - void incrementStreamableBytesForTree(int numBytes) { - streamableBytesForTree += numBytes; - if (!stream.isRoot()) { - state(stream.parent()).incrementStreamableBytesForTree(numBytes); - } - } - /** * Increments the number of pending bytes for this node. If there was any change to the number of bytes that * fit into the stream window, then {@link #incrementStreamableBytesForTree} is called to recursively update * this branch of the priority tree. */ - void incrementPendingBytes(int numBytes) { + private void incrementPendingBytes(int numBytes) { int previouslyStreamable = streamableBytes(); pendingBytes += numBytes; @@ -670,14 +651,14 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll /** * If this frame is in the pending queue, decrements the number of pending bytes for the stream. */ - void decrementPendingBytes(int bytes) { + private void decrementPendingBytes(int bytes) { incrementPendingBytes(-bytes); } /** * Decrement the per stream and connection flow control window by {@code bytes}. */ - void decrementFlowControlWindow(int bytes) { + private void decrementFlowControlWindow(int bytes) { try { int negativeBytes = -bytes; connectionState().incrementStreamWindow(negativeBytes); @@ -692,9 +673,185 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * Discards this {@link FlowControlled}, writing an error. If this frame is in the pending queue, * the unwritten bytes are removed from this branch of the priority tree. */ - void writeError(FlowControlled frame, Http2Exception cause) { + private void writeError(FlowControlled frame, Http2Exception cause) { decrementPendingBytes(frame.size()); frame.error(cause); } } + + /** + * The remote flow control state for a single stream that is not in a state where flow controlled frames cannot + * be exchanged. + */ + private static final class ReducedState extends AbstractState { + public ReducedState(Http2Stream stream) { + super(stream); + } + + public ReducedState(AbstractState existingState) { + super(existingState); + } + + @Override + public int windowSize() { + return 0; + } + + @Override + public int initialWindowSize() { + return 0; + } + + @Override + public int writableWindow() { + return 0; + } + + @Override + public int streamableBytes() { + return 0; + } + + @Override + public int streamableBytesForTree() { + return streamableBytesForTree; + } + + @Override + public void writeAllocatedBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public void cancel() { + } + + @Override + public void window(int initialWindowSize) { + throw new UnsupportedOperationException(); + } + + @Override + public int incrementStreamWindow(int delta) throws Http2Exception { + // This operation needs to be supported during the initial settings exchange when + // the peer has not yet acknowledged this peer being activated. + return 0; + } + + @Override + public int writeBytes(int bytes) { + throw new UnsupportedOperationException(); + } + + @Override + public void enqueueFrame(FlowControlled frame) { + throw new UnsupportedOperationException(); + } + + @Override + public void allocate(int bytes) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasFrame() { + return false; + } + } + + /** + * An abstraction around {@link FlowControlState} which provides specific extensions used by remote flow control. + */ + private abstract static class AbstractState implements FlowControlState { + protected final Http2Stream stream; + protected int streamableBytesForTree; + + public AbstractState(Http2Stream stream) { + this.stream = stream; + } + + public AbstractState(AbstractState existingState) { + this.stream = existingState.stream(); + this.streamableBytesForTree = existingState.streamableBytesForTree(); + } + + /** + * The stream this state is associated with. + */ + public final Http2Stream stream() { + return stream; + } + + /** + * Recursively increments the {@link #streamableBytesForTree()} for this branch in the priority tree starting + * at the current node. + */ + public final void incrementStreamableBytesForTree(int numBytes) { + streamableBytesForTree += numBytes; + if (!stream.isRoot()) { + state(stream.parent()).incrementStreamableBytesForTree(numBytes); + } + } + + /** + * Write bytes allocated bytes for this stream. + */ + public abstract void writeAllocatedBytes(); + + /** + * Returns the number of pending bytes for this node that will fit within the + * {@link #availableWindow()}. This is used for the priority algorithm to determine the aggregate + * number of bytes that can be written at each node. Each node only takes into account its + * stream window so that when a change occurs to the connection window, these values need + * not change (i.e. no tree traversal is required). + */ + public abstract int streamableBytes(); + + /** + * Get the {@link #streamableBytes()} for the entire tree rooted at this node. + */ + public abstract int streamableBytesForTree(); + + /** + * Any operations that may be pending are cleared and the status of these operations is failed. + */ + public abstract void cancel(); + + /** + * Reset the {@link #availableWindow()} size. + */ + public abstract void window(int initialWindowSize); + + /** + * Increments the flow control window for this stream by the given delta and returns the new value. + */ + public abstract int incrementStreamWindow(int delta) throws Http2Exception; + + /** + * Returns the maximum writable window (minimum of the stream and connection windows). + */ + public abstract int writableWindow(); + + /** + * Writes up to the number of bytes from the pending queue. May write less if limited by the writable window, by + * the number of pending writes available, or because a frame does not support splitting on arbitrary + * boundaries. + */ + public abstract int writeBytes(int bytes); + + /** + * Adds the {@code frame} to the pending queue and increments the pending byte count. + */ + public abstract void enqueueFrame(FlowControlled frame); + + /** + * Increment the number of bytes allocated to this stream by the priority algorithm + */ + public abstract void allocate(int bytes); + + /** + * Indicates whether or not there are frames in the pending queue. + */ + public abstract boolean hasFrame(); + } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java index 676abc4804..a71f36df64 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java @@ -171,11 +171,31 @@ public interface Http2Connection { *
  • The connection is marked as going away.
  • * *

    - * The caller is expected to {@link Http2Stream#open(boolean)} the stream. + * If the stream is intended to initialized to {@link Http2Stream.State#OPEN} then use + * {@link #createStream(int, boolean)} otherwise optimizations in {@link Listener}s may not work + * and memory may be thrashed. The caller is expected to {@link Http2Stream#open(boolean)} the stream. * @param streamId The ID of the stream * @see Http2Stream#open(boolean) */ - Http2Stream createStream(int streamId) throws Http2Exception; + Http2Stream createIdleStream(int streamId) throws Http2Exception; + + /** + * Creates a stream initiated by this endpoint. This could fail for the following reasons: + *

      + *
    • The requested stream ID is not the next sequential ID for this endpoint.
    • + *
    • The stream already exists.
    • + *
    • {@link #canCreateStream()} is {@code false}.
    • + *
    • The connection is marked as going away.
    • + *
    + *

    + * This method differs from {@link #createdStreamId(int)} because the initial state of the stream will be + * Immediately set before notifying {@link Listener}s. The state transition is sensitive to {@code halfClosed} + * and is defined by {@link Http2Stream#open(boolean)}. + * @param streamId The ID of the stream + * @param halfClosed see {@link Http2Stream#open(boolean)}. + * @see Http2Stream#open(boolean) + */ + Http2Stream createStream(int streamId, boolean halfClosed) throws Http2Exception; /** * Creates a push stream in the reserved state for this endpoint and notifies all listeners. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 51e63e6042..3d96e93c78 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -115,7 +115,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http } // Create a local stream used for the HTTP cleartext upgrade. - connection().local().createStream(HTTP_UPGRADE_STREAM_ID).open(true); + connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true); } /** @@ -134,7 +134,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http encoder.remoteSettings(settings); // Create a stream in the half-closed state. - connection().remote().createStream(HTTP_UPGRADE_STREAM_ID).open(true); + connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true); } private abstract class BaseDecoder { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java index d503914f7e..6fb4544c2b 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -140,9 +140,9 @@ public class DefaultHttp2ConnectionDecoderTest { when(local.flowController()).thenReturn(localFlow); when(encoder.flowController()).thenReturn(remoteFlow); when(connection.remote()).thenReturn(remote); - when(local.createStream(eq(STREAM_ID))).thenReturn(stream); + when(local.createIdleStream(eq(STREAM_ID))).thenReturn(stream); when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); - when(remote.createStream(eq(STREAM_ID))).thenReturn(stream); + when(remote.createIdleStream(eq(STREAM_ID))).thenReturn(stream); when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); when(ctx.channel()).thenReturn(channel); @@ -347,12 +347,12 @@ public class DefaultHttp2ConnectionDecoderTest { public void headersReadAfterGoAwayShouldBeIgnored() throws Exception { when(connection.goAwaySent()).thenReturn(true); decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false); - verify(remote, never()).createStream(eq(STREAM_ID)); + verify(remote, never()).createIdleStream(eq(STREAM_ID)); verify(stream, never()).open(anyBoolean()); // Verify that the event was absorbed and not propagated to the oberver. verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean()); - verify(remote, never()).createStream(anyInt()); + verify(remote, never()).createIdleStream(anyInt()); verify(stream, never()).open(anyBoolean()); } @@ -360,22 +360,21 @@ public class DefaultHttp2ConnectionDecoderTest { public void headersReadForUnknownStreamShouldBeIgnored() throws Exception { when(connection.stream(STREAM_ID)).thenReturn(null); decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false); - verify(remote, never()).createStream(eq(STREAM_ID)); + verify(remote, never()).createStream(anyInt(), anyBoolean()); verify(stream, never()).open(anyBoolean()); // Verify that the event was absorbed and not propagated to the oberver. verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean()); - verify(remote, never()).createStream(anyInt()); + verify(remote, never()).createStream(anyInt(), anyBoolean()); verify(stream, never()).open(anyBoolean()); } @Test public void headersReadForUnknownStreamShouldCreateStream() throws Exception { final int streamId = 5; - when(remote.createStream(eq(streamId))).thenReturn(stream); + when(remote.createStream(eq(streamId), anyBoolean())).thenReturn(stream); decode().onHeadersRead(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, false); - verify(remote).createStream(eq(streamId)); - verify(stream).open(eq(false)); + verify(remote).createStream(eq(streamId), eq(false)); verify(listener).onHeadersRead(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0), eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false)); } @@ -383,10 +382,9 @@ public class DefaultHttp2ConnectionDecoderTest { @Test public void headersReadForUnknownStreamShouldCreateHalfClosedStream() throws Exception { final int streamId = 5; - when(remote.createStream(eq(streamId))).thenReturn(stream); + when(remote.createStream(eq(streamId), anyBoolean())).thenReturn(stream); decode().onHeadersRead(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, true); - verify(remote).createStream(eq(streamId)); - verify(stream).open(eq(true)); + verify(remote).createStream(eq(streamId), eq(true)); verify(listener).onHeadersRead(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0), eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true)); } @@ -500,7 +498,7 @@ public class DefaultHttp2ConnectionDecoderTest { decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true); verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); - verify(remote).createStream(STREAM_ID); + verify(remote).createIdleStream(STREAM_ID); verify(stream, never()).open(anyBoolean()); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java index 098bb7b7d5..deb29e79b6 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java @@ -154,9 +154,9 @@ public class DefaultHttp2ConnectionEncoderTest { when(writer.configuration()).thenReturn(writerConfig); when(writerConfig.frameSizePolicy()).thenReturn(frameSizePolicy); when(frameSizePolicy.maxFrameSize()).thenReturn(64); - when(local.createStream(eq(STREAM_ID))).thenReturn(stream); + when(local.createIdleStream(eq(STREAM_ID))).thenReturn(stream); when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); - when(remote.createStream(eq(STREAM_ID))).thenReturn(stream); + when(remote.createIdleStream(eq(STREAM_ID))).thenReturn(stream); when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future); when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))) @@ -307,10 +307,9 @@ public class DefaultHttp2ConnectionEncoderTest { when(stream.id()).thenReturn(streamId); when(stream.state()).thenReturn(IDLE); mockFutureAddListener(true); - when(local.createStream(eq(streamId))).thenReturn(stream); + when(local.createStream(eq(streamId), anyBoolean())).thenReturn(stream); encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, false, promise); - verify(local).createStream(eq(streamId)); - verify(stream).open(eq(false)); + verify(local).createStream(eq(streamId), eq(false)); assertNotNull(payloadCaptor.getValue()); payloadCaptor.getValue().write(0); verify(writer).writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0), @@ -359,7 +358,7 @@ public class DefaultHttp2ConnectionEncoderTest { encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); - verify(local).createStream(STREAM_ID); + verify(local).createIdleStream(STREAM_ID); verify(stream, never()).open(anyBoolean()); } @@ -370,14 +369,14 @@ public class DefaultHttp2ConnectionEncoderTest { public Http2Stream answer(InvocationOnMock in) throws Throwable { throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR); } - }).when(local).createStream(eq(STREAM_ID)); + }).when(local).createIdleStream(eq(STREAM_ID)); when(connection.stream(STREAM_ID)).thenReturn(null); // Just return the stream object as the connection stream to ensure the dependent stream "exists" when(connection.stream(0)).thenReturn(stream); encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean()); verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); - verify(local).createStream(STREAM_ID); + verify(local).createIdleStream(STREAM_ID); } @Test @@ -453,16 +452,16 @@ public class DefaultHttp2ConnectionEncoderTest { mockSendFlowControlledWriteEverything(); int streamId = 5; when(stream.id()).thenReturn(streamId); - when(stream.state()).thenReturn(IDLE); + when(stream.state()).thenReturn(HALF_CLOSED_LOCAL); mockFutureAddListener(true); - when(local.createStream(eq(streamId))).thenReturn(stream); + when(local.createStream(eq(streamId), anyBoolean())).thenReturn(stream); when(writer.writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0), eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise))) .thenReturn(future); encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, true, promise); - verify(local).createStream(eq(streamId)); - verify(stream).open(eq(true)); + verify(local).createStream(eq(streamId), eq(true)); // Trigger the write and mark the promise successful to trigger listeners + assertNotNull(payloadCaptor.getValue()); payloadCaptor.getValue().write(0); promise.trySuccess(); verify(lifecycleManager).closeStreamLocal(eq(stream), eq(promise)); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java index 5726df145b..4b2933f99d 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java @@ -81,9 +81,9 @@ public class DefaultHttp2ConnectionTest { @Test public void goAwayReceivedShouldCloseStreamsGreaterThanLastStream() throws Exception { - Http2Stream stream1 = client.local().createStream(3).open(false); - Http2Stream stream2 = client.local().createStream(5).open(false); - Http2Stream remoteStream = client.remote().createStream(4).open(false); + Http2Stream stream1 = client.local().createStream(3, false); + Http2Stream stream2 = client.local().createStream(5, false); + Http2Stream remoteStream = client.remote().createStream(4, false); assertEquals(State.OPEN, stream1.state()); assertEquals(State.OPEN, stream2.state()); @@ -102,9 +102,9 @@ public class DefaultHttp2ConnectionTest { @Test public void goAwaySentShouldCloseStreamsGreaterThanLastStream() throws Exception { - Http2Stream stream1 = server.remote().createStream(3).open(false); - Http2Stream stream2 = server.remote().createStream(5).open(false); - Http2Stream localStream = server.local().createStream(4).open(false); + Http2Stream stream1 = server.remote().createStream(3, false); + Http2Stream stream2 = server.remote().createStream(5, false); + Http2Stream localStream = server.local().createStream(4, false); server.goAwaySent(3, 8, null); @@ -120,25 +120,25 @@ public class DefaultHttp2ConnectionTest { @Test public void serverCreateStreamShouldSucceed() throws Http2Exception { - Http2Stream stream = server.local().createStream(2).open(false); + Http2Stream stream = server.local().createStream(2, false); assertEquals(2, stream.id()); assertEquals(State.OPEN, stream.state()); assertEquals(1, server.numActiveStreams()); assertEquals(2, server.local().lastStreamCreated()); - stream = server.local().createStream(4).open(true); + stream = server.local().createStream(4, true); assertEquals(4, stream.id()); assertEquals(State.HALF_CLOSED_LOCAL, stream.state()); assertEquals(2, server.numActiveStreams()); assertEquals(4, server.local().lastStreamCreated()); - stream = server.remote().createStream(3).open(true); + stream = server.remote().createStream(3, true); assertEquals(3, stream.id()); assertEquals(State.HALF_CLOSED_REMOTE, stream.state()); assertEquals(3, server.numActiveStreams()); assertEquals(3, server.remote().lastStreamCreated()); - stream = server.remote().createStream(5).open(false); + stream = server.remote().createStream(5, false); assertEquals(5, stream.id()); assertEquals(State.OPEN, stream.state()); assertEquals(4, server.numActiveStreams()); @@ -147,25 +147,25 @@ public class DefaultHttp2ConnectionTest { @Test public void clientCreateStreamShouldSucceed() throws Http2Exception { - Http2Stream stream = client.remote().createStream(2).open(false); + Http2Stream stream = client.remote().createStream(2, false); assertEquals(2, stream.id()); assertEquals(State.OPEN, stream.state()); assertEquals(1, client.numActiveStreams()); assertEquals(2, client.remote().lastStreamCreated()); - stream = client.remote().createStream(4).open(true); + stream = client.remote().createStream(4, true); assertEquals(4, stream.id()); assertEquals(State.HALF_CLOSED_REMOTE, stream.state()); assertEquals(2, client.numActiveStreams()); assertEquals(4, client.remote().lastStreamCreated()); - stream = client.local().createStream(3).open(true); + stream = client.local().createStream(3, true); assertEquals(3, stream.id()); assertEquals(State.HALF_CLOSED_LOCAL, stream.state()); assertEquals(3, client.numActiveStreams()); assertEquals(3, client.local().lastStreamCreated()); - stream = client.local().createStream(5).open(false); + stream = client.local().createStream(5, false); assertEquals(5, stream.id()); assertEquals(State.OPEN, stream.state()); assertEquals(4, client.numActiveStreams()); @@ -174,7 +174,7 @@ public class DefaultHttp2ConnectionTest { @Test public void serverReservePushStreamShouldSucceed() throws Http2Exception { - Http2Stream stream = server.remote().createStream(3).open(true); + Http2Stream stream = server.remote().createStream(3, true); Http2Stream pushStream = server.local().reservePushStream(2, stream); assertEquals(2, pushStream.id()); assertEquals(State.RESERVED_LOCAL, pushStream.state()); @@ -184,7 +184,7 @@ public class DefaultHttp2ConnectionTest { @Test public void clientReservePushStreamShouldSucceed() throws Http2Exception { - Http2Stream stream = server.remote().createStream(3).open(true); + Http2Stream stream = server.remote().createStream(3, true); Http2Stream pushStream = server.local().reservePushStream(4, stream); assertEquals(4, pushStream.id()); assertEquals(State.RESERVED_LOCAL, pushStream.state()); @@ -194,28 +194,28 @@ public class DefaultHttp2ConnectionTest { @Test(expected = Http2Exception.class) public void newStreamBehindExpectedShouldThrow() throws Http2Exception { - server.local().createStream(0).open(true); + server.local().createStream(0, true); } @Test(expected = Http2Exception.class) public void newStreamNotForServerShouldThrow() throws Http2Exception { - server.local().createStream(11).open(true); + server.local().createStream(11, true); } @Test(expected = Http2Exception.class) public void newStreamNotForClientShouldThrow() throws Http2Exception { - client.local().createStream(10).open(true); + client.local().createStream(10, true); } @Test(expected = Http2Exception.class) public void maxAllowedStreamsExceededShouldThrow() throws Http2Exception { server.local().maxActiveStreams(0); - server.local().createStream(2).open(true); + server.local().createStream(2, true); } @Test(expected = Http2Exception.class) public void reserveWithPushDisallowedShouldThrow() throws Http2Exception { - Http2Stream stream = server.remote().createStream(3).open(true); + Http2Stream stream = server.remote().createStream(3, true); server.remote().allowPushTo(false); server.local().reservePushStream(2, stream); } @@ -223,12 +223,12 @@ public class DefaultHttp2ConnectionTest { @Test(expected = Http2Exception.class) public void goAwayReceivedShouldDisallowCreation() throws Http2Exception { server.goAwayReceived(0, 1L, Unpooled.EMPTY_BUFFER); - server.remote().createStream(3).open(true); + server.remote().createStream(3, true); } @Test public void closeShouldSucceed() throws Http2Exception { - Http2Stream stream = server.remote().createStream(3).open(true); + Http2Stream stream = server.remote().createStream(3, true); stream.close(); assertEquals(State.CLOSED, stream.state()); assertEquals(0, server.numActiveStreams()); @@ -236,7 +236,7 @@ public class DefaultHttp2ConnectionTest { @Test public void closeLocalWhenOpenShouldSucceed() throws Http2Exception { - Http2Stream stream = server.remote().createStream(3).open(false); + Http2Stream stream = server.remote().createStream(3, false); stream.closeLocalSide(); assertEquals(State.HALF_CLOSED_LOCAL, stream.state()); assertEquals(1, server.numActiveStreams()); @@ -244,7 +244,7 @@ public class DefaultHttp2ConnectionTest { @Test public void closeRemoteWhenOpenShouldSucceed() throws Http2Exception { - Http2Stream stream = server.remote().createStream(3).open(false); + Http2Stream stream = server.remote().createStream(3, false); stream.closeRemoteSide(); assertEquals(State.HALF_CLOSED_REMOTE, stream.state()); assertEquals(1, server.numActiveStreams()); @@ -252,7 +252,7 @@ public class DefaultHttp2ConnectionTest { @Test public void closeOnlyOpenSideShouldClose() throws Http2Exception { - Http2Stream stream = server.remote().createStream(3).open(true); + Http2Stream stream = server.remote().createStream(3, true); stream.closeLocalSide(); assertEquals(State.CLOSED, stream.state()); assertEquals(0, server.numActiveStreams()); @@ -261,32 +261,32 @@ public class DefaultHttp2ConnectionTest { @SuppressWarnings("NumericOverflow") @Test(expected = Http2Exception.class) public void localStreamInvalidStreamIdShouldThrow() throws Http2Exception { - client.local().createStream(Integer.MAX_VALUE + 2).open(false); + client.local().createStream(Integer.MAX_VALUE + 2, false); } @SuppressWarnings("NumericOverflow") @Test(expected = Http2Exception.class) public void remoteStreamInvalidStreamIdShouldThrow() throws Http2Exception { - client.remote().createStream(Integer.MAX_VALUE + 1).open(false); + client.remote().createStream(Integer.MAX_VALUE + 1, false); } @Test public void localStreamCanDependUponIdleStream() throws Http2Exception { - Http2Stream streamA = client.local().createStream(1).open(false); + Http2Stream streamA = client.local().createStream(1, false); streamA.setPriority(3, MIN_WEIGHT, true); verifyDependUponIdleStream(streamA, client.stream(3), client.local()); } @Test public void remoteStreamCanDependUponIdleStream() throws Http2Exception { - Http2Stream streamA = client.remote().createStream(2).open(false); + Http2Stream streamA = client.remote().createStream(2, false); streamA.setPriority(4, MIN_WEIGHT, true); verifyDependUponIdleStream(streamA, client.stream(4), client.remote()); } @Test public void prioritizeShouldUseDefaults() throws Exception { - Http2Stream stream = client.local().createStream(1).open(false); + Http2Stream stream = client.local().createStream(1, false); assertEquals(1, client.connectionStream().numChildren()); assertEquals(2, client.connectionStream().prioritizableForTree()); assertEquals(stream, child(client.connectionStream(), 1)); @@ -297,7 +297,7 @@ public class DefaultHttp2ConnectionTest { @Test public void reprioritizeWithNoChangeShouldDoNothing() throws Exception { - Http2Stream stream = client.local().createStream(1).open(false); + Http2Stream stream = client.local().createStream(1, false); stream.setPriority(0, DEFAULT_PRIORITY_WEIGHT, false); assertEquals(1, client.connectionStream().numChildren()); assertEquals(2, client.connectionStream().prioritizableForTree()); @@ -309,10 +309,10 @@ public class DefaultHttp2ConnectionTest { @Test public void insertExclusiveShouldAddNewLevel() throws Exception { - Http2Stream streamA = client.local().createStream(1).open(false); - Http2Stream streamB = client.local().createStream(3).open(false); - Http2Stream streamC = client.local().createStream(5).open(false); - Http2Stream streamD = client.local().createStream(7).open(false); + Http2Stream streamA = client.local().createStream(1, false); + Http2Stream streamB = client.local().createStream(3, false); + Http2Stream streamC = client.local().createStream(5, false); + Http2Stream streamD = client.local().createStream(7, false); streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); @@ -359,10 +359,10 @@ public class DefaultHttp2ConnectionTest { @Test public void existingChildMadeExclusiveShouldNotCreateTreeCycle() throws Http2Exception { - Http2Stream streamA = client.local().createStream(1).open(false); - Http2Stream streamB = client.local().createStream(3).open(false); - Http2Stream streamC = client.local().createStream(5).open(false); - Http2Stream streamD = client.local().createStream(7).open(false); + Http2Stream streamA = client.local().createStream(1, false); + Http2Stream streamB = client.local().createStream(3, false); + Http2Stream streamC = client.local().createStream(5, false); + Http2Stream streamD = client.local().createStream(7, false); streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); @@ -412,12 +412,12 @@ public class DefaultHttp2ConnectionTest { @Test public void newExclusiveChildShouldUpdateOldParentCorrectly() throws Http2Exception { - Http2Stream streamA = client.local().createStream(1).open(false); - Http2Stream streamB = client.local().createStream(3).open(false); - Http2Stream streamC = client.local().createStream(5).open(false); - Http2Stream streamD = client.local().createStream(7).open(false); - Http2Stream streamE = client.local().createStream(9).open(false); - Http2Stream streamF = client.local().createStream(11).open(false); + Http2Stream streamA = client.local().createStream(1, false); + Http2Stream streamB = client.local().createStream(3, false); + Http2Stream streamC = client.local().createStream(5, false); + Http2Stream streamD = client.local().createStream(7, false); + Http2Stream streamE = client.local().createStream(9, false); + Http2Stream streamF = client.local().createStream(11, false); streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); @@ -483,10 +483,10 @@ public class DefaultHttp2ConnectionTest { @Test public void weightChangeWithNoTreeChangeShouldNotifyListeners() throws Http2Exception { - Http2Stream streamA = client.local().createStream(1).open(false); - Http2Stream streamB = client.local().createStream(3).open(false); - Http2Stream streamC = client.local().createStream(5).open(false); - Http2Stream streamD = client.local().createStream(7).open(false); + Http2Stream streamA = client.local().createStream(1, false); + Http2Stream streamB = client.local().createStream(3, false); + Http2Stream streamC = client.local().createStream(5, false); + Http2Stream streamD = client.local().createStream(7, false); streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); @@ -513,10 +513,10 @@ public class DefaultHttp2ConnectionTest { @Test public void sameNodeDependentShouldNotStackOverflowNorChangePrioritizableForTree() throws Http2Exception { - Http2Stream streamA = client.local().createStream(1).open(false); - Http2Stream streamB = client.local().createStream(3).open(false); - Http2Stream streamC = client.local().createStream(5).open(false); - Http2Stream streamD = client.local().createStream(7).open(false); + Http2Stream streamA = client.local().createStream(1, false); + Http2Stream streamB = client.local().createStream(3, false); + Http2Stream streamC = client.local().createStream(5, false); + Http2Stream streamD = client.local().createStream(7, false); streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); @@ -551,10 +551,10 @@ public class DefaultHttp2ConnectionTest { @Test public void multipleCircularDependencyShouldUpdatePrioritizable() throws Http2Exception { - Http2Stream streamA = client.local().createStream(1).open(false); - Http2Stream streamB = client.local().createStream(3).open(false); - Http2Stream streamC = client.local().createStream(5).open(false); - Http2Stream streamD = client.local().createStream(7).open(false); + Http2Stream streamA = client.local().createStream(1, false); + Http2Stream streamB = client.local().createStream(3, false); + Http2Stream streamC = client.local().createStream(5, false); + Http2Stream streamD = client.local().createStream(7, false); streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); @@ -606,10 +606,10 @@ public class DefaultHttp2ConnectionTest { @Test public void removeWithPrioritizableDependentsShouldNotRestructureTree() throws Exception { - Http2Stream streamA = client.local().createStream(1).open(false); - Http2Stream streamB = client.local().createStream(3).open(false); - Http2Stream streamC = client.local().createStream(5).open(false); - Http2Stream streamD = client.local().createStream(7).open(false); + Http2Stream streamA = client.local().createStream(1, false); + Http2Stream streamB = client.local().createStream(3, false); + Http2Stream streamC = client.local().createStream(5, false); + Http2Stream streamD = client.local().createStream(7, false); streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); streamC.setPriority(streamB.id(), DEFAULT_PRIORITY_WEIGHT, false); @@ -657,12 +657,12 @@ public class DefaultHttp2ConnectionTest { @Test public void closeWithNoPrioritizableDependentsShouldRestructureTree() throws Exception { - Http2Stream streamA = client.local().createStream(1).open(false); - Http2Stream streamB = client.local().createStream(3).open(false); - Http2Stream streamC = client.local().createStream(5).open(false); - Http2Stream streamD = client.local().createStream(7).open(false); - Http2Stream streamE = client.local().createStream(9).open(false); - Http2Stream streamF = client.local().createStream(11).open(false); + Http2Stream streamA = client.local().createStream(1, false); + Http2Stream streamB = client.local().createStream(3, false); + Http2Stream streamC = client.local().createStream(5, false); + Http2Stream streamD = client.local().createStream(7, false); + Http2Stream streamE = client.local().createStream(9, false); + Http2Stream streamF = client.local().createStream(11, false); streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); streamC.setPriority(streamB.id(), DEFAULT_PRIORITY_WEIGHT, false); @@ -717,12 +717,12 @@ public class DefaultHttp2ConnectionTest { @Test public void priorityChangeWithNoPrioritizableDependentsShouldRestructureTree() throws Exception { - Http2Stream streamA = client.local().createStream(1).open(false); - Http2Stream streamB = client.local().createStream(3).open(false); - Http2Stream streamC = client.local().createStream(5).open(false); - Http2Stream streamD = client.local().createStream(7).open(false); - Http2Stream streamE = client.local().createStream(9).open(false); - Http2Stream streamF = client.local().createStream(11).open(false); + Http2Stream streamA = client.local().createStream(1, false); + Http2Stream streamB = client.local().createStream(3, false); + Http2Stream streamC = client.local().createStream(5, false); + Http2Stream streamD = client.local().createStream(7, false); + Http2Stream streamE = client.local().createStream(9, false); + Http2Stream streamF = client.local().createStream(11, false); streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); streamC.setPriority(streamB.id(), DEFAULT_PRIORITY_WEIGHT, false); @@ -786,17 +786,17 @@ public class DefaultHttp2ConnectionTest { public void circularDependencyShouldRestructureTree() throws Exception { // Using example from http://tools.ietf.org/html/draft-ietf-httpbis-http2-16#section-5.3.3 // Initialize all the nodes - Http2Stream streamA = client.local().createStream(1).open(false); + Http2Stream streamA = client.local().createStream(1, false); verifyParentChanged(streamA, null); - Http2Stream streamB = client.local().createStream(3).open(false); + Http2Stream streamB = client.local().createStream(3, false); verifyParentChanged(streamB, null); - Http2Stream streamC = client.local().createStream(5).open(false); + Http2Stream streamC = client.local().createStream(5, false); verifyParentChanged(streamC, null); - Http2Stream streamD = client.local().createStream(7).open(false); + Http2Stream streamD = client.local().createStream(7, false); verifyParentChanged(streamD, null); - Http2Stream streamE = client.local().createStream(9).open(false); + Http2Stream streamE = client.local().createStream(9, false); verifyParentChanged(streamE, null); - Http2Stream streamF = client.local().createStream(11).open(false); + Http2Stream streamF = client.local().createStream(11, false); verifyParentChanged(streamF, null); // Build the tree @@ -882,17 +882,17 @@ public class DefaultHttp2ConnectionTest { public void circularDependencyWithExclusiveShouldRestructureTree() throws Exception { // Using example from http://tools.ietf.org/html/draft-ietf-httpbis-http2-16#section-5.3.3 // Initialize all the nodes - Http2Stream streamA = client.local().createStream(1).open(false); + Http2Stream streamA = client.local().createStream(1, false); verifyParentChanged(streamA, null); - Http2Stream streamB = client.local().createStream(3).open(false); + Http2Stream streamB = client.local().createStream(3, false); verifyParentChanged(streamB, null); - Http2Stream streamC = client.local().createStream(5).open(false); + Http2Stream streamC = client.local().createStream(5, false); verifyParentChanged(streamC, null); - Http2Stream streamD = client.local().createStream(7).open(false); + Http2Stream streamD = client.local().createStream(7, false); verifyParentChanged(streamD, null); - Http2Stream streamE = client.local().createStream(9).open(false); + Http2Stream streamE = client.local().createStream(9, false); verifyParentChanged(streamE, null); - Http2Stream streamF = client.local().createStream(11).open(false); + Http2Stream streamF = client.local().createStream(11, false); verifyParentChanged(streamF, null); // Build the tree @@ -1050,9 +1050,11 @@ public class DefaultHttp2ConnectionTest { // Now we add clienListener2 and exercise all listener functionality try { client.addListener(clientListener2); - Http2Stream stream = client.local().createStream(3); + Http2Stream stream = client.local().createIdleStream(3); verify(clientListener).onStreamAdded(any(Http2Stream.class)); verify(clientListener2).onStreamAdded(any(Http2Stream.class)); + verify(clientListener, never()).onStreamActive(any(Http2Stream.class)); + verify(clientListener2, never()).onStreamActive(any(Http2Stream.class)); stream.open(false); verify(clientListener).onStreamActive(any(Http2Stream.class)); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java index 3551cf9622..d1e8296cef 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java @@ -68,7 +68,7 @@ public class DefaultHttp2LocalFlowControllerTest { connection = new DefaultHttp2Connection(false); controller = new DefaultHttp2LocalFlowController(connection, frameWriter, updateRatio); - connection.local().createStream(STREAM_ID).open(false); + connection.local().createStream(STREAM_ID, false); } @Test @@ -156,7 +156,7 @@ public class DefaultHttp2LocalFlowControllerTest { @Test public void connectionWindowShouldAdjustWithMultipleStreams() throws Http2Exception { int newStreamId = 3; - connection.local().createStream(newStreamId).open(false); + connection.local().createStream(newStreamId, false); try { assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID)); @@ -246,7 +246,7 @@ public class DefaultHttp2LocalFlowControllerTest { throws Http2Exception { int delta = newDefaultWindowSize - DEFAULT_WINDOW_SIZE; controller.incrementWindowSize(ctx, stream(0), delta); - Http2Stream stream = connection.local().createStream(newStreamId).open(false); + Http2Stream stream = connection.local().createStream(newStreamId, false); if (setStreamRatio) { controller.windowUpdateRatio(ctx, stream, ratio); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java index 7acd2c7c32..bcc3dda5e0 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java @@ -87,10 +87,10 @@ public class DefaultHttp2RemoteFlowControllerTest { connection = new DefaultHttp2Connection(false); controller = new DefaultHttp2RemoteFlowController(connection); - connection.local().createStream(STREAM_A).open(false); - connection.local().createStream(STREAM_B).open(false); - Http2Stream streamC = connection.local().createStream(STREAM_C).open(false); - Http2Stream streamD = connection.local().createStream(STREAM_D).open(false); + connection.local().createStream(STREAM_A, false); + connection.local().createStream(STREAM_B, false); + Http2Stream streamC = connection.local().createStream(STREAM_C, false); + Http2Stream streamD = connection.local().createStream(STREAM_D, false); streamC.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false); streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false); } @@ -885,7 +885,7 @@ public class DefaultHttp2RemoteFlowControllerTest { Http2Stream streamC = connection.stream(STREAM_C); Http2Stream streamD = connection.stream(STREAM_D); - Http2Stream streamE = connection.local().createStream(STREAM_E).open(false); + Http2Stream streamE = connection.local().createStream(STREAM_E, false); streamE.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true); // Send a bunch of data on each stream. diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java index 316859f7ba..b069c6afb6 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java @@ -90,6 +90,7 @@ public class Http2ConnectionRoundtripTest { private CountDownLatch serverSettingsAckLatch; private CountDownLatch dataLatch; private CountDownLatch trailersLatch; + private CountDownLatch goAwayLatch; @Before public void setup() throws Exception { @@ -227,7 +228,7 @@ public class Http2ConnectionRoundtripTest { @Test public void noMoreStreamIdsShouldSendGoAway() throws Exception { - bootstrapEnv(1, 1, 3, 1); + bootstrapEnv(1, 1, 3, 1, 1); // Create a single stream by sending a HEADERS frame to the server. final Http2Headers headers = dummyHeaders(); @@ -249,7 +250,7 @@ public class Http2ConnectionRoundtripTest { } }); - assertTrue(requestLatch.await(5, SECONDS)); + assertTrue(goAwayLatch.await(5, SECONDS)); verify(serverListener).onGoAwayRead(any(ChannelHandlerContext.class), eq(0), eq(Http2Error.PROTOCOL_ERROR.code()), any(ByteBuf.class)); } @@ -403,10 +404,16 @@ public class Http2ConnectionRoundtripTest { private void bootstrapEnv(int dataCountDown, int settingsAckCount, int requestCountDown, int trailersCountDown) throws Exception { + bootstrapEnv(dataCountDown, settingsAckCount, requestCountDown, trailersCountDown, -1); + } + + private void bootstrapEnv(int dataCountDown, int settingsAckCount, + int requestCountDown, int trailersCountDown, int goAwayCountDown) throws Exception { requestLatch = new CountDownLatch(requestCountDown); serverSettingsAckLatch = new CountDownLatch(settingsAckCount); dataLatch = new CountDownLatch(dataCountDown); trailersLatch = new CountDownLatch(trailersCountDown); + goAwayLatch = goAwayCountDown > 0 ? new CountDownLatch(goAwayCountDown) : requestLatch; sb = new ServerBootstrap(); cb = new Bootstrap(); @@ -418,7 +425,7 @@ public class Http2ConnectionRoundtripTest { ChannelPipeline p = ch.pipeline(); serverFrameCountDown = new FrameCountDown(serverListener, serverSettingsAckLatch, - requestLatch, dataLatch, trailersLatch); + requestLatch, dataLatch, trailersLatch, goAwayLatch); p.addLast(new Http2ConnectionHandler(true, serverFrameCountDown)); } }); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java index 7957643f77..7925267b85 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java @@ -117,9 +117,9 @@ final class Http2TestUtil { Http2Stream stream = connection.stream(streamId); if (stream == null) { if (connection.isServer() && streamId % 2 == 0 || !connection.isServer() && streamId % 2 != 0) { - stream = connection.local().createStream(streamId).open(halfClosed); + stream = connection.local().createStream(streamId, halfClosed); } else { - stream = connection.remote().createStream(streamId).open(halfClosed); + stream = connection.remote().createStream(streamId, halfClosed); } } return stream; @@ -266,6 +266,7 @@ final class Http2TestUtil { private final CountDownLatch settingsAckLatch; private final CountDownLatch dataLatch; private final CountDownLatch trailersLatch; + private final CountDownLatch goAwayLatch; FrameCountDown(Http2FrameListener listener, CountDownLatch settingsAckLatch, CountDownLatch messageLatch) { this(listener, settingsAckLatch, messageLatch, null, null); @@ -273,11 +274,17 @@ final class Http2TestUtil { FrameCountDown(Http2FrameListener listener, CountDownLatch settingsAckLatch, CountDownLatch messageLatch, CountDownLatch dataLatch, CountDownLatch trailersLatch) { + this(listener, settingsAckLatch, messageLatch, dataLatch, trailersLatch, messageLatch); + } + + FrameCountDown(Http2FrameListener listener, CountDownLatch settingsAckLatch, CountDownLatch messageLatch, + CountDownLatch dataLatch, CountDownLatch trailersLatch, CountDownLatch goAwayLatch) { this.listener = listener; this.messageLatch = messageLatch; this.settingsAckLatch = settingsAckLatch; this.dataLatch = dataLatch; this.trailersLatch = trailersLatch; + this.goAwayLatch = goAwayLatch; } @Override @@ -362,7 +369,7 @@ final class Http2TestUtil { public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) throws Http2Exception { listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData); - messageLatch.countDown(); + goAwayLatch.countDown(); } @Override