HTTP/2 Graceful Shutdown Timeout

Motivation:
If any streams are still active the graceful shutdown code will wait until they are all closed before the connection is closed. In some situations this event may never occur, and thus a timeout should be supported so the socket can be closed even if all streams haven't been closed.

Modifications:
- Add a configurable timeout for when the graceful shutdown process is attempted.
- Update unit tests to be faster, and use this graceful timeout

Result:
Local endpoint can protect from local or remote issues which prevent the channel from being closed during the graceful shutdown process.
This commit is contained in:
Scott Mitchell 2015-08-18 10:36:16 -07:00
parent ce6931e0e5
commit 08477eaf03
9 changed files with 278 additions and 125 deletions

View File

@ -14,6 +14,25 @@
*/ */
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static io.netty.buffer.ByteBufUtil.hexDump; import static io.netty.buffer.ByteBufUtil.hexDump;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
@ -28,21 +47,8 @@ import static io.netty.util.CharsetUtil.UTF_8;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.min; import static java.lang.Math.min;
import static java.lang.String.format; import static java.lang.String.format;
import io.netty.buffer.ByteBuf; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import io.netty.buffer.ByteBufUtil; import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.util.List;
/** /**
* Provides the default implementation for processing inbound frame events and delegates to a * Provides the default implementation for processing inbound frame events and delegates to a
@ -56,11 +62,14 @@ import java.util.List;
public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager, public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
ChannelOutboundHandler { ChannelOutboundHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
private static final long DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS = MILLISECONDS.convert(30, SECONDS);
private final Http2ConnectionDecoder decoder; private final Http2ConnectionDecoder decoder;
private final Http2ConnectionEncoder encoder; private final Http2ConnectionEncoder encoder;
private final Http2Settings initialSettings; private final Http2Settings initialSettings;
private ChannelFutureListener closeListener; private ChannelFutureListener closeListener;
private BaseDecoder byteDecoder; private BaseDecoder byteDecoder;
private long gracefulShutdownTimeoutMillis = DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS;
public Http2ConnectionHandler(boolean server, Http2FrameListener listener) { public Http2ConnectionHandler(boolean server, Http2FrameListener listener) {
this(new DefaultHttp2Connection(server), listener); this(new DefaultHttp2Connection(server), listener);
@ -116,6 +125,28 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
} }
} }
/**
* Get the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
* the connection during the graceful shutdown process.
*/
public long gracefulShutdownTimeoutMillis() {
return gracefulShutdownTimeoutMillis;
}
/**
* Set the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
* the connection during the graceful shutdown process.
* @param gracefulShutdownTimeoutMillis the amount of time (in milliseconds) this endpoint will wait for all
* streams to be closed before closing the connection during the graceful shutdown process.
*/
public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
if (gracefulShutdownTimeoutMillis < 0) {
throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
" (expected: >= 0)");
}
this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
}
public Http2Connection connection() { public Http2Connection connection() {
return encoder.connection(); return encoder.connection();
} }
@ -439,13 +470,17 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
ChannelFuture future = goAway(ctx, null); ChannelFuture future = goAway(ctx, null);
ctx.flush(); ctx.flush();
doGracefulShutdown(ctx, future, promise);
}
private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, ChannelPromise promise) {
// If there are no active streams, close immediately after the send is complete. // If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive. // Otherwise wait until all streams are inactive.
if (isGracefulShutdownComplete()) { if (isGracefulShutdownComplete()) {
future.addListener(new ClosingChannelFutureListener(ctx, promise)); future.addListener(new ClosingChannelFutureListener(ctx, promise));
} else { } else {
closeListener = new ClosingChannelFutureListener(ctx, promise); closeListener = new ClosingChannelFutureListener(ctx, promise,
gracefulShutdownTimeoutMillis, MILLISECONDS);
} }
} }
@ -585,7 +620,17 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
if (http2Ex == null) { if (http2Ex == null) {
http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause); http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
} }
goAway(ctx, http2Ex).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
ChannelPromise promise = ctx.newPromise();
ChannelFuture future = goAway(ctx, http2Ex);
switch (http2Ex.shutdownHint()) {
case GRACEFUL_SHUTDOWN:
doGracefulShutdown(ctx, future, promise);
break;
default:
future.addListener(new ClosingChannelFutureListener(ctx, promise));
break;
}
} }
/** /**
@ -750,14 +795,31 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
private static final class ClosingChannelFutureListener implements ChannelFutureListener { private static final class ClosingChannelFutureListener implements ChannelFutureListener {
private final ChannelHandlerContext ctx; private final ChannelHandlerContext ctx;
private final ChannelPromise promise; private final ChannelPromise promise;
private final ScheduledFuture<?> timeoutTask;
ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) { ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
this.ctx = ctx; this.ctx = ctx;
this.promise = promise; this.promise = promise;
timeoutTask = null;
}
ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
long timeout, TimeUnit unit) {
this.ctx = ctx;
this.promise = promise;
timeoutTask = ctx.executor().schedule(new OneTimeTask() {
@Override
public void run() {
ctx.close(promise);
}
}, timeout, unit);
} }
@Override @Override
public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception { public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
if (timeoutTask != null) {
timeoutTask.cancel(false);
}
ctx.close(promise); ctx.close(promise);
} }
} }

View File

@ -15,37 +15,61 @@
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/** /**
* Exception thrown when an HTTP/2 error was encountered. * Exception thrown when an HTTP/2 error was encountered.
*/ */
public class Http2Exception extends Exception { public class Http2Exception extends Exception {
private static final long serialVersionUID = -6943456574080986447L; private static final long serialVersionUID = -6941186345430164209L;
private final Http2Error error; private final Http2Error error;
private final ShutdownHint shutdownHint;
public Http2Exception(Http2Error error) { public Http2Exception(Http2Error error) {
this.error = error; this(error, ShutdownHint.HARD_SHUTDOWN);
}
public Http2Exception(Http2Error error, ShutdownHint shutdownHint) {
this.error = checkNotNull(error, "error");
this.shutdownHint = checkNotNull(shutdownHint, "shutdownHint");
} }
public Http2Exception(Http2Error error, String message) { public Http2Exception(Http2Error error, String message) {
this(error, message, ShutdownHint.HARD_SHUTDOWN);
}
public Http2Exception(Http2Error error, String message, ShutdownHint shutdownHint) {
super(message); super(message);
this.error = error; this.error = checkNotNull(error, "error");
this.shutdownHint = checkNotNull(shutdownHint, "shutdownHint");
} }
public Http2Exception(Http2Error error, String message, Throwable cause) { public Http2Exception(Http2Error error, String message, Throwable cause) {
this(error, message, cause, ShutdownHint.HARD_SHUTDOWN);
}
public Http2Exception(Http2Error error, String message, Throwable cause, ShutdownHint shutdownHint) {
super(message, cause); super(message, cause);
this.error = error; this.error = checkNotNull(error, "error");
this.shutdownHint = checkNotNull(shutdownHint, "shutdownHint");
} }
public Http2Error error() { public Http2Error error() {
return error; return error;
} }
/**
* Provide a hint as to what type of shutdown should be executed. Note this hint may be ignored.
*/
public ShutdownHint shutdownHint() {
return shutdownHint;
}
/** /**
* Use if an error has occurred which can not be isolated to a single stream, but instead applies * Use if an error has occurred which can not be isolated to a single stream, but instead applies
* to the entire connection. * to the entire connection.
@ -142,11 +166,30 @@ public class Http2Exception extends Exception {
return isStreamError(e) ? ((StreamException) e).streamId() : CONNECTION_STREAM_ID; return isStreamError(e) ? ((StreamException) e).streamId() : CONNECTION_STREAM_ID;
} }
/**
* Provides a hint as to if shutdown is justified, what type of shutdown should be executed.
*/
public static enum ShutdownHint {
/**
* Do not shutdown the underlying channel.
*/
NO_SHUTDOWN,
/**
* Attempt to execute a "graceful" shutdown. The definition of "graceful" is left to the implementation.
* An example of "graceful" would be wait for some amount of time until all active streams are closed.
*/
GRACEFUL_SHUTDOWN,
/**
* Close the channel immediately after a {@code GOAWAY} is sent.
*/
HARD_SHUTDOWN;
}
/** /**
* Used when a stream creation attempt fails but may be because the stream was previously closed. * Used when a stream creation attempt fails but may be because the stream was previously closed.
*/ */
public static final class ClosedStreamCreationException extends Http2Exception { public static final class ClosedStreamCreationException extends Http2Exception {
private static final long serialVersionUID = -1911637707391622439L; private static final long serialVersionUID = -6746542974372246206L;
public ClosedStreamCreationException(Http2Error error) { public ClosedStreamCreationException(Http2Error error) {
super(error); super(error);
@ -165,16 +208,16 @@ public class Http2Exception extends Exception {
* Represents an exception that can be isolated to a single stream (as opposed to the entire connection). * Represents an exception that can be isolated to a single stream (as opposed to the entire connection).
*/ */
public static final class StreamException extends Http2Exception { public static final class StreamException extends Http2Exception {
private static final long serialVersionUID = 462766352505067095L; private static final long serialVersionUID = 602472544416984384L;
private final int streamId; private final int streamId;
StreamException(int streamId, Http2Error error, String message) { StreamException(int streamId, Http2Error error, String message) {
super(error, message); super(error, message, ShutdownHint.NO_SHUTDOWN);
this.streamId = streamId; this.streamId = streamId;
} }
StreamException(int streamId, Http2Error error, String message, Throwable cause) { StreamException(int streamId, Http2Error error, String message, Throwable cause) {
super(error, message, cause); super(error, message, cause, ShutdownHint.NO_SHUTDOWN);
this.streamId = streamId; this.streamId = streamId;
} }
@ -187,11 +230,11 @@ public class Http2Exception extends Exception {
* Provides the ability to handle multiple stream exceptions with one throw statement. * Provides the ability to handle multiple stream exceptions with one throw statement.
*/ */
public static final class CompositeStreamException extends Http2Exception implements Iterable<StreamException> { public static final class CompositeStreamException extends Http2Exception implements Iterable<StreamException> {
private static final long serialVersionUID = -434398146294199889L; private static final long serialVersionUID = 7091134858213711015L;
private final List<StreamException> exceptions; private final List<StreamException> exceptions;
public CompositeStreamException(Http2Error error, int initialCapacity) { public CompositeStreamException(Http2Error error, int initialCapacity) {
super(error); super(error, ShutdownHint.NO_SHUTDOWN);
exceptions = new ArrayList<StreamException>(initialCapacity); exceptions = new ArrayList<StreamException>(initialCapacity);
} }

View File

@ -24,10 +24,10 @@ public class Http2NoMoreStreamIdsException extends Http2Exception {
private static final String ERROR_MESSAGE = "No more streams can be created on this connection"; private static final String ERROR_MESSAGE = "No more streams can be created on this connection";
public Http2NoMoreStreamIdsException() { public Http2NoMoreStreamIdsException() {
super(PROTOCOL_ERROR, ERROR_MESSAGE); super(PROTOCOL_ERROR, ERROR_MESSAGE, ShutdownHint.GRACEFUL_SHUTDOWN);
} }
public Http2NoMoreStreamIdsException(Throwable cause) { public Http2NoMoreStreamIdsException(Throwable cause) {
super(PROTOCOL_ERROR, ERROR_MESSAGE, cause); super(PROTOCOL_ERROR, ERROR_MESSAGE, cause, ShutdownHint.GRACEFUL_SHUTDOWN);
} }
} }

View File

@ -118,7 +118,14 @@ public class DataCompressionHttp2Test {
@After @After
public void teardown() throws InterruptedException { public void teardown() throws InterruptedException {
if (clientChannel != null) {
clientChannel.close().sync();
clientChannel = null;
}
if (serverChannel != null) {
serverChannel.close().sync(); serverChannel.close().sync();
serverChannel = null;
}
Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS); Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS); Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS); Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS);
@ -315,6 +322,9 @@ public class DataCompressionHttp2Test {
new DelegatingDecompressorFrameListener(clientConnection, new DelegatingDecompressorFrameListener(clientConnection,
clientListener)); clientListener));
clientHandler = new Http2ConnectionHandler(decoder, clientEncoder); clientHandler = new Http2ConnectionHandler(decoder, clientEncoder);
// By default tests don't wait for server to gracefully shutdown streams
clientHandler.gracefulShutdownTimeoutMillis(0);
p.addLast(clientHandler); p.addLast(clientHandler);
} }
}); });

View File

@ -15,6 +15,30 @@
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.List;
import static io.netty.buffer.Unpooled.copiedBuffer; import static io.netty.buffer.Unpooled.copiedBuffer;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
@ -38,29 +62,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GenericFutureListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.List;
/** /**
* Tests for {@link Http2ConnectionHandler} * Tests for {@link Http2ConnectionHandler}
*/ */
@ -95,6 +96,9 @@ public class Http2ConnectionHandlerTest {
@Mock @Mock
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
@Mock
private EventExecutor executor;
@Mock @Mock
private Channel channel; private Channel channel;
@ -172,6 +176,7 @@ public class Http2ConnectionHandlerTest {
when(ctx.newSucceededFuture()).thenReturn(future); when(ctx.newSucceededFuture()).thenReturn(future);
when(ctx.newPromise()).thenReturn(promise); when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future); when(ctx.write(any())).thenReturn(future);
when(ctx.executor()).thenReturn(executor);
} }
private Http2ConnectionHandler newHandler() throws Exception { private Http2ConnectionHandler newHandler() throws Exception {

View File

@ -28,13 +28,13 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.local.LocalAddress;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.codec.http2.Http2TestUtil.FrameCountDown; import io.netty.handler.codec.http2.Http2TestUtil.FrameCountDown;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable; import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -45,7 +45,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -102,7 +101,14 @@ public class Http2ConnectionRoundtripTest {
@After @After
public void teardown() throws Exception { public void teardown() throws Exception {
if (clientChannel != null) {
clientChannel.close().sync();
clientChannel = null;
}
if (serverChannel != null) {
serverChannel.close().sync(); serverChannel.close().sync();
serverChannel = null;
}
Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS); Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS); Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS); Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS);
@ -127,7 +133,7 @@ public class Http2ConnectionRoundtripTest {
} }
}); });
assertTrue(requestLatch.await(5, SECONDS)); assertTrue(requestLatch.await(2, SECONDS));
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(headers), verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(headers),
eq(0), eq(weight), eq(false), eq(0), eq(true)); eq(0), eq(weight), eq(false), eq(0), eq(true));
// Wait for some time to see if a go_away or reset frame will be received. // Wait for some time to see if a go_away or reset frame will be received.
@ -138,6 +144,9 @@ public class Http2ConnectionRoundtripTest {
anyLong(), any(ByteBuf.class)); anyLong(), any(ByteBuf.class));
verify(serverListener, never()).onRstStreamRead(any(ChannelHandlerContext.class), anyInt(), verify(serverListener, never()).onRstStreamRead(any(ChannelHandlerContext.class), anyInt(),
anyLong()); anyLong());
// The server will not respond, and so don't wait for graceful shutdown
http2Client.gracefulShutdownTimeoutMillis(0);
} }
@Test @Test
@ -255,14 +264,20 @@ public class Http2ConnectionRoundtripTest {
}); });
// The close should NOT occur. // The close should NOT occur.
assertFalse(closeLatch.await(5, SECONDS)); assertFalse(closeLatch.await(2, SECONDS));
assertTrue(clientChannel.isOpen()); assertTrue(clientChannel.isOpen());
// Set the timeout very low because we know graceful shutdown won't complete
http2Client.gracefulShutdownTimeoutMillis(0);
} }
@Test @Test
public void noMoreStreamIdsShouldSendGoAway() throws Exception { public void noMoreStreamIdsShouldSendGoAway() throws Exception {
bootstrapEnv(1, 1, 3, 1, 1); bootstrapEnv(1, 1, 3, 1, 1);
// Don't wait for the server to close streams
http2Client.gracefulShutdownTimeoutMillis(0);
// Create a single stream by sending a HEADERS frame to the server. // Create a single stream by sending a HEADERS frame to the server.
final Http2Headers headers = dummyHeaders(); final Http2Headers headers = dummyHeaders();
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@ -322,7 +337,7 @@ public class Http2ConnectionRoundtripTest {
public void run() throws Http2Exception { public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
false, newPromise()); false, newPromise());
http2Client.encoder().writeData(ctx(), 3, data.retain(), 0, false, newPromise()); http2Client.encoder().writeData(ctx(), 3, data.duplicate().retain(), 0, false, newPromise());
// Write trailers. // Write trailers.
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
@ -347,6 +362,8 @@ public class Http2ConnectionRoundtripTest {
byte[] received = out.toByteArray(); byte[] received = out.toByteArray();
assertArrayEquals(data.array(), received); assertArrayEquals(data.array(), received);
} finally { } finally {
// Don't wait for server to close streams
http2Client.gracefulShutdownTimeoutMillis(0);
data.release(); data.release();
out.close(); out.close();
} }
@ -434,6 +451,8 @@ public class Http2ConnectionRoundtripTest {
assertEquals(pingMsg, receivedPing); assertEquals(pingMsg, receivedPing);
} }
} finally { } finally {
// Don't wait for server to close streams
http2Client.gracefulShutdownTimeoutMillis(0);
data.release(); data.release();
pingData.release(); pingData.release();
} }
@ -454,8 +473,8 @@ public class Http2ConnectionRoundtripTest {
sb = new ServerBootstrap(); sb = new ServerBootstrap();
cb = new Bootstrap(); cb = new Bootstrap();
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup()); sb.group(new DefaultEventLoopGroup());
sb.channel(NioServerSocketChannel.class); sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
@ -467,8 +486,8 @@ public class Http2ConnectionRoundtripTest {
} }
}); });
cb.group(new NioEventLoopGroup()); cb.group(new DefaultEventLoopGroup());
cb.channel(NioSocketChannel.class); cb.channel(LocalChannel.class);
cb.handler(new ChannelInitializer<Channel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
@ -477,10 +496,9 @@ public class Http2ConnectionRoundtripTest {
} }
}); });
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel(); serverChannel = sb.bind(new LocalAddress("Http2ConnectionRoundtripTest")).sync().channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port)); ChannelFuture ccf = cb.connect(serverChannel.localAddress());
assertTrue(ccf.awaitUninterruptibly().isSuccess()); assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel(); clientChannel = ccf.channel();
http2Client = clientChannel.pipeline().get(Http2ConnectionHandler.class); http2Client = clientChannel.pipeline().get(Http2ConnectionHandler.class);

View File

@ -25,12 +25,12 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.local.LocalAddress;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable; import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -40,7 +40,6 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -83,7 +82,14 @@ public class Http2FrameRoundtripTest {
@After @After
public void teardown() throws Exception { public void teardown() throws Exception {
if (clientChannel != null) {
clientChannel.close().sync();
clientChannel = null;
}
if (serverChannel != null) {
serverChannel.close().sync(); serverChannel.close().sync();
serverChannel = null;
}
Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
@ -176,7 +182,7 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() { public void run() {
frameWriter.writeGoAway(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL, data.retain(), newPromise()); frameWriter.writeGoAway(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL, data.duplicate().retain(), newPromise());
ctx().flush(); ctx().flush();
} }
}); });
@ -207,7 +213,7 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() { public void run() {
frameWriter.writePing(ctx(), true, data.retain(), newPromise()); frameWriter.writePing(ctx(), true, data.duplicate().retain(), newPromise());
ctx().flush(); ctx().flush();
} }
}); });
@ -323,7 +329,7 @@ public class Http2FrameRoundtripTest {
public void run() { public void run() {
for (int i = 1; i < numStreams + 1; ++i) { for (int i = 1; i < numStreams + 1; ++i) {
frameWriter.writeHeaders(ctx(), i, headers, 0, (short) 16, false, 0, false, newPromise()); frameWriter.writeHeaders(ctx(), i, headers, 0, (short) 16, false, 0, false, newPromise());
frameWriter.writeData(ctx(), i, data.retain(), 0, true, newPromise()); frameWriter.writeData(ctx(), i, data.duplicate().retain(), 0, true, newPromise());
ctx().flush(); ctx().flush();
} }
} }
@ -355,8 +361,8 @@ public class Http2FrameRoundtripTest {
sb = new ServerBootstrap(); sb = new ServerBootstrap();
cb = new Bootstrap(); cb = new Bootstrap();
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup()); sb.group(new DefaultEventLoopGroup());
sb.channel(NioServerSocketChannel.class); sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
@ -366,8 +372,8 @@ public class Http2FrameRoundtripTest {
} }
}); });
cb.group(new NioEventLoopGroup()); cb.group(new DefaultEventLoopGroup());
cb.channel(NioSocketChannel.class); cb.channel(LocalChannel.class);
cb.handler(new ChannelInitializer<Channel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
@ -376,10 +382,9 @@ public class Http2FrameRoundtripTest {
} }
}); });
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel(); serverChannel = sb.bind(new LocalAddress("Http2FrameRoundtripTest")).sync().channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port)); ChannelFuture ccf = cb.connect(serverChannel.localAddress());
assertTrue(ccf.awaitUninterruptibly().isSuccess()); assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel(); clientChannel = ccf.channel();
} }

View File

@ -24,9 +24,10 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.local.LocalAddress;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultHttpRequest;
@ -38,7 +39,6 @@ import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2TestUtil.FrameCountDown; import io.netty.handler.codec.http2.Http2TestUtil.FrameCountDown;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -48,7 +48,6 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -99,7 +98,14 @@ public class HttpToHttp2ConnectionHandlerTest {
@After @After
public void teardown() throws Exception { public void teardown() throws Exception {
if (clientChannel != null) {
clientChannel.close().sync();
clientChannel = null;
}
if (serverChannel != null) {
serverChannel.close().sync(); serverChannel.close().sync();
serverChannel = null;
}
Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS); Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS); Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS); Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS);
@ -312,8 +318,8 @@ public class HttpToHttp2ConnectionHandlerTest {
sb = new ServerBootstrap(); sb = new ServerBootstrap();
cb = new Bootstrap(); cb = new Bootstrap();
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup()); sb.group(new DefaultEventLoopGroup());
sb.channel(NioServerSocketChannel.class); sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
@ -324,20 +330,21 @@ public class HttpToHttp2ConnectionHandlerTest {
} }
}); });
cb.group(new NioEventLoopGroup()); cb.group(new DefaultEventLoopGroup());
cb.channel(NioSocketChannel.class); cb.channel(LocalChannel.class);
cb.handler(new ChannelInitializer<Channel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline(); ChannelPipeline p = ch.pipeline();
p.addLast(new HttpToHttp2ConnectionHandler(false, clientListener)); HttpToHttp2ConnectionHandler handler = new HttpToHttp2ConnectionHandler(false, clientListener);
handler.gracefulShutdownTimeoutMillis(0);
p.addLast(handler);
} }
}); });
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel(); serverChannel = sb.bind(new LocalAddress("HttpToHttp2ConnectionHandlerTest")).sync().channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port)); ChannelFuture ccf = cb.connect(serverChannel.localAddress());
assertTrue(ccf.awaitUninterruptibly().isSuccess()); assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel(); clientChannel = ccf.channel();
} }

View File

@ -25,10 +25,11 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.local.LocalAddress;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.local.LocalChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpMessage; import io.netty.handler.codec.http.FullHttpMessage;
@ -44,7 +45,6 @@ import io.netty.handler.codec.http2.Http2TestUtil.FrameAdapter;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable; import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -53,7 +53,6 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -116,8 +115,8 @@ public class InboundHttp2ToHttpAdapterTest {
sb = new ServerBootstrap(); sb = new ServerBootstrap();
cb = new Bootstrap(); cb = new Bootstrap();
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup()); sb.group(new DefaultEventLoopGroup());
sb.channel(NioServerSocketChannel.class); sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
@ -153,8 +152,8 @@ public class InboundHttp2ToHttpAdapterTest {
} }
}); });
cb.group(new NioEventLoopGroup()); cb.group(new DefaultEventLoopGroup());
cb.channel(NioSocketChannel.class); cb.channel(LocalChannel.class);
cb.handler(new ChannelInitializer<Channel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
@ -173,10 +172,9 @@ public class InboundHttp2ToHttpAdapterTest {
} }
}); });
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel(); serverChannel = sb.bind(new LocalAddress("InboundHttp2ToHttpAdapterTest")).sync().channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port)); ChannelFuture ccf = cb.connect(serverChannel.localAddress());
assertTrue(ccf.awaitUninterruptibly().isSuccess()); assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel(); clientChannel = ccf.channel();
} }
@ -185,7 +183,14 @@ public class InboundHttp2ToHttpAdapterTest {
public void teardown() throws Exception { public void teardown() throws Exception {
cleanupCapturedRequests(); cleanupCapturedRequests();
cleanupCapturedResponses(); cleanupCapturedResponses();
if (clientChannel != null) {
clientChannel.close().sync();
clientChannel = null;
}
if (serverChannel != null) {
serverChannel.close().sync(); serverChannel.close().sync();
serverChannel = null;
}
Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS); Future<?> serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS); Future<?> serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS);
Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS); Future<?> clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS);
@ -194,8 +199,6 @@ public class InboundHttp2ToHttpAdapterTest {
clientGroup.sync(); clientGroup.sync();
clientDelegator = null; clientDelegator = null;
serverDelegator = null; serverDelegator = null;
clientChannel = null;
serverChannel = null;
serverConnectedChannel = null; serverConnectedChannel = null;
} }
@ -265,7 +268,7 @@ public class InboundHttp2ToHttpAdapterTest {
@Override @Override
public void run() { public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient()); frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient()); frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, true, newPromiseClient());
ctxClient().flush(); ctxClient().flush();
} }
}); });
@ -410,7 +413,7 @@ public class InboundHttp2ToHttpAdapterTest {
@Override @Override
public void run() { public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient()); frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, false, newPromiseClient()); frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 3, http2Headers2, 0, true, newPromiseClient()); frameWriter.writeHeaders(ctxClient(), 3, http2Headers2, 0, true, newPromiseClient());
ctxClient().flush(); ctxClient().flush();
} }
@ -455,8 +458,8 @@ public class InboundHttp2ToHttpAdapterTest {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient()); frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 5, http2Headers2, 0, false, newPromiseClient()); frameWriter.writeHeaders(ctxClient(), 5, http2Headers2, 0, false, newPromiseClient());
frameWriter.writePriority(ctxClient(), 5, 3, (short) 123, true, newPromiseClient()); frameWriter.writePriority(ctxClient(), 5, 3, (short) 123, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient()); frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 5, content2.retain(), 0, true, newPromiseClient()); frameWriter.writeData(ctxClient(), 5, content2.duplicate().retain(), 0, true, newPromiseClient());
ctxClient().flush(); ctxClient().flush();
} }
}); });
@ -506,8 +509,8 @@ public class InboundHttp2ToHttpAdapterTest {
public void run() { public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient()); frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 5, http2Headers2, 0, false, newPromiseClient()); frameWriter.writeHeaders(ctxClient(), 5, http2Headers2, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient()); frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 5, content2.retain(), 0, true, newPromiseClient()); frameWriter.writeData(ctxClient(), 5, content2.duplicate().retain(), 0, true, newPromiseClient());
frameWriter.writePriority(ctxClient(), 5, 3, (short) 222, false, newPromiseClient()); frameWriter.writePriority(ctxClient(), 5, 3, (short) 222, false, newPromiseClient());
ctxClient().flush(); ctxClient().flush();
} }
@ -577,8 +580,8 @@ public class InboundHttp2ToHttpAdapterTest {
public void run() { public void run() {
frameWriter.writeHeaders(ctxServer(), 3, http2Headers, 0, false, newPromiseServer()); frameWriter.writeHeaders(ctxServer(), 3, http2Headers, 0, false, newPromiseServer());
frameWriter.writePushPromise(ctxServer(), 3, 5, http2Headers2, 0, newPromiseServer()); frameWriter.writePushPromise(ctxServer(), 3, 5, http2Headers2, 0, newPromiseServer());
frameWriter.writeData(ctxServer(), 3, content.retain(), 0, true, newPromiseServer()); frameWriter.writeData(ctxServer(), 3, content.duplicate().retain(), 0, true, newPromiseServer());
frameWriter.writeData(ctxServer(), 5, content2.retain(), 0, true, newPromiseServer()); frameWriter.writeData(ctxServer(), 5, content2.duplicate().retain(), 0, true, newPromiseServer());
ctxServer().flush(); ctxServer().flush();
} }
}); });
@ -655,7 +658,7 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() { public void run() {
frameWriter.writeData(ctxClient(), 3, payload.retain(), 0, true, newPromiseClient()); frameWriter.writeData(ctxClient(), 3, payload.duplicate().retain(), 0, true, newPromiseClient());
ctxClient().flush(); ctxClient().flush();
} }
}); });