HTTP/2 Unit Test Leak Fixes

Motivation:
The HTTP/2 tests do not always clean up ByteBuf resources reliably. There are issues with the refCnt, over allocating buffers, and potentially not waiting long enough to reclaim resources for stress tests.

Modifications:
Scrub the HTTP/2 unit tests for ByteBuf leaks.

Result:
Less leaks (hopefully none) in the HTTP/2 unit tests. No OOME from HTTP/2 unit tests.
This commit is contained in:
Scott Mitchell 2014-09-14 11:33:51 -04:00
parent 96a044fabe
commit 2cf6ed9460
10 changed files with 1266 additions and 1013 deletions

View File

@ -182,7 +182,7 @@ public class DataCompressionHttp2Test {
}
});
awaitServer();
data.readerIndex(0);
data.resetReaderIndex();
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
@ -216,7 +216,7 @@ public class DataCompressionHttp2Test {
}
});
awaitServer();
data.readerIndex(0);
data.resetReaderIndex();
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
@ -254,8 +254,8 @@ public class DataCompressionHttp2Test {
}
});
awaitServer();
data1.readerIndex(0);
data2.readerIndex(0);
data1.resetReaderIndex();
data2.resetReaderIndex();
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
ArgumentCaptor<Boolean> endStreamCaptor = ArgumentCaptor.forClass(Boolean.class);
verify(serverListener, times(2)).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(),
@ -297,7 +297,7 @@ public class DataCompressionHttp2Test {
}
});
awaitServer();
data.readerIndex(0);
data.resetReaderIndex();
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));

View File

@ -26,7 +26,6 @@ import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.junit.Before;
import org.junit.Test;
@ -66,66 +65,96 @@ public class DefaultHttp2FrameIOTest {
@Test
public void emptyDataShouldRoundtrip() throws Exception {
ByteBuf data = Unpooled.EMPTY_BUFFER;
final ByteBuf data = Unpooled.EMPTY_BUFFER;
writer.writeData(ctx, 1000, data, 0, false, promise);
ByteBuf frame = captureWrite();
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
verify(listener).onDataRead(eq(ctx), eq(1000), eq(data), eq(0), eq(false));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void dataShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
final ByteBuf data = dummyData();
writer.writeData(ctx, 1000, data.retain().duplicate(), 0, false, promise);
ByteBuf frame = captureWrite();
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
verify(listener).onDataRead(eq(ctx), eq(1000), eq(data), eq(0), eq(false));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void dataWithPaddingShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
final ByteBuf data = dummyData();
writer.writeData(ctx, 1, data.retain().duplicate(), 0xFF, true, promise);
ByteBuf frame = captureWrite();
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
verify(listener).onDataRead(eq(ctx), eq(1), eq(data), eq(0xFF), eq(true));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void priorityShouldRoundtrip() throws Exception {
writer.writePriority(ctx, 1, 2, (short) 255, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onPriorityRead(eq(ctx), eq(1), eq(2), eq((short) 255), eq(true));
} finally {
frame.release();
}
}
@Test
public void rstStreamShouldRoundtrip() throws Exception {
writer.writeRstStream(ctx, 1, MAX_UNSIGNED_INT, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onRstStreamRead(eq(ctx), eq(1), eq(MAX_UNSIGNED_INT));
} finally {
frame.release();
}
}
@Test
public void emptySettingsShouldRoundtrip() throws Exception {
writer.writeSettings(ctx, new Http2Settings(), promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onSettingsRead(eq(ctx), eq(new Http2Settings()));
} finally {
frame.release();
}
}
@Test
public void settingsShouldStripShouldRoundtrip() throws Exception {
@ -138,175 +167,251 @@ public class DefaultHttp2FrameIOTest {
writer.writeSettings(ctx, settings, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onSettingsRead(eq(ctx), eq(settings));
} finally {
frame.release();
}
}
@Test
public void settingsAckShouldRoundtrip() throws Exception {
writer.writeSettingsAck(ctx, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onSettingsAckRead(eq(ctx));
} finally {
frame.release();
}
}
@Test
public void pingShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
writer.writePing(ctx, false, data.retain().duplicate(), promise);
ByteBuf frame = captureWrite();
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
verify(listener).onPingRead(eq(ctx), eq(data));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void pingAckShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
writer.writePing(ctx, true, data.retain().duplicate(), promise);
ByteBuf frame = captureWrite();
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
verify(listener).onPingAckRead(eq(ctx), eq(data));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void goAwayShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
writer.writeGoAway(ctx, 1, MAX_UNSIGNED_INT, data.retain().duplicate(), promise);
ByteBuf frame = captureWrite();
ByteBuf frame = null;
try {
frame = captureWrite();
reader.readFrame(ctx, frame, listener);
verify(listener).onGoAwayRead(eq(ctx), eq(1), eq(MAX_UNSIGNED_INT), eq(data));
} finally {
if (frame != null) {
frame.release();
}
data.release();
}
}
@Test
public void windowUpdateShouldRoundtrip() throws Exception {
writer.writeWindowUpdate(ctx, 1, Integer.MAX_VALUE, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onWindowUpdateRead(eq(ctx), eq(1), eq(Integer.MAX_VALUE));
} finally {
frame.release();
}
}
@Test
public void emptyHeadersShouldRoundtrip() throws Exception {
Http2Headers headers = Http2Headers.EMPTY_HEADERS;
writer.writeHeaders(ctx, 1, headers, 0, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true));
} finally {
frame.release();
}
}
@Test
public void emptyHeadersWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = Http2Headers.EMPTY_HEADERS;
writer.writeHeaders(ctx, 1, headers, 0xFF, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true));
} finally {
frame.release();
}
}
@Test
public void headersWithoutPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, 1, headers, 0, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true));
} finally {
frame.release();
}
}
@Test
public void headersWithPaddingWithoutPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, 1, headers, 0xFF, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true));
} finally {
frame.release();
}
}
@Test
public void headersWithPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0),
eq(true));
verify(listener)
.onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0), eq(true));
} finally {
frame.release();
}
}
@Test
public void headersWithPaddingWithPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0xFF, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF),
eq(true));
} finally {
frame.release();
}
}
@Test
public void continuedHeadersShouldRoundtrip() throws Exception {
Http2Headers headers = largeHeaders();
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0),
eq(true));
verify(listener)
.onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0), eq(true));
} finally {
frame.release();
}
}
@Test
public void continuedHeadersWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = largeHeaders();
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0xFF, true, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF),
eq(true));
} finally {
frame.release();
}
}
@Test
public void emptypushPromiseShouldRoundtrip() throws Exception {
Http2Headers headers = Http2Headers.EMPTY_HEADERS;
writer.writePushPromise(ctx, 1, 2, headers, 0, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0));
} finally {
frame.release();
}
}
@Test
public void pushPromiseShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writePushPromise(ctx, 1, 2, headers, 0, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0));
} finally {
frame.release();
}
}
@Test
public void pushPromiseWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writePushPromise(ctx, 1, 2, headers, 0xFF, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF));
} finally {
frame.release();
}
}
@Test
public void continuedPushPromiseShouldRoundtrip() throws Exception {
@ -322,11 +427,15 @@ public class DefaultHttp2FrameIOTest {
public void continuedPushPromiseWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = largeHeaders();
writer.writePushPromise(ctx, 1, 2, headers, 0xFF, promise);
ByteBuf frame = captureWrite();
try {
reader.readFrame(ctx, frame, listener);
verify(listener).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF));
} finally {
frame.release();
}
}
private ByteBuf captureWrite() {
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
@ -335,12 +444,12 @@ public class DefaultHttp2FrameIOTest {
}
private ByteBuf dummyData() {
return ReferenceCountUtil.releaseLater(alloc.buffer().writeBytes("abcdefgh".getBytes(CharsetUtil.UTF_8)));
return alloc.buffer().writeBytes("abcdefgh".getBytes(CharsetUtil.UTF_8));
}
private static Http2Headers dummyHeaders() {
return DefaultHttp2Headers.newBuilder().method("GET").scheme("https")
.authority("example.org").path("/some/path").add("accept", "*/*").build();
return DefaultHttp2Headers.newBuilder().method("GET").scheme("https").authority("example.org")
.path("/some/path").add("accept", "*/*").build();
}
private static Http2Headers largeHeaders() {

View File

@ -27,7 +27,6 @@ import org.junit.Test;
import com.twitter.hpack.Encoder;
/**
* Tests for {@link DefaultHttp2HeadersDecoder}.
*/
@ -42,22 +41,30 @@ public class DefaultHttp2HeadersDecoderTest {
@Test
public void decodeShouldSucceed() throws Exception {
ByteBuf buf = encode(":method", "GET", "akey", "avalue");
final ByteBuf buf = encode(":method", "GET", "akey", "avalue");
try {
Http2Headers headers = decoder.decodeHeaders(buf).build();
assertEquals(2, headers.size());
assertEquals("GET", headers.method());
assertEquals("avalue", headers.get("akey"));
} finally {
buf.release();
}
}
@Test(expected = Http2Exception.class)
public void decodeWithInvalidPseudoHeaderShouldFail() throws Exception {
ByteBuf buf = encode(":invalid", "GET", "akey", "avalue");
final ByteBuf buf = encode(":invalid", "GET", "akey", "avalue");
try {
decoder.decodeHeaders(buf);
} finally {
buf.release();
}
}
private ByteBuf encode(String... entries) throws Exception {
Encoder encoder = new Encoder();
ByteArrayOutputStream stream = new ByteArrayOutputStream();
final Encoder encoder = new Encoder();
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
for (int ix = 0; ix < entries.length;) {
String key = entries[ix++];
String value = entries[ix++];

View File

@ -22,7 +22,6 @@ import io.netty.buffer.Unpooled;
import org.junit.Before;
import org.junit.Test;
/**
* Tests for {@link DefaultHttp2HeadersEncoder}.
*/
@ -37,17 +36,21 @@ public class DefaultHttp2HeadersEncoderTest {
@Test
public void encodeShouldSucceed() throws Http2Exception {
DefaultHttp2Headers headers =
DefaultHttp2Headers.newBuilder().method("GET").add("a", "1").add("a", "2").build();
ByteBuf buf = Unpooled.buffer();
DefaultHttp2Headers headers = DefaultHttp2Headers.newBuilder().method("GET").add("a", "1").add("a", "2")
.build();
final ByteBuf buf = Unpooled.buffer();
try {
encoder.encodeHeaders(headers, buf);
assertTrue(buf.writerIndex() > 0);
} finally {
buf.release();
}
}
@Test(expected = Http2Exception.class)
public void headersExceedMaxSetSizeShouldFail() throws Http2Exception {
DefaultHttp2Headers headers =
DefaultHttp2Headers.newBuilder().method("GET").add("a", "1").add("a", "2").build();
DefaultHttp2Headers headers = DefaultHttp2Headers.newBuilder().method("GET").add("a", "1").add("a", "2")
.build();
encoder.maxHeaderListSize(2);
encoder.encodeHeaders(headers, Unpooled.buffer());

View File

@ -153,19 +153,21 @@ public class DefaultHttp2InboundFlowControllerTest {
}
private void applyFlowControl(int dataSize, int padding, boolean endOfStream) throws Http2Exception {
ByteBuf buf = dummyData(dataSize);
final ByteBuf buf = dummyData(dataSize);
try {
controller.onDataRead(ctx, STREAM_ID, buf, padding, endOfStream);
} finally {
buf.release();
}
}
private static ByteBuf dummyData(int size) {
ByteBuf buffer = Unpooled.buffer(size);
final ByteBuf buffer = Unpooled.buffer(size);
buffer.writerIndex(size);
return buffer;
}
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement)
throws Http2Exception {
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) throws Http2Exception {
verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise));
}
@ -174,7 +176,7 @@ public class DefaultHttp2InboundFlowControllerTest {
}
private void verifyWindowUpdateNotSent() throws Http2Exception {
verify(frameWriter, never()).writeWindowUpdate(any(ChannelHandlerContext.class), anyInt(),
anyInt(), any(ChannelPromise.class));
verify(frameWriter, never()).writeWindowUpdate(any(ChannelHandlerContext.class), anyInt(), anyInt(),
any(ChannelPromise.class));
}
}

View File

@ -122,23 +122,31 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void frameShouldBeSentImmediately() throws Http2Exception {
ByteBuf data = dummyData(5, 5);
final ByteBuf data = dummyData(5, 5);
try {
send(STREAM_A, data.slice(0, 5), 5);
verifyWrite(STREAM_A, data.slice(0, 5), 5);
assertEquals(1, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
@Test
public void frameLargerThanMaxFrameSizeShouldBeSplit() throws Http2Exception {
when(frameWriter.maxFrameSize()).thenReturn(3);
ByteBuf data = dummyData(5, 0);
final ByteBuf data = dummyData(5, 0);
try {
send(STREAM_A, data.copy(), 5);
verifyWrite(STREAM_A, data.slice(0, 3), 0);
verifyWrite(STREAM_A, data.slice(3, 2), 1);
verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 3);
verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 1);
} finally {
manualSafeRelease(data);
}
}
@Test
@ -150,81 +158,102 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void frameShouldSplitForMaxFrameSize() throws Http2Exception {
when(frameWriter.maxFrameSize()).thenReturn(5);
ByteBuf data = dummyData(10, 0);
final ByteBuf data = dummyData(10, 0);
try {
ByteBuf slice1 = data.slice(0, 5);
ByteBuf slice2 = data.slice(5, 5);
send(STREAM_A, data.slice(), 0);
verifyWrite(STREAM_A, slice1, 0);
verifyWrite(STREAM_A, slice2, 0);
assertEquals(2, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
@Test
public void stalledStreamShouldQueueFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0);
ByteBuf data = dummyData(10, 5);
final ByteBuf data = dummyData(10, 5);
try {
send(STREAM_A, data.slice(0, 10), 5);
verifyNoWrite(STREAM_A);
assertEquals(1, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
@Test
public void frameShouldSplit() throws Http2Exception {
controller.initialOutboundWindowSize(5);
ByteBuf data = dummyData(5, 5);
final ByteBuf data = dummyData(5, 5);
try {
send(STREAM_A, data.slice(0, 5), 5);
// Verify that a partial frame of 5 was sent.
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
// None of the padding should be sent in the frame.
captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue();
final ByteBuf writtenBuf = argument.getValue();
assertEquals(5, writtenBuf.readableBytes());
assertEquals(data.slice(0, 5), writtenBuf);
assertEquals(2, writtenBuf.refCnt());
assertEquals(2, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
@Test
public void frameShouldSplitPadding() throws Http2Exception {
controller.initialOutboundWindowSize(5);
ByteBuf data = dummyData(3, 7);
final ByteBuf data = dummyData(3, 7);
try {
send(STREAM_A, data.slice(0, 3), 7);
// Verify that a partial frame of 5 was sent.
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 2, false);
ByteBuf writtenBuf = argument.getValue();
final ByteBuf writtenBuf = argument.getValue();
assertEquals(3, writtenBuf.readableBytes());
assertEquals(data.slice(0, 3), writtenBuf);
assertEquals(2, writtenBuf.refCnt());
assertEquals(2, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
@Test
public void emptyFrameShouldSplitPadding() throws Http2Exception {
controller.initialOutboundWindowSize(5);
ByteBuf data = dummyData(0, 10);
final ByteBuf data = dummyData(0, 10);
try {
send(STREAM_A, data.slice(0, 0), 10);
// Verify that a partial frame of 5 was sent.
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 5, false);
ByteBuf writtenBuf = argument.getValue();
final ByteBuf writtenBuf = argument.getValue();
assertEquals(0, writtenBuf.readableBytes());
assertEquals(1, writtenBuf.refCnt());
assertEquals(1, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
@Test
public void windowUpdateShouldSendFrame() throws Http2Exception {
controller.initialOutboundWindowSize(10);
ByteBuf data = dummyData(10, 10);
final ByteBuf data = dummyData(10, 10);
try {
send(STREAM_A, data.slice(0, 10), 10);
verifyWrite(STREAM_A, data.slice(0, 10), 0);
@ -236,13 +265,17 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(10, window(STREAM_B));
assertEquals(10, window(STREAM_C));
assertEquals(10, window(STREAM_D));
} finally {
manualSafeRelease(data);
}
}
@Test
public void initialWindowUpdateShouldSendFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0);
ByteBuf data = dummyData(10, 0);
final ByteBuf data = dummyData(10, 0);
try {
send(STREAM_A, data.slice(), 0);
verifyNoWrite(STREAM_A);
@ -250,9 +283,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
controller.initialOutboundWindowSize(10);
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue();
final ByteBuf writtenBuf = argument.getValue();
assertEquals(data, writtenBuf);
assertEquals(1, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
@Test
@ -260,7 +296,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
controller.initialOutboundWindowSize(0);
// First send a frame that will get buffered.
ByteBuf data = dummyData(10, 0);
final ByteBuf data = dummyData(10, 0);
try {
send(STREAM_A, data.slice(), 0);
verifyNoWrite(STREAM_A);
@ -273,13 +310,17 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyWrite(STREAM_A, data.slice(), 0);
verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 0);
} finally {
manualSafeRelease(data);
}
}
@Test
public void initialWindowUpdateShouldSendPartialFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0);
ByteBuf data = dummyData(10, 0);
final ByteBuf data = dummyData(10, 0);
try {
send(STREAM_A, data, 0);
verifyNoWrite(STREAM_A);
@ -292,6 +333,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(data.slice(0, 5), writtenBuf);
assertEquals(2, writtenBuf.refCnt());
assertEquals(2, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
@Test
@ -299,7 +343,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Set the connection window size to zero.
exhaustStreamWindow(CONNECTION_STREAM_ID);
ByteBuf data = dummyData(10, 0);
final ByteBuf data = dummyData(10, 0);
try {
send(STREAM_A, data.slice(), 0);
verifyNoWrite(STREAM_A);
@ -313,9 +358,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue();
final ByteBuf writtenBuf = argument.getValue();
assertEquals(data, writtenBuf);
assertEquals(1, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
@Test
@ -323,7 +371,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Set the connection window size to zero.
exhaustStreamWindow(CONNECTION_STREAM_ID);
ByteBuf data = dummyData(10, 0);
final ByteBuf data = dummyData(10, 0);
try {
send(STREAM_A, data, 0);
verifyNoWrite(STREAM_A);
@ -337,11 +386,14 @@ public class DefaultHttp2OutboundFlowControllerTest {
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue();
final ByteBuf writtenBuf = argument.getValue();
assertEquals(5, writtenBuf.readableBytes());
assertEquals(data.slice(0, 5), writtenBuf);
assertEquals(2, writtenBuf.refCnt());
assertEquals(2, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
@Test
@ -349,7 +401,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Set the stream window size to zero.
exhaustStreamWindow(STREAM_A);
ByteBuf data = dummyData(10, 0);
final ByteBuf data = dummyData(10, 0);
try {
send(STREAM_A, data.slice(), 0);
verifyNoWrite(STREAM_A);
@ -363,9 +416,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue();
final ByteBuf writtenBuf = argument.getValue();
assertEquals(data, writtenBuf);
assertEquals(1, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
@Test
@ -373,7 +429,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Set the stream window size to zero.
exhaustStreamWindow(STREAM_A);
ByteBuf data = dummyData(10, 0);
final ByteBuf data = dummyData(10, 0);
try {
send(STREAM_A, data, 0);
verifyNoWrite(STREAM_A);
@ -391,6 +448,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(5, writtenBuf.readableBytes());
assertEquals(2, writtenBuf.refCnt());
assertEquals(2, data.refCnt());
} finally {
manualSafeRelease(data);
}
}
/**
@ -409,8 +469,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Try sending 10 bytes on each stream. They will be pending until we free up the
// connection.
send(STREAM_A, dummyData(3, 0), 7);
send(STREAM_B, dummyData(10, 0), 0);
final ByteBuf[] bufs = { dummyData(3, 0), dummyData(10, 0) };
try {
send(STREAM_A, bufs[0], 7);
send(STREAM_B, bufs[1], 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
@ -422,7 +484,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
final ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that 5 bytes from A were written: 3 from data and 2 from padding.
captureWrite(STREAM_A, captor, 2, false);
@ -430,6 +492,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
captureWrite(STREAM_B, captor, 0, false);
assertEquals(5, captor.getValue().readableBytes());
} finally {
manualSafeRelease(bufs);
}
}
/**
@ -454,10 +519,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Try sending 10 bytes on each stream. They will be pending until we free up the
// connection.
send(STREAM_A, dummyData(10, 0), 0);
send(STREAM_B, dummyData(10, 0), 0);
send(STREAM_C, dummyData(10, 0), 0);
send(STREAM_D, dummyData(10, 0), 0);
final ByteBuf[] bufs = { dummyData(10, 0), dummyData(10, 0), dummyData(10, 0), dummyData(10, 0) };
try {
send(STREAM_A, bufs[0], 0);
send(STREAM_B, bufs[1], 0);
send(STREAM_C, bufs[2], 0);
send(STREAM_D, bufs[3], 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
@ -470,7 +537,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B));
assertEquals((2 * DEFAULT_WINDOW_SIZE) - 5, window(STREAM_C) + window(STREAM_D));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
final ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that no write was done for A, since it's blocked.
verifyNoWrite(STREAM_A);
@ -486,6 +553,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
int d = captor.getValue().readableBytes();
assertEquals(5, c + d);
assertEquals(1, Math.abs(c - d));
} finally {
manualSafeRelease(bufs);
}
}
/**
@ -509,10 +579,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Send 10 bytes to each.
send(STREAM_A, dummyData(10, 0), 0);
send(STREAM_B, dummyData(10, 0), 0);
send(STREAM_C, dummyData(10, 0), 0);
send(STREAM_D, dummyData(10, 0), 0);
final ByteBuf[] bufs = { dummyData(10, 0), dummyData(10, 0), dummyData(10, 0), dummyData(10, 0) };
try {
send(STREAM_A, bufs[0], 0);
send(STREAM_B, bufs[1], 0);
send(STREAM_C, bufs[2], 0);
send(STREAM_D, bufs[3], 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
@ -526,7 +598,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
final ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that A received all the bytes.
captureWrite(STREAM_A, captor, 0, false);
@ -534,6 +606,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
verifyNoWrite(STREAM_D);
} finally {
manualSafeRelease(bufs);
}
}
/**
@ -557,10 +632,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Only send 5 to A so that it will allow data from its children.
send(STREAM_A, dummyData(5, 0), 0);
send(STREAM_B, dummyData(10, 0), 0);
send(STREAM_C, dummyData(10, 0), 0);
send(STREAM_D, dummyData(10, 0), 0);
final ByteBuf[] bufs = { dummyData(5, 0), dummyData(10, 0), dummyData(10, 0), dummyData(10, 0) };
try {
send(STREAM_A, bufs[0], 0);
send(STREAM_B, bufs[1], 0);
send(STREAM_C, bufs[2], 0);
send(STREAM_D, bufs[3], 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
@ -573,7 +650,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(0, window(STREAM_B));
assertEquals((2 * DEFAULT_WINDOW_SIZE) - 5, window(STREAM_C) + window(STREAM_D));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
final ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that no write was done for B, since it's blocked.
verifyNoWrite(STREAM_B);
@ -589,6 +666,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
int d = captor.getValue().readableBytes();
assertEquals(5, c + d);
assertEquals(1, Math.abs(c - d));
} finally {
manualSafeRelease(bufs);
}
}
/**
@ -623,10 +703,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Send 10 bytes to each.
send(STREAM_A, dummyData(10, 0), 0);
send(STREAM_B, dummyData(10, 0), 0);
send(STREAM_C, dummyData(10, 0), 0);
send(STREAM_D, dummyData(10, 0), 0);
final ByteBuf[] bufs = { dummyData(10, 0), dummyData(10, 0), dummyData(10, 0), dummyData(10, 0) };
try {
send(STREAM_A, bufs[0], 0);
send(STREAM_B, bufs[1], 0);
send(STREAM_C, bufs[2], 0);
send(STREAM_D, bufs[3], 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
@ -643,7 +725,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_D));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
final ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that A received all the bytes.
captureWrite(STREAM_A, captor, 0, false);
@ -652,6 +734,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(5, captor.getValue().readableBytes());
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
} finally {
manualSafeRelease(bufs);
}
}
/**
@ -676,10 +761,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
setPriority(STREAM_D, 0, (short) 100, false);
// Send a bunch of data on each stream.
send(STREAM_A, dummyData(1000, 0), 0);
send(STREAM_B, dummyData(1000, 0), 0);
send(STREAM_C, dummyData(1000, 0), 0);
send(STREAM_D, dummyData(1000, 0), 0);
final ByteBuf[] bufs = { dummyData(1000, 0), dummyData(1000, 0), dummyData(1000, 0), dummyData(1000, 0) };
try {
send(STREAM_A, bufs[0], 0);
send(STREAM_B, bufs[1], 0);
send(STREAM_C, bufs[2], 0);
send(STREAM_D, bufs[3], 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
@ -687,7 +774,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Allow 1000 bytes to be sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 1000);
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
final ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, captor, 0, false);
int aWritten = captor.getValue().readableBytes();
@ -721,6 +808,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE - bWritten, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE - cWritten, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE - dWritten, window(STREAM_D));
} finally {
manualSafeRelease(bufs);
}
}
/**
@ -745,16 +835,18 @@ public class DefaultHttp2OutboundFlowControllerTest {
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
// Send a bunch of data on each stream.
send(STREAM_A, dummyData(400, 0), 0);
send(STREAM_B, dummyData(500, 0), 0);
send(STREAM_C, dummyData(0, 0), 0);
send(STREAM_D, dummyData(700, 0), 0);
final ByteBuf[] bufs = { dummyData(400, 0), dummyData(500, 0), dummyData(0, 0), dummyData(700, 0) };
try {
send(STREAM_A, bufs[0], 0);
send(STREAM_B, bufs[1], 0);
send(STREAM_C, bufs[2], 0);
send(STREAM_D, bufs[3], 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_D);
// The write will occur on C, because it's an empty frame.
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
final ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_C, captor, 0, false);
assertEquals(0, captor.getValue().readableBytes());
@ -779,6 +871,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(333, aWritten);
assertEquals(333, bWritten);
assertEquals(333, dWritten);
} finally {
manualSafeRelease(bufs);
}
}
/**
@ -804,35 +899,38 @@ public class DefaultHttp2OutboundFlowControllerTest {
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, 400);
streamSizes.put(STREAM_B, 500);
streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A), 0), 0);
send(STREAM_B, dummyData(streamSizes.get(STREAM_B), 0), 0);
send(STREAM_C, dummyData(streamSizes.get(STREAM_C), 0), 0);
send(STREAM_D, dummyData(streamSizes.get(STREAM_D), 0), 0);
final ByteBuf[] bufs = { dummyData(streamSizes.get(STREAM_A), 0), dummyData(streamSizes.get(STREAM_B), 0),
dummyData(streamSizes.get(STREAM_C), 0), dummyData(streamSizes.get(STREAM_D), 0) };
try {
send(STREAM_A, bufs[0], 0);
send(STREAM_B, bufs[1], 0);
send(STREAM_C, bufs[2], 0);
send(STREAM_D, bufs[3], 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
verifyNoWrite(STREAM_D);
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
state.priorityBytes());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_C, STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_C, STREAM_D)),
state.priorityBytes());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_B)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.priorityBytes());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_C)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes());
} finally {
manualSafeRelease(bufs);
}
}
/**
@ -847,6 +945,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
* </pre>
*
* After the tree shift:
*
* <pre>
* [0]
* |
@ -869,15 +968,18 @@ public class DefaultHttp2OutboundFlowControllerTest {
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, 400);
streamSizes.put(STREAM_B, 500);
streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A), 0), 0);
send(STREAM_B, dummyData(streamSizes.get(STREAM_B), 0), 0);
send(STREAM_C, dummyData(streamSizes.get(STREAM_C), 0), 0);
send(STREAM_D, dummyData(streamSizes.get(STREAM_D), 0), 0);
final ByteBuf[] bufs = { dummyData(streamSizes.get(STREAM_A), 0), dummyData(streamSizes.get(STREAM_B), 0),
dummyData(streamSizes.get(STREAM_C), 0), dummyData(streamSizes.get(STREAM_D), 0) };
try {
send(STREAM_A, bufs[0], 0);
send(STREAM_B, bufs[1], 0);
send(STREAM_C, bufs[2], 0);
send(STREAM_D, bufs[3], 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
@ -885,20 +987,21 @@ public class DefaultHttp2OutboundFlowControllerTest {
streamB.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
state.priorityBytes());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
state.priorityBytes());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
state.priorityBytes());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_C)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes());
} finally {
manualSafeRelease(bufs);
}
}
/**
@ -913,6 +1016,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
* </pre>
*
* After the tree shift:
*
* <pre>
* [0]
* / \
@ -938,17 +1042,21 @@ public class DefaultHttp2OutboundFlowControllerTest {
streamE.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
// Send a bunch of data on each stream.
IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, 400);
streamSizes.put(STREAM_B, 500);
streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700);
streamSizes.put(STREAM_E, 900);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A), 0), 0);
send(STREAM_B, dummyData(streamSizes.get(STREAM_B), 0), 0);
send(STREAM_C, dummyData(streamSizes.get(STREAM_C), 0), 0);
send(STREAM_D, dummyData(streamSizes.get(STREAM_D), 0), 0);
send(STREAM_E, dummyData(streamSizes.get(STREAM_E), 0), 0);
final ByteBuf[] bufs = { dummyData(streamSizes.get(STREAM_A), 0), dummyData(streamSizes.get(STREAM_B), 0),
dummyData(streamSizes.get(STREAM_C), 0), dummyData(streamSizes.get(STREAM_D), 0),
dummyData(streamSizes.get(STREAM_E), 0) };
try {
send(STREAM_A, bufs[0], 0);
send(STREAM_B, bufs[1], 0);
send(STREAM_C, bufs[2], 0);
send(STREAM_D, bufs[3], 0);
send(STREAM_E, bufs[4], 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
@ -956,23 +1064,25 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_E);
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)), state.priorityBytes());
assertEquals(
calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)),
state.priorityBytes());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)),
state.priorityBytes());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_B)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.priorityBytes());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_C)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes());
state = state(streamE);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_E, STREAM_C, STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_E, STREAM_C, STREAM_D)),
state.priorityBytes());
} finally {
manualSafeRelease(bufs);
}
}
/**
@ -987,6 +1097,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
* </pre>
*
* After the tree shift:
*
* <pre>
* [0]
* / | \
@ -1005,15 +1116,18 @@ public class DefaultHttp2OutboundFlowControllerTest {
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, 400);
streamSizes.put(STREAM_B, 500);
streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A), 0), 0);
send(STREAM_B, dummyData(streamSizes.get(STREAM_B), 0), 0);
send(STREAM_C, dummyData(streamSizes.get(STREAM_C), 0), 0);
send(STREAM_D, dummyData(streamSizes.get(STREAM_D), 0), 0);
final ByteBuf[] bufs = { dummyData(streamSizes.get(STREAM_A), 0), dummyData(streamSizes.get(STREAM_B), 0),
dummyData(streamSizes.get(STREAM_C), 0), dummyData(streamSizes.get(STREAM_D), 0) };
try {
send(STREAM_A, bufs[0], 0);
send(STREAM_B, bufs[1], 0);
send(STREAM_C, bufs[2], 0);
send(STREAM_D, bufs[3], 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
@ -1022,19 +1136,19 @@ public class DefaultHttp2OutboundFlowControllerTest {
streamA.close();
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
state.priorityBytes());
state = state(streamA);
assertEquals(0, state.priorityBytes());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_B)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.priorityBytes());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_C)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes());
} finally {
manualSafeRelease(bufs);
}
}
private static OutboundFlowState state(Http2Stream stream) {
@ -1065,8 +1179,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
eq(promise));
}
private void captureWrite(int streamId, ArgumentCaptor<ByteBuf> captor, int padding,
boolean endStream) {
private void captureWrite(int streamId, ArgumentCaptor<ByteBuf> captor, int padding, boolean endStream) {
verify(frameWriter).writeData(eq(ctx), eq(streamId), captor.capture(), eq(padding), eq(endStream), eq(promise));
}
@ -1075,8 +1188,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
private void exhaustStreamWindow(int streamId) throws Http2Exception {
int dataLength = window(streamId);
final int dataLength = window(streamId);
final ByteBuf data = dummyData(dataLength, 0);
try {
if (streamId == CONNECTION_STREAM_ID) {
// Find a stream that we can use to shrink the connection window.
int streamToWrite = 0;
@ -1089,19 +1203,22 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Write to STREAM_A to decrease the connection window and then restore STREAM_A's window.
int prevWindow = window(streamToWrite);
send(streamToWrite, dummyData(dataLength, 0), 0);
send(streamToWrite, data, 0);
int delta = prevWindow - window(streamToWrite);
controller.updateOutboundWindowSize(streamToWrite, delta);
} else {
// Write to the stream and then restore the connection window.
int prevWindow = window(CONNECTION_STREAM_ID);
send(streamId, dummyData(dataLength, 0), 0);
send(streamId, data, 0);
int delta = prevWindow - window(CONNECTION_STREAM_ID);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, delta);
}
// Reset the frameWriter so that this write doesn't interfere with other tests.
resetFrameWriter();
} finally {
manualSafeRelease(data);
}
}
private void resetFrameWriter() {
@ -1114,12 +1231,24 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
private static ByteBuf dummyData(int size, int padding) {
String repeatedData = "0123456789";
ByteBuf buffer = Unpooled.buffer(size + padding);
final String repeatedData = "0123456789";
final ByteBuf buffer = Unpooled.buffer(size + padding);
for (int index = 0; index < size; ++index) {
buffer.writeByte(repeatedData.charAt(index % repeatedData.length()));
}
buffer.writeZero(padding);
return buffer;
}
private static void manualSafeRelease(ByteBuf data) {
while (data.refCnt() > 0) { // Manually release just to be safe if the test fails
data.release();
}
}
private static void manualSafeRelease(ByteBuf[] bufs) {
for (int i = 0; i < bufs.length; ++i) {
manualSafeRelease(bufs[i]);
}
}
}

View File

@ -47,6 +47,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -56,6 +57,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import java.util.Collections;
import java.util.List;
import org.junit.After;
import org.junit.Before;
@ -65,8 +67,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
* Tests for {@link DelegatingHttp2ConnectionHandlerTest} and its base class
* {@link AbstractHttp2ConnectionHandler}.
* Tests for {@link DelegatingHttp2ConnectionHandlerTest} and its base class {@link AbstractHttp2ConnectionHandler}.
*/
public class DelegatingHttp2ConnectionHandlerTest {
private static final int STREAM_ID = 1;
@ -134,17 +135,13 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream);
when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(
future);
when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise)))
when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future);
when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))).thenReturn(future);
when(outboundFlow.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), eq(promise)))
.thenReturn(future);
when(outboundFlow.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(),
anyBoolean(), eq(promise))) .thenReturn(future);
mockContext();
handler =
new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow,
outboundFlow, listener);
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow, outboundFlow, listener);
// Simulate activation of the handler to force writing the initial settings.
Http2Settings settings = new Http2Settings();
@ -186,8 +183,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void clientShouldSendClientPrefaceStringWhenActive() throws Exception {
when(connection.isServer()).thenReturn(false);
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow,
outboundFlow, listener);
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow, outboundFlow, listener);
handler.channelActive(ctx);
verify(ctx).write(eq(connectionPrefaceBuf()));
}
@ -195,8 +191,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void serverShouldNotSendClientPrefaceStringWhenActive() throws Exception {
when(connection.isServer()).thenReturn(true);
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow,
outboundFlow, listener);
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow, outboundFlow, listener);
handler.channelActive(ctx);
verify(ctx, never()).write(eq(connectionPrefaceBuf()));
}
@ -204,8 +199,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void serverReceivingInvalidClientPrefaceStringShouldCloseConnection() throws Exception {
when(connection.isServer()).thenReturn(true);
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow,
outboundFlow, listener);
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow, outboundFlow, listener);
handler.channelRead(ctx, copiedBuffer("BAD_PREFACE", UTF_8));
verify(ctx).close();
}
@ -214,8 +208,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
public void serverReceivingValidClientPrefaceStringShouldContinueReadingFrames() throws Exception {
reset(listener);
when(connection.isServer()).thenReturn(true);
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow,
outboundFlow, listener);
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow, outboundFlow, listener);
handler.channelRead(ctx, connectionPrefaceBuf());
verify(ctx, never()).close();
decode().onSettingsRead(ctx, new Http2Settings());
@ -225,8 +218,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void closeShouldSendGoAway() throws Exception {
handler.close(ctx, promise);
verify(writer).writeGoAway(eq(ctx), eq(0), eq((long) NO_ERROR.code()),
eq(EMPTY_BUFFER), eq(promise));
verify(writer).writeGoAway(eq(ctx), eq(0), eq((long) NO_ERROR.code()), eq(EMPTY_BUFFER), eq(promise));
verify(remote).goAwayReceived(0);
}
@ -241,8 +233,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
Http2Exception e = new Http2StreamException(STREAM_ID, PROTOCOL_ERROR);
handler.exceptionCaught(ctx, e);
verify(stream).close();
verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID),
eq((long) PROTOCOL_ERROR.code()), eq(promise));
verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), eq(promise));
}
@Test
@ -251,27 +242,36 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(remote.lastStreamCreated()).thenReturn(STREAM_ID);
handler.exceptionCaught(ctx, e);
verify(remote).goAwayReceived(STREAM_ID);
verify(writer).writeGoAway(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()),
eq(EMPTY_BUFFER), eq(promise));
verify(writer).writeGoAway(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), eq(EMPTY_BUFFER),
eq(promise));
}
@Test
public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onDataRead(ctx, STREAM_ID, dummyData(), 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true));
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
// Verify that the event was absorbed and not propagated to the oberver.
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(),
anyBoolean());
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
}
}
@Test
public void dataReadWithEndOfStreamShouldCloseRemoteSide() throws Exception {
decode().onDataRead(ctx, STREAM_ID, dummyData(), 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true));
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
verify(stream).closeRemoteSide();
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true));
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
} finally {
data.release();
}
}
@Test
@ -281,8 +281,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
verify(remote, never()).createStream(eq(STREAM_ID), eq(false));
// Verify that the event was absorbed and not propagated to the oberver.
verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class),
anyInt(), anyBoolean());
verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean());
verify(remote, never()).createStream(anyInt(), anyBoolean());
}
@ -291,8 +290,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(remote.createStream(eq(5), eq(false))).thenReturn(stream);
decode().onHeadersRead(ctx, 5, EMPTY_HEADERS, 0, false);
verify(remote).createStream(eq(5), eq(false));
verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false));
verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0), eq(DEFAULT_PRIORITY_WEIGHT),
eq(false), eq(0), eq(false));
}
@Test
@ -300,8 +299,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(remote.createStream(eq(5), eq(true))).thenReturn(stream);
decode().onHeadersRead(ctx, 5, EMPTY_HEADERS, 0, true);
verify(remote).createStream(eq(5), eq(true));
verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true));
verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0), eq(DEFAULT_PRIORITY_WEIGHT),
eq(false), eq(0), eq(true));
}
@Test
@ -309,8 +308,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(stream.state()).thenReturn(RESERVED_REMOTE);
decode().onHeadersRead(ctx, STREAM_ID, EMPTY_HEADERS, 0, false);
verify(stream).openForPush();
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false));
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0), eq(DEFAULT_PRIORITY_WEIGHT),
eq(false), eq(0), eq(false));
}
@Test
@ -319,8 +318,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
decode().onHeadersRead(ctx, STREAM_ID, EMPTY_HEADERS, 0, true);
verify(stream).openForPush();
verify(stream).close();
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true));
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0), eq(DEFAULT_PRIORITY_WEIGHT),
eq(false), eq(0), eq(true));
}
@Test
@ -328,16 +327,14 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0);
verify(remote, never()).reservePushStream(anyInt(), any(Http2Stream.class));
verify(listener, never()).onPushPromiseRead(eq(ctx), anyInt(), anyInt(),
any(Http2Headers.class), anyInt());
verify(listener, never()).onPushPromiseRead(eq(ctx), anyInt(), anyInt(), any(Http2Headers.class), anyInt());
}
@Test
public void pushPromiseReadShouldSucceed() throws Exception {
decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0);
verify(remote).reservePushStream(eq(PUSH_STREAM_ID), eq(stream));
verify(listener).onPushPromiseRead(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID),
eq(EMPTY_HEADERS), eq(0));
verify(listener).onPushPromiseRead(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID), eq(EMPTY_HEADERS), eq(0));
}
@Test
@ -453,21 +450,35 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void dataWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writeData(ctx, STREAM_ID, dummyData(), 0, false, promise);
final ByteBuf data = dummyData();
try {
ChannelFuture future = handler.writeData(ctx, STREAM_ID, data, 0, false, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
} finally {
while (data.refCnt() > 0) {
data.release();
}
}
}
@Test
public void dataWriteShouldSucceed() throws Exception {
handler.writeData(ctx, STREAM_ID, dummyData(), 0, false, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(0), eq(false), eq(promise));
final ByteBuf data = dummyData();
try {
handler.writeData(ctx, STREAM_ID, data, 0, false, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(false), eq(promise));
} finally {
data.release();
}
}
@Test
public void dataWriteShouldHalfCloseStream() throws Exception {
reset(future);
handler.writeData(ctx, STREAM_ID, dummyData(), 0, true, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(0), eq(true), eq(promise));
final ByteBuf data = dummyData();
try {
handler.writeData(ctx, STREAM_ID, data, 0, true, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(true), eq(promise));
// Invoke the listener callback indicating that the write completed successfully.
ArgumentCaptor<ChannelFutureListener> captor = ArgumentCaptor.forClass(ChannelFutureListener.class);
@ -475,38 +486,52 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(future.isSuccess()).thenReturn(true);
captor.getValue().operationComplete(future);
verify(stream).closeLocalSide();
} finally {
data.release();
}
}
@Test
public void dataWriteWithFailureShouldHandleException() throws Exception {
reset(future);
handler.writeData(ctx, STREAM_ID, dummyData(), 0, true, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(0), eq(true), eq(promise));
final String msg = "fake exception";
final ByteBuf exceptionData = Unpooled.copiedBuffer(msg.getBytes(UTF_8));
final ByteBuf data = dummyData();
List<ByteBuf> goAwayDataCapture = null;
try {
handler.writeData(ctx, STREAM_ID, data, 0, true, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(true), eq(promise));
// Invoke the listener callback indicating that the write failed.
String msg = "fake exception";
ArgumentCaptor<ChannelFutureListener> captor = ArgumentCaptor.forClass(ChannelFutureListener.class);
verify(future).addListener(captor.capture());
when(future.isSuccess()).thenReturn(false);
when(future.cause()).thenReturn(new RuntimeException(msg));
captor.getValue().operationComplete(future);
ArgumentCaptor<ByteBuf> bufferCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(writer).writeGoAway(eq(ctx), eq(0), eq((long) INTERNAL_ERROR.code()),
bufferCaptor.capture(), eq(promise));
ByteBuf writtenBuffer = bufferCaptor.getValue();
assertEquals(wrappedBuffer(msg.getBytes(UTF_8)), writtenBuffer);
writtenBuffer.release();
final ArgumentCaptor<ByteBuf> bufferCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(writer).writeGoAway(eq(ctx), eq(0), eq((long) INTERNAL_ERROR.code()), bufferCaptor.capture(),
eq(promise));
goAwayDataCapture = bufferCaptor.getAllValues();
assertEquals(exceptionData, goAwayDataCapture.get(0));
verify(remote).goAwayReceived(0);
} finally {
data.release();
exceptionData.release();
if (goAwayDataCapture != null) {
for (int i = 0; i < goAwayDataCapture.size(); ++i) {
goAwayDataCapture.get(i).release();
}
}
}
}
@Test
public void headersWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writeHeaders(
ctx, 5, EMPTY_HEADERS, 0, (short) 255, false, 0, false, promise);
ChannelFuture future = handler.writeHeaders(ctx, 5, EMPTY_HEADERS, 0, (short) 255, false, 0, false, promise);
verify(local, never()).createStream(anyInt(), anyBoolean());
verify(writer, never()).writeHeaders(eq(ctx), anyInt(),
any(Http2Headers.class), anyInt(), anyBoolean(), eq(promise));
verify(writer, never()).writeHeaders(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean(),
eq(promise));
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@ -515,8 +540,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(local.createStream(eq(5), eq(false))).thenReturn(stream);
handler.writeHeaders(ctx, 5, EMPTY_HEADERS, 0, false, promise);
verify(local).createStream(eq(5), eq(false));
verify(writer).writeHeaders(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
verify(writer).writeHeaders(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0), eq(DEFAULT_PRIORITY_WEIGHT), eq(false),
eq(0), eq(false), eq(promise));
}
@Test
@ -524,8 +549,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(local.createStream(eq(5), eq(true))).thenReturn(stream);
handler.writeHeaders(ctx, 5, EMPTY_HEADERS, 0, true, promise);
verify(local).createStream(eq(5), eq(true));
verify(writer).writeHeaders(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
verify(writer).writeHeaders(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0), eq(DEFAULT_PRIORITY_WEIGHT), eq(false),
eq(0), eq(true), eq(promise));
}
@Test
@ -534,8 +559,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
handler.writeHeaders(ctx, STREAM_ID, EMPTY_HEADERS, 0, false, promise);
verify(stream).openForPush();
verify(stream, never()).closeLocalSide();
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0), eq(DEFAULT_PRIORITY_WEIGHT),
eq(false), eq(0), eq(false), eq(promise));
}
@Test
@ -544,8 +569,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
handler.writeHeaders(ctx, STREAM_ID, EMPTY_HEADERS, 0, true, promise);
verify(stream).openForPush();
verify(stream).closeLocalSide();
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0), eq(DEFAULT_PRIORITY_WEIGHT),
eq(false), eq(0), eq(true), eq(promise));
}
@Test
@ -559,8 +584,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
public void pushPromiseWriteShouldReserveStream() throws Exception {
handler.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0, promise);
verify(local).reservePushStream(eq(PUSH_STREAM_ID), eq(stream));
verify(writer).writePushPromise(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID),
eq(EMPTY_HEADERS), eq(0), eq(promise));
verify(writer).writePushPromise(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID), eq(EMPTY_HEADERS), eq(0),
eq(promise));
}
@Test
@ -574,8 +599,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
public void priorityWriteShouldSetPriorityForStream() throws Exception {
handler.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255),
eq(true), eq(promise));
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
}
@Test
@ -588,8 +612,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
public void rstStreamWriteShouldCloseStream() throws Exception {
handler.writeRstStream(ctx, STREAM_ID, PROTOCOL_ERROR.code(), promise);
verify(stream).close();
verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID),
eq((long) PROTOCOL_ERROR.code()), eq(promise));
verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), eq(promise));
}
@Test
@ -652,8 +675,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
* Calls the decode method on the handler and gets back the captured internal listener
*/
private Http2FrameListener decode() throws Exception {
ArgumentCaptor<Http2FrameListener> internallistener =
ArgumentCaptor.forClass(Http2FrameListener.class);
ArgumentCaptor<Http2FrameListener> internallistener = ArgumentCaptor.forClass(Http2FrameListener.class);
doNothing().when(reader).readFrame(eq(ctx), any(ByteBuf.class), internallistener.capture());
handler.decode(ctx, EMPTY_BUFFER, Collections.emptyList());
return internallistener.getValue();

View File

@ -20,6 +20,7 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static io.netty.handler.codec.http2.Http2CodecUtil.ignoreSettingsHandler;
import static io.netty.util.CharsetUtil.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
@ -48,11 +49,13 @@ import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.NetUtil;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@ -60,6 +63,10 @@ import org.mockito.MockitoAnnotations;
* Testing the {@link DelegatingHttp2HttpConnectionHandler} for {@link FullHttpRequest} objects into HTTP/2 frames
*/
public class DelegatingHttp2HttpConnectionHandlerTest {
private static final int CONNECTION_SETUP_READ_COUNT = 2;
private List<ByteBuf> capturedData;
@Mock
private Http2FrameListener clientListener;
@ -71,13 +78,13 @@ public class DelegatingHttp2HttpConnectionHandlerTest {
private Channel serverChannel;
private Channel clientChannel;
private CountDownLatch requestLatch;
private static final int CONNECTION_SETUP_READ_COUNT = 2;
private Http2TestUtil.FrameCountDown serverFrameCountDown;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
requestLatch = new CountDownLatch(CONNECTION_SETUP_READ_COUNT + 1);
requestLatch(new CountDownLatch(CONNECTION_SETUP_READ_COUNT + 1));
sb = new ServerBootstrap();
cb = new Bootstrap();
@ -88,7 +95,8 @@ public class DelegatingHttp2HttpConnectionHandlerTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelegatingHttp2ConnectionHandler(true, new FrameCountDown()));
serverFrameCountDown = new Http2TestUtil.FrameCountDown(serverListener, requestLatch);
p.addLast(new DelegatingHttp2ConnectionHandler(true, serverFrameCountDown));
p.addLast(ignoreSettingsHandler());
}
});
@ -114,6 +122,12 @@ public class DelegatingHttp2HttpConnectionHandlerTest {
@After
public void teardown() throws Exception {
if (capturedData != null) {
for (int i = 0; i < capturedData.size(); ++i) {
capturedData.get(i).release();
}
capturedData = null;
}
serverChannel.close().sync();
sb.group().shutdownGracefully();
sb.childGroup().shutdownGracefully();
@ -122,7 +136,8 @@ public class DelegatingHttp2HttpConnectionHandlerTest {
@Test
public void testJustHeadersRequest() throws Exception {
final HttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, "/example");
final FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, "/example");
try {
final HttpHeaders httpHeaders = request.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 5);
httpHeaders.set(HttpHeaders.Names.HOST, "http://my-user_name@www.example.org:5555/example");
@ -131,9 +146,9 @@ public class DelegatingHttp2HttpConnectionHandlerTest {
httpHeaders.add("foo", "goo");
httpHeaders.add("foo", "goo2");
httpHeaders.add("foo2", "goo2");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/example").authority("www.example.org:5555").scheme("http")
.add("foo", "goo").add("foo", "goo2").add("foo2", "goo2").build();
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").path("/example")
.authority("www.example.org:5555").scheme("http").add("foo", "goo").add("foo", "goo2")
.add("foo2", "goo2").build();
ChannelPromise writePromise = newPromise();
ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise);
@ -142,26 +157,31 @@ public class DelegatingHttp2HttpConnectionHandlerTest {
writeFuture.awaitUninterruptibly(2, SECONDS);
assertTrue(writeFuture.isSuccess());
awaitRequests();
final ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(5), eq(http2Headers), eq(0),
anyShort(), anyBoolean(), eq(0), eq(true));
verify(serverListener, never()).onDataRead(any(ChannelHandlerContext.class),
anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
verify(serverListener, never()).onDataRead(any(ChannelHandlerContext.class), anyInt(),
dataCaptor.capture(), anyInt(), anyBoolean());
} finally {
request.release();
}
}
@Test
public void testRequestWithBody() throws Exception {
requestLatch = new CountDownLatch(CONNECTION_SETUP_READ_COUNT + 2);
requestLatch(new CountDownLatch(CONNECTION_SETUP_READ_COUNT + 2));
final String text = "foooooogoooo";
final HttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, POST, "/example",
Unpooled.copiedBuffer(text, UTF_8));
final ByteBuf data = Unpooled.copiedBuffer(text, UTF_8);
try {
final HttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, POST, "/example", data.retain());
final HttpHeaders httpHeaders = request.headers();
httpHeaders.set(HttpHeaders.Names.HOST, "http://your_user-name123@www.example.org:5555/example");
httpHeaders.add("foo", "goo");
httpHeaders.add("foo", "goo2");
httpHeaders.add("foo2", "goo2");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("POST").path("/example").authority("www.example.org:5555").scheme("http")
.add("foo", "goo").add("foo", "goo2").add("foo2", "goo2").build();
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("POST").path("/example")
.authority("www.example.org:5555").scheme("http").add("foo", "goo").add("foo", "goo2")
.add("foo2", "goo2").build();
ChannelPromise writePromise = newPromise();
ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise);
@ -170,10 +190,23 @@ public class DelegatingHttp2HttpConnectionHandlerTest {
writeFuture.awaitUninterruptibly(2, SECONDS);
assertTrue(writeFuture.isSuccess());
awaitRequests();
final ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(http2Headers), eq(0),
anyShort(), anyBoolean(), eq(0), eq(false));
verify(serverListener).onDataRead(any(ChannelHandlerContext.class),
eq(3), eq(Unpooled.copiedBuffer(text.getBytes())), eq(0), eq(true));
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
capturedData = dataCaptor.getAllValues();
assertEquals(data, capturedData.get(0));
} finally {
data.release();
}
}
private void requestLatch(CountDownLatch latch) {
requestLatch = latch;
if (serverFrameCountDown != null) {
serverFrameCountDown.messageLatch(latch);
}
}
private void awaitRequests() throws Exception {
@ -187,105 +220,4 @@ public class DelegatingHttp2HttpConnectionHandlerTest {
private ChannelPromise newPromise() {
return ctx().newPromise();
}
/**
* A decorator around the serverObserver that counts down the latch so that we can await the
* completion of the request.
*/
private final class FrameCountDown implements Http2FrameListener {
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream)
throws Http2Exception {
serverListener.onDataRead(ctx, streamId, copy(data), padding, endOfStream);
requestLatch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endStream) throws Http2Exception {
serverListener.onHeadersRead(ctx, streamId, headers, padding, endStream);
requestLatch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding,
boolean endStream) throws Http2Exception {
serverListener.onHeadersRead(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endStream);
requestLatch.countDown();
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
serverListener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
requestLatch.countDown();
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
serverListener.onRstStreamRead(ctx, streamId, errorCode);
requestLatch.countDown();
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
serverListener.onSettingsAckRead(ctx);
requestLatch.countDown();
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
serverListener.onSettingsRead(ctx, settings);
requestLatch.countDown();
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
serverListener.onPingRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
serverListener.onPingAckRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception {
serverListener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
requestLatch.countDown();
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
serverListener.onGoAwayRead(ctx, lastStreamId, errorCode, copy(debugData));
requestLatch.countDown();
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId,
int windowSizeIncrement) throws Http2Exception {
serverListener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
requestLatch.countDown();
}
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload) {
serverListener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
requestLatch.countDown();
}
ByteBuf copy(ByteBuf buffer) {
return Unpooled.copiedBuffer(buffer);
}
}
}

View File

@ -21,10 +21,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.bootstrap.Bootstrap;
@ -41,9 +39,11 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -51,17 +51,16 @@ import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Tests the full HTTP/2 framing stack including the connection and preface handlers.
*/
public class Http2ConnectionRoundtripTest {
private static final int NUM_STREAMS = 1000;
private static final int STRESS_TIMEOUT_SECONDS = 30;
private static final int NUM_STREAMS = 5000;
private final byte[] DATA_TEXT = "hello world".getBytes(UTF_8);
@Mock
@ -75,13 +74,16 @@ public class Http2ConnectionRoundtripTest {
private Bootstrap cb;
private Channel serverChannel;
private Channel clientChannel;
private final CountDownLatch requestLatch = new CountDownLatch(NUM_STREAMS * 3);
private CountDownLatch dataLatch = new CountDownLatch(NUM_STREAMS * DATA_TEXT.length);
private Http2TestUtil.FrameCountDown serverFrameCountDown;
private CountDownLatch requestLatch;
private CountDownLatch dataLatch;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
requestLatch(new CountDownLatch(NUM_STREAMS * 3));
dataLatch(new CountDownLatch(NUM_STREAMS * DATA_TEXT.length));
sb = new ServerBootstrap();
cb = new Bootstrap();
@ -91,7 +93,8 @@ public class Http2ConnectionRoundtripTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelegatingHttp2ConnectionHandler(true, new FrameCountDown()));
serverFrameCountDown = new Http2TestUtil.FrameCountDown(serverListener, requestLatch, dataLatch);
p.addLast(new DelegatingHttp2ConnectionHandler(true, serverFrameCountDown));
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
}
});
@ -126,40 +129,28 @@ public class Http2ConnectionRoundtripTest {
@Test
public void flowControlProperlyChunksLargeMessage() throws Exception {
final Http2Headers headers =
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
final Http2Headers headers = new DefaultHttp2Headers.Builder().method("GET").scheme("https")
.authority("example.org").path("/some/path/resource2").build();
// Create a large message to send.
int length = 10485760; // 10MB
final int length = 10485760; // 10MB
// Create a buffer filled with random bytes.
byte[] bytes = new byte[length];
final byte[] bytes = new byte[length];
new Random().nextBytes(bytes);
final ByteBuf data = Unpooled.wrappedBuffer(bytes);
// Prepare a receive buffer and populate it as DATA frames are received by the server.
final ByteBuf receivedData = Unpooled.buffer(length);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ByteBuf buf = (ByteBuf) invocation.getArguments()[2];
receivedData.writeBytes(buf);
return null;
}
}).when(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3),
any(ByteBuf.class), eq(0), anyBoolean());
List<ByteBuf> capturedData = null;
try {
// Initialize the data latch based on the number of bytes expected.
dataLatch = new CountDownLatch(length);
requestLatch(new CountDownLatch(2));
dataLatch(new CountDownLatch(length));
// Create the stream and send all of the data at once.
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
http2Client.writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
newPromise());
http2Client.writeData(ctx(), 3, data.copy(), 0, true, newPromise());
http2Client.writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false, newPromise());
http2Client.writeData(ctx(), 3, data.retain(), 0, true, newPromise());
}
});
@ -167,43 +158,84 @@ public class Http2ConnectionRoundtripTest {
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
// Verify that headers were received and only one DATA frame was received with endStream set.
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(headers),
eq(0), eq((short) 16), eq(false), eq(0), eq(false));
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3),
any(ByteBuf.class), eq(0), eq(true));
final ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(headers), eq(0),
eq((short) 16), eq(false), eq(0), eq(false));
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
// Verify we received all the bytes.
assertEquals(data, receivedData);
capturedData = dataCaptor.getAllValues();
assertEquals(data, capturedData.get(0));
} finally {
data.release();
release(capturedData);
}
}
@Test
public void stressTest() throws Exception {
final Http2Headers headers =
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
final Http2Headers headers = new DefaultHttp2Headers.Builder().method("GET").scheme("https")
.authority("example.org").path("/some/path/resource2").build();
final String text = "hello world";
final String pingMsg = "12345678";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
final ByteBuf pingData = Unpooled.copiedBuffer(pingMsg.getBytes());
List<ByteBuf> capturedData = null;
List<ByteBuf> capturedPingData = null;
try {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
for (int i = 0, nextStream = 3; i < NUM_STREAMS; ++i, nextStream += 2) {
http2Client.writeHeaders(ctx(), nextStream, headers, 0, (short) 16, false, 0,
false, newPromise());
http2Client.writePing(ctx(), Unpooled.copiedBuffer(pingMsg.getBytes()),
http2Client.writeHeaders(ctx(), nextStream, headers, 0, (short) 16, false, 0, false,
newPromise());
http2Client.writeData(ctx(), nextStream,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
http2Client.writePing(ctx(), pingData.retain(), newPromise());
http2Client.writeData(ctx(), nextStream, data.retain(), 0, true, newPromise());
}
}
});
// Wait for all frames to be received.
assertTrue(requestLatch.await(5, SECONDS));
verify(serverListener, times(NUM_STREAMS)).onHeadersRead(any(ChannelHandlerContext.class),
anyInt(), eq(headers), eq(0), eq((short) 16), eq(false), eq(0), eq(false));
assertTrue(requestLatch.await(STRESS_TIMEOUT_SECONDS, SECONDS));
verify(serverListener, times(NUM_STREAMS)).onHeadersRead(any(ChannelHandlerContext.class), anyInt(),
eq(headers), eq(0), eq((short) 16), eq(false), eq(0), eq(false));
final ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
final ArgumentCaptor<ByteBuf> pingDataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener, times(NUM_STREAMS)).onPingRead(any(ChannelHandlerContext.class),
eq(Unpooled.copiedBuffer(pingMsg.getBytes())));
verify(serverListener, times(NUM_STREAMS)).onDataRead(any(ChannelHandlerContext.class),
anyInt(), eq(Unpooled.copiedBuffer(text.getBytes())), eq(0), eq(true));
pingDataCaptor.capture());
capturedPingData = pingDataCaptor.getAllValues();
verify(serverListener, times(NUM_STREAMS)).onDataRead(any(ChannelHandlerContext.class), anyInt(),
dataCaptor.capture(), eq(0), eq(true));
capturedData = dataCaptor.getAllValues();
data.resetReaderIndex();
pingData.resetReaderIndex();
int i;
for (i = 0; i < capturedPingData.size(); ++i) {
assertEquals(pingData, capturedPingData.get(i));
}
for (i = 0; i < capturedData.size(); ++i) {
assertEquals(capturedData.get(i).toString(CharsetUtil.UTF_8), data, capturedData.get(i));
}
} finally {
data.release();
pingData.release();
release(capturedData);
release(capturedPingData);
}
}
private void dataLatch(CountDownLatch latch) {
dataLatch = latch;
if (serverFrameCountDown != null) {
serverFrameCountDown.dataLatch(latch);
}
}
private void requestLatch(CountDownLatch latch) {
requestLatch = latch;
if (serverFrameCountDown != null) {
serverFrameCountDown.messageLatch(latch);
}
}
private ChannelHandlerContext ctx() {
@ -214,107 +246,11 @@ public class Http2ConnectionRoundtripTest {
return ctx().newPromise();
}
/**
* A decorator around the serverObserver that counts down the latch so that we can await the
* completion of the request.
*/
private final class FrameCountDown implements Http2FrameListener {
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream)
throws Http2Exception {
serverListener.onDataRead(ctx, streamId, copy(data), padding, endOfStream);
requestLatch.countDown();
for (int i = 0; i < data.readableBytes(); ++i) {
dataLatch.countDown();
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endStream) throws Http2Exception {
serverListener.onHeadersRead(ctx, streamId, headers, padding, endStream);
requestLatch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding,
boolean endStream) throws Http2Exception {
serverListener.onHeadersRead(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endStream);
requestLatch.countDown();
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
serverListener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
requestLatch.countDown();
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
serverListener.onRstStreamRead(ctx, streamId, errorCode);
requestLatch.countDown();
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
serverListener.onSettingsAckRead(ctx);
requestLatch.countDown();
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
serverListener.onSettingsRead(ctx, settings);
requestLatch.countDown();
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
serverListener.onPingRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
serverListener.onPingAckRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception {
serverListener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
requestLatch.countDown();
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
serverListener.onGoAwayRead(ctx, lastStreamId, errorCode, copy(debugData));
requestLatch.countDown();
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId,
int windowSizeIncrement) throws Http2Exception {
serverListener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
requestLatch.countDown();
}
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload) {
serverListener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
requestLatch.countDown();
}
ByteBuf copy(ByteBuf buffer) {
return Unpooled.copiedBuffer(buffer);
private static void release(List<ByteBuf> capturedData) {
if (capturedData != null) {
for (int i = 0; i < capturedData.size(); ++i) {
capturedData.get(i).release();
}
}
}
}

View File

@ -15,7 +15,6 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
@ -50,8 +49,7 @@ final class Http2TestUtil {
});
}
private Http2TestUtil() {
}
private Http2TestUtil() { }
static class FrameAdapter extends ByteToMessageDecoder {
private final boolean copyBufs;
@ -118,7 +116,7 @@ final class Http2TestUtil {
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
Http2Stream stream = getOrCreateStream(streamId, endOfStream);
listener.onDataRead(ctx, streamId, copyBufs ? copy(data) : data, padding, endOfStream);
listener.onDataRead(ctx, streamId, copyBufs ? data.copy() : data, padding, endOfStream);
if (endOfStream) {
closeStream(stream, true);
}
@ -186,13 +184,13 @@ final class Http2TestUtil {
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
listener.onPingRead(ctx, copyBufs ? copy(data) : data);
listener.onPingRead(ctx, copyBufs ? data.copy() : data);
latch.countDown();
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
listener.onPingAckRead(ctx, copyBufs ? copy(data) : data);
listener.onPingAckRead(ctx, copyBufs ? data.copy() : data);
latch.countDown();
}
@ -207,7 +205,7 @@ final class Http2TestUtil {
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
listener.onGoAwayRead(ctx, lastStreamId, errorCode, copyBufs ? copy(debugData) : debugData);
listener.onGoAwayRead(ctx, lastStreamId, errorCode, copyBufs ? debugData.copy() : debugData);
latch.countDown();
}
@ -227,9 +225,124 @@ final class Http2TestUtil {
}
});
}
}
ByteBuf copy(ByteBuf buffer) {
return Unpooled.copiedBuffer(buffer);
/**
* A decorator around a {@link Http2FrameListener} that counts down the latch so that we can await the completion of
* the request.
*/
static class FrameCountDown implements Http2FrameListener {
private final Http2FrameListener listener;
private CountDownLatch messageLatch;
private CountDownLatch dataLatch;
public FrameCountDown(Http2FrameListener listener, CountDownLatch messageLatch) {
this(listener, messageLatch, null);
}
public FrameCountDown(Http2FrameListener listener, CountDownLatch messageLatch, CountDownLatch dataLatch) {
this.listener = listener;
this.messageLatch = messageLatch;
this.dataLatch = dataLatch;
}
public void messageLatch(CountDownLatch latch) {
messageLatch = latch;
}
public void dataLatch(CountDownLatch latch) {
dataLatch = latch;
}
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
listener.onDataRead(ctx, streamId, data.copy(), padding, endOfStream);
messageLatch.countDown();
if (dataLatch != null) {
for (int i = 0; i < data.readableBytes(); ++i) {
dataLatch.countDown();
}
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream) throws Http2Exception {
listener.onHeadersRead(ctx, streamId, headers, padding, endStream);
messageLatch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream);
messageLatch.countDown();
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
boolean exclusive) throws Http2Exception {
listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
messageLatch.countDown();
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
listener.onRstStreamRead(ctx, streamId, errorCode);
messageLatch.countDown();
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
listener.onSettingsAckRead(ctx);
messageLatch.countDown();
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
listener.onSettingsRead(ctx, settings);
messageLatch.countDown();
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
listener.onPingRead(ctx, data.copy());
messageLatch.countDown();
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
listener.onPingAckRead(ctx, data.copy());
messageLatch.countDown();
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
messageLatch.countDown();
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData.copy());
messageLatch.countDown();
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception {
listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
messageLatch.countDown();
}
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
ByteBuf payload) {
listener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
messageLatch.countDown();
}
}
}