HTTP/2 child channel may discard flush when done from an arbitrary thread (#10019)
Motivation: We need to carefully manage flushes to ensure we not discard these by mistake due wrongly implemented consolidation of flushes. Modifications: - Ensure we reset flag before we actually call flush0(...) - Add unit test Result: Fixes https://github.com/netty/netty/issues/10015
This commit is contained in:
parent
df4fd115df
commit
2eb49866d2
@ -1023,11 +1023,10 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
// There is nothing to flush so this is a NOOP.
|
// There is nothing to flush so this is a NOOP.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
// We need to set this to false before we call flush0(...) as ChannelFutureListener may produce more data
|
||||||
flush0(parentContext());
|
// that are explicit flushed.
|
||||||
} finally {
|
|
||||||
writeDoneAndNoFlush = false;
|
writeDoneAndNoFlush = false;
|
||||||
}
|
flush0(parentContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -17,14 +17,18 @@ package io.netty.handler.codec.http2;
|
|||||||
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
|
import io.netty.channel.ChannelHandlerAdapter;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
||||||
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.EventLoopGroup;
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
|
import io.netty.util.CharsetUtil;
|
||||||
import io.netty.util.NetUtil;
|
import io.netty.util.NetUtil;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
@ -33,12 +37,33 @@ import org.junit.Test;
|
|||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
|
||||||
public class Http2MultiplexTransportTest {
|
public class Http2MultiplexTransportTest {
|
||||||
|
private static final ChannelHandler DISCARD_HANDLER = new ChannelHandlerAdapter() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSharable() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||||
|
ReferenceCountUtil.release(evt);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
private EventLoopGroup eventLoopGroup;
|
private EventLoopGroup eventLoopGroup;
|
||||||
private Channel clientChannel;
|
private Channel clientChannel;
|
||||||
private Channel serverChannel;
|
private Channel serverChannel;
|
||||||
@ -65,13 +90,13 @@ public class Http2MultiplexTransportTest {
|
|||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void asyncSettingsAckWithMultiplexCodec() throws InterruptedException {
|
public void asyncSettingsAckWithMultiplexCodec() throws InterruptedException {
|
||||||
asyncSettingsAck0(new Http2MultiplexCodecBuilder(true, new HttpInboundHandler()).build(), null);
|
asyncSettingsAck0(new Http2MultiplexCodecBuilder(true, DISCARD_HANDLER).build(), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void asyncSettingsAckWithMultiplexHandler() throws InterruptedException {
|
public void asyncSettingsAckWithMultiplexHandler() throws InterruptedException {
|
||||||
asyncSettingsAck0(new Http2FrameCodecBuilder(true).build(),
|
asyncSettingsAck0(new Http2FrameCodecBuilder(true).build(),
|
||||||
new Http2MultiplexHandler(new HttpInboundHandler()));
|
new Http2MultiplexHandler(DISCARD_HANDLER));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void asyncSettingsAck0(final Http2FrameCodec codec, final ChannelHandler multiplexer)
|
private void asyncSettingsAck0(final Http2FrameCodec codec, final ChannelHandler multiplexer)
|
||||||
@ -119,7 +144,7 @@ public class Http2MultiplexTransportTest {
|
|||||||
@Override
|
@Override
|
||||||
protected void initChannel(Channel ch) {
|
protected void initChannel(Channel ch) {
|
||||||
ch.pipeline().addLast(Http2MultiplexCodecBuilder
|
ch.pipeline().addLast(Http2MultiplexCodecBuilder
|
||||||
.forClient(new HttpInboundHandler()).autoAckSettingsFrame(false).build());
|
.forClient(DISCARD_HANDLER).autoAckSettingsFrame(false).build());
|
||||||
ch.pipeline().addLast(new ChannelHandler() {
|
ch.pipeline().addLast(new ChannelHandler() {
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
@ -151,6 +176,70 @@ public class Http2MultiplexTransportTest {
|
|||||||
serverAckAllLatch.await();
|
serverAckAllLatch.await();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ChannelHandler.Sharable
|
@Test(timeout = 5000L)
|
||||||
private static final class HttpInboundHandler implements ChannelHandler { }
|
public void testFlushNotDiscarded()
|
||||||
|
throws InterruptedException {
|
||||||
|
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
|
||||||
|
|
||||||
|
try {
|
||||||
|
ServerBootstrap sb = new ServerBootstrap();
|
||||||
|
sb.group(eventLoopGroup);
|
||||||
|
sb.channel(NioServerSocketChannel.class);
|
||||||
|
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(Channel ch) {
|
||||||
|
ch.pipeline().addLast(new Http2FrameCodecBuilder(true).build());
|
||||||
|
ch.pipeline().addLast(new Http2MultiplexHandler(new ChannelHandler() {
|
||||||
|
@Override
|
||||||
|
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
|
||||||
|
if (msg instanceof Http2HeadersFrame && ((Http2HeadersFrame) msg).isEndStream()) {
|
||||||
|
executorService.schedule(() -> {
|
||||||
|
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(
|
||||||
|
new DefaultHttp2Headers(), false)).addListener(
|
||||||
|
(ChannelFutureListener) future -> {
|
||||||
|
ctx.write(new DefaultHttp2DataFrame(
|
||||||
|
Unpooled.copiedBuffer("Hello World",
|
||||||
|
CharsetUtil.US_ASCII), true));
|
||||||
|
ctx.channel().eventLoop().execute(ctx::flush);
|
||||||
|
});
|
||||||
|
}, 500, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
serverChannel = sb.bind(new InetSocketAddress(NetUtil.LOCALHOST, 0)).syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
Bootstrap bs = new Bootstrap();
|
||||||
|
bs.group(eventLoopGroup);
|
||||||
|
bs.channel(NioSocketChannel.class);
|
||||||
|
bs.handler(new ChannelInitializer<Channel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(Channel ch) {
|
||||||
|
ch.pipeline().addLast(new Http2FrameCodecBuilder(false).build());
|
||||||
|
ch.pipeline().addLast(new Http2MultiplexHandler(DISCARD_HANDLER));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
clientChannel = bs.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
|
||||||
|
Http2StreamChannelBootstrap h2Bootstrap = new Http2StreamChannelBootstrap(clientChannel);
|
||||||
|
h2Bootstrap.handler(new ChannelHandler() {
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
if (msg instanceof Http2DataFrame && ((Http2DataFrame) msg).isEndStream()) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Http2StreamChannel streamChannel = h2Bootstrap.open().syncUninterruptibly().getNow();
|
||||||
|
streamChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), true))
|
||||||
|
.syncUninterruptibly();
|
||||||
|
|
||||||
|
latch.await();
|
||||||
|
} finally {
|
||||||
|
executorService.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user