diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 6ee351c7eb..2bc22a422c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -331,22 +331,26 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { */ private final class FlowControlledData extends FlowControlledBase { private final CoalescingBufferQueue queue; + private int dataSize; public FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream, ChannelPromise promise) { super(stream, padding, endOfStream, promise); queue = new CoalescingBufferQueue(promise.channel()); queue.add(buf, promise); + dataSize = queue.readableBytes(); } @Override public int size() { - return queue.readableBytes() + padding; + return dataSize + padding; } @Override public void error(ChannelHandlerContext ctx, Throwable cause) { queue.releaseAndFailAll(cause); + // Don't update dataSize because we need to ensure the size() method returns a consistent size even after + // error so we don't invalidate flow control when returning bytes to flow control. lifecycleManager.onError(ctx, cause); promise.tryFailure(cause); } @@ -363,6 +367,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { int writeableData = min(queuedData, allowedBytes); ChannelPromise writePromise = ctx.newPromise().addListener(this); ByteBuf toWrite = queue.remove(writeableData, writePromise); + dataSize = queue.readableBytes(); // Determine how much padding to write. int writeablePadding = min(allowedBytes - writeableData, padding); @@ -381,6 +386,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { return false; } nextData.queue.copyTo(queue); + dataSize = queue.readableBytes(); // Given that we're merging data into a frame it doesn't really make sense to accumulate padding. padding = Math.max(padding, nextData.padding); endOfStream = nextData.endOfStream; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java index 565ddfaa26..42b864ce19 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java @@ -146,7 +146,8 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor } void updateStreamableBytes(int newStreamableBytes, boolean hasFrame, int windowSize) { - assert hasFrame || newStreamableBytes == 0; + assert hasFrame || newStreamableBytes == 0 : + "hasFrame: " + hasFrame + " newStreamableBytes: " + newStreamableBytes; int delta = newStreamableBytes - streamableBytes; if (delta != 0) { diff --git a/transport/src/main/java/io/netty/channel/CoalescingBufferQueue.java b/transport/src/main/java/io/netty/channel/CoalescingBufferQueue.java index 6512103d52..39a88ab576 100644 --- a/transport/src/main/java/io/netty/channel/CoalescingBufferQueue.java +++ b/transport/src/main/java/io/netty/channel/CoalescingBufferQueue.java @@ -37,11 +37,16 @@ import java.util.ArrayDeque; public final class CoalescingBufferQueue { private final Channel channel; - private final ArrayDeque bufAndListenerPairs = new ArrayDeque(); + private final ArrayDeque bufAndListenerPairs; private int readableBytes; public CoalescingBufferQueue(Channel channel) { + this(channel, 4); + } + + public CoalescingBufferQueue(Channel channel, int initSize) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); + bufAndListenerPairs = new ArrayDeque(initSize); } /**