Always consume bytes for closed HTTP/2 streams.

Motivation:

The current local flow controller does not guarantee that unconsumed bytes for a closed stream will be restored to the connection window.  This may lead to degradation of the connection window over time.

Modifications:

Modified DefaultHttp2LocalFlowController to guarantee that any unconsumed bytes are returned to the connection window as soon as the stream is closed. We also immediately consume any bytes when receiving DATA for a closed stream.

Result:

Fixes #3668
This commit is contained in:
nmittler 2015-04-20 16:02:26 -07:00
parent abccf18411
commit 26a7a5ec25
4 changed files with 100 additions and 45 deletions

View File

@ -1144,19 +1144,19 @@ public class DefaultHttp2Connection implements Http2Connection {
void removeFromActiveStreams(DefaultStream stream) {
if (streams.remove(stream)) {
try {
// Update the number of active streams initiated by the endpoint.
stream.createdBy().numActiveStreams--;
// Update the number of active streams initiated by the endpoint.
stream.createdBy().numActiveStreams--;
}
notifyClosed(stream);
removeStream(stream);
}
for (int i = 0; i < listeners.size(); i++) {
try {
listeners.get(i).onStreamClosed(stream);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onStreamClosed.", e);
}
}
} finally {
removeStream(stream);
private void notifyClosed(DefaultStream stream) {
for (int i = 0; i < listeners.size(); i++) {
try {
listeners.get(i).onStreamClosed(stream);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onStreamClosed.", e);
}
}
}

View File

@ -31,6 +31,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.handler.codec.http2.Http2Stream.FlowControlState;
import io.netty.util.internal.PlatformDependent;
/**
* Basic implementation of {@link Http2LocalFlowController}.
@ -44,6 +45,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
private final Http2Connection connection;
private final Http2FrameWriter frameWriter;
private ChannelHandlerContext ctx;
private volatile float windowUpdateRatio;
private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE;
@ -74,6 +76,22 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
// frames which may have been exchanged while it was in IDLE
state(stream).window(initialWindowSize);
}
@Override
public void onStreamClosed(Http2Stream stream) {
try {
// When a stream is closed, consume any remaining bytes so that they
// are restored to the connection window.
DefaultFlowState state = state(stream);
int unconsumedBytes = state.unconsumedBytes();
if (ctx != null && unconsumedBytes > 0) {
connectionState().consumeBytes(ctx, unconsumedBytes);
state.consumeBytes(ctx, unconsumedBytes);
}
} catch (Http2Exception e) {
PlatformDependent.throwException(e);
}
}
});
}
@ -105,7 +123,19 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override
public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
throws Http2Exception {
state(stream).consumeBytes(ctx, numBytes);
if (stream.id() == CONNECTION_STREAM_ID) {
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
}
if (numBytes <= 0) {
throw new IllegalArgumentException("numBytes must be positive");
}
// Streams automatically consume all remaining bytes when they are closed, so just ignore
// if already closed.
if (!isClosed(stream)) {
connectionState().consumeBytes(ctx, numBytes);
state(stream).consumeBytes(ctx, numBytes);
}
}
@Override
@ -174,15 +204,23 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override
public void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data,
int padding, boolean endOfStream) throws Http2Exception {
this.ctx = checkNotNull(ctx, "ctx");
int dataLength = data.readableBytes() + padding;
// Apply the connection-level flow control
connectionState().receiveFlowControlledFrame(dataLength);
// Apply the stream-level flow control
DefaultFlowState state = state(stream);
state.endOfStream(endOfStream);
state.receiveFlowControlledFrame(dataLength);
DefaultFlowState connectionState = connectionState();
connectionState.receiveFlowControlledFrame(dataLength);
if (!isClosed(stream)) {
// Apply the stream-level flow control
DefaultFlowState state = state(stream);
state.endOfStream(endOfStream);
state.receiveFlowControlledFrame(dataLength);
} else if (dataLength > 0) {
// Immediately consume the bytes for the connection window.
connectionState.consumeBytes(ctx, dataLength);
}
}
private DefaultFlowState connectionState() {
@ -193,6 +231,10 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
return (DefaultFlowState) checkNotNull(stream, "stream").localFlowState();
}
private static boolean isClosed(Http2Stream stream) {
return stream.state() == Http2Stream.State.CLOSED;
}
/**
* Flow control window state for an individual stream.
*/
@ -323,18 +365,6 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
if (stream.id() == CONNECTION_STREAM_ID) {
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
}
if (numBytes <= 0) {
throw new IllegalArgumentException("numBytes must be positive");
}
// Return bytes to the connection window
DefaultFlowState connectionState = connectionState();
connectionState.returnProcessedBytes(numBytes);
connectionState.writeWindowUpdateIfNeeded(ctx);
// Return the bytes processed and update the window.
returnProcessedBytes(numBytes);
writeWindowUpdateIfNeeded(ctx);
@ -348,7 +378,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
* Updates the flow control window for this stream if it is appropriate.
*/
void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception {
if (endOfStream || initialStreamWindowSize <= 0) {
if (endOfStream || initialStreamWindowSize <= 0 || isClosed(stream)) {
return;
}

View File

@ -28,6 +28,8 @@ public interface Http2LocalFlowController extends Http2FlowController {
* policies to it for both the {@code stream} as well as the connection. If any flow control
* policies have been violated, an exception is raised immediately, otherwise the frame is
* considered to have "passed" flow control.
* <p/>
* If {@code stream} is closed, flow control should only be applied to the connection window.
*
* @param ctx the context from the handler where the frame was read.
* @param stream the subject stream for the received frame. The connection stream object must
@ -39,22 +41,24 @@ public interface Http2LocalFlowController extends Http2FlowController {
* @throws Http2Exception if any flow control errors are encountered.
*/
void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception;
boolean endOfStream) throws Http2Exception;
/**
* Indicates that the application has consumed a number of bytes for the given stream and is
* therefore ready to receive more data from the remote endpoint. The application must consume
* any bytes that it receives or the flow control window will collapse. Consuming bytes enables
* the flow controller to send {@code WINDOW_UPDATE} to restore a portion of the flow control
* window for the stream.
* Indicates that the application has consumed a number of bytes for the given stream and is therefore ready to
* receive more data from the remote endpoint. The application must consume any bytes that it receives or the flow
* control window will collapse. Consuming bytes enables the flow controller to send {@code WINDOW_UPDATE} to
* restore a portion of the flow control window for the stream.
* <p/>
* If {@code stream} is closed (i.e. {@link Http2Stream#state()} method returns {@link Http2Stream.State#CLOSED}),
* the consumed bytes are only restored to the connection window. When a stream is closed, the flow controller
* automatically restores any unconsumed bytes for that stream to the connection window. This is done to ensure that
* the connection window does not degrade over time as streams are closed.
*
* @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if
* appropriate
* @param stream the stream for which window space should be freed. The connection stream object
* must not be used.
* @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if appropriate
* @param stream the stream for which window space should be freed. The connection stream object must not be used.
* @param numBytes the number of bytes to be returned to the flow control window.
* @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes}
* for the stream.
* @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes(Http2Stream)} for the
* stream.
*/
void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception;

View File

@ -201,6 +201,22 @@ public class DefaultHttp2LocalFlowControllerTest {
}
}
@Test
public void closeShouldConsumeBytes() throws Http2Exception {
receiveFlowControlledFrame(STREAM_ID, 10, 0, false);
assertEquals(10, controller.unconsumedBytes(connection.connectionStream()));
stream(STREAM_ID).close();
assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
}
@Test
public void dataReceivedForClosedStreamShouldImmediatelyConsumeBytes() throws Http2Exception {
Http2Stream stream = stream(STREAM_ID);
stream.close();
receiveFlowControlledFrame(stream, 10, 0, false);
assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
}
@Test
public void globalRatioShouldImpactStreams() throws Http2Exception {
float ratio = 0.6f;
@ -254,10 +270,15 @@ public class DefaultHttp2LocalFlowControllerTest {
}
private void receiveFlowControlledFrame(int streamId, int dataSize, int padding,
boolean endOfStream) throws Http2Exception {
boolean endOfStream) throws Http2Exception {
receiveFlowControlledFrame(stream(streamId), dataSize, padding, endOfStream);
}
private void receiveFlowControlledFrame(Http2Stream stream, int dataSize, int padding,
boolean endOfStream) throws Http2Exception {
final ByteBuf buf = dummyData(dataSize);
try {
controller.receiveFlowControlledFrame(ctx, stream(streamId), buf, padding, endOfStream);
controller.receiveFlowControlledFrame(ctx, stream, buf, padding, endOfStream);
} finally {
buf.release();
}