HTTP/2 Graceful Shutdown Timeout

Motivation:
If any streams are still active the graceful shutdown code will wait until they are all closed before the connection is closed. In some situations this event may never occur, and thus a timeout should be supported so the socket can be closed even if all streams haven't been closed.

Modifications:
- Add a configurable timeout for when the graceful shutdown process is attempted.
- Update unit tests to be faster, and use this graceful timeout

Result:
Local endpoint can protect from local or remote issues which prevent the channel from being closed during the graceful shutdown process.
This commit is contained in:
Scott Mitchell 2015-08-18 10:36:16 -07:00
parent 5b1ae35957
commit 6a4a5f5571
9 changed files with 276 additions and 123 deletions

View File

@ -14,6 +14,23 @@
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static io.netty.buffer.ByteBufUtil.hexDump;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
@ -28,19 +45,8 @@ import static io.netty.util.CharsetUtil.UTF_8;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.min;
import static java.lang.String.format;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.List;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Provides the default implementation for processing inbound frame events and delegates to a
@ -53,11 +59,14 @@ import java.util.List;
*/
public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
private static final long DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS = MILLISECONDS.convert(30, SECONDS);
private final Http2ConnectionDecoder decoder;
private final Http2ConnectionEncoder encoder;
private final Http2Settings initialSettings;
private ChannelFutureListener closeListener;
private BaseDecoder byteDecoder;
private long gracefulShutdownTimeoutMillis = DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS;
public Http2ConnectionHandler(boolean server, Http2FrameListener listener) {
this(new DefaultHttp2Connection(server), listener);
@ -113,6 +122,28 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
}
}
/**
* Get the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
* the connection during the graceful shutdown process.
*/
public long gracefulShutdownTimeoutMillis() {
return gracefulShutdownTimeoutMillis;
}
/**
* Set the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
* the connection during the graceful shutdown process.
* @param gracefulShutdownTimeoutMillis the amount of time (in milliseconds) this endpoint will wait for all
* streams to be closed before closing the connection during the graceful shutdown process.
*/
public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
if (gracefulShutdownTimeoutMillis < 0) {
throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
" (expected: >= 0)");
}
this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
}
public Http2Connection connection() {
return encoder.connection();
}
@ -420,13 +451,17 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
ChannelFuture future = goAway(ctx, null);
ctx.flush();
doGracefulShutdown(ctx, future, promise);
}
private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, ChannelPromise promise) {
// If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive.
if (isGracefulShutdownComplete()) {
future.addListener(new ClosingChannelFutureListener(ctx, promise));
} else {
closeListener = new ClosingChannelFutureListener(ctx, promise);
closeListener = new ClosingChannelFutureListener(ctx, promise,
gracefulShutdownTimeoutMillis, MILLISECONDS);
}
}
@ -551,7 +586,17 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
if (http2Ex == null) {
http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
}
goAway(ctx, http2Ex).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
ChannelPromise promise = ctx.newPromise();
ChannelFuture future = goAway(ctx, http2Ex);
switch (http2Ex.shutdownHint()) {
case GRACEFUL_SHUTDOWN:
doGracefulShutdown(ctx, future, promise);
break;
default:
future.addListener(new ClosingChannelFutureListener(ctx, promise));
break;
}
}
/**
@ -716,14 +761,31 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
private static final class ClosingChannelFutureListener implements ChannelFutureListener {
private final ChannelHandlerContext ctx;
private final ChannelPromise promise;
private final ScheduledFuture<?> timeoutTask;
ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
this.ctx = ctx;
this.promise = promise;
timeoutTask = null;
}
ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
long timeout, TimeUnit unit) {
this.ctx = ctx;
this.promise = promise;
timeoutTask = ctx.executor().schedule(new OneTimeTask() {
@Override
public void run() {
ctx.close(promise);
}
}, timeout, unit);
}
@Override
public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
if (timeoutTask != null) {
timeoutTask.cancel(false);
}
ctx.close(promise);
}
}

View File

@ -15,37 +15,61 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/**
* Exception thrown when an HTTP/2 error was encountered.
*/
public class Http2Exception extends Exception {
private static final long serialVersionUID = -6943456574080986447L;
private static final long serialVersionUID = -6941186345430164209L;
private final Http2Error error;
private final ShutdownHint shutdownHint;
public Http2Exception(Http2Error error) {
this.error = error;
this(error, ShutdownHint.HARD_SHUTDOWN);
}
public Http2Exception(Http2Error error, ShutdownHint shutdownHint) {
this.error = checkNotNull(error, "error");
this.shutdownHint = checkNotNull(shutdownHint, "shutdownHint");
}
public Http2Exception(Http2Error error, String message) {
this(error, message, ShutdownHint.HARD_SHUTDOWN);
}
public Http2Exception(Http2Error error, String message, ShutdownHint shutdownHint) {
super(message);
this.error = error;
this.error = checkNotNull(error, "error");
this.shutdownHint = checkNotNull(shutdownHint, "shutdownHint");
}
public Http2Exception(Http2Error error, String message, Throwable cause) {
this(error, message, cause, ShutdownHint.HARD_SHUTDOWN);
}
public Http2Exception(Http2Error error, String message, Throwable cause, ShutdownHint shutdownHint) {
super(message, cause);
this.error = error;
this.error = checkNotNull(error, "error");
this.shutdownHint = checkNotNull(shutdownHint, "shutdownHint");
}
public Http2Error error() {
return error;
}
/**
* Provide a hint as to what type of shutdown should be executed. Note this hint may be ignored.
*/
public ShutdownHint shutdownHint() {
return shutdownHint;
}
/**
* Use if an error has occurred which can not be isolated to a single stream, but instead applies
* to the entire connection.
@ -142,11 +166,30 @@ public class Http2Exception extends Exception {
return isStreamError(e) ? ((StreamException) e).streamId() : CONNECTION_STREAM_ID;
}
/**
* Provides a hint as to if shutdown is justified, what type of shutdown should be executed.
*/
public static enum ShutdownHint {
/**
* Do not shutdown the underlying channel.
*/
NO_SHUTDOWN,
/**
* Attempt to execute a "graceful" shutdown. The definition of "graceful" is left to the implementation.
* An example of "graceful" would be wait for some amount of time until all active streams are closed.
*/
GRACEFUL_SHUTDOWN,
/**
* Close the channel immediately after a {@code GOAWAY} is sent.
*/
HARD_SHUTDOWN;
}
/**
* Used when a stream creation attempt fails but may be because the stream was previously closed.
*/
public static final class ClosedStreamCreationException extends Http2Exception {
private static final long serialVersionUID = -1911637707391622439L;
private static final long serialVersionUID = -6746542974372246206L;
public ClosedStreamCreationException(Http2Error error) {
super(error);
@ -165,16 +208,16 @@ public class Http2Exception extends Exception {
* Represents an exception that can be isolated to a single stream (as opposed to the entire connection).
*/
public static final class StreamException extends Http2Exception {
private static final long serialVersionUID = 462766352505067095L;
private static final long serialVersionUID = 602472544416984384L;
private final int streamId;
StreamException(int streamId, Http2Error error, String message) {
super(error, message);
super(error, message, ShutdownHint.NO_SHUTDOWN);
this.streamId = streamId;
}
StreamException(int streamId, Http2Error error, String message, Throwable cause) {
super(error, message, cause);
super(error, message, cause, ShutdownHint.NO_SHUTDOWN);
this.streamId = streamId;
}
@ -187,11 +230,11 @@ public class Http2Exception extends Exception {
* Provides the ability to handle multiple stream exceptions with one throw statement.
*/
public static final class CompositeStreamException extends Http2Exception implements Iterable<StreamException> {
private static final long serialVersionUID = -434398146294199889L;
private static final long serialVersionUID = 7091134858213711015L;
private final List<StreamException> exceptions;
public CompositeStreamException(Http2Error error, int initialCapacity) {
super(error);
super(error, ShutdownHint.NO_SHUTDOWN);
exceptions = new ArrayList<StreamException>(initialCapacity);
}

View File

@ -24,10 +24,10 @@ public class Http2NoMoreStreamIdsException extends Http2Exception {
private static final String ERROR_MESSAGE = "No more streams can be created on this connection";
public Http2NoMoreStreamIdsException() {
super(PROTOCOL_ERROR, ERROR_MESSAGE);
super(PROTOCOL_ERROR, ERROR_MESSAGE, ShutdownHint.GRACEFUL_SHUTDOWN);
}
public Http2NoMoreStreamIdsException(Throwable cause) {
super(PROTOCOL_ERROR, ERROR_MESSAGE, cause);
super(PROTOCOL_ERROR, ERROR_MESSAGE, cause, ShutdownHint.GRACEFUL_SHUTDOWN);
}
}

View File

@ -118,7 +118,14 @@ public class DataCompressionHttp2Test {
@After
public void teardown() throws InterruptedException {
serverChannel.close().sync();
if (clientChannel != null) {
clientChannel.close().sync();
clientChannel = null;
}
if (serverChannel != null) {
serverChannel.close().sync();
serverChannel = null;
}
Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS);
@ -315,6 +322,9 @@ public class DataCompressionHttp2Test {
new DelegatingDecompressorFrameListener(clientConnection,
clientListener));
clientHandler = new Http2ConnectionHandler(decoder, clientEncoder);
// By default tests don't wait for server to gracefully shutdown streams
clientHandler.gracefulShutdownTimeoutMillis(0);
p.addLast(clientHandler);
}
});

View File

@ -15,6 +15,30 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.List;
import static io.netty.buffer.Unpooled.copiedBuffer;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
@ -38,29 +62,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GenericFutureListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.List;
/**
* Tests for {@link Http2ConnectionHandler}
*/
@ -95,6 +96,9 @@ public class Http2ConnectionHandlerTest {
@Mock
private ChannelHandlerContext ctx;
@Mock
private EventExecutor executor;
@Mock
private Channel channel;
@ -172,6 +176,7 @@ public class Http2ConnectionHandlerTest {
when(ctx.newSucceededFuture()).thenReturn(future);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
when(ctx.executor()).thenReturn(executor);
}
private Http2ConnectionHandler newHandler() throws Exception {

View File

@ -28,13 +28,13 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.codec.http2.Http2TestUtil.FrameCountDown;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.AsciiString;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
@ -45,7 +45,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
@ -102,7 +101,14 @@ public class Http2ConnectionRoundtripTest {
@After
public void teardown() throws Exception {
serverChannel.close().sync();
if (clientChannel != null) {
clientChannel.close().sync();
clientChannel = null;
}
if (serverChannel != null) {
serverChannel.close().sync();
serverChannel = null;
}
Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS);
@ -127,7 +133,7 @@ public class Http2ConnectionRoundtripTest {
}
});
assertTrue(requestLatch.await(5, SECONDS));
assertTrue(requestLatch.await(2, SECONDS));
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(headers),
eq(0), eq(weight), eq(false), eq(0), eq(true));
// Wait for some time to see if a go_away or reset frame will be received.
@ -138,6 +144,9 @@ public class Http2ConnectionRoundtripTest {
anyLong(), any(ByteBuf.class));
verify(serverListener, never()).onRstStreamRead(any(ChannelHandlerContext.class), anyInt(),
anyLong());
// The server will not respond, and so don't wait for graceful shutdown
http2Client.gracefulShutdownTimeoutMillis(0);
}
@Test
@ -255,14 +264,20 @@ public class Http2ConnectionRoundtripTest {
});
// The close should NOT occur.
assertFalse(closeLatch.await(5, SECONDS));
assertFalse(closeLatch.await(2, SECONDS));
assertTrue(clientChannel.isOpen());
// Set the timeout very low because we know graceful shutdown won't complete
http2Client.gracefulShutdownTimeoutMillis(0);
}
@Test
public void noMoreStreamIdsShouldSendGoAway() throws Exception {
bootstrapEnv(1, 1, 3, 1, 1);
// Don't wait for the server to close streams
http2Client.gracefulShutdownTimeoutMillis(0);
// Create a single stream by sending a HEADERS frame to the server.
final Http2Headers headers = dummyHeaders();
runInChannel(clientChannel, new Http2Runnable() {
@ -322,7 +337,7 @@ public class Http2ConnectionRoundtripTest {
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
false, newPromise());
http2Client.encoder().writeData(ctx(), 3, data.retain(), 0, false, newPromise());
http2Client.encoder().writeData(ctx(), 3, data.duplicate().retain(), 0, false, newPromise());
// Write trailers.
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
@ -347,6 +362,8 @@ public class Http2ConnectionRoundtripTest {
byte[] received = out.toByteArray();
assertArrayEquals(data.array(), received);
} finally {
// Don't wait for server to close streams
http2Client.gracefulShutdownTimeoutMillis(0);
data.release();
out.close();
}
@ -434,6 +451,8 @@ public class Http2ConnectionRoundtripTest {
assertEquals(pingMsg, receivedPing);
}
} finally {
// Don't wait for server to close streams
http2Client.gracefulShutdownTimeoutMillis(0);
data.release();
pingData.release();
}
@ -454,8 +473,8 @@ public class Http2ConnectionRoundtripTest {
sb = new ServerBootstrap();
cb = new Bootstrap();
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup());
sb.channel(NioServerSocketChannel.class);
sb.group(new DefaultEventLoopGroup());
sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
@ -467,8 +486,8 @@ public class Http2ConnectionRoundtripTest {
}
});
cb.group(new NioEventLoopGroup());
cb.channel(NioSocketChannel.class);
cb.group(new DefaultEventLoopGroup());
cb.channel(LocalChannel.class);
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
@ -477,10 +496,9 @@ public class Http2ConnectionRoundtripTest {
}
});
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
serverChannel = sb.bind(new LocalAddress("Http2ConnectionRoundtripTest")).sync().channel();
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port));
ChannelFuture ccf = cb.connect(serverChannel.localAddress());
assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel();
http2Client = clientChannel.pipeline().get(Http2ConnectionHandler.class);

View File

@ -25,12 +25,12 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.AsciiString;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
@ -40,7 +40,6 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -83,7 +82,14 @@ public class Http2FrameRoundtripTest {
@After
public void teardown() throws Exception {
serverChannel.close().sync();
if (clientChannel != null) {
clientChannel.close().sync();
clientChannel = null;
}
if (serverChannel != null) {
serverChannel.close().sync();
serverChannel = null;
}
Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
@ -176,7 +182,7 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeGoAway(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL, data.retain(), newPromise());
frameWriter.writeGoAway(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL, data.duplicate().retain(), newPromise());
ctx().flush();
}
});
@ -207,7 +213,7 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writePing(ctx(), true, data.retain(), newPromise());
frameWriter.writePing(ctx(), true, data.duplicate().retain(), newPromise());
ctx().flush();
}
});
@ -323,7 +329,7 @@ public class Http2FrameRoundtripTest {
public void run() {
for (int i = 1; i < numStreams + 1; ++i) {
frameWriter.writeHeaders(ctx(), i, headers, 0, (short) 16, false, 0, false, newPromise());
frameWriter.writeData(ctx(), i, data.retain(), 0, true, newPromise());
frameWriter.writeData(ctx(), i, data.duplicate().retain(), 0, true, newPromise());
ctx().flush();
}
}
@ -355,8 +361,8 @@ public class Http2FrameRoundtripTest {
sb = new ServerBootstrap();
cb = new Bootstrap();
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup());
sb.channel(NioServerSocketChannel.class);
sb.group(new DefaultEventLoopGroup());
sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
@ -366,8 +372,8 @@ public class Http2FrameRoundtripTest {
}
});
cb.group(new NioEventLoopGroup());
cb.channel(NioSocketChannel.class);
cb.group(new DefaultEventLoopGroup());
cb.channel(LocalChannel.class);
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
@ -376,10 +382,9 @@ public class Http2FrameRoundtripTest {
}
});
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
serverChannel = sb.bind(new LocalAddress("Http2FrameRoundtripTest")).sync().channel();
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port));
ChannelFuture ccf = cb.connect(serverChannel.localAddress());
assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel();
}

View File

@ -24,9 +24,10 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpRequest;
@ -38,7 +39,6 @@ import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2TestUtil.FrameCountDown;
import io.netty.util.AsciiString;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
@ -48,7 +48,6 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -99,7 +98,14 @@ public class HttpToHttp2ConnectionHandlerTest {
@After
public void teardown() throws Exception {
serverChannel.close().sync();
if (clientChannel != null) {
clientChannel.close().sync();
clientChannel = null;
}
if (serverChannel != null) {
serverChannel.close().sync();
serverChannel = null;
}
Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS);
@ -312,8 +318,8 @@ public class HttpToHttp2ConnectionHandlerTest {
sb = new ServerBootstrap();
cb = new Bootstrap();
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup());
sb.channel(NioServerSocketChannel.class);
sb.group(new DefaultEventLoopGroup());
sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
@ -324,20 +330,21 @@ public class HttpToHttp2ConnectionHandlerTest {
}
});
cb.group(new NioEventLoopGroup());
cb.channel(NioSocketChannel.class);
cb.group(new DefaultEventLoopGroup());
cb.channel(LocalChannel.class);
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpToHttp2ConnectionHandler(false, clientListener));
HttpToHttp2ConnectionHandler handler = new HttpToHttp2ConnectionHandler(false, clientListener);
handler.gracefulShutdownTimeoutMillis(0);
p.addLast(handler);
}
});
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
serverChannel = sb.bind(new LocalAddress("HttpToHttp2ConnectionHandlerTest")).sync().channel();
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port));
ChannelFuture ccf = cb.connect(serverChannel.localAddress());
assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel();
}

View File

@ -25,10 +25,11 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpMessage;
@ -44,7 +45,6 @@ import io.netty.handler.codec.http2.Http2TestUtil.FrameAdapter;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.AsciiString;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
@ -53,7 +53,6 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -116,8 +115,8 @@ public class InboundHttp2ToHttpAdapterTest {
sb = new ServerBootstrap();
cb = new Bootstrap();
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup());
sb.channel(NioServerSocketChannel.class);
sb.group(new DefaultEventLoopGroup());
sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
@ -153,8 +152,8 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
cb.group(new NioEventLoopGroup());
cb.channel(NioSocketChannel.class);
cb.group(new DefaultEventLoopGroup());
cb.channel(LocalChannel.class);
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
@ -173,10 +172,9 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
serverChannel = sb.bind(new LocalAddress("InboundHttp2ToHttpAdapterTest")).sync().channel();
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port));
ChannelFuture ccf = cb.connect(serverChannel.localAddress());
assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel();
}
@ -185,7 +183,14 @@ public class InboundHttp2ToHttpAdapterTest {
public void teardown() throws Exception {
cleanupCapturedRequests();
cleanupCapturedResponses();
serverChannel.close().sync();
if (clientChannel != null) {
clientChannel.close().sync();
clientChannel = null;
}
if (serverChannel != null) {
serverChannel.close().sync();
serverChannel = null;
}
Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS);
@ -194,8 +199,6 @@ public class InboundHttp2ToHttpAdapterTest {
clientGroup.sync();
clientDelegator = null;
serverDelegator = null;
clientChannel = null;
serverChannel = null;
serverConnectedChannel = null;
}
@ -265,7 +268,7 @@ public class InboundHttp2ToHttpAdapterTest {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, true, newPromiseClient());
ctxClient().flush();
}
});
@ -410,7 +413,7 @@ public class InboundHttp2ToHttpAdapterTest {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 3, http2Headers2, 0, true, newPromiseClient());
ctxClient().flush();
}
@ -455,8 +458,8 @@ public class InboundHttp2ToHttpAdapterTest {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 5, http2Headers2, 0, false, newPromiseClient());
frameWriter.writePriority(ctxClient(), 5, 3, (short) 123, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 5, content2.retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 5, content2.duplicate().retain(), 0, true, newPromiseClient());
ctxClient().flush();
}
});
@ -506,8 +509,8 @@ public class InboundHttp2ToHttpAdapterTest {
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 5, http2Headers2, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 5, content2.retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 5, content2.duplicate().retain(), 0, true, newPromiseClient());
frameWriter.writePriority(ctxClient(), 5, 3, (short) 222, false, newPromiseClient());
ctxClient().flush();
}
@ -577,8 +580,8 @@ public class InboundHttp2ToHttpAdapterTest {
public void run() {
frameWriter.writeHeaders(ctxServer(), 3, http2Headers, 0, false, newPromiseServer());
frameWriter.writePushPromise(ctxServer(), 3, 5, http2Headers2, 0, newPromiseServer());
frameWriter.writeData(ctxServer(), 3, content.retain(), 0, true, newPromiseServer());
frameWriter.writeData(ctxServer(), 5, content2.retain(), 0, true, newPromiseServer());
frameWriter.writeData(ctxServer(), 3, content.duplicate().retain(), 0, true, newPromiseServer());
frameWriter.writeData(ctxServer(), 5, content2.duplicate().retain(), 0, true, newPromiseServer());
ctxServer().flush();
}
});
@ -655,7 +658,7 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeData(ctxClient(), 3, payload.retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, payload.duplicate().retain(), 0, true, newPromiseClient());
ctxClient().flush();
}
});