Correctly update writability state of Http2StreamChannel created by Http2MultiplexCodec.

Motivation:

We missed to mark the Http2StreamChannel as writable in some cases which could lead to the situation that a Channel never becomes writable. Also when a Http2StreamChannel was created we always marked it non-writable at the beginning which means if the user will only start writing once the Channel becomes writable it will never happen as it only became writable after the first header was written.

Modifications:

- Correctly handle updates for writability in all cases
- Change unit tests to cover this.

Result:

Fixes [#7179].
This commit is contained in:
Norman Maurer 2017-09-06 09:05:51 +02:00
parent 15611dadbb
commit bf0a53179a
4 changed files with 63 additions and 13 deletions

View File

@ -582,6 +582,11 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
ctx.fireExceptionCaught(cause);
}
final boolean isWritable(DefaultHttp2FrameStream stream) {
Http2Stream s = stream.stream;
return s != null && connection().remote().flowController().isWritable(s);
}
private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
@Override
public void writabilityChanged(Http2Stream stream) {
@ -601,7 +606,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
static class DefaultHttp2FrameStream implements Http2FrameStream {
private volatile int id = -1;
private volatile Http2Stream stream;
volatile Http2Stream stream;
DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
assert id == -1 || stream.id() == id;

View File

@ -231,16 +231,17 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
@Override
final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, Http2FrameStream stream) {
Http2MultiplexCodecStream s = (Http2MultiplexCodecStream) stream;
switch (stream.state()) {
case HALF_CLOSED_REMOTE:
case OPEN:
if (((Http2MultiplexCodecStream) stream).channel != null) {
if (s.channel != null) {
// ignore if child channel was already created.
break;
}
// fall-trough
DefaultHttp2StreamChannel childChannel = new DefaultHttp2StreamChannel(stream, false);
ChannelFuture future = ctx.channel().eventLoop().register(childChannel);
ChannelFuture future = ctx.channel().eventLoop().register(new DefaultHttp2StreamChannel(s, false));
if (future.isDone()) {
registerDone(future);
} else {
@ -248,7 +249,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
}
break;
case CLOSED:
DefaultHttp2StreamChannel channel = ((Http2MultiplexCodecStream) stream).channel;
DefaultHttp2StreamChannel channel = s.channel;
if (channel != null) {
channel.streamClosed();
}
@ -395,17 +396,25 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
READ_PROCESSED_OK_TO_PROCESS_MORE
}
private boolean initialWritability(DefaultHttp2FrameStream stream) {
// If the stream id is not valid yet we will just mark the channel as writable as we will be notified
// about non-writability state as soon as the first Http2HeaderFrame is written (if needed).
// This should be good enough and simplify things a lot.
return !isStreamIdValid(stream.id()) || isWritable(stream);
}
// TODO: Handle writability changes due writing from outside the eventloop.
private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
private final ChannelId channelId;
private final ChannelPipeline pipeline;
private final Http2FrameStream stream;
private final DefaultHttp2FrameStream stream;
private final ChannelPromise closePromise;
private final boolean outbound;
private volatile boolean registered;
// We start with the writability of the channel when creating the StreamChannel.
private volatile boolean writable;
private boolean closePending;
@ -428,9 +437,10 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
// channelReadComplete(...)
DefaultHttp2StreamChannel next;
DefaultHttp2StreamChannel(Http2FrameStream stream, boolean outbound) {
DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) {
this.stream = stream;
this.outbound = outbound;
writable = initialWritability(stream);
((Http2MultiplexCodecStream) stream).channel = this;
pipeline = new DefaultChannelPipeline(this) {
@Override
@ -484,8 +494,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
@Override
public boolean isWritable() {
// TODO: Should we also take the parent channel into account ?
return isStreamIdValid(stream.id()) && writable;
return writable;
}
@Override
@ -1011,6 +1020,9 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
Throwable cause = future.cause();
if (cause == null) {
// As we just finished our first write which made the stream-id valid we need to re-evaluate
// the writability of the channel.
writabilityChanged(Http2MultiplexCodec.this.isWritable(stream));
promise.setSuccess();
} else {
promise.setFailure(cause);

View File

@ -24,6 +24,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
@ -74,7 +75,34 @@ public class Http2MultiplexCodecBuilderTest {
@Override
protected void initChannel(Channel ch) throws Exception {
serverConnectedChannel = ch;
ch.pipeline().addLast(new Http2MultiplexCodecBuilder(true, serverLastInboundHandler).build());
ch.pipeline().addLast(new Http2MultiplexCodecBuilder(true, new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
private boolean writable;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
writable |= ctx.channel().isWritable();
super.channelActive(ctx);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
writable |= ctx.channel().isWritable();
super.channelWritabilityChanged(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
assertTrue(writable);
super.channelInactive(ctx);
}
});
ch.pipeline().addLast(serverLastInboundHandler);
}
}).build());
serverChannelLatch.countDown();
}
});
@ -120,7 +148,7 @@ public class Http2MultiplexCodecBuilderTest {
}
@Test
public void multipleOutboundStreams() {
public void multipleOutboundStreams() throws Exception {
Http2StreamChannel childChannel1 = newOutboundStream(new TestChannelInitializer());
assertTrue(childChannel1.isActive());
assertFalse(isStreamIdValid(childChannel1.stream().id()));
@ -148,10 +176,12 @@ public class Http2MultiplexCodecBuilderTest {
childChannel1.close();
childChannel2.close();
serverLastInboundHandler.checkException();
}
@Test
public void createOutboundStream() {
public void createOutboundStream() throws Exception {
Channel childChannel = newOutboundStream(new TestChannelInitializer());
assertTrue(childChannel.isRegistered());
assertTrue(childChannel.isActive());
@ -178,10 +208,13 @@ public class Http2MultiplexCodecBuilderTest {
Http2ResetFrame rstFrame = serverLastInboundHandler.blockingReadInbound();
assertNotNull(rstFrame);
assertEquals(3, rstFrame.stream().id());
serverLastInboundHandler.checkException();
}
@Sharable
private static class SharableLastInboundHandler extends LastInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();

View File

@ -372,7 +372,7 @@ public class Http2MultiplexCodecTest {
Http2StreamChannel childChannel = newOutboundStream();
assertTrue(childChannel.isActive());
assertFalse(childChannel.isWritable());
assertTrue(childChannel.isWritable());
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
parentChannel.flush();