HTTP/2 outbound event after receiving go_away forces sending a go_away (#8069)
Motivation: If the local endpoint receives a GO_AWAY frame and then tries to write a stream with a streamId higher than the last know stream ID we will throw a connection error. This results in the local peer sending a GO_AWAY frame to the remote peer, but this is not necessary as the error can be isolated to the local endpoint and communicated via the ChannelFuture return value. Modifications: - Instead of throwing a connection error, throw a stream error that simulates the peer receiving the stream and replying with a RST Result: Connections are not closed abruptly when trying to create a stream on the local endpoint after a GO_AWAY frame is received.
This commit is contained in:
parent
ecc238bea5
commit
c321e8ea4a
@ -864,9 +864,9 @@ public class DefaultHttp2Connection implements Http2Connection {
|
|||||||
private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
|
private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
|
||||||
assert state != IDLE;
|
assert state != IDLE;
|
||||||
if (goAwayReceived() && streamId > localEndpoint.lastStreamKnownByPeer()) {
|
if (goAwayReceived() && streamId > localEndpoint.lastStreamKnownByPeer()) {
|
||||||
throw connectionError(PROTOCOL_ERROR, "Cannot create stream %d since this endpoint has received a " +
|
throw streamError(streamId, REFUSED_STREAM,
|
||||||
"GOAWAY frame with last stream id %d.", streamId,
|
"Cannot create stream %d since this endpoint has received a GOAWAY frame with last stream id %d.",
|
||||||
localEndpoint.lastStreamKnownByPeer());
|
streamId, localEndpoint.lastStreamKnownByPeer());
|
||||||
}
|
}
|
||||||
if (!isValidStreamId(streamId)) {
|
if (!isValidStreamId(streamId)) {
|
||||||
if (streamId < 0) {
|
if (streamId < 0) {
|
||||||
|
@ -701,7 +701,9 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (stream == null) {
|
if (stream == null) {
|
||||||
resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
|
if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
|
||||||
|
resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
resetStream(ctx, stream, http2Ex.error().code(), ctx.newPromise());
|
resetStream(ctx, stream, http2Ex.error().code(), ctx.newPromise());
|
||||||
}
|
}
|
||||||
|
@ -52,11 +52,14 @@ import java.util.concurrent.CountDownLatch;
|
|||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
|
||||||
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
|
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
|
||||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
|
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
|
||||||
|
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
|
||||||
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
||||||
import static io.netty.handler.codec.http2.Http2TestUtil.randomString;
|
import static io.netty.handler.codec.http2.Http2TestUtil.randomString;
|
||||||
import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel;
|
import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel;
|
||||||
|
import static java.lang.Integer.MAX_VALUE;
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||||
import static org.hamcrest.CoreMatchers.not;
|
import static org.hamcrest.CoreMatchers.not;
|
||||||
@ -220,7 +223,7 @@ public class Http2ConnectionRoundtripTest {
|
|||||||
anyLong());
|
anyLong());
|
||||||
|
|
||||||
// The server will not respond, and so don't wait for graceful shutdown
|
// The server will not respond, and so don't wait for graceful shutdown
|
||||||
http2Client.gracefulShutdownTimeoutMillis(0);
|
setClientGracefulShutdownTime(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -766,7 +769,7 @@ public class Http2ConnectionRoundtripTest {
|
|||||||
assertTrue(clientChannel.isOpen());
|
assertTrue(clientChannel.isOpen());
|
||||||
|
|
||||||
// Set the timeout very low because we know graceful shutdown won't complete
|
// Set the timeout very low because we know graceful shutdown won't complete
|
||||||
http2Client.gracefulShutdownTimeoutMillis(0);
|
setClientGracefulShutdownTime(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -774,7 +777,7 @@ public class Http2ConnectionRoundtripTest {
|
|||||||
bootstrapEnv(1, 1, 3, 1, 1);
|
bootstrapEnv(1, 1, 3, 1, 1);
|
||||||
|
|
||||||
// Don't wait for the server to close streams
|
// Don't wait for the server to close streams
|
||||||
http2Client.gracefulShutdownTimeoutMillis(0);
|
setClientGracefulShutdownTime(0);
|
||||||
|
|
||||||
// Create a single stream by sending a HEADERS frame to the server.
|
// Create a single stream by sending a HEADERS frame to the server.
|
||||||
final Http2Headers headers = dummyHeaders();
|
final Http2Headers headers = dummyHeaders();
|
||||||
@ -792,7 +795,7 @@ public class Http2ConnectionRoundtripTest {
|
|||||||
runInChannel(clientChannel, new Http2Runnable() {
|
runInChannel(clientChannel, new Http2Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() throws Http2Exception {
|
public void run() throws Http2Exception {
|
||||||
http2Client.encoder().writeHeaders(ctx(), Integer.MAX_VALUE + 1, headers, 0, (short) 16, false, 0,
|
http2Client.encoder().writeHeaders(ctx(), MAX_VALUE + 1, headers, 0, (short) 16, false, 0,
|
||||||
true, newPromise());
|
true, newPromise());
|
||||||
http2Client.flush(ctx());
|
http2Client.flush(ctx());
|
||||||
}
|
}
|
||||||
@ -803,6 +806,89 @@ public class Http2ConnectionRoundtripTest {
|
|||||||
eq(PROTOCOL_ERROR.code()), any(ByteBuf.class));
|
eq(PROTOCOL_ERROR.code()), any(ByteBuf.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void createStreamAfterReceiveGoAwayShouldNotSendGoAway() throws Exception {
|
||||||
|
bootstrapEnv(1, 1, 2, 1, 1);
|
||||||
|
|
||||||
|
// We want both sides to do graceful shutdown during the test.
|
||||||
|
setClientGracefulShutdownTime(10000);
|
||||||
|
setServerGracefulShutdownTime(10000);
|
||||||
|
|
||||||
|
final CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
doAnswer(new Answer<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
|
clientGoAwayLatch.countDown();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(clientListener).onGoAwayRead(any(ChannelHandlerContext.class), anyInt(), anyLong(), any(ByteBuf.class));
|
||||||
|
|
||||||
|
// Create a single stream by sending a HEADERS frame to the server.
|
||||||
|
final Http2Headers headers = dummyHeaders();
|
||||||
|
runInChannel(clientChannel, new Http2Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() throws Http2Exception {
|
||||||
|
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
|
||||||
|
false, newPromise());
|
||||||
|
http2Client.flush(ctx());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertTrue(serverSettingsAckLatch.await(DEFAULT_AWAIT_TIMEOUT_SECONDS, SECONDS));
|
||||||
|
|
||||||
|
// Server has received the headers, so the stream is open
|
||||||
|
assertTrue(requestLatch.await(DEFAULT_AWAIT_TIMEOUT_SECONDS, SECONDS));
|
||||||
|
|
||||||
|
runInChannel(serverChannel, new Http2Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() throws Http2Exception {
|
||||||
|
http2Server.encoder().writeGoAway(serverCtx(), 3, NO_ERROR.code(), EMPTY_BUFFER, serverNewPromise());
|
||||||
|
http2Server.flush(serverCtx());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// wait for the client to receive the GO_AWAY.
|
||||||
|
assertTrue(clientGoAwayLatch.await(DEFAULT_AWAIT_TIMEOUT_SECONDS, SECONDS));
|
||||||
|
verify(clientListener).onGoAwayRead(any(ChannelHandlerContext.class), eq(3), eq(NO_ERROR.code()),
|
||||||
|
any(ByteBuf.class));
|
||||||
|
|
||||||
|
final AtomicReference<ChannelFuture> clientWriteAfterGoAwayFutureRef = new AtomicReference<ChannelFuture>();
|
||||||
|
final CountDownLatch clientWriteAfterGoAwayLatch = new CountDownLatch(1);
|
||||||
|
runInChannel(clientChannel, new Http2Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() throws Http2Exception {
|
||||||
|
ChannelFuture f = http2Client.encoder().writeHeaders(ctx(), 5, headers, 0, (short) 16, false, 0,
|
||||||
|
true, newPromise());
|
||||||
|
clientWriteAfterGoAwayFutureRef.set(f);
|
||||||
|
http2Client.flush(ctx());
|
||||||
|
f.addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
clientWriteAfterGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for the client's write operation to complete.
|
||||||
|
assertTrue(clientWriteAfterGoAwayLatch.await(DEFAULT_AWAIT_TIMEOUT_SECONDS, SECONDS));
|
||||||
|
|
||||||
|
ChannelFuture clientWriteAfterGoAwayFuture = clientWriteAfterGoAwayFutureRef.get();
|
||||||
|
assertNotNull(clientWriteAfterGoAwayFuture);
|
||||||
|
Throwable clientCause = clientWriteAfterGoAwayFuture.cause();
|
||||||
|
assertThat(clientCause, is(instanceOf(Http2Exception.StreamException.class)));
|
||||||
|
assertEquals(Http2Error.REFUSED_STREAM.code(), ((Http2Exception.StreamException) clientCause).error().code());
|
||||||
|
|
||||||
|
// Wait for the server to receive a GO_AWAY, but this is expected to timeout!
|
||||||
|
assertFalse(goAwayLatch.await(1, SECONDS));
|
||||||
|
verify(serverListener, never()).onGoAwayRead(any(ChannelHandlerContext.class), anyInt(), anyLong(),
|
||||||
|
any(ByteBuf.class));
|
||||||
|
|
||||||
|
// Shutdown shouldn't wait for the server to close streams
|
||||||
|
setClientGracefulShutdownTime(0);
|
||||||
|
setServerGracefulShutdownTime(0);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void flowControlProperlyChunksLargeMessage() throws Exception {
|
public void flowControlProperlyChunksLargeMessage() throws Exception {
|
||||||
final Http2Headers headers = dummyHeaders();
|
final Http2Headers headers = dummyHeaders();
|
||||||
@ -861,7 +947,7 @@ public class Http2ConnectionRoundtripTest {
|
|||||||
assertArrayEquals(data.array(), received);
|
assertArrayEquals(data.array(), received);
|
||||||
} finally {
|
} finally {
|
||||||
// Don't wait for server to close streams
|
// Don't wait for server to close streams
|
||||||
http2Client.gracefulShutdownTimeoutMillis(0);
|
setClientGracefulShutdownTime(0);
|
||||||
data.release();
|
data.release();
|
||||||
out.close();
|
out.close();
|
||||||
}
|
}
|
||||||
@ -949,7 +1035,7 @@ public class Http2ConnectionRoundtripTest {
|
|||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
// Don't wait for server to close streams
|
// Don't wait for server to close streams
|
||||||
http2Client.gracefulShutdownTimeoutMillis(0);
|
setClientGracefulShutdownTime(0);
|
||||||
data.release();
|
data.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1063,6 +1149,28 @@ public class Http2ConnectionRoundtripTest {
|
|||||||
any(ByteBuf.class), anyInt(), anyBoolean());
|
any(ByteBuf.class), anyInt(), anyBoolean());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setClientGracefulShutdownTime(final long millis) throws InterruptedException {
|
||||||
|
setGracefulShutdownTime(clientChannel, http2Client, millis);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setServerGracefulShutdownTime(final long millis) throws InterruptedException {
|
||||||
|
setGracefulShutdownTime(serverChannel, http2Server, millis);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setGracefulShutdownTime(Channel channel, final Http2ConnectionHandler handler,
|
||||||
|
final long millis) throws InterruptedException {
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
runInChannel(channel, new Http2Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() throws Http2Exception {
|
||||||
|
handler.gracefulShutdownTimeoutMillis(millis);
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertTrue(latch.await(DEFAULT_AWAIT_TIMEOUT_SECONDS, SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a {@link ByteBuf} of the given length, filled with random bytes.
|
* Creates a {@link ByteBuf} of the given length, filled with random bytes.
|
||||||
*/
|
*/
|
||||||
|
Loading…
Reference in New Issue
Block a user