Add PushPromise and Priority Frame support in Http2FrameCodec (#10765)

Motivation:
Right now, we don't have to handle Push Promise Read in `Http2FrameCodec`. Push Promise is one of the key features of HTTP/2 and we should support it in our `Http2FrameCodec`.

Modification:
Added `Http2PushPromiseFrame` and `Http2PushPromiseFrame` to handle Push Promise and Promise Frame.

Result:
Fixes #10748
This commit is contained in:
Aayush Atharva 2020-12-27 19:00:24 +05:30 committed by Norman Maurer
parent b4dcd18427
commit 2139ff9fe3
6 changed files with 611 additions and 48 deletions

View File

@ -0,0 +1,77 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
/**
* Default implementation of {@linkplain Http2PriorityFrame}
*/
@UnstableApi
public final class DefaultHttp2PriorityFrame implements Http2PriorityFrame {
private final int streamDependency;
private final short weight;
private final boolean exclusive;
private Http2FrameStream http2FrameStream;
public DefaultHttp2PriorityFrame(int streamDependency, short weight, boolean exclusive) {
this.streamDependency = streamDependency;
this.weight = weight;
this.exclusive = exclusive;
}
@Override
public int streamDependency() {
return streamDependency;
}
@Override
public short weight() {
return weight;
}
@Override
public boolean exclusive() {
return exclusive;
}
@Override
public Http2PriorityFrame stream(Http2FrameStream stream) {
http2FrameStream = stream;
return this;
}
@Override
public Http2FrameStream stream() {
return http2FrameStream;
}
@Override
public String name() {
return "PRIORITY_FRAME";
}
@Override
public String toString() {
return "DefaultHttp2PriorityFrame(" +
"stream=" + http2FrameStream +
", streamDependency=" + streamDependency +
", weight=" + weight +
", exclusive=" + exclusive +
')';
}
}

View File

@ -0,0 +1,101 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
/**
* Default implementation of {@link Http2PushPromiseFrame}
*/
@UnstableApi
public final class DefaultHttp2PushPromiseFrame implements Http2PushPromiseFrame {
private Http2FrameStream pushStreamFrame;
private final Http2Headers http2Headers;
private Http2FrameStream streamFrame;
private final int padding;
private final int promisedStreamId;
public DefaultHttp2PushPromiseFrame(Http2Headers http2Headers) {
this(http2Headers, 0);
}
public DefaultHttp2PushPromiseFrame(Http2Headers http2Headers, int padding) {
this(http2Headers, padding, -1);
}
DefaultHttp2PushPromiseFrame(Http2Headers http2Headers, int padding, int promisedStreamId) {
this.http2Headers = http2Headers;
this.padding = padding;
this.promisedStreamId = promisedStreamId;
}
@Override
public Http2StreamFrame pushStream(Http2FrameStream stream) {
pushStreamFrame = stream;
return this;
}
@Override
public Http2FrameStream pushStream() {
return pushStreamFrame;
}
@Override
public Http2Headers http2Headers() {
return http2Headers;
}
@Override
public int padding() {
return padding;
}
@Override
public int promisedStreamId() {
if (pushStreamFrame != null) {
return pushStreamFrame.id();
} else {
return promisedStreamId;
}
}
@Override
public Http2PushPromiseFrame stream(Http2FrameStream stream) {
streamFrame = stream;
return this;
}
@Override
public Http2FrameStream stream() {
return streamFrame;
}
@Override
public String name() {
return "PUSH_PROMISE_FRAME";
}
@Override
public String toString() {
return "DefaultHttp2PushPromiseFrame{" +
"pushStreamFrame=" + pushStreamFrame +
", http2Headers=" + http2Headers +
", streamFrame=" + streamFrame +
", padding=" + padding +
'}';
}
}

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
@ -54,7 +55,7 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
* creating outbound streams.
*
* <h3>Stream Lifecycle</h3>
*
* <p>
* The frame codec delivers and writes frames for active streams. An active stream is closed when either side sends a
* {@code RST_STREAM} frame or both sides send a frame with the {@code END_STREAM} flag set. Each
* {@link Http2StreamFrame} has a {@link Http2FrameStream} object attached that uniquely identifies a particular stream.
@ -64,7 +65,7 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
* {@link Http2StreamFrame#stream(Http2FrameStream)}.
*
* <h3>Flow control</h3>
*
* <p>
* The frame codec automatically increments stream and connection flow control windows.
*
* <p>Incoming flow controlled frames need to be consumed by writing a {@link Http2WindowUpdateFrame} with the consumed
@ -78,12 +79,12 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
* connection-level flow control window is the same as initial stream-level flow control window.
*
* <h3>New inbound Streams</h3>
*
* <p>
* The first frame of an HTTP/2 stream must be an {@link Http2HeadersFrame}, which will have an {@link Http2FrameStream}
* object attached.
*
* <h3>New outbound Streams</h3>
*
* <p>
* A outbound HTTP/2 stream can be created by first instantiating a new {@link Http2FrameStream} object via
* {@link Http2ChannelDuplexHandler#newStream()}, and then writing a {@link Http2HeadersFrame} object with the stream
* attached.
@ -125,13 +126,13 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
* the last stream identifier of the GO_AWAY frame will fail with a {@link Http2GoAwayException}.
*
* <h3>Error Handling</h3>
*
* <p>
* Exceptions and errors are propagated via {@link ChannelHandler#exceptionCaught}. Exceptions that apply to
* a specific HTTP/2 stream are wrapped in a {@link Http2FrameStreamException} and have the corresponding
* {@link Http2FrameStream} object attached.
*
* <h3>Reference Counting</h3>
*
* <p>
* Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
* reference counted objects (e.g. {@link ByteBuf}s). The frame codec will call {@link ReferenceCounted#retain()} before
* propagating a reference counted object through the pipeline, and thus an application handler needs to release such
@ -139,7 +140,7 @@ import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
* https://netty.io/wiki/reference-counted-objects.html
*
* <h3>HTTP Upgrade</h3>
*
* <p>
* Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
* HTTP-to-HTTP/2 conversion is performed automatically.
*/
@ -155,7 +156,9 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
ChannelHandlerContext ctx;
/** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/
/**
* Number of buffered streams if the {@link StreamBufferingEncoder} is used.
**/
private int numBufferedStreams;
private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
new IntObjectHashMap<DefaultHttp2FrameStream>(8);
@ -201,7 +204,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
/**
* Retrieve the number of streams currently in the process of being initialized.
*
* <p>
* This is package-private for testing only.
*/
int numInitializingStreams() {
@ -324,6 +327,13 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
encoder().writeSettingsAck(ctx, promise);
} else if (msg instanceof Http2GoAwayFrame) {
writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg, promise);
} else if (msg instanceof Http2PushPromiseFrame) {
Http2PushPromiseFrame pushPromiseFrame = (Http2PushPromiseFrame) msg;
writePushPromise(ctx, pushPromiseFrame, promise);
} else if (msg instanceof Http2PriorityFrame) {
Http2PriorityFrame priorityFrame = (Http2PriorityFrame) msg;
encoder().writePriority(ctx, priorityFrame.stream().id(), priorityFrame.streamDependency(),
priorityFrame.weight(), priorityFrame.exclusive(), promise);
} else if (msg instanceof Http2UnknownFrame) {
Http2UnknownFrame unknownFrame = (Http2UnknownFrame) msg;
encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(),
@ -370,37 +380,14 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content(), promise);
}
private void writeHeadersFrame(
final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame, final ChannelPromise promise) {
private void writeHeadersFrame(final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame,
final ChannelPromise promise) {
if (isStreamIdValid(headersFrame.stream().id())) {
encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(), headersFrame.padding(),
headersFrame.isEndStream(), promise);
} else {
final DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) headersFrame.stream();
final Http2Connection connection = connection();
final int streamId = connection.local().incrementAndGetNextStreamId();
if (streamId < 0) {
promise.setFailure(new Http2NoMoreStreamIdsException());
// Simulate a GOAWAY being received due to stream exhaustion on this connection. We use the maximum
// valid stream ID for the current peer.
onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(connection.isServer() ? Integer.MAX_VALUE :
Integer.MAX_VALUE - 1, NO_ERROR.code(),
writeAscii(ctx.alloc(), "Stream IDs exhausted on local stream creation")));
return;
}
stream.id = streamId;
// Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the
// stream in a field directly we may override the stored field before onStreamAdded(...) was called
// and so not correctly set the property for the buffered stream.
//
// See https://github.com/netty/netty/issues/8692
Object old = frameStreamToInitializeMap.put(streamId, stream);
// We should not re-use ids.
assert old == null;
} else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) headersFrame.stream(), promise)) {
final int streamId = headersFrame.stream().id();
encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
headersFrame.isEndStream(), promise);
@ -420,6 +407,59 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
}
}
private void writePushPromise(final ChannelHandlerContext ctx, Http2PushPromiseFrame pushPromiseFrame,
final ChannelPromise promise) {
if (isStreamIdValid(pushPromiseFrame.pushStream().id())) {
encoder().writePushPromise(ctx, pushPromiseFrame.stream().id(), pushPromiseFrame.pushStream().id(),
pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
} else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) pushPromiseFrame.pushStream(), promise)) {
final int streamId = pushPromiseFrame.stream().id();
encoder().writePushPromise(ctx, streamId, pushPromiseFrame.pushStream().id(),
pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
if (promise.isDone()) {
handleHeaderFuture(promise, streamId);
} else {
numBufferedStreams++;
// Clean up the stream being initialized if writing the headers fails and also
// decrement the number of buffered streams.
promise.addListener((ChannelFuture f) -> {
numBufferedStreams--;
handleHeaderFuture(f, streamId);
});
}
}
}
private boolean initializeNewStream(ChannelHandlerContext ctx, DefaultHttp2FrameStream http2FrameStream,
ChannelPromise promise) {
final Http2Connection connection = connection();
final int streamId = connection.local().incrementAndGetNextStreamId();
if (streamId < 0) {
promise.setFailure(new Http2NoMoreStreamIdsException());
// Simulate a GOAWAY being received due to stream exhaustion on this connection. We use the maximum
// valid stream ID for the current peer.
onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(connection.isServer() ? Integer.MAX_VALUE :
Integer.MAX_VALUE - 1, NO_ERROR.code(),
writeAscii(ctx.alloc(), "Stream IDs exhausted on local stream creation")));
return false;
}
http2FrameStream.id = streamId;
// Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the
// stream in a field directly we may override the stored field before onStreamAdded(...) was called
// and so not correctly set the property for the buffered stream.
//
// See https://github.com/netty/netty/issues/8692
Object old = frameStreamToInitializeMap.put(streamId, http2FrameStream);
// We should not re-use ids.
assert old == null;
return true;
}
private void handleHeaderFuture(Future<?> channelFuture, int streamId) {
if (!channelFuture.isSuccess()) {
frameStreamToInitializeMap.remove(streamId);
@ -488,7 +528,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
*/
@Override
protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
Http2Exception.StreamException streamException) {
Http2Exception.StreamException streamException) {
int streamId = streamException.streamId();
Http2Stream connectionStream = connection().stream(streamId);
if (connectionStream == null) {
@ -513,12 +553,12 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
}
private void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx, Throwable cause,
Http2Exception.StreamException streamException) {
Http2Exception.StreamException streamException) {
// It is normal to hit a race condition where we still receive frames for a stream that this
// peer has deemed closed, such as if this peer sends a RST(CANCEL) to discard the request.
// Since this is likely to be normal we log at DEBUG level.
InternalLogLevel level =
streamException.error() == Http2Error.STREAM_CLOSED ? InternalLogLevel.DEBUG : InternalLogLevel.WARN;
streamException.error() == Http2Error.STREAM_CLOSED ? InternalLogLevel.DEBUG : InternalLogLevel.WARN;
LOG.log(level, "Stream exception thrown for unknown stream {}.", streamException.streamId(), cause);
}
@ -580,14 +620,14 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endOfStream) {
onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
.stream(requireStream(streamId)));
.stream(requireStream(streamId)));
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) {
onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding)
.stream(requireStream(streamId)).retain());
.stream(requireStream(streamId)).retain());
// We return the bytes in consumeBytes() once the stream channel consumed the bytes.
return 0;
}
@ -598,9 +638,10 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
}
@Override
public void onPriorityRead(
ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) {
// TODO: Maybe handle me
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) {
onHttp2Frame(ctx, new DefaultHttp2PriorityFrame(streamDependency, weight, exclusive)
.stream(requireStream(streamId)));
}
@Override
@ -609,9 +650,12 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
}
@Override
public void onPushPromiseRead(
ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) {
// TODO: Maybe handle me
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) {
onHttp2Frame(ctx, new DefaultHttp2PushPromiseFrame(headers, padding, promisedStreamId)
.pushStream(new DefaultHttp2FrameStream()
.setStreamAndProperty(streamKey, connection().stream(promisedStreamId)))
.stream(requireStream(streamId)));
}
private Http2FrameStream requireStream(int streamId) {
@ -628,7 +672,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
}
private void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream,
@SuppressWarnings("unused") boolean writable) {
@SuppressWarnings("unused") boolean writable) {
ctx.fireUserEventTriggered(stream.writabilityChanged);
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
/**
* HTTP/2 Priority Frame
*/
@UnstableApi
public interface Http2PriorityFrame extends Http2StreamFrame {
/**
* Parent Stream Id of this Priority request
*/
int streamDependency();
/**
* Stream weight
*/
short weight();
/**
* Set to {@code true} if this stream is exclusive else set to {@code false}
*/
boolean exclusive();
@Override
Http2PriorityFrame stream(Http2FrameStream stream);
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
/**
* HTTP/2 Push Promise Frame
*/
@UnstableApi
public interface Http2PushPromiseFrame extends Http2StreamFrame {
/**
* Set the Promise {@link Http2FrameStream} object for this frame.
*/
Http2StreamFrame pushStream(Http2FrameStream stream);
/**
* Returns the Promise {@link Http2FrameStream} object for this frame, or {@code null} if the
* frame has yet to be associated with a stream.
*/
Http2FrameStream pushStream();
/**
* {@link Http2Headers} sent in Push Promise
*/
Http2Headers http2Headers();
/**
* Frame padding to use. Will be non-negative and less than 256.
*/
int padding();
/**
* Promised Stream ID
*/
int promisedStreamId();
@Override
Http2PushPromiseFrame stream(Http2FrameStream stream);
}

View File

@ -0,0 +1,242 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.junit.Assert.assertEquals;
public class DefaultHttp2PushPromiseFrameTest {
private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);
private final ClientHandler clientHandler = new ClientHandler();
private final Map<Integer, String> contentMap = new ConcurrentHashMap<Integer, String>();
private ChannelFuture connectionFuture;
@Before
public void setup() throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
Http2FrameCodec frameCodec = Http2FrameCodecBuilder.forServer()
.autoAckSettingsFrame(true)
.autoAckPingFrame(true)
.build();
pipeline.addLast(frameCodec);
pipeline.addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(0).sync();
final Bootstrap bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
Http2FrameCodec frameCodec = Http2FrameCodecBuilder.forClient()
.autoAckSettingsFrame(true)
.autoAckPingFrame(true)
.initialSettings(Http2Settings.defaultSettings().pushEnabled(true))
.build();
pipeline.addLast(frameCodec);
pipeline.addLast(clientHandler);
}
});
connectionFuture = bootstrap.connect(channelFuture.channel().localAddress());
}
@Test
public void send() {
connectionFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
clientHandler.write();
}
});
}
@After
public void shutdown() {
eventLoopGroup.shutdownGracefully();
}
private final class ServerHandler extends Http2ChannelDuplexHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Http2HeadersFrame) {
final Http2HeadersFrame receivedFrame = (Http2HeadersFrame) msg;
Http2Headers pushRequestHeaders = new DefaultHttp2Headers();
pushRequestHeaders.path("/meow")
.method("GET")
.scheme("https")
.authority("localhost:5555");
// Write PUSH_PROMISE request headers
final Http2FrameStream newPushFrameStream = newStream();
Http2PushPromiseFrame pushPromiseFrame = new DefaultHttp2PushPromiseFrame(pushRequestHeaders);
pushPromiseFrame.stream(receivedFrame.stream());
pushPromiseFrame.pushStream(newPushFrameStream);
ctx.writeAndFlush(pushPromiseFrame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
contentMap.put(newPushFrameStream.id(), "Meow, I am Pushed via HTTP/2");
// Write headers for actual request
Http2Headers http2Headers = new DefaultHttp2Headers();
http2Headers.status("200");
http2Headers.add("push", "false");
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(http2Headers, false);
headersFrame.stream(receivedFrame.stream());
ChannelFuture channelFuture = ctx.writeAndFlush(headersFrame);
// Write Data of actual request
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Http2DataFrame dataFrame = new DefaultHttp2DataFrame(
Unpooled.wrappedBuffer("Meow".getBytes()), true);
dataFrame.stream(receivedFrame.stream());
ctx.writeAndFlush(dataFrame);
}
});
}
});
} else if (msg instanceof Http2PriorityFrame) {
Http2PriorityFrame priorityFrame = (Http2PriorityFrame) msg;
String content = contentMap.get(priorityFrame.stream().id());
if (content == null) {
ctx.writeAndFlush(new DefaultHttp2GoAwayFrame(Http2Error.REFUSED_STREAM));
return;
}
// Write headers for Priority request
Http2Headers http2Headers = new DefaultHttp2Headers();
http2Headers.status("200");
http2Headers.add("push", "true");
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(http2Headers, false);
headersFrame.stream(priorityFrame.stream());
ctx.writeAndFlush(headersFrame);
// Write Data of Priority request
Http2DataFrame dataFrame = new DefaultHttp2DataFrame(Unpooled.wrappedBuffer(content.getBytes()), true);
dataFrame.stream(priorityFrame.stream());
ctx.writeAndFlush(dataFrame);
}
}
}
private static final class ClientHandler extends Http2ChannelDuplexHandler {
private ChannelHandlerContext ctx;
@Override
public void channelActive(ChannelHandlerContext ctx) throws InterruptedException {
this.ctx = ctx;
}
void write() {
Http2Headers http2Headers = new DefaultHttp2Headers();
http2Headers.path("/")
.authority("localhost")
.method("GET")
.scheme("https");
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(http2Headers, true);
headersFrame.stream(newStream());
ctx.writeAndFlush(headersFrame);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof Http2PushPromiseFrame) {
Http2PushPromiseFrame pushPromiseFrame = (Http2PushPromiseFrame) msg;
assertEquals("/meow", pushPromiseFrame.http2Headers().path().toString());
assertEquals("GET", pushPromiseFrame.http2Headers().method().toString());
assertEquals("https", pushPromiseFrame.http2Headers().scheme().toString());
assertEquals("localhost:5555", pushPromiseFrame.http2Headers().authority().toString());
Http2PriorityFrame priorityFrame = new DefaultHttp2PriorityFrame(pushPromiseFrame.stream().id(),
Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT, true);
priorityFrame.stream(pushPromiseFrame.pushStream());
ctx.writeAndFlush(priorityFrame);
} else if (msg instanceof Http2HeadersFrame) {
Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg;
if (headersFrame.stream().id() == 3) {
assertEquals("200", headersFrame.headers().status().toString());
assertEquals("false", headersFrame.headers().get("push").toString());
} else if (headersFrame.stream().id() == 2) {
assertEquals("200", headersFrame.headers().status().toString());
assertEquals("true", headersFrame.headers().get("push").toString());
} else {
ctx.writeAndFlush(new DefaultHttp2GoAwayFrame(Http2Error.REFUSED_STREAM));
}
} else if (msg instanceof Http2DataFrame) {
Http2DataFrame dataFrame = (Http2DataFrame) msg;
try {
if (dataFrame.stream().id() == 3) {
assertEquals("Meow", dataFrame.content().toString(CharsetUtil.UTF_8));
} else if (dataFrame.stream().id() == 2) {
assertEquals("Meow, I am Pushed via HTTP/2", dataFrame.content().toString(CharsetUtil.UTF_8));
} else {
ctx.writeAndFlush(new DefaultHttp2GoAwayFrame(Http2Error.REFUSED_STREAM));
}
} finally {
ReferenceCountUtil.release(dataFrame);
}
}
}
}
}