Convert Http2Error.STREAM_CLOSED to ClosedChannelException when using child channels

Motivation:

We should convert Http2Exceptions that are produced because of STREAM_CLOSED to ClosedChannelException when hand-over to the child channel to make it more consistent with other transports.

Modifications:

- Check if STREAM_CLOSED is used and if so create a new ClosedChannelException (while preserve the original exception as cause) and use it in the child channel
- Ensure STREAM_CLOSED is used in DefaultHttp2RemoteFlowController when writes are failed because of a closed stream.
- Add testcase

Result:

More consistent and correct exception usage.
This commit is contained in:
Norman Maurer 2018-01-30 02:50:29 +01:00 committed by Scott Mitchell
parent 6e6edb59e7
commit c795e8897b
4 changed files with 71 additions and 22 deletions

View File

@ -27,6 +27,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
@ -100,12 +101,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
public void onStreamClosed(Http2Stream stream) { public void onStreamClosed(Http2Stream stream) {
// Any pending frames can never be written, cancel and // Any pending frames can never be written, cancel and
// write errors for any pending frames. // write errors for any pending frames.
state(stream).cancel(); state(stream).cancel(STREAM_CLOSED, null);
} }
@Override @Override
public void onStreamHalfClosed(Http2Stream stream) { public void onStreamHalfClosed(Http2Stream stream) {
if (HALF_CLOSED_LOCAL.equals(stream.state())) { if (HALF_CLOSED_LOCAL == stream.state()) {
/** /**
* When this method is called there should not be any * When this method is called there should not be any
* pending frames left if the API is used correctly. However, * pending frames left if the API is used correctly. However,
@ -117,7 +118,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* *
* This is to cancel any such illegal writes. * This is to cancel any such illegal writes.
*/ */
state(stream).cancel(); state(stream).cancel(STREAM_CLOSED, null);
} }
} }
}); });
@ -392,7 +393,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// If a cancellation occurred while writing, call cancel again to // If a cancellation occurred while writing, call cancel again to
// clear and error all of the pending writes. // clear and error all of the pending writes.
if (cancelled) { if (cancelled) {
cancel(cause); cancel(INTERNAL_ERROR, cause);
} }
} }
return writtenBytes; return writtenBytes;
@ -461,18 +462,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return pendingWriteQueue.peek(); return pendingWriteQueue.peek();
} }
/**
* Any operations that may be pending are cleared and the status of these operations is failed.
*/
void cancel() {
cancel(null);
}
/** /**
* Clears the pending queue and writes errors for each remaining frame. * Clears the pending queue and writes errors for each remaining frame.
* @param error the {@link Http2Error} to use.
* @param cause the {@link Throwable} that caused this method to be invoked. * @param cause the {@link Throwable} that caused this method to be invoked.
*/ */
private void cancel(Throwable cause) { void cancel(Http2Error error, Throwable cause) {
cancelled = true; cancelled = true;
// Ensure that the queue can't be modified while we are writing. // Ensure that the queue can't be modified while we are writing.
if (writing) { if (writing) {
@ -482,7 +477,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
FlowControlled frame = pendingWriteQueue.poll(); FlowControlled frame = pendingWriteQueue.poll();
if (frame != null) { if (frame != null) {
// Only create exception once and reuse to reduce overhead of filling in the stacktrace. // Only create exception once and reuse to reduce overhead of filling in the stacktrace.
final Http2Exception exception = streamError(stream.id(), INTERNAL_ERROR, cause, final Http2Exception exception = streamError(stream.id(), error, cause,
"Stream closed before write could take place"); "Stream closed before write could take place");
do { do {
writeError(frame, exception); writeError(frame, exception);

View File

@ -429,6 +429,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
// We start with the writability of the channel when creating the StreamChannel. // We start with the writability of the channel when creating the StreamChannel.
private volatile boolean writable; private volatile boolean writable;
private boolean outboundClosed;
private boolean closePending; private boolean closePending;
private boolean readInProgress; private boolean readInProgress;
private Queue<Object> inboundBuffer; private Queue<Object> inboundBuffer;
@ -876,6 +877,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
} }
// The promise should be notified before we call fireChannelInactive(). // The promise should be notified before we call fireChannelInactive().
outboundClosed = true;
closePromise.setSuccess(); closePromise.setSuccess();
promise.setSuccess(); promise.setSuccess();
@ -979,7 +981,9 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
return; return;
} }
if (!isActive()) { if (!isActive() ||
// Once the outbound side was closed we should not allow header / data frames
outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
promise.setFailure(CLOSED_CHANNEL_EXCEPTION); promise.setFailure(CLOSED_CHANNEL_EXCEPTION);
return; return;
@ -1045,7 +1049,8 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
writabilityChanged(Http2MultiplexCodec.this.isWritable(stream)); writabilityChanged(Http2MultiplexCodec.this.isWritable(stream));
promise.setSuccess(); promise.setSuccess();
} else { } else {
promise.setFailure(cause); promise.setFailure(wrapStreamClosedError(cause));
// If the first write fails there is not much we can do, just close
closeForcibly(); closeForcibly();
} }
} }
@ -1055,10 +1060,29 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
if (cause == null) { if (cause == null) {
promise.setSuccess(); promise.setSuccess();
} else { } else {
promise.setFailure(cause); Throwable error = wrapStreamClosedError(cause);
promise.setFailure(error);
if (error instanceof ClosedChannelException) {
if (config.isAutoClose()) {
// Close channel if needed.
closeForcibly();
} else {
outboundClosed = true;
}
}
} }
} }
private Throwable wrapStreamClosedError(Throwable cause) {
// If the error was caused by STREAM_CLOSED we should use a ClosedChannelException to better
// mimic other transports and make it easier to reason about what exceptions to expect.
if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
return new ClosedChannelException().initCause(cause);
}
return cause;
}
private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) { private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
if (frame.stream() != null && frame.stream() != stream) { if (frame.stream() != null && frame.stream() != stream) {
String msgString = frame.toString(); String msgString = frame.toString();

View File

@ -257,8 +257,8 @@ public abstract class DefaultHttp2RemoteFlowControllerTest {
moreData.assertNotWritten(); moreData.assertNotWritten();
connection.stream(STREAM_A).close(); connection.stream(STREAM_A).close();
data.assertError(); data.assertError(Http2Error.STREAM_CLOSED);
moreData.assertError(); moreData.assertError(Http2Error.STREAM_CLOSED);
verifyZeroInteractions(listener); verifyZeroInteractions(listener);
} }
@ -1106,8 +1106,11 @@ public abstract class DefaultHttp2RemoteFlowControllerTest {
return merged; return merged;
} }
public void assertError() { public void assertError(Http2Error error) {
assertNotNull(t); assertNotNull(t);
if (error != null) {
assertSame(error, ((Http2Exception) t).error());
}
} }
} }
} }

View File

@ -30,14 +30,16 @@ import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http2.Http2Exception.StreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.netty.util.ReferenceCountUtil.release; import static io.netty.util.ReferenceCountUtil.release;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -302,6 +304,31 @@ public class Http2MultiplexCodecTest {
inboundHandler.checkException(); inboundHandler.checkException();
} }
@Test(expected = ClosedChannelException.class)
public void streamClosedErrorTranslatedToClosedChannelExceptionOnWrites() throws Exception {
writer = new Writer() {
@Override
void write(Object msg, ChannelPromise promise) {
promise.tryFailure(new StreamException(inboundStream.id(), Http2Error.STREAM_CLOSED, "Stream Closed"));
}
};
LastInboundHandler inboundHandler = new LastInboundHandler();
childChannelInitializer.handler = inboundHandler;
Channel childChannel = newOutboundStream();
assertTrue(childChannel.isActive());
ChannelFuture future = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
parentChannel.flush();
assertFalse(childChannel.isActive());
assertFalse(childChannel.isOpen());
inboundHandler.checkException();
future.syncUninterruptibly();
}
@Test @Test
public void creatingWritingReadingAndClosingOutboundStreamShouldWork() { public void creatingWritingReadingAndClosingOutboundStreamShouldWork() {
LastInboundHandler inboundHandler = new LastInboundHandler(); LastInboundHandler inboundHandler = new LastInboundHandler();