Fixing queuing bug in outbound flow control.

Motivation:

Outbound flow control does not properly remove the head of queue after
it's written. This will cause streams with multiple frames to get stuck
and not send all of the data.

Modifications:

Modified the DefaultHttp2OutboundFlowController to properly remove the
head of the pending write queue once a queued frame has been written.

Added an integration test that sends a large message to verify that all
DATA frames are properly collected at the other end.

Result:

Outbound flow control properly handles several queued messages.
This commit is contained in:
nmittler 2014-09-04 14:30:30 -07:00
parent c40b0d2e07
commit d35ddf0648
6 changed files with 387 additions and 67 deletions

View File

@ -15,6 +15,7 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.FRAME_HEADER_LENGTH;
import static io.netty.handler.codec.http2.Http2CodecUtil.INT_FIELD_LENGTH;
@ -542,6 +543,16 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
private void readWindowUpdateFrame(ChannelHandlerContext ctx, ByteBuf payload,
Http2FrameListener listener) throws Http2Exception {
int windowSizeIncrement = readUnsignedInt(payload);
if (windowSizeIncrement == 0) {
if (streamId == CONNECTION_STREAM_ID) {
// It's a connection error.
throw protocolError("Received WINDOW_UPDATE with delta 0 for connection stream");
} else {
// It's a stream error.
throw new Http2StreamException(streamId, Http2Error.PROTOCOL_ERROR,
"Received WINDOW_UPDATE with delta 0 for stream: " + streamId);
}
}
listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
}

View File

@ -126,9 +126,12 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
@Override
public void initialOutboundWindowSize(int newWindowSize) throws Http2Exception {
if (newWindowSize < 0) {
throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
}
int delta = newWindowSize - initialWindowSize;
initialWindowSize = newWindowSize;
connectionState().incrementStreamWindow(delta);
for (Http2Stream stream : connection.activeStreams()) {
// Verify that the maximum value is not exceeded by this change.
OutboundFlowState state = state(stream);
@ -148,6 +151,10 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
@Override
public void updateOutboundWindowSize(int streamId, int delta) throws Http2Exception {
if (delta <= 0) {
throw new IllegalArgumentException("delta must be > 0");
}
if (streamId == CONNECTION_STREAM_ID) {
// Update the connection window and write any pending frames for all streams.
connectionState().incrementStreamWindow(delta);
@ -173,16 +180,27 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
if (data == null) {
throw new NullPointerException("data");
}
if (this.ctx != null && this.ctx != ctx) {
throw new IllegalArgumentException("Writing data from multiple ChannelHandlerContexts is not supported");
}
if (padding < 0) {
throw new IllegalArgumentException("padding must be >= 0");
}
if (streamId <= 0) {
throw new IllegalArgumentException("streamId must be >= 0");
}
// Save the context. We'll use this later when we write pending bytes.
this.ctx = ctx;
try {
OutboundFlowState state = stateOrFail(streamId);
int window = state.writableWindow();
OutboundFlowState.Frame frame = state.newFrame(ctx, promise, data, padding, endStream);
if (window >= frame.size()) {
int window = state.writableWindow();
boolean framesAlreadyQueued = state.hasFrame();
OutboundFlowState.Frame frame = state.newFrame(promise, data, padding, endStream);
if (!framesAlreadyQueued && window >= frame.size()) {
// Window size is large enough to send entire data frame
frame.write();
ctx.flush();
@ -192,8 +210,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// Enqueue the frame to be written when the window size permits.
frame.enqueue();
if (window <= 0) {
// Stream is stalled, don't send anything now.
if (framesAlreadyQueued || window <= 0) {
// Stream already has frames pending or is stalled, don't send anything now.
return promise;
}
@ -463,9 +481,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
/**
* Creates a new frame with the given values but does not add it to the pending queue.
*/
private Frame newFrame(ChannelHandlerContext ctx, ChannelPromise promise, ByteBuf data,
int padding, boolean endStream) {
return new Frame(ctx, new ChannelPromiseAggregator(promise), data, padding, endStream);
private Frame newFrame(ChannelPromise promise, ByteBuf data, int padding, boolean endStream) {
return new Frame(new ChannelPromiseAggregator(promise), data, padding, endStream);
}
/**
@ -476,13 +493,10 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Returns the the head of the pending queue, or {@code null} if empty or the current window size is zero.
* Returns the the head of the pending queue, or {@code null} if empty.
*/
Frame peek() {
if (window > 0) {
return pendingWriteQueue.peek();
}
return null;
return pendingWriteQueue.peek();
}
/**
@ -510,12 +524,17 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
int maxBytes = min(bytes, writableWindow());
while (bytesWritten < maxBytes && hasFrame()) {
while (hasFrame()) {
Frame pendingWrite = peek();
if (maxBytes >= pendingWrite.size()) {
// Window size is large enough to send entire data frame
bytesWritten += pendingWrite.size();
pendingWrite.write();
} else if (maxBytes == 0) {
// No data from the current frame can be written - we're done.
// We purposely check this after first testing the size of the
// pending frame to properly handle zero-length frame.
break;
} else {
// We can send a partial frame
Frame partialFrame = pendingWrite.split(maxBytes);
@ -546,17 +565,14 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
*/
private final class Frame {
final ByteBuf data;
final int padding;
final boolean endStream;
final ChannelHandlerContext ctx;
final ChannelPromiseAggregator promiseAggregator;
final ChannelPromise promise;
int padding;
boolean enqueued;
Frame(ChannelHandlerContext ctx,
ChannelPromiseAggregator promiseAggregator, ByteBuf data, int padding,
Frame(ChannelPromiseAggregator promiseAggregator, ByteBuf data, int padding,
boolean endStream) {
this.ctx = ctx;
this.data = data;
this.padding = padding;
this.endStream = endStream;
@ -612,16 +628,19 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// All the bytes fit into a single HTTP/2 frame, just send it all.
connectionState().incrementStreamWindow(-bytesToWrite);
incrementStreamWindow(-bytesToWrite);
ByteBuf slice = data.readSlice(data.readableBytes());
frameWriter.writeData(ctx, stream.id(), slice, padding, endStream, promise);
frameWriter.writeData(ctx, stream.id(), data, padding, endStream, promise);
decrementPendingBytes(bytesToWrite);
if (enqueued) {
// It's enqueued - remove it from the head of the pending write queue.
pendingWriteQueue.remove();
}
return;
}
// Split a chunk that will fit into a single HTTP/2 frame and write it.
Frame frame = split(frameBytes);
frame.write();
} while (data.isReadable());
} while (size() > 0);
}
/**
@ -646,6 +665,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
Frame split(int maxBytes) {
// TODO: Should padding be spread across chunks or only at the end?
// The requested maxBytes should always be less than the size of this frame.
assert maxBytes < size() : "Attempting to split a frame for the full size.";
// Get the portion of the data buffer to be split. Limit to the readable bytes.
int dataSplit = min(maxBytes, data.readableBytes());
@ -653,7 +675,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
int paddingSplit = min(maxBytes - dataSplit, padding);
ByteBuf splitSlice = data.readSlice(dataSplit).retain();
Frame frame = new Frame(ctx, promiseAggregator, splitSlice, paddingSplit, false);
padding -= paddingSplit;
Frame frame = new Frame(promiseAggregator, splitSlice, paddingSplit, false);
int totalBytesSplit = dataSplit + paddingSplit;
decrementPendingBytes(totalBytesSplit);

View File

@ -218,4 +218,25 @@ public final class Http2Settings extends IntObjectHashMap<Long> {
break;
}
}
@Override
protected String keyToString(int key) {
switch (key) {
case SETTINGS_HEADER_TABLE_SIZE:
return "HEADER_TABLE_SIZE";
case SETTINGS_ENABLE_PUSH:
return "ENABLE_PUSH";
case SETTINGS_MAX_CONCURRENT_STREAMS:
return "MAX_CONCURRENT_STREAMS";
case SETTINGS_INITIAL_WINDOW_SIZE:
return "INITIAL_WINDOW_SIZE";
case SETTINGS_MAX_FRAME_SIZE:
return "MAX_FRAME_SIZE";
case SETTINGS_MAX_HEADER_LIST_SIZE:
return "MAX_HEADER_LIST_SIZE";
default:
// Unknown keys.
return super.keyToString(key);
}
}
}

View File

@ -42,6 +42,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/**
@ -86,7 +87,37 @@ public class DefaultHttp2OutboundFlowControllerTest {
streamC.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
when(frameWriter.maxFrameSize()).thenReturn(Integer.MAX_VALUE);
resetFrameWriter();
}
@Test
public void initialWindowSizeShouldOnlyChangeStreams() throws Http2Exception {
controller.initialOutboundWindowSize(0);
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(0, window(STREAM_B));
assertEquals(0, window(STREAM_C));
assertEquals(0, window(STREAM_D));
}
@Test
public void windowUpdateShouldChangeConnectionWindow() throws Http2Exception {
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 100);
assertEquals(DEFAULT_WINDOW_SIZE + 100, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
}
@Test
public void windowUpdateShouldChangeStreamWindow() throws Http2Exception {
controller.updateOutboundWindowSize(STREAM_A, 100);
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE + 100, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
}
@Test
@ -97,6 +128,25 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(1, data.refCnt());
}
@Test
public void frameLargerThanMaxFrameSizeShouldBeSplit() throws Http2Exception {
when(frameWriter.maxFrameSize()).thenReturn(3);
ByteBuf data = dummyData(5, 0);
send(STREAM_A, data.copy(), 5);
verifyWrite(STREAM_A, data.slice(0, 3), 0);
verifyWrite(STREAM_A, data.slice(3, 2), 1);
verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 3);
verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 1);
}
@Test
public void emptyFrameShouldBeSentImmediately() throws Http2Exception {
send(STREAM_A, Unpooled.EMPTY_BUFFER, 0);
verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 0);
}
@Test
public void frameShouldSplitForMaxFrameSize() throws Http2Exception {
when(frameWriter.maxFrameSize()).thenReturn(5);
@ -170,6 +220,24 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(1, data.refCnt());
}
@Test
public void windowUpdateShouldSendFrame() throws Http2Exception {
controller.initialOutboundWindowSize(10);
ByteBuf data = dummyData(10, 10);
send(STREAM_A, data.slice(0, 10), 10);
verifyWrite(STREAM_A, data.slice(0, 10), 0);
// Update the window and verify that the rest of the frame is written.
controller.updateOutboundWindowSize(STREAM_A, 10);
verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 10);
assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(10, window(STREAM_B));
assertEquals(10, window(STREAM_C));
assertEquals(10, window(STREAM_D));
}
@Test
public void initialWindowUpdateShouldSendFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0);
@ -187,6 +255,26 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(1, data.refCnt());
}
@Test
public void initialWindowUpdateShouldSendEmptyFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0);
// First send a frame that will get buffered.
ByteBuf data = dummyData(10, 0);
send(STREAM_A, data.slice(), 0);
verifyNoWrite(STREAM_A);
// Now send an empty frame on the same stream and verify that it's also buffered.
send(STREAM_A, Unpooled.EMPTY_BUFFER, 0);
verifyNoWrite(STREAM_A);
// Re-expand the window and verify that both frames were sent.
controller.initialOutboundWindowSize(10);
verifyWrite(STREAM_A, data.slice(), 0);
verifyWrite(STREAM_A, Unpooled.EMPTY_BUFFER, 0);
}
@Test
public void initialWindowUpdateShouldSendPartialFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0);
@ -209,7 +297,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void connectionWindowUpdateShouldSendFrame() throws Http2Exception {
// Set the connection window size to zero.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(CONNECTION_STREAM_ID);
ByteBuf data = dummyData(10, 0);
send(STREAM_A, data.slice(), 0);
@ -217,6 +305,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue();
@ -227,7 +321,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void connectionWindowUpdateShouldSendPartialFrame() throws Http2Exception {
// Set the connection window size to zero.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(CONNECTION_STREAM_ID);
ByteBuf data = dummyData(10, 0);
send(STREAM_A, data, 0);
@ -235,6 +329,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Verify that a partial frame of 5 was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 5);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue();
@ -247,7 +347,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void streamWindowUpdateShouldSendFrame() throws Http2Exception {
// Set the stream window size to zero.
controller.updateOutboundWindowSize(STREAM_A, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(STREAM_A);
ByteBuf data = dummyData(10, 0);
send(STREAM_A, data.slice(), 0);
@ -255,6 +355,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(STREAM_A, 10);
assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue();
@ -265,7 +371,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void streamWindowUpdateShouldSendPartialFrame() throws Http2Exception {
// Set the stream window size to zero.
controller.updateOutboundWindowSize(STREAM_A, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(STREAM_A);
ByteBuf data = dummyData(10, 0);
send(STREAM_A, data, 0);
@ -273,6 +379,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Verify that a partial frame of 5 was sent.
controller.updateOutboundWindowSize(STREAM_A, 5);
assertEquals(DEFAULT_WINDOW_SIZE - data.readableBytes(), window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue();
@ -293,7 +405,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void multipleStreamsShouldSplitPadding() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Try sending 10 bytes on each stream. They will be pending until we free up the
// connection.
@ -304,6 +416,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Open up the connection window.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that 5 bytes from A were written: 3 from data and 2 from padding.
@ -328,11 +446,11 @@ public class DefaultHttp2OutboundFlowControllerTest {
*/
@Test
public void blockedStreamShouldSpreadDataToChildren() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
// Block stream A
controller.updateOutboundWindowSize(STREAM_A, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(STREAM_A);
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Try sending 10 bytes on each stream. They will be pending until we free up the
// connection.
@ -347,6 +465,11 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B));
assertEquals((2 * DEFAULT_WINDOW_SIZE) - 5, window(STREAM_C) + window(STREAM_D));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that no write was done for A, since it's blocked.
@ -379,11 +502,11 @@ public class DefaultHttp2OutboundFlowControllerTest {
*/
@Test
public void childrenShouldNotSendDataUntilParentBlocked() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
// Block stream B
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(STREAM_B);
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Send 10 bytes to each.
send(STREAM_A, dummyData(10, 0), 0);
@ -397,6 +520,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 10, window(STREAM_A));
assertEquals(0, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that A received all the bytes.
@ -421,11 +550,11 @@ public class DefaultHttp2OutboundFlowControllerTest {
*/
@Test
public void parentShouldWaterFallDataToChildren() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
// Block stream B
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(STREAM_B);
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Only send 5 to A so that it will allow data from its children.
send(STREAM_A, dummyData(5, 0), 0);
@ -439,6 +568,11 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
assertEquals(0, window(STREAM_B));
assertEquals((2 * DEFAULT_WINDOW_SIZE) - 5, window(STREAM_C) + window(STREAM_D));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that no write was done for B, since it's blocked.
@ -482,11 +616,11 @@ public class DefaultHttp2OutboundFlowControllerTest {
*/
@Test
public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
// Block stream B
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(STREAM_B);
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Send 10 bytes to each.
send(STREAM_A, dummyData(10, 0), 0);
@ -503,6 +637,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
assertEquals(0, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_D));
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that A received all the bytes.
@ -527,7 +667,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void writeShouldPreferHighestWeight() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Root the streams at the connection and assign weights.
setPriority(STREAM_A, 0, (short) 50, false);
@ -575,6 +715,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertTrue(aWritten < cWritten);
assertEquals(cWritten, dWritten);
assertTrue(cWritten < bWritten);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - aWritten, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE - bWritten, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE - cWritten, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE - dWritten, window(STREAM_D));
}
/**
@ -590,7 +736,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void samePriorityShouldWriteEqualData() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Root the streams at the connection with the same weights.
setPriority(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false);
@ -614,6 +760,11 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Allow 1000 bytes to be sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 999);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_D));
captureWrite(STREAM_A, captor, 0, false);
int aWritten = captor.getValue().readableBytes();
@ -644,7 +795,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void subTreeBytesShouldBeCorrect() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(CONNECTION_STREAM_ID);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
@ -709,7 +860,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void subTreeBytesShouldBeCorrectWithRestructure() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(CONNECTION_STREAM_ID);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
@ -775,7 +926,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void subTreeBytesShouldBeCorrectWithAddition() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(CONNECTION_STREAM_ID);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
@ -845,7 +996,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void subTreeBytesShouldBeCorrectWithRemoval() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
exhaustStreamWindow(CONNECTION_STREAM_ID);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
@ -923,6 +1074,45 @@ public class DefaultHttp2OutboundFlowControllerTest {
connection.stream(stream).setPriority(parent, (short) weight, exclusive);
}
private void exhaustStreamWindow(int streamId) throws Http2Exception {
int dataLength = window(streamId);
if (streamId == CONNECTION_STREAM_ID) {
// Find a stream that we can use to shrink the connection window.
int streamToWrite = 0;
for (Http2Stream stream : connection.activeStreams()) {
if (stream.outboundFlow().window() >= dataLength) {
streamToWrite = stream.id();
break;
}
}
// Write to STREAM_A to decrease the connection window and then restore STREAM_A's window.
int prevWindow = window(streamToWrite);
send(streamToWrite, dummyData(dataLength, 0), 0);
int delta = prevWindow - window(streamToWrite);
controller.updateOutboundWindowSize(streamToWrite, delta);
} else {
// Write to the stream and then restore the connection window.
int prevWindow = window(CONNECTION_STREAM_ID);
send(streamId, dummyData(dataLength, 0), 0);
int delta = prevWindow - window(CONNECTION_STREAM_ID);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, delta);
}
// Reset the frameWriter so that this write doesn't interfere with other tests.
resetFrameWriter();
}
private void resetFrameWriter() {
Mockito.reset(frameWriter);
when(frameWriter.maxFrameSize()).thenReturn(Integer.MAX_VALUE);
}
private int window(int streamId) {
return connection.stream(streamId).outboundFlow().window();
}
private static ByteBuf dummyData(int size, int padding) {
String repeatedData = "0123456789";
ByteBuf buffer = Unpooled.buffer(size + padding);

View File

@ -15,6 +15,18 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel;
import static io.netty.util.CharsetUtil.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
@ -28,26 +40,30 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.NetUtil;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import static io.netty.handler.codec.http2.Http2TestUtil.*;
import static java.util.concurrent.TimeUnit.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Tests the full HTTP/2 framing stack including the connection and preface handlers.
*/
public class Http2ConnectionRoundtripTest {
private static final int NUM_STREAMS = 1000;
private final byte[] DATA_TEXT = "hello world".getBytes(UTF_8);
@Mock
private Http2FrameListener clientListener;
@ -59,8 +75,8 @@ public class Http2ConnectionRoundtripTest {
private Bootstrap cb;
private Channel serverChannel;
private Channel clientChannel;
private static final int NUM_STREAMS = 1000;
private final CountDownLatch requestLatch = new CountDownLatch(NUM_STREAMS * 3);
private CountDownLatch dataLatch = new CountDownLatch(NUM_STREAMS * DATA_TEXT.length);
@Before
public void setup() throws Exception {
@ -107,6 +123,58 @@ public class Http2ConnectionRoundtripTest {
cb.group().shutdownGracefully();
}
@Test
public void flowControlProperlyChunksLargeMessage() throws Exception {
final Http2Headers headers =
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
.authority("example.org").path("/some/path/resource2").build();
// Create a large message to send.
int length = 10485760; // 10MB
// Create a buffer filled with random bytes.
byte[] bytes = new byte[length];
new Random().nextBytes(bytes);
final ByteBuf data = Unpooled.wrappedBuffer(bytes);
// Prepare a receive buffer and populate it as DATA frames are received by the server.
final ByteBuf receivedData = Unpooled.buffer(length);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ByteBuf buf = (ByteBuf) invocation.getArguments()[2];
receivedData.writeBytes(buf);
return null;
}
}).when(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3),
any(ByteBuf.class), eq(0), anyBoolean());
// Initialize the data latch based on the number of bytes expected.
dataLatch = new CountDownLatch(length);
// Create the stream and send all of the data at once.
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
http2Client.writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
newPromise());
http2Client.writeData(ctx(), 3, data.copy(), 0, true, newPromise());
}
});
// Wait for all DATA frames to be received at the server.
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
// Verify that headers were received and only one DATA frame was received with endStream set.
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(headers),
eq(0), eq((short) 16), eq(false), eq(0), eq(false));
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3),
any(ByteBuf.class), eq(0), eq(true));
// Verify we received all the bytes.
assertEquals(data, receivedData);
}
@Test
public void stressTest() throws Exception {
final Http2Headers headers =
@ -128,7 +196,7 @@ public class Http2ConnectionRoundtripTest {
}
});
// Wait for all frames to be received.
awaitRequests();
assertTrue(requestLatch.await(5, SECONDS));
verify(serverListener, times(NUM_STREAMS)).onHeadersRead(any(ChannelHandlerContext.class),
anyInt(), eq(headers), eq(0), eq((short) 16), eq(false), eq(0), eq(false));
verify(serverListener, times(NUM_STREAMS)).onPingRead(any(ChannelHandlerContext.class),
@ -137,10 +205,6 @@ public class Http2ConnectionRoundtripTest {
anyInt(), eq(Unpooled.copiedBuffer(text.getBytes())), eq(0), eq(true));
}
private void awaitRequests() throws Exception {
requestLatch.await(5, SECONDS);
}
private ChannelHandlerContext ctx() {
return clientChannel.pipeline().firstContext();
}
@ -161,6 +225,9 @@ public class Http2ConnectionRoundtripTest {
throws Http2Exception {
serverListener.onDataRead(ctx, streamId, copy(data), padding, endOfStream);
requestLatch.countDown();
for (int i = 0; i < data.readableBytes(); ++i) {
dataLatch.countDown();
}
}
@Override

View File

@ -498,9 +498,16 @@ public class IntObjectHashMap<V> implements IntObjectMap<V>, Iterable<IntObjectM
V value = values[i];
if (value != null) {
sb.append(sb.length() == 0 ? "{" : ", ");
sb.append(keys[i]).append('=').append(value == this ? "(this Map)" : value);
sb.append(keyToString(keys[i])).append('=').append(value == this ? "(this Map)" : value);
}
}
return sb.append('}').toString();
}
/**
* Helper method called by {@link #toString()} in order to convert a single map key into a string.
*/
protected String keyToString(int key) {
return Integer.toString(key);
}
}