HTTP/2 Child Channel reading and flushing

Motivation:
If a child channel's read is triggered outside the parent channel's read
loop then it is possible a WINDOW_UPDATE will be written, but not
flushed.
If a child channel's beginRead processes data from the inboundBuffer and
then readPending is set to false, which will result in data not being
delivered if in the parent's read loop and more data is attempted to be
delievered to that child channel.

Modifications:
- The child channel must force a flush if a frame is written as a result
of reading a frame, and this is not in the parent channel's read loop
- The child channel must allow a transition from dequeueing from
beginRead into the parent channel's read loop to deliver more data

Result:
The child channel flushes data when reading outside the parent's read
loop, and has frames delivered more reliably.
This commit is contained in:
Scott Mitchell 2017-10-23 14:53:53 -07:00
parent 413c7c2cd8
commit 911b2acc50
4 changed files with 91 additions and 56 deletions

View File

@ -310,9 +310,9 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
localFlow.initialWindowSize(targetConnectionWindow);
}
final void consumeBytes(int streamId, int bytes) throws Http2Exception {
final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
Http2Stream stream = connection().stream(streamId);
connection().local().flowController().consumeBytes(stream, bytes);
return connection().local().flowController().consumeBytes(stream, bytes);
}
private void writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame, ChannelPromise promise) {

View File

@ -158,6 +158,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
// TODO: We may be able to optimize when we really need to call flush(...) during channelReadComplete(...)
// by checking if this is true and only then call flush(...).
private boolean flushNeeded;
private boolean parentReadInProgress;
private int idCount;
// Linked-List for DefaultHttp2StreamChannel instances that need to be processed by channelReadComplete(...)
@ -288,18 +289,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
childChannel.fireChildReadComplete();
break;
case READ_PROCESSED_OK_TO_PROCESS_MORE:
if (!childChannel.fireChannelReadPending) {
assert childChannel.next == null;
if (tail == null) {
assert head == null;
tail = head = childChannel;
} else {
tail.next = childChannel;
tail = childChannel;
}
childChannel.fireChannelReadPending = true;
}
addChildChannelToReadPendingQueue(childChannel);
break;
case READ_IGNORED_CHANNEL_INACTIVE:
case READ_QUEUED:
@ -310,6 +300,21 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
}
}
final void addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
if (!childChannel.fireChannelReadPending) {
assert childChannel.next == null;
if (tail == null) {
assert head == null;
tail = head = childChannel;
} else {
tail.next = childChannel;
tail = childChannel;
}
childChannel.fireChannelReadPending = true;
}
}
private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
try {
forEachActiveStream(new Http2FrameStreamVisitor() {
@ -337,10 +342,17 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
*/
@Override
public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
parentReadInProgress = false;
onChannelReadComplete(ctx);
channelReadComplete0(ctx);
}
@Override
public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
parentReadInProgress = true;
super.channelRead(ctx, msg);
}
final void onChannelReadComplete(ChannelHandlerContext ctx) {
// If we have many child channel we can optimize for the case when multiple call flush() in
// channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
@ -378,10 +390,19 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
flush(ctx);
}
// Allow to override for testing
void onBytesConsumed(@SuppressWarnings("unused") ChannelHandlerContext ctx,
/**
* Return bytes to flow control.
* <p>
* Package private to allow to override for testing
* @param ctx The {@link ChannelHandlerContext} associated with the parent channel.
* @param stream The object representing the HTTP/2 stream.
* @param bytes The number of bytes to return to flow control.
* @return {@code true} if a frame has been written as a result of this method call.
* @throws Http2Exception If this operation violates the flow control limits.
*/
boolean onBytesConsumed(@SuppressWarnings("unused") ChannelHandlerContext ctx,
Http2FrameStream stream, int bytes) throws Http2Exception {
consumeBytes(stream.id(), bytes);
return consumeBytes(stream.id(), bytes);
}
// Allow to extend for testing
@ -697,7 +718,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
@Override
public String toString() {
return parent().toString() + "(HTTP/2 - " + stream + ')';
return parent().toString() + "(H2 - " + stream + ')';
}
void writabilityChanged(boolean writable) {
@ -709,13 +730,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
}
}
private Queue<Object> inboundBuffer() {
if (inboundBuffer == null) {
inboundBuffer = new ArrayDeque<Object>(4);
}
return inboundBuffer;
}
/**
* Receive a read message. This does not notify handlers unless a read is in progress on the
* channel.
@ -726,8 +740,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
ReferenceCountUtil.release(frame);
return ReadState.READ_IGNORED_CHANNEL_INACTIVE;
}
if (readInProgress) {
assert inboundBuffer == null || inboundBuffer.isEmpty();
if (readInProgress && (inboundBuffer == null || inboundBuffer.isEmpty())) {
// Check for null because inboundBuffer doesn't support null; we want to be consistent
// for what values are supported.
RecvByteBufAllocator.ExtendedHandle allocHandle = unsafe.recvBufAllocHandle();
@ -735,7 +748,10 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
return allocHandle.continueReading() ?
ReadState.READ_PROCESSED_OK_TO_PROCESS_MORE : ReadState.READ_PROCESSED_BUT_STOP_READING;
} else {
inboundBuffer().add(frame);
if (inboundBuffer == null) {
inboundBuffer = new ArrayDeque<Object>(4);
}
inboundBuffer.add(frame);
return ReadState.READ_QUEUED;
}
}
@ -902,32 +918,45 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
if (readInProgress || !isActive()) {
return;
}
readInProgress = true;
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config());
if (inboundBuffer == null) {
if (inboundBuffer == null || inboundBuffer.isEmpty()) {
if (closePending) {
unsafe.closeForcibly();
} else {
readInProgress = true;
}
return;
}
// We have already checked that the queue is not empty, so before this value is used it will always be
// set by allocHandle.continueReading().
boolean continueReading;
do {
Object m = inboundBuffer.poll();
if (m == null) {
if (closePending) {
pipeline().fireChannelReadComplete();
unsafe.closeForcibly();
}
return;
continueReading = false;
break;
}
doRead0((Http2Frame) m, allocHandle);
} while (allocHandle.continueReading());
} while (continueReading = allocHandle.continueReading());
allocHandle.readComplete();
pipeline().fireChannelReadComplete();
if (continueReading && parentReadInProgress) {
// We don't know if more frames will be delivered in the parent channel's read loop, so add this
// channel to the channelReadComplete queue to be notified later.
addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel.this);
} else {
// Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
// channel is not currently reading we need to force a flush at the child channel, because we cannot
// rely upon flush occurring in channelReadComplete on the parent channel.
readInProgress = false;
allocHandle.readComplete();
pipeline().fireChannelReadComplete();
flush();
if (closePending) {
unsafe.closeForcibly();
}
}
}
@SuppressWarnings("deprecation")
@ -944,7 +973,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
if (numBytesToBeConsumed != 0) {
try {
onBytesConsumed(ctx, stream, numBytesToBeConsumed);
writeDoneAndNoFlush |= onBytesConsumed(ctx, stream, numBytesToBeConsumed);
} catch (Http2Exception e) {
pipeline().fireExceptionCaught(e);
}
@ -1057,22 +1086,22 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
@Override
public void flush() {
if (writeDoneAndNoFlush) {
try {
// If we are current channelReadComplete(...) call we should just mark this Channel with a flush
// pending. We will ensure we trigger ctx.flush() after we processed all Channels later on and
// so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
// write(...) or writev(...) operation on the socket.
if (inFireChannelReadComplete) {
flushPending = true;
} else {
flush0(ctx);
}
} finally {
writeDoneAndNoFlush = false;
}
} else {
if (!writeDoneAndNoFlush) {
// There is nothing to flush so this is a NOOP.
return;
}
try {
// If we are current channelReadComplete(...) call we should just mark this Channel with a flush
// pending. We will ensure we trigger ctx.flush() after we processed all Channels later on and
// so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
// write(...) or writev(...) operation on the socket.
if (inFireChannelReadComplete) {
flushPending = true;
} else {
flush0(ctx);
}
} finally {
writeDoneAndNoFlush = false;
}
}

View File

@ -68,4 +68,9 @@ final class Http2StreamChannelId implements ChannelId {
Http2StreamChannelId otherId = (Http2StreamChannelId) obj;
return id == otherId.id && parentId.equals(otherId.parentId);
}
@Override
public String toString() {
return asShortText();
}
}

View File

@ -506,8 +506,9 @@ public class Http2MultiplexCodecTest {
}
@Override
void onBytesConsumed(ChannelHandlerContext ctx, Http2FrameStream stream, int bytes) {
boolean onBytesConsumed(ChannelHandlerContext ctx, Http2FrameStream stream, int bytes) {
writer.write(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream), ctx.newPromise());
return true;
}
@Override