Call ctx.flush() when onStreamClosed(...) produces a window update frame (#9818)

Motivation:

We use the onStreamClosed(...) callback to return unconsumed bytes back to the window of the connection when needed. When this happens we will write a window update frame but not automatically call ctx.flush(). As the user has no insight into this it could in the worst case result in a "deadlock" as the frame is never written out ot the socket.

Modifications:

- If onStreamClosed(...) produces a window update frame call ctx.flush()
- Add unit test

Result:

No stales possible due unflushed window update frames produced by onStreamClosed(...) when not all bytes were consumed before the stream was closed
This commit is contained in:
Norman Maurer 2019-11-28 11:11:32 +01:00 committed by GitHub
parent 3654d2c245
commit d0f94200e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 9 deletions

View File

@ -110,8 +110,11 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
FlowState state = state(stream); FlowState state = state(stream);
int unconsumedBytes = state.unconsumedBytes(); int unconsumedBytes = state.unconsumedBytes();
if (ctx != null && unconsumedBytes > 0) { if (ctx != null && unconsumedBytes > 0) {
connectionState().consumeBytes(unconsumedBytes); if (consumeAllBytes(state, unconsumedBytes)) {
state.consumeBytes(unconsumedBytes); // As the user has no real control on when this callback is used we should better
// call flush() if we produced any window update to ensure we not stale.
ctx.flush();
}
} }
} catch (Http2Exception e) { } catch (Http2Exception e) {
PlatformDependent.throwException(e); PlatformDependent.throwException(e);
@ -187,13 +190,15 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported"); throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
} }
boolean windowUpdateSent = connectionState().consumeBytes(numBytes); return consumeAllBytes(state(stream), numBytes);
windowUpdateSent |= state(stream).consumeBytes(numBytes);
return windowUpdateSent;
} }
return false; return false;
} }
private boolean consumeAllBytes(FlowState state, int numBytes) throws Http2Exception {
return connectionState().consumeBytes(numBytes) | state.consumeBytes(numBytes);
}
@Override @Override
public int unconsumedBytes(Http2Stream stream) { public int unconsumedBytes(Http2Stream stream) {
return state(stream).unconsumedBytes(); return state(stream).unconsumedBytes();

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.eq; import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -42,6 +43,8 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/** /**
* Tests for {@link DefaultHttp2LocalFlowController}. * Tests for {@link DefaultHttp2LocalFlowController}.
@ -68,15 +71,28 @@ public class DefaultHttp2LocalFlowControllerTest {
@Before @Before
public void setup() throws Http2Exception { public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
setupChannelHandlerContext(false);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
when(ctx.executor()).thenReturn(executor);
when(executor.inEventLoop()).thenReturn(true); when(executor.inEventLoop()).thenReturn(true);
initController(false); initController(false);
} }
private void setupChannelHandlerContext(boolean allowFlush) {
reset(ctx);
when(ctx.newPromise()).thenReturn(promise);
if (allowFlush) {
when(ctx.flush()).then(new Answer<ChannelHandlerContext>() {
@Override
public ChannelHandlerContext answer(InvocationOnMock invocationOnMock) {
return ctx;
}
});
} else {
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
}
when(ctx.executor()).thenReturn(executor);
}
@Test @Test
public void dataFrameShouldBeAccepted() throws Http2Exception { public void dataFrameShouldBeAccepted() throws Http2Exception {
receiveFlowControlledFrame(STREAM_ID, 10, 0, false); receiveFlowControlledFrame(STREAM_ID, 10, 0, false);
@ -162,6 +178,24 @@ public class DefaultHttp2LocalFlowControllerTest {
verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(STREAM_ID);
} }
@Test
public void windowUpdateShouldBeWrittenWhenStreamIsClosedAndFlushed() throws Http2Exception {
int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1;
setupChannelHandlerContext(true);
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false);
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
verifyWindowUpdateNotSent(STREAM_ID);
connection.stream(STREAM_ID).close();
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
// Verify we saw one flush.
verify(ctx).flush();
}
@Test @Test
public void halfWindowRemainingShouldUpdateAllWindows() throws Http2Exception { public void halfWindowRemainingShouldUpdateAllWindows() throws Http2Exception {
int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1; int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1;