diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java index 959f327a17..12116c8290 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java @@ -110,8 +110,11 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController FlowState state = state(stream); int unconsumedBytes = state.unconsumedBytes(); if (ctx != null && unconsumedBytes > 0) { - connectionState().consumeBytes(unconsumedBytes); - state.consumeBytes(unconsumedBytes); + if (consumeAllBytes(state, unconsumedBytes)) { + // As the user has no real control on when this callback is used we should better + // call flush() if we produced any window update to ensure we not stale. + ctx.flush(); + } } } catch (Http2Exception e) { PlatformDependent.throwException(e); @@ -187,13 +190,15 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController throw new UnsupportedOperationException("Returning bytes for the connection window is not supported"); } - boolean windowUpdateSent = connectionState().consumeBytes(numBytes); - windowUpdateSent |= state(stream).consumeBytes(numBytes); - return windowUpdateSent; + return consumeAllBytes(state(stream), numBytes); } return false; } + private boolean consumeAllBytes(FlowState state, int numBytes) throws Http2Exception { + return connectionState().consumeBytes(numBytes) | state.consumeBytes(numBytes); + } + @Override public int unconsumedBytes(Http2Stream stream) { return state(stream).unconsumedBytes(); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java index 104e355882..cea7793a4e 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -42,6 +43,8 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** * Tests for {@link DefaultHttp2LocalFlowController}. @@ -68,15 +71,28 @@ public class DefaultHttp2LocalFlowControllerTest { @Before public void setup() throws Http2Exception { MockitoAnnotations.initMocks(this); - - when(ctx.newPromise()).thenReturn(promise); - when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden")); - when(ctx.executor()).thenReturn(executor); + setupChannelHandlerContext(false); when(executor.inEventLoop()).thenReturn(true); initController(false); } + private void setupChannelHandlerContext(boolean allowFlush) { + reset(ctx); + when(ctx.newPromise()).thenReturn(promise); + if (allowFlush) { + when(ctx.flush()).then(new Answer() { + @Override + public ChannelHandlerContext answer(InvocationOnMock invocationOnMock) { + return ctx; + } + }); + } else { + when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden")); + } + when(ctx.executor()).thenReturn(executor); + } + @Test public void dataFrameShouldBeAccepted() throws Http2Exception { receiveFlowControlledFrame(STREAM_ID, 10, 0, false); @@ -162,6 +178,24 @@ public class DefaultHttp2LocalFlowControllerTest { verifyWindowUpdateNotSent(STREAM_ID); } + @Test + public void windowUpdateShouldBeWrittenWhenStreamIsClosedAndFlushed() throws Http2Exception { + int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1; + + setupChannelHandlerContext(true); + + receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false); + verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); + verifyWindowUpdateNotSent(STREAM_ID); + + connection.stream(STREAM_ID).close(); + + verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); + + // Verify we saw one flush. + verify(ctx).flush(); + } + @Test public void halfWindowRemainingShouldUpdateAllWindows() throws Http2Exception { int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1;