Try to mark child channel writable again once the parent channel becomes writable (#9254)
Motivation:
f945a071db
decoupled the writability state from the flow controller but could lead to the situation of a lot of writability updates events were propagated to the child channels. This change ensure we only take into account if the parent channel becomes writable again before we try to set the child channels to writable.
Modifications:
Only listen for channel writability changes for if the parent channel becomes writable again.
Result:
Less writability updates.
This commit is contained in:
parent
d9db218291
commit
e3be64fd9c
@ -146,6 +146,15 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
private final ChannelHandler inboundStreamHandler;
|
||||
private final ChannelHandler upgradeStreamHandler;
|
||||
|
||||
private final Http2FrameStreamVisitor writableVisitor = new Http2FrameStreamVisitor() {
|
||||
@Override
|
||||
public boolean visit(Http2FrameStream stream) {
|
||||
final DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
|
||||
childChannel.trySetWritable();
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
private boolean parentReadInProgress;
|
||||
private int idCount;
|
||||
|
||||
@ -381,17 +390,13 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
|
||||
@Override
|
||||
public final void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
|
||||
forEachActiveStream(new Http2FrameStreamVisitor() {
|
||||
@Override
|
||||
public boolean visit(Http2FrameStream stream) {
|
||||
final DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
|
||||
// As the writability may change during visiting active streams we need to ensure we always fetch
|
||||
// the current writability state of the channel.
|
||||
childChannel.updateWritability(ctx.channel().isWritable());
|
||||
return true;
|
||||
}
|
||||
});
|
||||
super.channelWritabilityChanged(ctx);
|
||||
if (ctx.channel().isWritable()) {
|
||||
// While the writability state may change during iterating of the streams we just set all of the streams
|
||||
// to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
|
||||
forEachActiveStream(writableVisitor);
|
||||
}
|
||||
|
||||
ctx.fireChannelWritabilityChanged();
|
||||
}
|
||||
|
||||
final void onChannelReadComplete(ChannelHandlerContext ctx) {
|
||||
@ -516,28 +521,17 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
// as writable again. Before doing so we also need to ensure the parent channel is writable to
|
||||
// prevent excessive buffering in the parent outbound buffer. If the parent is not writable
|
||||
// we will mark the child channel as writable once the parent becomes writable by calling
|
||||
// updateWritability later.
|
||||
// trySetWritable() later.
|
||||
if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
|
||||
setWritable(invokeLater);
|
||||
}
|
||||
}
|
||||
|
||||
void updateWritability(boolean parentIsWritable) {
|
||||
if (parentIsWritable) {
|
||||
// The parent is writable again but the child channel itself may still not be writable.
|
||||
// Lets try to set the child channel writable to match the state of the parent channel
|
||||
// if (and only if) the totalPendingSize is smaller then the low water-mark.
|
||||
// If this is not the case we will try again later once we drop under it.
|
||||
trySetWritable();
|
||||
} else {
|
||||
// No matter what the current totalPendingSize for the child channel is as soon as the parent
|
||||
// channel is unwritable we also need to mark the child channel as unwritable to try to keep
|
||||
// buffering to a minimum.
|
||||
setUnwritable(false);
|
||||
}
|
||||
}
|
||||
|
||||
private void trySetWritable() {
|
||||
void trySetWritable() {
|
||||
// The parent is writable again but the child channel itself may still not be writable.
|
||||
// Lets try to set the child channel writable to match the state of the parent channel
|
||||
// if (and only if) the totalPendingSize is smaller then the low water-mark.
|
||||
// If this is not the case we will try again later once we drop under it.
|
||||
if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
|
||||
setWritable(false);
|
||||
}
|
||||
@ -621,7 +615,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
|
||||
@Override
|
||||
public boolean isWritable() {
|
||||
return unwritable == 0 && parent().isWritable();
|
||||
return unwritable == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -686,18 +686,36 @@ public class Http2MultiplexCodecTest {
|
||||
long bytesBeforeUnwritable = childChannel.bytesBeforeUnwritable();
|
||||
assertNotEquals(0, bytesBeforeUnwritable);
|
||||
// Add something to the ChannelOutboundBuffer of the parent to simulate queuing in the parents channel buffer
|
||||
// and verify that this also effects the child channel in terms of writability.
|
||||
// and verify that this only affect the writability of the parent channel while the child stays writable
|
||||
// until it used all of its credits.
|
||||
parentChannel.unsafe().outboundBuffer().addMessage(
|
||||
Unpooled.buffer().writeZero(800), 800, parentChannel.voidPromise());
|
||||
assertFalse(parentChannel.isWritable());
|
||||
assertFalse(childChannel.isWritable());
|
||||
assertEquals(0, childChannel.bytesBeforeUnwritable());
|
||||
|
||||
assertTrue(childChannel.isWritable());
|
||||
assertEquals(4096, childChannel.bytesBeforeUnwritable());
|
||||
|
||||
// Flush everything which simulate writing everything to the socket.
|
||||
parentChannel.flush();
|
||||
assertTrue(parentChannel.isWritable());
|
||||
assertTrue(childChannel.isWritable());
|
||||
assertEquals(bytesBeforeUnwritable, childChannel.bytesBeforeUnwritable());
|
||||
|
||||
ChannelFuture future = childChannel.writeAndFlush(new DefaultHttp2DataFrame(
|
||||
Unpooled.buffer().writeZero((int) bytesBeforeUnwritable)));
|
||||
assertFalse(childChannel.isWritable());
|
||||
assertTrue(parentChannel.isWritable());
|
||||
|
||||
parentChannel.flush();
|
||||
assertFalse(future.isDone());
|
||||
assertTrue(parentChannel.isWritable());
|
||||
assertFalse(childChannel.isWritable());
|
||||
|
||||
// Now write an window update frame for the stream which then should ensure we will flush the bytes that were
|
||||
// queued in the RemoteFlowController before for the stream.
|
||||
frameInboundWriter.writeInboundWindowUpdate(childChannel.stream().id(), (int) bytesBeforeUnwritable);
|
||||
assertTrue(childChannel.isWritable());
|
||||
assertTrue(future.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user