Clean up HTTP/2 integration tests

Related pull request: #2481 written by @nmittler

Motivation:

Some tests were still sending requests from the test thread rather than
from the event loop.

Modifications:

- Modified the roundtrip tests to use a new utility Http2TestUtil to
  perform the write operations in the event loop thread.
- Modified the Http2Client under examples to write all requests in the
  event loop thread.
- Added HttpPrefaceHandler and its test which were missing.
- Fixed various inspector warnings

Result:

Tests and examples now send requests in the correct thread context.
This commit is contained in:
Trustin Lee 2014-07-02 16:56:09 +09:00
parent 66d8d8219a
commit cea3b6b2ab
5 changed files with 380 additions and 87 deletions

View File

@ -0,0 +1,121 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import static io.netty.handler.codec.http2.Http2CodecUtil.*;
/**
* Reads and writes the HTTP/2 connection preface, which must be the first bytes sent by both
* endpoints upon successful establishment of an HTTP/2 connection. After receiving the preface from
* the remote endpoint, this handler removes itself from the pipeline.
*
* https://tools.ietf.org/html/draft-ietf-httpbis-http2-12#section-3.5
*/
public class Http2PrefaceHandler extends ChannelHandlerAdapter {
private final boolean server;
private final ByteBuf preface = connectionPrefaceBuf();
private boolean prefaceWritten;
public Http2PrefaceHandler(boolean server) {
this.server = server;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// The channel just became active - send the HTTP2 connection preface to the remote
// endpoint.
sendPreface(ctx);
super.channelActive(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// This handler was just added to the context. In case it was handled after
// the connection became active, send the HTTP2 connection preface now.
sendPreface(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (server) {
// Only servers receive the preface string.
if (preface.isReadable() && msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
processHttp2Preface(ctx, buf);
if (preface.isReadable()) {
// More preface left to process.
buf.release();
return;
}
}
}
super.channelRead(ctx, msg);
}
/**
* Sends the HTTP2 connection preface to the remote endpoint, if not already sent.
*/
private void sendPreface(final ChannelHandlerContext ctx) {
if (server) {
// The preface string is only written by clients.
return;
}
if (!prefaceWritten && ctx.channel().isActive()) {
prefaceWritten = true;
ctx.writeAndFlush(connectionPrefaceBuf()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess() && ctx.channel().isOpen()) {
// The write failed, close the connection.
ctx.close();
} else {
ctx.pipeline().remove(Http2PrefaceHandler.this);
}
}
});
}
}
private void processHttp2Preface(ChannelHandlerContext ctx, ByteBuf in) {
int prefaceRemaining = preface.readableBytes();
int bytesRead = Math.min(in.readableBytes(), prefaceRemaining);
// Read the portion of the input up to the length of the preface, if reached.
ByteBuf sourceSlice = in.readSlice(bytesRead);
// Read the same number of bytes from the preface buffer.
ByteBuf prefaceSlice = preface.readSlice(bytesRead);
// If the input so far doesn't match the preface, break the connection.
if (bytesRead == 0 || !prefaceSlice.equals(sourceSlice)) {
ctx.close();
return;
}
if (!preface.isReadable()) {
// Entire preface has been read, remove ourselves from the pipeline.
ctx.pipeline().remove(this);
}
}
}

View File

@ -15,6 +15,7 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@ -78,6 +79,7 @@ public class Http2ConnectionRoundtripTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new Http2PrefaceHandler(true));
p.addLast(new DelegatingHttp2ConnectionHandler(true, new FrameCountDown()));
}
});
@ -88,6 +90,7 @@ public class Http2ConnectionRoundtripTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new Http2PrefaceHandler(false));
p.addLast(new DelegatingHttp2ConnectionHandler(false, serverObserver));
}
});
@ -114,20 +117,18 @@ public class Http2ConnectionRoundtripTest {
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
.authority("example.org").path("/some/path/resource2").build();
final String text = "hello world";
runInChannel(clientChannel, new Http2TestUtil.Http2Runnable() {
@Override
public void run() throws Http2Exception {
for (int i = 0, nextStream = 3; i < NUM_STREAMS; ++i, nextStream += 2) {
final int streamId = nextStream;
clientChannel.eventLoop().execute(new Runnable() {
@Override
public void run() {
http2Client.writeHeaders(ctx(), newPromise(), streamId, headers, 0,
(short) 16, false, 0, false, false);
http2Client.writeHeaders(ctx(), newPromise(), streamId, headers, 0, (short) 16,
false, 0, false, false);
http2Client.writeData(ctx(), newPromise(), streamId,
Unpooled.copiedBuffer(text.getBytes()), 0, true, true, false);
}
});
}
});
// Wait for all frames to be received.
awaitRequests();
verify(serverObserver, times(NUM_STREAMS)).onHeadersRead(any(ChannelHandlerContext.class),

View File

@ -15,12 +15,6 @@
package io.netty.handler.codec.http2;
import static io.netty.util.CharsetUtil.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
@ -36,11 +30,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.NetUtil;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -48,6 +37,16 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static io.netty.handler.codec.http2.Http2TestUtil.*;
import static io.netty.util.CharsetUtil.*;
import static java.util.concurrent.TimeUnit.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Tests encoding/decoding each HTTP2 frame type.
*/
@ -112,11 +111,14 @@ public class Http2FrameRoundtripTest {
@Test
public void dataFrameShouldMatch() throws Exception {
String text = "hello world";
final String text = "hello world";
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeData(ctx(), newPromise(), 0x7FFFFFFF,
Unpooled.copiedBuffer(text.getBytes()), 100, true, false, false);
}
});
awaitRequests();
verify(serverObserver).onDataRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
dataCaptor.capture(), eq(100), eq(true), eq(false), eq(false));
@ -124,11 +126,15 @@ public class Http2FrameRoundtripTest {
@Test
public void headersFrameWithoutPriorityShouldMatch() throws Exception {
Http2Headers headers =
final Http2Headers headers =
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
.authority("example.org").path("/some/path/resource2").build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 0x7FFFFFFF, headers, 0, true, false);
}
});
awaitRequests();
verify(serverObserver).onHeadersRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(headers), eq(0), eq(true), eq(false));
@ -136,12 +142,16 @@ public class Http2FrameRoundtripTest {
@Test
public void headersFrameWithPriorityShouldMatch() throws Exception {
Http2Headers headers =
final Http2Headers headers =
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
.authority("example.org").path("/some/path/resource2").build();
frameWriter.writeHeaders(ctx(), newPromise(), 0x7FFFFFFF, headers, 4, (short) 255, true, 0,
true, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 0x7FFFFFFF, headers, 4, (short) 255,
true, 0, true, false);
}
});
awaitRequests();
verify(serverObserver).onHeadersRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(headers), eq(4), eq((short) 255), eq(true), eq(0), eq(true), eq(false));
@ -149,10 +159,14 @@ public class Http2FrameRoundtripTest {
@Test
public void goAwayFrameShouldMatch() throws Exception {
String text = "test";
final String text = "test";
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeGoAway(ctx(), newPromise(), 0x7FFFFFFF, 0xFFFFFFFFL,
Unpooled.copiedBuffer(text.getBytes()));
}
});
awaitRequests();
verify(serverObserver).onGoAwayRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(0xFFFFFFFFL), dataCaptor.capture());
@ -160,17 +174,26 @@ public class Http2FrameRoundtripTest {
@Test
public void pingFrameShouldMatch() throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("01234567", UTF_8);
final ByteBuf buf = Unpooled.copiedBuffer("01234567", UTF_8);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writePing(ctx(), ctx().newPromise(), true, buf);
}
});
awaitRequests();
verify(serverObserver).onPingAckRead(any(ChannelHandlerContext.class), dataCaptor.capture());
verify(serverObserver)
.onPingAckRead(any(ChannelHandlerContext.class), dataCaptor.capture());
}
@Test
public void priorityFrameShouldMatch() throws Exception {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writePriority(ctx(), newPromise(), 0x7FFFFFFF, 1, (short) 1, true);
}
});
awaitRequests();
verify(serverObserver).onPriorityRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(1), eq((short) 1), eq(true));
@ -178,11 +201,15 @@ public class Http2FrameRoundtripTest {
@Test
public void pushPromiseFrameShouldMatch() throws Exception {
Http2Headers headers =
final Http2Headers headers =
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
.authority("example.org").path("/some/path/resource2").build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writePushPromise(ctx(), newPromise(), 0x7FFFFFFF, 1, headers, 5);
}
});
awaitRequests();
verify(serverObserver).onPushPromiseRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(1), eq(headers), eq(5));
@ -190,8 +217,12 @@ public class Http2FrameRoundtripTest {
@Test
public void rstStreamFrameShouldMatch() throws Exception {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeRstStream(ctx(), newPromise(), 0x7FFFFFFF, 0xFFFFFFFFL);
}
});
awaitRequests();
verify(serverObserver).onRstStreamRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(0xFFFFFFFFL));
@ -199,21 +230,29 @@ public class Http2FrameRoundtripTest {
@Test
public void settingsFrameShouldMatch() throws Exception {
Http2Settings settings = new Http2Settings();
final Http2Settings settings = new Http2Settings();
settings.allowCompressedData(true);
settings.initialWindowSize(10);
settings.maxConcurrentStreams(1000);
settings.maxHeaderTableSize(4096);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeSettings(ctx(), newPromise(), settings);
}
});
awaitRequests();
verify(serverObserver).onSettingsRead(any(ChannelHandlerContext.class), eq(settings));
}
@Test
public void windowUpdateFrameShouldMatch() throws Exception {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeWindowUpdate(ctx(), newPromise(), 0x7FFFFFFF, 0x7FFFFFFF);
}
});
awaitRequests();
verify(serverObserver).onWindowUpdateRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(0x7FFFFFFF));
@ -221,22 +260,24 @@ public class Http2FrameRoundtripTest {
@Test
public void stressTest() throws Exception {
Http2Headers headers =
final Http2Headers headers =
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
.authority("example.org").path("/some/path/resource2").build();
String text = "hello world";
int numStreams = 1000;
final String text = "hello world";
final int numStreams = 10000;
int expectedFrames = numStreams * 2;
requestLatch = new CountDownLatch(expectedFrames);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
for (int i = 1; i < numStreams + 1; ++i) {
frameWriter.writeHeaders(ctx(), newPromise(), i, headers, 0, (short) 16, false, 0,
false, false);
frameWriter.writeData(ctx(), newPromise(), i, Unpooled.copiedBuffer(text.getBytes()),
0, true, true, false);
frameWriter.writeHeaders(ctx(), newPromise(), i, headers, 0, (short) 16, false,
0, false, false);
frameWriter.writeData(ctx(), newPromise(), i,
Unpooled.copiedBuffer(text.getBytes()), 0, true, true, false);
}
// Wait for all frames to be received.
}
});
awaitRequests();
}
@ -323,20 +364,23 @@ public class Http2FrameRoundtripTest {
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data)
throws Http2Exception {
observer.onPingRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data)
throws Http2Exception {
observer.onPingAckRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding)
throws Http2Exception {
observer.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
requestLatch.countDown();
}
@ -365,7 +409,8 @@ public class Http2FrameRoundtripTest {
}
@Override
public void onBlockedRead(ChannelHandlerContext ctx, int streamId) throws Http2Exception {
public void onBlockedRead(ChannelHandlerContext ctx, int streamId)
throws Http2Exception {
observer.onBlockedRead(ctx, streamId);
requestLatch.countDown();
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import java.util.Queue;
import static io.netty.handler.codec.http2.Http2CodecUtil.*;
import static io.netty.util.CharsetUtil.*;
import static org.junit.Assert.*;
/**
* Tests for {@link Http2PrefaceHandler}.
*/
public class Http2PrefaceHandlerTest {
@Test
public void clientShouldWritePrefaceAtStartup() {
EmbeddedChannel channel = createChannel(false);
// Ensure that the preface was automatically written at startup.
Queue<Object> outboundMessages = channel.outboundMessages();
assertTrue(channel.isOpen());
assertNull(channel.pipeline().get(Http2PrefaceHandler.class));
assertTrue(channel.finish());
assertEquals(1, outboundMessages.size());
assertEquals(connectionPrefaceBuf(), outboundMessages.peek());
}
@Test
public void serverShouldNotWritePrefaceAtStartup() {
EmbeddedChannel channel = createChannel(true);
// Ensure that the preface was automatically written at startup.
Queue<Object> outboundMessages = channel.outboundMessages();
assertTrue(channel.isOpen());
assertNotNull(channel.pipeline().get(Http2PrefaceHandler.class));
assertFalse(channel.finish());
assertTrue(outboundMessages.isEmpty());
}
@Test
public void serverShouldBeRemovedAfterReceivingPreface() {
EmbeddedChannel channel = createChannel(true);
// Simulate receiving the preface.
assertTrue(channel.writeInbound(connectionPrefaceBuf()));
assertNull(channel.pipeline().get(Http2PrefaceHandler.class));
assertTrue(channel.isOpen());
assertTrue(channel.finish());
}
@Test
public void serverReceivingBadPrefaceShouldCloseTheConnection() {
EmbeddedChannel channel = createChannel(true);
// Simulate receiving the bad preface.
assertFalse(channel.writeInbound(Unpooled.copiedBuffer("BAD_PREFACE", UTF_8)));
assertFalse(channel.isOpen());
assertFalse(channel.finish());
}
private static EmbeddedChannel createChannel(boolean server) {
return new EmbeddedChannel(new Http2PrefaceHandler(server));
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.Channel;
/**
* Utilities for the integration tests.
*/
final class Http2TestUtil {
/**
* Interface that allows for running a operation that throws a {@link Http2Exception}.
*/
interface Http2Runnable {
void run() throws Http2Exception;
}
/**
* Runs the given operation within the event loop thread of the given {@link Channel}.
*/
static void runInChannel(Channel channel, final Http2Runnable runnable) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} catch (Http2Exception e) {
throw new RuntimeException(e);
}
}
});
}
private Http2TestUtil() {
}
}