Proper shutdown of HTTP2 encoder when channelInactive
Motivation: The problem is described in https://github.com/grpc/grpc-java/issues/605. Basically, when using `StreamBufferingEncoder` there is a chance of creating zombie streams that never get closed. Modifications: Change `Http2ConnectionHandler`'s `channelInactive` handling logic to shutdown the encoder/decoder before shutting down the active streams. Result: Fixes https://github.com/grpc/grpc-java/issues/605
This commit is contained in:
parent
8650679f29
commit
6e044b082c
@ -184,25 +184,21 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
|
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
|
||||||
|
|
||||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
try {
|
// Connection has terminated, close the encoder and decoder.
|
||||||
final Http2Connection connection = connection();
|
encoder().close();
|
||||||
// Check if there are streams to avoid the overhead of creating the ChannelFuture.
|
decoder().close();
|
||||||
if (connection.numActiveStreams() > 0) {
|
|
||||||
final ChannelFuture future = ctx.newSucceededFuture();
|
final Http2Connection connection = connection();
|
||||||
connection.forEachActiveStream(new Http2StreamVisitor() {
|
// Check if there are streams to avoid the overhead of creating the ChannelFuture.
|
||||||
@Override
|
if (connection.numActiveStreams() > 0) {
|
||||||
public boolean visit(Http2Stream stream) throws Http2Exception {
|
final ChannelFuture future = ctx.newSucceededFuture();
|
||||||
closeStream(stream, future);
|
connection.forEachActiveStream(new Http2StreamVisitor() {
|
||||||
return true;
|
@Override
|
||||||
}
|
public boolean visit(Http2Stream stream) throws Http2Exception {
|
||||||
});
|
closeStream(stream, future);
|
||||||
}
|
return true;
|
||||||
} finally {
|
}
|
||||||
try {
|
});
|
||||||
encoder().close();
|
|
||||||
} finally {
|
|
||||||
decoder().close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,9 +20,11 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
|||||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufUtil;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
|
import io.netty.util.ByteString;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
@ -43,7 +45,7 @@ import java.util.TreeMap;
|
|||||||
* <p>
|
* <p>
|
||||||
* If a {@code GOAWAY} frame is received from the remote endpoint, all buffered writes for streams
|
* If a {@code GOAWAY} frame is received from the remote endpoint, all buffered writes for streams
|
||||||
* with an ID less than the specified {@code lastStreamId} will immediately fail with a
|
* with an ID less than the specified {@code lastStreamId} will immediately fail with a
|
||||||
* {@link StreamBufferingEncoder.GoAwayException}.
|
* {@link Http2GoAwayException}.
|
||||||
* <p/>
|
* <p/>
|
||||||
* <p>This implementation makes the buffering mostly transparent and is expected to be used as a
|
* <p>This implementation makes the buffering mostly transparent and is expected to be used as a
|
||||||
* drop-in decorator of {@link DefaultHttp2ConnectionEncoder}.
|
* drop-in decorator of {@link DefaultHttp2ConnectionEncoder}.
|
||||||
@ -51,17 +53,28 @@ import java.util.TreeMap;
|
|||||||
*/
|
*/
|
||||||
public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
|
public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown if buffered streams are terminated due to this encoder being closed.
|
||||||
|
*/
|
||||||
|
public static final class Http2ChannelClosedException extends Http2Exception {
|
||||||
|
private static final long serialVersionUID = 4768543442094476971L;
|
||||||
|
|
||||||
|
public Http2ChannelClosedException() {
|
||||||
|
super(Http2Error.REFUSED_STREAM, "Connection closed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thrown by {@link StreamBufferingEncoder} if buffered streams are terminated due to
|
* Thrown by {@link StreamBufferingEncoder} if buffered streams are terminated due to
|
||||||
* receipt of a {@code GOAWAY}.
|
* receipt of a {@code GOAWAY}.
|
||||||
*/
|
*/
|
||||||
public static final class GoAwayException extends Http2Exception {
|
public static final class Http2GoAwayException extends Http2Exception {
|
||||||
private static final long serialVersionUID = 1326785622777291198L;
|
private static final long serialVersionUID = 1326785622777291198L;
|
||||||
private final int lastStreamId;
|
private final int lastStreamId;
|
||||||
private final long errorCode;
|
private final long errorCode;
|
||||||
private final ByteBuf debugData;
|
private final ByteString debugData;
|
||||||
|
|
||||||
public GoAwayException(int lastStreamId, long errorCode, ByteBuf debugData) {
|
public Http2GoAwayException(int lastStreamId, long errorCode, ByteString debugData) {
|
||||||
super(Http2Error.STREAM_CLOSED);
|
super(Http2Error.STREAM_CLOSED);
|
||||||
this.lastStreamId = lastStreamId;
|
this.lastStreamId = lastStreamId;
|
||||||
this.errorCode = errorCode;
|
this.errorCode = errorCode;
|
||||||
@ -76,7 +89,7 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
|
|||||||
return errorCode;
|
return errorCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBuf debugData() {
|
public ByteString debugData() {
|
||||||
return debugData;
|
return debugData;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -87,6 +100,7 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
|
|||||||
*/
|
*/
|
||||||
private final TreeMap<Integer, PendingStream> pendingStreams = new TreeMap<Integer, PendingStream>();
|
private final TreeMap<Integer, PendingStream> pendingStreams = new TreeMap<Integer, PendingStream>();
|
||||||
private int maxConcurrentStreams;
|
private int maxConcurrentStreams;
|
||||||
|
private boolean closed;
|
||||||
|
|
||||||
public StreamBufferingEncoder(Http2ConnectionEncoder delegate) {
|
public StreamBufferingEncoder(Http2ConnectionEncoder delegate) {
|
||||||
this(delegate, SMALLEST_MAX_CONCURRENT_STREAMS);
|
this(delegate, SMALLEST_MAX_CONCURRENT_STREAMS);
|
||||||
@ -127,6 +141,9 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
|
|||||||
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
|
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
|
||||||
int streamDependency, short weight, boolean exclusive,
|
int streamDependency, short weight, boolean exclusive,
|
||||||
int padding, boolean endOfStream, ChannelPromise promise) {
|
int padding, boolean endOfStream, ChannelPromise promise) {
|
||||||
|
if (closed) {
|
||||||
|
return promise.setFailure(new Http2ChannelClosedException());
|
||||||
|
}
|
||||||
if (isExistingStream(streamId) || connection().goAwayReceived()) {
|
if (isExistingStream(streamId) || connection().goAwayReceived()) {
|
||||||
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight,
|
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight,
|
||||||
exclusive, padding, endOfStream, promise);
|
exclusive, padding, endOfStream, promise);
|
||||||
@ -198,8 +215,20 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
super.close();
|
try {
|
||||||
cancelPendingStreams();
|
if (!closed) {
|
||||||
|
closed = true;
|
||||||
|
|
||||||
|
// Fail all buffered streams.
|
||||||
|
Http2ChannelClosedException e = new Http2ChannelClosedException();
|
||||||
|
while (!pendingStreams.isEmpty()) {
|
||||||
|
PendingStream stream = pendingStreams.pollFirstEntry().getValue();
|
||||||
|
stream.close(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
super.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void tryCreatePendingStreams() {
|
private void tryCreatePendingStreams() {
|
||||||
@ -210,17 +239,10 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cancelPendingStreams() {
|
|
||||||
Exception e = new Exception("Connection closed.");
|
|
||||||
while (!pendingStreams.isEmpty()) {
|
|
||||||
PendingStream stream = pendingStreams.pollFirstEntry().getValue();
|
|
||||||
stream.close(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void cancelGoAwayStreams(int lastStreamId, long errorCode, ByteBuf debugData) {
|
private void cancelGoAwayStreams(int lastStreamId, long errorCode, ByteBuf debugData) {
|
||||||
Iterator<PendingStream> iter = pendingStreams.values().iterator();
|
Iterator<PendingStream> iter = pendingStreams.values().iterator();
|
||||||
Exception e = new GoAwayException(lastStreamId, errorCode, debugData);
|
Exception e = new Http2GoAwayException(lastStreamId, errorCode,
|
||||||
|
new ByteString(ByteBufUtil.getBytes(debugData), false));
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
PendingStream stream = iter.next();
|
PendingStream stream = iter.next();
|
||||||
if (stream.streamId > lastStreamId) {
|
if (stream.streamId > lastStreamId) {
|
||||||
|
@ -41,9 +41,9 @@ import io.netty.channel.ChannelFuture;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.DefaultChannelPromise;
|
import io.netty.channel.DefaultChannelPromise;
|
||||||
import io.netty.handler.codec.http2.StreamBufferingEncoder.GoAwayException;
|
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
|
||||||
|
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.ReferenceCounted;
|
|
||||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -208,7 +208,7 @@ public class StreamBufferingEncoderTest {
|
|||||||
}
|
}
|
||||||
assertEquals(4, encoder.numBufferedStreams());
|
assertEquals(4, encoder.numBufferedStreams());
|
||||||
|
|
||||||
connection.goAwayReceived(11, 8, null);
|
connection.goAwayReceived(11, 8, EMPTY_BUFFER);
|
||||||
|
|
||||||
assertEquals(5, connection.numActiveStreams());
|
assertEquals(5, connection.numActiveStreams());
|
||||||
// The 4 buffered streams must have been failed.
|
// The 4 buffered streams must have been failed.
|
||||||
@ -233,7 +233,7 @@ public class StreamBufferingEncoderTest {
|
|||||||
|
|
||||||
assertEquals(1, connection.numActiveStreams());
|
assertEquals(1, connection.numActiveStreams());
|
||||||
assertEquals(2, encoder.numBufferedStreams());
|
assertEquals(2, encoder.numBufferedStreams());
|
||||||
verify(promise, never()).setFailure(any(GoAwayException.class));
|
verify(promise, never()).setFailure(any(Http2GoAwayException.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -410,6 +410,28 @@ public class StreamBufferingEncoderTest {
|
|||||||
verify(data).release();
|
verify(data).release();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void closeShouldCancelAllBufferedStreams() {
|
||||||
|
encoder.writeSettingsAck(ctx, promise);
|
||||||
|
connection.local().maxActiveStreams(0);
|
||||||
|
|
||||||
|
encoderWriteHeaders(3, promise);
|
||||||
|
encoderWriteHeaders(5, promise);
|
||||||
|
encoderWriteHeaders(7, promise);
|
||||||
|
|
||||||
|
encoder.close();
|
||||||
|
verify(promise, times(3)).setFailure(any(Http2ChannelClosedException.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void headersAfterCloseShouldImmediatelyFail() {
|
||||||
|
encoder.writeSettingsAck(ctx, promise);
|
||||||
|
encoder.close();
|
||||||
|
|
||||||
|
encoderWriteHeaders(3, promise);
|
||||||
|
verify(promise).setFailure(any(Http2ChannelClosedException.class));
|
||||||
|
}
|
||||||
|
|
||||||
private void setMaxConcurrentStreams(int newValue) {
|
private void setMaxConcurrentStreams(int newValue) {
|
||||||
try {
|
try {
|
||||||
encoder.remoteSettings(new Http2Settings().maxConcurrentStreams(newValue));
|
encoder.remoteSettings(new Http2Settings().maxConcurrentStreams(newValue));
|
||||||
|
Loading…
Reference in New Issue
Block a user