From fa84e2b3af45ec7fd47909eff0aa7d2be5a54972 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 14 Dec 2018 10:10:04 +0000 Subject: [PATCH] Cleanup HTTP/2 tests for Http2FrameCodec and Http2MultiplexCodec (#8646) Motiviation: Http2FrameCodecTest and Http2MultiplexCodecTest were quite fragile and often not went through the whole pipeline which made testing sometimes hard and error-prone. Modification: - Refactor tests to have data flow through the whole pipeline and so made the test more robust (by testing the while implementation). Result: Easier to write tests for the codecs in the future and more robust testing in general. Beside this it also fixes https://github.com/netty/netty/issues/6036. --- .../handler/codec/http2/Http2FrameCodec.java | 2 +- .../codec/http2/Http2MultiplexCodec.java | 23 +- .../http2/Http2MultiplexCodecBuilder.java | 29 + .../codec/http2/Http2FrameCodecTest.java | 203 ++-- .../codec/http2/Http2FrameInboundWriter.java | 340 +++++++ .../codec/http2/Http2MultiplexCodecTest.java | 875 ++++++++---------- .../handler/codec/http2/Http2TestUtil.java | 193 ++++ .../codec/http2/TestChannelInitializer.java | 4 +- 8 files changed, 1028 insertions(+), 641 deletions(-) create mode 100644 codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameInboundWriter.java diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java index 47f2533f3f..b9075144f3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java @@ -148,7 +148,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler { private final Integer initialFlowControlWindowSize; - private ChannelHandlerContext ctx; + ChannelHandlerContext ctx; /** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/ private int numBufferedStreams; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java index 46aa97a536..d19ce2b8f8 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java @@ -414,28 +414,11 @@ public class Http2MultiplexCodec extends Http2FrameCodec { } } - // Allow to override for testing - void flush0(ChannelHandlerContext ctx) { + final void flush0(ChannelHandlerContext ctx) { flush(ctx); } - /** - * Return bytes to flow control. - *

- * Package private to allow to override for testing - * @param ctx The {@link ChannelHandlerContext} associated with the parent channel. - * @param stream The object representing the HTTP/2 stream. - * @param bytes The number of bytes to return to flow control. - * @return {@code true} if a frame has been written as a result of this method call. - * @throws Http2Exception If this operation violates the flow control limits. - */ - boolean onBytesConsumed(@SuppressWarnings("unused") ChannelHandlerContext ctx, - Http2FrameStream stream, int bytes) throws Http2Exception { - return consumeBytes(stream.id(), bytes); - } - - // Allow to extend for testing - static class Http2MultiplexCodecStream extends DefaultHttp2FrameStream { + static final class Http2MultiplexCodecStream extends DefaultHttp2FrameStream { DefaultHttp2StreamChannel channel; } @@ -1084,7 +1067,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { allocHandle.lastBytesRead(numBytesToBeConsumed); if (numBytesToBeConsumed != 0) { try { - writeDoneAndNoFlush |= onBytesConsumed(ctx, stream, numBytesToBeConsumed); + writeDoneAndNoFlush |= consumeBytes(stream.id(), numBytesToBeConsumed); } catch (Http2Exception e) { pipeline().fireExceptionCaught(e); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilder.java index 94f0d49720..c5732ec687 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilder.java @@ -27,6 +27,7 @@ import static io.netty.util.internal.ObjectUtil.checkNotNull; @UnstableApi public class Http2MultiplexCodecBuilder extends AbstractHttp2ConnectionHandlerBuilder { + private Http2FrameWriter frameWriter; final ChannelHandler childHandler; private ChannelHandler upgradeStreamHandler; @@ -44,6 +45,12 @@ public class Http2MultiplexCodecBuilder return handler; } + // For testing only. + Http2MultiplexCodecBuilder frameWriter(Http2FrameWriter frameWriter) { + this.frameWriter = checkNotNull(frameWriter, "frameWriter"); + return this; + } + /** * Creates a builder for a HTTP/2 client. * @@ -160,6 +167,28 @@ public class Http2MultiplexCodecBuilder @Override public Http2MultiplexCodec build() { + Http2FrameWriter frameWriter = this.frameWriter; + if (frameWriter != null) { + // This is to support our tests and will never be executed by the user as frameWriter(...) + // is package-private. + DefaultHttp2Connection connection = new DefaultHttp2Connection(isServer(), maxReservedStreams()); + Long maxHeaderListSize = initialSettings().maxHeaderListSize(); + Http2FrameReader frameReader = new DefaultHttp2FrameReader(maxHeaderListSize == null ? + new DefaultHttp2HeadersDecoder(true) : + new DefaultHttp2HeadersDecoder(true, maxHeaderListSize)); + + if (frameLogger() != null) { + frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger()); + frameReader = new Http2InboundFrameLogger(frameReader, frameLogger()); + } + Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter); + if (encoderEnforceMaxConcurrentStreams()) { + encoder = new StreamBufferingEncoder(encoder); + } + Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader); + + return build(decoder, encoder, initialSettings()); + } return super.build(); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java index 2123986047..a3ccf038d0 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java @@ -15,9 +15,7 @@ package io.netty.handler.codec.http2; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; -import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -57,6 +55,11 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid; +import static io.netty.handler.codec.http2.Http2TestUtil.anyChannelPromise; +import static io.netty.handler.codec.http2.Http2TestUtil.anyHttp2Settings; +import static io.netty.handler.codec.http2.Http2TestUtil.assertEqualsAndRelease; +import static io.netty.handler.codec.http2.Http2TestUtil.bb; + import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -73,7 +76,6 @@ import static org.mockito.Mockito.anyShort; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.same; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -86,9 +88,10 @@ public class Http2FrameCodecTest { private Http2FrameWriter frameWriter; private Http2FrameCodec frameCodec; private EmbeddedChannel channel; + // For injecting inbound frames - private Http2FrameListener frameListener; - private ChannelHandlerContext http2HandlerCtx; + private Http2FrameInboundWriter frameInboundWriter; + private LastInboundHandler inboundHandler; private final Http2Headers request = new DefaultHttp2Headers() @@ -123,29 +126,29 @@ public class Http2FrameCodecTest { */ tearDown(); - frameWriter = spy(new VerifiableHttp2FrameWriter()); + frameWriter = Http2TestUtil.mockedFrameWriter(); + frameCodec = frameCodecBuilder.frameWriter(frameWriter).frameLogger(new Http2FrameLogger(LogLevel.TRACE)) .initialSettings(initialRemoteSettings).build(); - frameListener = ((DefaultHttp2ConnectionDecoder) frameCodec.decoder()) - .internalFrameListener(); inboundHandler = new LastInboundHandler(); channel = new EmbeddedChannel(); + frameInboundWriter = new Http2FrameInboundWriter(channel); channel.connect(new InetSocketAddress(0)); channel.pipeline().addLast(frameCodec); channel.pipeline().addLast(inboundHandler); channel.pipeline().fireChannelActive(); - http2HandlerCtx = channel.pipeline().context(frameCodec); - // Handshake - verify(frameWriter).writeSettings(eq(http2HandlerCtx), - anyHttp2Settings(), anyChannelPromise()); + verify(frameWriter).writeSettings(eqFrameCodecCtx(), anyHttp2Settings(), anyChannelPromise()); verifyNoMoreInteractions(frameWriter); channel.writeInbound(Http2CodecUtil.connectionPrefaceBuf()); - frameListener.onSettingsRead(http2HandlerCtx, initialRemoteSettings); - verify(frameWriter).writeSettingsAck(eq(http2HandlerCtx), anyChannelPromise()); - frameListener.onSettingsAckRead(http2HandlerCtx); + + frameInboundWriter.writeInboundSettings(initialRemoteSettings); + + verify(frameWriter).writeSettingsAck(eqFrameCodecCtx(), anyChannelPromise()); + + frameInboundWriter.writeInboundSettingsAck(); Http2SettingsFrame settingsFrame = inboundHandler.readInbound(); assertNotNull(settingsFrame); @@ -153,7 +156,7 @@ public class Http2FrameCodecTest { @Test public void stateChanges() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 1, request, 31, true); + frameInboundWriter.writeInboundHeaders(1, request, 31, true); Http2Stream stream = frameCodec.connection().stream(1); assertNotNull(stream); @@ -169,12 +172,12 @@ public class Http2FrameCodecTest { assertEquals(inboundFrame, new DefaultHttp2HeadersFrame(request, true, 31).stream(stream2)); assertNull(inboundHandler.readInbound()); - inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).stream(stream2)); + channel.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).stream(stream2)); verify(frameWriter).writeHeaders( - eq(http2HandlerCtx), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(), + eqFrameCodecCtx(), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(), eq(27), eq(true), anyChannelPromise()); verify(frameWriter, never()).writeRstStream( - any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise()); + eqFrameCodecCtx(), anyInt(), anyLong(), anyChannelPromise()); assertEquals(State.CLOSED, stream.state()); event = inboundHandler.readInboundMessageOrUserEvent(); @@ -185,7 +188,7 @@ public class Http2FrameCodecTest { @Test public void headerRequestHeaderResponse() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 1, request, 31, true); + frameInboundWriter.writeInboundHeaders(1, request, 31, true); Http2Stream stream = frameCodec.connection().stream(1); assertNotNull(stream); @@ -198,34 +201,34 @@ public class Http2FrameCodecTest { assertEquals(inboundFrame, new DefaultHttp2HeadersFrame(request, true, 31).stream(stream2)); assertNull(inboundHandler.readInbound()); - inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).stream(stream2)); + channel.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).stream(stream2)); verify(frameWriter).writeHeaders( - eq(http2HandlerCtx), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(), + eqFrameCodecCtx(), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(), eq(27), eq(true), anyChannelPromise()); verify(frameWriter, never()).writeRstStream( - any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise()); + eqFrameCodecCtx(), anyInt(), anyLong(), anyChannelPromise()); assertEquals(State.CLOSED, stream.state()); assertTrue(channel.isActive()); } - @Test - public void flowControlShouldBeResilientToMissingStreams() throws Http2Exception { - Http2Connection conn = new DefaultHttp2Connection(true); - Http2ConnectionEncoder enc = new DefaultHttp2ConnectionEncoder(conn, new DefaultHttp2FrameWriter()); - Http2ConnectionDecoder dec = new DefaultHttp2ConnectionDecoder(conn, enc, new DefaultHttp2FrameReader()); - Http2FrameCodec codec = new Http2FrameCodec(enc, dec, new Http2Settings()); - EmbeddedChannel em = new EmbeddedChannel(codec); + @Test + public void flowControlShouldBeResilientToMissingStreams() throws Http2Exception { + Http2Connection conn = new DefaultHttp2Connection(true); + Http2ConnectionEncoder enc = new DefaultHttp2ConnectionEncoder(conn, new DefaultHttp2FrameWriter()); + Http2ConnectionDecoder dec = new DefaultHttp2ConnectionDecoder(conn, enc, new DefaultHttp2FrameReader()); + Http2FrameCodec codec = new Http2FrameCodec(enc, dec, new Http2Settings()); + EmbeddedChannel em = new EmbeddedChannel(codec); - // We call #consumeBytes on a stream id which has not been seen yet to emulate the case - // where a stream is deregistered which in reality can happen in response to a RST. - assertFalse(codec.consumeBytes(1, 1)); - assertTrue(em.finishAndReleaseAll()); - } + // We call #consumeBytes on a stream id which has not been seen yet to emulate the case + // where a stream is deregistered which in reality can happen in response to a RST. + assertFalse(codec.consumeBytes(1, 1)); + assertTrue(em.finishAndReleaseAll()); + } @Test public void entityRequestEntityResponse() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 1, request, 0, false); + frameInboundWriter.writeInboundHeaders(1, request, 0, false); Http2Stream stream = frameCodec.connection().stream(1); assertNotNull(stream); @@ -239,39 +242,35 @@ public class Http2FrameCodecTest { assertNull(inboundHandler.readInbound()); ByteBuf hello = bb("hello"); - frameListener.onDataRead(http2HandlerCtx, 1, hello, 31, true); - // Release hello to emulate ByteToMessageDecoder - hello.release(); + frameInboundWriter.writeInboundData(1, hello, 31, true); Http2DataFrame inboundData = inboundHandler.readInbound(); Http2DataFrame expected = new DefaultHttp2DataFrame(bb("hello"), true, 31).stream(stream2); - assertEquals(expected, inboundData); + assertEqualsAndRelease(expected, inboundData); - assertEquals(1, inboundData.refCnt()); - expected.release(); - inboundData.release(); assertNull(inboundHandler.readInbound()); - inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, false).stream(stream2)); - verify(frameWriter).writeHeaders(eq(http2HandlerCtx), eq(1), eq(response), anyInt(), + channel.writeOutbound(new DefaultHttp2HeadersFrame(response, false).stream(stream2)); + verify(frameWriter).writeHeaders(eqFrameCodecCtx(), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(), eq(0), eq(false), anyChannelPromise()); - inboundHandler.writeOutbound(new DefaultHttp2DataFrame(bb("world"), true, 27).stream(stream2)); + channel.writeOutbound(new DefaultHttp2DataFrame(bb("world"), true, 27).stream(stream2)); ArgumentCaptor outboundData = ArgumentCaptor.forClass(ByteBuf.class); - verify(frameWriter).writeData(eq(http2HandlerCtx), eq(1), outboundData.capture(), eq(27), + verify(frameWriter).writeData(eqFrameCodecCtx(), eq(1), outboundData.capture(), eq(27), eq(true), anyChannelPromise()); ByteBuf bb = bb("world"); assertEquals(bb, outboundData.getValue()); assertEquals(1, outboundData.getValue().refCnt()); bb.release(); - verify(frameWriter, never()).writeRstStream( - any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise()); + outboundData.getValue().release(); + + verify(frameWriter, never()).writeRstStream(eqFrameCodecCtx(), anyInt(), anyLong(), anyChannelPromise()); assertTrue(channel.isActive()); } @Test public void sendRstStream() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, true); + frameInboundWriter.writeInboundHeaders(3, request, 31, true); Http2Stream stream = frameCodec.connection().stream(3); assertNotNull(stream); @@ -285,16 +284,15 @@ public class Http2FrameCodecTest { assertNotNull(stream2); assertEquals(3, stream2.id()); - inboundHandler.writeOutbound(new DefaultHttp2ResetFrame(314 /* non-standard error */).stream(stream2)); - verify(frameWriter).writeRstStream( - eq(http2HandlerCtx), eq(3), eq(314L), anyChannelPromise()); + channel.writeOutbound(new DefaultHttp2ResetFrame(314 /* non-standard error */).stream(stream2)); + verify(frameWriter).writeRstStream(eqFrameCodecCtx(), eq(3), eq(314L), anyChannelPromise()); assertEquals(State.CLOSED, stream.state()); assertTrue(channel.isActive()); } @Test public void receiveRstStream() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false); + frameInboundWriter.writeInboundHeaders(3, request, 31, false); Http2Stream stream = frameCodec.connection().stream(3); assertNotNull(stream); @@ -304,7 +302,7 @@ public class Http2FrameCodecTest { Http2HeadersFrame actualHeaders = inboundHandler.readInbound(); assertEquals(expectedHeaders.stream(actualHeaders.stream()), actualHeaders); - frameListener.onRstStreamRead(http2HandlerCtx, 3, Http2Error.NO_ERROR.code()); + frameInboundWriter.writeInboundRstStream(3, Http2Error.NO_ERROR.code()); Http2ResetFrame expectedRst = new DefaultHttp2ResetFrame(Http2Error.NO_ERROR).stream(actualHeaders.stream()); Http2ResetFrame actualRst = inboundHandler.readInbound(); @@ -315,8 +313,7 @@ public class Http2FrameCodecTest { @Test public void sendGoAway() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false); - + frameInboundWriter.writeInboundHeaders(3, request, 31, false); Http2Stream stream = frameCodec.connection().stream(3); assertNotNull(stream); assertEquals(State.OPEN, stream.state()); @@ -324,32 +321,29 @@ public class Http2FrameCodecTest { ByteBuf debugData = bb("debug"); ByteBuf expected = debugData.copy(); - Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR.code(), debugData.slice()); + Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR.code(), debugData); goAwayFrame.setExtraStreamIds(2); - inboundHandler.writeOutbound(goAwayFrame); - verify(frameWriter).writeGoAway( - eq(http2HandlerCtx), eq(7), eq(Http2Error.NO_ERROR.code()), eq(expected), anyChannelPromise()); + channel.writeOutbound(goAwayFrame); + verify(frameWriter).writeGoAway(eqFrameCodecCtx(), eq(7), + eq(Http2Error.NO_ERROR.code()), eq(expected), anyChannelPromise()); assertEquals(1, debugData.refCnt()); assertEquals(State.OPEN, stream.state()); assertTrue(channel.isActive()); expected.release(); + debugData.release(); } @Test public void receiveGoaway() throws Exception { ByteBuf debugData = bb("foo"); - frameListener.onGoAwayRead(http2HandlerCtx, 2, Http2Error.NO_ERROR.code(), debugData); - // Release debugData to emulate ByteToMessageDecoder - debugData.release(); + frameInboundWriter.writeInboundGoAway(2, Http2Error.NO_ERROR.code(), debugData); Http2GoAwayFrame expectedFrame = new DefaultHttp2GoAwayFrame(2, Http2Error.NO_ERROR.code(), bb("foo")); Http2GoAwayFrame actualFrame = inboundHandler.readInbound(); - assertEquals(expectedFrame, actualFrame); - assertNull(inboundHandler.readInbound()); + assertEqualsAndRelease(expectedFrame, actualFrame); - expectedFrame.release(); - actualFrame.release(); + assertNull(inboundHandler.readInbound()); } @Test @@ -383,7 +377,7 @@ public class Http2FrameCodecTest { @Test public void goAwayLastStreamIdOverflowed() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 5, request, 31, false); + frameInboundWriter.writeInboundHeaders(5, request, 31, false); Http2Stream stream = frameCodec.connection().stream(5); assertNotNull(stream); @@ -393,10 +387,10 @@ public class Http2FrameCodecTest { Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR.code(), debugData.slice()); goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE); - inboundHandler.writeOutbound(goAwayFrame); + channel.writeOutbound(goAwayFrame); // When the last stream id computation overflows, the last stream id should just be set to 2^31 - 1. - verify(frameWriter).writeGoAway(eq(http2HandlerCtx), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), - eq(debugData), anyChannelPromise()); + verify(frameWriter).writeGoAway(eqFrameCodecCtx(), eq(Integer.MAX_VALUE), + eq(Http2Error.NO_ERROR.code()), eq(debugData), anyChannelPromise()); assertEquals(1, debugData.refCnt()); assertEquals(State.OPEN, stream.state()); assertTrue(channel.isActive()); @@ -404,13 +398,13 @@ public class Http2FrameCodecTest { @Test public void streamErrorShouldFireExceptionForInbound() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false); + frameInboundWriter.writeInboundHeaders(3, request, 31, false); Http2Stream stream = frameCodec.connection().stream(3); assertNotNull(stream); StreamException streamEx = new StreamException(3, Http2Error.INTERNAL_ERROR, "foo"); - frameCodec.onError(http2HandlerCtx, false, streamEx); + channel.pipeline().fireExceptionCaught(streamEx); Http2FrameStreamEvent event = inboundHandler.readInboundMessageOrUserEvent(); assertEquals(Http2FrameStreamEvent.Type.State, event.type()); @@ -430,13 +424,13 @@ public class Http2FrameCodecTest { @Test public void streamErrorShouldNotFireExceptionForOutbound() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false); + frameInboundWriter.writeInboundHeaders(3, request, 31, false); Http2Stream stream = frameCodec.connection().stream(3); assertNotNull(stream); StreamException streamEx = new StreamException(3, Http2Error.INTERNAL_ERROR, "foo"); - frameCodec.onError(http2HandlerCtx, true, streamEx); + frameCodec.onError(frameCodec.ctx, true, streamEx); Http2FrameStreamEvent event = inboundHandler.readInboundMessageOrUserEvent(); assertEquals(Http2FrameStreamEvent.Type.State, event.type()); @@ -452,14 +446,14 @@ public class Http2FrameCodecTest { @Test public void windowUpdateFrameDecrementsConsumedBytes() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false); + frameInboundWriter.writeInboundHeaders(3, request, 31, false); Http2Connection connection = frameCodec.connection(); Http2Stream stream = connection.stream(3); assertNotNull(stream); ByteBuf data = Unpooled.buffer(100).writeZero(100); - frameListener.onDataRead(http2HandlerCtx, 3, data, 0, true); + frameInboundWriter.writeInboundData(3, data, 0, false); Http2HeadersFrame inboundHeaders = inboundHandler.readInbound(); assertNotNull(inboundHeaders); @@ -472,12 +466,11 @@ public class Http2FrameCodecTest { int after = connection.local().flowController().unconsumedBytes(stream); assertEquals(100, before - after); assertTrue(f.isSuccess()); - data.release(); } @Test public void windowUpdateMayFail() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false); + frameInboundWriter.writeInboundHeaders(3, request, 31, false); Http2Connection connection = frameCodec.connection(); Http2Stream stream = connection.stream(3); assertNotNull(stream); @@ -496,10 +489,10 @@ public class Http2FrameCodecTest { @Test public void inboundWindowUpdateShouldBeForwarded() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false); - frameListener.onWindowUpdateRead(http2HandlerCtx, 3, 100); + frameInboundWriter.writeInboundHeaders(3, request, 31, false); + frameInboundWriter.writeInboundWindowUpdate(3, 100); // Connection-level window update - frameListener.onWindowUpdateRead(http2HandlerCtx, 0, 100); + frameInboundWriter.writeInboundWindowUpdate(0, 100); Http2HeadersFrame headersFrame = inboundHandler.readInbound(); assertNotNull(headersFrame); @@ -558,7 +551,7 @@ public class Http2FrameCodecTest { unknownFrame.stream(stream); channel.write(unknownFrame); - verify(frameWriter).writeFrame(eq(http2HandlerCtx), eq(unknownFrame.frameType()), + verify(frameWriter).writeFrame(eqFrameCodecCtx(), eq(unknownFrame.frameType()), eq(unknownFrame.stream().id()), eq(unknownFrame.flags()), eq(buffer), any(ChannelPromise.class)); } @@ -567,7 +560,7 @@ public class Http2FrameCodecTest { Http2Settings settings = new Http2Settings(); channel.write(new DefaultHttp2SettingsFrame(settings)); - verify(frameWriter).writeSettings(eq(http2HandlerCtx), same(settings), any(ChannelPromise.class)); + verify(frameWriter).writeSettings(eqFrameCodecCtx(), same(settings), any(ChannelPromise.class)); } @Test(timeout = 5000) @@ -619,7 +612,7 @@ public class Http2FrameCodecTest { assertFalse(promise2.isDone()); // Increase concurrent streams limit to 2 - frameListener.onSettingsRead(http2HandlerCtx, new Http2Settings().maxConcurrentStreams(2)); + frameInboundWriter.writeInboundSettings(new Http2Settings().maxConcurrentStreams(2)); channel.flush(); @@ -643,7 +636,7 @@ public class Http2FrameCodecTest { @Test public void receivePing() throws Http2Exception { - frameListener.onPingRead(http2HandlerCtx, 12345L); + frameInboundWriter.writeInboundPing(false, 12345L); Http2PingFrame pingFrame = inboundHandler.readInbound(); assertNotNull(pingFrame); @@ -656,13 +649,13 @@ public class Http2FrameCodecTest { public void sendPing() { channel.writeAndFlush(new DefaultHttp2PingFrame(12345)); - verify(frameWriter).writePing(eq(http2HandlerCtx), eq(false), eq(12345L), anyChannelPromise()); + verify(frameWriter).writePing(eqFrameCodecCtx(), eq(false), eq(12345L), anyChannelPromise()); } @Test public void receiveSettings() throws Http2Exception { Http2Settings settings = new Http2Settings().maxConcurrentStreams(1); - frameListener.onSettingsRead(http2HandlerCtx, settings); + frameInboundWriter.writeInboundSettings(settings); Http2SettingsFrame settingsFrame = inboundHandler.readInbound(); assertNotNull(settingsFrame); @@ -674,7 +667,7 @@ public class Http2FrameCodecTest { Http2Settings settings = new Http2Settings().maxConcurrentStreams(1); channel.writeAndFlush(new DefaultHttp2SettingsFrame(settings)); - verify(frameWriter).writeSettings(eq(http2HandlerCtx), eq(settings), anyChannelPromise()); + verify(frameWriter).writeSettings(eqFrameCodecCtx(), eq(settings), anyChannelPromise()); } @Test @@ -682,7 +675,7 @@ public class Http2FrameCodecTest { setUp(Http2FrameCodecBuilder.forServer().encoderEnforceMaxConcurrentStreams(true), new Http2Settings().maxConcurrentStreams(1)); - frameListener.onHeadersRead(http2HandlerCtx, 3, request, 0, false); + frameInboundWriter.writeInboundHeaders(3, request, 0, false); Http2HeadersFrame headersFrame = inboundHandler.readInbound(); assertNotNull(headersFrame); @@ -736,8 +729,7 @@ public class Http2FrameCodecTest { @Test public void upgradeEventNoRefCntError() throws Exception { - frameListener.onHeadersRead(http2HandlerCtx, Http2CodecUtil.HTTP_UPGRADE_STREAM_ID, request, 31, false); - + frameInboundWriter.writeInboundHeaders(Http2CodecUtil.HTTP_UPGRADE_STREAM_ID, request, 31, false); // Using reflect as the constructor is package-private and the class is final. Constructor constructor = UpgradeEvent.class.getDeclaredConstructor(CharSequence.class, FullHttpRequest.class); @@ -753,7 +745,7 @@ public class Http2FrameCodecTest { @Test public void upgradeWithoutFlowControlling() throws Exception { - channel.pipeline().addAfter(http2HandlerCtx.name(), null, new ChannelInboundHandlerAdapter() { + channel.pipeline().addAfter(frameCodec.ctx.name(), null, new ChannelInboundHandlerAdapter() { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof Http2DataFrame) { @@ -774,7 +766,7 @@ public class Http2FrameCodecTest { } }); - frameListener.onHeadersRead(http2HandlerCtx, Http2CodecUtil.HTTP_UPGRADE_STREAM_ID, request, 31, false); + frameInboundWriter.writeInboundHeaders(Http2CodecUtil.HTTP_UPGRADE_STREAM_ID, request, 31, false); // Using reflect as the constructor is package-private and the class is final. Constructor constructor = @@ -792,24 +784,7 @@ public class Http2FrameCodecTest { channel.pipeline().fireUserEventTriggered(upgradeEvent); } - private static ChannelPromise anyChannelPromise() { - return any(ChannelPromise.class); - } - - private static Http2Settings anyHttp2Settings() { - return any(Http2Settings.class); - } - - private static ByteBuf bb(String s) { - return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s); - } - - private static class VerifiableHttp2FrameWriter extends DefaultHttp2FrameWriter { - @Override - public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, - int padding, boolean endStream, ChannelPromise promise) { - // duplicate 'data' to prevent readerIndex from being changed, to ease verification - return super.writeData(ctx, streamId, data.duplicate(), padding, endStream, promise); - } + private ChannelHandlerContext eqFrameCodecCtx() { + return eq(frameCodec.ctx); } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameInboundWriter.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameInboundWriter.java new file mode 100644 index 0000000000..ace50aadbe --- /dev/null +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameInboundWriter.java @@ -0,0 +1,340 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelProgressivePromise; +import io.netty.channel.ChannelPromise; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import io.netty.util.concurrent.EventExecutor; + +import java.net.SocketAddress; + +/** + * Utility class which allows easy writing of HTTP2 frames via {@link EmbeddedChannel#writeInbound(Object...)}. + */ +final class Http2FrameInboundWriter { + + private final ChannelHandlerContext ctx; + private final Http2FrameWriter writer; + + Http2FrameInboundWriter(EmbeddedChannel channel) { + this(channel, new DefaultHttp2FrameWriter()); + } + + Http2FrameInboundWriter(EmbeddedChannel channel, Http2FrameWriter writer) { + this.ctx = new WriteInboundChannelHandlerContext(channel); + this.writer = writer; + } + + void writeInboundData(int streamId, ByteBuf data, int padding, boolean endStream) { + writer.writeData(ctx, streamId, data, padding, endStream, ctx.newPromise()).syncUninterruptibly(); + } + + void writeInboundHeaders(int streamId, Http2Headers headers, + int padding, boolean endStream) { + writer.writeHeaders(ctx, streamId, headers, padding, endStream, ctx.newPromise()).syncUninterruptibly(); + } + + void writeInboundHeaders(int streamId, Http2Headers headers, + int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) { + writer.writeHeaders(ctx, streamId, headers, streamDependency, + weight, exclusive, padding, endStream, ctx.newPromise()).syncUninterruptibly(); + } + + void writeInboundPriority(int streamId, int streamDependency, + short weight, boolean exclusive) { + writer.writePriority(ctx, streamId, streamDependency, weight, + exclusive, ctx.newPromise()).syncUninterruptibly(); + } + + void writeInboundRstStream(int streamId, long errorCode) { + writer.writeRstStream(ctx, streamId, errorCode, ctx.newPromise()).syncUninterruptibly(); + } + + void writeInboundSettings(Http2Settings settings) { + writer.writeSettings(ctx, settings, ctx.newPromise()).syncUninterruptibly(); + } + + void writeInboundSettingsAck() { + writer.writeSettingsAck(ctx, ctx.newPromise()).syncUninterruptibly(); + } + + void writeInboundPing(boolean ack, long data) { + writer.writePing(ctx, ack, data, ctx.newPromise()).syncUninterruptibly(); + } + + void writePushPromise(int streamId, int promisedStreamId, + Http2Headers headers, int padding) { + writer.writePushPromise(ctx, streamId, promisedStreamId, + headers, padding, ctx.newPromise()).syncUninterruptibly(); + } + + void writeInboundGoAway(int lastStreamId, long errorCode, ByteBuf debugData) { + writer.writeGoAway(ctx, lastStreamId, errorCode, debugData, ctx.newPromise()).syncUninterruptibly(); + } + + void writeInboundWindowUpdate(int streamId, int windowSizeIncrement) { + writer.writeWindowUpdate(ctx, streamId, windowSizeIncrement, ctx.newPromise()).syncUninterruptibly(); + } + + void writeInboundFrame(byte frameType, int streamId, + Http2Flags flags, ByteBuf payload) { + writer.writeFrame(ctx, frameType, streamId, flags, payload, ctx.newPromise()).syncUninterruptibly(); + } + + private static final class WriteInboundChannelHandlerContext extends ChannelOutboundHandlerAdapter + implements ChannelHandlerContext { + private final EmbeddedChannel channel; + + WriteInboundChannelHandlerContext(EmbeddedChannel channel) { + this.channel = channel; + } + + @Override + public Channel channel() { + return channel; + } + + @Override + public EventExecutor executor() { + return channel.eventLoop(); + } + + @Override + public String name() { + return "WriteInbound"; + } + + @Override + public ChannelHandler handler() { + return this; + } + + @Override + public boolean isRemoved() { + return false; + } + + @Override + public ChannelHandlerContext fireChannelRegistered() { + channel.pipeline().fireChannelRegistered(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelUnregistered() { + channel.pipeline().fireChannelUnregistered(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelActive() { + channel.pipeline().fireChannelActive(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelInactive() { + channel.pipeline().fireChannelInactive(); + return this; + } + + @Override + public ChannelHandlerContext fireExceptionCaught(Throwable cause) { + channel.pipeline().fireExceptionCaught(cause); + return this; + } + + @Override + public ChannelHandlerContext fireUserEventTriggered(Object evt) { + channel.pipeline().fireUserEventTriggered(evt); + return this; + } + + @Override + public ChannelHandlerContext fireChannelRead(Object msg) { + channel.pipeline().fireChannelRead(msg); + return this; + } + + @Override + public ChannelHandlerContext fireChannelReadComplete() { + channel.pipeline().fireChannelReadComplete(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelWritabilityChanged() { + channel.pipeline().fireChannelWritabilityChanged(); + return this; + } + + @Override + public ChannelHandlerContext read() { + channel.read(); + return this; + } + + @Override + public ChannelHandlerContext flush() { + channel.pipeline().fireChannelReadComplete(); + return this; + } + + @Override + public ChannelPipeline pipeline() { + return channel.pipeline(); + } + + @Override + public ByteBufAllocator alloc() { + return channel.alloc(); + } + + @Override + public Attribute attr(AttributeKey key) { + return channel.attr(key); + } + + @Override + public boolean hasAttr(AttributeKey key) { + return channel.hasAttr(key); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress) { + return channel.bind(localAddress); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress) { + return channel.connect(remoteAddress); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + return channel.connect(remoteAddress, localAddress); + } + + @Override + public ChannelFuture disconnect() { + return channel.disconnect(); + } + + @Override + public ChannelFuture close() { + return channel.close(); + } + + @Override + public ChannelFuture deregister() { + return channel.deregister(); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { + return channel.bind(localAddress, promise); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { + return channel.connect(remoteAddress, promise); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + return channel.connect(remoteAddress, localAddress, promise); + } + + @Override + public ChannelFuture disconnect(ChannelPromise promise) { + return channel.disconnect(promise); + } + + @Override + public ChannelFuture close(ChannelPromise promise) { + return channel.close(promise); + } + + @Override + public ChannelFuture deregister(ChannelPromise promise) { + return channel.deregister(promise); + } + + @Override + public ChannelFuture write(Object msg) { + return write(msg, newPromise()); + } + + @Override + public ChannelFuture write(Object msg, ChannelPromise promise) { + return writeAndFlush(msg, promise); + } + + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + try { + channel.writeInbound(msg); + channel.runPendingTasks(); + promise.setSuccess(); + } catch (Throwable cause) { + promise.setFailure(cause); + } + return promise; + } + + @Override + public ChannelFuture writeAndFlush(Object msg) { + return writeAndFlush(msg, newPromise()); + } + + @Override + public ChannelPromise newPromise() { + return channel.newPromise(); + } + + @Override + public ChannelProgressivePromise newProgressivePromise() { + return channel.newProgressivePromise(); + } + + @Override + public ChannelFuture newSucceededFuture() { + return channel.newSucceededFuture(); + } + + @Override + public ChannelFuture newFailedFuture(Throwable cause) { + return channel.newFailedFuture(cause); + } + + @Override + public ChannelPromise voidPromise() { + return channel.voidPromise(); + } + } +} diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java index d857cf8d1e..7788e6dd64 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java @@ -15,8 +15,7 @@ package io.netty.handler.codec.http2; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -31,11 +30,13 @@ import io.netty.handler.codec.http2.Http2Exception.StreamException; import io.netty.handler.codec.http2.LastInboundHandler.Consumer; import io.netty.util.AsciiString; import io.netty.util.AttributeKey; -import io.netty.util.ReferenceCountUtil; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; @@ -43,34 +44,43 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static io.netty.util.ReferenceCountUtil.release; -import static org.hamcrest.Matchers.instanceOf; +import static io.netty.handler.codec.http2.Http2TestUtil.anyChannelPromise; +import static io.netty.handler.codec.http2.Http2TestUtil.anyHttp2Settings; +import static io.netty.handler.codec.http2.Http2TestUtil.assertEqualsAndRelease; +import static io.netty.handler.codec.http2.Http2TestUtil.bb; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyShort; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Unit tests for {@link Http2MultiplexCodec}. */ public class Http2MultiplexCodecTest { - - private EmbeddedChannel parentChannel; - private Writer writer; - - private TestChannelInitializer childChannelInitializer; - - private static final Http2Headers request = new DefaultHttp2Headers() + private final Http2Headers request = new DefaultHttp2Headers() .method(HttpMethod.GET.asciiName()).scheme(HttpScheme.HTTPS.name()) .authority(new AsciiString("example.org")).path(new AsciiString("/foo")); - private TestableHttp2MultiplexCodec codec; - private TestableHttp2MultiplexCodec.Stream inboundStream; - private TestableHttp2MultiplexCodec.Stream outboundStream; + private EmbeddedChannel parentChannel; + private Http2FrameWriter frameWriter; + private Http2FrameInboundWriter frameInboundWriter; + private TestChannelInitializer childChannelInitializer; + private Http2MultiplexCodec codec; private static final int initialRemoteStreamWindow = 1024; @@ -78,25 +88,38 @@ public class Http2MultiplexCodecTest { public void setUp() { childChannelInitializer = new TestChannelInitializer(); parentChannel = new EmbeddedChannel(); - writer = new Writer(); - + frameInboundWriter = new Http2FrameInboundWriter(parentChannel); parentChannel.connect(new InetSocketAddress(0)); - codec = new TestableHttp2MultiplexCodecBuilder(true, childChannelInitializer).build(); + frameWriter = Http2TestUtil.mockedFrameWriter(); + codec = new Http2MultiplexCodecBuilder(true, childChannelInitializer).frameWriter(frameWriter).build(); parentChannel.pipeline().addLast(codec); parentChannel.runPendingTasks(); + parentChannel.pipeline().fireChannelActive(); + + parentChannel.writeInbound(Http2CodecUtil.connectionPrefaceBuf()); Http2Settings settings = new Http2Settings().initialWindowSize(initialRemoteStreamWindow); - codec.onHttp2Frame(new DefaultHttp2SettingsFrame(settings)); + frameInboundWriter.writeInboundSettings(settings); - inboundStream = codec.newStream(); - inboundStream.id = 3; - outboundStream = codec.newStream(); - outboundStream.id = 2; + verify(frameWriter).writeSettingsAck(eqMultiplexCodecCtx(), anyChannelPromise()); + + frameInboundWriter.writeInboundSettingsAck(); + + Http2SettingsFrame settingsFrame = parentChannel.readInbound(); + assertNotNull(settingsFrame); + + // Handshake + verify(frameWriter).writeSettings(eqMultiplexCodecCtx(), + anyHttp2Settings(), anyChannelPromise()); + } + + private ChannelHandlerContext eqMultiplexCodecCtx() { + return eq(codec.ctx); } @After public void tearDown() throws Exception { - if (childChannelInitializer.handler != null) { + if (childChannelInitializer.handler instanceof LastInboundHandler) { ((LastInboundHandler) childChannelInitializer.handler).finishAndReleaseAll(); } parentChannel.finishAndReleaseAll(); @@ -110,154 +133,165 @@ public class Http2MultiplexCodecTest { @Test public void writeUnknownFrame() { - childChannelInitializer.handler = new ChannelInboundHandlerAdapter() { + Http2StreamChannel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers())); ctx.writeAndFlush(new DefaultHttp2UnknownFrame((byte) 99, new Http2Flags())); ctx.fireChannelActive(); } - }; - - Channel childChannel = newOutboundStream(); + }); assertTrue(childChannel.isActive()); - Http2FrameStream stream = readOutboundHeadersAndAssignId(); parentChannel.runPendingTasks(); - Http2UnknownFrame frame = parentChannel.readOutbound(); - assertEquals(stream, frame.stream()); - assertEquals(99, frame.frameType()); - assertEquals(new Http2Flags(), frame.flags()); - frame.release(); + verify(frameWriter).writeFrame(eq(codec.ctx), eq((byte) 99), eqStreamId(childChannel), any(Http2Flags.class), + any(ByteBuf.class), any(ChannelPromise.class)); + } + + private Http2StreamChannel newInboundStream(int streamId, boolean endStream, final ChannelHandler childHandler) { + return newInboundStream(streamId, endStream, null, childHandler); + } + + private Http2StreamChannel newInboundStream(int streamId, boolean endStream, + AtomicInteger maxReads, final ChannelHandler childHandler) { + final AtomicReference streamChannelRef = new AtomicReference(); + childChannelInitializer.maxReads = maxReads; + childChannelInitializer.handler = new ChannelInboundHandlerAdapter() { + @Override + public void channelRegistered(ChannelHandlerContext ctx) { + assertNull(streamChannelRef.get()); + streamChannelRef.set((Http2StreamChannel) ctx.channel()); + ctx.pipeline().addLast(childHandler); + ctx.fireChannelRegistered(); + } + }; + + frameInboundWriter.writeInboundHeaders(streamId, request, 0, endStream); + parentChannel.runPendingTasks(); + Http2StreamChannel channel = streamChannelRef.get(); + assertEquals(streamId, channel.stream().id()); + return channel; } @Test public void readUnkownFrame() { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - codec.onHttp2Frame(new DefaultHttp2UnknownFrame((byte) 99, new Http2Flags()).stream(inboundStream)); - codec.onChannelReadComplete(); + LastInboundHandler handler = new LastInboundHandler(); - // headers and unknown frame - verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 2); + Http2StreamChannel channel = newInboundStream(3, true, handler); + frameInboundWriter.writeInboundFrame((byte) 99, channel.stream().id(), new Http2Flags(), Unpooled.EMPTY_BUFFER); - Channel childChannel = newOutboundStream(); + // header frame and unknown frame + verifyFramesMultiplexedToCorrectChannel(channel, handler, 2); + + Channel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter()); assertTrue(childChannel.isActive()); } @Test public void headerAndDataFramesShouldBeDelivered() { LastInboundHandler inboundHandler = new LastInboundHandler(); - childChannelInitializer.handler = inboundHandler; - Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(request).stream(inboundStream); - Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("hello")).stream(inboundStream); - Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("world")).stream(inboundStream); + Http2StreamChannel channel = newInboundStream(3, false, inboundHandler); + Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(request).stream(channel.stream()); + Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("hello")).stream(channel.stream()); + Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("world")).stream(channel.stream()); - assertFalse(inboundHandler.isChannelActive()); - inboundStream.state = Http2Stream.State.OPEN; - codec.onHttp2StreamStateChanged(inboundStream); - codec.onHttp2Frame(headersFrame); assertTrue(inboundHandler.isChannelActive()); - codec.onHttp2Frame(dataFrame1); - codec.onHttp2Frame(dataFrame2); + frameInboundWriter.writeInboundData(channel.stream().id(), bb("hello"), 0, false); + frameInboundWriter.writeInboundData(channel.stream().id(), bb("world"), 0, false); assertEquals(headersFrame, inboundHandler.readInbound()); - assertEquals(dataFrame1, inboundHandler.readInbound()); - assertEquals(dataFrame2, inboundHandler.readInbound()); - assertNull(inboundHandler.readInbound()); - dataFrame1.release(); - dataFrame2.release(); + assertEqualsAndRelease(dataFrame1, inboundHandler.readInbound()); + assertEqualsAndRelease(dataFrame2, inboundHandler.readInbound()); + + assertNull(inboundHandler.readInbound()); } @Test public void framesShouldBeMultiplexed() { + LastInboundHandler handler1 = new LastInboundHandler(); + Http2StreamChannel channel1 = newInboundStream(3, false, handler1); + LastInboundHandler handler2 = new LastInboundHandler(); + Http2StreamChannel channel2 = newInboundStream(5, false, handler2); + LastInboundHandler handler3 = new LastInboundHandler(); + Http2StreamChannel channel3 = newInboundStream(11, false, handler3); - TestableHttp2MultiplexCodec.Stream stream3 = codec.newStream(); - stream3.id = 3; - TestableHttp2MultiplexCodec.Stream stream5 = codec.newStream(); - stream5.id = 5; + verifyFramesMultiplexedToCorrectChannel(channel1, handler1, 1); + verifyFramesMultiplexedToCorrectChannel(channel2, handler2, 1); + verifyFramesMultiplexedToCorrectChannel(channel3, handler3, 1); - TestableHttp2MultiplexCodec.Stream stream11 = codec.newStream(); - stream11.id = 11; + frameInboundWriter.writeInboundData(channel2.stream().id(), bb("hello"), 0, false); + frameInboundWriter.writeInboundData(channel1.stream().id(), bb("foo"), 0, true); + frameInboundWriter.writeInboundData(channel2.stream().id(), bb("world"), 0, true); + frameInboundWriter.writeInboundData(channel3.stream().id(), bb("bar"), 0, true); - LastInboundHandler inboundHandler3 = streamActiveAndWriteHeaders(stream3); - LastInboundHandler inboundHandler5 = streamActiveAndWriteHeaders(stream5); - LastInboundHandler inboundHandler11 = streamActiveAndWriteHeaders(stream11); - - verifyFramesMultiplexedToCorrectChannel(stream3, inboundHandler3, 1); - verifyFramesMultiplexedToCorrectChannel(stream5, inboundHandler5, 1); - verifyFramesMultiplexedToCorrectChannel(stream11, inboundHandler11, 1); - - codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("hello"), false).stream(stream5)); - codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), true).stream(stream3)); - codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("world"), true).stream(stream5)); - codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(stream11)); - verifyFramesMultiplexedToCorrectChannel(stream5, inboundHandler5, 2); - verifyFramesMultiplexedToCorrectChannel(stream3, inboundHandler3, 1); - verifyFramesMultiplexedToCorrectChannel(stream11, inboundHandler11, 1); + verifyFramesMultiplexedToCorrectChannel(channel1, handler1, 1); + verifyFramesMultiplexedToCorrectChannel(channel2, handler2, 2); + verifyFramesMultiplexedToCorrectChannel(channel3, handler3, 1); } @Test - public void inboundDataFrameShouldEmitWindowUpdateFrame() { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); + public void inboundDataFrameShouldUpdateLocalFlowController() throws Http2Exception { + Http2LocalFlowController flowController = Mockito.mock(Http2LocalFlowController.class); + codec.connection().local().flowController(flowController); + + LastInboundHandler handler = new LastInboundHandler(); + final Http2StreamChannel channel = newInboundStream(3, false, handler); + ByteBuf tenBytes = bb("0123456789"); - codec.onHttp2Frame(new DefaultHttp2DataFrame(tenBytes, true).stream(inboundStream)); - codec.onChannelReadComplete(); - Http2WindowUpdateFrame windowUpdate = parentChannel.readOutbound(); - assertNotNull(windowUpdate); + frameInboundWriter.writeInboundData(channel.stream().id(), tenBytes, 0, true); - assertEquals(inboundStream, windowUpdate.stream()); - assertEquals(10, windowUpdate.windowSizeIncrement()); + // Verify we marked the bytes as consumed + verify(flowController).consumeBytes(argThat(new ArgumentMatcher() { + @Override + public boolean matches(Http2Stream http2Stream) { + return http2Stream.id() == channel.stream().id(); + } + }), eq(10)); // headers and data frame - verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 2); + verifyFramesMultiplexedToCorrectChannel(channel, handler, 2); } @Test public void unhandledHttp2FramesShouldBePropagated() { - assertThat(parentChannel.readInbound(), instanceOf(Http2SettingsFrame.class)); - Http2PingFrame pingFrame = new DefaultHttp2PingFrame(0); - codec.onHttp2Frame(pingFrame); - assertSame(parentChannel.readInbound(), pingFrame); + frameInboundWriter.writeInboundPing(false, 0); + assertEquals(parentChannel.readInbound(), pingFrame); - DefaultHttp2GoAwayFrame goAwayFrame = - new DefaultHttp2GoAwayFrame(1, parentChannel.alloc().buffer().writeLong(8)); - codec.onHttp2Frame(goAwayFrame); + DefaultHttp2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(1, + parentChannel.alloc().buffer().writeLong(8)); + frameInboundWriter.writeInboundGoAway(0, goAwayFrame.errorCode(), goAwayFrame.content().retainedDuplicate()); Http2GoAwayFrame frame = parentChannel.readInbound(); - assertSame(frame, goAwayFrame); - assertTrue(frame.release()); + assertEqualsAndRelease(frame, goAwayFrame); } @Test public void channelReadShouldRespectAutoRead() { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - Channel childChannel = inboundHandler.channel(); + LastInboundHandler inboundHandler = new LastInboundHandler(); + Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); assertTrue(childChannel.config().isAutoRead()); Http2HeadersFrame headersFrame = inboundHandler.readInbound(); assertNotNull(headersFrame); childChannel.config().setAutoRead(false); - codec.onHttp2Frame( - new DefaultHttp2DataFrame(bb("hello world"), false).stream(inboundStream)); - codec.onChannelReadComplete(); + + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("hello world"), 0, false); Http2DataFrame dataFrame0 = inboundHandler.readInbound(); assertNotNull(dataFrame0); release(dataFrame0); - codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), false).stream(inboundStream)); - codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(inboundStream)); - codec.onChannelReadComplete(); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("foo"), 0, false); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("bar"), 0, false); - dataFrame0 = inboundHandler.readInbound(); - assertNull(dataFrame0); + assertNull(inboundHandler.readInbound()); childChannel.config().setAutoRead(true); - verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 2); + verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 2); } @Test @@ -271,8 +305,8 @@ public class Http2MultiplexCodecTest { } private void useReadWithoutAutoRead(final boolean readComplete) { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - Channel childChannel = inboundHandler.channel(); + LastInboundHandler inboundHandler = new LastInboundHandler(); + Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); assertTrue(childChannel.config().isAutoRead()); childChannel.config().setAutoRead(false); assertFalse(childChannel.config().isAutoRead()); @@ -299,23 +333,18 @@ public class Http2MultiplexCodecTest { } }); - codec.onHttp2Frame( - new DefaultHttp2DataFrame(bb("hello world"), false).stream(inboundStream)); - codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), false).stream(inboundStream)); - codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(inboundStream)); - codec.onChannelReadComplete(); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("hello world"), 0, false); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("foo"), 0, false); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("bar"), 0, false); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("hello world"), 0, false); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("foo"), 0, false); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("bar"), 0, true); - codec.onHttp2Frame( - new DefaultHttp2DataFrame(bb("hello world"), false).stream(inboundStream)); - codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), false).stream(inboundStream)); - codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(inboundStream)); - codec.onChannelReadComplete(); - - verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 6); + verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 6); } - private Http2StreamChannel newOutboundStream() { - return new Http2StreamChannelBootstrap(parentChannel).handler(childChannelInitializer) + private Http2StreamChannel newOutboundStream(ChannelHandler handler) { + return new Http2StreamChannelBootstrap(parentChannel).handler(handler) .open().syncUninterruptibly().getNow(); } @@ -323,12 +352,11 @@ public class Http2MultiplexCodecTest { * A child channel for a HTTP/2 stream in IDLE state (that is no headers sent or received), * should not emit a RST_STREAM frame on close, as this is a connection error of type protocol error. */ - @Test public void idleOutboundStreamShouldNotWriteResetFrameOnClose() { - childChannelInitializer.handler = new LastInboundHandler(); + LastInboundHandler handler = new LastInboundHandler(); - Channel childChannel = newOutboundStream(); + Channel childChannel = newOutboundStream(handler); assertTrue(childChannel.isActive()); childChannel.close(); @@ -341,124 +369,105 @@ public class Http2MultiplexCodecTest { @Test public void outboundStreamShouldWriteResetFrameOnClose_headersSent() { - childChannelInitializer.handler = new ChannelInboundHandlerAdapter() { + ChannelHandler handler = new ChannelInboundHandlerAdapter() { @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { + public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers())); ctx.fireChannelActive(); } }; - Channel childChannel = newOutboundStream(); + Http2StreamChannel childChannel = newOutboundStream(handler); assertTrue(childChannel.isActive()); - Http2FrameStream stream2 = readOutboundHeadersAndAssignId(); - childChannel.close(); - parentChannel.runPendingTasks(); - - Http2ResetFrame reset = parentChannel.readOutbound(); - assertEquals(stream2, reset.stream()); - assertEquals(Http2Error.CANCEL.code(), reset.errorCode()); + verify(frameWriter).writeRstStream(eqMultiplexCodecCtx(), + eqStreamId(childChannel), eq(Http2Error.CANCEL.code()), anyChannelPromise()); } @Test public void outboundStreamShouldNotWriteResetFrameOnClose_IfStreamDidntExist() { - writer = new Writer() { + when(frameWriter.writeHeaders(eqMultiplexCodecCtx(), anyInt(), + any(Http2Headers.class), anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(), + any(ChannelPromise.class))).thenAnswer(new Answer() { + private boolean headersWritten; @Override - void write(Object msg, ChannelPromise promise) { + public ChannelFuture answer(InvocationOnMock invocationOnMock) { // We want to fail to write the first headers frame. This is what happens if the connection // refuses to allocate a new stream due to having received a GOAWAY. - if (!headersWritten && msg instanceof Http2HeadersFrame) { + if (!headersWritten) { headersWritten = true; - Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg; - final TestableHttp2MultiplexCodec.Stream stream = - (TestableHttp2MultiplexCodec.Stream) headersFrame.stream(); - stream.id = 1; - promise.setFailure(new Exception("boom")); - } else { - super.write(msg, promise); + return ((ChannelPromise) invocationOnMock.getArgument(8)).setFailure(new Exception("boom")); } + return ((ChannelPromise) invocationOnMock.getArgument(8)).setSuccess(); } - }; + }); - childChannelInitializer.handler = new ChannelInboundHandlerAdapter() { + Http2StreamChannel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter() { @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { + public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers())); ctx.fireChannelActive(); } - }; + }); - Channel childChannel = newOutboundStream(); assertFalse(childChannel.isActive()); childChannel.close(); parentChannel.runPendingTasks(); + // The channel was never active so we should not generate a RST frame. + verify(frameWriter, never()).writeRstStream(eqMultiplexCodecCtx(), eqStreamId(childChannel), anyLong(), + anyChannelPromise()); + assertTrue(parentChannel.outboundMessages().isEmpty()); } @Test public void inboundRstStreamFireChannelInactive() { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); + LastInboundHandler inboundHandler = new LastInboundHandler(); + Http2StreamChannel channel = newInboundStream(3, false, inboundHandler); assertTrue(inboundHandler.isChannelActive()); - codec.onHttp2Frame(new DefaultHttp2ResetFrame(Http2Error.INTERNAL_ERROR) - .stream(inboundStream)); - codec.onChannelReadComplete(); - - // This will be called by the frame codec. - inboundStream.state = Http2Stream.State.CLOSED; - codec.onHttp2StreamStateChanged(inboundStream); - parentChannel.runPendingTasks(); + frameInboundWriter.writeInboundRstStream(channel.stream().id(), Http2Error.INTERNAL_ERROR.code()); assertFalse(inboundHandler.isChannelActive()); + // A RST_STREAM frame should NOT be emitted, as we received a RST_STREAM. - assertNull(parentChannel.readOutbound()); + verify(frameWriter, Mockito.never()).writeRstStream(eqMultiplexCodecCtx(), eqStreamId(channel), + anyLong(), anyChannelPromise()); } @Test(expected = StreamException.class) public void streamExceptionTriggersChildChannelExceptionAndClose() throws Exception { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); + LastInboundHandler inboundHandler = new LastInboundHandler(); + Http2StreamChannel channel = newInboundStream(3, false, inboundHandler); + assertTrue(channel.isActive()); + StreamException cause = new StreamException(channel.stream().id(), Http2Error.PROTOCOL_ERROR, "baaam!"); + parentChannel.pipeline().fireExceptionCaught(cause); - StreamException cause = new StreamException(inboundStream.id(), Http2Error.PROTOCOL_ERROR, "baaam!"); - Http2FrameStreamException http2Ex = new Http2FrameStreamException( - inboundStream, Http2Error.PROTOCOL_ERROR, cause); - codec.onHttp2FrameStreamException(http2Ex); - - inboundHandler.checkException(); - } - - @Test(expected = StreamException.class) - public void streamExceptionClosesChildChannel() throws Exception { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - - assertTrue(inboundHandler.isChannelActive()); - StreamException cause = new StreamException(inboundStream.id(), Http2Error.PROTOCOL_ERROR, "baaam!"); - Http2FrameStreamException http2Ex = new Http2FrameStreamException( - inboundStream, Http2Error.PROTOCOL_ERROR, cause); - codec.onHttp2FrameStreamException(http2Ex); - parentChannel.runPendingTasks(); - - assertFalse(inboundHandler.isChannelActive()); + assertFalse(channel.isActive()); inboundHandler.checkException(); } @Test(expected = ClosedChannelException.class) public void streamClosedErrorTranslatedToClosedChannelExceptionOnWrites() throws Exception { - writer = new Writer() { - @Override - void write(Object msg, ChannelPromise promise) { - promise.tryFailure(new StreamException(inboundStream.id(), Http2Error.STREAM_CLOSED, "Stream Closed")); - } - }; LastInboundHandler inboundHandler = new LastInboundHandler(); - childChannelInitializer.handler = inboundHandler; - Channel childChannel = newOutboundStream(); + final Http2StreamChannel childChannel = newOutboundStream(inboundHandler); assertTrue(childChannel.isActive()); + Http2Headers headers = new DefaultHttp2Headers(); + when(frameWriter.writeHeaders(eqMultiplexCodecCtx(), anyInt(), + eq(headers), anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(), + any(ChannelPromise.class))).thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + return ((ChannelPromise) invocationOnMock.getArgument(8)).setFailure( + new StreamException(childChannel.stream().id(), Http2Error.STREAM_CLOSED, "Stream Closed")); + } + }); ChannelFuture future = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers())); + parentChannel.flush(); assertFalse(childChannel.isActive()); @@ -472,9 +481,7 @@ public class Http2MultiplexCodecTest { @Test public void creatingWritingReadingAndClosingOutboundStreamShouldWork() { LastInboundHandler inboundHandler = new LastInboundHandler(); - childChannelInitializer.handler = inboundHandler; - - Http2StreamChannel childChannel = newOutboundStream(); + Http2StreamChannel childChannel = newOutboundStream(inboundHandler); assertTrue(childChannel.isActive()); assertTrue(inboundHandler.isChannelActive()); @@ -482,25 +489,21 @@ public class Http2MultiplexCodecTest { Http2Headers headers = new DefaultHttp2Headers().scheme("https").method("GET").path("/foo.txt"); childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers)); - readOutboundHeadersAndAssignId(); - // Read from the child channel - headers = new DefaultHttp2Headers().scheme("https").status("200"); - codec.onHttp2Frame(new DefaultHttp2HeadersFrame(headers).stream(childChannel.stream())); - codec.onChannelReadComplete(); + frameInboundWriter.writeInboundHeaders(childChannel.stream().id(), headers, 0, false); Http2HeadersFrame headersFrame = inboundHandler.readInbound(); assertNotNull(headersFrame); - assertSame(headers, headersFrame.headers()); + assertEquals(headers, headersFrame.headers()); // Close the child channel. childChannel.close(); parentChannel.runPendingTasks(); // An active outbound stream should emit a RST_STREAM frame. - Http2ResetFrame rstFrame = parentChannel.readOutbound(); - assertNotNull(rstFrame); - assertEquals(childChannel.stream(), rstFrame.stream()); + verify(frameWriter).writeRstStream(eqMultiplexCodecCtx(), eqStreamId(childChannel), + anyLong(), anyChannelPromise()); + assertFalse(childChannel.isOpen()); assertFalse(childChannel.isActive()); assertFalse(inboundHandler.isChannelActive()); @@ -511,33 +514,36 @@ public class Http2MultiplexCodecTest { // @Test(expected = Http2NoMoreStreamIdsException.class) public void failedOutboundStreamCreationThrowsAndClosesChannel() throws Exception { - writer = new Writer() { - @Override - void write(Object msg, ChannelPromise promise) { - promise.tryFailure(new Http2NoMoreStreamIdsException()); - } - }; - LastInboundHandler inboundHandler = new LastInboundHandler(); - childChannelInitializer.handler = inboundHandler; - - Channel childChannel = newOutboundStream(); + LastInboundHandler handler = new LastInboundHandler(); + Http2StreamChannel childChannel = newOutboundStream(handler); assertTrue(childChannel.isActive()); - ChannelFuture future = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers())); + Http2Headers headers = new DefaultHttp2Headers(); + when(frameWriter.writeHeaders(eqMultiplexCodecCtx(), anyInt(), + eq(headers), anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(), + any(ChannelPromise.class))).thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + return ((ChannelPromise) invocationOnMock.getArgument(8)).setFailure( + new Http2NoMoreStreamIdsException()); + } + }); + + ChannelFuture future = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers)); parentChannel.flush(); assertFalse(childChannel.isActive()); assertFalse(childChannel.isOpen()); - inboundHandler.checkException(); + handler.checkException(); future.syncUninterruptibly(); } @Test public void channelClosedWhenCloseListenerCompletes() { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + LastInboundHandler inboundHandler = new LastInboundHandler(); + Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); assertTrue(childChannel.isOpen()); assertTrue(childChannel.isActive()); @@ -564,42 +570,35 @@ public class Http2MultiplexCodecTest { @Test public void channelClosedWhenChannelClosePromiseCompletes() { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + LastInboundHandler inboundHandler = new LastInboundHandler(); + Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); - assertTrue(childChannel.isOpen()); - assertTrue(childChannel.isActive()); + assertTrue(childChannel.isOpen()); + assertTrue(childChannel.isActive()); - final AtomicBoolean channelOpen = new AtomicBoolean(true); - final AtomicBoolean channelActive = new AtomicBoolean(true); + final AtomicBoolean channelOpen = new AtomicBoolean(true); + final AtomicBoolean channelActive = new AtomicBoolean(true); - childChannel.closeFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - channelOpen.set(future.channel().isOpen()); - channelActive.set(future.channel().isActive()); - } - }); - childChannel.close().syncUninterruptibly(); + childChannel.closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + channelOpen.set(future.channel().isOpen()); + channelActive.set(future.channel().isActive()); + } + }); + childChannel.close().syncUninterruptibly(); - assertFalse(channelOpen.get()); - assertFalse(channelActive.get()); - assertFalse(childChannel.isActive()); + assertFalse(channelOpen.get()); + assertFalse(channelActive.get()); + assertFalse(childChannel.isActive()); } @Test public void channelClosedWhenWriteFutureFails() { final Queue writePromises = new ArrayDeque(); - writer = new Writer() { - @Override - void write(Object msg, ChannelPromise promise) { - ReferenceCountUtil.release(msg); - writePromises.offer(promise); - } - }; - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + LastInboundHandler inboundHandler = new LastInboundHandler(); + Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); assertTrue(childChannel.isOpen()); assertTrue(childChannel.isActive()); @@ -607,7 +606,19 @@ public class Http2MultiplexCodecTest { final AtomicBoolean channelOpen = new AtomicBoolean(true); final AtomicBoolean channelActive = new AtomicBoolean(true); - ChannelFuture f = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers())); + Http2Headers headers = new DefaultHttp2Headers(); + when(frameWriter.writeHeaders(eqMultiplexCodecCtx(), anyInt(), + eq(headers), anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(), + any(ChannelPromise.class))).thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + ChannelPromise promise = invocationOnMock.getArgument(8); + writePromises.offer(promise); + return promise; + } + }); + + ChannelFuture f = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers)); assertFalse(f.isDone()); f.addListener(new ChannelFutureListener() { @Override @@ -628,8 +639,8 @@ public class Http2MultiplexCodecTest { @Test public void channelClosedTwiceMarksPromiseAsSuccessful() { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + LastInboundHandler inboundHandler = new LastInboundHandler(); + Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); assertTrue(childChannel.isOpen()); assertTrue(childChannel.isActive()); @@ -644,7 +655,7 @@ public class Http2MultiplexCodecTest { public void settingChannelOptsAndAttrs() { AttributeKey key = AttributeKey.newInstance("foo"); - Channel childChannel = newOutboundStream(); + Channel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter()); childChannel.config().setAutoRead(false).setWriteSpinCount(1000); childChannel.attr(key).set("bar"); assertFalse(childChannel.config().isAutoRead()); @@ -654,50 +665,52 @@ public class Http2MultiplexCodecTest { @Test public void outboundFlowControlWritability() { - Http2StreamChannel childChannel = newOutboundStream(); + Http2StreamChannel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter()); assertTrue(childChannel.isActive()); assertTrue(childChannel.isWritable()); childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers())); parentChannel.flush(); - Http2FrameStream stream = readOutboundHeadersAndAssignId(); - // Test for initial window size assertEquals(initialRemoteStreamWindow, childChannel.config().getWriteBufferHighWaterMark()); - codec.onHttp2StreamWritabilityChanged(stream, true); assertTrue(childChannel.isWritable()); - codec.onHttp2StreamWritabilityChanged(stream, false); + childChannel.write(new DefaultHttp2DataFrame(Unpooled.buffer().writeZero(16 * 1024 * 1024))); assertFalse(childChannel.isWritable()); } @Test public void writabilityAndFlowControl() { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + LastInboundHandler inboundHandler = new LastInboundHandler(); + Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); assertEquals("", inboundHandler.writabilityStates()); + assertTrue(childChannel.isWritable()); // HEADERS frames are not flow controlled, so they should not affect the flow control window. childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers())); - codec.onHttp2StreamWritabilityChanged(childChannel.stream(), true); + codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), true); - assertEquals("true", inboundHandler.writabilityStates()); + assertTrue(childChannel.isWritable()); + assertEquals("", inboundHandler.writabilityStates()); - codec.onHttp2StreamWritabilityChanged(childChannel.stream(), true); - assertEquals("true", inboundHandler.writabilityStates()); + codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), true); + assertTrue(childChannel.isWritable()); + assertEquals("", inboundHandler.writabilityStates()); - codec.onHttp2StreamWritabilityChanged(childChannel.stream(), false); - assertEquals("true,false", inboundHandler.writabilityStates()); + codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), false); + assertFalse(childChannel.isWritable()); + assertEquals("false", inboundHandler.writabilityStates()); - codec.onHttp2StreamWritabilityChanged(childChannel.stream(), false); - assertEquals("true,false", inboundHandler.writabilityStates()); + codec.onHttp2StreamWritabilityChanged(codec.ctx, childChannel.stream(), false); + assertFalse(childChannel.isWritable()); + assertEquals("false", inboundHandler.writabilityStates()); } @Test public void channelClosedWhenInactiveFired() { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + LastInboundHandler inboundHandler = new LastInboundHandler(); + Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); final AtomicBoolean channelOpen = new AtomicBoolean(false); final AtomicBoolean channelActive = new AtomicBoolean(false); @@ -725,9 +738,7 @@ public class Http2MultiplexCodecTest { final AtomicInteger exceptionCaught = new AtomicInteger(-1); final AtomicInteger channelInactive = new AtomicInteger(-1); final AtomicInteger channelUnregistered = new AtomicInteger(-1); - Http2StreamChannel childChannel = newOutboundStream(); - - childChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() { + Http2StreamChannel childChannel = newOutboundStream(new ChannelInboundHandlerAdapter() { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { @@ -767,28 +778,10 @@ public class Http2MultiplexCodecTest { assertEquals(2, channelUnregistered.get()); } - @Ignore("not supported anymore atm") - @Test - public void cancellingWritesBeforeFlush() { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - Channel childChannel = inboundHandler.channel(); - - Http2HeadersFrame headers1 = new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()); - Http2HeadersFrame headers2 = new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()); - ChannelPromise writePromise = childChannel.newPromise(); - childChannel.write(headers1, writePromise); - childChannel.write(headers2); - assertTrue(writePromise.cancel(false)); - childChannel.flush(); - - Http2HeadersFrame headers = parentChannel.readOutbound(); - assertSame(headers, headers2); - } - @Test public void callUnsafeCloseMultipleTimes() { - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); - Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + LastInboundHandler inboundHandler = new LastInboundHandler(); + Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); childChannel.unsafe().close(childChannel.voidPromise()); ChannelPromise promise = childChannel.newPromise(); @@ -809,49 +802,56 @@ public class Http2MultiplexCodecTest { } } }; - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer); - Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + LastInboundHandler inboundHandler = new LastInboundHandler(ctxConsumer); + Http2StreamChannel childChannel = newInboundStream(3, false, numReads, inboundHandler); childChannel.config().setAutoRead(false); - Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream); - Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream); - Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream); - Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream); + Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(childChannel.stream()); + Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(childChannel.stream()); + Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(childChannel.stream()); + Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(childChannel.stream()); - assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound()); + assertEquals(new DefaultHttp2HeadersFrame(request).stream(childChannel.stream()), inboundHandler.readInbound()); - // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete. - parentChannel.writeOneInbound(new Object()); - codec.onHttp2Frame(dataFrame1); - assertEquals(dataFrame1, inboundHandler.readInbound()); + ChannelHandler readCompleteSupressHandler = new ChannelInboundHandlerAdapter() { + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete. + } + }; + + parentChannel.pipeline().addFirst(readCompleteSupressHandler); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("1"), 0, false); + + assertEqualsAndRelease(dataFrame1, inboundHandler.readInbound()); // Deliver frames, and then a stream closed while read is inactive. - codec.onHttp2Frame(dataFrame2); - codec.onHttp2Frame(dataFrame3); - codec.onHttp2Frame(dataFrame4); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("2"), 0, false); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("3"), 0, false); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("4"), 0, false); shouldDisableAutoRead.set(true); childChannel.config().setAutoRead(true); numReads.set(1); - inboundStream.state = Http2Stream.State.CLOSED; - codec.onHttp2StreamStateChanged(inboundStream); + frameInboundWriter.writeInboundRstStream(childChannel.stream().id(), Http2Error.NO_ERROR.code()); // Detecting EOS should flush all pending data regardless of read calls. - assertEquals(dataFrame2, inboundHandler.readInbound()); - assertEquals(dataFrame3, inboundHandler.readInbound()); - assertEquals(dataFrame4, inboundHandler.readInbound()); + assertEqualsAndRelease(dataFrame2, inboundHandler.readInbound()); + assertEqualsAndRelease(dataFrame3, inboundHandler.readInbound()); + assertEqualsAndRelease(dataFrame4, inboundHandler.readInbound()); + + Http2ResetFrame resetFrame = inboundHandler.readInbound(); + assertEquals(childChannel.stream(), resetFrame.stream()); + assertEquals(Http2Error.NO_ERROR.code(), resetFrame.errorCode()); + assertNull(inboundHandler.readInbound()); // Now we want to call channelReadComplete and simulate the end of the read loop. + parentChannel.pipeline().remove(readCompleteSupressHandler); parentChannel.flushInbound(); childChannel.closeFuture().syncUninterruptibly(); - - dataFrame1.release(); - dataFrame2.release(); - dataFrame3.release(); - dataFrame4.release(); } @Test @@ -868,54 +868,58 @@ public class Http2MultiplexCodecTest { } } }; - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer); - Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + LastInboundHandler inboundHandler = new LastInboundHandler(ctxConsumer); + Http2StreamChannel childChannel = newInboundStream(3, false, numReads, inboundHandler); childChannel.config().setAutoRead(false); - Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream); - Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream); - Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream); - Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream); + Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(childChannel.stream()); + Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(childChannel.stream()); + Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(childChannel.stream()); + Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(childChannel.stream()); - assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound()); + assertEquals(new DefaultHttp2HeadersFrame(request).stream(childChannel.stream()), inboundHandler.readInbound()); - // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete. - parentChannel.writeOneInbound(new Object()); - codec.onHttp2Frame(dataFrame1); - assertEquals(dataFrame1, inboundHandler.readInbound()); + ChannelHandler readCompleteSupressHandler = new ChannelInboundHandlerAdapter() { + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete. + } + }; + parentChannel.pipeline().addFirst(readCompleteSupressHandler); + + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("1"), 0, false); + + assertEqualsAndRelease(dataFrame1, inboundHandler.readInbound()); // We want one item to be in the queue, and allow the numReads to be larger than 1. This will ensure that // when beginRead() is called the child channel is added to the readPending queue of the parent channel. - codec.onHttp2Frame(dataFrame2); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("2"), 0, false); numReads.set(10); shouldDisableAutoRead.set(true); childChannel.config().setAutoRead(true); - codec.onHttp2Frame(dataFrame3); - codec.onHttp2Frame(dataFrame4); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("3"), 0, false); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("4"), 0, false); // Detecting EOS should flush all pending data regardless of read calls. - assertEquals(dataFrame2, inboundHandler.readInbound()); - assertEquals(dataFrame3, inboundHandler.readInbound()); - assertEquals(dataFrame4, inboundHandler.readInbound()); + assertEqualsAndRelease(dataFrame2, inboundHandler.readInbound()); + assertEqualsAndRelease(dataFrame3, inboundHandler.readInbound()); + assertEqualsAndRelease(dataFrame4, inboundHandler.readInbound()); + assertNull(inboundHandler.readInbound()); // Now we want to call channelReadComplete and simulate the end of the read loop. + parentChannel.pipeline().remove(readCompleteSupressHandler); parentChannel.flushInbound(); // 3 = 1 for initialization + 1 for read when auto read was off + 1 for when auto read was back on assertEquals(3, channelReadCompleteCount.get()); - - dataFrame1.release(); - dataFrame2.release(); - dataFrame3.release(); - dataFrame4.release(); } @Test public void childQueueIsDrainedAndNewDataIsDispatchedInParentReadLoopNoAutoRead() { - AtomicInteger numReads = new AtomicInteger(1); + final AtomicInteger numReads = new AtomicInteger(1); final AtomicInteger channelReadCompleteCount = new AtomicInteger(0); final AtomicBoolean shouldDisableAutoRead = new AtomicBoolean(); Consumer ctxConsumer = new Consumer() { @@ -927,214 +931,77 @@ public class Http2MultiplexCodecTest { } } }; - LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer); - Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + final LastInboundHandler inboundHandler = new LastInboundHandler(ctxConsumer); + Http2StreamChannel childChannel = newInboundStream(3, false, numReads, inboundHandler); childChannel.config().setAutoRead(false); - Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream); - Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream); - Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream); - Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream); + Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(childChannel.stream()); + Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(childChannel.stream()); + Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(childChannel.stream()); + Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(childChannel.stream()); - assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound()); + assertEquals(new DefaultHttp2HeadersFrame(request).stream(childChannel.stream()), inboundHandler.readInbound()); - // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete. - parentChannel.writeOneInbound(new Object()); - codec.onHttp2Frame(dataFrame1); - assertEquals(dataFrame1, inboundHandler.readInbound()); + ChannelHandler readCompleteSupressHandler = new ChannelInboundHandlerAdapter() { + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete. + } + }; + parentChannel.pipeline().addFirst(readCompleteSupressHandler); + + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("1"), 0, false); + + assertEqualsAndRelease(dataFrame1, inboundHandler.readInbound()); // We want one item to be in the queue, and allow the numReads to be larger than 1. This will ensure that // when beginRead() is called the child channel is added to the readPending queue of the parent channel. - codec.onHttp2Frame(dataFrame2); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("2"), 0, false); numReads.set(2); childChannel.read(); - assertEquals(dataFrame2, inboundHandler.readInbound()); + + assertEqualsAndRelease(dataFrame2, inboundHandler.readInbound()); + assertNull(inboundHandler.readInbound()); // This is the second item that was read, this should be the last until we call read() again. This should also // notify of readComplete(). - codec.onHttp2Frame(dataFrame3); - assertEquals(dataFrame3, inboundHandler.readInbound()); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("3"), 0, false); - codec.onHttp2Frame(dataFrame4); + assertEqualsAndRelease(dataFrame3, inboundHandler.readInbound()); + + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("4"), 0, false); assertNull(inboundHandler.readInbound()); childChannel.read(); - assertEquals(dataFrame4, inboundHandler.readInbound()); + + assertEqualsAndRelease(dataFrame4, inboundHandler.readInbound()); + assertNull(inboundHandler.readInbound()); // Now we want to call channelReadComplete and simulate the end of the read loop. + parentChannel.pipeline().remove(readCompleteSupressHandler); parentChannel.flushInbound(); // 3 = 1 for initialization + 1 for first read of 2 items + 1 for second read of 2 items + // 1 for parent channel readComplete assertEquals(4, channelReadCompleteCount.get()); - - dataFrame1.release(); - dataFrame2.release(); - dataFrame3.release(); - dataFrame4.release(); } - private LastInboundHandler streamActiveAndWriteHeaders(Http2FrameStream stream) { - return streamActiveAndWriteHeaders(stream, null, LastInboundHandler.noopConsumer()); - } - - private LastInboundHandler streamActiveAndWriteHeaders(Http2FrameStream stream, - AtomicInteger maxReads, - Consumer contextConsumer) { - - LastInboundHandler inboundHandler = new LastInboundHandler(contextConsumer); - childChannelInitializer.handler = inboundHandler; - childChannelInitializer.maxReads = maxReads; - assertFalse(inboundHandler.isChannelActive()); - ((TestableHttp2MultiplexCodec.Stream) stream).state = Http2Stream.State.OPEN; - codec.onHttp2StreamStateChanged(stream); - codec.onHttp2Frame(new DefaultHttp2HeadersFrame(request).stream(stream)); - codec.onChannelReadComplete(); - assertTrue(inboundHandler.isChannelActive()); - - return inboundHandler; - } - - private static void verifyFramesMultiplexedToCorrectChannel(Http2FrameStream stream, + private static void verifyFramesMultiplexedToCorrectChannel(Http2StreamChannel streamChannel, LastInboundHandler inboundHandler, int numFrames) { for (int i = 0; i < numFrames; i++) { Http2StreamFrame frame = inboundHandler.readInbound(); assertNotNull(frame); - assertEquals(stream, frame.stream()); + assertEquals(streamChannel.stream(), frame.stream()); release(frame); } assertNull(inboundHandler.readInbound()); } - private static ByteBuf bb(String s) { - return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s); - } - - /** - * Simulates the frame codec, in first assigning an identifier and the completing the write promise. - */ - private Http2FrameStream readOutboundHeadersAndAssignId() { - // Only peek at the frame, so to not complete the promise of the write. We need to first - // assign a stream identifier, as the frame codec would do. - Http2HeadersFrame headersFrame = (Http2HeadersFrame) parentChannel.outboundMessages().peek(); - assertNotNull(headersFrame); - assertNotNull(headersFrame.stream()); - assertFalse(Http2CodecUtil.isStreamIdValid(headersFrame.stream().id())); - TestableHttp2MultiplexCodec.Stream frameStream = (TestableHttp2MultiplexCodec.Stream) headersFrame.stream(); - frameStream.id = outboundStream.id(); - // Create the stream in the Http2Connection. - try { - Http2Stream stream = codec.connection().local().createStream( - headersFrame.stream().id(), headersFrame.isEndStream()); - frameStream.stream = stream; - } catch (Exception ex) { - throw new IllegalStateException("Failed to create a stream", ex); - } - - // Now read it and complete the write promise. - assertSame(headersFrame, parentChannel.readOutbound()); - - return headersFrame.stream(); - } - - /** - * This class removes the bits that would transform the frames to bytes and so make it easier to test the actual - * special handling of the codec. - */ - private final class TestableHttp2MultiplexCodec extends Http2MultiplexCodec { - - public TestableHttp2MultiplexCodec(Http2ConnectionEncoder encoder, - Http2ConnectionDecoder decoder, - Http2Settings initialSettings, - ChannelHandler inboundStreamHandler) { - super(encoder, decoder, initialSettings, inboundStreamHandler, null); - } - - void onHttp2Frame(Http2Frame frame) { - onHttp2Frame(ctx, frame); - } - - void onChannelReadComplete() { - onChannelReadComplete(ctx); - } - - void onHttp2StreamStateChanged(Http2FrameStream stream) { - onHttp2StreamStateChanged(ctx, stream); - } - - void onHttp2FrameStreamException(Http2FrameStreamException cause) { - onHttp2FrameStreamException(ctx, cause); - } - - void onHttp2StreamWritabilityChanged(Http2FrameStream stream, boolean writable) { - onHttp2StreamWritabilityChanged(ctx, stream, writable); - } - - @Override - boolean onBytesConsumed(ChannelHandlerContext ctx, Http2FrameStream stream, int bytes) { - writer.write(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream), ctx.newPromise()); - return true; - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - writer.write(msg, promise); - } - - @Override - void flush0(ChannelHandlerContext ctx) { - // Do nothing - } - - @Override - Stream newStream() { - return new Stream(); - } - - final class Stream extends Http2MultiplexCodecStream { - Http2Stream.State state = Http2Stream.State.IDLE; - int id = -1; - - @Override - public int id() { - return id; - } - - @Override - public Http2Stream.State state() { - return state; - } - } - } - - private final class TestableHttp2MultiplexCodecBuilder extends Http2MultiplexCodecBuilder { - - TestableHttp2MultiplexCodecBuilder(boolean server, ChannelHandler childHandler) { - super(server, childHandler); - } - - @Override - public TestableHttp2MultiplexCodec build() { - return (TestableHttp2MultiplexCodec) super.build(); - } - - @Override - protected Http2MultiplexCodec build( - Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) { - return new TestableHttp2MultiplexCodec( - encoder, decoder, initialSettings, childHandler); - } - } - - class Writer { - - void write(Object msg, ChannelPromise promise) { - parentChannel.outboundMessages().add(msg); - promise.setSuccess(); - } + private static int eqStreamId(Http2StreamChannel channel) { + return eq(channel.stream().id()); } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java index e7130ee5d9..f660381237 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java @@ -15,7 +15,9 @@ package io.netty.handler.codec.http2; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -24,18 +26,34 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.util.AsciiString; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import junit.framework.AssertionFailedError; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.List; import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_HEADER_LIST_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_HEADER_TABLE_SIZE; +import static io.netty.util.ReferenceCountUtil.release; import static java.lang.Math.min; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyByte; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyShort; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; /** * Utilities for the integration tests. @@ -506,4 +524,179 @@ public final class Http2TestUtil { return isWriteAllowed ? (int) min(pendingBytes, Integer.MAX_VALUE) : -1; } } + + static Http2FrameWriter mockedFrameWriter() { + Http2FrameWriter.Configuration configuration = new Http2FrameWriter.Configuration() { + private final Http2HeadersEncoder.Configuration headerConfiguration = + new Http2HeadersEncoder.Configuration() { + @Override + public void maxHeaderTableSize(long max) { + // NOOP + } + + @Override + public long maxHeaderTableSize() { + return 0; + } + + @Override + public void maxHeaderListSize(long max) { + // NOOP + } + + @Override + public long maxHeaderListSize() { + return 0; + } + }; + + private final Http2FrameSizePolicy policy = new Http2FrameSizePolicy() { + @Override + public void maxFrameSize(int max) { + // NOOP + } + + @Override + public int maxFrameSize() { + return 0; + } + }; + @Override + public Http2HeadersEncoder.Configuration headersConfiguration() { + return headerConfiguration; + } + + @Override + public Http2FrameSizePolicy frameSizePolicy() { + return policy; + } + }; + + final ConcurrentLinkedQueue buffers = new ConcurrentLinkedQueue(); + + Http2FrameWriter frameWriter = Mockito.mock(Http2FrameWriter.class); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) { + for (;;) { + ByteBuf buf = buffers.poll(); + if (buf == null) { + break; + } + buf.release(); + } + return null; + } + }).when(frameWriter).close(); + + when(frameWriter.configuration()).thenReturn(configuration); + when(frameWriter.writeSettings(any(ChannelHandlerContext.class), any(Http2Settings.class), + any(ChannelPromise.class))).thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + return ((ChannelPromise) invocationOnMock.getArgument(2)).setSuccess(); + } + }); + + when(frameWriter.writeSettingsAck(any(ChannelHandlerContext.class), any(ChannelPromise.class))) + .thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + return ((ChannelPromise) invocationOnMock.getArgument(1)).setSuccess(); + } + }); + + when(frameWriter.writeGoAway(any(ChannelHandlerContext.class), anyInt(), + anyLong(), any(ByteBuf.class), any(ChannelPromise.class))).thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + buffers.offer((ByteBuf) invocationOnMock.getArgument(3)); + return ((ChannelPromise) invocationOnMock.getArgument(4)).setSuccess(); + } + }); + when(frameWriter.writeHeaders(any(ChannelHandlerContext.class), anyInt(), any(Http2Headers.class), anyInt(), + anyBoolean(), any(ChannelPromise.class))).thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + return ((ChannelPromise) invocationOnMock.getArgument(5)).setSuccess(); + } + }); + + when(frameWriter.writeHeaders(any(ChannelHandlerContext.class), anyInt(), + any(Http2Headers.class), anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(), + any(ChannelPromise.class))).thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + return ((ChannelPromise) invocationOnMock.getArgument(8)).setSuccess(); + } + }); + + when(frameWriter.writeData(any(ChannelHandlerContext.class), anyInt(), any(ByteBuf.class), anyInt(), + anyBoolean(), any(ChannelPromise.class))).thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + buffers.offer((ByteBuf) invocationOnMock.getArgument(2)); + return ((ChannelPromise) invocationOnMock.getArgument(5)).setSuccess(); + } + }); + + when(frameWriter.writeRstStream(any(ChannelHandlerContext.class), anyInt(), + anyLong(), any(ChannelPromise.class))).thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + return ((ChannelPromise) invocationOnMock.getArgument(3)).setSuccess(); + } + }); + + when(frameWriter.writeWindowUpdate(any(ChannelHandlerContext.class), anyInt(), anyInt(), + any(ChannelPromise.class))).then(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + return ((ChannelPromise) invocationOnMock.getArgument(3)).setSuccess(); + } + }); + + when(frameWriter.writePushPromise(any(ChannelHandlerContext.class), anyInt(), anyInt(), any(Http2Headers.class), + anyInt(), anyChannelPromise())).thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + return ((ChannelPromise) invocationOnMock.getArgument(5)).setSuccess(); + } + }); + + when(frameWriter.writeFrame(any(ChannelHandlerContext.class), anyByte(), anyInt(), any(Http2Flags.class), + any(ByteBuf.class), anyChannelPromise())).thenAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock invocationOnMock) { + buffers.offer((ByteBuf) invocationOnMock.getArgument(4)); + return ((ChannelPromise) invocationOnMock.getArgument(5)).setSuccess(); + } + }); + return frameWriter; + } + + static ChannelPromise anyChannelPromise() { + return any(ChannelPromise.class); + } + + static Http2Settings anyHttp2Settings() { + return any(Http2Settings.class); + } + + static ByteBuf bb(String s) { + return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s); + } + + static void assertEqualsAndRelease(Http2Frame expected, Http2Frame actual) { + try { + assertEquals(expected, actual); + } finally { + release(expected); + release(actual); + // Will return -1 when not implements ReferenceCounted. + assertTrue(ReferenceCountUtil.refCnt(expected) <= 0); + assertTrue(ReferenceCountUtil.refCnt(actual) <= 0); + } + } + } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java index 015550f078..2cc79d14e5 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java @@ -50,9 +50,9 @@ public class TestChannelInitializer extends ChannelInitializer { /** * Designed to read a single byte at a time to control the number of reads done at a fine granularity. */ - private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator { + static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator { private final AtomicInteger numReads; - TestNumReadsRecvByteBufAllocator(AtomicInteger numReads) { + private TestNumReadsRecvByteBufAllocator(AtomicInteger numReads) { this.numReads = numReads; }