HTTP/2 child channel read cycle doesn't respect RecvByteBufAllocator and (#8147)

Motivation:
Http2MultiplexCodec queues data internally if data is delivered from the
parent channel but the child channel did not request data. If the parent
channel notifies of a stream closure it is possible data in the queue
will be discarded before closing the channel.
Http2MultiplexCodec interacts with RecvByteBufAllocator to control the
child channel's demand for read. However it currently only ever reads a
maximum of one time per loop. This can thrash the read loop and bloat
the call stack if auto read is on, because channelReadComplete will
re-enter the read loop synchronously, and also neglect to deliver data
during the parent's read loop (if it is active). This also meant the
readPendingQueue was not utilized as originally intended (to extend the
child channel's read loop during the parent channel's read loop if
demand for data still existed).

Modifications:
- Modify the child channel's read loop to respect the
RecvByteBufAllocator, and append to the parents readPendingQueue if
appropriate.
- Stream closure notification behaves like EPOLL and KQUEUE transports
and reads all queued data, because the data is already queued in memory
and it is known there will be no more data. This will also replenish the
connection flow control window which may otherwise be constrained by a
closed stream.

Result:
More correct read loop and less risk of dropping data.
This commit is contained in:
Scott Mitchell 2018-07-26 19:44:21 -04:00 committed by GitHub
parent 620dad0c26
commit 53b2dea3f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 471 additions and 182 deletions

View File

@ -31,10 +31,10 @@ import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.DefaultChannelPipeline; import io.netty.channel.DefaultChannelPipeline;
import io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.MessageSizeEstimator; import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator.Handle;
import io.netty.channel.VoidChannelPromise; import io.netty.channel.VoidChannelPromise;
import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.DefaultAttributeMap; import io.netty.util.DefaultAttributeMap;
@ -56,7 +56,6 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid; import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static java.lang.Math.min; import static java.lang.Math.min;
/** /**
@ -111,7 +110,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
private static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() { private static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) {
registerDone(future); registerDone(future);
} }
}; };
@ -148,19 +147,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
} }
} }
private static final class Http2StreamChannelRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
@Override
public MaxMessageHandle newHandle() {
return new MaxMessageHandle() {
@Override
public int guess() {
return 1024;
}
};
}
}
private final ChannelHandler inboundStreamHandler; private final ChannelHandler inboundStreamHandler;
private final ChannelHandler upgradeStreamHandler; private final ChannelHandler upgradeStreamHandler;
@ -230,7 +216,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
while (ch != null) { while (ch != null) {
DefaultHttp2StreamChannel curr = ch; DefaultHttp2StreamChannel curr = ch;
ch = curr.next; ch = curr.next;
curr.next = null; curr.next = curr.previous = null;
} }
head = tail = null; head = tail = null;
} }
@ -244,7 +230,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) { final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
if (frame instanceof Http2StreamFrame) { if (frame instanceof Http2StreamFrame) {
Http2StreamFrame streamFrame = (Http2StreamFrame) frame; Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
onHttp2StreamFrame(((Http2MultiplexCodecStream) streamFrame.stream()).channel, streamFrame); ((Http2MultiplexCodecStream) streamFrame.stream()).channel.fireChildRead(streamFrame);
} else if (frame instanceof Http2GoAwayFrame) { } else if (frame instanceof Http2GoAwayFrame) {
onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame); onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
// Allow other handlers to act on GOAWAY frame // Allow other handlers to act on GOAWAY frame
@ -331,36 +317,46 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
} }
} }
private void onHttp2StreamFrame(DefaultHttp2StreamChannel childChannel, Http2StreamFrame frame) { private boolean isChildChannelInReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
switch (childChannel.fireChildRead(frame)) { return childChannel.previous != null || childChannel.next != null || head == childChannel;
case READ_PROCESSED_BUT_STOP_READING: }
childChannel.fireChildReadComplete();
break; final void tryAddChildChannelToReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
case READ_PROCESSED_OK_TO_PROCESS_MORE: if (!isChildChannelInReadPendingQueue(childChannel)) {
addChildChannelToReadPendingQueue(childChannel); addChildChannelToReadPendingQueue(childChannel);
break;
case READ_IGNORED_CHANNEL_INACTIVE:
case READ_QUEUED:
// nothing to do:
break;
default:
throw new Error();
} }
} }
final void addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel childChannel) { final void addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
if (!childChannel.fireChannelReadPending) {
assert childChannel.next == null;
if (tail == null) { if (tail == null) {
assert head == null; assert head == null;
tail = head = childChannel; tail = head = childChannel;
} else { } else {
childChannel.previous = tail;
tail.next = childChannel; tail.next = childChannel;
tail = childChannel; tail = childChannel;
} }
childChannel.fireChannelReadPending = true;
} }
private void tryRemoveChildChannelFromReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
if (isChildChannelInReadPendingQueue(childChannel)) {
removeChildChannelFromReadPendingQueue(childChannel);
}
}
private void removeChildChannelFromReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
DefaultHttp2StreamChannel previous = childChannel.previous;
if (childChannel.next != null) {
childChannel.next.previous = previous;
} else {
tail = tail.previous; // If there is no next, this childChannel is the tail, so move the tail back.
}
if (previous != null) {
previous.next = childChannel.next;
} else {
head = head.next; // If there is no previous, this childChannel is the head, so move the tail forward.
}
childChannel.next = childChannel.previous = null;
} }
private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) { private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
@ -387,8 +383,14 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
*/ */
@Override @Override
public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
parentReadInProgress = false; try {
onChannelReadComplete(ctx); onChannelReadComplete(ctx);
} finally {
parentReadInProgress = false;
tail = head = null;
// We always flush as this is what Http2ConnectionHandler does for now.
flush0(ctx);
}
channelReadComplete0(ctx); channelReadComplete0(ctx);
} }
@ -402,23 +404,13 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
// If we have many child channel we can optimize for the case when multiple call flush() in // If we have many child channel we can optimize for the case when multiple call flush() in
// channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
// write calls on the socket which is expensive. // write calls on the socket which is expensive.
try {
DefaultHttp2StreamChannel current = head; DefaultHttp2StreamChannel current = head;
while (current != null) { while (current != null) {
DefaultHttp2StreamChannel childChannel = current; DefaultHttp2StreamChannel childChannel = current;
if (childChannel.fireChannelReadPending) {
// Clear early in case fireChildReadComplete() causes it to need to be re-processed // Clear early in case fireChildReadComplete() causes it to need to be re-processed
childChannel.fireChannelReadPending = false;
childChannel.fireChildReadComplete();
}
childChannel.next = null;
current = current.next; current = current.next;
} childChannel.next = childChannel.previous = null;
} finally { childChannel.fireChildReadComplete();
tail = head = null;
// We always flush as this is what Http2ConnectionHandler does for now.
flush0(ctx);
} }
} }
@ -447,13 +439,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
DefaultHttp2StreamChannel channel; DefaultHttp2StreamChannel channel;
} }
private enum ReadState {
READ_QUEUED,
READ_IGNORED_CHANNEL_INACTIVE,
READ_PROCESSED_BUT_STOP_READING,
READ_PROCESSED_OK_TO_PROCESS_MORE
}
private boolean initialWritability(DefaultHttp2FrameStream stream) { private boolean initialWritability(DefaultHttp2FrameStream stream) {
// If the stream id is not valid yet we will just mark the channel as writable as we will be notified // If the stream id is not valid yet we will just mark the channel as writable as we will be notified
// about non-writability state as soon as the first Http2HeaderFrame is written (if needed). // about non-writability state as soon as the first Http2HeaderFrame is written (if needed).
@ -476,24 +461,24 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
private volatile boolean writable; private volatile boolean writable;
private boolean outboundClosed; private boolean outboundClosed;
private boolean closePending; /**
* This variable represents if a read is in progress for the current channel. Note that depending upon the
* {@link RecvByteBufAllocator} behavior a read may extend beyond the {@link Http2ChannelUnsafe#beginRead()}
* method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may drain all pending data, and then if the
* parent channel is reading this channel may still accept frames.
*/
private boolean readInProgress; private boolean readInProgress;
private Queue<Object> inboundBuffer; private Queue<Object> inboundBuffer;
/** {@code true} after the first HEADERS frame has been written **/ /** {@code true} after the first HEADERS frame has been written **/
private boolean firstFrameWritten; private boolean firstFrameWritten;
/** {@code true} if a close without an error was initiated **/ // Currently the child channel and parent channel are always on the same EventLoop thread. This allows us to
private boolean streamClosedWithoutError; // extend the read loop of a child channel if the child channel drains its queued data during read, and the
// parent channel is still in its read loop. The next/previous links build a doubly linked list that the parent
// Keeps track of flush calls in channelReadComplete(...) and aggregate these. // channel will iterate in its channelReadComplete to end the read cycle for each child channel in the list.
private boolean inFireChannelReadComplete;
boolean fireChannelReadPending;
// Holds the reference to the next DefaultHttp2StreamChannel that should be processed in
// channelReadComplete(...)
DefaultHttp2StreamChannel next; DefaultHttp2StreamChannel next;
DefaultHttp2StreamChannel previous;
DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) { DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) {
this.stream = stream; this.stream = stream;
@ -521,13 +506,10 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
} }
void streamClosed() { void streamClosed() {
streamClosedWithoutError = true; unsafe.readEOS();
if (readInProgress) { // Attempt to drain any queued data from the queue and deliver it to the application before closing this
// Just call closeForcibly() as this will take care of fireChannelInactive(). // channel.
unsafe().closeForcibly(); unsafe.doBeginRead();
} else {
closePending = true;
}
} }
@Override @Override
@ -771,49 +753,48 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
* Receive a read message. This does not notify handlers unless a read is in progress on the * Receive a read message. This does not notify handlers unless a read is in progress on the
* channel. * channel.
*/ */
ReadState fireChildRead(Http2Frame frame) { void fireChildRead(Http2Frame frame) {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
if (!isActive()) { if (!isActive()) {
ReferenceCountUtil.release(frame); ReferenceCountUtil.release(frame);
return ReadState.READ_IGNORED_CHANNEL_INACTIVE; } else if (readInProgress) {
} // If readInProgress there cannot be anything in the queue, otherwise we would have drained it from the
if (readInProgress && (inboundBuffer == null || inboundBuffer.isEmpty())) { // queue and processed it during the read cycle.
// Check for null because inboundBuffer doesn't support null; we want to be consistent assert inboundBuffer == null || inboundBuffer.isEmpty();
// for what values are supported. final Handle allocHandle = unsafe.recvBufAllocHandle();
RecvByteBufAllocator.ExtendedHandle allocHandle = unsafe.recvBufAllocHandle();
unsafe.doRead0(frame, allocHandle); unsafe.doRead0(frame, allocHandle);
return allocHandle.continueReading() ? // We currently don't need to check for readEOS because the parent channel and child channel are limited
ReadState.READ_PROCESSED_OK_TO_PROCESS_MORE : ReadState.READ_PROCESSED_BUT_STOP_READING; // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
// read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
// cost of additional readComplete notifications on the rare path.
if (allocHandle.continueReading()) {
tryAddChildChannelToReadPendingQueue(this);
} else {
tryRemoveChildChannelFromReadPendingQueue(this);
unsafe.notifyReadComplete(allocHandle);
}
} else { } else {
if (inboundBuffer == null) { if (inboundBuffer == null) {
inboundBuffer = new ArrayDeque<Object>(4); inboundBuffer = new ArrayDeque<Object>(4);
} }
inboundBuffer.add(frame); inboundBuffer.add(frame);
return ReadState.READ_QUEUED;
} }
} }
void fireChildReadComplete() { void fireChildReadComplete() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
try { assert readInProgress;
if (readInProgress) { unsafe.notifyReadComplete(unsafe.recvBufAllocHandle());
inFireChannelReadComplete = true;
readInProgress = false;
unsafe().recvBufAllocHandle().readComplete();
pipeline().fireChannelReadComplete();
}
} finally {
inFireChannelReadComplete = false;
}
} }
private final class Http2ChannelUnsafe implements Unsafe { private final class Http2ChannelUnsafe implements Unsafe {
private final VoidChannelPromise unsafeVoidPromise = private final VoidChannelPromise unsafeVoidPromise =
new VoidChannelPromise(DefaultHttp2StreamChannel.this, false); new VoidChannelPromise(DefaultHttp2StreamChannel.this, false);
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private RecvByteBufAllocator.ExtendedHandle recvHandle; private Handle recvHandle;
private boolean writeDoneAndNoFlush; private boolean writeDoneAndNoFlush;
private boolean closeInitiated; private boolean closeInitiated;
private boolean readEOS;
@Override @Override
public void connect(final SocketAddress remoteAddress, public void connect(final SocketAddress remoteAddress,
@ -825,9 +806,10 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
} }
@Override @Override
public RecvByteBufAllocator.ExtendedHandle recvBufAllocHandle() { public Handle recvBufAllocHandle() {
if (recvHandle == null) { if (recvHandle == null) {
recvHandle = (RecvByteBufAllocator.ExtendedHandle) config().getRecvByteBufAllocator().newHandle(); recvHandle = config().getRecvByteBufAllocator().newHandle();
recvHandle.reset(config());
} }
return recvHandle; return recvHandle;
} }
@ -892,7 +874,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
// This means close() was called before so we just register a listener and return // This means close() was called before so we just register a listener and return
closePromise.addListener(new ChannelFutureListener() { closePromise.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) {
promise.setSuccess(); promise.setSuccess();
} }
}); });
@ -901,15 +883,13 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
} }
closeInitiated = true; closeInitiated = true;
closePending = false; tryRemoveChildChannelFromReadPendingQueue(DefaultHttp2StreamChannel.this);
fireChannelReadPending = false;
final boolean wasActive = isActive(); final boolean wasActive = isActive();
// Only ever send a reset frame if the connection is still alive and if the stream may have existed // Only ever send a reset frame if the connection is still alive and if the stream may have existed
// as otherwise we may send a RST on a stream in an invalid state and cause a connection error. // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
if (parent().isActive() && !streamClosedWithoutError && if (parent().isActive() && !readEOS && connection().streamMayHaveExisted(stream().id())) {
connection().streamMayHaveExisted(stream().id())) {
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream()); Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
write(resetFrame, unsafe().voidPromise()); write(resetFrame, unsafe().voidPromise());
flush(); flush();
@ -1009,58 +989,64 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
return; return;
} }
readInProgress = true; readInProgress = true;
doBeginRead();
}
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); void doBeginRead() {
allocHandle.reset(config()); Object message;
if (inboundBuffer == null || inboundBuffer.isEmpty()) { if (inboundBuffer == null || (message = inboundBuffer.poll()) == null) {
if (closePending) { if (readEOS) {
unsafe.closeForcibly(); unsafe.closeForcibly();
} }
return; } else {
} final Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config());
// We have already checked that the queue is not empty, so before this value is used it will always be boolean continueReading = false;
// set by allocHandle.continueReading().
boolean continueReading;
do { do {
Object m = inboundBuffer.poll(); doRead0((Http2Frame) message, allocHandle);
if (m == null) { } while ((readEOS || (continueReading = allocHandle.continueReading())) &&
continueReading = false; (message = inboundBuffer.poll()) != null);
break;
}
doRead0((Http2Frame) m, allocHandle);
} while (continueReading = allocHandle.continueReading());
if (continueReading && parentReadInProgress) { if (continueReading && parentReadInProgress && !readEOS) {
// We don't know if more frames will be delivered in the parent channel's read loop, so add this // Currently the parent and child channel are on the same EventLoop thread. If the parent is
// channel to the channelReadComplete queue to be notified later. // currently reading it is possile that more frames will be delivered to this child channel. In
// the case that this child channel still wants to read we delay the channelReadComplete on this
// child channel until the parent is done reading.
assert !isChildChannelInReadPendingQueue(DefaultHttp2StreamChannel.this);
addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel.this); addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel.this);
} else { } else {
// Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent notifyReadComplete(allocHandle);
// channel is not currently reading we need to force a flush at the child channel, because we cannot }
// rely upon flush occurring in channelReadComplete on the parent channel. }
}
void readEOS() {
readEOS = true;
}
void notifyReadComplete(Handle allocHandle) {
assert next == null && previous == null;
readInProgress = false; readInProgress = false;
allocHandle.readComplete(); allocHandle.readComplete();
pipeline().fireChannelReadComplete(); pipeline().fireChannelReadComplete();
// Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
// channel is not currently reading we need to force a flush at the child channel, because we cannot
// rely upon flush occurring in channelReadComplete on the parent channel.
flush(); flush();
if (closePending) { if (readEOS) {
unsafe.closeForcibly(); unsafe.closeForcibly();
} }
} }
}
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) { void doRead0(Http2Frame frame, Handle allocHandle) {
int numBytesToBeConsumed = 0;
if (frame instanceof Http2DataFrame) {
numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes();
allocHandle.lastBytesRead(numBytesToBeConsumed);
} else {
allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE);
}
allocHandle.incMessagesRead(1);
pipeline().fireChannelRead(frame); pipeline().fireChannelRead(frame);
allocHandle.incMessagesRead(1);
if (frame instanceof Http2DataFrame) {
final int numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes();
allocHandle.attemptedBytesRead(numBytesToBeConsumed);
allocHandle.lastBytesRead(numBytesToBeConsumed);
if (numBytesToBeConsumed != 0) { if (numBytesToBeConsumed != 0) {
try { try {
writeDoneAndNoFlush |= onBytesConsumed(ctx, stream, numBytesToBeConsumed); writeDoneAndNoFlush |= onBytesConsumed(ctx, stream, numBytesToBeConsumed);
@ -1068,6 +1054,10 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
pipeline().fireExceptionCaught(e); pipeline().fireExceptionCaught(e);
} }
} }
} else {
allocHandle.attemptedBytesRead(MIN_HTTP2_FRAME_SIZE);
allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE);
}
} }
@Override @Override
@ -1104,7 +1094,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
} else { } else {
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) {
firstWriteComplete(future, promise); firstWriteComplete(future, promise);
} }
}); });
@ -1126,7 +1116,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
} else { } else {
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) {
writeComplete(future, promise); writeComplete(future, promise);
} }
}); });
@ -1197,18 +1187,16 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
@Override @Override
public void flush() { public void flush() {
if (!writeDoneAndNoFlush) { // If we are currently in the parent channel's read loop we should just ignore the flush.
// We will ensure we trigger ctx.flush() after we processed all Channels later on and
// so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
// write(...) or writev(...) operation on the socket.
if (!writeDoneAndNoFlush || parentReadInProgress) {
// There is nothing to flush so this is a NOOP. // There is nothing to flush so this is a NOOP.
return; return;
} }
try { try {
// If we are currently in the channelReadComplete(...) call we should just ignore the flush.
// We will ensure we trigger ctx.flush() after we processed all Channels later on and
// so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
// write(...) or writev(...) operation on the socket.
if (!inFireChannelReadComplete) {
flush0(ctx); flush0(ctx);
}
} finally { } finally {
writeDoneAndNoFlush = false; writeDoneAndNoFlush = false;
} }
@ -1232,10 +1220,8 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
* changes. * changes.
*/ */
private final class Http2StreamChannelConfig extends DefaultChannelConfig { private final class Http2StreamChannelConfig extends DefaultChannelConfig {
Http2StreamChannelConfig(Channel channel) { Http2StreamChannelConfig(Channel channel) {
super(channel); super(channel);
setRecvByteBufAllocator(new Http2StreamChannelRecvByteBufAllocator());
} }
@Override @Override

View File

@ -28,8 +28,14 @@ import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpScheme; import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http2.Http2Exception.StreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.handler.codec.http2.LastInboundHandler.Consumer;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
@ -38,12 +44,6 @@ import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.util.ReferenceCountUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static io.netty.util.ReferenceCountUtil.release; import static io.netty.util.ReferenceCountUtil.release;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -743,9 +743,197 @@ public class Http2MultiplexCodecTest {
childChannel.closeFuture().syncUninterruptibly(); childChannel.closeFuture().syncUninterruptibly();
} }
@Test
public void endOfStreamDoesNotDiscardData() {
AtomicInteger numReads = new AtomicInteger(1);
final AtomicBoolean shouldDisableAutoRead = new AtomicBoolean();
Consumer<ChannelHandlerContext> ctxConsumer = new Consumer<ChannelHandlerContext>() {
@Override
public void accept(ChannelHandlerContext obj) {
if (shouldDisableAutoRead.get()) {
obj.channel().config().setAutoRead(false);
}
}
};
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer);
Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
childChannel.config().setAutoRead(false);
Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream);
Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream);
Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream);
Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream);
assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound());
// We want to simulate the parent channel calling channelRead and delay calling channelReadComplete.
parentChannel.writeOneInbound(new Object());
codec.onHttp2Frame(dataFrame1);
assertEquals(dataFrame1, inboundHandler.readInbound());
// Deliver frames, and then a stream closed while read is inactive.
codec.onHttp2Frame(dataFrame2);
codec.onHttp2Frame(dataFrame3);
codec.onHttp2Frame(dataFrame4);
shouldDisableAutoRead.set(true);
childChannel.config().setAutoRead(true);
numReads.set(1);
inboundStream.state = Http2Stream.State.CLOSED;
codec.onHttp2StreamStateChanged(inboundStream);
// Detecting EOS should flush all pending data regardless of read calls.
assertEquals(dataFrame2, inboundHandler.readInbound());
assertEquals(dataFrame3, inboundHandler.readInbound());
assertEquals(dataFrame4, inboundHandler.readInbound());
assertNull(inboundHandler.readInbound());
// Now we want to call channelReadComplete and simulate the end of the read loop.
parentChannel.flushInbound();
childChannel.closeFuture().syncUninterruptibly();
dataFrame1.release();
dataFrame2.release();
dataFrame3.release();
dataFrame4.release();
}
@Test
public void childQueueIsDrainedAndNewDataIsDispatchedInParentReadLoopAutoRead() {
AtomicInteger numReads = new AtomicInteger(1);
final AtomicInteger channelReadCompleteCount = new AtomicInteger(0);
final AtomicBoolean shouldDisableAutoRead = new AtomicBoolean();
Consumer<ChannelHandlerContext> ctxConsumer = new Consumer<ChannelHandlerContext>() {
@Override
public void accept(ChannelHandlerContext obj) {
channelReadCompleteCount.incrementAndGet();
if (shouldDisableAutoRead.get()) {
obj.channel().config().setAutoRead(false);
}
}
};
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer);
Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
childChannel.config().setAutoRead(false);
Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream);
Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream);
Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream);
Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream);
assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound());
// We want to simulate the parent channel calling channelRead and delay calling channelReadComplete.
parentChannel.writeOneInbound(new Object());
codec.onHttp2Frame(dataFrame1);
assertEquals(dataFrame1, inboundHandler.readInbound());
// We want one item to be in the queue, and allow the numReads to be larger than 1. This will ensure that
// when beginRead() is called the child channel is added to the readPending queue of the parent channel.
codec.onHttp2Frame(dataFrame2);
numReads.set(10);
shouldDisableAutoRead.set(true);
childChannel.config().setAutoRead(true);
codec.onHttp2Frame(dataFrame3);
codec.onHttp2Frame(dataFrame4);
// Detecting EOS should flush all pending data regardless of read calls.
assertEquals(dataFrame2, inboundHandler.readInbound());
assertEquals(dataFrame3, inboundHandler.readInbound());
assertEquals(dataFrame4, inboundHandler.readInbound());
assertNull(inboundHandler.readInbound());
// Now we want to call channelReadComplete and simulate the end of the read loop.
parentChannel.flushInbound();
// 3 = 1 for initialization + 1 for read when auto read was off + 1 for when auto read was back on
assertEquals(3, channelReadCompleteCount.get());
dataFrame1.release();
dataFrame2.release();
dataFrame3.release();
dataFrame4.release();
}
@Test
public void childQueueIsDrainedAndNewDataIsDispatchedInParentReadLoopNoAutoRead() {
AtomicInteger numReads = new AtomicInteger(1);
final AtomicInteger channelReadCompleteCount = new AtomicInteger(0);
final AtomicBoolean shouldDisableAutoRead = new AtomicBoolean();
Consumer<ChannelHandlerContext> ctxConsumer = new Consumer<ChannelHandlerContext>() {
@Override
public void accept(ChannelHandlerContext obj) {
channelReadCompleteCount.incrementAndGet();
if (shouldDisableAutoRead.get()) {
obj.channel().config().setAutoRead(false);
}
}
};
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer);
Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
childChannel.config().setAutoRead(false);
Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream);
Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream);
Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream);
Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream);
assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound());
// We want to simulate the parent channel calling channelRead and delay calling channelReadComplete.
parentChannel.writeOneInbound(new Object());
codec.onHttp2Frame(dataFrame1);
assertEquals(dataFrame1, inboundHandler.readInbound());
// We want one item to be in the queue, and allow the numReads to be larger than 1. This will ensure that
// when beginRead() is called the child channel is added to the readPending queue of the parent channel.
codec.onHttp2Frame(dataFrame2);
numReads.set(2);
childChannel.read();
assertEquals(dataFrame2, inboundHandler.readInbound());
assertNull(inboundHandler.readInbound());
// This is the second item that was read, this should be the last until we call read() again. This should also
// notify of readComplete().
codec.onHttp2Frame(dataFrame3);
assertEquals(dataFrame3, inboundHandler.readInbound());
codec.onHttp2Frame(dataFrame4);
assertNull(inboundHandler.readInbound());
childChannel.read();
assertEquals(dataFrame4, inboundHandler.readInbound());
assertNull(inboundHandler.readInbound());
// Now we want to call channelReadComplete and simulate the end of the read loop.
parentChannel.flushInbound();
// 3 = 1 for initialization + 1 for first read of 2 items + 1 for second read of 2 items +
// 1 for parent channel readComplete
assertEquals(4, channelReadCompleteCount.get());
dataFrame1.release();
dataFrame2.release();
dataFrame3.release();
dataFrame4.release();
}
private LastInboundHandler streamActiveAndWriteHeaders(Http2FrameStream stream) { private LastInboundHandler streamActiveAndWriteHeaders(Http2FrameStream stream) {
LastInboundHandler inboundHandler = new LastInboundHandler(); return streamActiveAndWriteHeaders(stream, null, LastInboundHandler.<ChannelHandlerContext>noopConsumer());
}
private LastInboundHandler streamActiveAndWriteHeaders(Http2FrameStream stream,
AtomicInteger maxReads,
Consumer<ChannelHandlerContext> contextConsumer) {
LastInboundHandler inboundHandler = new LastInboundHandler(contextConsumer);
childChannelInitializer.handler = inboundHandler; childChannelInitializer.handler = inboundHandler;
childChannelInitializer.maxReads = maxReads;
assertFalse(inboundHandler.isChannelActive()); assertFalse(inboundHandler.isChannelActive());
((TestableHttp2MultiplexCodec.Stream) stream).state = Http2Stream.State.OPEN; ((TestableHttp2MultiplexCodec.Stream) stream).state = Http2Stream.State.OPEN;
codec.onHttp2StreamStateChanged(stream); codec.onHttp2StreamStateChanged(stream);

View File

@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
/** /**
@ -34,11 +35,36 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
*/ */
public class LastInboundHandler extends ChannelDuplexHandler { public class LastInboundHandler extends ChannelDuplexHandler {
private final List<Object> queue = new ArrayList<Object>(); private final List<Object> queue = new ArrayList<Object>();
private final Consumer<ChannelHandlerContext> channelReadCompleteConsumer;
private Throwable lastException; private Throwable lastException;
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private boolean channelActive; private boolean channelActive;
private String writabilityStates = ""; private String writabilityStates = "";
// TODO(scott): use JDK 8's Consumer
public interface Consumer<T> {
void accept(T obj);
}
private static final Consumer<Object> NOOP_CONSUMER = new Consumer<Object>() {
@Override
public void accept(Object obj) {
}
};
@SuppressWarnings("unchecked")
public static <T> Consumer<T> noopConsumer() {
return (Consumer<T>) NOOP_CONSUMER;
}
public LastInboundHandler() {
this(LastInboundHandler.<ChannelHandlerContext>noopConsumer());
}
public LastInboundHandler(Consumer<ChannelHandlerContext> channelReadCompleteConsumer) {
this.channelReadCompleteConsumer = checkNotNull(channelReadCompleteConsumer, "channelReadCompleteConsumer");
}
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx); super.handlerAdded(ctx);
@ -86,6 +112,11 @@ public class LastInboundHandler extends ChannelDuplexHandler {
queue.add(msg); queue.add(msg);
} }
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
channelReadCompleteConsumer.accept(ctx);
}
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
queue.add(new UserEvent(evt)); queue.add(new UserEvent(evt));

View File

@ -16,10 +16,17 @@
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.UncheckedBooleanSupplier;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Channel initializer useful in tests. * Channel initializer useful in tests.
@ -27,6 +34,7 @@ import io.netty.channel.ChannelInitializer;
@Sharable @Sharable
public class TestChannelInitializer extends ChannelInitializer<Channel> { public class TestChannelInitializer extends ChannelInitializer<Channel> {
ChannelHandler handler; ChannelHandler handler;
AtomicInteger maxReads;
@Override @Override
public void initChannel(Channel channel) { public void initChannel(Channel channel) {
@ -34,5 +42,81 @@ public class TestChannelInitializer extends ChannelInitializer<Channel> {
channel.pipeline().addLast(handler); channel.pipeline().addLast(handler);
handler = null; handler = null;
} }
if (maxReads != null) {
channel.config().setRecvByteBufAllocator(new TestNumReadsRecvByteBufAllocator(maxReads));
}
}
/**
* Designed to read a single byte at a time to control the number of reads done at a fine granularity.
*/
private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
private final AtomicInteger numReads;
TestNumReadsRecvByteBufAllocator(AtomicInteger numReads) {
this.numReads = numReads;
}
@Override
public ExtendedHandle newHandle() {
return new ExtendedHandle() {
private int attemptedBytesRead;
private int lastBytesRead;
private int numMessagesRead;
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess(), guess());
}
@Override
public int guess() {
return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
}
@Override
public void reset(ChannelConfig config) {
numMessagesRead = 0;
}
@Override
public void incMessagesRead(int numMessages) {
numMessagesRead += numMessages;
}
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
}
@Override
public int lastBytesRead() {
return lastBytesRead;
}
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
@Override
public boolean continueReading() {
return numMessagesRead < numReads.get();
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return continueReading();
}
@Override
public void readComplete() {
// Nothing needs to be done or adjusted after each read cycle is completed.
}
};
}
} }
} }