Simplifying and centralizing HTTP/2 exception handling logic

Motivation:

Currently, Http2LifecycleManager implements the exception handling logic
which makes it difficult to extend or modify the exception handling
behavior.  Simply overriding exceptionCaught() will only affect one of
the many possible exception paths. We need to reorganize the exception
handling code to centralize the exception handling logic into a single
place that can easily be extended by subclasses of
Http2ConnectionHandler.

Modifications:

Made Http2LifecycleManager an interface, implemented directly by
Http2ConnectionHandler. This adds a circular dependency between the
handler and the encoder/decoder, so I added builders for them that allow
the constructor of Http2ConnectionHandler to set itself as the lifecycle
manager and build them.

Changed Http2LifecycleManager.onHttpException to just
onException(Throwable) to simplify the interface. This method is now the
central control point for all exceptions. Subclasses now only need to
override onException() to intercept any exception encountered by the
handler.

Result:

HTTP/2 has more extensible exception handling, that is less likely to
see exceptions vanish into the ether.
This commit is contained in:
nmittler 2014-10-08 14:17:21 -07:00
parent 276b826b59
commit 1429543436
12 changed files with 539 additions and 248 deletions

View File

@ -46,29 +46,78 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
private final Http2FrameListener listener;
private boolean prefaceReceived;
public DefaultHttp2ConnectionDecoder(Http2Connection connection, Http2FrameReader frameReader,
Http2InboundFlowController inboundFlow, Http2ConnectionEncoder encoder,
Http2LifecycleManager lifecycleManager, Http2FrameListener listener) {
this.connection = checkNotNull(connection, "connection");
this.frameReader = checkNotNull(frameReader, "frameReader");
this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
this.encoder = checkNotNull(encoder, "encoder");
this.inboundFlow = checkNotNull(inboundFlow, "inboundFlow");
this.listener = checkNotNull(listener, "listener");
/**
* Builder for instances of {@link DefaultHttp2ConnectionDecoder}.
*/
public static class Builder implements Http2ConnectionDecoder.Builder {
private Http2Connection connection;
private Http2LifecycleManager lifecycleManager;
private Http2ConnectionEncoder encoder;
private Http2FrameReader frameReader;
private Http2InboundFlowController inboundFlow;
private Http2FrameListener listener;
@Override
public Builder connection(Http2Connection connection) {
this.connection = connection;
return this;
}
@Override
public Builder lifecycleManager(Http2LifecycleManager lifecycleManager) {
this.lifecycleManager = lifecycleManager;
return this;
}
@Override
public Builder inboundFlow(Http2InboundFlowController inboundFlow) {
this.inboundFlow = inboundFlow;
return this;
}
@Override
public Builder frameReader(Http2FrameReader frameReader) {
this.frameReader = frameReader;
return this;
}
@Override
public Builder listener(Http2FrameListener listener) {
this.listener = listener;
return this;
}
@Override
public Builder encoder(Http2ConnectionEncoder encoder) {
this.encoder = encoder;
return this;
}
@Override
public Http2ConnectionDecoder build() {
return new DefaultHttp2ConnectionDecoder(this);
}
}
public static Builder newBuilder() {
return new Builder();
}
protected DefaultHttp2ConnectionDecoder(Builder builder) {
this.connection = checkNotNull(builder.connection, "connection");
this.frameReader = checkNotNull(builder.frameReader, "frameReader");
this.lifecycleManager = checkNotNull(builder.lifecycleManager, "lifecycleManager");
this.encoder = checkNotNull(builder.encoder, "encoder");
this.inboundFlow = checkNotNull(builder.inboundFlow, "inboundFlow");
this.listener = checkNotNull(builder.listener, "listener");
}
@Override
public Http2Connection connection() {
return connection;
}
public Http2FrameListener listener() {
return listener;
}
public Http2LifecycleManager lifecycleManager() {
return lifecycleManager;
}
@Override
public boolean prefaceReceived() {
return prefaceReceived;
}

View File

@ -15,7 +15,6 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.toHttp2Exception;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
@ -42,12 +41,68 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
// This initial capacity is plenty for SETTINGS traffic.
private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter,
Http2OutboundFlowController outboundFlow, Http2LifecycleManager lifecycleManager) {
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
this.connection = checkNotNull(connection, "connection");
this.outboundFlow = checkNotNull(outboundFlow, "outboundFlow");
this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
/**
* Builder for new instances of {@link DefaultHttp2ConnectionEncoder}.
*/
public static class Builder implements Http2ConnectionEncoder.Builder {
protected Http2FrameWriter frameWriter;
protected Http2Connection connection;
protected Http2OutboundFlowController outboundFlow;
protected Http2LifecycleManager lifecycleManager;
@Override
public Builder connection(
Http2Connection connection) {
this.connection = connection;
return this;
}
@Override
public Builder lifecycleManager(
Http2LifecycleManager lifecycleManager) {
this.lifecycleManager = lifecycleManager;
return this;
}
@Override
public Builder frameWriter(
Http2FrameWriter frameWriter) {
this.frameWriter = frameWriter;
return this;
}
@Override
public Builder outboundFlow(
Http2OutboundFlowController outboundFlow) {
this.outboundFlow = outboundFlow;
return this;
}
@Override
public Http2ConnectionEncoder build() {
return new DefaultHttp2ConnectionEncoder(this);
}
}
public static Builder newBuilder() {
return new Builder();
}
protected DefaultHttp2ConnectionEncoder(Builder builder) {
this.frameWriter = checkNotNull(builder.frameWriter, "frameWriter");
this.connection = checkNotNull(builder.connection, "connection");
this.outboundFlow = checkNotNull(builder.outboundFlow, "outboundFlow");
this.lifecycleManager = checkNotNull(builder.lifecycleManager, "lifecycleManager");
}
@Override
public Http2FrameWriter frameWriter() {
return frameWriter;
}
@Override
public Http2Connection connection() {
return connection;
}
@Override
@ -109,7 +164,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// The write failed, handle the error.
lifecycleManager.onHttp2Exception(ctx, toHttp2Exception(future.cause()));
lifecycleManager.onException(ctx, future.cause());
} else if (endStream) {
// Close the local side of the stream if this is the last frame
Http2Stream stream = connection.stream(streamId);

View File

@ -15,7 +15,6 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.util.CharsetUtil.UTF_8;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
@ -128,19 +127,6 @@ public final class Http2CodecUtil {
return ignoreSettingsHandler;
}
/**
* Converts the given cause to a {@link Http2Exception} if it isn't already.
*/
public static Http2Exception toHttp2Exception(Throwable cause) {
// Look for an embedded Http2Exception.
Http2Exception httpException = getEmbeddedHttp2Exception(cause);
if (httpException != null) {
return httpException;
}
return new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
}
/**
* Iteratively looks through the causaility chain for the given exception and returns the first
* {@link Http2Exception} or {@code null} if none.

View File

@ -25,6 +25,52 @@ import java.util.List;
*/
public interface Http2ConnectionDecoder extends Closeable {
/**
* Builder for new instances of {@link Http2ConnectionDecoder}.
*/
public interface Builder {
/**
* Sets the {@link Http2Connection} to be used when building the decoder.
*/
Builder connection(Http2Connection connection);
/**
* Sets the {@link LifecycleManager} to be used when building the decoder.
*/
Builder lifecycleManager(Http2LifecycleManager lifecycleManager);
/**
* Sets the {@link Http2InboundFlowController} to be used when building the decoder.
*/
Builder inboundFlow(Http2InboundFlowController inboundFlow);
/**
* Sets the {@link Http2FrameReader} to be used when building the decoder.
*/
Builder frameReader(Http2FrameReader frameReader);
/**
* Sets the {@link Http2FrameListener} to be used when building the decoder.
*/
Builder listener(Http2FrameListener listener);
/**
* Sets the {@link Http2ConnectionEncoder} used when building the decoder.
*/
Builder encoder(Http2ConnectionEncoder encoder);
/**
* Creates a new decoder instance.
*/
Http2ConnectionDecoder build();
}
/**
* Provides direct access to the underlying connection.
*/
Http2Connection connection();
/**
* Called by the {@link Http2ConnectionHandler} to decode the next frame from the input buffer.
*/

View File

@ -14,11 +14,53 @@
*/
package io.netty.handler.codec.http2;
/**
* Handler for outbound traffic on behalf of {@link Http2ConectionHandler}.
*/
public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundFlowController {
/**
* Builder for new instances of {@link Http2ConnectionEncoder}.
*/
public interface Builder {
/**
* Sets the {@link Http2Connection} to be used when building the encoder.
*/
Builder connection(Http2Connection connection);
/**
* Sets the {@link LifecycleManager} to be used when building the encoder.
*/
Builder lifecycleManager(Http2LifecycleManager lifecycleManager);
/**
* Sets the {@link Http2FrameWriter} to be used when building the encoder.
*/
Builder frameWriter(Http2FrameWriter frameWriter);
/**
* Sets the {@link Http2OutboundFlowController} to be used when building the encoder.
*/
Builder outboundFlow(Http2OutboundFlowController outboundFlow);
/**
* Creates a new encoder instance.
*/
Http2ConnectionEncoder build();
}
/**
* Provides direct access to the underlying connection.
*/
Http2Connection connection();
/**
* Provides direct access to the underlying frame writer object.
*/
Http2FrameWriter frameWriter();
/**
* Gets the local settings on the top of the queue that has been sent but not ACKed. This may
* return {@code null}.

View File

@ -17,6 +17,8 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
@ -37,13 +39,12 @@ import java.util.List;
* <p>
* This interface enforces inbound flow control functionality through {@link Http2InboundFlowController}
*/
public class Http2ConnectionHandler extends ByteToMessageDecoder {
private final Http2LifecycleManager lifecycleManager;
public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager {
private final Http2ConnectionDecoder decoder;
private final Http2ConnectionEncoder encoder;
private final Http2Connection connection;
private ByteBuf clientPrefaceString;
private boolean prefaceSent;
private ChannelFutureListener closeListener;
public Http2ConnectionHandler(boolean server, Http2FrameListener listener) {
this(new DefaultHttp2Connection(server), listener);
@ -63,36 +64,47 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder {
public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow, Http2FrameListener listener) {
checkNotNull(frameWriter, "frameWriter");
checkNotNull(inboundFlow, "inboundFlow");
checkNotNull(outboundFlow, "outboundFlow");
checkNotNull(listener, "listener");
this.connection = checkNotNull(connection, "connection");
this.lifecycleManager = new Http2LifecycleManager(connection, frameWriter);
this.encoder =
new DefaultHttp2ConnectionEncoder(connection, frameWriter, outboundFlow,
lifecycleManager);
DefaultHttp2ConnectionEncoder.newBuilder().connection(connection)
.frameWriter(frameWriter).outboundFlow(outboundFlow).lifecycleManager(this)
.build();
this.decoder =
new DefaultHttp2ConnectionDecoder(connection, frameReader, inboundFlow, encoder,
lifecycleManager, listener);
DefaultHttp2ConnectionDecoder.newBuilder().connection(connection)
.frameReader(frameReader).inboundFlow(inboundFlow).encoder(encoder)
.listener(listener).lifecycleManager(this).build();
clientPrefaceString = clientPrefaceString(connection);
}
public Http2ConnectionHandler(Http2Connection connection, Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder, Http2LifecycleManager lifecycleManager) {
this.connection = checkNotNull(connection, "connection");
this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
this.encoder = checkNotNull(encoder, "encoder");
this.decoder = checkNotNull(decoder, "decoder");
clientPrefaceString = clientPrefaceString(connection);
/**
* Constructor for pre-configured encoder and decoder builders. Just sets the {@code this} as the
* {@link Http2LifecycleManager} and builds them.
*/
public Http2ConnectionHandler(Http2ConnectionDecoder.Builder decoderBuilder,
Http2ConnectionEncoder.Builder encoderBuilder) {
checkNotNull(decoderBuilder, "decoderBuilder");
checkNotNull(encoderBuilder, "encoderBuilder");
// Build the encoder.
decoderBuilder.lifecycleManager(this);
encoder = checkNotNull(encoderBuilder.build(), "encoder");
// Build the decoder.
decoderBuilder.encoder(encoder);
encoderBuilder.lifecycleManager(this);
decoder = checkNotNull(decoderBuilder.build(), "decoder");
// Verify that the encoder and decoder use the same connection.
checkNotNull(encoder.connection(), "encoder.connection");
checkNotNull(decoder.connection(), "decoder.connection");
if (encoder.connection() != decoder.connection()) {
throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
}
clientPrefaceString = clientPrefaceString(encoder.connection());
}
public Http2Connection connection() {
return connection;
}
public Http2LifecycleManager lifecycleManager() {
return lifecycleManager;
return encoder.connection();
}
public Http2ConnectionDecoder decoder() {
@ -108,7 +120,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder {
* Reserves local stream 1 for the HTTP/2 response.
*/
public void onHttpClientUpgrade() throws Http2Exception {
if (connection.isServer()) {
if (connection().isServer()) {
throw protocolError("Client-side HTTP upgrade requested for a server");
}
if (prefaceSent || decoder.prefaceReceived()) {
@ -116,7 +128,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder {
}
// Create a local stream used for the HTTP cleartext upgrade.
connection.createLocalStream(HTTP_UPGRADE_STREAM_ID, true);
connection().createLocalStream(HTTP_UPGRADE_STREAM_ID, true);
}
/**
@ -124,7 +136,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder {
* @param settings the settings for the remote endpoint.
*/
public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
if (!connection.isServer()) {
if (!connection().isServer()) {
throw protocolError("Server-side HTTP upgrade requested for a client");
}
if (prefaceSent || decoder.prefaceReceived()) {
@ -135,7 +147,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder {
encoder.remoteSettings(settings);
// Create a stream in the half-closed state.
connection.createRemoteStream(HTTP_UPGRADE_STREAM_ID, true);
connection().createRemoteStream(HTTP_UPGRADE_STREAM_ID, true);
}
@Override
@ -160,15 +172,29 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
lifecycleManager.close(ctx, promise);
// Avoid NotYetConnectedException
if (!ctx.channel().isActive()) {
ctx.close(promise);
return;
}
ChannelFuture future = writeGoAway(ctx, null);
// If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive.
if (connection().numActiveStreams() == 0) {
future.addListener(new ClosingChannelFutureListener(ctx, promise));
} else {
closeListener = new ClosingChannelFutureListener(ctx, promise);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ChannelFuture future = ctx.newSucceededFuture();
final Collection<Http2Stream> streams = connection.activeStreams();
final Collection<Http2Stream> streams = connection().activeStreams();
for (Http2Stream s : streams.toArray(new Http2Stream[streams.size()])) {
lifecycleManager.closeStream(s, future);
closeStream(s, future);
}
super.channelInactive(ctx);
}
@ -178,14 +204,165 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder {
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Http2Exception ex = getEmbeddedHttp2Exception(cause);
if (ex != null) {
lifecycleManager.onHttp2Exception(ctx, ex);
if (getEmbeddedHttp2Exception(cause) != null) {
// Some exception in the causality chain is an Http2Exception - handle it.
onException(ctx, cause);
} else {
super.exceptionCaught(ctx, cause);
}
}
/**
* Closes the local side of the given stream. If this causes the stream to be closed, adds a
* hook to close the channel after the given future completes.
*
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel.
*/
public void closeLocalSide(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_LOCAL:
case OPEN:
stream.closeLocalSide();
break;
default:
closeStream(stream, future);
break;
}
}
/**
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a
* hook to close the channel after the given future completes.
*
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel.
*/
public void closeRemoteSide(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_REMOTE:
case OPEN:
stream.closeRemoteSide();
break;
default:
closeStream(stream, future);
break;
}
}
/**
* Closes the given stream and adds a hook to close the channel after the given future
* completes.
*
* @param stream the stream to be closed.
* @param future the future after which to close the channel.
*/
@Override
public void closeStream(Http2Stream stream, ChannelFuture future) {
stream.close();
// If this connection is closing and there are no longer any
// active streams, close after the current operation completes.
if (closeListener != null && connection().numActiveStreams() == 0) {
future.addListener(closeListener);
}
}
/**
* Central handler for all exceptions caught during HTTP/2 processing.
*/
@Override
public void onException(ChannelHandlerContext ctx, Throwable cause) {
Http2Exception embedded = getEmbeddedHttp2Exception(cause);
if (embedded instanceof Http2StreamException) {
onStreamError(ctx, cause, (Http2StreamException) embedded);
} else {
onConnectionError(ctx, cause, embedded);
}
}
/**
* Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
* streams are closed, the connection is shut down.
*
* @param ctx the channel context
* @param cause the exception that was caught
* @param http2Ex the {@link Http2Exception} that is embedded in the causality chain. This may
* be {@code null} if it's an unknown exception.
*/
protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, Http2Exception http2Ex) {
if (http2Ex == null) {
http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
}
writeGoAway(ctx, http2Ex).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
}
/**
* Handler for a stream error. Sends a {@code RST_STREAM} frame to the remote endpoint and closes the
* stream.
*
* @param ctx the channel context
* @param cause the exception that was caught
* @param http2Ex the {@link Http2StreamException} that is embedded in the causality chain.
*/
protected void onStreamError(ChannelHandlerContext ctx, Throwable cause, Http2StreamException http2Ex) {
writeRstStream(ctx, http2Ex.streamId(), http2Ex.error().code(), ctx.newPromise());
}
protected Http2FrameWriter frameWriter() {
return encoder().frameWriter();
}
/**
* Writes a {@code RST_STREAM} frame to the remote endpoint and updates the connection state appropriately.
*/
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise) {
Http2Stream stream = connection().stream(streamId);
ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
ctx.flush();
if (stream != null) {
stream.terminateSent();
closeStream(stream, promise);
}
return future;
}
/**
* Sends a {@code GO_AWAY} frame to the remote endpoint and updates the connection state appropriately.
*/
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
ChannelPromise promise) {
if (connection().isGoAway()) {
debugData.release();
return ctx.newSucceededFuture();
}
ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
ctx.flush();
connection().remote().goAwayReceived(lastStreamId);
return future;
}
/**
* Sends a {@code GO_AWAY} frame appropriate for the given exception.
*/
private ChannelFuture writeGoAway(ChannelHandlerContext ctx, Http2Exception cause) {
if (connection().isGoAway()) {
return ctx.newSucceededFuture();
}
// The connection isn't alredy going away, send the GO_AWAY frame now to start
// the process.
int errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
ByteBuf debugData = Http2CodecUtil.toByteBuf(ctx, cause);
int lastKnownStream = connection().remote().lastStreamCreated();
return writeGoAway(ctx, lastKnownStream, errorCode, debugData, ctx.newPromise());
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
@ -197,10 +374,8 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder {
}
decoder.decodeFrame(ctx, in, out);
} catch (Http2Exception e) {
lifecycleManager.onHttp2Exception(ctx, e);
} catch (Throwable e) {
lifecycleManager.onHttp2Exception(ctx, new Http2Exception(Http2Error.INTERNAL_ERROR, e.getMessage(), e));
onException(ctx, e);
}
}
@ -214,7 +389,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder {
prefaceSent = true;
if (!connection.isServer()) {
if (!connection().isServer()) {
// Clients must send the preface string as the first bytes on the connection.
ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
@ -276,4 +451,22 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder {
private static ByteBuf clientPrefaceString(Http2Connection connection) {
return connection.isServer() ? connectionPrefaceBuf() : null;
}
/**
* Closes the channel when the future completes.
*/
private static final class ClosingChannelFutureListener implements ChannelFutureListener {
private final ChannelHandlerContext ctx;
private final ChannelPromise promise;
ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
this.ctx = ctx;
this.promise = promise;
}
@Override
public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
ctx.close(promise);
}
}
}

View File

@ -14,11 +14,8 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
@ -26,37 +23,7 @@ import io.netty.channel.ChannelPromise;
* Manager for the life cycle of the HTTP/2 connection. Handles graceful shutdown of the channel,
* closing only after all of the streams have closed.
*/
public class Http2LifecycleManager {
private final Http2Connection connection;
private final Http2FrameWriter frameWriter;
private ChannelFutureListener closeListener;
public Http2LifecycleManager(Http2Connection connection, Http2FrameWriter frameWriter) {
this.connection = checkNotNull(connection, "connection");
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
}
/**
* Handles the close processing on behalf of the {@link DelegatingHttp2ConnectionHandler}.
*/
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// Avoid NotYetConnectedException
if (!ctx.channel().isActive()) {
ctx.close(promise);
return;
}
ChannelFuture future = writeGoAway(ctx, null);
// If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive.
if (connection.numActiveStreams() == 0) {
future.addListener(new ClosingChannelFutureListener(ctx, promise));
} else {
closeListener = new ClosingChannelFutureListener(ctx, promise);
}
}
public interface Http2LifecycleManager {
/**
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a
@ -65,17 +32,7 @@ public class Http2LifecycleManager {
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel.
*/
public void closeLocalSide(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_LOCAL:
case OPEN:
stream.closeLocalSide();
break;
default:
closeStream(stream, future);
break;
}
}
void closeLocalSide(Http2Stream stream, ChannelFuture future);
/**
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a
@ -84,17 +41,7 @@ public class Http2LifecycleManager {
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel.
*/
public void closeRemoteSide(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_REMOTE:
case OPEN:
stream.closeRemoteSide();
break;
default:
closeStream(stream, future);
break;
}
}
void closeRemoteSide(Http2Stream stream, ChannelFuture future);
/**
* Closes the given stream and adds a hook to close the channel after the given future
@ -103,109 +50,24 @@ public class Http2LifecycleManager {
* @param stream the stream to be closed.
* @param future the future after which to close the channel.
*/
public void closeStream(Http2Stream stream, ChannelFuture future) {
stream.close();
// If this connection is closing and there are no longer any
// active streams, close after the current operation completes.
if (closeListener != null && connection.numActiveStreams() == 0) {
future.addListener(closeListener);
}
}
void closeStream(Http2Stream stream, ChannelFuture future);
/**
* Processes the given exception. Depending on the type of exception, delegates to either
* {@link #onConnectionError(ChannelHandlerContext, Http2Exception)} or
* {@link #onStreamError(ChannelHandlerContext, Http2StreamException)}.
* Writes a {@code RST_STREAM} frame to the remote endpoint and updates the connection state
* appropriately.
*/
public void onHttp2Exception(ChannelHandlerContext ctx, Http2Exception e) {
if (e instanceof Http2StreamException) {
onStreamError(ctx, (Http2StreamException) e);
} else {
onConnectionError(ctx, e);
}
}
ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise);
/**
* Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint and waits until
* all streams are closed before shutting down the connection.
* Sends a {@code GO_AWAY} frame to the remote endpoint and updates the connection state
* appropriately.
*/
private void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) {
writeGoAway(ctx, cause).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
}
ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData, ChannelPromise promise);
/**
* Handler for a stream error. Sends a RST_STREAM frame to the remote endpoint and closes the stream.
* Processes the given exception.
*/
private void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) {
writeRstStream(ctx, cause.streamId(), cause.error().code(), ctx.newPromise());
}
/**
* Writes a RST_STREAM frame to the remote endpoint and updates the connection state appropriately.
*/
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise) {
Http2Stream stream = connection.stream(streamId);
ChannelFuture future = frameWriter.writeRstStream(ctx, streamId, errorCode, promise);
ctx.flush();
if (stream != null) {
stream.terminateSent();
closeStream(stream, promise);
}
return future;
}
/**
* Sends a {@code GO_AWAY} frame to the remote endpoint and updates the connection state appropriately.
*/
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
ChannelPromise promise) {
if (connection.isGoAway()) {
debugData.release();
return ctx.newSucceededFuture();
}
ChannelFuture future = frameWriter.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
ctx.flush();
connection.remote().goAwayReceived(lastStreamId);
return future;
}
/**
* Sends a GO_AWAY frame appropriate for the given exception.
*/
private ChannelFuture writeGoAway(ChannelHandlerContext ctx, Http2Exception cause) {
if (connection.isGoAway()) {
return ctx.newSucceededFuture();
}
// The connection isn't alredy going away, send the GO_AWAY frame now to start
// the process.
int errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
ByteBuf debugData = Http2CodecUtil.toByteBuf(ctx, cause);
int lastKnownStream = connection.remote().lastStreamCreated();
return writeGoAway(ctx, lastKnownStream, errorCode, debugData, ctx.newPromise());
}
/**
* Closes the channel when the future completes.
*/
private static final class ClosingChannelFutureListener implements ChannelFutureListener {
private final ChannelHandlerContext ctx;
private final ChannelPromise promise;
ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
this.ctx = ctx;
this.promise = promise;
}
@Override
public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
ctx.close(promise);
}
}
void onException(ChannelHandlerContext ctx, Throwable cause);
}

View File

@ -25,6 +25,11 @@ public class Http2StreamException extends Http2Exception {
this.streamId = streamId;
}
public Http2StreamException(int streamId, Http2Error error, String message, Throwable cause) {
super(error, message, cause);
this.streamId = streamId;
}
public Http2StreamException(int streamId, Http2Error error) {
super(error);
this.streamId = streamId;

View File

@ -61,7 +61,7 @@ public class DefaultHttp2ConnectionDecoderTest {
private static final int STREAM_ID = 1;
private static final int PUSH_STREAM_ID = 2;
private DefaultHttp2ConnectionDecoder decoder;
private Http2ConnectionDecoder decoder;
@Mock
private Http2Connection connection;
@ -143,9 +143,9 @@ public class DefaultHttp2ConnectionDecoderTest {
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
decoder =
new DefaultHttp2ConnectionDecoder(connection, reader, inboundFlow, encoder,
lifecycleManager, listener);
decoder = DefaultHttp2ConnectionDecoder.newBuilder().connection(connection)
.frameReader(reader).inboundFlow(inboundFlow).encoder(encoder)
.listener(listener).lifecycleManager(lifecycleManager).build();
// Simulate receiving the initial settings from the remote endpoint.
decode().onSettingsRead(ctx, new Http2Settings());

View File

@ -60,7 +60,7 @@ public class DefaultHttp2ConnectionEncoderTest {
private static final int STREAM_ID = 1;
private static final int PUSH_STREAM_ID = 2;
private DefaultHttp2ConnectionEncoder encoder;
private Http2ConnectionEncoder encoder;
@Mock
private Http2Connection connection;
@ -143,7 +143,9 @@ public class DefaultHttp2ConnectionEncoderTest {
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
encoder = new DefaultHttp2ConnectionEncoder(connection, writer, outboundFlow, lifecycleManager);
encoder = DefaultHttp2ConnectionEncoder.newBuilder().connection(connection)
.frameWriter(writer).outboundFlow(outboundFlow)
.lifecycleManager(lifecycleManager).build();
}
@Test

View File

@ -41,6 +41,7 @@ import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@ -79,7 +80,10 @@ public class Http2ConnectionHandlerTest {
private Http2Stream stream;
@Mock
private Http2LifecycleManager lifecycleManager;
private Http2ConnectionDecoder.Builder decoderBuilder;
@Mock
private Http2ConnectionEncoder.Builder encoderBuilder;
@Mock
private Http2ConnectionDecoder decoder;
@ -87,13 +91,24 @@ public class Http2ConnectionHandlerTest {
@Mock
private Http2ConnectionEncoder encoder;
@Mock
private Http2FrameWriter frameWriter;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
promise = new DefaultChannelPromise(channel);
when(encoderBuilder.build()).thenReturn(encoder);
when(decoderBuilder.build()).thenReturn(decoder);
when(encoder.connection()).thenReturn(connection);
when(decoder.connection()).thenReturn(connection);
when(encoder.frameWriter()).thenReturn(frameWriter);
when(frameWriter.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))).thenReturn(future);
when(channel.isActive()).thenReturn(true);
when(connection.remote()).thenReturn(remote);
when(connection.local()).thenReturn(local);
when(connection.activeStreams()).thenReturn(Collections.singletonList(stream));
doAnswer(new Answer<Http2Stream>() {
@Override
@ -120,7 +135,7 @@ public class Http2ConnectionHandlerTest {
}
private Http2ConnectionHandler newHandler() {
return new Http2ConnectionHandler(connection, decoder, encoder, lifecycleManager);
return new Http2ConnectionHandler(decoderBuilder, encoderBuilder);
}
@After
@ -147,7 +162,10 @@ public class Http2ConnectionHandlerTest {
when(connection.isServer()).thenReturn(true);
handler = newHandler();
handler.channelRead(ctx, copiedBuffer("BAD_PREFACE", UTF_8));
verify(lifecycleManager).onHttp2Exception(eq(ctx), any(Http2Exception.class));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
verify(frameWriter).writeGoAway(eq(ctx), eq(0), eq((long) PROTOCOL_ERROR.code()),
captor.capture(), eq(promise));
captor.getValue().release();
}
@Test
@ -157,23 +175,20 @@ public class Http2ConnectionHandlerTest {
verify(decoder).decodeFrame(eq(ctx), any(ByteBuf.class), Matchers.<List<Object>>any());
}
@Test
public void closeShouldCallLifecycleManager() throws Exception {
handler.close(ctx, promise);
verify(lifecycleManager).close(eq(ctx), eq(promise));
}
@Test
public void channelInactiveShouldCloseStreams() throws Exception {
handler.channelInactive(ctx);
verify(lifecycleManager).closeStream(eq(stream), eq(future));
verify(stream).close();
}
@Test
public void http2ExceptionShouldCallLifecycleManager() throws Exception {
public void connectionErrorShouldStartShutdown() throws Exception {
Http2Exception e = new Http2Exception(PROTOCOL_ERROR);
when(remote.lastStreamCreated()).thenReturn(STREAM_ID);
handler.exceptionCaught(ctx, e);
verify(lifecycleManager).onHttp2Exception(eq(ctx), eq(e));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
verify(frameWriter).writeGoAway(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()),
captor.capture(), eq(promise));
captor.getValue().release();
}
}

View File

@ -28,6 +28,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.bootstrap.Bootstrap;
@ -142,6 +143,41 @@ public class Http2ConnectionRoundtripTest {
assertFalse(clientChannel.isOpen());
}
@Test
public void listenerExceptionShouldCloseConnection() throws Exception {
final Http2Headers headers = dummyHeaders();
doThrow(new RuntimeException("Fake Exception")).when(serverListener).onHeadersRead(
any(ChannelHandlerContext.class), eq(3), eq(headers), eq(0), eq((short) 16),
eq(false), eq(0), eq(false));
bootstrapEnv(1, 1);
// Create a latch to track when the close occurs.
final CountDownLatch closeLatch = new CountDownLatch(1);
clientChannel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
closeLatch.countDown();
}
});
// Create a single stream by sending a HEADERS frame to the server.
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
newPromise());
}
});
// Wait for the server to create the stream.
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
// Wait for the close to occur.
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
assertFalse(clientChannel.isOpen());
}
@Test
public void nonHttp2ExceptionInPipelineShouldNotCloseConnection() throws Exception {
bootstrapEnv(1, 1);