Allow override of HTTP/2 graceful connection shutdown.

Motivation:

Currently the graceful shutdown of the HTTP/2 connection waits until there are no active streams. There may be use cases that buffer stream creation (due to limits imposed by MAX_CONCURRENT_STREAMS), in which case they may still want those streams to complete before closing.

Modifications:

Added a isGracefulShutdownComplete method to Http2ConnectionHandler, which can be overridden by a subclass.

Result:

Graceful shutdown logic can be overridden.
This commit is contained in:
nmittler 2015-05-06 12:04:55 -07:00
parent f6c2c99efc
commit 77d0042310

View File

@ -22,6 +22,7 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
import static io.netty.util.CharsetUtil.UTF_8;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.String.format;
@ -35,6 +36,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -363,7 +365,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
// If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive.
if (connection().numActiveStreams() == 0) {
if (isGracefulShutdownComplete()) {
future.addListener(new ClosingChannelFutureListener(ctx, promise));
} else {
closeListener = new ClosingChannelFutureListener(ctx, promise);
@ -455,18 +457,18 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
stream.close();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// If this connection is closing and there are no longer any
// active streams, close after the current operation completes.
if (closeListener != null && connection().numActiveStreams() == 0) {
ChannelFutureListener closeListener = Http2ConnectionHandler.this.closeListener;
// This method could be called multiple times
// and we don't want to notify the closeListener multiple times.
Http2ConnectionHandler.this.closeListener = null;
closeListener.operationComplete(future);
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// If this connection is closing and the graceful shutdown has completed, close the connection
// once this operation completes.
if (closeListener != null && isGracefulShutdownComplete()) {
ChannelFutureListener closeListener = Http2ConnectionHandler.this.closeListener;
// This method could be called multiple times
// and we don't want to notify the closeListener multiple times.
Http2ConnectionHandler.this.closeListener = null;
closeListener.operationComplete(future);
}
}
}
});
}
@ -489,6 +491,15 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
ctx.flush();
}
/**
* Called by the graceful shutdown logic to determine when it is safe to close the connection. Returns {@code true}
* if the graceful shutdown has completed and the connection can be safely closed. This implementation just
* guarantees that there are no active streams. Subclasses may override to provide additional checks.
*/
protected boolean isGracefulShutdownComplete() {
return connection().numActiveStreams() == 0;
}
/**
* Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
* streams are closed, the connection is shut down.
@ -566,18 +577,26 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
final String debugString = debugData.toString(UTF_8);
future.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
if (errorCode != NO_ERROR.code()) {
if (logger.isWarnEnabled()) {
logger.warn(
format("Sent GOAWAY: lastStreamId '%d', errorCode '%d', " +
"debugData '%s'. Forcing shutdown of the connection.",
lastStreamId, errorCode, debugString), future.cause());
}
ctx.close();
}
} else {
if (logger.isErrorEnabled()) {
logger.error(
format("Sending GOAWAY failed: lastStreamId '%d', errorCode '%d', debugData '%s'.",
lastStreamId, errorCode, debugData), future.cause());
format("Sending GOAWAY failed: lastStreamId '%d', errorCode '%d', " +
"debugData '%s'. Forcing shutdown of the connection.",
lastStreamId, errorCode, debugString), future.cause());
}
ctx.close();
}