Notify http2 error handler before closeStreamLocal on HEADERS write failure (#8332)

Motivation:

When writing an HTTP/2 HEADERS with END_STREAM=1, the application expects
the stream to be closed afterward. However, the write can fail locally
due to HPACK encoder and similar. When that happens we need to make sure
to issue a RST_STREAM otherwise the stream can be closed locally but
orphaned remotely. The RST_STREAM is typically handled by
Http2ConnectionHandler.onStreamError, which will only send a RST_STREAM
if that stream still exists locally.

There are two possible flows for trailers, one handled immediately and
one going through the flow controller. Previously they behaved
differently, with the immedate code calling the error handler after
closing the stream. The immediate code also used a listener for calling
closeStreamLocal while the flow controlled code did so immediately after
the write.

The two code paths also differed in their VoidChannelPromise handling,
but both were broken. The immediate code path called unvoid() only if
END_STREAM=1, however it could always potentially add a listener via
notifyLifecycleManagerOnError(). And the flow controlled code path
unvoided incorrectly, changing the promise completion behavior. It also
passed the wrong promise to closeStreamLocal() in FlowControlledBase.

Modifications:

Move closeStreamLocal handling after calls to onError. This is the
primary change.

Now call closeStreamLocal immediately instead of when the future
completes. This is the more likely correct behavior as it matches that
of DATA frames.

Fix all the VoidChannelPromise handling.

Result:

Http2ConnectionHandler.onStreamError sees the same state as the remote
and issues a RST_STREAM, properly cleaning up the stream.
This commit is contained in:
Eric Anderson 2018-09-28 10:29:12 -07:00 committed by GitHub
parent 6138541033
commit a95b7a791e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 14 deletions

View File

@ -190,17 +190,10 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
// for this stream. // for this stream.
Http2RemoteFlowController flowController = flowController(); Http2RemoteFlowController flowController = flowController();
if (!endOfStream || !flowController.hasFlowControlled(stream)) { if (!endOfStream || !flowController.hasFlowControlled(stream)) {
// The behavior here should mirror that in FlowControlledHeaders
promise = promise.unvoid();
boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream); boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
if (endOfStream) {
final Http2Stream finalStream = stream;
final ChannelFutureListener closeStreamLocalListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
lifecycleManager.closeStreamLocal(finalStream, future);
}
};
promise = promise.unvoid().addListener(closeStreamLocalListener);
}
ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency, ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
weight, exclusive, padding, endOfStream, promise); weight, exclusive, padding, endOfStream, promise);
@ -222,6 +215,13 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
lifecycleManager.onError(ctx, true, failureCause); lifecycleManager.onError(ctx, true, failureCause);
} }
if (endOfStream) {
// Must handle calling onError before calling closeStreamLocal, otherwise the error handler will
// incorrectly think the stream no longer exists and so may not send RST_STREAM or perform similar
// appropriate action.
lifecycleManager.closeStreamLocal(stream, future);
}
return future; return future;
} else { } else {
// Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames. // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
@ -288,6 +288,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
// Reserve the promised stream. // Reserve the promised stream.
connection.local().reservePushStream(promisedStreamId, stream); connection.local().reservePushStream(promisedStreamId, stream);
promise = promise.unvoid();
ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
promise); promise);
// Writing headers may fail during the encode state if they violate HPACK limits. // Writing headers may fail during the encode state if they violate HPACK limits.
@ -468,7 +469,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
FlowControlledHeaders(Http2Stream stream, Http2Headers headers, int streamDependency, short weight, FlowControlledHeaders(Http2Stream stream, Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endOfStream, ChannelPromise promise) { boolean exclusive, int padding, boolean endOfStream, ChannelPromise promise) {
super(stream, padding, endOfStream, promise); super(stream, padding, endOfStream, promise.unvoid());
this.headers = headers; this.headers = headers;
this.streamDependency = streamDependency; this.streamDependency = streamDependency;
this.weight = weight; this.weight = weight;
@ -491,9 +492,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override @Override
public void write(ChannelHandlerContext ctx, int allowedBytes) { public void write(ChannelHandlerContext ctx, int allowedBytes) {
boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream); boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
if (promise.isVoid()) { // The code is currently requiring adding this listener before writing, in order to call onError() before
promise = ctx.newPromise(); // closeStreamLocal().
}
promise.addListener(this); promise.addListener(this);
ChannelFuture f = frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive, ChannelFuture f = frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,

View File

@ -34,6 +34,7 @@ import junit.framework.AssertionFailedError;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
@ -65,6 +66,7 @@ import static org.mockito.Mockito.anyShort;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.eq; import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -707,6 +709,32 @@ public class DefaultHttp2ConnectionEncoderTest {
verify(lifecycleManager).closeStreamLocal(eq(stream), eq(promise)); verify(lifecycleManager).closeStreamLocal(eq(stream), eq(promise));
} }
@Test
public void headersWriteShouldHalfCloseAfterOnError() throws Exception {
final ChannelPromise promise = newPromise();
final Throwable ex = new RuntimeException();
// Fake an encoding error, like HPACK's HeaderListSizeException
when(writer.writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise)))
.thenAnswer(new Answer<ChannelFuture>() {
@Override
public ChannelFuture answer(InvocationOnMock invocation) {
promise.setFailure(ex);
return promise;
}
});
writeAllFlowControlledFrames();
createStream(STREAM_ID, false);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise);
assertTrue(promise.isDone());
assertFalse(promise.isSuccess());
InOrder inOrder = inOrder(lifecycleManager);
inOrder.verify(lifecycleManager).onError(eq(ctx), eq(true), eq(ex));
inOrder.verify(lifecycleManager).closeStreamLocal(eq(stream(STREAM_ID)), eq(promise));
}
@Test @Test
public void encoderDelegatesGoAwayToLifeCycleManager() { public void encoderDelegatesGoAwayToLifeCycleManager() {
ChannelPromise promise = newPromise(); ChannelPromise promise = newPromise();