Allow override of HTTP/2 graceful connection shutdown.
Motivation: Currently the graceful shutdown of the HTTP/2 connection waits until there are no active streams. There may be use cases that buffer stream creation (due to limits imposed by MAX_CONCURRENT_STREAMS), in which case they may still want those streams to complete before closing. Modifications: Added a isGracefulShutdownComplete method to Http2ConnectionHandler, which can be overridden by a subclass. Result: Graceful shutdown logic can be overridden.
This commit is contained in:
parent
d9ca3a01f1
commit
36061c50b1
@ -22,6 +22,7 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
|
|||||||
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
||||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||||
import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
|
import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
|
||||||
|
import static io.netty.util.CharsetUtil.UTF_8;
|
||||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||||
import static java.lang.String.format;
|
import static java.lang.String.format;
|
||||||
|
|
||||||
@ -34,6 +35,7 @@ import io.netty.channel.ChannelPromise;
|
|||||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||||
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
|
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
|
||||||
import io.netty.handler.codec.http2.Http2Exception.StreamException;
|
import io.netty.handler.codec.http2.Http2Exception.StreamException;
|
||||||
|
import io.netty.util.CharsetUtil;
|
||||||
import io.netty.util.concurrent.GenericFutureListener;
|
import io.netty.util.concurrent.GenericFutureListener;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
@ -344,7 +346,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
|
|
||||||
// If there are no active streams, close immediately after the send is complete.
|
// If there are no active streams, close immediately after the send is complete.
|
||||||
// Otherwise wait until all streams are inactive.
|
// Otherwise wait until all streams are inactive.
|
||||||
if (connection().numActiveStreams() == 0) {
|
if (isGracefulShutdownComplete()) {
|
||||||
future.addListener(new ClosingChannelFutureListener(ctx, promise));
|
future.addListener(new ClosingChannelFutureListener(ctx, promise));
|
||||||
} else {
|
} else {
|
||||||
closeListener = new ClosingChannelFutureListener(ctx, promise);
|
closeListener = new ClosingChannelFutureListener(ctx, promise);
|
||||||
@ -416,18 +418,18 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
stream.close();
|
stream.close();
|
||||||
|
|
||||||
future.addListener(new ChannelFutureListener() {
|
future.addListener(new ChannelFutureListener() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
// If this connection is closing and there are no longer any
|
// If this connection is closing and the graceful shutdown has completed, close the connection
|
||||||
// active streams, close after the current operation completes.
|
// once this operation completes.
|
||||||
if (closeListener != null && connection().numActiveStreams() == 0) {
|
if (closeListener != null && isGracefulShutdownComplete()) {
|
||||||
ChannelFutureListener closeListener = Http2ConnectionHandler.this.closeListener;
|
ChannelFutureListener closeListener = Http2ConnectionHandler.this.closeListener;
|
||||||
// This method could be called multiple times
|
// This method could be called multiple times
|
||||||
// and we don't want to notify the closeListener multiple times.
|
// and we don't want to notify the closeListener multiple times.
|
||||||
Http2ConnectionHandler.this.closeListener = null;
|
Http2ConnectionHandler.this.closeListener = null;
|
||||||
closeListener.operationComplete(future);
|
closeListener.operationComplete(future);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -450,6 +452,15 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
ctx.flush();
|
ctx.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called by the graceful shutdown logic to determine when it is safe to close the connection. Returns {@code true}
|
||||||
|
* if the graceful shutdown has completed and the connection can be safely closed. This implementation just
|
||||||
|
* guarantees that there are no active streams. Subclasses may override to provide additional checks.
|
||||||
|
*/
|
||||||
|
protected boolean isGracefulShutdownComplete() {
|
||||||
|
return connection().numActiveStreams() == 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
|
* Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
|
||||||
* streams are closed, the connection is shut down.
|
* streams are closed, the connection is shut down.
|
||||||
@ -527,18 +538,26 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
|
|
||||||
ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
|
ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
|
||||||
|
|
||||||
|
final String debugString = debugData.toString(UTF_8);
|
||||||
future.addListener(new GenericFutureListener<ChannelFuture>() {
|
future.addListener(new GenericFutureListener<ChannelFuture>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
if (future.isSuccess()) {
|
if (future.isSuccess()) {
|
||||||
if (errorCode != NO_ERROR.code()) {
|
if (errorCode != NO_ERROR.code()) {
|
||||||
|
if (logger.isWarnEnabled()) {
|
||||||
|
logger.warn(
|
||||||
|
format("Sent GOAWAY: lastStreamId '%d', errorCode '%d', " +
|
||||||
|
"debugData '%s'. Forcing shutdown of the connection.",
|
||||||
|
lastStreamId, errorCode, debugString), future.cause());
|
||||||
|
}
|
||||||
ctx.close();
|
ctx.close();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (logger.isErrorEnabled()) {
|
if (logger.isErrorEnabled()) {
|
||||||
logger.error(
|
logger.error(
|
||||||
format("Sending GOAWAY failed: lastStreamId '%d', errorCode '%d', debugData '%s'.",
|
format("Sending GOAWAY failed: lastStreamId '%d', errorCode '%d', " +
|
||||||
lastStreamId, errorCode, debugData), future.cause());
|
"debugData '%s'. Forcing shutdown of the connection.",
|
||||||
|
lastStreamId, errorCode, debugString), future.cause());
|
||||||
}
|
}
|
||||||
ctx.close();
|
ctx.close();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user