Proper shutdown of HTTP2 encoder when channelInactive

Motivation:

The problem is described in https://github.com/grpc/grpc-java/issues/605. Basically, when using `StreamBufferingEncoder` there is a chance of creating zombie streams that never get closed.

Modifications:

Change `Http2ConnectionHandler`'s `channelInactive` handling logic to shutdown the encoder/decoder before shutting down the active streams.

Result:

Fixes https://github.com/grpc/grpc-java/issues/605
This commit is contained in:
nmittler 2015-07-08 11:38:22 -07:00
parent a99e29e032
commit 8a16081a93
3 changed files with 79 additions and 39 deletions

View File

@ -181,25 +181,21 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
public void channelActive(ChannelHandlerContext ctx) throws Exception { } public void channelActive(ChannelHandlerContext ctx) throws Exception { }
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try { // Connection has terminated, close the encoder and decoder.
final Http2Connection connection = connection(); encoder().close();
// Check if there are streams to avoid the overhead of creating the ChannelFuture. decoder().close();
if (connection.numActiveStreams() > 0) {
final ChannelFuture future = ctx.newSucceededFuture(); final Http2Connection connection = connection();
connection.forEachActiveStream(new Http2StreamVisitor() { // Check if there are streams to avoid the overhead of creating the ChannelFuture.
@Override if (connection.numActiveStreams() > 0) {
public boolean visit(Http2Stream stream) throws Http2Exception { final ChannelFuture future = ctx.newSucceededFuture();
closeStream(stream, future); connection.forEachActiveStream(new Http2StreamVisitor() {
return true; @Override
} public boolean visit(Http2Stream stream) throws Http2Exception {
}); closeStream(stream, future);
} return true;
} finally { }
try { });
encoder().close();
} finally {
decoder().close();
}
} }
} }

View File

@ -20,9 +20,11 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.util.ByteString;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -43,7 +45,7 @@ import java.util.TreeMap;
* <p> * <p>
* If a {@code GOAWAY} frame is received from the remote endpoint, all buffered writes for streams * If a {@code GOAWAY} frame is received from the remote endpoint, all buffered writes for streams
* with an ID less than the specified {@code lastStreamId} will immediately fail with a * with an ID less than the specified {@code lastStreamId} will immediately fail with a
* {@link StreamBufferingEncoder.GoAwayException}. * {@link Http2GoAwayException}.
* <p/> * <p/>
* <p>This implementation makes the buffering mostly transparent and is expected to be used as a * <p>This implementation makes the buffering mostly transparent and is expected to be used as a
* drop-in decorator of {@link DefaultHttp2ConnectionEncoder}. * drop-in decorator of {@link DefaultHttp2ConnectionEncoder}.
@ -51,17 +53,28 @@ import java.util.TreeMap;
*/ */
public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder { public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
/**
* Thrown if buffered streams are terminated due to this encoder being closed.
*/
public static final class Http2ChannelClosedException extends Http2Exception {
private static final long serialVersionUID = 4768543442094476971L;
public Http2ChannelClosedException() {
super(Http2Error.REFUSED_STREAM, "Connection closed");
}
}
/** /**
* Thrown by {@link StreamBufferingEncoder} if buffered streams are terminated due to * Thrown by {@link StreamBufferingEncoder} if buffered streams are terminated due to
* receipt of a {@code GOAWAY}. * receipt of a {@code GOAWAY}.
*/ */
public static final class GoAwayException extends Http2Exception { public static final class Http2GoAwayException extends Http2Exception {
private static final long serialVersionUID = 1326785622777291198L; private static final long serialVersionUID = 1326785622777291198L;
private final int lastStreamId; private final int lastStreamId;
private final long errorCode; private final long errorCode;
private final ByteBuf debugData; private final ByteString debugData;
public GoAwayException(int lastStreamId, long errorCode, ByteBuf debugData) { public Http2GoAwayException(int lastStreamId, long errorCode, ByteString debugData) {
super(Http2Error.STREAM_CLOSED); super(Http2Error.STREAM_CLOSED);
this.lastStreamId = lastStreamId; this.lastStreamId = lastStreamId;
this.errorCode = errorCode; this.errorCode = errorCode;
@ -76,7 +89,7 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
return errorCode; return errorCode;
} }
public ByteBuf debugData() { public ByteString debugData() {
return debugData; return debugData;
} }
} }
@ -87,6 +100,7 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
*/ */
private final TreeMap<Integer, PendingStream> pendingStreams = new TreeMap<Integer, PendingStream>(); private final TreeMap<Integer, PendingStream> pendingStreams = new TreeMap<Integer, PendingStream>();
private int maxConcurrentStreams; private int maxConcurrentStreams;
private boolean closed;
public StreamBufferingEncoder(Http2ConnectionEncoder delegate) { public StreamBufferingEncoder(Http2ConnectionEncoder delegate) {
this(delegate, SMALLEST_MAX_CONCURRENT_STREAMS); this(delegate, SMALLEST_MAX_CONCURRENT_STREAMS);
@ -127,6 +141,9 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int streamDependency, short weight, boolean exclusive,
int padding, boolean endOfStream, ChannelPromise promise) { int padding, boolean endOfStream, ChannelPromise promise) {
if (closed) {
return promise.setFailure(new Http2ChannelClosedException());
}
if (isExistingStream(streamId) || connection().goAwayReceived()) { if (isExistingStream(streamId) || connection().goAwayReceived()) {
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, return super.writeHeaders(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endOfStream, promise); exclusive, padding, endOfStream, promise);
@ -198,8 +215,20 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
@Override @Override
public void close() { public void close() {
super.close(); try {
cancelPendingStreams(); if (!closed) {
closed = true;
// Fail all buffered streams.
Http2ChannelClosedException e = new Http2ChannelClosedException();
while (!pendingStreams.isEmpty()) {
PendingStream stream = pendingStreams.pollFirstEntry().getValue();
stream.close(e);
}
}
} finally {
super.close();
}
} }
private void tryCreatePendingStreams() { private void tryCreatePendingStreams() {
@ -210,17 +239,10 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
} }
} }
private void cancelPendingStreams() {
Exception e = new Exception("Connection closed.");
while (!pendingStreams.isEmpty()) {
PendingStream stream = pendingStreams.pollFirstEntry().getValue();
stream.close(e);
}
}
private void cancelGoAwayStreams(int lastStreamId, long errorCode, ByteBuf debugData) { private void cancelGoAwayStreams(int lastStreamId, long errorCode, ByteBuf debugData) {
Iterator<PendingStream> iter = pendingStreams.values().iterator(); Iterator<PendingStream> iter = pendingStreams.values().iterator();
Exception e = new GoAwayException(lastStreamId, errorCode, debugData); Exception e = new Http2GoAwayException(lastStreamId, errorCode,
new ByteString(ByteBufUtil.getBytes(debugData), false));
while (iter.hasNext()) { while (iter.hasNext()) {
PendingStream stream = iter.next(); PendingStream stream = iter.next();
if (stream.streamId > lastStreamId) { if (stream.streamId > lastStreamId) {

View File

@ -41,9 +41,9 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.StreamBufferingEncoder.GoAwayException; import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -208,7 +208,7 @@ public class StreamBufferingEncoderTest {
} }
assertEquals(4, encoder.numBufferedStreams()); assertEquals(4, encoder.numBufferedStreams());
connection.goAwayReceived(11, 8, null); connection.goAwayReceived(11, 8, EMPTY_BUFFER);
assertEquals(5, connection.numActiveStreams()); assertEquals(5, connection.numActiveStreams());
// The 4 buffered streams must have been failed. // The 4 buffered streams must have been failed.
@ -233,7 +233,7 @@ public class StreamBufferingEncoderTest {
assertEquals(1, connection.numActiveStreams()); assertEquals(1, connection.numActiveStreams());
assertEquals(2, encoder.numBufferedStreams()); assertEquals(2, encoder.numBufferedStreams());
verify(promise, never()).setFailure(any(GoAwayException.class)); verify(promise, never()).setFailure(any(Http2GoAwayException.class));
} }
@Test @Test
@ -410,6 +410,28 @@ public class StreamBufferingEncoderTest {
verify(data).release(); verify(data).release();
} }
@Test
public void closeShouldCancelAllBufferedStreams() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(0);
encoderWriteHeaders(3, promise);
encoderWriteHeaders(5, promise);
encoderWriteHeaders(7, promise);
encoder.close();
verify(promise, times(3)).setFailure(any(Http2ChannelClosedException.class));
}
@Test
public void headersAfterCloseShouldImmediatelyFail() {
encoder.writeSettingsAck(ctx, promise);
encoder.close();
encoderWriteHeaders(3, promise);
verify(promise).setFailure(any(Http2ChannelClosedException.class));
}
private void setMaxConcurrentStreams(int newValue) { private void setMaxConcurrentStreams(int newValue) {
try { try {
encoder.remoteSettings(new Http2Settings().maxConcurrentStreams(newValue)); encoder.remoteSettings(new Http2Settings().maxConcurrentStreams(newValue));