Adding support for HTTP/2 initialization handshake.

Motivation:

The current HTTP/2 support does not properly comply with the HTTP/2 spec
wrt startup.

Modifications:

Changed the frame codec as well as the connection handler to support
exchange of the connection preface, followed immediately by an initial
settings frame.

Result:

The HTTP/2 initialization handshake will be in compliance with the spec.
Will need more work to support the upgrade protocols, however :)
This commit is contained in:
nmittler 2014-04-09 16:23:25 -07:00 committed by Norman Maurer
parent c71e9e485e
commit e769cb3917
11 changed files with 520 additions and 27 deletions

View File

@ -65,6 +65,11 @@ public class DefaultInboundFlowController implements InboundFlowController {
}
}
@Override
public int getInitialInboundWindowSize() {
return initialWindowSize;
}
@Override
public void applyInboundFlowControl(Http2DataFrame dataFrame, FrameWriter frameWriter)
throws Http2Exception {

View File

@ -78,6 +78,11 @@ public class DefaultOutboundFlowController implements OutboundFlowController {
}
}
@Override
public int getInitialOutboundWindowSize() {
return initialWindowSize;
}
@Override
public void updateOutboundWindowSize(int streamId, int delta) throws Http2Exception {
StreamState streamWindow;

View File

@ -18,13 +18,14 @@ package io.netty.handler.codec.http2.draft10.connection;
import static io.netty.handler.codec.http2.draft10.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.draft10.Http2Error.STREAM_CLOSED;
import static io.netty.handler.codec.http2.draft10.Http2Exception.format;
import static io.netty.handler.codec.http2.draft10.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.draft10.connection.Http2ConnectionUtil.toHttp2Exception;
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.CONNECTION_PREFACE;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
@ -48,11 +49,41 @@ import io.netty.handler.codec.http2.draft10.frame.Http2StreamFrame;
import io.netty.handler.codec.http2.draft10.frame.Http2WindowUpdateFrame;
import io.netty.util.ReferenceCountUtil;
/**
* Handler for HTTP/2 connection state. Manages inbound and outbound flow control for data frames.
* Handles error conditions as defined by the HTTP/2 spec and controls appropriate shutdown of the
* connection.
* <p>
* Propagates the following inbound frames to downstream handlers:<br>
* {@link Http2DataFrame}<br>
* {@link Http2HeadersFrame}<br>
* {@link Http2PushPromiseFrame}<br>
* {@link Http2PriorityFrame}<br>
* {@link Http2RstStreamFrame}<br>
* {@link Http2GoAwayFrame}<br>
* {@link Http2WindowUpdateFrame}<br>
* {@link Http2SettingsFrame}<br>
* <p>
* The following outbound frames are allowed from downstream handlers:<br>
* {@link Http2DataFrame}<br>
* {@link Http2HeadersFrame}<br>
* {@link Http2PushPromiseFrame}<br>
* {@link Http2PriorityFrame}<br>
* {@link Http2RstStreamFrame}<br>
* {@link Http2PingFrame} (non-ack)<br>
* {@link Http2SettingsFrame} (non-ack)<br>
* <p>
* All outbound frames are disallowed after a connection shutdown has begun by sending a goAway
* frame to the remote endpoint. In addition, no outbound frames are allowed until the first non-ack
* settings frame is received from the remote endpoint.
*/
public class Http2ConnectionHandler extends ChannelHandlerAdapter {
private final Http2Connection connection;
private final InboundFlowController inboundFlow;
private final OutboundFlowController outboundFlow;
private boolean initialSettingsSent;
private boolean initialSettingsReceived;
public Http2ConnectionHandler(boolean server) {
this(new DefaultHttp2Connection(server));
@ -114,7 +145,11 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object inMsg) throws Exception {
try {
if (inMsg instanceof Http2DataFrame) {
if (inMsg == CONNECTION_PREFACE) {
// The connection preface has been received from the remote endpoint, we're
// beginning an HTTP2 connection. Send the initial settings to the remote endpoint.
sendInitialSettings(ctx);
} else if (inMsg instanceof Http2DataFrame) {
handleInboundData(ctx, (Http2DataFrame) inMsg);
} else if (inMsg instanceof Http2HeadersFrame) {
handleInboundHeaders(ctx, (Http2HeadersFrame) inMsg);
@ -146,6 +181,12 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
try {
if (!initialSettingsReceived) {
throw protocolError(
"Attempting to send frame (%s) before initial settings received", msg
.getClass().getName());
}
if (msg instanceof Http2DataFrame) {
handleOutboundData(ctx, (Http2DataFrame) msg, promise);
} else if (msg instanceof Http2HeadersFrame) {
@ -207,6 +248,7 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
private void handleInboundData(final ChannelHandlerContext ctx, Http2DataFrame frame)
throws Http2Exception {
verifyInitialSettingsReceived();
// Check if we received a data frame for a stream which is half-closed
Http2Stream stream = connection.getStreamOrFail(frame.getStreamId());
@ -236,6 +278,8 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
private void handleInboundHeaders(ChannelHandlerContext ctx, Http2HeadersFrame frame)
throws Http2Exception {
verifyInitialSettingsReceived();
if (isInboundStreamAfterGoAway(frame)) {
return;
}
@ -269,6 +313,8 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
private void handleInboundPushPromise(ChannelHandlerContext ctx, Http2PushPromiseFrame frame)
throws Http2Exception {
verifyInitialSettingsReceived();
if (isInboundStreamAfterGoAway(frame)) {
// Ignore frames for any stream created after we sent a go-away.
return;
@ -283,6 +329,8 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
private void handleInboundPriority(ChannelHandlerContext ctx, Http2PriorityFrame frame)
throws Http2Exception {
verifyInitialSettingsReceived();
if (isInboundStreamAfterGoAway(frame)) {
// Ignore frames for any stream created after we sent a go-away.
return;
@ -304,6 +352,8 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
private void handleInboundWindowUpdate(ChannelHandlerContext ctx, Http2WindowUpdateFrame frame)
throws Http2Exception {
verifyInitialSettingsReceived();
if (isInboundStreamAfterGoAway(frame)) {
// Ignore frames for any stream created after we sent a go-away.
return;
@ -325,7 +375,10 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
ctx.fireChannelRead(frame);
}
private void handleInboundRstStream(ChannelHandlerContext ctx, Http2RstStreamFrame frame) {
private void handleInboundRstStream(ChannelHandlerContext ctx, Http2RstStreamFrame frame)
throws Http2Exception {
verifyInitialSettingsReceived();
if (isInboundStreamAfterGoAway(frame)) {
// Ignore frames for any stream created after we sent a go-away.
return;
@ -342,7 +395,10 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
ctx.fireChannelRead(frame);
}
private static void handleInboundPing(ChannelHandlerContext ctx, Http2PingFrame frame) {
private void handleInboundPing(ChannelHandlerContext ctx, Http2PingFrame frame)
throws Http2Exception {
verifyInitialSettingsReceived();
if (frame.isAck()) {
// The remote enpoint is responding to an Ack that we sent.
ctx.fireChannelRead(frame);
@ -358,6 +414,10 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
private void handleInboundSettings(ChannelHandlerContext ctx, Http2SettingsFrame frame)
throws Http2Exception {
if (frame.isAck()) {
// Should not get an ack before receiving the initial settings from the remote
// endpoint.
verifyInitialSettingsReceived();
// The remote endpoint is acknowledging the settings - fire this up to the next
// handler.
ctx.fireChannelRead(frame);
@ -386,6 +446,10 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
// Acknowledge receipt of the settings.
Http2Frame ack = new DefaultHttp2SettingsFrame.Builder().setAck(true).build();
ctx.writeAndFlush(ack);
// We've received at least one non-ack settings frame from the remote endpoint.
initialSettingsReceived = true;
ctx.fireChannelRead(frame);
}
private void handleInboundGoAway(ChannelHandlerContext ctx, Http2GoAwayFrame frame) {
@ -575,4 +639,35 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
}
ctx.writeAndFlush(frame, promise);
}
private void verifyInitialSettingsReceived() throws Http2Exception {
if (!initialSettingsReceived) {
throw protocolError("Received non-SETTINGS as first frame.");
}
}
/**
* Sends the initial settings frame upon establishment of the connection, if not already sent.
*/
private void sendInitialSettings(ChannelHandlerContext ctx) throws Http2Exception {
if (initialSettingsSent) {
throw protocolError("Already sent initial settings.");
}
// Create and send the frame to the remote endpoint.
DefaultHttp2SettingsFrame frame =
new DefaultHttp2SettingsFrame.Builder()
.setInitialWindowSize(inboundFlow.getInitialInboundWindowSize())
.setMaxConcurrentStreams(connection.remote().getMaxStreams())
.setPushEnabled(connection.local().isPushToAllowed()).build();
ctx.writeAndFlush(frame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
initialSettingsSent = true;
}
}
});
}
}

View File

@ -44,6 +44,11 @@ public interface InboundFlowController {
*/
void setInitialInboundWindowSize(int newWindowSize) throws Http2Exception;
/**
* Gets the initial inbound flow control window size.
*/
int getInitialInboundWindowSize();
/**
* Applies flow control for the received data frame.
*

View File

@ -50,6 +50,11 @@ public interface OutboundFlowController {
*/
void setInitialOutboundWindowSize(int newWindowSize) throws Http2Exception;
/**
* Gets the initial size of the connection's outbound flow control window.
*/
int getInitialOutboundWindowSize();
/**
* Updates the size of the stream's outbound flow control window. This is called upon receiving a
* WINDOW_UPDATE frame from the remote endpoint.

View File

@ -16,6 +16,8 @@
package io.netty.handler.codec.http2.draft10.frame;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
/**
* Constants and utility method used for encoding/decoding HTTP2 frames.
@ -23,6 +25,20 @@ import io.netty.buffer.ByteBuf;
public final class Http2FrameCodecUtil {
public static final int CONNECTION_STREAM_ID = 0;
public static final String CONNECTION_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
private static final ByteBuf CONNECTION_PREFACE_BUF = Unpooled.unmodifiableBuffer(Unpooled
.copiedBuffer(CONNECTION_PREFACE, CharsetUtil.UTF_8));
/**
* Returns a buffer containing the the {@link #CONNECTION_PREFACE}.
*/
public static ByteBuf connectionPrefaceBuf() {
// Return a duplicate so that modifications to the reader index will not affect the original
// buffer.
return CONNECTION_PREFACE_BUF.duplicate().retain();
}
public static final int DEFAULT_STREAM_PRIORITY = 0x40000000; // 2^30
public static final int MAX_FRAME_PAYLOAD_LENGTH = 16383;

View File

@ -15,10 +15,13 @@
package io.netty.handler.codec.http2.draft10.frame.decoder;
import static io.netty.handler.codec.http2.draft10.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.draft10.Http2Exception.format;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.CONNECTION_PREFACE;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.FRAME_HEADER_LENGTH;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.FRAME_LENGTH_MASK;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.readUnsignedInt;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
@ -40,12 +43,14 @@ import java.util.List;
public class Http2FrameDecoder extends ByteToMessageDecoder {
private enum State {
PREFACE,
FRAME_HEADER,
FRAME_PAYLOAD,
ERROR
}
private final Http2FrameUnmarshaller frameUnmarshaller;
private final ByteBuf preface;
private State state;
private int payloadLength;
@ -58,13 +63,23 @@ public class Http2FrameDecoder extends ByteToMessageDecoder {
throw new NullPointerException("frameUnmarshaller");
}
this.frameUnmarshaller = frameUnmarshaller;
state = State.FRAME_HEADER;
preface = connectionPrefaceBuf();
state = State.PREFACE;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
switch (state) {
case PREFACE:
processHttp2Preface(ctx, in);
if (state == State.PREFACE) {
// Still processing the preface.
break;
}
// Successfully processed the HTTP2 preface.
case FRAME_HEADER:
processFrameHeader(in);
if (state == State.FRAME_HEADER) {
@ -90,6 +105,34 @@ public class Http2FrameDecoder extends ByteToMessageDecoder {
}
}
private void processHttp2Preface(ChannelHandlerContext ctx, ByteBuf in) throws Http2Exception {
int prefaceRemaining = preface.readableBytes();
int bytesRead = Math.min(in.readableBytes(), prefaceRemaining);
// Read the portion of the input up to the length of the preface, if reached.
ByteBuf sourceSlice = in.readSlice(bytesRead);
// Read the same number of bytes from the preface buffer.
ByteBuf prefaceSlice = preface.readSlice(bytesRead);
// If the input so far doesn't match the preface, break the connection.
if (bytesRead == 0 || !prefaceSlice.equals(sourceSlice)) {
throw format(PROTOCOL_ERROR, "Invalid HTTP2 preface");
}
if ((prefaceRemaining - bytesRead) > 0) {
// Wait until the entire preface has arrived.
return;
}
// Fire the connection preface to notify the connection handler that it should send the
// initial settings frame.
ctx.fireChannelRead(CONNECTION_PREFACE);
// Start processing the first header.
state = State.FRAME_HEADER;
}
private void processFrameHeader(ByteBuf in) throws Http2Exception {
if (in.readableBytes() < FRAME_HEADER_LENGTH) {
// Wait until the entire frame header has been read.
@ -98,6 +141,7 @@ public class Http2FrameDecoder extends ByteToMessageDecoder {
// Read the header and prepare the unmarshaller to read the frame.
Http2FrameHeader frameHeader = readFrameHeader(in);
payloadLength = frameHeader.getPayloadLength();
frameUnmarshaller.unmarshall(frameHeader);

View File

@ -15,7 +15,11 @@
package io.netty.handler.codec.http2.draft10.frame.encoder;
import static io.netty.handler.codec.http2.draft10.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.connectionPrefaceBuf;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.http2.draft10.frame.Http2Frame;
@ -30,6 +34,8 @@ import io.netty.handler.codec.http2.draft10.frame.Http2Frame;
public class Http2FrameEncoder extends MessageToByteEncoder<Http2Frame> {
private final Http2FrameMarshaller frameMarshaller;
private ChannelFutureListener prefaceWriteListener;
private boolean prefaceWritten;
public Http2FrameEncoder() {
this(new Http2StandardFrameMarshaller());
@ -43,11 +49,55 @@ public class Http2FrameEncoder extends MessageToByteEncoder<Http2Frame> {
}
@Override
protected void encode(ChannelHandlerContext ctx, Http2Frame frame, ByteBuf out) throws Exception {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// The channel just became active - send the HTTP2 connection preface to the remote
// endpoint.
sendPreface(ctx);
super.channelActive(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// This handler was just added to the context. In case it was handled after
// the connection became active, send the HTTP2 connection preface now.
sendPreface(ctx);
}
@Override
protected void encode(ChannelHandlerContext ctx, Http2Frame frame, ByteBuf out)
throws Exception {
try {
if (!prefaceWritten) {
throw protocolError(
"Attempting to send frame before connection preface written: %s", frame
.getClass().getName());
}
frameMarshaller.marshall(frame, out, ctx.alloc());
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
/**
* Sends the HTTP2 connection preface to the remote endpoint, if not already sent.
*/
private void sendPreface(final ChannelHandlerContext ctx) {
if (!prefaceWritten && prefaceWriteListener == null && ctx.channel().isActive()) {
prefaceWriteListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
prefaceWritten = true;
prefaceWriteListener = null;
} else if (ctx.channel().isOpen()) {
// The write failed, close the connection.
ctx.close();
}
}
};
ctx.writeAndFlush(connectionPrefaceBuf()).addListener(prefaceWriteListener);
}
}
}

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec.http2.draft10.connection;
import static io.netty.handler.codec.http2.draft10.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.CONNECTION_PREFACE;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.PING_FRAME_PAYLOAD_LENGTH;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertEquals;
@ -59,6 +60,7 @@ import java.util.Arrays;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/**
@ -106,9 +108,6 @@ public class Http2ConnectionHandlerTest {
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
when(ctx.channel()).thenReturn(channel);
when(ctx.newSucceededFuture()).thenReturn(future);
when(ctx.newPromise()).thenReturn(promise);
when(channel.isActive()).thenReturn(true);
when(stream.getId()).thenReturn(STREAM_ID);
when(pushStream.getId()).thenReturn(PUSH_STREAM_ID);
@ -121,8 +120,17 @@ public class Http2ConnectionHandlerTest {
when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(remote.createStream(eq(STREAM_ID), anyInt(), anyBoolean())).thenReturn(stream);
when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
mockContext();
handler = new Http2ConnectionHandler(connection, inboundFlow, outboundFlow);
// Send the connection preface followed by an empty settings frame. This is required
// before the handler will accept other frames.
handler.channelRead(ctx, CONNECTION_PREFACE);
handler.channelRead(ctx, new DefaultHttp2SettingsFrame.Builder().build());
// Re-mock the context so no calls are registered.
mockContext();
}
@Test
@ -406,7 +414,7 @@ public class Http2ConnectionHandlerTest {
verify(remote).setPushToAllowed(true);
verify(local).setMaxStreams(10);
verify(outboundFlow).setInitialOutboundWindowSize(20);
verify(ctx, never()).fireChannelRead(frame);
verify(ctx).fireChannelRead(frame);
verify(ctx).writeAndFlush(eq(new DefaultHttp2SettingsFrame.Builder().setAck(true).build()));
}
@ -666,4 +674,12 @@ public class Http2ConnectionHandlerTest {
return new DefaultHttp2RstStreamFrame.Builder().setStreamId(streamId)
.setErrorCode(error.getCode()).build();
}
private void mockContext() {
Mockito.reset(ctx);
when(ctx.channel()).thenReturn(channel);
when(ctx.newSucceededFuture()).thenReturn(future);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.writeAndFlush(any())).thenReturn(future);
}
}

View File

@ -72,6 +72,7 @@ public class DefaultHttp2FrameRoundtripTest {
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("codec", new Http2FrameCodec());
p.addLast("ignorePreface", new IgnorePrefaceHandler());
p.addLast("handler", captureHandler);
}
});
@ -83,6 +84,7 @@ public class DefaultHttp2FrameRoundtripTest {
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("codec", new Http2FrameCodec());
p.addLast("ignorePreface", new IgnorePrefaceHandler());
}
});
@ -282,6 +284,17 @@ public class DefaultHttp2FrameRoundtripTest {
return captureHandler.frame;
}
private static class IgnorePrefaceHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg == Http2FrameCodecUtil.CONNECTION_PREFACE) {
return;
} else {
super.channelRead(ctx, msg);
}
}
}
private static class CaptureHandler extends ChannelHandlerAdapter {
public volatile Http2Frame frame;
public volatile int count;

View File

@ -0,0 +1,239 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2.draft10.frame;
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.CONNECTION_PREFACE;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertTrue;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.draft10.connection.Http2ConnectionHandler;
import io.netty.handler.codec.http2.draft10.frame.decoder.Http2FrameDecoder;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Tests the handling of HTTP connection preface and initial settings between client and server.
*/
public class Http2PrefaceTest {
private CaptureHandler serverHandler;
private CaptureHandler clientHandler;
private Channel serverChannel;
private int serverPort;
private List<EventExecutorGroup> groups;
@Before
public void setup() throws Exception {
groups = new ArrayList<EventExecutorGroup>();
groups.add(new NioEventLoopGroup());
serverHandler = new CaptureHandler(groups.get(0).next());
clientHandler = new CaptureHandler(groups.get(0).next());
ServerBootstrap sb = new ServerBootstrap();
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup());
sb.channel(NioServerSocketChannel.class);
serverHandler = new CaptureHandler(sb.group().next());
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("codec", new Http2FrameCodec());
p.addLast("connection", new Http2ConnectionHandler(true));
p.addLast("handler", serverHandler);
}
});
groups.add(sb.group());
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel();
serverPort = ((InetSocketAddress) serverChannel.localAddress()).getPort();
}
@After
public void teardown() throws Exception {
serverChannel.close().sync();
for (EventExecutorGroup group : groups) {
group.shutdownGracefully();
}
}
@Test
public void badPrefaceShouldCloseConnection() throws Exception {
createClientChannel(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("badPrefaceGenerator", new ChannelHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes("BAD_PREFACE".getBytes());
ctx.writeAndFlush(buffer);
super.channelActive(ctx);
}
});
p.addLast("decoder", new Http2FrameDecoder());
p.addLast("handler", clientHandler);
}
});
// Wait a bit and verify that the connection was closed.
assertTrue(serverHandler.awaitClose());
assertTrue(clientHandler.awaitClose());
}
@Test
public void prefaceNotFollowedBySettingsShouldCloseConnection() throws Exception {
createClientChannel(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("codec", new Http2FrameCodec());
p.addLast("wrongFrameGenerator", new ChannelHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg == CONNECTION_PREFACE) {
ByteBuf buf = Unpooled.copiedBuffer("01234567", UTF_8);
Http2PingFrame frame =
new DefaultHttp2PingFrame.Builder().setData(buf).build();
ctx.writeAndFlush(frame);
} else {
super.channelRead(ctx, msg);
}
}
});
p.addLast("handler", clientHandler);
}
});
// Wait a bit and verify that the connection was closed.
assertTrue(serverHandler.awaitClose());
assertTrue(clientHandler.awaitClose());
}
@Test
public void settingsShouldBeExchangedAtStartup() throws Exception {
createClientChannel(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("codec", new Http2FrameCodec());
p.addLast("connection", new Http2ConnectionHandler(false));
p.addLast("handler", clientHandler);
}
});
// Wait a bit and verify that the settings were exchanged.
serverHandler.settings.get(1, TimeUnit.SECONDS);
serverHandler.settingsAck.get(1, TimeUnit.SECONDS);
clientHandler.settings.get(1, TimeUnit.SECONDS);
clientHandler.settingsAck.get(1, TimeUnit.SECONDS);
}
private Channel createClientChannel(ChannelHandler handler) {
Bootstrap cb = new Bootstrap();
cb.group(new NioEventLoopGroup());
cb.channel(NioSocketChannel.class);
cb.handler(handler);
groups.add(cb.group());
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, serverPort));
assertTrue(ccf.awaitUninterruptibly().isSuccess());
return ccf.channel();
}
private static class CaptureHandler extends ChannelHandlerAdapter {
final DefaultPromise<Http2SettingsFrame> settings;
final DefaultPromise<Http2SettingsFrame> settingsAck;
final DefaultPromise<Void> initFuture;
Channel channel;
CaptureHandler(EventExecutor executor) {
settings = new DefaultPromise<Http2SettingsFrame>(executor);
settingsAck = new DefaultPromise<Http2SettingsFrame>(executor);
initFuture = new DefaultPromise<Void>(executor);
}
public boolean awaitClose() throws Exception {
initFuture.await();
for (int i = 0; channel.isOpen() && i < 5; ++i) {
Thread.sleep(10);
}
return !channel.isOpen();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
initFuture.setSuccess(null);
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof Http2SettingsFrame)) {
throw new Exception("Received wrong frame type: " + msg.getClass().getName());
}
Http2SettingsFrame frame = (Http2SettingsFrame) msg;
if (frame.isAck()) {
if (settingsAck.isDone()) {
throw new Exception("Already received settings ack");
}
settingsAck.setSuccess(frame);
} else {
if (settings.isDone()) {
throw new Exception("Already received settings");
}
settings.setSuccess(frame);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (!settingsAck.isDone()) {
settingsAck.setFailure(cause);
}
if (!settings.isDone()) {
settings.setFailure(cause);
}
}
}
}