From 2139ff9fe384792f7b4b82f6bf968f268b9b1d48 Mon Sep 17 00:00:00 2001 From: Aayush Atharva Date: Sun, 27 Dec 2020 19:00:24 +0530 Subject: [PATCH] Add PushPromise and Priority Frame support in Http2FrameCodec (#10765) Motivation: Right now, we don't have to handle Push Promise Read in `Http2FrameCodec`. Push Promise is one of the key features of HTTP/2 and we should support it in our `Http2FrameCodec`. Modification: Added `Http2PushPromiseFrame` and `Http2PushPromiseFrame` to handle Push Promise and Promise Frame. Result: Fixes #10748 --- .../http2/DefaultHttp2PriorityFrame.java | 77 ++++++ .../http2/DefaultHttp2PushPromiseFrame.java | 101 ++++++++ .../handler/codec/http2/Http2FrameCodec.java | 140 ++++++---- .../codec/http2/Http2PriorityFrame.java | 44 ++++ .../codec/http2/Http2PushPromiseFrame.java | 55 ++++ .../DefaultHttp2PushPromiseFrameTest.java | 242 ++++++++++++++++++ 6 files changed, 611 insertions(+), 48 deletions(-) create mode 100644 codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2PriorityFrame.java create mode 100644 codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2PushPromiseFrame.java create mode 100644 codec-http2/src/main/java/io/netty/handler/codec/http2/Http2PriorityFrame.java create mode 100644 codec-http2/src/main/java/io/netty/handler/codec/http2/Http2PushPromiseFrame.java create mode 100644 codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2PushPromiseFrameTest.java diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2PriorityFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2PriorityFrame.java new file mode 100644 index 0000000000..e131936e24 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2PriorityFrame.java @@ -0,0 +1,77 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.util.internal.UnstableApi; + +/** + * Default implementation of {@linkplain Http2PriorityFrame} + */ +@UnstableApi +public final class DefaultHttp2PriorityFrame implements Http2PriorityFrame { + + private final int streamDependency; + private final short weight; + private final boolean exclusive; + private Http2FrameStream http2FrameStream; + + public DefaultHttp2PriorityFrame(int streamDependency, short weight, boolean exclusive) { + this.streamDependency = streamDependency; + this.weight = weight; + this.exclusive = exclusive; + } + + @Override + public int streamDependency() { + return streamDependency; + } + + @Override + public short weight() { + return weight; + } + + @Override + public boolean exclusive() { + return exclusive; + } + + @Override + public Http2PriorityFrame stream(Http2FrameStream stream) { + http2FrameStream = stream; + return this; + } + + @Override + public Http2FrameStream stream() { + return http2FrameStream; + } + + @Override + public String name() { + return "PRIORITY_FRAME"; + } + + @Override + public String toString() { + return "DefaultHttp2PriorityFrame(" + + "stream=" + http2FrameStream + + ", streamDependency=" + streamDependency + + ", weight=" + weight + + ", exclusive=" + exclusive + + ')'; + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2PushPromiseFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2PushPromiseFrame.java new file mode 100644 index 0000000000..f9fd987109 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2PushPromiseFrame.java @@ -0,0 +1,101 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.util.internal.UnstableApi; + +/** + * Default implementation of {@link Http2PushPromiseFrame} + */ +@UnstableApi +public final class DefaultHttp2PushPromiseFrame implements Http2PushPromiseFrame { + + private Http2FrameStream pushStreamFrame; + private final Http2Headers http2Headers; + private Http2FrameStream streamFrame; + private final int padding; + private final int promisedStreamId; + + public DefaultHttp2PushPromiseFrame(Http2Headers http2Headers) { + this(http2Headers, 0); + } + + public DefaultHttp2PushPromiseFrame(Http2Headers http2Headers, int padding) { + this(http2Headers, padding, -1); + } + + DefaultHttp2PushPromiseFrame(Http2Headers http2Headers, int padding, int promisedStreamId) { + this.http2Headers = http2Headers; + this.padding = padding; + this.promisedStreamId = promisedStreamId; + } + + @Override + public Http2StreamFrame pushStream(Http2FrameStream stream) { + pushStreamFrame = stream; + return this; + } + + @Override + public Http2FrameStream pushStream() { + return pushStreamFrame; + } + + @Override + public Http2Headers http2Headers() { + return http2Headers; + } + + @Override + public int padding() { + return padding; + } + + @Override + public int promisedStreamId() { + if (pushStreamFrame != null) { + return pushStreamFrame.id(); + } else { + return promisedStreamId; + } + } + + @Override + public Http2PushPromiseFrame stream(Http2FrameStream stream) { + streamFrame = stream; + return this; + } + + @Override + public Http2FrameStream stream() { + return streamFrame; + } + + @Override + public String name() { + return "PUSH_PROMISE_FRAME"; + } + + @Override + public String toString() { + return "DefaultHttp2PushPromiseFrame{" + + "pushStreamFrame=" + pushStreamFrame + + ", http2Headers=" + http2Headers + + ", streamFrame=" + streamFrame + + ", padding=" + padding + + '}'; + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java index 390fde5a9b..90dd87c136 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.http2; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -54,7 +55,7 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; * creating outbound streams. * *

Stream Lifecycle

- * + *

* The frame codec delivers and writes frames for active streams. An active stream is closed when either side sends a * {@code RST_STREAM} frame or both sides send a frame with the {@code END_STREAM} flag set. Each * {@link Http2StreamFrame} has a {@link Http2FrameStream} object attached that uniquely identifies a particular stream. @@ -64,7 +65,7 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; * {@link Http2StreamFrame#stream(Http2FrameStream)}. * *

Flow control

- * + *

* The frame codec automatically increments stream and connection flow control windows. * *

Incoming flow controlled frames need to be consumed by writing a {@link Http2WindowUpdateFrame} with the consumed @@ -78,12 +79,12 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; * connection-level flow control window is the same as initial stream-level flow control window. * *

New inbound Streams

- * + *

* The first frame of an HTTP/2 stream must be an {@link Http2HeadersFrame}, which will have an {@link Http2FrameStream} * object attached. * *

New outbound Streams

- * + *

* A outbound HTTP/2 stream can be created by first instantiating a new {@link Http2FrameStream} object via * {@link Http2ChannelDuplexHandler#newStream()}, and then writing a {@link Http2HeadersFrame} object with the stream * attached. @@ -125,13 +126,13 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; * the last stream identifier of the GO_AWAY frame will fail with a {@link Http2GoAwayException}. * *

Error Handling

- * + *

* Exceptions and errors are propagated via {@link ChannelHandler#exceptionCaught}. Exceptions that apply to * a specific HTTP/2 stream are wrapped in a {@link Http2FrameStreamException} and have the corresponding * {@link Http2FrameStream} object attached. * *

Reference Counting

- * + *

* Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry * reference counted objects (e.g. {@link ByteBuf}s). The frame codec will call {@link ReferenceCounted#retain()} before * propagating a reference counted object through the pipeline, and thus an application handler needs to release such @@ -139,7 +140,7 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; * https://netty.io/wiki/reference-counted-objects.html * *

HTTP Upgrade

- * + *

* Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary * HTTP-to-HTTP/2 conversion is performed automatically. */ @@ -155,7 +156,9 @@ public class Http2FrameCodec extends Http2ConnectionHandler { ChannelHandlerContext ctx; - /** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/ + /** + * Number of buffered streams if the {@link StreamBufferingEncoder} is used. + **/ private int numBufferedStreams; private final IntObjectMap frameStreamToInitializeMap = new IntObjectHashMap(8); @@ -201,7 +204,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler { /** * Retrieve the number of streams currently in the process of being initialized. - * + *

* This is package-private for testing only. */ int numInitializingStreams() { @@ -324,6 +327,13 @@ public class Http2FrameCodec extends Http2ConnectionHandler { encoder().writeSettingsAck(ctx, promise); } else if (msg instanceof Http2GoAwayFrame) { writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg, promise); + } else if (msg instanceof Http2PushPromiseFrame) { + Http2PushPromiseFrame pushPromiseFrame = (Http2PushPromiseFrame) msg; + writePushPromise(ctx, pushPromiseFrame, promise); + } else if (msg instanceof Http2PriorityFrame) { + Http2PriorityFrame priorityFrame = (Http2PriorityFrame) msg; + encoder().writePriority(ctx, priorityFrame.stream().id(), priorityFrame.streamDependency(), + priorityFrame.weight(), priorityFrame.exclusive(), promise); } else if (msg instanceof Http2UnknownFrame) { Http2UnknownFrame unknownFrame = (Http2UnknownFrame) msg; encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(), @@ -370,37 +380,14 @@ public class Http2FrameCodec extends Http2ConnectionHandler { goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content(), promise); } - private void writeHeadersFrame( - final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame, final ChannelPromise promise) { + private void writeHeadersFrame(final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame, + final ChannelPromise promise) { if (isStreamIdValid(headersFrame.stream().id())) { encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(), headersFrame.padding(), headersFrame.isEndStream(), promise); - } else { - final DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) headersFrame.stream(); - final Http2Connection connection = connection(); - final int streamId = connection.local().incrementAndGetNextStreamId(); - if (streamId < 0) { - promise.setFailure(new Http2NoMoreStreamIdsException()); - - // Simulate a GOAWAY being received due to stream exhaustion on this connection. We use the maximum - // valid stream ID for the current peer. - onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(connection.isServer() ? Integer.MAX_VALUE : - Integer.MAX_VALUE - 1, NO_ERROR.code(), - writeAscii(ctx.alloc(), "Stream IDs exhausted on local stream creation"))); - return; - } - stream.id = streamId; - - // Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the - // stream in a field directly we may override the stored field before onStreamAdded(...) was called - // and so not correctly set the property for the buffered stream. - // - // See https://github.com/netty/netty/issues/8692 - Object old = frameStreamToInitializeMap.put(streamId, stream); - - // We should not re-use ids. - assert old == null; + } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) headersFrame.stream(), promise)) { + final int streamId = headersFrame.stream().id(); encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(), headersFrame.isEndStream(), promise); @@ -420,6 +407,59 @@ public class Http2FrameCodec extends Http2ConnectionHandler { } } + private void writePushPromise(final ChannelHandlerContext ctx, Http2PushPromiseFrame pushPromiseFrame, + final ChannelPromise promise) { + if (isStreamIdValid(pushPromiseFrame.pushStream().id())) { + encoder().writePushPromise(ctx, pushPromiseFrame.stream().id(), pushPromiseFrame.pushStream().id(), + pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise); + } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) pushPromiseFrame.pushStream(), promise)) { + final int streamId = pushPromiseFrame.stream().id(); + encoder().writePushPromise(ctx, streamId, pushPromiseFrame.pushStream().id(), + pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise); + + if (promise.isDone()) { + handleHeaderFuture(promise, streamId); + } else { + numBufferedStreams++; + // Clean up the stream being initialized if writing the headers fails and also + // decrement the number of buffered streams. + promise.addListener((ChannelFuture f) -> { + numBufferedStreams--; + handleHeaderFuture(f, streamId); + }); + } + } + } + + private boolean initializeNewStream(ChannelHandlerContext ctx, DefaultHttp2FrameStream http2FrameStream, + ChannelPromise promise) { + final Http2Connection connection = connection(); + final int streamId = connection.local().incrementAndGetNextStreamId(); + if (streamId < 0) { + promise.setFailure(new Http2NoMoreStreamIdsException()); + + // Simulate a GOAWAY being received due to stream exhaustion on this connection. We use the maximum + // valid stream ID for the current peer. + onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(connection.isServer() ? Integer.MAX_VALUE : + Integer.MAX_VALUE - 1, NO_ERROR.code(), + writeAscii(ctx.alloc(), "Stream IDs exhausted on local stream creation"))); + + return false; + } + http2FrameStream.id = streamId; + + // Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the + // stream in a field directly we may override the stored field before onStreamAdded(...) was called + // and so not correctly set the property for the buffered stream. + // + // See https://github.com/netty/netty/issues/8692 + Object old = frameStreamToInitializeMap.put(streamId, http2FrameStream); + + // We should not re-use ids. + assert old == null; + return true; + } + private void handleHeaderFuture(Future channelFuture, int streamId) { if (!channelFuture.isSuccess()) { frameStreamToInitializeMap.remove(streamId); @@ -488,7 +528,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler { */ @Override protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, - Http2Exception.StreamException streamException) { + Http2Exception.StreamException streamException) { int streamId = streamException.streamId(); Http2Stream connectionStream = connection().stream(streamId); if (connectionStream == null) { @@ -513,12 +553,12 @@ public class Http2FrameCodec extends Http2ConnectionHandler { } private void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx, Throwable cause, - Http2Exception.StreamException streamException) { + Http2Exception.StreamException streamException) { // It is normal to hit a race condition where we still receive frames for a stream that this // peer has deemed closed, such as if this peer sends a RST(CANCEL) to discard the request. // Since this is likely to be normal we log at DEBUG level. InternalLogLevel level = - streamException.error() == Http2Error.STREAM_CLOSED ? InternalLogLevel.DEBUG : InternalLogLevel.WARN; + streamException.error() == Http2Error.STREAM_CLOSED ? InternalLogLevel.DEBUG : InternalLogLevel.WARN; LOG.log(level, "Stream exception thrown for unknown stream {}.", streamException.streamId(), cause); } @@ -580,14 +620,14 @@ public class Http2FrameCodec extends Http2ConnectionHandler { public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) { onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding) - .stream(requireStream(streamId))); + .stream(requireStream(streamId))); } @Override public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) { onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding) - .stream(requireStream(streamId)).retain()); + .stream(requireStream(streamId)).retain()); // We return the bytes in consumeBytes() once the stream channel consumed the bytes. return 0; } @@ -598,9 +638,10 @@ public class Http2FrameCodec extends Http2ConnectionHandler { } @Override - public void onPriorityRead( - ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) { - // TODO: Maybe handle me + public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, + short weight, boolean exclusive) { + onHttp2Frame(ctx, new DefaultHttp2PriorityFrame(streamDependency, weight, exclusive) + .stream(requireStream(streamId))); } @Override @@ -609,9 +650,12 @@ public class Http2FrameCodec extends Http2ConnectionHandler { } @Override - public void onPushPromiseRead( - ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) { - // TODO: Maybe handle me + public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, + Http2Headers headers, int padding) { + onHttp2Frame(ctx, new DefaultHttp2PushPromiseFrame(headers, padding, promisedStreamId) + .pushStream(new DefaultHttp2FrameStream() + .setStreamAndProperty(streamKey, connection().stream(promisedStreamId))) + .stream(requireStream(streamId))); } private Http2FrameStream requireStream(int streamId) { @@ -628,7 +672,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler { } private void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream, - @SuppressWarnings("unused") boolean writable) { + @SuppressWarnings("unused") boolean writable) { ctx.fireUserEventTriggered(stream.writabilityChanged); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2PriorityFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2PriorityFrame.java new file mode 100644 index 0000000000..403028fa39 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2PriorityFrame.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.util.internal.UnstableApi; + +/** + * HTTP/2 Priority Frame + */ +@UnstableApi +public interface Http2PriorityFrame extends Http2StreamFrame { + + /** + * Parent Stream Id of this Priority request + */ + int streamDependency(); + + /** + * Stream weight + */ + short weight(); + + /** + * Set to {@code true} if this stream is exclusive else set to {@code false} + */ + boolean exclusive(); + + @Override + Http2PriorityFrame stream(Http2FrameStream stream); + +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2PushPromiseFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2PushPromiseFrame.java new file mode 100644 index 0000000000..dc5d7cb42e --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2PushPromiseFrame.java @@ -0,0 +1,55 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.util.internal.UnstableApi; + +/** + * HTTP/2 Push Promise Frame + */ +@UnstableApi +public interface Http2PushPromiseFrame extends Http2StreamFrame { + + /** + * Set the Promise {@link Http2FrameStream} object for this frame. + */ + Http2StreamFrame pushStream(Http2FrameStream stream); + + /** + * Returns the Promise {@link Http2FrameStream} object for this frame, or {@code null} if the + * frame has yet to be associated with a stream. + */ + Http2FrameStream pushStream(); + + /** + * {@link Http2Headers} sent in Push Promise + */ + Http2Headers http2Headers(); + + /** + * Frame padding to use. Will be non-negative and less than 256. + */ + int padding(); + + /** + * Promised Stream ID + */ + int promisedStreamId(); + + @Override + Http2PushPromiseFrame stream(Http2FrameStream stream); + +} diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2PushPromiseFrameTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2PushPromiseFrameTest.java new file mode 100644 index 0000000000..7bc598d557 --- /dev/null +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2PushPromiseFrameTest.java @@ -0,0 +1,242 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.Assert.assertEquals; + +public class DefaultHttp2PushPromiseFrameTest { + + private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2); + private final ClientHandler clientHandler = new ClientHandler(); + private final Map contentMap = new ConcurrentHashMap(); + + private ChannelFuture connectionFuture; + + @Before + public void setup() throws InterruptedException { + ServerBootstrap serverBootstrap = new ServerBootstrap() + .group(eventLoopGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + Http2FrameCodec frameCodec = Http2FrameCodecBuilder.forServer() + .autoAckSettingsFrame(true) + .autoAckPingFrame(true) + .build(); + + pipeline.addLast(frameCodec); + pipeline.addLast(new ServerHandler()); + } + }); + + ChannelFuture channelFuture = serverBootstrap.bind(0).sync(); + + final Bootstrap bootstrap = new Bootstrap() + .group(eventLoopGroup) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + Http2FrameCodec frameCodec = Http2FrameCodecBuilder.forClient() + .autoAckSettingsFrame(true) + .autoAckPingFrame(true) + .initialSettings(Http2Settings.defaultSettings().pushEnabled(true)) + .build(); + + pipeline.addLast(frameCodec); + pipeline.addLast(clientHandler); + } + }); + + connectionFuture = bootstrap.connect(channelFuture.channel().localAddress()); + } + + @Test + public void send() { + connectionFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + clientHandler.write(); + } + }); + } + + @After + public void shutdown() { + eventLoopGroup.shutdownGracefully(); + } + + private final class ServerHandler extends Http2ChannelDuplexHandler { + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + + if (msg instanceof Http2HeadersFrame) { + final Http2HeadersFrame receivedFrame = (Http2HeadersFrame) msg; + + Http2Headers pushRequestHeaders = new DefaultHttp2Headers(); + pushRequestHeaders.path("/meow") + .method("GET") + .scheme("https") + .authority("localhost:5555"); + + // Write PUSH_PROMISE request headers + final Http2FrameStream newPushFrameStream = newStream(); + Http2PushPromiseFrame pushPromiseFrame = new DefaultHttp2PushPromiseFrame(pushRequestHeaders); + pushPromiseFrame.stream(receivedFrame.stream()); + pushPromiseFrame.pushStream(newPushFrameStream); + ctx.writeAndFlush(pushPromiseFrame).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + contentMap.put(newPushFrameStream.id(), "Meow, I am Pushed via HTTP/2"); + + // Write headers for actual request + Http2Headers http2Headers = new DefaultHttp2Headers(); + http2Headers.status("200"); + http2Headers.add("push", "false"); + Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(http2Headers, false); + headersFrame.stream(receivedFrame.stream()); + ChannelFuture channelFuture = ctx.writeAndFlush(headersFrame); + + // Write Data of actual request + channelFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + Http2DataFrame dataFrame = new DefaultHttp2DataFrame( + Unpooled.wrappedBuffer("Meow".getBytes()), true); + dataFrame.stream(receivedFrame.stream()); + ctx.writeAndFlush(dataFrame); + } + }); + } + }); + } else if (msg instanceof Http2PriorityFrame) { + Http2PriorityFrame priorityFrame = (Http2PriorityFrame) msg; + String content = contentMap.get(priorityFrame.stream().id()); + if (content == null) { + ctx.writeAndFlush(new DefaultHttp2GoAwayFrame(Http2Error.REFUSED_STREAM)); + return; + } + + // Write headers for Priority request + Http2Headers http2Headers = new DefaultHttp2Headers(); + http2Headers.status("200"); + http2Headers.add("push", "true"); + Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(http2Headers, false); + headersFrame.stream(priorityFrame.stream()); + ctx.writeAndFlush(headersFrame); + + // Write Data of Priority request + Http2DataFrame dataFrame = new DefaultHttp2DataFrame(Unpooled.wrappedBuffer(content.getBytes()), true); + dataFrame.stream(priorityFrame.stream()); + ctx.writeAndFlush(dataFrame); + } + } + } + + private static final class ClientHandler extends Http2ChannelDuplexHandler { + + private ChannelHandlerContext ctx; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws InterruptedException { + this.ctx = ctx; + } + + void write() { + Http2Headers http2Headers = new DefaultHttp2Headers(); + http2Headers.path("/") + .authority("localhost") + .method("GET") + .scheme("https"); + + Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(http2Headers, true); + headersFrame.stream(newStream()); + ctx.writeAndFlush(headersFrame); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + + if (msg instanceof Http2PushPromiseFrame) { + Http2PushPromiseFrame pushPromiseFrame = (Http2PushPromiseFrame) msg; + + assertEquals("/meow", pushPromiseFrame.http2Headers().path().toString()); + assertEquals("GET", pushPromiseFrame.http2Headers().method().toString()); + assertEquals("https", pushPromiseFrame.http2Headers().scheme().toString()); + assertEquals("localhost:5555", pushPromiseFrame.http2Headers().authority().toString()); + + Http2PriorityFrame priorityFrame = new DefaultHttp2PriorityFrame(pushPromiseFrame.stream().id(), + Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT, true); + priorityFrame.stream(pushPromiseFrame.pushStream()); + ctx.writeAndFlush(priorityFrame); + } else if (msg instanceof Http2HeadersFrame) { + Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg; + + if (headersFrame.stream().id() == 3) { + assertEquals("200", headersFrame.headers().status().toString()); + assertEquals("false", headersFrame.headers().get("push").toString()); + } else if (headersFrame.stream().id() == 2) { + assertEquals("200", headersFrame.headers().status().toString()); + assertEquals("true", headersFrame.headers().get("push").toString()); + } else { + ctx.writeAndFlush(new DefaultHttp2GoAwayFrame(Http2Error.REFUSED_STREAM)); + } + } else if (msg instanceof Http2DataFrame) { + Http2DataFrame dataFrame = (Http2DataFrame) msg; + + try { + if (dataFrame.stream().id() == 3) { + assertEquals("Meow", dataFrame.content().toString(CharsetUtil.UTF_8)); + } else if (dataFrame.stream().id() == 2) { + assertEquals("Meow, I am Pushed via HTTP/2", dataFrame.content().toString(CharsetUtil.UTF_8)); + } else { + ctx.writeAndFlush(new DefaultHttp2GoAwayFrame(Http2Error.REFUSED_STREAM)); + } + } finally { + ReferenceCountUtil.release(dataFrame); + } + } + } + } +}