HTTP/2 DefaultHttp2ConnectionEncoder data frame size incorrect if error

Motivation:
If an error occurs during a write operation then DefaultHttp2ConnectionEncoder.FlowControlledData will clear the CoalescingBufferQueue which will reset the queue's readable bytes to 0. To recover from an error the DefaultHttp2RemoteFlowController will attempt to return bytes to the flow control window, but since the frame has reset its own size this will lead to invalid flow control accounting.

Modifications:
- DefaultHttp2ConnectionEncoder.FlowControlledData should not reset its size if an error occurs

Result:
No more flow controller errors due to DefaultHttp2ConnectionEncoder.FlowControlledData setting its size to 0 if an error occurs.
This commit is contained in:
Scott Mitchell 2016-03-07 16:20:45 -08:00
parent 8ec594c6eb
commit fc099292fd
3 changed files with 15 additions and 3 deletions

View File

@ -331,22 +331,26 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
*/
private final class FlowControlledData extends FlowControlledBase {
private final CoalescingBufferQueue queue;
private int dataSize;
public FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream,
ChannelPromise promise) {
super(stream, padding, endOfStream, promise);
queue = new CoalescingBufferQueue(promise.channel());
queue.add(buf, promise);
dataSize = queue.readableBytes();
}
@Override
public int size() {
return queue.readableBytes() + padding;
return dataSize + padding;
}
@Override
public void error(ChannelHandlerContext ctx, Throwable cause) {
queue.releaseAndFailAll(cause);
// Don't update dataSize because we need to ensure the size() method returns a consistent size even after
// error so we don't invalidate flow control when returning bytes to flow control.
lifecycleManager.onError(ctx, cause);
promise.tryFailure(cause);
}
@ -363,6 +367,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
int writeableData = min(queuedData, allowedBytes);
ChannelPromise writePromise = ctx.newPromise().addListener(this);
ByteBuf toWrite = queue.remove(writeableData, writePromise);
dataSize = queue.readableBytes();
// Determine how much padding to write.
int writeablePadding = min(allowedBytes - writeableData, padding);
@ -381,6 +386,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
return false;
}
nextData.queue.copyTo(queue);
dataSize = queue.readableBytes();
// Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
padding = Math.max(padding, nextData.padding);
endOfStream = nextData.endOfStream;

View File

@ -146,7 +146,8 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
}
void updateStreamableBytes(int newStreamableBytes, boolean hasFrame, int windowSize) {
assert hasFrame || newStreamableBytes == 0;
assert hasFrame || newStreamableBytes == 0 :
"hasFrame: " + hasFrame + " newStreamableBytes: " + newStreamableBytes;
int delta = newStreamableBytes - streamableBytes;
if (delta != 0) {

View File

@ -37,11 +37,16 @@ import java.util.ArrayDeque;
public final class CoalescingBufferQueue {
private final Channel channel;
private final ArrayDeque<Object> bufAndListenerPairs = new ArrayDeque<Object>();
private final ArrayDeque<Object> bufAndListenerPairs;
private int readableBytes;
public CoalescingBufferQueue(Channel channel) {
this(channel, 4);
}
public CoalescingBufferQueue(Channel channel, int initSize) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
bufAndListenerPairs = new ArrayDeque<Object>(initSize);
}
/**