From d5d1b898d56152d67c81e5cfd722675097d51e73 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 27 Jun 2018 13:28:41 -0600 Subject: [PATCH] Reorder channel state changes in Http2MultiplexCodec child channel Motivation: If a write fails for a Http2MultiplexChannel stream channel, the channel may be forcibly closed, but only after the promise has been failed. That means continuations attached to the promise may see the channel in an inconsistent state of still being open and active. Modifications: Move the satisfaction of the promise to after the channel cleanup logic runs. Result: Listeners attached to the future that resulted in a Failed write will see the stream channel in the correct state. --- .../codec/http2/Http2MultiplexCodec.java | 5 +-- .../codec/http2/Http2MultiplexCodecTest.java | 44 +++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java index 33b59da13e..521b897a9c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java @@ -1145,9 +1145,9 @@ public class Http2MultiplexCodec extends Http2FrameCodec { writabilityChanged(Http2MultiplexCodec.this.isWritable(stream)); promise.setSuccess(); } else { - promise.setFailure(wrapStreamClosedError(cause)); // If the first write fails there is not much we can do, just close closeForcibly(); + promise.setFailure(wrapStreamClosedError(cause)); } } @@ -1157,8 +1157,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec { promise.setSuccess(); } else { Throwable error = wrapStreamClosedError(cause); - promise.setFailure(error); - if (error instanceof ClosedChannelException) { if (config.isAutoClose()) { // Close channel if needed. @@ -1167,6 +1165,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { outboundClosed = true; } } + promise.setFailure(error); } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java index c272b9a898..314bef7c23 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java @@ -33,9 +33,12 @@ import io.netty.util.AttributeKey; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.util.ReferenceCountUtil; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -464,6 +467,7 @@ public class Http2MultiplexCodecTest { childChannel.close(p).syncUninterruptibly(); assertFalse(channelOpen.get()); + assertFalse(channelActive.get()); assertFalse(childChannel.isActive()); } @@ -488,6 +492,46 @@ public class Http2MultiplexCodecTest { childChannel.close().syncUninterruptibly(); assertFalse(channelOpen.get()); + assertFalse(channelActive.get()); + assertFalse(childChannel.isActive()); + } + + @Test + public void channelClosedWhenWriteFutureFails() { + final Queue writePromises = new ArrayDeque(); + writer = new Writer() { + @Override + void write(Object msg, ChannelPromise promise) { + ReferenceCountUtil.release(msg); + writePromises.offer(promise); + } + }; + + LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); + Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + + assertTrue(childChannel.isOpen()); + assertTrue(childChannel.isActive()); + + final AtomicBoolean channelOpen = new AtomicBoolean(true); + final AtomicBoolean channelActive = new AtomicBoolean(true); + + ChannelFuture f = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers())); + assertFalse(f.isDone()); + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + channelOpen.set(future.channel().isOpen()); + channelActive.set(future.channel().isActive()); + } + }); + + ChannelPromise first = writePromises.poll(); + first.setFailure(new ClosedChannelException()); + f.awaitUninterruptibly(); + + assertFalse(channelOpen.get()); + assertFalse(channelActive.get()); assertFalse(childChannel.isActive()); }