From b423a3578300b1dbcd8643336420d9683a023f51 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 26 Jan 2018 09:04:49 +0100 Subject: [PATCH] Correctly handle multiple calls to DefaultHttp2StreamChannel.Unsafe.close(...) Motivation: Calling DefaultHttp2StreamChannel.Unsafe.close(...) multiple times should not fail. Modification: - Correctly handle multiple calls to DefaultHttp2StreamChannel.Unsafe.close(...) - Complete closePromise and promise that is given to close(...) in the correct order. - Add unit test Result: Fixes [#7628] and [#7641] --- .../codec/http2/Http2MultiplexCodec.java | 84 +++++++++---------- .../codec/http2/Http2MultiplexCodecTest.java | 12 +++ 2 files changed, 54 insertions(+), 42 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java index 1310f77d10..246b381059 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java @@ -32,7 +32,6 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelPipeline; import io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator; -import io.netty.channel.DelegatingChannelPromiseNotifier; import io.netty.channel.EventLoop; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; @@ -768,7 +767,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { @SuppressWarnings("deprecation") private RecvByteBufAllocator.ExtendedHandle recvHandle; private boolean writeDoneAndNoFlush; - private ChannelPromise pendingClosePromise; + private boolean closeInitiated; @Override public void connect(final SocketAddress remoteAddress, @@ -831,58 +830,59 @@ public class Http2MultiplexCodec extends Http2FrameCodec { @Override public void disconnect(ChannelPromise promise) { - if (!promise.setUncancellable()) { - return; - } close(promise); } @Override - public void close(ChannelPromise promise) { + public void close(final ChannelPromise promise) { if (!promise.setUncancellable()) { return; } - if (closePromise.isDone()) { - promise.setFailure(new ClosedChannelException()); - return; - } - if (pendingClosePromise != null) { - pendingClosePromise.addListener(new DelegatingChannelPromiseNotifier(promise)); - return; - } - pendingClosePromise = promise; - try { - closePending = false; - fireChannelReadPending = false; - - // Only ever send a reset frame if the connection is still alive as otherwise it makes no sense at - // all anyway. - if (parent().isActive() && !streamClosedWithoutError && isStreamIdValid(stream().id())) { - Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream()); - write(resetFrame, unsafe().voidPromise()); - flush(); - } - - if (inboundBuffer != null) { - for (;;) { - Object msg = inboundBuffer.poll(); - if (msg == null) { - break; + if (closeInitiated) { + if (closePromise.isDone()) { + // Closed already. + promise.setSuccess(); + } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise. + // This means close() was called before so we just register a listener and return + closePromise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + promise.setSuccess(); } - ReferenceCountUtil.release(msg); + }); + } + return; + } + closeInitiated = true; + + closePending = false; + fireChannelReadPending = false; + + // Only ever send a reset frame if the connection is still alive as otherwise it makes no sense at + // all anyway. + if (parent().isActive() && !streamClosedWithoutError && isStreamIdValid(stream().id())) { + Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream()); + write(resetFrame, unsafe().voidPromise()); + flush(); + } + + if (inboundBuffer != null) { + for (;;) { + Object msg = inboundBuffer.poll(); + if (msg == null) { + break; } + ReferenceCountUtil.release(msg); } + } - // The promise should be notified before we call fireChannelInactive(). - promise.setSuccess(); - closePromise.setSuccess(); + // The promise should be notified before we call fireChannelInactive(). + closePromise.setSuccess(); + promise.setSuccess(); - pipeline().fireChannelInactive(); - if (isRegistered()) { - deregister(unsafe().voidPromise()); - } - } finally { - pendingClosePromise = null; + pipeline().fireChannelInactive(); + if (isRegistered()) { + deregister(unsafe().voidPromise()); } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java index 4778729cd9..cd4e103bbc 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java @@ -464,6 +464,18 @@ public class Http2MultiplexCodecTest { assertSame(headers, headers2); } + @Test + public void callUnsafeCloseMultipleTimes() { + LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); + Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + childChannel.unsafe().close(childChannel.voidPromise()); + + ChannelPromise promise = childChannel.newPromise(); + childChannel.unsafe().close(promise); + promise.syncUninterruptibly(); + childChannel.closeFuture().syncUninterruptibly(); + } + private LastInboundHandler streamActiveAndWriteHeaders(Http2FrameStream stream) { LastInboundHandler inboundHandler = new LastInboundHandler(); childChannelInitializer.handler = inboundHandler;