Call ctx.flush() when onStreamClosed(...) produces a window update frame (#9818)

Motivation:

We use the onStreamClosed(...) callback to return unconsumed bytes back to the window of the connection when needed. When this happens we will write a window update frame but not automatically call ctx.flush(). As the user has no insight into this it could in the worst case result in a "deadlock" as the frame is never written out ot the socket.

Modifications:

- If onStreamClosed(...) produces a window update frame call ctx.flush()
- Add unit test

Result:

No stales possible due unflushed window update frames produced by onStreamClosed(...) when not all bytes were consumed before the stream was closed
This commit is contained in:
Norman Maurer 2019-11-28 11:11:32 +01:00
parent 16c88fd9ed
commit 8cfd71c354
2 changed files with 48 additions and 9 deletions

View File

@ -111,8 +111,11 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
FlowState state = state(stream);
int unconsumedBytes = state.unconsumedBytes();
if (ctx != null && unconsumedBytes > 0) {
connectionState().consumeBytes(unconsumedBytes);
state.consumeBytes(unconsumedBytes);
if (consumeAllBytes(state, unconsumedBytes)) {
// As the user has no real control on when this callback is used we should better
// call flush() if we produced any window update to ensure we not stale.
ctx.flush();
}
}
} catch (Http2Exception e) {
PlatformDependent.throwException(e);
@ -188,13 +191,15 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
}
boolean windowUpdateSent = connectionState().consumeBytes(numBytes);
windowUpdateSent |= state(stream).consumeBytes(numBytes);
return windowUpdateSent;
return consumeAllBytes(state(stream), numBytes);
}
return false;
}
private boolean consumeAllBytes(FlowState state, int numBytes) throws Http2Exception {
return connectionState().consumeBytes(numBytes) | state.consumeBytes(numBytes);
}
@Override
public int unconsumedBytes(Http2Stream stream) {
return state(stream).unconsumedBytes();

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ -42,6 +43,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Tests for {@link DefaultHttp2LocalFlowController}.
@ -68,15 +71,28 @@ public class DefaultHttp2LocalFlowControllerTest {
@Before
public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
when(ctx.executor()).thenReturn(executor);
setupChannelHandlerContext(false);
when(executor.inEventLoop()).thenReturn(true);
initController(false);
}
private void setupChannelHandlerContext(boolean allowFlush) {
reset(ctx);
when(ctx.newPromise()).thenReturn(promise);
if (allowFlush) {
when(ctx.flush()).then(new Answer<ChannelHandlerContext>() {
@Override
public ChannelHandlerContext answer(InvocationOnMock invocationOnMock) {
return ctx;
}
});
} else {
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
}
when(ctx.executor()).thenReturn(executor);
}
@Test
public void dataFrameShouldBeAccepted() throws Http2Exception {
receiveFlowControlledFrame(STREAM_ID, 10, 0, false);
@ -162,6 +178,24 @@ public class DefaultHttp2LocalFlowControllerTest {
verifyWindowUpdateNotSent(STREAM_ID);
}
@Test
public void windowUpdateShouldBeWrittenWhenStreamIsClosedAndFlushed() throws Http2Exception {
int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1;
setupChannelHandlerContext(true);
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false);
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
verifyWindowUpdateNotSent(STREAM_ID);
connection.stream(STREAM_ID).close();
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
// Verify we saw one flush.
verify(ctx).flush();
}
@Test
public void halfWindowRemainingShouldUpdateAllWindows() throws Http2Exception {
int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1;