Writability state of http2 child channels should be decoupled from the flow-controller (#9235)

Motivation:

We should decouple the writability state of the http2 child channels from the flow-controller and just tie it to its own pending bytes counter that is decremented by the parent Channel once the bytes were written.

Modifications:

- Decouple writability state of child channels from flow-contoller
- Update tests

Result:

Less coupling and more correct behavior. Fixes https://github.com/netty/netty/issues/8148.
This commit is contained in:
Norman Maurer 2019-06-18 09:37:59 +02:00
parent 1a487a0ff9
commit aaf5ec1fbb
2 changed files with 191 additions and 109 deletions

View File

@ -41,7 +41,6 @@ import io.netty.util.DefaultAttributeMap;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -52,6 +51,8 @@ import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
@ -136,10 +137,15 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
}
}
private static final AtomicLongFieldUpdater<DefaultHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "totalPendingSize");
private static final AtomicIntegerFieldUpdater<DefaultHttp2StreamChannel> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "unwritable");
private final ChannelHandler inboundStreamHandler;
private final ChannelHandler upgradeStreamHandler;
private int initialOutboundStreamWindow = Http2CodecUtil.DEFAULT_WINDOW_SIZE;
private boolean parentReadInProgress;
private int idCount;
@ -220,21 +226,13 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
if (frame instanceof Http2StreamFrame) {
Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
((Http2MultiplexCodecStream) streamFrame.stream()).channel.fireChildRead(streamFrame);
} else if (frame instanceof Http2GoAwayFrame) {
return;
}
if (frame instanceof Http2GoAwayFrame) {
onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
// Allow other handlers to act on GOAWAY frame
ctx.fireChannelRead(frame);
} else if (frame instanceof Http2SettingsFrame) {
Http2Settings settings = ((Http2SettingsFrame) frame).settings();
if (settings.initialWindowSize() != null) {
initialOutboundStreamWindow = settings.initialWindowSize();
}
// Allow other handlers to act on SETTINGS frame
// Send frames down the pipeline
ctx.fireChannelRead(frame);
} else {
// Send any other frames down the pipeline
ctx.fireChannelRead(frame);
}
}
private void onHttp2UpgradeStreamInitialized(ChannelHandlerContext ctx, Http2MultiplexCodecStream stream) {
@ -284,11 +282,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
}
}
@Override
final void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, Http2FrameStream stream, boolean writable) {
(((Http2MultiplexCodecStream) stream).channel).writabilityChanged(writable);
}
// TODO: This is most likely not the best way to expose this, need to think more about it.
final Http2StreamChannel newOutboundStream() {
return new DefaultHttp2StreamChannel(newStream(), true);
@ -386,6 +379,21 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
super.channelRead(ctx, msg);
}
@Override
public final void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
forEachActiveStream(new Http2FrameStreamVisitor() {
@Override
public boolean visit(Http2FrameStream stream) {
final DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
// As the writability may change during visiting active streams we need to ensure we always fetch
// the current writability state of the channel.
childChannel.updateWritability(ctx.channel().isWritable());
return true;
}
});
super.channelWritabilityChanged(ctx);
}
final void onChannelReadComplete(ChannelHandlerContext ctx) {
// If we have many child channel we can optimize for the case when multiple call flush() in
// channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
@ -408,13 +416,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
DefaultHttp2StreamChannel channel;
}
private boolean initialWritability(DefaultHttp2FrameStream stream) {
// If the stream id is not valid yet we will just mark the channel as writable as we will be notified
// about non-writability state as soon as the first Http2HeaderFrame is written (if needed).
// This should be good enough and simplify things a lot.
return !isStreamIdValid(stream.id()) || isWritable(stream);
}
/**
* The current status of the read-processing for a {@link Http2StreamChannel}.
*/
@ -435,7 +436,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
REQUESTED
}
// TODO: Handle writability changes due writing from outside the eventloop.
private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
@ -446,8 +446,13 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
private final boolean outbound;
private volatile boolean registered;
// We start with the writability of the channel when creating the StreamChannel.
private volatile boolean writable;
// Needs to be package-private to be able to access it from the outer-class AtomicLongFieldUpdater.
volatile long totalPendingSize;
volatile int unwritable;
// Cached to reduce GC
private Runnable fireChannelWritabilityChangedTask;
private boolean outboundClosed;
@ -474,23 +479,114 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) {
this.stream = stream;
this.outbound = outbound;
writable = initialWritability(stream);
((Http2MultiplexCodecStream) stream).channel = this;
pipeline = new DefaultChannelPipeline(this) {
@Override
protected void incrementPendingOutboundBytes(long size) {
// Do thing for now
DefaultHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
}
@Override
protected void decrementPendingOutboundBytes(long size) {
// Do thing for now
DefaultHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
}
};
closePromise = pipeline.newPromise();
channelId = new Http2StreamChannelId(parent().id(), ++idCount);
}
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
// Once the totalPendingSize dropped below the low water-mark we can mark the child channel
// as writable again. Before doing so we also need to ensure the parent channel is writable to
// prevent excessive buffering in the parent outbound buffer. If the parent is not writable
// we will mark the child channel as writable once the parent becomes writable by calling
// updateWritability later.
if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
setWritable(invokeLater);
}
}
void updateWritability(boolean parentIsWritable) {
if (parentIsWritable) {
// The parent is writable again but the child channel itself may still not be writable.
// Lets try to set the child channel writable to match the state of the parent channel
// if (and only if) the totalPendingSize is smaller then the low water-mark.
// If this is not the case we will try again later once we drop under it.
trySetWritable();
} else {
// No matter what the current totalPendingSize for the child channel is as soon as the parent
// channel is unwritable we also need to mark the child channel as unwritable to try to keep
// buffering to a minimum.
setUnwritable(false);
}
}
private void trySetWritable() {
if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
setWritable(false);
}
}
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = pipeline();
if (invokeLater) {
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
pipeline.fireChannelWritabilityChanged();
}
};
}
eventLoop().execute(task);
} else {
pipeline.fireChannelWritabilityChanged();
}
}
@Override
public Http2FrameStream stream() {
return stream;
@ -525,7 +621,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
@Override
public boolean isWritable() {
return writable;
return unwritable == 0 && parent().isWritable();
}
@Override
@ -565,13 +661,25 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
@Override
public long bytesBeforeUnwritable() {
// TODO: Do a proper impl
return config().getWriteBufferHighWaterMark();
long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize;
// If bytes is negative we know we are not writable, but if bytes is non-negative we have to check
// writability. Note that totalPendingSize and isWritable() use different volatile variables that are not
// synchronized together. totalPendingSize will be updated before isWritable().
if (bytes > 0) {
return isWritable() ? bytes : 0;
}
return 0;
}
@Override
public long bytesBeforeWritable() {
// TODO: Do a proper impl
long bytes = totalPendingSize - config().getWriteBufferLowWaterMark();
// If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
// Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
// together. totalPendingSize will be updated before isWritable().
if (bytes > 0) {
return isWritable() ? 0 : bytes;
}
return 0;
}
@ -741,15 +849,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
return parent().toString() + "(H2 - " + stream + ')';
}
void writabilityChanged(boolean writable) {
assert eventLoop().inEventLoop();
if (writable != this.writable && isActive()) {
// Only notify if we received a state change.
this.writable = writable;
pipeline().fireChannelWritabilityChanged();
}
}
/**
* Receive a read message. This does not notify handlers unless a read is in progress on the
* channel.
@ -1101,12 +1200,16 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
return;
}
firstFrameWritten = true;
ChannelFuture future = write0(frame);
if (future.isDone()) {
firstWriteComplete(future, promise);
ChannelFuture f = write0(frame);
if (f.isDone()) {
firstWriteComplete(f, promise);
} else {
future.addListener((ChannelFutureListener) future12 ->
firstWriteComplete(future12, promise));
final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(msg);
incrementPendingOutboundBytes(bytes, false);
f.addListener((ChannelFutureListener) future -> {
firstWriteComplete(future, promise);
decrementPendingOutboundBytes(bytes, false);
});
}
return;
}
@ -1119,11 +1222,16 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
return;
}
ChannelFuture future = write0(msg);
if (future.isDone()) {
writeComplete(future, promise);
ChannelFuture f = write0(msg);
if (f.isDone()) {
writeComplete(f, promise);
} else {
future.addListener((ChannelFutureListener) future1 -> writeComplete(future1, promise));
final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(msg);
incrementPendingOutboundBytes(bytes, false);
f.addListener((ChannelFutureListener) future -> {
writeComplete(future, promise);
decrementPendingOutboundBytes(bytes, false);
});
}
} catch (Throwable t) {
promise.tryFailure(t);
@ -1135,9 +1243,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
Throwable cause = future.cause();
if (cause == null) {
// As we just finished our first write which made the stream-id valid we need to re-evaluate
// the writability of the channel.
writabilityChanged(Http2MultiplexCodec.this.isWritable(stream));
promise.setSuccess();
} else {
// If the first write fails there is not much we can do, just close
@ -1230,49 +1335,16 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
super(channel);
}
@Override
public int getWriteBufferHighWaterMark() {
return min(parent().config().getWriteBufferHighWaterMark(), initialOutboundStreamWindow);
}
@Override
public int getWriteBufferLowWaterMark() {
return min(parent().config().getWriteBufferLowWaterMark(), initialOutboundStreamWindow);
}
@Override
public MessageSizeEstimator getMessageSizeEstimator() {
return FlowControlledFrameSizeEstimator.INSTANCE;
}
@Override
public WriteBufferWaterMark getWriteBufferWaterMark() {
int mark = getWriteBufferHighWaterMark();
return new WriteBufferWaterMark(mark, mark);
}
@Override
public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
throw new UnsupportedOperationException();
}
@Override
@Deprecated
public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
throw new UnsupportedOperationException();
}
@Override
@Deprecated
public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
throw new UnsupportedOperationException();
}
@Override
public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
throw new UnsupportedOperationException();
}
@Override
public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {

View File

@ -21,7 +21,9 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpScheme;
@ -52,6 +54,7 @@ import static io.netty.handler.codec.http2.Http2TestUtil.bb;
import static io.netty.util.ReferenceCountUtil.release;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -656,38 +659,45 @@ public class Http2MultiplexCodecTest {
parentChannel.flush();
// Test for initial window size
assertEquals(initialRemoteStreamWindow, childChannel.config().getWriteBufferHighWaterMark());
assertTrue(initialRemoteStreamWindow < childChannel.config().getWriteBufferHighWaterMark());
assertTrue(childChannel.isWritable());
childChannel.write(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(16 * 1024 * 1024)));
assertEquals(0, childChannel.bytesBeforeUnwritable());
assertFalse(childChannel.isWritable());
}
@Test
public void writabilityAndFlowControl() {
LastInboundHandler inboundHandler = new LastInboundHandler();
Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
assertEquals("", inboundHandler.writabilityStates());
public void writabilityOfParentIsRespected() {
Http2StreamChannel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter());
childChannel.config().setWriteBufferWaterMark(new WriteBufferWaterMark(2048, 4096));
parentChannel.config().setWriteBufferWaterMark(new WriteBufferWaterMark(256, 512));
assertTrue(childChannel.isWritable());
// HEADERS frames are not flow controlled, so they should not affect the flow control window.
assertTrue(parentChannel.isActive());
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), true);
parentChannel.flush();
assertTrue(childChannel.isWritable());
assertEquals("", inboundHandler.writabilityStates());
codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), true);
childChannel.write(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(256)));
assertTrue(childChannel.isWritable());
assertEquals("", inboundHandler.writabilityStates());
childChannel.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(512)));
codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), false);
long bytesBeforeUnwritable = childChannel.bytesBeforeUnwritable();
assertNotEquals(0, bytesBeforeUnwritable);
// Add something to the ChannelOutboundBuffer of the parent to simulate queuing in the parents channel buffer
// and verify that this also effects the child channel in terms of writability.
parentChannel.unsafe().outboundBuffer().addMessage(
Unpooled.buffer().writeZero(800), 800, parentChannel.voidPromise());
assertFalse(parentChannel.isWritable());
assertFalse(childChannel.isWritable());
assertEquals("false", inboundHandler.writabilityStates());
assertEquals(0, childChannel.bytesBeforeUnwritable());
codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), false);
assertFalse(childChannel.isWritable());
assertEquals("false", inboundHandler.writabilityStates());
// Flush everything which simulate writing everything to the socket.
parentChannel.flush();
assertTrue(parentChannel.isWritable());
assertTrue(childChannel.isWritable());
assertEquals(bytesBeforeUnwritable, childChannel.bytesBeforeUnwritable());
}
@Test