diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java index 2b894cfabe..547d724f55 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java @@ -1035,11 +1035,10 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements // There is nothing to flush so this is a NOOP. return; } - try { - flush0(parentContext()); - } finally { - writeDoneAndNoFlush = false; - } + // We need to set this to false before we call flush0(...) as ChannelFutureListener may produce more data + // that are explicit flushed. + writeDoneAndNoFlush = false; + flush0(parentContext()); } @Override diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java index e9b686c40c..048467890b 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java @@ -17,7 +17,10 @@ package io.netty.handler.codec.http2; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -26,6 +29,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.CharsetUtil; import io.netty.util.NetUtil; import io.netty.util.ReferenceCountUtil; import org.junit.After; @@ -34,12 +38,33 @@ import org.junit.Test; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertFalse; public class Http2MultiplexTransportTest { + private static final ChannelHandler DISCARD_HANDLER = new ChannelInboundHandlerAdapter() { + + @Override + public boolean isSharable() { + return true; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ReferenceCountUtil.release(msg); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + ReferenceCountUtil.release(evt); + } + }; + private EventLoopGroup eventLoopGroup; private Channel clientChannel; private Channel serverChannel; @@ -66,13 +91,13 @@ public class Http2MultiplexTransportTest { @Test(timeout = 10000) public void asyncSettingsAckWithMultiplexCodec() throws InterruptedException { - asyncSettingsAck0(new Http2MultiplexCodecBuilder(true, new HttpInboundHandler()).build(), null); + asyncSettingsAck0(new Http2MultiplexCodecBuilder(true, DISCARD_HANDLER).build(), null); } @Test(timeout = 10000) public void asyncSettingsAckWithMultiplexHandler() throws InterruptedException { asyncSettingsAck0(new Http2FrameCodecBuilder(true).build(), - new Http2MultiplexHandler(new HttpInboundHandler())); + new Http2MultiplexHandler(DISCARD_HANDLER)); } private void asyncSettingsAck0(final Http2FrameCodec codec, final ChannelHandler multiplexer) @@ -120,7 +145,7 @@ public class Http2MultiplexTransportTest { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(Http2MultiplexCodecBuilder - .forClient(new HttpInboundHandler()).autoAckSettingsFrame(false).build()); + .forClient(DISCARD_HANDLER).autoAckSettingsFrame(false).build()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { @@ -152,6 +177,81 @@ public class Http2MultiplexTransportTest { serverAckAllLatch.await(); } - @ChannelHandler.Sharable - private static final class HttpInboundHandler extends ChannelInboundHandlerAdapter { } + @Test(timeout = 5000L) + public void testFlushNotDiscarded() + throws InterruptedException { + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + + try { + ServerBootstrap sb = new ServerBootstrap(); + sb.group(eventLoopGroup); + sb.channel(NioServerSocketChannel.class); + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new Http2FrameCodecBuilder(true).build()); + ch.pipeline().addLast(new Http2MultiplexHandler(new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) { + if (msg instanceof Http2HeadersFrame && ((Http2HeadersFrame) msg).isEndStream()) { + executorService.schedule(new Runnable() { + @Override + public void run() { + ctx.writeAndFlush(new DefaultHttp2HeadersFrame( + new DefaultHttp2Headers(), false)).addListener( + new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + ctx.write(new DefaultHttp2DataFrame( + Unpooled.copiedBuffer("Hello World", CharsetUtil.US_ASCII), + true)); + ctx.channel().eventLoop().execute(new Runnable() { + @Override + public void run() { + ctx.flush(); + } + }); + } + }); + } + }, 500, TimeUnit.MILLISECONDS); + } + ReferenceCountUtil.release(msg); + } + })); + } + }); + serverChannel = sb.bind(new InetSocketAddress(NetUtil.LOCALHOST, 0)).syncUninterruptibly().channel(); + + final CountDownLatch latch = new CountDownLatch(1); + Bootstrap bs = new Bootstrap(); + bs.group(eventLoopGroup); + bs.channel(NioSocketChannel.class); + bs.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new Http2FrameCodecBuilder(false).build()); + ch.pipeline().addLast(new Http2MultiplexHandler(DISCARD_HANDLER)); + } + }); + clientChannel = bs.connect(serverChannel.localAddress()).syncUninterruptibly().channel(); + Http2StreamChannelBootstrap h2Bootstrap = new Http2StreamChannelBootstrap(clientChannel); + h2Bootstrap.handler(new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof Http2DataFrame && ((Http2DataFrame) msg).isEndStream()) { + latch.countDown(); + } + ReferenceCountUtil.release(msg); + } + }); + Http2StreamChannel streamChannel = h2Bootstrap.open().syncUninterruptibly().getNow(); + streamChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), true)) + .syncUninterruptibly(); + + latch.await(); + } finally { + executorService.shutdown(); + } + } }