Writability state of http2 child channels should be decoupled from the flow-controller (#9235)

Motivation:

We should decouple the writability state of the http2 child channels from the flow-controller and just tie it to its own pending bytes counter that is decremented by the parent Channel once the bytes were written.

Modifications:

- Decouple writability state of child channels from flow-contoller
- Update tests

Result:

Less coupling and more correct behavior. Fixes https://github.com/netty/netty/issues/8148.
This commit is contained in:
Norman Maurer 2019-06-18 09:37:59 +02:00 committed by GitHub
parent b1fb40e42d
commit f945a071db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 187 additions and 108 deletions

View File

@ -41,7 +41,6 @@ import io.netty.util.DefaultAttributeMap;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted; import io.netty.util.ReferenceCounted;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -52,6 +51,8 @@ import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid; import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
@ -146,10 +147,15 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
} }
} }
private static final AtomicLongFieldUpdater<DefaultHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "totalPendingSize");
private static final AtomicIntegerFieldUpdater<DefaultHttp2StreamChannel> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "unwritable");
private final ChannelHandler inboundStreamHandler; private final ChannelHandler inboundStreamHandler;
private final ChannelHandler upgradeStreamHandler; private final ChannelHandler upgradeStreamHandler;
private int initialOutboundStreamWindow = Http2CodecUtil.DEFAULT_WINDOW_SIZE;
private boolean parentReadInProgress; private boolean parentReadInProgress;
private int idCount; private int idCount;
@ -230,21 +236,13 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
if (frame instanceof Http2StreamFrame) { if (frame instanceof Http2StreamFrame) {
Http2StreamFrame streamFrame = (Http2StreamFrame) frame; Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
((Http2MultiplexCodecStream) streamFrame.stream()).channel.fireChildRead(streamFrame); ((Http2MultiplexCodecStream) streamFrame.stream()).channel.fireChildRead(streamFrame);
} else if (frame instanceof Http2GoAwayFrame) { return;
onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
// Allow other handlers to act on GOAWAY frame
ctx.fireChannelRead(frame);
} else if (frame instanceof Http2SettingsFrame) {
Http2Settings settings = ((Http2SettingsFrame) frame).settings();
if (settings.initialWindowSize() != null) {
initialOutboundStreamWindow = settings.initialWindowSize();
}
// Allow other handlers to act on SETTINGS frame
ctx.fireChannelRead(frame);
} else {
// Send any other frames down the pipeline
ctx.fireChannelRead(frame);
} }
if (frame instanceof Http2GoAwayFrame) {
onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
}
// Send frames down the pipeline
ctx.fireChannelRead(frame);
} }
private void onHttp2UpgradeStreamInitialized(ChannelHandlerContext ctx, Http2MultiplexCodecStream stream) { private void onHttp2UpgradeStreamInitialized(ChannelHandlerContext ctx, Http2MultiplexCodecStream stream) {
@ -294,11 +292,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
} }
} }
@Override
final void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, Http2FrameStream stream, boolean writable) {
(((Http2MultiplexCodecStream) stream).channel).writabilityChanged(writable);
}
// TODO: This is most likely not the best way to expose this, need to think more about it. // TODO: This is most likely not the best way to expose this, need to think more about it.
final Http2StreamChannel newOutboundStream() { final Http2StreamChannel newOutboundStream() {
return new DefaultHttp2StreamChannel(newStream(), true); return new DefaultHttp2StreamChannel(newStream(), true);
@ -399,6 +392,21 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
super.channelRead(ctx, msg); super.channelRead(ctx, msg);
} }
@Override
public final void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
forEachActiveStream(new Http2FrameStreamVisitor() {
@Override
public boolean visit(Http2FrameStream stream) {
final DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
// As the writability may change during visiting active streams we need to ensure we always fetch
// the current writability state of the channel.
childChannel.updateWritability(ctx.channel().isWritable());
return true;
}
});
super.channelWritabilityChanged(ctx);
}
final void onChannelReadComplete(ChannelHandlerContext ctx) { final void onChannelReadComplete(ChannelHandlerContext ctx) {
// If we have many child channel we can optimize for the case when multiple call flush() in // If we have many child channel we can optimize for the case when multiple call flush() in
// channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
@ -421,13 +429,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
DefaultHttp2StreamChannel channel; DefaultHttp2StreamChannel channel;
} }
private boolean initialWritability(DefaultHttp2FrameStream stream) {
// If the stream id is not valid yet we will just mark the channel as writable as we will be notified
// about non-writability state as soon as the first Http2HeaderFrame is written (if needed).
// This should be good enough and simplify things a lot.
return !isStreamIdValid(stream.id()) || isWritable(stream);
}
/** /**
* The current status of the read-processing for a {@link Http2StreamChannel}. * The current status of the read-processing for a {@link Http2StreamChannel}.
*/ */
@ -448,7 +449,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
REQUESTED REQUESTED
} }
// TODO: Handle writability changes due writing from outside the eventloop.
private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel { private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this); private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe(); private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
@ -459,8 +459,13 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
private final boolean outbound; private final boolean outbound;
private volatile boolean registered; private volatile boolean registered;
// We start with the writability of the channel when creating the StreamChannel.
private volatile boolean writable; // Needs to be package-private to be able to access it from the outer-class AtomicLongFieldUpdater.
volatile long totalPendingSize;
volatile int unwritable;
// Cached to reduce GC
private Runnable fireChannelWritabilityChangedTask;
private boolean outboundClosed; private boolean outboundClosed;
@ -487,23 +492,114 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) { DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) {
this.stream = stream; this.stream = stream;
this.outbound = outbound; this.outbound = outbound;
writable = initialWritability(stream);
((Http2MultiplexCodecStream) stream).channel = this; ((Http2MultiplexCodecStream) stream).channel = this;
pipeline = new DefaultChannelPipeline(this) { pipeline = new DefaultChannelPipeline(this) {
@Override @Override
protected void incrementPendingOutboundBytes(long size) { protected void incrementPendingOutboundBytes(long size) {
// Do thing for now DefaultHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
} }
@Override @Override
protected void decrementPendingOutboundBytes(long size) { protected void decrementPendingOutboundBytes(long size) {
// Do thing for now DefaultHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
} }
}; };
closePromise = pipeline.newPromise(); closePromise = pipeline.newPromise();
channelId = new Http2StreamChannelId(parent().id(), ++idCount); channelId = new Http2StreamChannelId(parent().id(), ++idCount);
} }
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
// Once the totalPendingSize dropped below the low water-mark we can mark the child channel
// as writable again. Before doing so we also need to ensure the parent channel is writable to
// prevent excessive buffering in the parent outbound buffer. If the parent is not writable
// we will mark the child channel as writable once the parent becomes writable by calling
// updateWritability later.
if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
setWritable(invokeLater);
}
}
void updateWritability(boolean parentIsWritable) {
if (parentIsWritable) {
// The parent is writable again but the child channel itself may still not be writable.
// Lets try to set the child channel writable to match the state of the parent channel
// if (and only if) the totalPendingSize is smaller then the low water-mark.
// If this is not the case we will try again later once we drop under it.
trySetWritable();
} else {
// No matter what the current totalPendingSize for the child channel is as soon as the parent
// channel is unwritable we also need to mark the child channel as unwritable to try to keep
// buffering to a minimum.
setUnwritable(false);
}
}
private void trySetWritable() {
if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
setWritable(false);
}
}
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = pipeline();
if (invokeLater) {
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
pipeline.fireChannelWritabilityChanged();
}
};
}
eventLoop().execute(task);
} else {
pipeline.fireChannelWritabilityChanged();
}
}
@Override @Override
public Http2FrameStream stream() { public Http2FrameStream stream() {
return stream; return stream;
@ -538,7 +634,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
@Override @Override
public boolean isWritable() { public boolean isWritable() {
return writable; return unwritable == 0 && parent().isWritable();
} }
@Override @Override
@ -578,13 +674,25 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
@Override @Override
public long bytesBeforeUnwritable() { public long bytesBeforeUnwritable() {
// TODO: Do a proper impl long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize;
return config().getWriteBufferHighWaterMark(); // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check
// writability. Note that totalPendingSize and isWritable() use different volatile variables that are not
// synchronized together. totalPendingSize will be updated before isWritable().
if (bytes > 0) {
return isWritable() ? bytes : 0;
}
return 0;
} }
@Override @Override
public long bytesBeforeWritable() { public long bytesBeforeWritable() {
// TODO: Do a proper impl long bytes = totalPendingSize - config().getWriteBufferLowWaterMark();
// If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
// Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
// together. totalPendingSize will be updated before isWritable().
if (bytes > 0) {
return isWritable() ? 0 : bytes;
}
return 0; return 0;
} }
@ -744,15 +852,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
return parent().toString() + "(H2 - " + stream + ')'; return parent().toString() + "(H2 - " + stream + ')';
} }
void writabilityChanged(boolean writable) {
assert eventLoop().inEventLoop();
if (writable != this.writable && isActive()) {
// Only notify if we received a state change.
this.writable = writable;
pipeline().fireChannelWritabilityChanged();
}
}
/** /**
* Receive a read message. This does not notify handlers unless a read is in progress on the * Receive a read message. This does not notify handlers unless a read is in progress on the
* channel. * channel.
@ -1105,14 +1204,17 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
return; return;
} }
firstFrameWritten = true; firstFrameWritten = true;
ChannelFuture future = write0(frame); ChannelFuture f = write0(frame);
if (future.isDone()) { if (f.isDone()) {
firstWriteComplete(future, promise); firstWriteComplete(f, promise);
} else { } else {
future.addListener(new ChannelFutureListener() { final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(msg);
incrementPendingOutboundBytes(bytes, false);
f.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) {
firstWriteComplete(future, promise); firstWriteComplete(future, promise);
decrementPendingOutboundBytes(bytes, false);
} }
}); });
} }
@ -1127,14 +1229,17 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
return; return;
} }
ChannelFuture future = write0(msg); ChannelFuture f = write0(msg);
if (future.isDone()) { if (f.isDone()) {
writeComplete(future, promise); writeComplete(f, promise);
} else { } else {
future.addListener(new ChannelFutureListener() { final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(msg);
incrementPendingOutboundBytes(bytes, false);
f.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) {
writeComplete(future, promise); writeComplete(future, promise);
decrementPendingOutboundBytes(bytes, false);
} }
}); });
} }
@ -1148,9 +1253,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) { private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
Throwable cause = future.cause(); Throwable cause = future.cause();
if (cause == null) { if (cause == null) {
// As we just finished our first write which made the stream-id valid we need to re-evaluate
// the writability of the channel.
writabilityChanged(Http2MultiplexCodec.this.isWritable(stream));
promise.setSuccess(); promise.setSuccess();
} else { } else {
// If the first write fails there is not much we can do, just close // If the first write fails there is not much we can do, just close
@ -1243,49 +1345,16 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
super(channel); super(channel);
} }
@Override
public int getWriteBufferHighWaterMark() {
return min(parent().config().getWriteBufferHighWaterMark(), initialOutboundStreamWindow);
}
@Override
public int getWriteBufferLowWaterMark() {
return min(parent().config().getWriteBufferLowWaterMark(), initialOutboundStreamWindow);
}
@Override @Override
public MessageSizeEstimator getMessageSizeEstimator() { public MessageSizeEstimator getMessageSizeEstimator() {
return FlowControlledFrameSizeEstimator.INSTANCE; return FlowControlledFrameSizeEstimator.INSTANCE;
} }
@Override
public WriteBufferWaterMark getWriteBufferWaterMark() {
int mark = getWriteBufferHighWaterMark();
return new WriteBufferWaterMark(mark, mark);
}
@Override @Override
public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
@Deprecated
public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
throw new UnsupportedOperationException();
}
@Override
@Deprecated
public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
throw new UnsupportedOperationException();
}
@Override
public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
throw new UnsupportedOperationException();
}
@Override @Override
public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) { if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {

View File

@ -22,7 +22,9 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpScheme; import io.netty.handler.codec.http.HttpScheme;
@ -53,6 +55,7 @@ import static io.netty.handler.codec.http2.Http2TestUtil.bb;
import static io.netty.util.ReferenceCountUtil.release; import static io.netty.util.ReferenceCountUtil.release;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -674,38 +677,45 @@ public class Http2MultiplexCodecTest {
parentChannel.flush(); parentChannel.flush();
// Test for initial window size // Test for initial window size
assertEquals(initialRemoteStreamWindow, childChannel.config().getWriteBufferHighWaterMark()); assertTrue(initialRemoteStreamWindow < childChannel.config().getWriteBufferHighWaterMark());
assertTrue(childChannel.isWritable()); assertTrue(childChannel.isWritable());
childChannel.write(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(16 * 1024 * 1024))); childChannel.write(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(16 * 1024 * 1024)));
assertEquals(0, childChannel.bytesBeforeUnwritable());
assertFalse(childChannel.isWritable()); assertFalse(childChannel.isWritable());
} }
@Test @Test
public void writabilityAndFlowControl() { public void writabilityOfParentIsRespected() {
LastInboundHandler inboundHandler = new LastInboundHandler(); Http2StreamChannel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter());
Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); childChannel.config().setWriteBufferWaterMark(new WriteBufferWaterMark(2048, 4096));
assertEquals("", inboundHandler.writabilityStates()); parentChannel.config().setWriteBufferWaterMark(new WriteBufferWaterMark(256, 512));
assertTrue(childChannel.isWritable()); assertTrue(childChannel.isWritable());
// HEADERS frames are not flow controlled, so they should not affect the flow control window. assertTrue(parentChannel.isActive());
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers())); childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), true); parentChannel.flush();
assertTrue(childChannel.isWritable()); assertTrue(childChannel.isWritable());
assertEquals("", inboundHandler.writabilityStates()); childChannel.write(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(256)));
codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), true);
assertTrue(childChannel.isWritable()); assertTrue(childChannel.isWritable());
assertEquals("", inboundHandler.writabilityStates()); childChannel.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(512)));
codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), false); long bytesBeforeUnwritable = childChannel.bytesBeforeUnwritable();
assertNotEquals(0, bytesBeforeUnwritable);
// Add something to the ChannelOutboundBuffer of the parent to simulate queuing in the parents channel buffer
// and verify that this also effects the child channel in terms of writability.
parentChannel.unsafe().outboundBuffer().addMessage(
Unpooled.buffer().writeZero(800), 800, parentChannel.voidPromise());
assertFalse(parentChannel.isWritable());
assertFalse(childChannel.isWritable()); assertFalse(childChannel.isWritable());
assertEquals("false", inboundHandler.writabilityStates()); assertEquals(0, childChannel.bytesBeforeUnwritable());
codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), false); // Flush everything which simulate writing everything to the socket.
assertFalse(childChannel.isWritable()); parentChannel.flush();
assertEquals("false", inboundHandler.writabilityStates()); assertTrue(parentChannel.isWritable());
assertTrue(childChannel.isWritable());
assertEquals(bytesBeforeUnwritable, childChannel.bytesBeforeUnwritable());
} }
@Test @Test