DefaultHttp2ConnectionDecoder notifies frame listener before connection of GOAWAYS (#10009)

Motivation:

Users of the DefaultHttp2ConnectionDecodcer are notified of inbound GoAwayFrames
after the connection has already closed any ignored streams, potentially
losing the signal that some streams may have been ignored by the peer and
are thus retryable.

Modifications:

Reorder the notifications of the frame and connection listeners to
propagate the frame first, giving the frame listeners the opportunity to
clean up ignored streams in their own way.

Result:
Fixes #9986
This commit is contained in:
Bryce Anderson 2020-02-10 02:26:54 -07:00 committed by Norman Maurer
parent 48f56ff6af
commit 00bf0a854c
2 changed files with 38 additions and 28 deletions

View File

@ -212,8 +212,8 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
void onGoAwayRead0(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) void onGoAwayRead0(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception { throws Http2Exception {
connection.goAwayReceived(lastStreamId, errorCode, debugData);
listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData); listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData);
connection.goAwayReceived(lastStreamId, errorCode, debugData);
} }
void onUnknownFrame0(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, void onUnknownFrame0(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,

View File

@ -50,6 +50,8 @@ import java.io.ByteArrayOutputStream;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER; import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
@ -811,34 +813,31 @@ public class Http2ConnectionRoundtripTest {
} }
@Test @Test
public void createStreamSynchronouslyAfterGoAwayReceivedShouldFailLocally() throws Exception { public void listenerIsNotifiedOfGoawayBeforeStreamsAreRemovedFromTheConnection() throws Exception {
bootstrapEnv(1, 1, 2, 1, 1); bootstrapEnv(1, 1, 2, 1, 1);
final CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
doAnswer((Answer<Void>) invocationOnMock -> {
clientGoAwayLatch.countDown();
return null;
}).when(clientListener).onGoAwayRead(any(ChannelHandlerContext.class), anyInt(), anyLong(), any(ByteBuf.class));
// We want both sides to do graceful shutdown during the test. // We want both sides to do graceful shutdown during the test.
setClientGracefulShutdownTime(10000); setClientGracefulShutdownTime(10000);
setServerGracefulShutdownTime(10000); setServerGracefulShutdownTime(10000);
final Http2Headers headers = dummyHeaders(); final AtomicReference<Http2Stream.State> clientStream3State = new AtomicReference<Http2Stream.State>();
final AtomicReference<ChannelFuture> clientWriteAfterGoAwayFutureRef = new AtomicReference<>(); final CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
final CountDownLatch clientWriteAfterGoAwayLatch = new CountDownLatch(1); doAnswer(new Answer<Void>() {
doAnswer((Answer<Void>) invocationOnMock -> { @Override
ChannelFuture f = http2Client.encoder().writeHeaders(ctx(), 5, headers, 0, (short) 16, false, 0, public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
true, newPromise()); clientStream3State.set(http2Client.connection().stream(3).state());
clientWriteAfterGoAwayFutureRef.set(f); clientGoAwayLatch.countDown();
f.addListener((ChannelFutureListener) future -> clientWriteAfterGoAwayLatch.countDown()); return null;
http2Client.flush(ctx()); }
return null;
}).when(clientListener).onGoAwayRead(any(ChannelHandlerContext.class), anyInt(), anyLong(), any(ByteBuf.class)); }).when(clientListener).onGoAwayRead(any(ChannelHandlerContext.class), anyInt(), anyLong(), any(ByteBuf.class));
// Create a single stream by sending a HEADERS frame to the server.
final Http2Headers headers = dummyHeaders();
runInChannel(clientChannel, () -> { runInChannel(clientChannel, () -> {
http2Client.encoder().writeHeaders(ctx(), 1, headers, 0, (short) 16, false, 0,
false, newPromise());
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
true, newPromise()); false, newPromise());
http2Client.flush(ctx()); http2Client.flush(ctx());
}); });
@ -848,23 +847,34 @@ public class Http2ConnectionRoundtripTest {
assertTrue(requestLatch.await(DEFAULT_AWAIT_TIMEOUT_SECONDS, SECONDS)); assertTrue(requestLatch.await(DEFAULT_AWAIT_TIMEOUT_SECONDS, SECONDS));
runInChannel(serverChannel, () -> { runInChannel(serverChannel, () -> {
http2Server.encoder().writeGoAway(serverCtx(), 3, NO_ERROR.code(), EMPTY_BUFFER, serverNewPromise()); http2Server.encoder().writeGoAway(serverCtx(), 1, NO_ERROR.code(), EMPTY_BUFFER, serverNewPromise());
http2Server.flush(serverCtx()); http2Server.flush(serverCtx());
}); });
// Wait for the client's write operation to complete. // wait for the client to receive the GO_AWAY.
assertTrue(clientWriteAfterGoAwayLatch.await(DEFAULT_AWAIT_TIMEOUT_SECONDS, SECONDS)); assertTrue(clientGoAwayLatch.await(DEFAULT_AWAIT_TIMEOUT_SECONDS, SECONDS));
verify(clientListener).onGoAwayRead(any(ChannelHandlerContext.class), eq(1), eq(NO_ERROR.code()),
any(ByteBuf.class));
assertEquals(Http2Stream.State.OPEN, clientStream3State.get());
ChannelFuture clientWriteAfterGoAwayFuture = clientWriteAfterGoAwayFutureRef.get(); // Make sure that stream 3 has been closed which is true if it's gone.
assertNotNull(clientWriteAfterGoAwayFuture); final CountDownLatch probeStreamCount = new CountDownLatch(1);
Throwable clientCause = clientWriteAfterGoAwayFuture.cause(); final AtomicBoolean stream3Exists = new AtomicBoolean();
assertThat(clientCause, is(instanceOf(Http2Exception.StreamException.class))); final AtomicInteger streamCount = new AtomicInteger();
assertEquals(Http2Error.REFUSED_STREAM.code(), ((Http2Exception.StreamException) clientCause).error().code()); runInChannel(this.clientChannel, () -> {
stream3Exists.set(http2Client.connection().stream(3) != null);
streamCount.set(http2Client.connection().numActiveStreams());
probeStreamCount.countDown();
});
// The stream should be closed right after
assertTrue(probeStreamCount.await(DEFAULT_AWAIT_TIMEOUT_SECONDS, SECONDS));
assertEquals(1, streamCount.get());
assertFalse(stream3Exists.get());
// Wait for the server to receive a GO_AWAY, but this is expected to timeout! // Wait for the server to receive a GO_AWAY, but this is expected to timeout!
assertFalse(goAwayLatch.await(1, SECONDS)); assertFalse(goAwayLatch.await(1, SECONDS));
verify(serverListener, never()).onGoAwayRead(any(ChannelHandlerContext.class), anyInt(), anyLong(), verify(serverListener, never()).onGoAwayRead(any(ChannelHandlerContext.class), anyInt(), anyLong(),
any(ByteBuf.class)); any(ByteBuf.class));
// Shutdown shouldn't wait for the server to close streams // Shutdown shouldn't wait for the server to close streams
setClientGracefulShutdownTime(0); setClientGracefulShutdownTime(0);