diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java
index 47f2533f3f..b9075144f3 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java
@@ -148,7 +148,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
private final Integer initialFlowControlWindowSize;
- private ChannelHandlerContext ctx;
+ ChannelHandlerContext ctx;
/** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/
private int numBufferedStreams;
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java
index 46aa97a536..d19ce2b8f8 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java
@@ -414,28 +414,11 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
}
}
- // Allow to override for testing
- void flush0(ChannelHandlerContext ctx) {
+ final void flush0(ChannelHandlerContext ctx) {
flush(ctx);
}
- /**
- * Return bytes to flow control.
- *
- * Package private to allow to override for testing
- * @param ctx The {@link ChannelHandlerContext} associated with the parent channel.
- * @param stream The object representing the HTTP/2 stream.
- * @param bytes The number of bytes to return to flow control.
- * @return {@code true} if a frame has been written as a result of this method call.
- * @throws Http2Exception If this operation violates the flow control limits.
- */
- boolean onBytesConsumed(@SuppressWarnings("unused") ChannelHandlerContext ctx,
- Http2FrameStream stream, int bytes) throws Http2Exception {
- return consumeBytes(stream.id(), bytes);
- }
-
- // Allow to extend for testing
- static class Http2MultiplexCodecStream extends DefaultHttp2FrameStream {
+ static final class Http2MultiplexCodecStream extends DefaultHttp2FrameStream {
DefaultHttp2StreamChannel channel;
}
@@ -1084,7 +1067,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
allocHandle.lastBytesRead(numBytesToBeConsumed);
if (numBytesToBeConsumed != 0) {
try {
- writeDoneAndNoFlush |= onBytesConsumed(ctx, stream, numBytesToBeConsumed);
+ writeDoneAndNoFlush |= consumeBytes(stream.id(), numBytesToBeConsumed);
} catch (Http2Exception e) {
pipeline().fireExceptionCaught(e);
}
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilder.java
index 94f0d49720..c5732ec687 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilder.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilder.java
@@ -27,6 +27,7 @@ import static io.netty.util.internal.ObjectUtil.checkNotNull;
@UnstableApi
public class Http2MultiplexCodecBuilder
extends AbstractHttp2ConnectionHandlerBuilder {
+ private Http2FrameWriter frameWriter;
final ChannelHandler childHandler;
private ChannelHandler upgradeStreamHandler;
@@ -44,6 +45,12 @@ public class Http2MultiplexCodecBuilder
return handler;
}
+ // For testing only.
+ Http2MultiplexCodecBuilder frameWriter(Http2FrameWriter frameWriter) {
+ this.frameWriter = checkNotNull(frameWriter, "frameWriter");
+ return this;
+ }
+
/**
* Creates a builder for a HTTP/2 client.
*
@@ -160,6 +167,28 @@ public class Http2MultiplexCodecBuilder
@Override
public Http2MultiplexCodec build() {
+ Http2FrameWriter frameWriter = this.frameWriter;
+ if (frameWriter != null) {
+ // This is to support our tests and will never be executed by the user as frameWriter(...)
+ // is package-private.
+ DefaultHttp2Connection connection = new DefaultHttp2Connection(isServer(), maxReservedStreams());
+ Long maxHeaderListSize = initialSettings().maxHeaderListSize();
+ Http2FrameReader frameReader = new DefaultHttp2FrameReader(maxHeaderListSize == null ?
+ new DefaultHttp2HeadersDecoder(true) :
+ new DefaultHttp2HeadersDecoder(true, maxHeaderListSize));
+
+ if (frameLogger() != null) {
+ frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger());
+ frameReader = new Http2InboundFrameLogger(frameReader, frameLogger());
+ }
+ Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
+ if (encoderEnforceMaxConcurrentStreams()) {
+ encoder = new StreamBufferingEncoder(encoder);
+ }
+ Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader);
+
+ return build(decoder, encoder, initialSettings());
+ }
return super.build();
}
diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java
index 2123986047..a3ccf038d0 100644
--- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java
+++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java
@@ -15,9 +15,7 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
@@ -57,6 +55,11 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
+import static io.netty.handler.codec.http2.Http2TestUtil.anyChannelPromise;
+import static io.netty.handler.codec.http2.Http2TestUtil.anyHttp2Settings;
+import static io.netty.handler.codec.http2.Http2TestUtil.assertEqualsAndRelease;
+import static io.netty.handler.codec.http2.Http2TestUtil.bb;
+
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -73,7 +76,6 @@ import static org.mockito.Mockito.anyShort;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.same;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -86,9 +88,10 @@ public class Http2FrameCodecTest {
private Http2FrameWriter frameWriter;
private Http2FrameCodec frameCodec;
private EmbeddedChannel channel;
+
// For injecting inbound frames
- private Http2FrameListener frameListener;
- private ChannelHandlerContext http2HandlerCtx;
+ private Http2FrameInboundWriter frameInboundWriter;
+
private LastInboundHandler inboundHandler;
private final Http2Headers request = new DefaultHttp2Headers()
@@ -123,29 +126,29 @@ public class Http2FrameCodecTest {
*/
tearDown();
- frameWriter = spy(new VerifiableHttp2FrameWriter());
+ frameWriter = Http2TestUtil.mockedFrameWriter();
+
frameCodec = frameCodecBuilder.frameWriter(frameWriter).frameLogger(new Http2FrameLogger(LogLevel.TRACE))
.initialSettings(initialRemoteSettings).build();
- frameListener = ((DefaultHttp2ConnectionDecoder) frameCodec.decoder())
- .internalFrameListener();
inboundHandler = new LastInboundHandler();
channel = new EmbeddedChannel();
+ frameInboundWriter = new Http2FrameInboundWriter(channel);
channel.connect(new InetSocketAddress(0));
channel.pipeline().addLast(frameCodec);
channel.pipeline().addLast(inboundHandler);
channel.pipeline().fireChannelActive();
- http2HandlerCtx = channel.pipeline().context(frameCodec);
-
// Handshake
- verify(frameWriter).writeSettings(eq(http2HandlerCtx),
- anyHttp2Settings(), anyChannelPromise());
+ verify(frameWriter).writeSettings(eqFrameCodecCtx(), anyHttp2Settings(), anyChannelPromise());
verifyNoMoreInteractions(frameWriter);
channel.writeInbound(Http2CodecUtil.connectionPrefaceBuf());
- frameListener.onSettingsRead(http2HandlerCtx, initialRemoteSettings);
- verify(frameWriter).writeSettingsAck(eq(http2HandlerCtx), anyChannelPromise());
- frameListener.onSettingsAckRead(http2HandlerCtx);
+
+ frameInboundWriter.writeInboundSettings(initialRemoteSettings);
+
+ verify(frameWriter).writeSettingsAck(eqFrameCodecCtx(), anyChannelPromise());
+
+ frameInboundWriter.writeInboundSettingsAck();
Http2SettingsFrame settingsFrame = inboundHandler.readInbound();
assertNotNull(settingsFrame);
@@ -153,7 +156,7 @@ public class Http2FrameCodecTest {
@Test
public void stateChanges() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 1, request, 31, true);
+ frameInboundWriter.writeInboundHeaders(1, request, 31, true);
Http2Stream stream = frameCodec.connection().stream(1);
assertNotNull(stream);
@@ -169,12 +172,12 @@ public class Http2FrameCodecTest {
assertEquals(inboundFrame, new DefaultHttp2HeadersFrame(request, true, 31).stream(stream2));
assertNull(inboundHandler.readInbound());
- inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).stream(stream2));
+ channel.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).stream(stream2));
verify(frameWriter).writeHeaders(
- eq(http2HandlerCtx), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(),
+ eqFrameCodecCtx(), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(),
eq(27), eq(true), anyChannelPromise());
verify(frameWriter, never()).writeRstStream(
- any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise());
+ eqFrameCodecCtx(), anyInt(), anyLong(), anyChannelPromise());
assertEquals(State.CLOSED, stream.state());
event = inboundHandler.readInboundMessageOrUserEvent();
@@ -185,7 +188,7 @@ public class Http2FrameCodecTest {
@Test
public void headerRequestHeaderResponse() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 1, request, 31, true);
+ frameInboundWriter.writeInboundHeaders(1, request, 31, true);
Http2Stream stream = frameCodec.connection().stream(1);
assertNotNull(stream);
@@ -198,34 +201,34 @@ public class Http2FrameCodecTest {
assertEquals(inboundFrame, new DefaultHttp2HeadersFrame(request, true, 31).stream(stream2));
assertNull(inboundHandler.readInbound());
- inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).stream(stream2));
+ channel.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).stream(stream2));
verify(frameWriter).writeHeaders(
- eq(http2HandlerCtx), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(),
+ eqFrameCodecCtx(), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(),
eq(27), eq(true), anyChannelPromise());
verify(frameWriter, never()).writeRstStream(
- any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise());
+ eqFrameCodecCtx(), anyInt(), anyLong(), anyChannelPromise());
assertEquals(State.CLOSED, stream.state());
assertTrue(channel.isActive());
}
- @Test
- public void flowControlShouldBeResilientToMissingStreams() throws Http2Exception {
- Http2Connection conn = new DefaultHttp2Connection(true);
- Http2ConnectionEncoder enc = new DefaultHttp2ConnectionEncoder(conn, new DefaultHttp2FrameWriter());
- Http2ConnectionDecoder dec = new DefaultHttp2ConnectionDecoder(conn, enc, new DefaultHttp2FrameReader());
- Http2FrameCodec codec = new Http2FrameCodec(enc, dec, new Http2Settings());
- EmbeddedChannel em = new EmbeddedChannel(codec);
+ @Test
+ public void flowControlShouldBeResilientToMissingStreams() throws Http2Exception {
+ Http2Connection conn = new DefaultHttp2Connection(true);
+ Http2ConnectionEncoder enc = new DefaultHttp2ConnectionEncoder(conn, new DefaultHttp2FrameWriter());
+ Http2ConnectionDecoder dec = new DefaultHttp2ConnectionDecoder(conn, enc, new DefaultHttp2FrameReader());
+ Http2FrameCodec codec = new Http2FrameCodec(enc, dec, new Http2Settings());
+ EmbeddedChannel em = new EmbeddedChannel(codec);
- // We call #consumeBytes on a stream id which has not been seen yet to emulate the case
- // where a stream is deregistered which in reality can happen in response to a RST.
- assertFalse(codec.consumeBytes(1, 1));
- assertTrue(em.finishAndReleaseAll());
- }
+ // We call #consumeBytes on a stream id which has not been seen yet to emulate the case
+ // where a stream is deregistered which in reality can happen in response to a RST.
+ assertFalse(codec.consumeBytes(1, 1));
+ assertTrue(em.finishAndReleaseAll());
+ }
@Test
public void entityRequestEntityResponse() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 1, request, 0, false);
+ frameInboundWriter.writeInboundHeaders(1, request, 0, false);
Http2Stream stream = frameCodec.connection().stream(1);
assertNotNull(stream);
@@ -239,39 +242,35 @@ public class Http2FrameCodecTest {
assertNull(inboundHandler.readInbound());
ByteBuf hello = bb("hello");
- frameListener.onDataRead(http2HandlerCtx, 1, hello, 31, true);
- // Release hello to emulate ByteToMessageDecoder
- hello.release();
+ frameInboundWriter.writeInboundData(1, hello, 31, true);
Http2DataFrame inboundData = inboundHandler.readInbound();
Http2DataFrame expected = new DefaultHttp2DataFrame(bb("hello"), true, 31).stream(stream2);
- assertEquals(expected, inboundData);
+ assertEqualsAndRelease(expected, inboundData);
- assertEquals(1, inboundData.refCnt());
- expected.release();
- inboundData.release();
assertNull(inboundHandler.readInbound());
- inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, false).stream(stream2));
- verify(frameWriter).writeHeaders(eq(http2HandlerCtx), eq(1), eq(response), anyInt(),
+ channel.writeOutbound(new DefaultHttp2HeadersFrame(response, false).stream(stream2));
+ verify(frameWriter).writeHeaders(eqFrameCodecCtx(), eq(1), eq(response), anyInt(),
anyShort(), anyBoolean(), eq(0), eq(false), anyChannelPromise());
- inboundHandler.writeOutbound(new DefaultHttp2DataFrame(bb("world"), true, 27).stream(stream2));
+ channel.writeOutbound(new DefaultHttp2DataFrame(bb("world"), true, 27).stream(stream2));
ArgumentCaptor outboundData = ArgumentCaptor.forClass(ByteBuf.class);
- verify(frameWriter).writeData(eq(http2HandlerCtx), eq(1), outboundData.capture(), eq(27),
+ verify(frameWriter).writeData(eqFrameCodecCtx(), eq(1), outboundData.capture(), eq(27),
eq(true), anyChannelPromise());
ByteBuf bb = bb("world");
assertEquals(bb, outboundData.getValue());
assertEquals(1, outboundData.getValue().refCnt());
bb.release();
- verify(frameWriter, never()).writeRstStream(
- any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise());
+ outboundData.getValue().release();
+
+ verify(frameWriter, never()).writeRstStream(eqFrameCodecCtx(), anyInt(), anyLong(), anyChannelPromise());
assertTrue(channel.isActive());
}
@Test
public void sendRstStream() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, true);
+ frameInboundWriter.writeInboundHeaders(3, request, 31, true);
Http2Stream stream = frameCodec.connection().stream(3);
assertNotNull(stream);
@@ -285,16 +284,15 @@ public class Http2FrameCodecTest {
assertNotNull(stream2);
assertEquals(3, stream2.id());
- inboundHandler.writeOutbound(new DefaultHttp2ResetFrame(314 /* non-standard error */).stream(stream2));
- verify(frameWriter).writeRstStream(
- eq(http2HandlerCtx), eq(3), eq(314L), anyChannelPromise());
+ channel.writeOutbound(new DefaultHttp2ResetFrame(314 /* non-standard error */).stream(stream2));
+ verify(frameWriter).writeRstStream(eqFrameCodecCtx(), eq(3), eq(314L), anyChannelPromise());
assertEquals(State.CLOSED, stream.state());
assertTrue(channel.isActive());
}
@Test
public void receiveRstStream() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
+ frameInboundWriter.writeInboundHeaders(3, request, 31, false);
Http2Stream stream = frameCodec.connection().stream(3);
assertNotNull(stream);
@@ -304,7 +302,7 @@ public class Http2FrameCodecTest {
Http2HeadersFrame actualHeaders = inboundHandler.readInbound();
assertEquals(expectedHeaders.stream(actualHeaders.stream()), actualHeaders);
- frameListener.onRstStreamRead(http2HandlerCtx, 3, Http2Error.NO_ERROR.code());
+ frameInboundWriter.writeInboundRstStream(3, Http2Error.NO_ERROR.code());
Http2ResetFrame expectedRst = new DefaultHttp2ResetFrame(Http2Error.NO_ERROR).stream(actualHeaders.stream());
Http2ResetFrame actualRst = inboundHandler.readInbound();
@@ -315,8 +313,7 @@ public class Http2FrameCodecTest {
@Test
public void sendGoAway() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
-
+ frameInboundWriter.writeInboundHeaders(3, request, 31, false);
Http2Stream stream = frameCodec.connection().stream(3);
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
@@ -324,32 +321,29 @@ public class Http2FrameCodecTest {
ByteBuf debugData = bb("debug");
ByteBuf expected = debugData.copy();
- Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR.code(), debugData.slice());
+ Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR.code(), debugData);
goAwayFrame.setExtraStreamIds(2);
- inboundHandler.writeOutbound(goAwayFrame);
- verify(frameWriter).writeGoAway(
- eq(http2HandlerCtx), eq(7), eq(Http2Error.NO_ERROR.code()), eq(expected), anyChannelPromise());
+ channel.writeOutbound(goAwayFrame);
+ verify(frameWriter).writeGoAway(eqFrameCodecCtx(), eq(7),
+ eq(Http2Error.NO_ERROR.code()), eq(expected), anyChannelPromise());
assertEquals(1, debugData.refCnt());
assertEquals(State.OPEN, stream.state());
assertTrue(channel.isActive());
expected.release();
+ debugData.release();
}
@Test
public void receiveGoaway() throws Exception {
ByteBuf debugData = bb("foo");
- frameListener.onGoAwayRead(http2HandlerCtx, 2, Http2Error.NO_ERROR.code(), debugData);
- // Release debugData to emulate ByteToMessageDecoder
- debugData.release();
+ frameInboundWriter.writeInboundGoAway(2, Http2Error.NO_ERROR.code(), debugData);
Http2GoAwayFrame expectedFrame = new DefaultHttp2GoAwayFrame(2, Http2Error.NO_ERROR.code(), bb("foo"));
Http2GoAwayFrame actualFrame = inboundHandler.readInbound();
- assertEquals(expectedFrame, actualFrame);
- assertNull(inboundHandler.readInbound());
+ assertEqualsAndRelease(expectedFrame, actualFrame);
- expectedFrame.release();
- actualFrame.release();
+ assertNull(inboundHandler.readInbound());
}
@Test
@@ -383,7 +377,7 @@ public class Http2FrameCodecTest {
@Test
public void goAwayLastStreamIdOverflowed() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 5, request, 31, false);
+ frameInboundWriter.writeInboundHeaders(5, request, 31, false);
Http2Stream stream = frameCodec.connection().stream(5);
assertNotNull(stream);
@@ -393,10 +387,10 @@ public class Http2FrameCodecTest {
Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR.code(), debugData.slice());
goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE);
- inboundHandler.writeOutbound(goAwayFrame);
+ channel.writeOutbound(goAwayFrame);
// When the last stream id computation overflows, the last stream id should just be set to 2^31 - 1.
- verify(frameWriter).writeGoAway(eq(http2HandlerCtx), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()),
- eq(debugData), anyChannelPromise());
+ verify(frameWriter).writeGoAway(eqFrameCodecCtx(), eq(Integer.MAX_VALUE),
+ eq(Http2Error.NO_ERROR.code()), eq(debugData), anyChannelPromise());
assertEquals(1, debugData.refCnt());
assertEquals(State.OPEN, stream.state());
assertTrue(channel.isActive());
@@ -404,13 +398,13 @@ public class Http2FrameCodecTest {
@Test
public void streamErrorShouldFireExceptionForInbound() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
+ frameInboundWriter.writeInboundHeaders(3, request, 31, false);
Http2Stream stream = frameCodec.connection().stream(3);
assertNotNull(stream);
StreamException streamEx = new StreamException(3, Http2Error.INTERNAL_ERROR, "foo");
- frameCodec.onError(http2HandlerCtx, false, streamEx);
+ channel.pipeline().fireExceptionCaught(streamEx);
Http2FrameStreamEvent event = inboundHandler.readInboundMessageOrUserEvent();
assertEquals(Http2FrameStreamEvent.Type.State, event.type());
@@ -430,13 +424,13 @@ public class Http2FrameCodecTest {
@Test
public void streamErrorShouldNotFireExceptionForOutbound() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
+ frameInboundWriter.writeInboundHeaders(3, request, 31, false);
Http2Stream stream = frameCodec.connection().stream(3);
assertNotNull(stream);
StreamException streamEx = new StreamException(3, Http2Error.INTERNAL_ERROR, "foo");
- frameCodec.onError(http2HandlerCtx, true, streamEx);
+ frameCodec.onError(frameCodec.ctx, true, streamEx);
Http2FrameStreamEvent event = inboundHandler.readInboundMessageOrUserEvent();
assertEquals(Http2FrameStreamEvent.Type.State, event.type());
@@ -452,14 +446,14 @@ public class Http2FrameCodecTest {
@Test
public void windowUpdateFrameDecrementsConsumedBytes() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
+ frameInboundWriter.writeInboundHeaders(3, request, 31, false);
Http2Connection connection = frameCodec.connection();
Http2Stream stream = connection.stream(3);
assertNotNull(stream);
ByteBuf data = Unpooled.buffer(100).writeZero(100);
- frameListener.onDataRead(http2HandlerCtx, 3, data, 0, true);
+ frameInboundWriter.writeInboundData(3, data, 0, false);
Http2HeadersFrame inboundHeaders = inboundHandler.readInbound();
assertNotNull(inboundHeaders);
@@ -472,12 +466,11 @@ public class Http2FrameCodecTest {
int after = connection.local().flowController().unconsumedBytes(stream);
assertEquals(100, before - after);
assertTrue(f.isSuccess());
- data.release();
}
@Test
public void windowUpdateMayFail() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
+ frameInboundWriter.writeInboundHeaders(3, request, 31, false);
Http2Connection connection = frameCodec.connection();
Http2Stream stream = connection.stream(3);
assertNotNull(stream);
@@ -496,10 +489,10 @@ public class Http2FrameCodecTest {
@Test
public void inboundWindowUpdateShouldBeForwarded() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
- frameListener.onWindowUpdateRead(http2HandlerCtx, 3, 100);
+ frameInboundWriter.writeInboundHeaders(3, request, 31, false);
+ frameInboundWriter.writeInboundWindowUpdate(3, 100);
// Connection-level window update
- frameListener.onWindowUpdateRead(http2HandlerCtx, 0, 100);
+ frameInboundWriter.writeInboundWindowUpdate(0, 100);
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
assertNotNull(headersFrame);
@@ -558,7 +551,7 @@ public class Http2FrameCodecTest {
unknownFrame.stream(stream);
channel.write(unknownFrame);
- verify(frameWriter).writeFrame(eq(http2HandlerCtx), eq(unknownFrame.frameType()),
+ verify(frameWriter).writeFrame(eqFrameCodecCtx(), eq(unknownFrame.frameType()),
eq(unknownFrame.stream().id()), eq(unknownFrame.flags()), eq(buffer), any(ChannelPromise.class));
}
@@ -567,7 +560,7 @@ public class Http2FrameCodecTest {
Http2Settings settings = new Http2Settings();
channel.write(new DefaultHttp2SettingsFrame(settings));
- verify(frameWriter).writeSettings(eq(http2HandlerCtx), same(settings), any(ChannelPromise.class));
+ verify(frameWriter).writeSettings(eqFrameCodecCtx(), same(settings), any(ChannelPromise.class));
}
@Test(timeout = 5000)
@@ -619,7 +612,7 @@ public class Http2FrameCodecTest {
assertFalse(promise2.isDone());
// Increase concurrent streams limit to 2
- frameListener.onSettingsRead(http2HandlerCtx, new Http2Settings().maxConcurrentStreams(2));
+ frameInboundWriter.writeInboundSettings(new Http2Settings().maxConcurrentStreams(2));
channel.flush();
@@ -643,7 +636,7 @@ public class Http2FrameCodecTest {
@Test
public void receivePing() throws Http2Exception {
- frameListener.onPingRead(http2HandlerCtx, 12345L);
+ frameInboundWriter.writeInboundPing(false, 12345L);
Http2PingFrame pingFrame = inboundHandler.readInbound();
assertNotNull(pingFrame);
@@ -656,13 +649,13 @@ public class Http2FrameCodecTest {
public void sendPing() {
channel.writeAndFlush(new DefaultHttp2PingFrame(12345));
- verify(frameWriter).writePing(eq(http2HandlerCtx), eq(false), eq(12345L), anyChannelPromise());
+ verify(frameWriter).writePing(eqFrameCodecCtx(), eq(false), eq(12345L), anyChannelPromise());
}
@Test
public void receiveSettings() throws Http2Exception {
Http2Settings settings = new Http2Settings().maxConcurrentStreams(1);
- frameListener.onSettingsRead(http2HandlerCtx, settings);
+ frameInboundWriter.writeInboundSettings(settings);
Http2SettingsFrame settingsFrame = inboundHandler.readInbound();
assertNotNull(settingsFrame);
@@ -674,7 +667,7 @@ public class Http2FrameCodecTest {
Http2Settings settings = new Http2Settings().maxConcurrentStreams(1);
channel.writeAndFlush(new DefaultHttp2SettingsFrame(settings));
- verify(frameWriter).writeSettings(eq(http2HandlerCtx), eq(settings), anyChannelPromise());
+ verify(frameWriter).writeSettings(eqFrameCodecCtx(), eq(settings), anyChannelPromise());
}
@Test
@@ -682,7 +675,7 @@ public class Http2FrameCodecTest {
setUp(Http2FrameCodecBuilder.forServer().encoderEnforceMaxConcurrentStreams(true),
new Http2Settings().maxConcurrentStreams(1));
- frameListener.onHeadersRead(http2HandlerCtx, 3, request, 0, false);
+ frameInboundWriter.writeInboundHeaders(3, request, 0, false);
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
assertNotNull(headersFrame);
@@ -736,8 +729,7 @@ public class Http2FrameCodecTest {
@Test
public void upgradeEventNoRefCntError() throws Exception {
- frameListener.onHeadersRead(http2HandlerCtx, Http2CodecUtil.HTTP_UPGRADE_STREAM_ID, request, 31, false);
-
+ frameInboundWriter.writeInboundHeaders(Http2CodecUtil.HTTP_UPGRADE_STREAM_ID, request, 31, false);
// Using reflect as the constructor is package-private and the class is final.
Constructor constructor =
UpgradeEvent.class.getDeclaredConstructor(CharSequence.class, FullHttpRequest.class);
@@ -753,7 +745,7 @@ public class Http2FrameCodecTest {
@Test
public void upgradeWithoutFlowControlling() throws Exception {
- channel.pipeline().addAfter(http2HandlerCtx.name(), null, new ChannelInboundHandlerAdapter() {
+ channel.pipeline().addAfter(frameCodec.ctx.name(), null, new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Http2DataFrame) {
@@ -774,7 +766,7 @@ public class Http2FrameCodecTest {
}
});
- frameListener.onHeadersRead(http2HandlerCtx, Http2CodecUtil.HTTP_UPGRADE_STREAM_ID, request, 31, false);
+ frameInboundWriter.writeInboundHeaders(Http2CodecUtil.HTTP_UPGRADE_STREAM_ID, request, 31, false);
// Using reflect as the constructor is package-private and the class is final.
Constructor constructor =
@@ -792,24 +784,7 @@ public class Http2FrameCodecTest {
channel.pipeline().fireUserEventTriggered(upgradeEvent);
}
- private static ChannelPromise anyChannelPromise() {
- return any(ChannelPromise.class);
- }
-
- private static Http2Settings anyHttp2Settings() {
- return any(Http2Settings.class);
- }
-
- private static ByteBuf bb(String s) {
- return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s);
- }
-
- private static class VerifiableHttp2FrameWriter extends DefaultHttp2FrameWriter {
- @Override
- public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
- int padding, boolean endStream, ChannelPromise promise) {
- // duplicate 'data' to prevent readerIndex from being changed, to ease verification
- return super.writeData(ctx, streamId, data.duplicate(), padding, endStream, promise);
- }
+ private ChannelHandlerContext eqFrameCodecCtx() {
+ return eq(frameCodec.ctx);
}
}
diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameInboundWriter.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameInboundWriter.java
new file mode 100644
index 0000000000..ace50aadbe
--- /dev/null
+++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameInboundWriter.java
@@ -0,0 +1,340 @@
+/*
+ * Copyright 2018 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.handler.codec.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.EventExecutor;
+
+import java.net.SocketAddress;
+
+/**
+ * Utility class which allows easy writing of HTTP2 frames via {@link EmbeddedChannel#writeInbound(Object...)}.
+ */
+final class Http2FrameInboundWriter {
+
+ private final ChannelHandlerContext ctx;
+ private final Http2FrameWriter writer;
+
+ Http2FrameInboundWriter(EmbeddedChannel channel) {
+ this(channel, new DefaultHttp2FrameWriter());
+ }
+
+ Http2FrameInboundWriter(EmbeddedChannel channel, Http2FrameWriter writer) {
+ this.ctx = new WriteInboundChannelHandlerContext(channel);
+ this.writer = writer;
+ }
+
+ void writeInboundData(int streamId, ByteBuf data, int padding, boolean endStream) {
+ writer.writeData(ctx, streamId, data, padding, endStream, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ void writeInboundHeaders(int streamId, Http2Headers headers,
+ int padding, boolean endStream) {
+ writer.writeHeaders(ctx, streamId, headers, padding, endStream, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ void writeInboundHeaders(int streamId, Http2Headers headers,
+ int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) {
+ writer.writeHeaders(ctx, streamId, headers, streamDependency,
+ weight, exclusive, padding, endStream, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ void writeInboundPriority(int streamId, int streamDependency,
+ short weight, boolean exclusive) {
+ writer.writePriority(ctx, streamId, streamDependency, weight,
+ exclusive, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ void writeInboundRstStream(int streamId, long errorCode) {
+ writer.writeRstStream(ctx, streamId, errorCode, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ void writeInboundSettings(Http2Settings settings) {
+ writer.writeSettings(ctx, settings, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ void writeInboundSettingsAck() {
+ writer.writeSettingsAck(ctx, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ void writeInboundPing(boolean ack, long data) {
+ writer.writePing(ctx, ack, data, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ void writePushPromise(int streamId, int promisedStreamId,
+ Http2Headers headers, int padding) {
+ writer.writePushPromise(ctx, streamId, promisedStreamId,
+ headers, padding, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ void writeInboundGoAway(int lastStreamId, long errorCode, ByteBuf debugData) {
+ writer.writeGoAway(ctx, lastStreamId, errorCode, debugData, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ void writeInboundWindowUpdate(int streamId, int windowSizeIncrement) {
+ writer.writeWindowUpdate(ctx, streamId, windowSizeIncrement, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ void writeInboundFrame(byte frameType, int streamId,
+ Http2Flags flags, ByteBuf payload) {
+ writer.writeFrame(ctx, frameType, streamId, flags, payload, ctx.newPromise()).syncUninterruptibly();
+ }
+
+ private static final class WriteInboundChannelHandlerContext extends ChannelOutboundHandlerAdapter
+ implements ChannelHandlerContext {
+ private final EmbeddedChannel channel;
+
+ WriteInboundChannelHandlerContext(EmbeddedChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public Channel channel() {
+ return channel;
+ }
+
+ @Override
+ public EventExecutor executor() {
+ return channel.eventLoop();
+ }
+
+ @Override
+ public String name() {
+ return "WriteInbound";
+ }
+
+ @Override
+ public ChannelHandler handler() {
+ return this;
+ }
+
+ @Override
+ public boolean isRemoved() {
+ return false;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelRegistered() {
+ channel.pipeline().fireChannelRegistered();
+ return this;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelUnregistered() {
+ channel.pipeline().fireChannelUnregistered();
+ return this;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelActive() {
+ channel.pipeline().fireChannelActive();
+ return this;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelInactive() {
+ channel.pipeline().fireChannelInactive();
+ return this;
+ }
+
+ @Override
+ public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
+ channel.pipeline().fireExceptionCaught(cause);
+ return this;
+ }
+
+ @Override
+ public ChannelHandlerContext fireUserEventTriggered(Object evt) {
+ channel.pipeline().fireUserEventTriggered(evt);
+ return this;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelRead(Object msg) {
+ channel.pipeline().fireChannelRead(msg);
+ return this;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelReadComplete() {
+ channel.pipeline().fireChannelReadComplete();
+ return this;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelWritabilityChanged() {
+ channel.pipeline().fireChannelWritabilityChanged();
+ return this;
+ }
+
+ @Override
+ public ChannelHandlerContext read() {
+ channel.read();
+ return this;
+ }
+
+ @Override
+ public ChannelHandlerContext flush() {
+ channel.pipeline().fireChannelReadComplete();
+ return this;
+ }
+
+ @Override
+ public ChannelPipeline pipeline() {
+ return channel.pipeline();
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return channel.alloc();
+ }
+
+ @Override
+ public Attribute attr(AttributeKey key) {
+ return channel.attr(key);
+ }
+
+ @Override
+ public boolean hasAttr(AttributeKey key) {
+ return channel.hasAttr(key);
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress) {
+ return channel.bind(localAddress);
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress) {
+ return channel.connect(remoteAddress);
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
+ return channel.connect(remoteAddress, localAddress);
+ }
+
+ @Override
+ public ChannelFuture disconnect() {
+ return channel.disconnect();
+ }
+
+ @Override
+ public ChannelFuture close() {
+ return channel.close();
+ }
+
+ @Override
+ public ChannelFuture deregister() {
+ return channel.deregister();
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
+ return channel.bind(localAddress, promise);
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
+ return channel.connect(remoteAddress, promise);
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+ return channel.connect(remoteAddress, localAddress, promise);
+ }
+
+ @Override
+ public ChannelFuture disconnect(ChannelPromise promise) {
+ return channel.disconnect(promise);
+ }
+
+ @Override
+ public ChannelFuture close(ChannelPromise promise) {
+ return channel.close(promise);
+ }
+
+ @Override
+ public ChannelFuture deregister(ChannelPromise promise) {
+ return channel.deregister(promise);
+ }
+
+ @Override
+ public ChannelFuture write(Object msg) {
+ return write(msg, newPromise());
+ }
+
+ @Override
+ public ChannelFuture write(Object msg, ChannelPromise promise) {
+ return writeAndFlush(msg, promise);
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+ try {
+ channel.writeInbound(msg);
+ channel.runPendingTasks();
+ promise.setSuccess();
+ } catch (Throwable cause) {
+ promise.setFailure(cause);
+ }
+ return promise;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+ return writeAndFlush(msg, newPromise());
+ }
+
+ @Override
+ public ChannelPromise newPromise() {
+ return channel.newPromise();
+ }
+
+ @Override
+ public ChannelProgressivePromise newProgressivePromise() {
+ return channel.newProgressivePromise();
+ }
+
+ @Override
+ public ChannelFuture newSucceededFuture() {
+ return channel.newSucceededFuture();
+ }
+
+ @Override
+ public ChannelFuture newFailedFuture(Throwable cause) {
+ return channel.newFailedFuture(cause);
+ }
+
+ @Override
+ public ChannelPromise voidPromise() {
+ return channel.voidPromise();
+ }
+ }
+}
diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java
index d857cf8d1e..7788e6dd64 100644
--- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java
+++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java
@@ -15,8 +15,7 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@@ -31,11 +30,13 @@ import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.handler.codec.http2.LastInboundHandler.Consumer;
import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
-import io.netty.util.ReferenceCountUtil;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
@@ -43,34 +44,43 @@ import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import static io.netty.util.ReferenceCountUtil.release;
-import static org.hamcrest.Matchers.instanceOf;
+import static io.netty.handler.codec.http2.Http2TestUtil.anyChannelPromise;
+import static io.netty.handler.codec.http2.Http2TestUtil.anyHttp2Settings;
+import static io.netty.handler.codec.http2.Http2TestUtil.assertEqualsAndRelease;
+import static io.netty.handler.codec.http2.Http2TestUtil.bb;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyShort;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Unit tests for {@link Http2MultiplexCodec}.
*/
public class Http2MultiplexCodecTest {
-
- private EmbeddedChannel parentChannel;
- private Writer writer;
-
- private TestChannelInitializer childChannelInitializer;
-
- private static final Http2Headers request = new DefaultHttp2Headers()
+ private final Http2Headers request = new DefaultHttp2Headers()
.method(HttpMethod.GET.asciiName()).scheme(HttpScheme.HTTPS.name())
.authority(new AsciiString("example.org")).path(new AsciiString("/foo"));
- private TestableHttp2MultiplexCodec codec;
- private TestableHttp2MultiplexCodec.Stream inboundStream;
- private TestableHttp2MultiplexCodec.Stream outboundStream;
+ private EmbeddedChannel parentChannel;
+ private Http2FrameWriter frameWriter;
+ private Http2FrameInboundWriter frameInboundWriter;
+ private TestChannelInitializer childChannelInitializer;
+ private Http2MultiplexCodec codec;
private static final int initialRemoteStreamWindow = 1024;
@@ -78,25 +88,38 @@ public class Http2MultiplexCodecTest {
public void setUp() {
childChannelInitializer = new TestChannelInitializer();
parentChannel = new EmbeddedChannel();
- writer = new Writer();
-
+ frameInboundWriter = new Http2FrameInboundWriter(parentChannel);
parentChannel.connect(new InetSocketAddress(0));
- codec = new TestableHttp2MultiplexCodecBuilder(true, childChannelInitializer).build();
+ frameWriter = Http2TestUtil.mockedFrameWriter();
+ codec = new Http2MultiplexCodecBuilder(true, childChannelInitializer).frameWriter(frameWriter).build();
parentChannel.pipeline().addLast(codec);
parentChannel.runPendingTasks();
+ parentChannel.pipeline().fireChannelActive();
+
+ parentChannel.writeInbound(Http2CodecUtil.connectionPrefaceBuf());
Http2Settings settings = new Http2Settings().initialWindowSize(initialRemoteStreamWindow);
- codec.onHttp2Frame(new DefaultHttp2SettingsFrame(settings));
+ frameInboundWriter.writeInboundSettings(settings);
- inboundStream = codec.newStream();
- inboundStream.id = 3;
- outboundStream = codec.newStream();
- outboundStream.id = 2;
+ verify(frameWriter).writeSettingsAck(eqMultiplexCodecCtx(), anyChannelPromise());
+
+ frameInboundWriter.writeInboundSettingsAck();
+
+ Http2SettingsFrame settingsFrame = parentChannel.readInbound();
+ assertNotNull(settingsFrame);
+
+ // Handshake
+ verify(frameWriter).writeSettings(eqMultiplexCodecCtx(),
+ anyHttp2Settings(), anyChannelPromise());
+ }
+
+ private ChannelHandlerContext eqMultiplexCodecCtx() {
+ return eq(codec.ctx);
}
@After
public void tearDown() throws Exception {
- if (childChannelInitializer.handler != null) {
+ if (childChannelInitializer.handler instanceof LastInboundHandler) {
((LastInboundHandler) childChannelInitializer.handler).finishAndReleaseAll();
}
parentChannel.finishAndReleaseAll();
@@ -110,154 +133,165 @@ public class Http2MultiplexCodecTest {
@Test
public void writeUnknownFrame() {
- childChannelInitializer.handler = new ChannelInboundHandlerAdapter() {
+ Http2StreamChannel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
ctx.writeAndFlush(new DefaultHttp2UnknownFrame((byte) 99, new Http2Flags()));
ctx.fireChannelActive();
}
- };
-
- Channel childChannel = newOutboundStream();
+ });
assertTrue(childChannel.isActive());
- Http2FrameStream stream = readOutboundHeadersAndAssignId();
parentChannel.runPendingTasks();
- Http2UnknownFrame frame = parentChannel.readOutbound();
- assertEquals(stream, frame.stream());
- assertEquals(99, frame.frameType());
- assertEquals(new Http2Flags(), frame.flags());
- frame.release();
+ verify(frameWriter).writeFrame(eq(codec.ctx), eq((byte) 99), eqStreamId(childChannel), any(Http2Flags.class),
+ any(ByteBuf.class), any(ChannelPromise.class));
+ }
+
+ private Http2StreamChannel newInboundStream(int streamId, boolean endStream, final ChannelHandler childHandler) {
+ return newInboundStream(streamId, endStream, null, childHandler);
+ }
+
+ private Http2StreamChannel newInboundStream(int streamId, boolean endStream,
+ AtomicInteger maxReads, final ChannelHandler childHandler) {
+ final AtomicReference streamChannelRef = new AtomicReference();
+ childChannelInitializer.maxReads = maxReads;
+ childChannelInitializer.handler = new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) {
+ assertNull(streamChannelRef.get());
+ streamChannelRef.set((Http2StreamChannel) ctx.channel());
+ ctx.pipeline().addLast(childHandler);
+ ctx.fireChannelRegistered();
+ }
+ };
+
+ frameInboundWriter.writeInboundHeaders(streamId, request, 0, endStream);
+ parentChannel.runPendingTasks();
+ Http2StreamChannel channel = streamChannelRef.get();
+ assertEquals(streamId, channel.stream().id());
+ return channel;
}
@Test
public void readUnkownFrame() {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
- codec.onHttp2Frame(new DefaultHttp2UnknownFrame((byte) 99, new Http2Flags()).stream(inboundStream));
- codec.onChannelReadComplete();
+ LastInboundHandler handler = new LastInboundHandler();
- // headers and unknown frame
- verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 2);
+ Http2StreamChannel channel = newInboundStream(3, true, handler);
+ frameInboundWriter.writeInboundFrame((byte) 99, channel.stream().id(), new Http2Flags(), Unpooled.EMPTY_BUFFER);
- Channel childChannel = newOutboundStream();
+ // header frame and unknown frame
+ verifyFramesMultiplexedToCorrectChannel(channel, handler, 2);
+
+ Channel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter());
assertTrue(childChannel.isActive());
}
@Test
public void headerAndDataFramesShouldBeDelivered() {
LastInboundHandler inboundHandler = new LastInboundHandler();
- childChannelInitializer.handler = inboundHandler;
- Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(request).stream(inboundStream);
- Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("hello")).stream(inboundStream);
- Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("world")).stream(inboundStream);
+ Http2StreamChannel channel = newInboundStream(3, false, inboundHandler);
+ Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(request).stream(channel.stream());
+ Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("hello")).stream(channel.stream());
+ Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("world")).stream(channel.stream());
- assertFalse(inboundHandler.isChannelActive());
- inboundStream.state = Http2Stream.State.OPEN;
- codec.onHttp2StreamStateChanged(inboundStream);
- codec.onHttp2Frame(headersFrame);
assertTrue(inboundHandler.isChannelActive());
- codec.onHttp2Frame(dataFrame1);
- codec.onHttp2Frame(dataFrame2);
+ frameInboundWriter.writeInboundData(channel.stream().id(), bb("hello"), 0, false);
+ frameInboundWriter.writeInboundData(channel.stream().id(), bb("world"), 0, false);
assertEquals(headersFrame, inboundHandler.readInbound());
- assertEquals(dataFrame1, inboundHandler.readInbound());
- assertEquals(dataFrame2, inboundHandler.readInbound());
- assertNull(inboundHandler.readInbound());
- dataFrame1.release();
- dataFrame2.release();
+ assertEqualsAndRelease(dataFrame1, inboundHandler.readInbound());
+ assertEqualsAndRelease(dataFrame2, inboundHandler.readInbound());
+
+ assertNull(inboundHandler.readInbound());
}
@Test
public void framesShouldBeMultiplexed() {
+ LastInboundHandler handler1 = new LastInboundHandler();
+ Http2StreamChannel channel1 = newInboundStream(3, false, handler1);
+ LastInboundHandler handler2 = new LastInboundHandler();
+ Http2StreamChannel channel2 = newInboundStream(5, false, handler2);
+ LastInboundHandler handler3 = new LastInboundHandler();
+ Http2StreamChannel channel3 = newInboundStream(11, false, handler3);
- TestableHttp2MultiplexCodec.Stream stream3 = codec.newStream();
- stream3.id = 3;
- TestableHttp2MultiplexCodec.Stream stream5 = codec.newStream();
- stream5.id = 5;
+ verifyFramesMultiplexedToCorrectChannel(channel1, handler1, 1);
+ verifyFramesMultiplexedToCorrectChannel(channel2, handler2, 1);
+ verifyFramesMultiplexedToCorrectChannel(channel3, handler3, 1);
- TestableHttp2MultiplexCodec.Stream stream11 = codec.newStream();
- stream11.id = 11;
+ frameInboundWriter.writeInboundData(channel2.stream().id(), bb("hello"), 0, false);
+ frameInboundWriter.writeInboundData(channel1.stream().id(), bb("foo"), 0, true);
+ frameInboundWriter.writeInboundData(channel2.stream().id(), bb("world"), 0, true);
+ frameInboundWriter.writeInboundData(channel3.stream().id(), bb("bar"), 0, true);
- LastInboundHandler inboundHandler3 = streamActiveAndWriteHeaders(stream3);
- LastInboundHandler inboundHandler5 = streamActiveAndWriteHeaders(stream5);
- LastInboundHandler inboundHandler11 = streamActiveAndWriteHeaders(stream11);
-
- verifyFramesMultiplexedToCorrectChannel(stream3, inboundHandler3, 1);
- verifyFramesMultiplexedToCorrectChannel(stream5, inboundHandler5, 1);
- verifyFramesMultiplexedToCorrectChannel(stream11, inboundHandler11, 1);
-
- codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("hello"), false).stream(stream5));
- codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), true).stream(stream3));
- codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("world"), true).stream(stream5));
- codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(stream11));
- verifyFramesMultiplexedToCorrectChannel(stream5, inboundHandler5, 2);
- verifyFramesMultiplexedToCorrectChannel(stream3, inboundHandler3, 1);
- verifyFramesMultiplexedToCorrectChannel(stream11, inboundHandler11, 1);
+ verifyFramesMultiplexedToCorrectChannel(channel1, handler1, 1);
+ verifyFramesMultiplexedToCorrectChannel(channel2, handler2, 2);
+ verifyFramesMultiplexedToCorrectChannel(channel3, handler3, 1);
}
@Test
- public void inboundDataFrameShouldEmitWindowUpdateFrame() {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
+ public void inboundDataFrameShouldUpdateLocalFlowController() throws Http2Exception {
+ Http2LocalFlowController flowController = Mockito.mock(Http2LocalFlowController.class);
+ codec.connection().local().flowController(flowController);
+
+ LastInboundHandler handler = new LastInboundHandler();
+ final Http2StreamChannel channel = newInboundStream(3, false, handler);
+
ByteBuf tenBytes = bb("0123456789");
- codec.onHttp2Frame(new DefaultHttp2DataFrame(tenBytes, true).stream(inboundStream));
- codec.onChannelReadComplete();
- Http2WindowUpdateFrame windowUpdate = parentChannel.readOutbound();
- assertNotNull(windowUpdate);
+ frameInboundWriter.writeInboundData(channel.stream().id(), tenBytes, 0, true);
- assertEquals(inboundStream, windowUpdate.stream());
- assertEquals(10, windowUpdate.windowSizeIncrement());
+ // Verify we marked the bytes as consumed
+ verify(flowController).consumeBytes(argThat(new ArgumentMatcher() {
+ @Override
+ public boolean matches(Http2Stream http2Stream) {
+ return http2Stream.id() == channel.stream().id();
+ }
+ }), eq(10));
// headers and data frame
- verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 2);
+ verifyFramesMultiplexedToCorrectChannel(channel, handler, 2);
}
@Test
public void unhandledHttp2FramesShouldBePropagated() {
- assertThat(parentChannel.readInbound(), instanceOf(Http2SettingsFrame.class));
-
Http2PingFrame pingFrame = new DefaultHttp2PingFrame(0);
- codec.onHttp2Frame(pingFrame);
- assertSame(parentChannel.readInbound(), pingFrame);
+ frameInboundWriter.writeInboundPing(false, 0);
+ assertEquals(parentChannel.readInbound(), pingFrame);
- DefaultHttp2GoAwayFrame goAwayFrame =
- new DefaultHttp2GoAwayFrame(1, parentChannel.alloc().buffer().writeLong(8));
- codec.onHttp2Frame(goAwayFrame);
+ DefaultHttp2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(1,
+ parentChannel.alloc().buffer().writeLong(8));
+ frameInboundWriter.writeInboundGoAway(0, goAwayFrame.errorCode(), goAwayFrame.content().retainedDuplicate());
Http2GoAwayFrame frame = parentChannel.readInbound();
- assertSame(frame, goAwayFrame);
- assertTrue(frame.release());
+ assertEqualsAndRelease(frame, goAwayFrame);
}
@Test
public void channelReadShouldRespectAutoRead() {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
- Channel childChannel = inboundHandler.channel();
+ LastInboundHandler inboundHandler = new LastInboundHandler();
+ Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
assertTrue(childChannel.config().isAutoRead());
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
assertNotNull(headersFrame);
childChannel.config().setAutoRead(false);
- codec.onHttp2Frame(
- new DefaultHttp2DataFrame(bb("hello world"), false).stream(inboundStream));
- codec.onChannelReadComplete();
+
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("hello world"), 0, false);
Http2DataFrame dataFrame0 = inboundHandler.readInbound();
assertNotNull(dataFrame0);
release(dataFrame0);
- codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), false).stream(inboundStream));
- codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(inboundStream));
- codec.onChannelReadComplete();
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("foo"), 0, false);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("bar"), 0, false);
- dataFrame0 = inboundHandler.readInbound();
- assertNull(dataFrame0);
+ assertNull(inboundHandler.readInbound());
childChannel.config().setAutoRead(true);
- verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 2);
+ verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 2);
}
@Test
@@ -271,8 +305,8 @@ public class Http2MultiplexCodecTest {
}
private void useReadWithoutAutoRead(final boolean readComplete) {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
- Channel childChannel = inboundHandler.channel();
+ LastInboundHandler inboundHandler = new LastInboundHandler();
+ Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
assertTrue(childChannel.config().isAutoRead());
childChannel.config().setAutoRead(false);
assertFalse(childChannel.config().isAutoRead());
@@ -299,23 +333,18 @@ public class Http2MultiplexCodecTest {
}
});
- codec.onHttp2Frame(
- new DefaultHttp2DataFrame(bb("hello world"), false).stream(inboundStream));
- codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), false).stream(inboundStream));
- codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(inboundStream));
- codec.onChannelReadComplete();
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("hello world"), 0, false);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("foo"), 0, false);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("bar"), 0, false);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("hello world"), 0, false);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("foo"), 0, false);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("bar"), 0, true);
- codec.onHttp2Frame(
- new DefaultHttp2DataFrame(bb("hello world"), false).stream(inboundStream));
- codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), false).stream(inboundStream));
- codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(inboundStream));
- codec.onChannelReadComplete();
-
- verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 6);
+ verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 6);
}
- private Http2StreamChannel newOutboundStream() {
- return new Http2StreamChannelBootstrap(parentChannel).handler(childChannelInitializer)
+ private Http2StreamChannel newOutboundStream(ChannelHandler handler) {
+ return new Http2StreamChannelBootstrap(parentChannel).handler(handler)
.open().syncUninterruptibly().getNow();
}
@@ -323,12 +352,11 @@ public class Http2MultiplexCodecTest {
* A child channel for a HTTP/2 stream in IDLE state (that is no headers sent or received),
* should not emit a RST_STREAM frame on close, as this is a connection error of type protocol error.
*/
-
@Test
public void idleOutboundStreamShouldNotWriteResetFrameOnClose() {
- childChannelInitializer.handler = new LastInboundHandler();
+ LastInboundHandler handler = new LastInboundHandler();
- Channel childChannel = newOutboundStream();
+ Channel childChannel = newOutboundStream(handler);
assertTrue(childChannel.isActive());
childChannel.close();
@@ -341,124 +369,105 @@ public class Http2MultiplexCodecTest {
@Test
public void outboundStreamShouldWriteResetFrameOnClose_headersSent() {
- childChannelInitializer.handler = new ChannelInboundHandlerAdapter() {
+ ChannelHandler handler = new ChannelInboundHandlerAdapter() {
@Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
ctx.fireChannelActive();
}
};
- Channel childChannel = newOutboundStream();
+ Http2StreamChannel childChannel = newOutboundStream(handler);
assertTrue(childChannel.isActive());
- Http2FrameStream stream2 = readOutboundHeadersAndAssignId();
-
childChannel.close();
- parentChannel.runPendingTasks();
-
- Http2ResetFrame reset = parentChannel.readOutbound();
- assertEquals(stream2, reset.stream());
- assertEquals(Http2Error.CANCEL.code(), reset.errorCode());
+ verify(frameWriter).writeRstStream(eqMultiplexCodecCtx(),
+ eqStreamId(childChannel), eq(Http2Error.CANCEL.code()), anyChannelPromise());
}
@Test
public void outboundStreamShouldNotWriteResetFrameOnClose_IfStreamDidntExist() {
- writer = new Writer() {
+ when(frameWriter.writeHeaders(eqMultiplexCodecCtx(), anyInt(),
+ any(Http2Headers.class), anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(),
+ any(ChannelPromise.class))).thenAnswer(new Answer() {
+
private boolean headersWritten;
@Override
- void write(Object msg, ChannelPromise promise) {
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
// We want to fail to write the first headers frame. This is what happens if the connection
// refuses to allocate a new stream due to having received a GOAWAY.
- if (!headersWritten && msg instanceof Http2HeadersFrame) {
+ if (!headersWritten) {
headersWritten = true;
- Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg;
- final TestableHttp2MultiplexCodec.Stream stream =
- (TestableHttp2MultiplexCodec.Stream) headersFrame.stream();
- stream.id = 1;
- promise.setFailure(new Exception("boom"));
- } else {
- super.write(msg, promise);
+ return ((ChannelPromise) invocationOnMock.getArgument(8)).setFailure(new Exception("boom"));
}
+ return ((ChannelPromise) invocationOnMock.getArgument(8)).setSuccess();
}
- };
+ });
- childChannelInitializer.handler = new ChannelInboundHandlerAdapter() {
+ Http2StreamChannel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter() {
@Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
ctx.fireChannelActive();
}
- };
+ });
- Channel childChannel = newOutboundStream();
assertFalse(childChannel.isActive());
childChannel.close();
parentChannel.runPendingTasks();
+ // The channel was never active so we should not generate a RST frame.
+ verify(frameWriter, never()).writeRstStream(eqMultiplexCodecCtx(), eqStreamId(childChannel), anyLong(),
+ anyChannelPromise());
+
assertTrue(parentChannel.outboundMessages().isEmpty());
}
@Test
public void inboundRstStreamFireChannelInactive() {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
+ LastInboundHandler inboundHandler = new LastInboundHandler();
+ Http2StreamChannel channel = newInboundStream(3, false, inboundHandler);
assertTrue(inboundHandler.isChannelActive());
- codec.onHttp2Frame(new DefaultHttp2ResetFrame(Http2Error.INTERNAL_ERROR)
- .stream(inboundStream));
- codec.onChannelReadComplete();
-
- // This will be called by the frame codec.
- inboundStream.state = Http2Stream.State.CLOSED;
- codec.onHttp2StreamStateChanged(inboundStream);
- parentChannel.runPendingTasks();
+ frameInboundWriter.writeInboundRstStream(channel.stream().id(), Http2Error.INTERNAL_ERROR.code());
assertFalse(inboundHandler.isChannelActive());
+
// A RST_STREAM frame should NOT be emitted, as we received a RST_STREAM.
- assertNull(parentChannel.readOutbound());
+ verify(frameWriter, Mockito.never()).writeRstStream(eqMultiplexCodecCtx(), eqStreamId(channel),
+ anyLong(), anyChannelPromise());
}
@Test(expected = StreamException.class)
public void streamExceptionTriggersChildChannelExceptionAndClose() throws Exception {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
+ LastInboundHandler inboundHandler = new LastInboundHandler();
+ Http2StreamChannel channel = newInboundStream(3, false, inboundHandler);
+ assertTrue(channel.isActive());
+ StreamException cause = new StreamException(channel.stream().id(), Http2Error.PROTOCOL_ERROR, "baaam!");
+ parentChannel.pipeline().fireExceptionCaught(cause);
- StreamException cause = new StreamException(inboundStream.id(), Http2Error.PROTOCOL_ERROR, "baaam!");
- Http2FrameStreamException http2Ex = new Http2FrameStreamException(
- inboundStream, Http2Error.PROTOCOL_ERROR, cause);
- codec.onHttp2FrameStreamException(http2Ex);
-
- inboundHandler.checkException();
- }
-
- @Test(expected = StreamException.class)
- public void streamExceptionClosesChildChannel() throws Exception {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
-
- assertTrue(inboundHandler.isChannelActive());
- StreamException cause = new StreamException(inboundStream.id(), Http2Error.PROTOCOL_ERROR, "baaam!");
- Http2FrameStreamException http2Ex = new Http2FrameStreamException(
- inboundStream, Http2Error.PROTOCOL_ERROR, cause);
- codec.onHttp2FrameStreamException(http2Ex);
- parentChannel.runPendingTasks();
-
- assertFalse(inboundHandler.isChannelActive());
+ assertFalse(channel.isActive());
inboundHandler.checkException();
}
@Test(expected = ClosedChannelException.class)
public void streamClosedErrorTranslatedToClosedChannelExceptionOnWrites() throws Exception {
- writer = new Writer() {
- @Override
- void write(Object msg, ChannelPromise promise) {
- promise.tryFailure(new StreamException(inboundStream.id(), Http2Error.STREAM_CLOSED, "Stream Closed"));
- }
- };
LastInboundHandler inboundHandler = new LastInboundHandler();
- childChannelInitializer.handler = inboundHandler;
- Channel childChannel = newOutboundStream();
+ final Http2StreamChannel childChannel = newOutboundStream(inboundHandler);
assertTrue(childChannel.isActive());
+ Http2Headers headers = new DefaultHttp2Headers();
+ when(frameWriter.writeHeaders(eqMultiplexCodecCtx(), anyInt(),
+ eq(headers), anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(),
+ any(ChannelPromise.class))).thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ return ((ChannelPromise) invocationOnMock.getArgument(8)).setFailure(
+ new StreamException(childChannel.stream().id(), Http2Error.STREAM_CLOSED, "Stream Closed"));
+ }
+ });
ChannelFuture future = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
+
parentChannel.flush();
assertFalse(childChannel.isActive());
@@ -472,9 +481,7 @@ public class Http2MultiplexCodecTest {
@Test
public void creatingWritingReadingAndClosingOutboundStreamShouldWork() {
LastInboundHandler inboundHandler = new LastInboundHandler();
- childChannelInitializer.handler = inboundHandler;
-
- Http2StreamChannel childChannel = newOutboundStream();
+ Http2StreamChannel childChannel = newOutboundStream(inboundHandler);
assertTrue(childChannel.isActive());
assertTrue(inboundHandler.isChannelActive());
@@ -482,25 +489,21 @@ public class Http2MultiplexCodecTest {
Http2Headers headers = new DefaultHttp2Headers().scheme("https").method("GET").path("/foo.txt");
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers));
- readOutboundHeadersAndAssignId();
-
// Read from the child channel
- headers = new DefaultHttp2Headers().scheme("https").status("200");
- codec.onHttp2Frame(new DefaultHttp2HeadersFrame(headers).stream(childChannel.stream()));
- codec.onChannelReadComplete();
+ frameInboundWriter.writeInboundHeaders(childChannel.stream().id(), headers, 0, false);
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
assertNotNull(headersFrame);
- assertSame(headers, headersFrame.headers());
+ assertEquals(headers, headersFrame.headers());
// Close the child channel.
childChannel.close();
parentChannel.runPendingTasks();
// An active outbound stream should emit a RST_STREAM frame.
- Http2ResetFrame rstFrame = parentChannel.readOutbound();
- assertNotNull(rstFrame);
- assertEquals(childChannel.stream(), rstFrame.stream());
+ verify(frameWriter).writeRstStream(eqMultiplexCodecCtx(), eqStreamId(childChannel),
+ anyLong(), anyChannelPromise());
+
assertFalse(childChannel.isOpen());
assertFalse(childChannel.isActive());
assertFalse(inboundHandler.isChannelActive());
@@ -511,33 +514,36 @@ public class Http2MultiplexCodecTest {
//
@Test(expected = Http2NoMoreStreamIdsException.class)
public void failedOutboundStreamCreationThrowsAndClosesChannel() throws Exception {
- writer = new Writer() {
- @Override
- void write(Object msg, ChannelPromise promise) {
- promise.tryFailure(new Http2NoMoreStreamIdsException());
- }
- };
- LastInboundHandler inboundHandler = new LastInboundHandler();
- childChannelInitializer.handler = inboundHandler;
-
- Channel childChannel = newOutboundStream();
+ LastInboundHandler handler = new LastInboundHandler();
+ Http2StreamChannel childChannel = newOutboundStream(handler);
assertTrue(childChannel.isActive());
- ChannelFuture future = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
+ Http2Headers headers = new DefaultHttp2Headers();
+ when(frameWriter.writeHeaders(eqMultiplexCodecCtx(), anyInt(),
+ eq(headers), anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(),
+ any(ChannelPromise.class))).thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ return ((ChannelPromise) invocationOnMock.getArgument(8)).setFailure(
+ new Http2NoMoreStreamIdsException());
+ }
+ });
+
+ ChannelFuture future = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers));
parentChannel.flush();
assertFalse(childChannel.isActive());
assertFalse(childChannel.isOpen());
- inboundHandler.checkException();
+ handler.checkException();
future.syncUninterruptibly();
}
@Test
public void channelClosedWhenCloseListenerCompletes() {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
- Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
+ LastInboundHandler inboundHandler = new LastInboundHandler();
+ Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
assertTrue(childChannel.isOpen());
assertTrue(childChannel.isActive());
@@ -564,42 +570,35 @@ public class Http2MultiplexCodecTest {
@Test
public void channelClosedWhenChannelClosePromiseCompletes() {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
- Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
+ LastInboundHandler inboundHandler = new LastInboundHandler();
+ Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
- assertTrue(childChannel.isOpen());
- assertTrue(childChannel.isActive());
+ assertTrue(childChannel.isOpen());
+ assertTrue(childChannel.isActive());
- final AtomicBoolean channelOpen = new AtomicBoolean(true);
- final AtomicBoolean channelActive = new AtomicBoolean(true);
+ final AtomicBoolean channelOpen = new AtomicBoolean(true);
+ final AtomicBoolean channelActive = new AtomicBoolean(true);
- childChannel.closeFuture().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- channelOpen.set(future.channel().isOpen());
- channelActive.set(future.channel().isActive());
- }
- });
- childChannel.close().syncUninterruptibly();
+ childChannel.closeFuture().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ channelOpen.set(future.channel().isOpen());
+ channelActive.set(future.channel().isActive());
+ }
+ });
+ childChannel.close().syncUninterruptibly();
- assertFalse(channelOpen.get());
- assertFalse(channelActive.get());
- assertFalse(childChannel.isActive());
+ assertFalse(channelOpen.get());
+ assertFalse(channelActive.get());
+ assertFalse(childChannel.isActive());
}
@Test
public void channelClosedWhenWriteFutureFails() {
final Queue writePromises = new ArrayDeque();
- writer = new Writer() {
- @Override
- void write(Object msg, ChannelPromise promise) {
- ReferenceCountUtil.release(msg);
- writePromises.offer(promise);
- }
- };
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
- Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
+ LastInboundHandler inboundHandler = new LastInboundHandler();
+ Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
assertTrue(childChannel.isOpen());
assertTrue(childChannel.isActive());
@@ -607,7 +606,19 @@ public class Http2MultiplexCodecTest {
final AtomicBoolean channelOpen = new AtomicBoolean(true);
final AtomicBoolean channelActive = new AtomicBoolean(true);
- ChannelFuture f = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
+ Http2Headers headers = new DefaultHttp2Headers();
+ when(frameWriter.writeHeaders(eqMultiplexCodecCtx(), anyInt(),
+ eq(headers), anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(),
+ any(ChannelPromise.class))).thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ ChannelPromise promise = invocationOnMock.getArgument(8);
+ writePromises.offer(promise);
+ return promise;
+ }
+ });
+
+ ChannelFuture f = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers));
assertFalse(f.isDone());
f.addListener(new ChannelFutureListener() {
@Override
@@ -628,8 +639,8 @@ public class Http2MultiplexCodecTest {
@Test
public void channelClosedTwiceMarksPromiseAsSuccessful() {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
- Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
+ LastInboundHandler inboundHandler = new LastInboundHandler();
+ Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
assertTrue(childChannel.isOpen());
assertTrue(childChannel.isActive());
@@ -644,7 +655,7 @@ public class Http2MultiplexCodecTest {
public void settingChannelOptsAndAttrs() {
AttributeKey key = AttributeKey.newInstance("foo");
- Channel childChannel = newOutboundStream();
+ Channel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter());
childChannel.config().setAutoRead(false).setWriteSpinCount(1000);
childChannel.attr(key).set("bar");
assertFalse(childChannel.config().isAutoRead());
@@ -654,50 +665,52 @@ public class Http2MultiplexCodecTest {
@Test
public void outboundFlowControlWritability() {
- Http2StreamChannel childChannel = newOutboundStream();
+ Http2StreamChannel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter());
assertTrue(childChannel.isActive());
assertTrue(childChannel.isWritable());
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
parentChannel.flush();
- Http2FrameStream stream = readOutboundHeadersAndAssignId();
-
// Test for initial window size
assertEquals(initialRemoteStreamWindow, childChannel.config().getWriteBufferHighWaterMark());
- codec.onHttp2StreamWritabilityChanged(stream, true);
assertTrue(childChannel.isWritable());
- codec.onHttp2StreamWritabilityChanged(stream, false);
+ childChannel.write(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(16 * 1024 * 1024)));
assertFalse(childChannel.isWritable());
}
@Test
public void writabilityAndFlowControl() {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
- Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
+ LastInboundHandler inboundHandler = new LastInboundHandler();
+ Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
assertEquals("", inboundHandler.writabilityStates());
+ assertTrue(childChannel.isWritable());
// HEADERS frames are not flow controlled, so they should not affect the flow control window.
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
- codec.onHttp2StreamWritabilityChanged(childChannel.stream(), true);
+ codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), true);
- assertEquals("true", inboundHandler.writabilityStates());
+ assertTrue(childChannel.isWritable());
+ assertEquals("", inboundHandler.writabilityStates());
- codec.onHttp2StreamWritabilityChanged(childChannel.stream(), true);
- assertEquals("true", inboundHandler.writabilityStates());
+ codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), true);
+ assertTrue(childChannel.isWritable());
+ assertEquals("", inboundHandler.writabilityStates());
- codec.onHttp2StreamWritabilityChanged(childChannel.stream(), false);
- assertEquals("true,false", inboundHandler.writabilityStates());
+ codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), false);
+ assertFalse(childChannel.isWritable());
+ assertEquals("false", inboundHandler.writabilityStates());
- codec.onHttp2StreamWritabilityChanged(childChannel.stream(), false);
- assertEquals("true,false", inboundHandler.writabilityStates());
+ codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), false);
+ assertFalse(childChannel.isWritable());
+ assertEquals("false", inboundHandler.writabilityStates());
}
@Test
public void channelClosedWhenInactiveFired() {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
- Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
+ LastInboundHandler inboundHandler = new LastInboundHandler();
+ Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
final AtomicBoolean channelOpen = new AtomicBoolean(false);
final AtomicBoolean channelActive = new AtomicBoolean(false);
@@ -725,9 +738,7 @@ public class Http2MultiplexCodecTest {
final AtomicInteger exceptionCaught = new AtomicInteger(-1);
final AtomicInteger channelInactive = new AtomicInteger(-1);
final AtomicInteger channelUnregistered = new AtomicInteger(-1);
- Http2StreamChannel childChannel = newOutboundStream();
-
- childChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
+ Http2StreamChannel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
@@ -767,28 +778,10 @@ public class Http2MultiplexCodecTest {
assertEquals(2, channelUnregistered.get());
}
- @Ignore("not supported anymore atm")
- @Test
- public void cancellingWritesBeforeFlush() {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
- Channel childChannel = inboundHandler.channel();
-
- Http2HeadersFrame headers1 = new DefaultHttp2HeadersFrame(new DefaultHttp2Headers());
- Http2HeadersFrame headers2 = new DefaultHttp2HeadersFrame(new DefaultHttp2Headers());
- ChannelPromise writePromise = childChannel.newPromise();
- childChannel.write(headers1, writePromise);
- childChannel.write(headers2);
- assertTrue(writePromise.cancel(false));
- childChannel.flush();
-
- Http2HeadersFrame headers = parentChannel.readOutbound();
- assertSame(headers, headers2);
- }
-
@Test
public void callUnsafeCloseMultipleTimes() {
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
- Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
+ LastInboundHandler inboundHandler = new LastInboundHandler();
+ Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
childChannel.unsafe().close(childChannel.voidPromise());
ChannelPromise promise = childChannel.newPromise();
@@ -809,49 +802,56 @@ public class Http2MultiplexCodecTest {
}
}
};
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer);
- Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
+ LastInboundHandler inboundHandler = new LastInboundHandler(ctxConsumer);
+ Http2StreamChannel childChannel = newInboundStream(3, false, numReads, inboundHandler);
childChannel.config().setAutoRead(false);
- Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream);
- Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream);
- Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream);
- Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream);
+ Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(childChannel.stream());
+ Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(childChannel.stream());
+ Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(childChannel.stream());
+ Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(childChannel.stream());
- assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound());
+ assertEquals(new DefaultHttp2HeadersFrame(request).stream(childChannel.stream()), inboundHandler.readInbound());
- // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete.
- parentChannel.writeOneInbound(new Object());
- codec.onHttp2Frame(dataFrame1);
- assertEquals(dataFrame1, inboundHandler.readInbound());
+ ChannelHandler readCompleteSupressHandler = new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) {
+ // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete.
+ }
+ };
+
+ parentChannel.pipeline().addFirst(readCompleteSupressHandler);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("1"), 0, false);
+
+ assertEqualsAndRelease(dataFrame1, inboundHandler.readInbound());
// Deliver frames, and then a stream closed while read is inactive.
- codec.onHttp2Frame(dataFrame2);
- codec.onHttp2Frame(dataFrame3);
- codec.onHttp2Frame(dataFrame4);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("2"), 0, false);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("3"), 0, false);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("4"), 0, false);
shouldDisableAutoRead.set(true);
childChannel.config().setAutoRead(true);
numReads.set(1);
- inboundStream.state = Http2Stream.State.CLOSED;
- codec.onHttp2StreamStateChanged(inboundStream);
+ frameInboundWriter.writeInboundRstStream(childChannel.stream().id(), Http2Error.NO_ERROR.code());
// Detecting EOS should flush all pending data regardless of read calls.
- assertEquals(dataFrame2, inboundHandler.readInbound());
- assertEquals(dataFrame3, inboundHandler.readInbound());
- assertEquals(dataFrame4, inboundHandler.readInbound());
+ assertEqualsAndRelease(dataFrame2, inboundHandler.readInbound());
+ assertEqualsAndRelease(dataFrame3, inboundHandler.readInbound());
+ assertEqualsAndRelease(dataFrame4, inboundHandler.readInbound());
+
+ Http2ResetFrame resetFrame = inboundHandler.readInbound();
+ assertEquals(childChannel.stream(), resetFrame.stream());
+ assertEquals(Http2Error.NO_ERROR.code(), resetFrame.errorCode());
+
assertNull(inboundHandler.readInbound());
// Now we want to call channelReadComplete and simulate the end of the read loop.
+ parentChannel.pipeline().remove(readCompleteSupressHandler);
parentChannel.flushInbound();
childChannel.closeFuture().syncUninterruptibly();
-
- dataFrame1.release();
- dataFrame2.release();
- dataFrame3.release();
- dataFrame4.release();
}
@Test
@@ -868,54 +868,58 @@ public class Http2MultiplexCodecTest {
}
}
};
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer);
- Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
+ LastInboundHandler inboundHandler = new LastInboundHandler(ctxConsumer);
+ Http2StreamChannel childChannel = newInboundStream(3, false, numReads, inboundHandler);
childChannel.config().setAutoRead(false);
- Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream);
- Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream);
- Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream);
- Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream);
+ Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(childChannel.stream());
+ Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(childChannel.stream());
+ Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(childChannel.stream());
+ Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(childChannel.stream());
- assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound());
+ assertEquals(new DefaultHttp2HeadersFrame(request).stream(childChannel.stream()), inboundHandler.readInbound());
- // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete.
- parentChannel.writeOneInbound(new Object());
- codec.onHttp2Frame(dataFrame1);
- assertEquals(dataFrame1, inboundHandler.readInbound());
+ ChannelHandler readCompleteSupressHandler = new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) {
+ // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete.
+ }
+ };
+ parentChannel.pipeline().addFirst(readCompleteSupressHandler);
+
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("1"), 0, false);
+
+ assertEqualsAndRelease(dataFrame1, inboundHandler.readInbound());
// We want one item to be in the queue, and allow the numReads to be larger than 1. This will ensure that
// when beginRead() is called the child channel is added to the readPending queue of the parent channel.
- codec.onHttp2Frame(dataFrame2);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("2"), 0, false);
numReads.set(10);
shouldDisableAutoRead.set(true);
childChannel.config().setAutoRead(true);
- codec.onHttp2Frame(dataFrame3);
- codec.onHttp2Frame(dataFrame4);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("3"), 0, false);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("4"), 0, false);
// Detecting EOS should flush all pending data regardless of read calls.
- assertEquals(dataFrame2, inboundHandler.readInbound());
- assertEquals(dataFrame3, inboundHandler.readInbound());
- assertEquals(dataFrame4, inboundHandler.readInbound());
+ assertEqualsAndRelease(dataFrame2, inboundHandler.readInbound());
+ assertEqualsAndRelease(dataFrame3, inboundHandler.readInbound());
+ assertEqualsAndRelease(dataFrame4, inboundHandler.readInbound());
+
assertNull(inboundHandler.readInbound());
// Now we want to call channelReadComplete and simulate the end of the read loop.
+ parentChannel.pipeline().remove(readCompleteSupressHandler);
parentChannel.flushInbound();
// 3 = 1 for initialization + 1 for read when auto read was off + 1 for when auto read was back on
assertEquals(3, channelReadCompleteCount.get());
-
- dataFrame1.release();
- dataFrame2.release();
- dataFrame3.release();
- dataFrame4.release();
}
@Test
public void childQueueIsDrainedAndNewDataIsDispatchedInParentReadLoopNoAutoRead() {
- AtomicInteger numReads = new AtomicInteger(1);
+ final AtomicInteger numReads = new AtomicInteger(1);
final AtomicInteger channelReadCompleteCount = new AtomicInteger(0);
final AtomicBoolean shouldDisableAutoRead = new AtomicBoolean();
Consumer ctxConsumer = new Consumer() {
@@ -927,214 +931,77 @@ public class Http2MultiplexCodecTest {
}
}
};
- LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer);
- Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
+ final LastInboundHandler inboundHandler = new LastInboundHandler(ctxConsumer);
+ Http2StreamChannel childChannel = newInboundStream(3, false, numReads, inboundHandler);
childChannel.config().setAutoRead(false);
- Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream);
- Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream);
- Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream);
- Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream);
+ Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(childChannel.stream());
+ Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(childChannel.stream());
+ Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(childChannel.stream());
+ Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(childChannel.stream());
- assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound());
+ assertEquals(new DefaultHttp2HeadersFrame(request).stream(childChannel.stream()), inboundHandler.readInbound());
- // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete.
- parentChannel.writeOneInbound(new Object());
- codec.onHttp2Frame(dataFrame1);
- assertEquals(dataFrame1, inboundHandler.readInbound());
+ ChannelHandler readCompleteSupressHandler = new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete.
+ }
+ };
+ parentChannel.pipeline().addFirst(readCompleteSupressHandler);
+
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("1"), 0, false);
+
+ assertEqualsAndRelease(dataFrame1, inboundHandler.readInbound());
// We want one item to be in the queue, and allow the numReads to be larger than 1. This will ensure that
// when beginRead() is called the child channel is added to the readPending queue of the parent channel.
- codec.onHttp2Frame(dataFrame2);
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("2"), 0, false);
numReads.set(2);
childChannel.read();
- assertEquals(dataFrame2, inboundHandler.readInbound());
+
+ assertEqualsAndRelease(dataFrame2, inboundHandler.readInbound());
+
assertNull(inboundHandler.readInbound());
// This is the second item that was read, this should be the last until we call read() again. This should also
// notify of readComplete().
- codec.onHttp2Frame(dataFrame3);
- assertEquals(dataFrame3, inboundHandler.readInbound());
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("3"), 0, false);
- codec.onHttp2Frame(dataFrame4);
+ assertEqualsAndRelease(dataFrame3, inboundHandler.readInbound());
+
+ frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("4"), 0, false);
assertNull(inboundHandler.readInbound());
childChannel.read();
- assertEquals(dataFrame4, inboundHandler.readInbound());
+
+ assertEqualsAndRelease(dataFrame4, inboundHandler.readInbound());
+
assertNull(inboundHandler.readInbound());
// Now we want to call channelReadComplete and simulate the end of the read loop.
+ parentChannel.pipeline().remove(readCompleteSupressHandler);
parentChannel.flushInbound();
// 3 = 1 for initialization + 1 for first read of 2 items + 1 for second read of 2 items +
// 1 for parent channel readComplete
assertEquals(4, channelReadCompleteCount.get());
-
- dataFrame1.release();
- dataFrame2.release();
- dataFrame3.release();
- dataFrame4.release();
}
- private LastInboundHandler streamActiveAndWriteHeaders(Http2FrameStream stream) {
- return streamActiveAndWriteHeaders(stream, null, LastInboundHandler.noopConsumer());
- }
-
- private LastInboundHandler streamActiveAndWriteHeaders(Http2FrameStream stream,
- AtomicInteger maxReads,
- Consumer contextConsumer) {
-
- LastInboundHandler inboundHandler = new LastInboundHandler(contextConsumer);
- childChannelInitializer.handler = inboundHandler;
- childChannelInitializer.maxReads = maxReads;
- assertFalse(inboundHandler.isChannelActive());
- ((TestableHttp2MultiplexCodec.Stream) stream).state = Http2Stream.State.OPEN;
- codec.onHttp2StreamStateChanged(stream);
- codec.onHttp2Frame(new DefaultHttp2HeadersFrame(request).stream(stream));
- codec.onChannelReadComplete();
- assertTrue(inboundHandler.isChannelActive());
-
- return inboundHandler;
- }
-
- private static void verifyFramesMultiplexedToCorrectChannel(Http2FrameStream stream,
+ private static void verifyFramesMultiplexedToCorrectChannel(Http2StreamChannel streamChannel,
LastInboundHandler inboundHandler,
int numFrames) {
for (int i = 0; i < numFrames; i++) {
Http2StreamFrame frame = inboundHandler.readInbound();
assertNotNull(frame);
- assertEquals(stream, frame.stream());
+ assertEquals(streamChannel.stream(), frame.stream());
release(frame);
}
assertNull(inboundHandler.readInbound());
}
- private static ByteBuf bb(String s) {
- return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s);
- }
-
- /**
- * Simulates the frame codec, in first assigning an identifier and the completing the write promise.
- */
- private Http2FrameStream readOutboundHeadersAndAssignId() {
- // Only peek at the frame, so to not complete the promise of the write. We need to first
- // assign a stream identifier, as the frame codec would do.
- Http2HeadersFrame headersFrame = (Http2HeadersFrame) parentChannel.outboundMessages().peek();
- assertNotNull(headersFrame);
- assertNotNull(headersFrame.stream());
- assertFalse(Http2CodecUtil.isStreamIdValid(headersFrame.stream().id()));
- TestableHttp2MultiplexCodec.Stream frameStream = (TestableHttp2MultiplexCodec.Stream) headersFrame.stream();
- frameStream.id = outboundStream.id();
- // Create the stream in the Http2Connection.
- try {
- Http2Stream stream = codec.connection().local().createStream(
- headersFrame.stream().id(), headersFrame.isEndStream());
- frameStream.stream = stream;
- } catch (Exception ex) {
- throw new IllegalStateException("Failed to create a stream", ex);
- }
-
- // Now read it and complete the write promise.
- assertSame(headersFrame, parentChannel.readOutbound());
-
- return headersFrame.stream();
- }
-
- /**
- * This class removes the bits that would transform the frames to bytes and so make it easier to test the actual
- * special handling of the codec.
- */
- private final class TestableHttp2MultiplexCodec extends Http2MultiplexCodec {
-
- public TestableHttp2MultiplexCodec(Http2ConnectionEncoder encoder,
- Http2ConnectionDecoder decoder,
- Http2Settings initialSettings,
- ChannelHandler inboundStreamHandler) {
- super(encoder, decoder, initialSettings, inboundStreamHandler, null);
- }
-
- void onHttp2Frame(Http2Frame frame) {
- onHttp2Frame(ctx, frame);
- }
-
- void onChannelReadComplete() {
- onChannelReadComplete(ctx);
- }
-
- void onHttp2StreamStateChanged(Http2FrameStream stream) {
- onHttp2StreamStateChanged(ctx, stream);
- }
-
- void onHttp2FrameStreamException(Http2FrameStreamException cause) {
- onHttp2FrameStreamException(ctx, cause);
- }
-
- void onHttp2StreamWritabilityChanged(Http2FrameStream stream, boolean writable) {
- onHttp2StreamWritabilityChanged(ctx, stream, writable);
- }
-
- @Override
- boolean onBytesConsumed(ChannelHandlerContext ctx, Http2FrameStream stream, int bytes) {
- writer.write(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream), ctx.newPromise());
- return true;
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
- writer.write(msg, promise);
- }
-
- @Override
- void flush0(ChannelHandlerContext ctx) {
- // Do nothing
- }
-
- @Override
- Stream newStream() {
- return new Stream();
- }
-
- final class Stream extends Http2MultiplexCodecStream {
- Http2Stream.State state = Http2Stream.State.IDLE;
- int id = -1;
-
- @Override
- public int id() {
- return id;
- }
-
- @Override
- public Http2Stream.State state() {
- return state;
- }
- }
- }
-
- private final class TestableHttp2MultiplexCodecBuilder extends Http2MultiplexCodecBuilder {
-
- TestableHttp2MultiplexCodecBuilder(boolean server, ChannelHandler childHandler) {
- super(server, childHandler);
- }
-
- @Override
- public TestableHttp2MultiplexCodec build() {
- return (TestableHttp2MultiplexCodec) super.build();
- }
-
- @Override
- protected Http2MultiplexCodec build(
- Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) {
- return new TestableHttp2MultiplexCodec(
- encoder, decoder, initialSettings, childHandler);
- }
- }
-
- class Writer {
-
- void write(Object msg, ChannelPromise promise) {
- parentChannel.outboundMessages().add(msg);
- promise.setSuccess();
- }
+ private static int eqStreamId(Http2StreamChannel channel) {
+ return eq(channel.stream().id());
}
}
diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java
index e7130ee5d9..f660381237 100644
--- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java
+++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java
@@ -15,7 +15,9 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@@ -24,18 +26,34 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.AsciiString;
+import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import junit.framework.AssertionFailedError;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_HEADER_LIST_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_HEADER_TABLE_SIZE;
+import static io.netty.util.ReferenceCountUtil.release;
import static java.lang.Math.min;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyByte;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyShort;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
/**
* Utilities for the integration tests.
@@ -506,4 +524,179 @@ public final class Http2TestUtil {
return isWriteAllowed ? (int) min(pendingBytes, Integer.MAX_VALUE) : -1;
}
}
+
+ static Http2FrameWriter mockedFrameWriter() {
+ Http2FrameWriter.Configuration configuration = new Http2FrameWriter.Configuration() {
+ private final Http2HeadersEncoder.Configuration headerConfiguration =
+ new Http2HeadersEncoder.Configuration() {
+ @Override
+ public void maxHeaderTableSize(long max) {
+ // NOOP
+ }
+
+ @Override
+ public long maxHeaderTableSize() {
+ return 0;
+ }
+
+ @Override
+ public void maxHeaderListSize(long max) {
+ // NOOP
+ }
+
+ @Override
+ public long maxHeaderListSize() {
+ return 0;
+ }
+ };
+
+ private final Http2FrameSizePolicy policy = new Http2FrameSizePolicy() {
+ @Override
+ public void maxFrameSize(int max) {
+ // NOOP
+ }
+
+ @Override
+ public int maxFrameSize() {
+ return 0;
+ }
+ };
+ @Override
+ public Http2HeadersEncoder.Configuration headersConfiguration() {
+ return headerConfiguration;
+ }
+
+ @Override
+ public Http2FrameSizePolicy frameSizePolicy() {
+ return policy;
+ }
+ };
+
+ final ConcurrentLinkedQueue buffers = new ConcurrentLinkedQueue();
+
+ Http2FrameWriter frameWriter = Mockito.mock(Http2FrameWriter.class);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) {
+ for (;;) {
+ ByteBuf buf = buffers.poll();
+ if (buf == null) {
+ break;
+ }
+ buf.release();
+ }
+ return null;
+ }
+ }).when(frameWriter).close();
+
+ when(frameWriter.configuration()).thenReturn(configuration);
+ when(frameWriter.writeSettings(any(ChannelHandlerContext.class), any(Http2Settings.class),
+ any(ChannelPromise.class))).thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ return ((ChannelPromise) invocationOnMock.getArgument(2)).setSuccess();
+ }
+ });
+
+ when(frameWriter.writeSettingsAck(any(ChannelHandlerContext.class), any(ChannelPromise.class)))
+ .thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ return ((ChannelPromise) invocationOnMock.getArgument(1)).setSuccess();
+ }
+ });
+
+ when(frameWriter.writeGoAway(any(ChannelHandlerContext.class), anyInt(),
+ anyLong(), any(ByteBuf.class), any(ChannelPromise.class))).thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ buffers.offer((ByteBuf) invocationOnMock.getArgument(3));
+ return ((ChannelPromise) invocationOnMock.getArgument(4)).setSuccess();
+ }
+ });
+ when(frameWriter.writeHeaders(any(ChannelHandlerContext.class), anyInt(), any(Http2Headers.class), anyInt(),
+ anyBoolean(), any(ChannelPromise.class))).thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ return ((ChannelPromise) invocationOnMock.getArgument(5)).setSuccess();
+ }
+ });
+
+ when(frameWriter.writeHeaders(any(ChannelHandlerContext.class), anyInt(),
+ any(Http2Headers.class), anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(),
+ any(ChannelPromise.class))).thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ return ((ChannelPromise) invocationOnMock.getArgument(8)).setSuccess();
+ }
+ });
+
+ when(frameWriter.writeData(any(ChannelHandlerContext.class), anyInt(), any(ByteBuf.class), anyInt(),
+ anyBoolean(), any(ChannelPromise.class))).thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ buffers.offer((ByteBuf) invocationOnMock.getArgument(2));
+ return ((ChannelPromise) invocationOnMock.getArgument(5)).setSuccess();
+ }
+ });
+
+ when(frameWriter.writeRstStream(any(ChannelHandlerContext.class), anyInt(),
+ anyLong(), any(ChannelPromise.class))).thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ return ((ChannelPromise) invocationOnMock.getArgument(3)).setSuccess();
+ }
+ });
+
+ when(frameWriter.writeWindowUpdate(any(ChannelHandlerContext.class), anyInt(), anyInt(),
+ any(ChannelPromise.class))).then(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ return ((ChannelPromise) invocationOnMock.getArgument(3)).setSuccess();
+ }
+ });
+
+ when(frameWriter.writePushPromise(any(ChannelHandlerContext.class), anyInt(), anyInt(), any(Http2Headers.class),
+ anyInt(), anyChannelPromise())).thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ return ((ChannelPromise) invocationOnMock.getArgument(5)).setSuccess();
+ }
+ });
+
+ when(frameWriter.writeFrame(any(ChannelHandlerContext.class), anyByte(), anyInt(), any(Http2Flags.class),
+ any(ByteBuf.class), anyChannelPromise())).thenAnswer(new Answer() {
+ @Override
+ public ChannelFuture answer(InvocationOnMock invocationOnMock) {
+ buffers.offer((ByteBuf) invocationOnMock.getArgument(4));
+ return ((ChannelPromise) invocationOnMock.getArgument(5)).setSuccess();
+ }
+ });
+ return frameWriter;
+ }
+
+ static ChannelPromise anyChannelPromise() {
+ return any(ChannelPromise.class);
+ }
+
+ static Http2Settings anyHttp2Settings() {
+ return any(Http2Settings.class);
+ }
+
+ static ByteBuf bb(String s) {
+ return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s);
+ }
+
+ static void assertEqualsAndRelease(Http2Frame expected, Http2Frame actual) {
+ try {
+ assertEquals(expected, actual);
+ } finally {
+ release(expected);
+ release(actual);
+ // Will return -1 when not implements ReferenceCounted.
+ assertTrue(ReferenceCountUtil.refCnt(expected) <= 0);
+ assertTrue(ReferenceCountUtil.refCnt(actual) <= 0);
+ }
+ }
+
}
diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java
index 015550f078..2cc79d14e5 100644
--- a/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java
+++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java
@@ -50,9 +50,9 @@ public class TestChannelInitializer extends ChannelInitializer {
/**
* Designed to read a single byte at a time to control the number of reads done at a fine granularity.
*/
- private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
+ static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
private final AtomicInteger numReads;
- TestNumReadsRecvByteBufAllocator(AtomicInteger numReads) {
+ private TestNumReadsRecvByteBufAllocator(AtomicInteger numReads) {
this.numReads = numReads;
}