Ignore frames for streams that may have previously existed.
Motivation: The recent PR that discarded the Http2StreamRemovalPolicy causes connection errors when receiving a frame for a stream that no longer exists. We should ignore these frames if we think there's a chance that the stream has existed previously Modifications: Modified the Http2Connection interface to provide a `streamMayHaveExisted` method. Also removed the requireStream() method to identify all of the places in the code that need to be updated. Modified the encoder and decoder to properly handle cases where a stream may have existed but no longer does. Result: Fixes #3643
This commit is contained in:
parent
26a7a5ec25
commit
ab925abc7d
@ -113,17 +113,13 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2Stream requireStream(int streamId) throws Http2Exception {
|
||||
Http2Stream stream = stream(streamId);
|
||||
if (stream == null) {
|
||||
throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId);
|
||||
}
|
||||
return stream;
|
||||
public Http2Stream stream(int streamId) {
|
||||
return streamMap.get(streamId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2Stream stream(int streamId) {
|
||||
return streamMap.get(streamId);
|
||||
public boolean streamMayHaveExisted(int streamId) {
|
||||
return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -166,7 +162,7 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
forEachActiveStream(new Http2StreamVisitor() {
|
||||
@Override
|
||||
public boolean visit(Http2Stream stream) {
|
||||
if (stream.id() > lastKnownStream && localEndpoint.createdStreamId(stream.id())) {
|
||||
if (stream.id() > lastKnownStream && localEndpoint.isValidStreamId(stream.id())) {
|
||||
stream.close();
|
||||
}
|
||||
return true;
|
||||
@ -197,7 +193,7 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
forEachActiveStream(new Http2StreamVisitor() {
|
||||
@Override
|
||||
public boolean visit(Http2Stream stream) {
|
||||
if (stream.id() > lastKnownStream && remoteEndpoint.createdStreamId(stream.id())) {
|
||||
if (stream.id() > lastKnownStream && remoteEndpoint.isValidStreamId(stream.id())) {
|
||||
stream.close();
|
||||
}
|
||||
return true;
|
||||
@ -551,11 +547,11 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
}
|
||||
|
||||
final DefaultEndpoint<? extends Http2FlowController> createdBy() {
|
||||
return localEndpoint.createdStreamId(id) ? localEndpoint : remoteEndpoint;
|
||||
return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
|
||||
}
|
||||
|
||||
final boolean isLocal() {
|
||||
return localEndpoint.createdStreamId(id);
|
||||
return localEndpoint.isValidStreamId(id);
|
||||
}
|
||||
|
||||
final void weight(short weight) {
|
||||
@ -880,9 +876,14 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean createdStreamId(int streamId) {
|
||||
public boolean isValidStreamId(int streamId) {
|
||||
boolean even = (streamId & 1) == 0;
|
||||
return server == even;
|
||||
return streamId > 0 && server == even;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mayHaveCreatedStream(int streamId) {
|
||||
return isValidStreamId(streamId) && streamId <= lastStreamCreated;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1021,7 +1022,7 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
if (streamId < 0) {
|
||||
throw new Http2NoMoreStreamIdsException();
|
||||
}
|
||||
if (!createdStreamId(streamId)) {
|
||||
if (!isValidStreamId(streamId)) {
|
||||
throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId,
|
||||
server ? "server" : "client");
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.streamError;
|
||||
import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
@ -184,16 +185,20 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
@Override
|
||||
public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data,
|
||||
int padding, boolean endOfStream) throws Http2Exception {
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
Http2Stream stream = connection.stream(streamId);
|
||||
Http2LocalFlowController flowController = flowController();
|
||||
int bytesToReturn = data.readableBytes() + padding;
|
||||
|
||||
if (stream.isResetSent() || streamCreatedAfterGoAwaySent(stream)) {
|
||||
// Count the frame towards the connection flow control window and don't process it further.
|
||||
if (stream == null || stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
|
||||
// Ignoring this frame. We still need to count the frame towards the connection flow control
|
||||
// window, but we immediately mark all bytes as consumed.
|
||||
flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream);
|
||||
flowController.consumeBytes(ctx, stream, bytesToReturn);
|
||||
|
||||
// Since no bytes are consumed, return them all.
|
||||
// Verify that the stream may have existed after we apply flow control.
|
||||
verifyStreamMayHaveExisted(streamId);
|
||||
|
||||
// All bytes have been consumed.
|
||||
return bytesToReturn;
|
||||
}
|
||||
|
||||
@ -264,31 +269,40 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
|
||||
short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
|
||||
Http2Stream stream = connection.stream(streamId);
|
||||
|
||||
if (stream == null) {
|
||||
boolean allowHalfClosedRemote = false;
|
||||
if (stream == null && !connection.streamMayHaveExisted(streamId)) {
|
||||
stream = connection.remote().createStream(streamId).open(endOfStream);
|
||||
} else if (stream.isResetSent() || streamCreatedAfterGoAwaySent(stream)) {
|
||||
// Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state.
|
||||
allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE;
|
||||
}
|
||||
|
||||
if (stream == null || stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
|
||||
// Ignore this frame.
|
||||
return;
|
||||
} else {
|
||||
switch (stream.state()) {
|
||||
case RESERVED_REMOTE:
|
||||
case IDLE:
|
||||
stream.open(endOfStream);
|
||||
break;
|
||||
case OPEN:
|
||||
case HALF_CLOSED_LOCAL:
|
||||
// Allowed to receive headers in these states.
|
||||
break;
|
||||
case HALF_CLOSED_REMOTE:
|
||||
case CLOSED:
|
||||
}
|
||||
|
||||
switch (stream.state()) {
|
||||
case RESERVED_REMOTE:
|
||||
case IDLE:
|
||||
stream.open(endOfStream);
|
||||
break;
|
||||
case OPEN:
|
||||
case HALF_CLOSED_LOCAL:
|
||||
// Allowed to receive headers in these states.
|
||||
break;
|
||||
case HALF_CLOSED_REMOTE:
|
||||
if (!allowHalfClosedRemote) {
|
||||
throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
|
||||
stream.id(), stream.state());
|
||||
default:
|
||||
// Connection error.
|
||||
throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(),
|
||||
stream.state());
|
||||
}
|
||||
stream.id(), stream.state());
|
||||
}
|
||||
break;
|
||||
case CLOSED:
|
||||
throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
|
||||
stream.id(), stream.state());
|
||||
default:
|
||||
// Connection error.
|
||||
throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(),
|
||||
stream.state());
|
||||
}
|
||||
|
||||
try {
|
||||
@ -315,10 +329,15 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
|
||||
try {
|
||||
if (stream == null) {
|
||||
if (connection.streamMayHaveExisted(streamId)) {
|
||||
// Ignore this frame.
|
||||
return;
|
||||
}
|
||||
|
||||
// PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the
|
||||
// first frame to be received for a stream that we must create the stream.
|
||||
stream = connection.remote().createStream(streamId);
|
||||
} else if (streamCreatedAfterGoAwaySent(stream)) {
|
||||
} else if (streamCreatedAfterGoAwaySent(streamId)) {
|
||||
// Ignore this frame.
|
||||
return;
|
||||
}
|
||||
@ -336,7 +355,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
|
||||
@Override
|
||||
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
Http2Stream stream = connection.stream(streamId);
|
||||
if (stream == null) {
|
||||
verifyStreamMayHaveExisted(streamId);
|
||||
return;
|
||||
}
|
||||
|
||||
switch(stream.state()) {
|
||||
case IDLE:
|
||||
@ -434,12 +457,16 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
@Override
|
||||
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
|
||||
Http2Headers headers, int padding) throws Http2Exception {
|
||||
Http2Stream parentStream = connection.requireStream(streamId);
|
||||
Http2Stream parentStream = connection.stream(streamId);
|
||||
|
||||
if (streamCreatedAfterGoAwaySent(parentStream)) {
|
||||
if (streamCreatedAfterGoAwaySent(streamId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (parentStream == null) {
|
||||
throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId);
|
||||
}
|
||||
|
||||
switch (parentStream.state()) {
|
||||
case OPEN:
|
||||
case HALF_CLOSED_LOCAL:
|
||||
@ -483,9 +510,10 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
@Override
|
||||
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
|
||||
throws Http2Exception {
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
|
||||
if (stream.state() == CLOSED || streamCreatedAfterGoAwaySent(stream)) {
|
||||
Http2Stream stream = connection.stream(streamId);
|
||||
if (stream == null || stream.state() == CLOSED || streamCreatedAfterGoAwaySent(streamId)) {
|
||||
// Ignore this frame.
|
||||
verifyStreamMayHaveExisted(streamId);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -501,10 +529,16 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
onUnknownFrame0(ctx, frameType, streamId, flags, payload);
|
||||
}
|
||||
|
||||
private boolean streamCreatedAfterGoAwaySent(Http2Stream stream) {
|
||||
private boolean streamCreatedAfterGoAwaySent(int streamId) {
|
||||
// Ignore inbound frames after a GOAWAY was sent and the stream id is greater than
|
||||
// the last stream id set in the GOAWAY frame.
|
||||
return connection().goAwaySent() && stream.id() > connection().remote().lastKnownStream();
|
||||
return connection.goAwaySent() && streamId > connection.remote().lastKnownStream();
|
||||
}
|
||||
|
||||
private void verifyStreamMayHaveExisted(int streamId) throws Http2Exception {
|
||||
if (!connection.streamMayHaveExisted(streamId)) {
|
||||
throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -112,7 +112,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
final boolean endOfStream, ChannelPromise promise) {
|
||||
final Http2Stream stream;
|
||||
try {
|
||||
stream = connection.requireStream(streamId);
|
||||
stream = requireStream(streamId);
|
||||
|
||||
// Verify that the stream is in the appropriate state for sending DATA frames.
|
||||
switch (stream.state()) {
|
||||
case OPEN:
|
||||
@ -250,7 +251,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
|
||||
}
|
||||
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
Http2Stream stream = requireStream(streamId);
|
||||
// Reserve the promised stream.
|
||||
connection.local().reservePushStream(promisedStreamId, stream);
|
||||
} catch (Throwable e) {
|
||||
@ -296,6 +297,20 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
return frameWriter.configuration();
|
||||
}
|
||||
|
||||
private Http2Stream requireStream(int streamId) {
|
||||
Http2Stream stream = connection.stream(streamId);
|
||||
if (stream == null) {
|
||||
final String message;
|
||||
if (connection.streamMayHaveExisted(streamId)) {
|
||||
message = "Stream no longer exists: " + streamId;
|
||||
} else {
|
||||
message = "Stream does not exist: " + streamId;
|
||||
}
|
||||
throw new IllegalArgumentException(message);
|
||||
}
|
||||
return stream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
|
||||
* only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
|
||||
|
@ -123,16 +123,16 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
@Override
|
||||
public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
|
||||
throws Http2Exception {
|
||||
if (stream.id() == CONNECTION_STREAM_ID) {
|
||||
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
|
||||
}
|
||||
if (numBytes <= 0) {
|
||||
throw new IllegalArgumentException("numBytes must be positive");
|
||||
}
|
||||
|
||||
// Streams automatically consume all remaining bytes when they are closed, so just ignore
|
||||
// if already closed.
|
||||
if (!isClosed(stream)) {
|
||||
if (stream != null && !isClosed(stream)) {
|
||||
if (stream.id() == CONNECTION_STREAM_ID) {
|
||||
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
|
||||
}
|
||||
if (numBytes <= 0) {
|
||||
throw new IllegalArgumentException("numBytes must be positive");
|
||||
}
|
||||
|
||||
connectionState().consumeBytes(ctx, numBytes);
|
||||
state(stream).consumeBytes(ctx, numBytes);
|
||||
}
|
||||
@ -208,11 +208,10 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
|
||||
int dataLength = data.readableBytes() + padding;
|
||||
|
||||
// Apply the connection-level flow control
|
||||
|
||||
DefaultFlowState connectionState = connectionState();
|
||||
connectionState.receiveFlowControlledFrame(dataLength);
|
||||
|
||||
if (!isClosed(stream)) {
|
||||
if (stream != null && !isClosed(stream)) {
|
||||
// Apply the stream-level flow control
|
||||
DefaultFlowState state = state(stream);
|
||||
state.endOfStream(endOfStream);
|
||||
|
@ -147,7 +147,13 @@ public interface Http2Connection {
|
||||
* Indicates whether the given streamId is from the set of IDs used by this endpoint to
|
||||
* create new streams.
|
||||
*/
|
||||
boolean createdStreamId(int streamId);
|
||||
boolean isValidStreamId(int streamId);
|
||||
|
||||
/**
|
||||
* Indicates whether or not this endpoint may have created the given stream. This is {@code true} if
|
||||
* {@link #isValidStreamId(int)} and {@code streamId} <= {@link #lastStreamCreated()}.
|
||||
*/
|
||||
boolean mayHaveCreatedStream(int streamId);
|
||||
|
||||
/**
|
||||
* Indicates whether or not this endpoint is currently allowed to create new streams. This will be
|
||||
@ -263,16 +269,17 @@ public interface Http2Connection {
|
||||
*/
|
||||
void removeListener(Listener listener);
|
||||
|
||||
/**
|
||||
* Attempts to get the stream for the given ID. If it doesn't exist, throws.
|
||||
*/
|
||||
Http2Stream requireStream(int streamId) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Gets the stream if it exists. If not, returns {@code null}.
|
||||
*/
|
||||
Http2Stream stream(int streamId);
|
||||
|
||||
/**
|
||||
* Indicates whether or not the given stream may have existed within this connection. This is a short form
|
||||
* for calling {@link Endpoint#mayHaveCreatedStream(int)} on both endpoints.
|
||||
*/
|
||||
boolean streamMayHaveExisted(int streamId);
|
||||
|
||||
/**
|
||||
* Gets the stream object representing the connection, itself (i.e. stream zero). This object
|
||||
* always exists.
|
||||
|
@ -29,21 +29,17 @@ public interface Http2FrameListener {
|
||||
* @param streamId the subject stream for the frame.
|
||||
* @param data payload buffer for the frame. This buffer will be released by the codec.
|
||||
* @param padding the number of padding bytes found at the end of the frame.
|
||||
* @param endOfStream Indicates whether this is the last frame to be sent from the remote
|
||||
* endpoint for this stream.
|
||||
* @return the number of bytes that have been processed by the application. The returned bytes
|
||||
* are used by the inbound flow controller to determine the appropriate time to expand
|
||||
* the inbound flow control window (i.e. send {@code WINDOW_UPDATE}). Returning a value
|
||||
* equal to the length of {@code data} + {@code padding} will effectively opt-out of
|
||||
* application-level flow control for this frame. Returning a value less than the length
|
||||
* of {@code data} + {@code padding} will defer the returning of the processed bytes,
|
||||
* which the application must later return via
|
||||
* {@link Http2InboundFlowState#returnProcessedBytes(ChannelHandlerContext, int)}. The
|
||||
* returned value must be >= {@code 0} and <= {@code data.readableBytes()} +
|
||||
* {@code padding}.
|
||||
* @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint for this stream.
|
||||
* @return the number of bytes that have been processed by the application. The returned bytes are used by the
|
||||
* inbound flow controller to determine the appropriate time to expand the inbound flow control window (i.e. send
|
||||
* {@code WINDOW_UPDATE}). Returning a value equal to the length of {@code data} + {@code padding} will effectively
|
||||
* opt-out of application-level flow control for this frame. Returning a value less than the length of {@code data}
|
||||
* + {@code padding} will defer the returning of the processed bytes, which the application must later return via
|
||||
* {@link Http2LocalFlowController#consumeBytes(ChannelHandlerContext, Http2Stream, int)}. The returned value must
|
||||
* be >= {@code 0} and <= {@code data.readableBytes()} + {@code padding}.
|
||||
*/
|
||||
int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
|
||||
boolean endOfStream) throws Http2Exception;
|
||||
boolean endOfStream) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Handles an inbound {@code HEADERS} frame.
|
||||
|
@ -24,20 +24,20 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
public interface Http2LocalFlowController extends Http2FlowController {
|
||||
|
||||
/**
|
||||
* Receives an inbound {@code DATA} frame from the remote endpoint and applies flow control
|
||||
* policies to it for both the {@code stream} as well as the connection. If any flow control
|
||||
* policies have been violated, an exception is raised immediately, otherwise the frame is
|
||||
* considered to have "passed" flow control.
|
||||
* Receives an inbound {@code DATA} frame from the remote endpoint and applies flow control policies to it for both
|
||||
* the {@code stream} as well as the connection. If any flow control policies have been violated, an exception is
|
||||
* raised immediately, otherwise the frame is considered to have "passed" flow control.
|
||||
* <p/>
|
||||
* If {@code stream} is closed, flow control should only be applied to the connection window.
|
||||
* If {@code stream} is {@code null} or closed, flow control should only be applied to the connection window and the
|
||||
* bytes are immediately consumed.
|
||||
*
|
||||
* @param ctx the context from the handler where the frame was read.
|
||||
* @param stream the subject stream for the received frame. The connection stream object must
|
||||
* not be used.
|
||||
* @param stream the subject stream for the received frame. The connection stream object must not be used. If {@code
|
||||
* stream} is {@code null} or closed, flow control should only be applied to the connection window and the bytes are
|
||||
* immediately consumed.
|
||||
* @param data payload buffer for the frame.
|
||||
* @param padding the number of padding bytes found at the end of the frame.
|
||||
* @param endOfStream Indicates whether this is the last frame to be sent from the remote
|
||||
* endpoint for this stream.
|
||||
* @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint for this stream.
|
||||
* @throws Http2Exception if any flow control errors are encountered.
|
||||
*/
|
||||
void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding,
|
||||
@ -49,13 +49,13 @@ public interface Http2LocalFlowController extends Http2FlowController {
|
||||
* control window will collapse. Consuming bytes enables the flow controller to send {@code WINDOW_UPDATE} to
|
||||
* restore a portion of the flow control window for the stream.
|
||||
* <p/>
|
||||
* If {@code stream} is closed (i.e. {@link Http2Stream#state()} method returns {@link Http2Stream.State#CLOSED}),
|
||||
* the consumed bytes are only restored to the connection window. When a stream is closed, the flow controller
|
||||
* automatically restores any unconsumed bytes for that stream to the connection window. This is done to ensure that
|
||||
* the connection window does not degrade over time as streams are closed.
|
||||
* If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link
|
||||
* Http2Stream.State#CLOSED}), calling this method has no effect.
|
||||
*
|
||||
* @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if appropriate
|
||||
* @param stream the stream for which window space should be freed. The connection stream object must not be used.
|
||||
* If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link
|
||||
* Http2Stream.State#CLOSED}), calling this method has no effect.
|
||||
* @param numBytes the number of bytes to be returned to the flow control window.
|
||||
* @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes(Http2Stream)} for the
|
||||
* stream.
|
||||
|
@ -19,7 +19,6 @@ import static io.netty.buffer.Unpooled.wrappedBuffer;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf;
|
||||
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
|
||||
@ -38,7 +37,9 @@ import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.channel.Channel;
|
||||
@ -47,10 +48,6 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultChannelPromise;
|
||||
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
@ -59,6 +56,9 @@ import org.mockito.MockitoAnnotations;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Tests for {@link DefaultHttp2ConnectionDecoder}.
|
||||
*/
|
||||
@ -135,7 +135,7 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
}
|
||||
}).when(connection).forEachActiveStream(any(Http2StreamVisitor.class));
|
||||
when(connection.stream(STREAM_ID)).thenReturn(stream);
|
||||
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
|
||||
when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(true);
|
||||
when(connection.local()).thenReturn(local);
|
||||
when(local.flowController()).thenReturn(localFlow);
|
||||
when(encoder.flowController()).thenReturn(remoteFlow);
|
||||
@ -182,6 +182,48 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = Http2Exception.class)
|
||||
public void dataReadForUnknownStreamShouldApplyFlowControlAndFail() throws Exception {
|
||||
when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false);
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
final ByteBuf data = dummyData();
|
||||
int padding = 10;
|
||||
int processedBytes = data.readableBytes() + padding;
|
||||
try {
|
||||
decode().onDataRead(ctx, STREAM_ID, data, padding, true);
|
||||
} finally {
|
||||
try {
|
||||
verify(localFlow)
|
||||
.receiveFlowControlledFrame(eq(ctx), eq((Http2Stream) null), eq(data), eq(padding), eq(true));
|
||||
verify(localFlow).consumeBytes(eq(ctx), eq((Http2Stream) null), eq(processedBytes));
|
||||
verifyNoMoreInteractions(localFlow);
|
||||
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
|
||||
} finally {
|
||||
data.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataReadForUnknownStreamShouldApplyFlowControl() throws Exception {
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
final ByteBuf data = dummyData();
|
||||
int padding = 10;
|
||||
int processedBytes = data.readableBytes() + padding;
|
||||
try {
|
||||
decode().onDataRead(ctx, STREAM_ID, data, padding, true);
|
||||
verify(localFlow)
|
||||
.receiveFlowControlledFrame(eq(ctx), eq((Http2Stream) null), eq(data), eq(padding), eq(true));
|
||||
verify(localFlow).consumeBytes(eq(ctx), eq((Http2Stream) null), eq(processedBytes));
|
||||
verifyNoMoreInteractions(localFlow);
|
||||
|
||||
// Verify that the event was absorbed and not propagated to the observer.
|
||||
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
|
||||
} finally {
|
||||
data.release();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyDataFrameShouldApplyFlowControl() throws Exception {
|
||||
final ByteBuf data = EMPTY_BUFFER;
|
||||
@ -314,6 +356,19 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
verify(stream, never()).open(anyBoolean());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void headersReadForUnknownStreamShouldBeIgnored() throws Exception {
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false);
|
||||
verify(remote, never()).createStream(eq(STREAM_ID));
|
||||
verify(stream, never()).open(anyBoolean());
|
||||
|
||||
// Verify that the event was absorbed and not propagated to the oberver.
|
||||
verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean());
|
||||
verify(remote, never()).createStream(anyInt());
|
||||
verify(stream, never()).open(anyBoolean());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void headersReadForUnknownStreamShouldCreateStream() throws Exception {
|
||||
final int streamId = 5;
|
||||
@ -408,6 +463,12 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
verify(listener, never()).onPushPromiseRead(eq(ctx), anyInt(), anyInt(), any(Http2Headers.class), anyInt());
|
||||
}
|
||||
|
||||
@Test(expected = Http2Exception.class)
|
||||
public void pushPromiseReadForUnknownStreamShouldThrow() throws Exception {
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pushPromiseReadShouldSucceed() throws Exception {
|
||||
decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0);
|
||||
@ -425,9 +486,17 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void priorityReadShouldSucceed() throws Exception {
|
||||
public void priorityReadForUnknownStreamShouldBeIgnored() throws Exception {
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
|
||||
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
|
||||
verify(listener, never()).onPriorityRead(eq(ctx), anyInt(), anyInt(), anyShort(), anyBoolean());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void priorityReadShouldCreateNewStream() throws Exception {
|
||||
when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false);
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
when(connection.requireStream(STREAM_ID)).thenReturn(null);
|
||||
decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true);
|
||||
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
|
||||
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
|
||||
@ -435,25 +504,6 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
verify(stream, never()).open(anyBoolean());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void priorityReadOnPreviouslyExistingStreamShouldSucceed() throws Exception {
|
||||
doAnswer(new Answer<Http2Stream>() {
|
||||
@Override
|
||||
public Http2Stream answer(InvocationOnMock in) throws Throwable {
|
||||
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
|
||||
}
|
||||
}).when(remote).createStream(eq(STREAM_ID));
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
when(connection.requireStream(STREAM_ID)).thenReturn(null);
|
||||
// Just return the stream object as the connection stream to ensure the dependent stream "exists"
|
||||
when(connection.stream(STREAM_DEPENDENCY_ID)).thenReturn(stream);
|
||||
when(connection.requireStream(STREAM_DEPENDENCY_ID)).thenReturn(stream);
|
||||
decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true);
|
||||
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
|
||||
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
|
||||
verify(remote).createStream(STREAM_ID);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void priorityReadOnPreviouslyParentExistingStreamShouldSucceed() throws Exception {
|
||||
doAnswer(new Answer<Http2Stream>() {
|
||||
@ -462,8 +512,6 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
|
||||
}
|
||||
}).when(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
|
||||
when(connection.stream(STREAM_ID)).thenReturn(stream);
|
||||
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
|
||||
decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true);
|
||||
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
|
||||
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
|
||||
@ -479,8 +527,17 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
|
||||
@Test(expected = Http2Exception.class)
|
||||
public void windowUpdateReadForUnknownStreamShouldThrow() throws Exception {
|
||||
when(connection.requireStream(5)).thenThrow(connectionError(PROTOCOL_ERROR, ""));
|
||||
decode().onWindowUpdateRead(ctx, 5, 10);
|
||||
when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false);
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void windowUpdateReadForUnknownStreamShouldBeIgnored() throws Exception {
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
|
||||
verify(remoteFlow, never()).incrementWindowSize(eq(ctx), any(Http2Stream.class), anyInt());
|
||||
verify(listener, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -500,8 +557,17 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
|
||||
@Test(expected = Http2Exception.class)
|
||||
public void rstStreamReadForUnknownStreamShouldThrow() throws Exception {
|
||||
when(connection.requireStream(5)).thenThrow(connectionError(PROTOCOL_ERROR, ""));
|
||||
decode().onRstStreamRead(ctx, 5, PROTOCOL_ERROR.code());
|
||||
when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false);
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void rstStreamReadForUnknownStreamShouldBeIgnored() throws Exception {
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code());
|
||||
verify(lifecycleManager, never()).closeStream(eq(stream), eq(future));
|
||||
verify(listener, never()).onRstStreamRead(eq(ctx), anyInt(), anyLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -148,7 +148,6 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
}
|
||||
}).when(connection).forEachActiveStream(any(Http2StreamVisitor.class));
|
||||
when(connection.stream(STREAM_ID)).thenReturn(stream);
|
||||
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
|
||||
when(connection.local()).thenReturn(local);
|
||||
when(connection.remote()).thenReturn(remote);
|
||||
when(remote.flowController()).thenReturn(remoteFlow);
|
||||
@ -357,7 +356,6 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
@Test
|
||||
public void priorityWriteShouldSetPriorityForStream() throws Exception {
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
when(connection.requireStream(STREAM_ID)).thenReturn(null);
|
||||
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
|
||||
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
|
||||
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
|
||||
@ -374,10 +372,8 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
}
|
||||
}).when(local).createStream(eq(STREAM_ID));
|
||||
when(connection.stream(STREAM_ID)).thenReturn(null);
|
||||
when(connection.requireStream(STREAM_ID)).thenReturn(null);
|
||||
// Just return the stream object as the connection stream to ensure the dependent stream "exists"
|
||||
when(connection.stream(0)).thenReturn(stream);
|
||||
when(connection.requireStream(0)).thenReturn(stream);
|
||||
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
|
||||
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
|
||||
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
|
||||
@ -393,7 +389,6 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
}
|
||||
}).when(stream).setPriority(eq(0), eq((short) 255), eq(true));
|
||||
when(connection.stream(STREAM_ID)).thenReturn(stream);
|
||||
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
|
||||
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
|
||||
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
|
||||
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
|
||||
|
@ -74,11 +74,6 @@ public class DefaultHttp2ConnectionTest {
|
||||
client.addListener(clientListener);
|
||||
}
|
||||
|
||||
@Test(expected = Http2Exception.class)
|
||||
public void getStreamOrFailWithoutStreamShouldFail() throws Http2Exception {
|
||||
server.requireStream(100);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getStreamWithoutStreamShouldReturnNull() {
|
||||
assertNull(server.stream(100));
|
||||
|
@ -217,6 +217,18 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataReceivedForNullStreamShouldImmediatelyConsumeBytes() throws Http2Exception {
|
||||
receiveFlowControlledFrame(null, 10, 0, false);
|
||||
assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void consumeBytesForNullStreamShouldIgnore() throws Http2Exception {
|
||||
controller.consumeBytes(ctx, null, 10);
|
||||
assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void globalRatioShouldImpactStreams() throws Http2Exception {
|
||||
float ratio = 0.6f;
|
||||
@ -311,7 +323,7 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
return stream(streamId).localFlowState().windowSize();
|
||||
}
|
||||
|
||||
private Http2Stream stream(int streamId) throws Http2Exception {
|
||||
return connection.requireStream(streamId);
|
||||
private Http2Stream stream(int streamId) {
|
||||
return connection.stream(streamId);
|
||||
}
|
||||
}
|
||||
|
@ -1233,8 +1233,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
return controller.streamableBytesForTree(stream);
|
||||
}
|
||||
|
||||
private Http2Stream stream(int streamId) throws Http2Exception {
|
||||
return connection.requireStream(streamId);
|
||||
private Http2Stream stream(int streamId) {
|
||||
return connection.stream(streamId);
|
||||
}
|
||||
|
||||
private static final class FakeFlowControlled implements Http2RemoteFlowController.FlowControlled {
|
||||
|
Loading…
Reference in New Issue
Block a user