diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index f343506136..8c5718a371 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -357,10 +357,11 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { @Override public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) { - if (FlowControlledData.class != next.getClass()) { + FlowControlledData nextData; + if (FlowControlledData.class != next.getClass() || + Integer.MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) { return false; } - FlowControlledData nextData = (FlowControlledData) next; nextData.queue.copyTo(queue); // Given that we're merging data into a frame it doesn't really make sense to accumulate padding. padding = Math.max(padding, nextData.padding); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index c99f86d627..0acedebdcf 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -404,9 +404,21 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll @Override void enqueueFrame(FlowControlled frame) { FlowControlled last = pendingWriteQueue.peekLast(); - if (last == null || !last.merge(ctx, frame)) { - pendingWriteQueue.offer(frame); + if (last == null) { + enqueueFrameWithoutMerge(frame); + return; } + + int lastSize = last.size(); + if (last.merge(ctx, frame)) { + incrementPendingBytes(last.size() - lastSize, true); + return; + } + enqueueFrameWithoutMerge(frame); + } + + private void enqueueFrameWithoutMerge(FlowControlled frame) { + pendingWriteQueue.offer(frame); // This must be called after adding to the queue in order so that hasFrame() is // updated before updating the stream state. incrementPendingBytes(frame.size(), true); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java index 58742311dd..d7f26f5876 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java @@ -212,6 +212,23 @@ public abstract class DefaultHttp2RemoteFlowControllerTest { assertFalse(controller.isWritable(stream(STREAM_A))); } + @Test + public void flowControllerCorrectlyAccountsForBytesWithMerge() throws Http2Exception { + controller.initialWindowSize(112); // This must be more than the total merged frame size 110 + FakeFlowControlled data1 = new FakeFlowControlled(5, 2, true); + FakeFlowControlled data2 = new FakeFlowControlled(5, 100, true); + sendData(STREAM_A, data1); + sendData(STREAM_A, data2); + data1.assertNotWritten(); + data1.assertNotWritten(); + data2.assertMerged(); + controller.writePendingBytes(); + data1.assertFullyWritten(); + data2.assertNotWritten(); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + assertTrue(controller.isWritable(stream(STREAM_A))); + } + @Test public void stalledStreamShouldQueuePayloads() throws Http2Exception { controller.initialWindowSize(0); @@ -953,9 +970,10 @@ public abstract class DefaultHttp2RemoteFlowControllerTest { } private static final class FakeFlowControlled implements Http2RemoteFlowController.FlowControlled { - - private int currentSize; - private int originalSize; + private int currentPadding; + private int currentPayloadSize; + private int originalPayloadSize; + private int originalPadding; private boolean writeCalled; private final boolean mergeable; private boolean merged; @@ -963,20 +981,26 @@ public abstract class DefaultHttp2RemoteFlowControllerTest { private Throwable t; private FakeFlowControlled(int size) { - this.currentSize = size; - this.originalSize = size; - this.mergeable = false; + this(size, false); } private FakeFlowControlled(int size, boolean mergeable) { - this.currentSize = size; - this.originalSize = size; + this(size, 0, mergeable); + } + + private FakeFlowControlled(int payloadSize, int padding, boolean mergeable) { + currentPayloadSize = originalPayloadSize = payloadSize; + currentPadding = originalPadding = padding; this.mergeable = mergeable; } @Override public int size() { - return currentSize; + return currentPayloadSize + currentPadding; + } + + private int originalSize() { + return originalPayloadSize + originalPadding; } @Override @@ -990,28 +1014,36 @@ public abstract class DefaultHttp2RemoteFlowControllerTest { @Override public void write(ChannelHandlerContext ctx, int allowedBytes) { - if (allowedBytes <= 0 && currentSize != 0) { + if (allowedBytes <= 0 && size() != 0) { // Write has been called but no data can be written return; } writeCalled = true; - int written = Math.min(currentSize, allowedBytes); - currentSize -= written; + int written = Math.min(size(), allowedBytes); + if (written > currentPayloadSize) { + written -= currentPayloadSize; + currentPayloadSize = 0; + currentPadding -= written; + } else { + currentPayloadSize -= written; + } } @Override public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) { if (mergeable && next instanceof FakeFlowControlled) { - this.originalSize += ((FakeFlowControlled) next).originalSize; - this.currentSize += ((FakeFlowControlled) next).originalSize; - ((FakeFlowControlled) next).merged = true; + FakeFlowControlled ffcNext = (FakeFlowControlled) next; + originalPayloadSize += ffcNext.originalPayloadSize; + currentPayloadSize += ffcNext.originalPayloadSize; + currentPadding = originalPadding = Math.max(originalPadding, ffcNext.originalPadding); + ffcNext.merged = true; return true; } return false; } public int written() { - return originalSize - currentSize; + return originalSize() - size(); } public void assertNotWritten() { @@ -1029,7 +1061,8 @@ public abstract class DefaultHttp2RemoteFlowControllerTest { public void assertFullyWritten() { assertTrue(writeCalled); - assertEquals(0, currentSize); + assertEquals(0, currentPayloadSize); + assertEquals(0, currentPadding); } public boolean assertMerged() {