Reduce coupeling between Http2FrameCodec and Http2Multiplex* (#9273)
Motivation: Http2MultiplexCodec and Http2MultiplexHandler had a very strong coupling with Http2FrameCodec which we can reduce easily. The end-goal should be to have no coupling at all. Modifications: - Reduce coupling by move some common logic to Http2CodecUtil - Move logic to check if a stream may have existed before to Http2FrameCodec - Use ArrayDeque as replacement for custom double-linked-list which makes the code a lot more readable - Use WindowUpdateFrame to signal consume bytes (just as users do when they use Http2FrameCodec directly) Result: Less coupling and cleaner code.
This commit is contained in:
parent
78adeb5408
commit
3e681ab513
@ -135,6 +135,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
private Runnable fireChannelWritabilityChangedTask;
|
||||
|
||||
private boolean outboundClosed;
|
||||
private int flowControlledBytes;
|
||||
|
||||
/**
|
||||
* This variable represents if a read is in progress for the current channel or was requested.
|
||||
@ -148,13 +149,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
|
||||
/** {@code true} after the first HEADERS frame has been written **/
|
||||
private boolean firstFrameWritten;
|
||||
|
||||
// Currently the child channel and parent channel are always on the same EventLoop thread. This allows us to
|
||||
// extend the read loop of a child channel if the child channel drains its queued data during read, and the
|
||||
// parent channel is still in its read loop. The next/previous links build a doubly linked list that the parent
|
||||
// channel will iterate in its channelReadComplete to end the read cycle for each child channel in the list.
|
||||
AbstractHttp2StreamChannel next;
|
||||
AbstractHttp2StreamChannel previous;
|
||||
private boolean readCompletePending;
|
||||
|
||||
AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
|
||||
this.stream = stream;
|
||||
@ -534,16 +529,18 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
// otherwise we would have drained it from the queue and processed it during the read cycle.
|
||||
assert inboundBuffer == null || inboundBuffer.isEmpty();
|
||||
final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
|
||||
unsafe.doRead0(frame, allocHandle);
|
||||
flowControlledBytes += unsafe.doRead0(frame, allocHandle);
|
||||
// We currently don't need to check for readEOS because the parent channel and child channel are limited
|
||||
// to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
|
||||
// read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
|
||||
// cost of additional readComplete notifications on the rare path.
|
||||
if (allocHandle.continueReading()) {
|
||||
tryAddChildChannelToReadPendingQueue();
|
||||
if (!readCompletePending) {
|
||||
readCompletePending = true;
|
||||
addChannelToReadCompletePendingQueue();
|
||||
}
|
||||
} else {
|
||||
tryRemoveChildChannelFromReadPendingQueue();
|
||||
unsafe.notifyReadComplete(allocHandle);
|
||||
unsafe.notifyReadComplete(allocHandle, true);
|
||||
}
|
||||
} else {
|
||||
if (inboundBuffer == null) {
|
||||
@ -555,8 +552,8 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
|
||||
void fireChildReadComplete() {
|
||||
assert eventLoop().inEventLoop();
|
||||
assert readStatus != ReadStatus.IDLE;
|
||||
unsafe.notifyReadComplete(unsafe.recvBufAllocHandle());
|
||||
assert readStatus != ReadStatus.IDLE || !readCompletePending;
|
||||
unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false);
|
||||
}
|
||||
|
||||
private final class Http2ChannelUnsafe implements Unsafe {
|
||||
@ -648,14 +645,16 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
return;
|
||||
}
|
||||
closeInitiated = true;
|
||||
|
||||
tryRemoveChildChannelFromReadPendingQueue();
|
||||
// Just set to false as removing from an underlying queue would even be more expensive.
|
||||
readCompletePending = false;
|
||||
|
||||
final boolean wasActive = isActive();
|
||||
|
||||
// Only ever send a reset frame if the connection is still alive and if the stream may have existed
|
||||
updateLocalWindowIfNeeded();
|
||||
|
||||
// Only ever send a reset frame if the connection is still alive and if the stream was created before
|
||||
// as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
|
||||
if (parent().isActive() && !readEOS && streamMayHaveExisted(stream())) {
|
||||
if (parent().isActive() && !readEOS && Http2CodecUtil.isStreamIdValid(stream.id())) {
|
||||
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
|
||||
write(resetFrame, unsafe().voidPromise());
|
||||
flush();
|
||||
@ -776,7 +775,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
allocHandle.reset(config());
|
||||
boolean continueReading = false;
|
||||
do {
|
||||
doRead0((Http2Frame) message, allocHandle);
|
||||
flowControlledBytes += doRead0((Http2Frame) message, allocHandle);
|
||||
} while ((readEOS || (continueReading = allocHandle.continueReading())) &&
|
||||
(message = inboundBuffer.poll()) != null);
|
||||
|
||||
@ -785,10 +784,12 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
// currently reading it is possile that more frames will be delivered to this child channel. In
|
||||
// the case that this child channel still wants to read we delay the channelReadComplete on this
|
||||
// child channel until the parent is done reading.
|
||||
boolean added = tryAddChildChannelToReadPendingQueue();
|
||||
assert added;
|
||||
if (!readCompletePending) {
|
||||
readCompletePending = true;
|
||||
addChannelToReadCompletePendingQueue();
|
||||
}
|
||||
} else {
|
||||
notifyReadComplete(allocHandle);
|
||||
notifyReadComplete(allocHandle, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -797,13 +798,30 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
readEOS = true;
|
||||
}
|
||||
|
||||
void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle) {
|
||||
assert next == null && previous == null;
|
||||
private void updateLocalWindowIfNeeded() {
|
||||
if (flowControlledBytes != 0) {
|
||||
int bytes = flowControlledBytes;
|
||||
flowControlledBytes = 0;
|
||||
write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
|
||||
writeDoneAndNoFlush = true;
|
||||
}
|
||||
}
|
||||
|
||||
void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete) {
|
||||
if (!readCompletePending && !forceReadComplete) {
|
||||
return;
|
||||
}
|
||||
// Set to false just in case we added the channel multiple times before.
|
||||
readCompletePending = false;
|
||||
|
||||
if (readStatus == ReadStatus.REQUESTED) {
|
||||
readStatus = ReadStatus.IN_PROGRESS;
|
||||
} else {
|
||||
readStatus = ReadStatus.IDLE;
|
||||
}
|
||||
|
||||
updateLocalWindowIfNeeded();
|
||||
|
||||
allocHandle.readComplete();
|
||||
pipeline().fireChannelReadComplete();
|
||||
if (config().isAutoRead()) {
|
||||
@ -820,7 +838,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
|
||||
int doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
|
||||
pipeline().fireChannelRead(frame);
|
||||
allocHandle.incMessagesRead(1);
|
||||
|
||||
@ -828,21 +846,12 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
final int numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes();
|
||||
allocHandle.attemptedBytesRead(numBytesToBeConsumed);
|
||||
allocHandle.lastBytesRead(numBytesToBeConsumed);
|
||||
if (numBytesToBeConsumed != 0) {
|
||||
try {
|
||||
if (consumeBytes(stream, numBytesToBeConsumed)) {
|
||||
// We wrote some WINDOW_UPDATE frame, so we may need to do a flush.
|
||||
writeDoneAndNoFlush = true;
|
||||
flush();
|
||||
}
|
||||
} catch (Http2Exception e) {
|
||||
pipeline().fireExceptionCaught(e);
|
||||
}
|
||||
}
|
||||
return numBytesToBeConsumed;
|
||||
} else {
|
||||
allocHandle.attemptedBytesRead(MIN_HTTP2_FRAME_SIZE);
|
||||
allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1033,10 +1042,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
return promise;
|
||||
}
|
||||
|
||||
protected abstract boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception;
|
||||
protected abstract boolean isParentReadInProgress();
|
||||
protected abstract boolean streamMayHaveExisted(Http2FrameStream stream);
|
||||
protected abstract void tryRemoveChildChannelFromReadPendingQueue();
|
||||
protected abstract boolean tryAddChildChannelToReadPendingQueue();
|
||||
protected abstract void addChannelToReadCompletePendingQueue();
|
||||
protected abstract ChannelHandlerContext parentContext();
|
||||
}
|
||||
|
@ -91,16 +91,4 @@ public abstract class Http2ChannelDuplexHandler implements ChannelHandler {
|
||||
}
|
||||
return (Http2FrameCodec) frameCodecCtx.handler();
|
||||
}
|
||||
|
||||
boolean isValidLocalStreamId(Http2FrameStream stream) {
|
||||
return frameCodec.connection().local().isValidStreamId(stream.id());
|
||||
}
|
||||
|
||||
boolean streamMayHaveExisted(Http2FrameStream stream) {
|
||||
return frameCodec.connection().streamMayHaveExisted(stream.id());
|
||||
}
|
||||
|
||||
boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception {
|
||||
return frameCodec.consumeBytes(stream.id(), bytes);
|
||||
}
|
||||
}
|
||||
|
@ -151,6 +151,10 @@ public final class Http2CodecUtil {
|
||||
return streamId >= 0;
|
||||
}
|
||||
|
||||
static boolean isStreamIdValid(int streamId, boolean server) {
|
||||
return isStreamIdValid(streamId) && server == ((streamId & 1) == 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates whether or not the given value for max frame size falls within the valid range.
|
||||
*/
|
||||
|
@ -294,7 +294,16 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
|
||||
}
|
||||
} else if (msg instanceof Http2ResetFrame) {
|
||||
Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
|
||||
encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
|
||||
int id = rstFrame.stream().id();
|
||||
// Only ever send a reset frame if stream may have existed before as otherwise we may send a RST on a
|
||||
// stream in an invalid state and cause a connection error.
|
||||
if (connection().streamMayHaveExisted(id)) {
|
||||
encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
|
||||
} else {
|
||||
ReferenceCountUtil.release(rstFrame);
|
||||
promise.setFailure(Http2Exception.streamError(
|
||||
rstFrame.stream().id(), Http2Error.PROTOCOL_ERROR, "Stream never existed"));
|
||||
}
|
||||
} else if (msg instanceof Http2PingFrame) {
|
||||
Http2PingFrame frame = (Http2PingFrame) msg;
|
||||
encoder().writePing(ctx, frame.ack(), frame.content(), promise);
|
||||
|
@ -27,6 +27,9 @@ import io.netty.util.ReferenceCounted;
|
||||
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Queue;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
|
||||
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
@ -84,14 +87,14 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
|
||||
private final ChannelHandler inboundStreamHandler;
|
||||
private final ChannelHandler upgradeStreamHandler;
|
||||
private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
|
||||
new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
|
||||
// Choose 100 which is what is used most of the times as default.
|
||||
Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
|
||||
|
||||
private boolean parentReadInProgress;
|
||||
private int idCount;
|
||||
|
||||
// Linked-List for Http2MultiplexCodecStreamChannel instances that need to be processed by channelReadComplete(...)
|
||||
private AbstractHttp2StreamChannel head;
|
||||
private AbstractHttp2StreamChannel tail;
|
||||
|
||||
// Need to be volatile as accessed from within the Http2MultiplexCodecStreamChannel in a multi-threaded fashion.
|
||||
volatile ChannelHandlerContext ctx;
|
||||
|
||||
@ -131,14 +134,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
|
||||
super.handlerRemoved0(ctx);
|
||||
|
||||
// Unlink the linked list to guard against GC nepotism.
|
||||
AbstractHttp2StreamChannel ch = head;
|
||||
while (ch != null) {
|
||||
AbstractHttp2StreamChannel curr = ch;
|
||||
ch = curr.next;
|
||||
curr.next = curr.previous = null;
|
||||
}
|
||||
head = tail = null;
|
||||
readCompletePendingQueue.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -221,50 +217,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isChildChannelInReadPendingQueue(AbstractHttp2StreamChannel childChannel) {
|
||||
return childChannel.previous != null || childChannel.next != null || head == childChannel;
|
||||
}
|
||||
|
||||
private boolean tryAddChildChannelToReadPendingQueue(AbstractHttp2StreamChannel childChannel) {
|
||||
if (!isChildChannelInReadPendingQueue(childChannel)) {
|
||||
addChildChannelToReadPendingQueue(childChannel);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void addChildChannelToReadPendingQueue(AbstractHttp2StreamChannel childChannel) {
|
||||
if (tail == null) {
|
||||
assert head == null;
|
||||
tail = head = childChannel;
|
||||
} else {
|
||||
childChannel.previous = tail;
|
||||
tail.next = childChannel;
|
||||
tail = childChannel;
|
||||
}
|
||||
}
|
||||
|
||||
private void tryRemoveChildChannelFromReadPendingQueue(AbstractHttp2StreamChannel childChannel) {
|
||||
if (isChildChannelInReadPendingQueue(childChannel)) {
|
||||
removeChildChannelFromReadPendingQueue(childChannel);
|
||||
}
|
||||
}
|
||||
|
||||
private void removeChildChannelFromReadPendingQueue(AbstractHttp2StreamChannel childChannel) {
|
||||
AbstractHttp2StreamChannel previous = childChannel.previous;
|
||||
if (childChannel.next != null) {
|
||||
childChannel.next.previous = previous;
|
||||
} else {
|
||||
tail = tail.previous; // If there is no next, this childChannel is the tail, so move the tail back.
|
||||
}
|
||||
if (previous != null) {
|
||||
previous.next = childChannel.next;
|
||||
} else {
|
||||
head = head.next; // If there is no previous, this childChannel is the head, so move the tail forward.
|
||||
}
|
||||
childChannel.next = childChannel.previous = null;
|
||||
}
|
||||
|
||||
private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
|
||||
try {
|
||||
forEachActiveStream(stream -> {
|
||||
@ -287,17 +239,30 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
*/
|
||||
@Override
|
||||
public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
try {
|
||||
onChannelReadComplete(ctx);
|
||||
} finally {
|
||||
parentReadInProgress = false;
|
||||
tail = head = null;
|
||||
// We always flush as this is what Http2ConnectionHandler does for now.
|
||||
flush0(ctx);
|
||||
}
|
||||
processPendingReadCompleteQueue();
|
||||
channelReadComplete0(ctx);
|
||||
}
|
||||
|
||||
private void processPendingReadCompleteQueue() {
|
||||
parentReadInProgress = true;
|
||||
try {
|
||||
// If we have many child channel we can optimize for the case when multiple call flush() in
|
||||
// channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
|
||||
// write calls on the socket which is expensive.
|
||||
for (;;) {
|
||||
AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
|
||||
if (childChannel == null) {
|
||||
break;
|
||||
}
|
||||
childChannel.fireChildReadComplete();
|
||||
}
|
||||
} finally {
|
||||
parentReadInProgress = false;
|
||||
readCompletePendingQueue.clear();
|
||||
// We always flush as this is what Http2ConnectionHandler does for now.
|
||||
flush0(ctx);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
parentReadInProgress = true;
|
||||
@ -315,20 +280,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
ctx.fireChannelWritabilityChanged();
|
||||
}
|
||||
|
||||
final void onChannelReadComplete(ChannelHandlerContext ctx) {
|
||||
// If we have many child channel we can optimize for the case when multiple call flush() in
|
||||
// channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
|
||||
// write calls on the socket which is expensive.
|
||||
AbstractHttp2StreamChannel current = head;
|
||||
while (current != null) {
|
||||
AbstractHttp2StreamChannel childChannel = current;
|
||||
// Clear early in case fireChildReadComplete() causes it to need to be re-processed
|
||||
current = current.next;
|
||||
childChannel.next = childChannel.previous = null;
|
||||
childChannel.fireChildReadComplete();
|
||||
}
|
||||
}
|
||||
|
||||
final void flush0(ChannelHandlerContext ctx) {
|
||||
flush(ctx);
|
||||
}
|
||||
@ -339,29 +290,18 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
super(stream, ++idCount, inboundHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception {
|
||||
return Http2MultiplexCodec.this.consumeBytes(stream.id(), bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isParentReadInProgress() {
|
||||
return parentReadInProgress;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean streamMayHaveExisted(Http2FrameStream stream) {
|
||||
return Http2MultiplexCodec.this.connection().streamMayHaveExisted(stream.id());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tryRemoveChildChannelFromReadPendingQueue() {
|
||||
Http2MultiplexCodec.this.tryRemoveChildChannelFromReadPendingQueue(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean tryAddChildChannelToReadPendingQueue() {
|
||||
return Http2MultiplexCodec.this.tryAddChildChannelToReadPendingQueue(this);
|
||||
protected void addChannelToReadCompletePendingQueue() {
|
||||
// If there is no space left in the queue, just keep on processing everything that is already
|
||||
// stored there and try again.
|
||||
while (!readCompletePendingQueue.offer(this)) {
|
||||
processPendingReadCompleteQueue();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -24,11 +24,15 @@ import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.ServerChannel;
|
||||
import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Queue;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
|
||||
@ -85,15 +89,14 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
|
||||
|
||||
private final ChannelHandler inboundStreamHandler;
|
||||
private final ChannelHandler upgradeStreamHandler;
|
||||
private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
|
||||
new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
|
||||
// Choose 100 which is what is used most of the times as default.
|
||||
Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
|
||||
|
||||
private boolean parentReadInProgress;
|
||||
private int idCount;
|
||||
|
||||
// Linked-List for Http2MultiplexHandlerStreamChannel instances that need to be processed by
|
||||
// channelReadComplete(...)
|
||||
private AbstractHttp2StreamChannel head;
|
||||
private AbstractHttp2StreamChannel tail;
|
||||
|
||||
// Need to be volatile as accessed from within the Http2MultiplexHandlerStreamChannel in a multi-threaded fashion.
|
||||
private volatile ChannelHandlerContext ctx;
|
||||
|
||||
@ -144,14 +147,7 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
|
||||
|
||||
@Override
|
||||
protected void handlerRemoved0(ChannelHandlerContext ctx) {
|
||||
// Unlink the linked list to guard against GC nepotism.
|
||||
AbstractHttp2StreamChannel ch = head;
|
||||
while (ch != null) {
|
||||
AbstractHttp2StreamChannel curr = ch;
|
||||
ch = curr.next;
|
||||
curr.next = curr.previous = null;
|
||||
}
|
||||
head = tail = null;
|
||||
readCompletePendingQueue.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -264,47 +260,12 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
|
||||
ctx.fireExceptionCaught(cause);
|
||||
}
|
||||
|
||||
private boolean isChildChannelInReadPendingQueue(AbstractHttp2StreamChannel childChannel) {
|
||||
return childChannel.previous != null || childChannel.next != null || head == childChannel;
|
||||
}
|
||||
|
||||
private boolean tryAddChildChannelToReadPendingQueue(AbstractHttp2StreamChannel childChannel) {
|
||||
if (!isChildChannelInReadPendingQueue(childChannel)) {
|
||||
if (tail == null) {
|
||||
assert head == null;
|
||||
tail = head = childChannel;
|
||||
} else {
|
||||
childChannel.previous = tail;
|
||||
tail.next = childChannel;
|
||||
tail = childChannel;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void tryRemoveChildChannelFromReadPendingQueue(AbstractHttp2StreamChannel childChannel) {
|
||||
if (isChildChannelInReadPendingQueue(childChannel)) {
|
||||
AbstractHttp2StreamChannel previous = childChannel.previous;
|
||||
if (childChannel.next != null) {
|
||||
childChannel.next.previous = previous;
|
||||
} else {
|
||||
tail = tail.previous; // If there is no next, this childChannel is the tail, so move the tail back.
|
||||
}
|
||||
if (previous != null) {
|
||||
previous.next = childChannel.next;
|
||||
} else {
|
||||
head = head.next; // If there is no previous, this childChannel is the head, so move the tail forward.
|
||||
}
|
||||
childChannel.next = childChannel.previous = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
|
||||
try {
|
||||
final boolean server = ctx.channel().parent() instanceof ServerChannel;
|
||||
forEachActiveStream(stream -> {
|
||||
final int streamId = stream.id();
|
||||
if (streamId > goAwayFrame.lastStreamId() && isValidLocalStreamId(stream)) {
|
||||
if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
|
||||
final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
|
||||
((DefaultHttp2FrameStream) stream).attachment;
|
||||
childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
|
||||
@ -322,29 +283,30 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
|
||||
*/
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
processPendingReadCompleteQueue();
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
|
||||
private void processPendingReadCompleteQueue() {
|
||||
parentReadInProgress = true;
|
||||
// If we have many child channel we can optimize for the case when multiple call flush() in
|
||||
// channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
|
||||
// write calls on the socket which is expensive.
|
||||
AbstractHttp2StreamChannel current = head;
|
||||
if (current != null) {
|
||||
AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
|
||||
if (childChannel != null) {
|
||||
try {
|
||||
do {
|
||||
AbstractHttp2StreamChannel childChannel = current;
|
||||
// Clear early in case fireChildReadComplete() causes it to need to be re-processed
|
||||
current = current.next;
|
||||
childChannel.next = childChannel.previous = null;
|
||||
childChannel.fireChildReadComplete();
|
||||
} while (current != null);
|
||||
childChannel = readCompletePendingQueue.poll();
|
||||
} while (childChannel != null);
|
||||
} finally {
|
||||
parentReadInProgress = false;
|
||||
tail = head = null;
|
||||
readCompletePendingQueue.clear();
|
||||
ctx.flush();
|
||||
}
|
||||
} else {
|
||||
parentReadInProgress = false;
|
||||
}
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
|
||||
private final class Http2MultiplexHandlerStreamChannel extends AbstractHttp2StreamChannel {
|
||||
@ -353,29 +315,18 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
|
||||
super(stream, ++idCount, inboundHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception {
|
||||
return Http2MultiplexHandler.this.consumeBytes(stream, bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isParentReadInProgress() {
|
||||
return parentReadInProgress;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean streamMayHaveExisted(Http2FrameStream stream) {
|
||||
return Http2MultiplexHandler.this.streamMayHaveExisted(stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tryRemoveChildChannelFromReadPendingQueue() {
|
||||
Http2MultiplexHandler.this.tryRemoveChildChannelFromReadPendingQueue(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean tryAddChildChannelToReadPendingQueue() {
|
||||
return Http2MultiplexHandler.this.tryAddChildChannelToReadPendingQueue(this);
|
||||
protected void addChannelToReadCompletePendingQueue() {
|
||||
// If there is no space left in the queue, just keep on processing everything that is already
|
||||
// stored there and try again.
|
||||
while (!readCompletePendingQueue.offer(this)) {
|
||||
processPendingReadCompleteQueue();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,129 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.Queue;
|
||||
|
||||
final class MaxCapacityQueue<E> implements Queue<E> {
|
||||
private final Queue<E> queue;
|
||||
private final int maxCapacity;
|
||||
|
||||
MaxCapacityQueue(Queue<E> queue, int maxCapacity) {
|
||||
this.queue = queue;
|
||||
this.maxCapacity = maxCapacity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean add(E element) {
|
||||
if (offer(element)) {
|
||||
return true;
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean offer(E element) {
|
||||
if (maxCapacity <= queue.size()) {
|
||||
return false;
|
||||
}
|
||||
return queue.offer(element);
|
||||
}
|
||||
|
||||
@Override
|
||||
public E remove() {
|
||||
return queue.remove();
|
||||
}
|
||||
|
||||
@Override
|
||||
public E poll() {
|
||||
return queue.poll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public E element() {
|
||||
return queue.element();
|
||||
}
|
||||
|
||||
@Override
|
||||
public E peek() {
|
||||
return queue.peek();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return queue.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(Object o) {
|
||||
return queue.contains(o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<E> iterator() {
|
||||
return queue.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] toArray() {
|
||||
return queue.toArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T[] toArray(T[] a) {
|
||||
return queue.toArray(a);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Object o) {
|
||||
return queue.remove(o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsAll(Collection<?> c) {
|
||||
return queue.containsAll(c);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addAll(Collection<? extends E> c) {
|
||||
if (maxCapacity >= size() + c.size()) {
|
||||
return queue.addAll(c);
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeAll(Collection<?> c) {
|
||||
return queue.removeAll(c);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean retainAll(Collection<?> c) {
|
||||
return queue.retainAll(c);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
queue.clear();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user