diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 36073ed1bc..49edaf9440 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -184,25 +184,21 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http public void channelActive(ChannelHandlerContext ctx) throws Exception { } public void channelInactive(ChannelHandlerContext ctx) throws Exception { - try { - final Http2Connection connection = connection(); - // Check if there are streams to avoid the overhead of creating the ChannelFuture. - if (connection.numActiveStreams() > 0) { - final ChannelFuture future = ctx.newSucceededFuture(); - connection.forEachActiveStream(new Http2StreamVisitor() { - @Override - public boolean visit(Http2Stream stream) throws Http2Exception { - closeStream(stream, future); - return true; - } - }); - } - } finally { - try { - encoder().close(); - } finally { - decoder().close(); - } + // Connection has terminated, close the encoder and decoder. + encoder().close(); + decoder().close(); + + final Http2Connection connection = connection(); + // Check if there are streams to avoid the overhead of creating the ChannelFuture. + if (connection.numActiveStreams() > 0) { + final ChannelFuture future = ctx.newSucceededFuture(); + connection.forEachActiveStream(new Http2StreamVisitor() { + @Override + public boolean visit(Http2Stream stream) throws Http2Exception { + closeStream(stream, future); + return true; + } + }); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/StreamBufferingEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/StreamBufferingEncoder.java index 7da5cf7229..7ed8e860c2 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/StreamBufferingEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/StreamBufferingEncoder.java @@ -20,9 +20,11 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.util.ByteString; import io.netty.util.ReferenceCountUtil; import java.util.ArrayDeque; @@ -43,7 +45,7 @@ import java.util.TreeMap; *

* If a {@code GOAWAY} frame is received from the remote endpoint, all buffered writes for streams * with an ID less than the specified {@code lastStreamId} will immediately fail with a - * {@link StreamBufferingEncoder.GoAwayException}. + * {@link Http2GoAwayException}. *

*

This implementation makes the buffering mostly transparent and is expected to be used as a * drop-in decorator of {@link DefaultHttp2ConnectionEncoder}. @@ -51,17 +53,28 @@ import java.util.TreeMap; */ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder { + /** + * Thrown if buffered streams are terminated due to this encoder being closed. + */ + public static final class Http2ChannelClosedException extends Http2Exception { + private static final long serialVersionUID = 4768543442094476971L; + + public Http2ChannelClosedException() { + super(Http2Error.REFUSED_STREAM, "Connection closed"); + } + } + /** * Thrown by {@link StreamBufferingEncoder} if buffered streams are terminated due to * receipt of a {@code GOAWAY}. */ - public static final class GoAwayException extends Http2Exception { + public static final class Http2GoAwayException extends Http2Exception { private static final long serialVersionUID = 1326785622777291198L; private final int lastStreamId; private final long errorCode; - private final ByteBuf debugData; + private final ByteString debugData; - public GoAwayException(int lastStreamId, long errorCode, ByteBuf debugData) { + public Http2GoAwayException(int lastStreamId, long errorCode, ByteString debugData) { super(Http2Error.STREAM_CLOSED); this.lastStreamId = lastStreamId; this.errorCode = errorCode; @@ -76,7 +89,7 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder { return errorCode; } - public ByteBuf debugData() { + public ByteString debugData() { return debugData; } } @@ -87,6 +100,7 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder { */ private final TreeMap pendingStreams = new TreeMap(); private int maxConcurrentStreams; + private boolean closed; public StreamBufferingEncoder(Http2ConnectionEncoder delegate) { this(delegate, SMALLEST_MAX_CONCURRENT_STREAMS); @@ -127,6 +141,9 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder { public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream, ChannelPromise promise) { + if (closed) { + return promise.setFailure(new Http2ChannelClosedException()); + } if (isExistingStream(streamId) || connection().goAwayReceived()) { return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream, promise); @@ -198,8 +215,20 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder { @Override public void close() { - super.close(); - cancelPendingStreams(); + try { + if (!closed) { + closed = true; + + // Fail all buffered streams. + Http2ChannelClosedException e = new Http2ChannelClosedException(); + while (!pendingStreams.isEmpty()) { + PendingStream stream = pendingStreams.pollFirstEntry().getValue(); + stream.close(e); + } + } + } finally { + super.close(); + } } private void tryCreatePendingStreams() { @@ -210,17 +239,10 @@ public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder { } } - private void cancelPendingStreams() { - Exception e = new Exception("Connection closed."); - while (!pendingStreams.isEmpty()) { - PendingStream stream = pendingStreams.pollFirstEntry().getValue(); - stream.close(e); - } - } - private void cancelGoAwayStreams(int lastStreamId, long errorCode, ByteBuf debugData) { Iterator iter = pendingStreams.values().iterator(); - Exception e = new GoAwayException(lastStreamId, errorCode, debugData); + Exception e = new Http2GoAwayException(lastStreamId, errorCode, + new ByteString(ByteBufUtil.getBytes(debugData), false)); while (iter.hasNext()) { PendingStream stream = iter.next(); if (stream.streamId > lastStreamId) { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java index e2b786a48f..ff21be9287 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java @@ -41,9 +41,9 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; -import io.netty.handler.codec.http2.StreamBufferingEncoder.GoAwayException; +import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException; +import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException; import io.netty.util.ReferenceCountUtil; -import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.ImmediateEventExecutor; import org.junit.After; import org.junit.Before; @@ -208,7 +208,7 @@ public class StreamBufferingEncoderTest { } assertEquals(4, encoder.numBufferedStreams()); - connection.goAwayReceived(11, 8, null); + connection.goAwayReceived(11, 8, EMPTY_BUFFER); assertEquals(5, connection.numActiveStreams()); // The 4 buffered streams must have been failed. @@ -233,7 +233,7 @@ public class StreamBufferingEncoderTest { assertEquals(1, connection.numActiveStreams()); assertEquals(2, encoder.numBufferedStreams()); - verify(promise, never()).setFailure(any(GoAwayException.class)); + verify(promise, never()).setFailure(any(Http2GoAwayException.class)); } @Test @@ -410,6 +410,28 @@ public class StreamBufferingEncoderTest { verify(data).release(); } + @Test + public void closeShouldCancelAllBufferedStreams() { + encoder.writeSettingsAck(ctx, promise); + connection.local().maxActiveStreams(0); + + encoderWriteHeaders(3, promise); + encoderWriteHeaders(5, promise); + encoderWriteHeaders(7, promise); + + encoder.close(); + verify(promise, times(3)).setFailure(any(Http2ChannelClosedException.class)); + } + + @Test + public void headersAfterCloseShouldImmediatelyFail() { + encoder.writeSettingsAck(ctx, promise); + encoder.close(); + + encoderWriteHeaders(3, promise); + verify(promise).setFailure(any(Http2ChannelClosedException.class)); + } + private void setMaxConcurrentStreams(int newValue) { try { encoder.remoteSettings(new Http2Settings().maxConcurrentStreams(newValue));