diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java index 6d6369b492..30233bf056 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java @@ -140,7 +140,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController } @Override - public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) + public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception { // Streams automatically consume all remaining bytes when they are closed, so just ignore // if already closed. @@ -152,9 +152,11 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController throw new IllegalArgumentException("numBytes must be positive"); } - connectionState().consumeBytes(ctx, numBytes); - state(stream).consumeBytes(ctx, numBytes); + boolean windowUpdateSent = connectionState().consumeBytes(ctx, numBytes); + windowUpdateSent |= state(stream).consumeBytes(ctx, numBytes); + return windowUpdateSent; } + return false; } @Override @@ -374,10 +376,10 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController } @Override - public void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { + public boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { // Return the bytes processed and update the window. returnProcessedBytes(numBytes); - writeWindowUpdateIfNeeded(ctx); + return writeWindowUpdateIfNeeded(ctx); } @Override @@ -386,15 +388,17 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController } @Override - public void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception { + public boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception { if (endOfStream || initialStreamWindowSize <= 0) { - return; + return false; } int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio); if (processedWindow <= threshold) { writeWindowUpdate(ctx); + return true; } + return false; } /** @@ -444,12 +448,13 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController } @Override - public void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception { + public boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception { throw new UnsupportedOperationException(); } @Override - public void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { + public boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { + return false; } @Override @@ -503,10 +508,21 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController /** * Updates the flow control window for this stream if it is appropriate. + * + * @return true if {@code WINDOW_UPDATE} was written, false otherwise. */ - void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception; + boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception; - void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception; + /** + * Indicates that the application has consumed {@code numBytes} from the connection or stream and is + * ready to receive more data. + * + * @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if appropriate + * @param numBytes the number of bytes to be returned to the flow control window. + * @return true if {@code WINDOW_UPDATE} was written, false otherwise. + * @throws Http2Exception + */ + boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception; int unconsumedBytes(); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java index 2fd5c3f798..0080813740 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java @@ -328,7 +328,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor } @Override - public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) + public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception { Http2Decompressor decompressor = decompressor(stream); Http2Decompressor copy = null; @@ -339,7 +339,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor // Convert the uncompressed consumed bytes to compressed (on the wire) bytes. numBytes = decompressor.consumeProcessedBytes(numBytes); } - flowController.consumeBytes(ctx, stream, numBytes); + return flowController.consumeBytes(ctx, stream, numBytes); } catch (Http2Exception e) { if (copy != null) { stream.setProperty(propertyKey, copy); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java index a06e42be0b..bdccd119b9 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java @@ -57,10 +57,11 @@ public interface Http2LocalFlowController extends Http2FlowController { * If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link * Http2Stream.State#CLOSED}), calling this method has no effect. * @param numBytes the number of bytes to be returned to the flow control window. + * @return true if a {@code WINDOW_UPDATE} was sent, false otherwise. * @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes(Http2Stream)} for the * stream. */ - void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception; + boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception; /** * The number of bytes for the given stream that have been received but not yet consumed by the diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java index 5fc4e0df87..06051a74a8 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java @@ -18,6 +18,8 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; @@ -86,11 +88,11 @@ public class DefaultHttp2LocalFlowControllerTest { receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false); // Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent. - consumeBytes(STREAM_ID, 10); + assertFalse(consumeBytes(STREAM_ID, 10)); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); // Return the rest and verify the WINDOW_UPDATE is sent. - consumeBytes(STREAM_ID, dataSize - 10); + assertTrue(consumeBytes(STREAM_ID, dataSize - 10)); verifyWindowUpdateSent(STREAM_ID, dataSize); verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); } @@ -110,7 +112,7 @@ public class DefaultHttp2LocalFlowControllerTest { verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); verifyWindowUpdateNotSent(STREAM_ID); - consumeBytes(STREAM_ID, dataSize); + assertTrue(consumeBytes(STREAM_ID, dataSize)); verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); verifyWindowUpdateNotSent(STREAM_ID); } @@ -123,7 +125,7 @@ public class DefaultHttp2LocalFlowControllerTest { // Don't set end-of-stream so we'll get a window update for the stream as well. receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false); - consumeBytes(STREAM_ID, dataSize); + assertTrue(consumeBytes(STREAM_ID, dataSize)); verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); verifyWindowUpdateSent(STREAM_ID, windowDelta); } @@ -150,7 +152,7 @@ public class DefaultHttp2LocalFlowControllerTest { // Send the next frame and verify that the expected window updates were sent. receiveFlowControlledFrame(STREAM_ID, initialWindowSize, 0, false); - consumeBytes(STREAM_ID, initialWindowSize); + assertTrue(consumeBytes(STREAM_ID, initialWindowSize)); int delta = newInitialWindowSize - initialWindowSize; verifyWindowUpdateSent(STREAM_ID, delta); verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta); @@ -172,7 +174,7 @@ public class DefaultHttp2LocalFlowControllerTest { verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(CONNECTION_STREAM_ID)); - consumeBytes(STREAM_ID, data1); + assertTrue(consumeBytes(STREAM_ID, data1)); verifyWindowUpdateSent(STREAM_ID, data1); verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1); @@ -191,8 +193,8 @@ public class DefaultHttp2LocalFlowControllerTest { assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(newStreamId)); assertEquals(DEFAULT_WINDOW_SIZE - (data1 << 1), window(CONNECTION_STREAM_ID)); - consumeBytes(STREAM_ID, data1); - consumeBytes(newStreamId, data2); + assertFalse(consumeBytes(STREAM_ID, data1)); + assertTrue(consumeBytes(newStreamId, data2)); verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(newStreamId); verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2); @@ -266,8 +268,8 @@ public class DefaultHttp2LocalFlowControllerTest { assertEquals(DEFAULT_WINDOW_SIZE - data2, window(STREAM_ID)); assertEquals(newDefaultWindowSize - data1, window(newStreamId)); assertEquals(newDefaultWindowSize - data2 - data1, window(CONNECTION_STREAM_ID)); - consumeBytes(STREAM_ID, data2); - consumeBytes(newStreamId, data1); + assertFalse(consumeBytes(STREAM_ID, data2)); + assertTrue(consumeBytes(newStreamId, data1)); verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateSent(newStreamId, data1); verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2); @@ -305,8 +307,8 @@ public class DefaultHttp2LocalFlowControllerTest { return buffer; } - private void consumeBytes(int streamId, int numBytes) throws Http2Exception { - controller.consumeBytes(ctx, stream(streamId), numBytes); + private boolean consumeBytes(int streamId, int numBytes) throws Http2Exception { + return controller.consumeBytes(ctx, stream(streamId), numBytes); } private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) { diff --git a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java index 816d4768fb..045c66f6bb 100644 --- a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java +++ b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java @@ -57,7 +57,8 @@ public final class NoopHttp2LocalFlowController implements Http2LocalFlowControl } @Override - public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception { + public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception { + return false; } @Override