Adding padding to HTTP/2 flow control.

Motivation:

We're currently out-of-spec with HTTP/2 in that we don't include padding
in the flow control logic.

Modifications:

Modified both DefaultHttp2InboundFlowController and
DefaultHttp2OutboundFlowController to properly take padding into
account. Outbound is more complicated since padding has to be properly
accounted for when splitting frames.

Result:

HTTP/2 codec properly flow controls padding.
This commit is contained in:
nmittler 2014-08-21 13:48:43 -07:00
parent d3538dee2e
commit 3ffd205f57
5 changed files with 211 additions and 135 deletions

View File

@ -68,7 +68,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
public void applyInboundFlowControl(int streamId, ByteBuf data, int padding, public void applyInboundFlowControl(int streamId, ByteBuf data, int padding,
boolean endOfStream, FrameWriter frameWriter) boolean endOfStream, FrameWriter frameWriter)
throws Http2Exception { throws Http2Exception {
int dataLength = data.readableBytes(); int dataLength = data.readableBytes() + padding;
applyConnectionFlowControl(dataLength, frameWriter); applyConnectionFlowControl(dataLength, frameWriter);
applyStreamFlowControl(streamId, dataLength, endOfStream, frameWriter); applyStreamFlowControl(streamId, dataLength, endOfStream, frameWriter);
} }

View File

@ -28,10 +28,14 @@ import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import static io.netty.handler.codec.http2.Http2CodecUtil.*; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2Error.*; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Exception.*; import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static java.lang.Math.*; import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static io.netty.handler.codec.http2.Http2Exception.format;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static java.lang.Math.max;
import static java.lang.Math.min;
/** /**
* Basic implementation of {@link Http2OutboundFlowController}. * Basic implementation of {@link Http2OutboundFlowController}.
@ -178,7 +182,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
int window = state.writableWindow(); int window = state.writableWindow();
OutboundFlowState.Frame frame = state.newFrame(ctx, promise, data, padding, endStream); OutboundFlowState.Frame frame = state.newFrame(ctx, promise, data, padding, endStream);
if (window >= data.readableBytes()) { if (window >= frame.size()) {
// Window size is large enough to send entire data frame // Window size is large enough to send entire data frame
frame.write(); frame.write();
ctx.flush(); ctx.flush();
@ -561,8 +565,11 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
promiseAggregator.add(promise); promiseAggregator.add(promise);
} }
/**
* Gets the total size (in bytes) of this frame including the data and padding.
*/
int size() { int size() {
return data.readableBytes(); return data.readableBytes() + padding;
} }
void enqueue() { void enqueue() {
@ -571,7 +578,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
pendingWriteQueue.offer(this); pendingWriteQueue.offer(this);
// Increment the number of pending bytes for this stream. // Increment the number of pending bytes for this stream.
incrementPendingBytes(data.readableBytes()); incrementPendingBytes(size());
} }
} }
@ -599,13 +606,13 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// Using a do/while loop because if the buffer is empty we still need to call // Using a do/while loop because if the buffer is empty we still need to call
// the writer once to send the empty frame. // the writer once to send the empty frame.
do { do {
int bytesToWrite = data.readableBytes(); int bytesToWrite = size();
int frameBytes = Math.min(bytesToWrite, frameWriter.maxFrameSize()); int frameBytes = Math.min(bytesToWrite, frameWriter.maxFrameSize());
if (frameBytes == bytesToWrite) { if (frameBytes == bytesToWrite) {
// All the bytes fit into a single HTTP/2 frame, just send it all. // All the bytes fit into a single HTTP/2 frame, just send it all.
connectionState().incrementStreamWindow(-bytesToWrite); connectionState().incrementStreamWindow(-bytesToWrite);
incrementStreamWindow(-bytesToWrite); incrementStreamWindow(-bytesToWrite);
ByteBuf slice = data.readSlice(bytesToWrite); ByteBuf slice = data.readSlice(data.readableBytes());
frameWriter.writeData(ctx, stream.id(), slice, padding, endStream, promise); frameWriter.writeData(ctx, stream.id(), slice, padding, endStream, promise);
decrementPendingBytes(bytesToWrite); decrementPendingBytes(bytesToWrite);
return; return;
@ -622,26 +629,34 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* removed from this branch of the priority tree. * removed from this branch of the priority tree.
*/ */
void writeError(Http2Exception cause) { void writeError(Http2Exception cause) {
decrementPendingBytes(data.readableBytes()); decrementPendingBytes(size());
data.release(); data.release();
promise.setFailure(cause); promise.setFailure(cause);
} }
/** /**
* Creates a new frame that is a view of this frame's data buffer starting at the current read index with * Creates a new frame that is a view of this frame's data. The {@code maxBytes} are
* the given number of bytes. The reader index on the input frame is then advanced by the number of bytes. * first split from the data buffer. If not all the requested bytes are available, the
* The returned frame will not have end-of-stream set and it will not be automatically placed in the pending * remaining bytes are then split from the padding (if available).
* queue.
* *
* @param maxBytes * @param maxBytes
* the maximum number of bytes that is allowed in the created frame. * the maximum number of bytes that is allowed in the created frame.
* @return the partial frame. * @return the partial frame.
*/ */
Frame split(int maxBytes) { Frame split(int maxBytes) {
// TODO: Should padding be included in the chunks or only the last frame? // TODO: Should padding be spread across chunks or only at the end?
maxBytes = min(maxBytes, data.readableBytes());
Frame frame = new Frame(ctx, promiseAggregator, data.readSlice(maxBytes).retain(), 0, false); // Get the portion of the data buffer to be split. Limit to the readable bytes.
decrementPendingBytes(maxBytes); int dataSplit = min(maxBytes, data.readableBytes());
// Split any remaining bytes from the padding.
int paddingSplit = min(maxBytes - dataSplit, padding);
ByteBuf splitSlice = data.readSlice(dataSplit).retain();
Frame frame = new Frame(ctx, promiseAggregator, splitSlice, paddingSplit, false);
int totalBytesSplit = dataSplit + paddingSplit;
decrementPendingBytes(totalBytesSplit);
return frame; return frame;
} }

View File

@ -24,6 +24,7 @@ public interface Http2InboundFlowController {
/** /**
* A writer of window update frames. * A writer of window update frames.
* TODO: Use Http2FrameWriter instead.
*/ */
interface FrameWriter { interface FrameWriter {

View File

@ -59,13 +59,14 @@ public class DefaultHttp2InboundFlowControllerTest {
@Test @Test
public void dataFrameShouldBeAccepted() throws Http2Exception { public void dataFrameShouldBeAccepted() throws Http2Exception {
applyFlowControl(10, false); applyFlowControl(10, 0, false);
verifyWindowUpdateNotSent(); verifyWindowUpdateNotSent();
} }
@Test(expected = Http2Exception.class) @Test(expected = Http2Exception.class)
public void connectionFlowControlExceededShouldThrow() throws Http2Exception { public void connectionFlowControlExceededShouldThrow() throws Http2Exception {
applyFlowControl(DEFAULT_WINDOW_SIZE + 1, true); // Window exceeded because of the padding.
applyFlowControl(DEFAULT_WINDOW_SIZE, 1, true);
} }
@Test @Test
@ -75,7 +76,7 @@ public class DefaultHttp2InboundFlowControllerTest {
int windowDelta = DEFAULT_WINDOW_SIZE - newWindow; int windowDelta = DEFAULT_WINDOW_SIZE - newWindow;
// Set end-of-stream on the frame, so no window update will be sent for the stream. // Set end-of-stream on the frame, so no window update will be sent for the stream.
applyFlowControl(dataSize, true); applyFlowControl(dataSize, 0, true);
verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(windowDelta)); verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(windowDelta));
} }
@ -86,7 +87,7 @@ public class DefaultHttp2InboundFlowControllerTest {
int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize); int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize);
// Don't set end-of-stream so we'll get a window update for the stream as well. // Don't set end-of-stream so we'll get a window update for the stream as well.
applyFlowControl(dataSize, false); applyFlowControl(dataSize, 0, false);
verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(windowDelta)); verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(windowDelta));
verify(frameWriter).writeFrame(eq(STREAM_ID), eq(windowDelta)); verify(frameWriter).writeFrame(eq(STREAM_ID), eq(windowDelta));
} }
@ -95,7 +96,7 @@ public class DefaultHttp2InboundFlowControllerTest {
public void initialWindowUpdateShouldAllowMoreFrames() throws Http2Exception { public void initialWindowUpdateShouldAllowMoreFrames() throws Http2Exception {
// Send a frame that takes up the entire window. // Send a frame that takes up the entire window.
int initialWindowSize = DEFAULT_WINDOW_SIZE; int initialWindowSize = DEFAULT_WINDOW_SIZE;
applyFlowControl(initialWindowSize, false); applyFlowControl(initialWindowSize, 0, false);
// Update the initial window size to allow another frame. // Update the initial window size to allow another frame.
int newInitialWindowSize = 2 * initialWindowSize; int newInitialWindowSize = 2 * initialWindowSize;
@ -105,7 +106,7 @@ public class DefaultHttp2InboundFlowControllerTest {
reset(frameWriter); reset(frameWriter);
// Send the next frame and verify that the expected window updates were sent. // Send the next frame and verify that the expected window updates were sent.
applyFlowControl(initialWindowSize, false); applyFlowControl(initialWindowSize, 0, false);
int delta = newInitialWindowSize - initialWindowSize; int delta = newInitialWindowSize - initialWindowSize;
verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(delta)); verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(delta));
verify(frameWriter).writeFrame(eq(STREAM_ID), eq(delta)); verify(frameWriter).writeFrame(eq(STREAM_ID), eq(delta));
@ -116,9 +117,9 @@ public class DefaultHttp2InboundFlowControllerTest {
return initialSize - newWindowSize; return initialSize - newWindowSize;
} }
private void applyFlowControl(int dataSize, boolean endOfStream) throws Http2Exception { private void applyFlowControl(int dataSize, int padding, boolean endOfStream) throws Http2Exception {
ByteBuf buf = dummyData(dataSize); ByteBuf buf = dummyData(dataSize);
controller.applyInboundFlowControl(STREAM_ID, buf, 0, endOfStream, frameWriter); controller.applyInboundFlowControl(STREAM_ID, buf, padding, endOfStream, frameWriter);
buf.release(); buf.release();
} }

View File

@ -91,91 +91,119 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test @Test
public void frameShouldBeSentImmediately() throws Http2Exception { public void frameShouldBeSentImmediately() throws Http2Exception {
ByteBuf data = dummyData(10); ByteBuf data = dummyData(5, 5);
send(STREAM_A, data.slice()); send(STREAM_A, data.slice(0, 5), 5);
verifyWrite(STREAM_A, data); verifyWrite(STREAM_A, data.slice(0, 5), 5);
assertEquals(1, data.refCnt()); assertEquals(1, data.refCnt());
data.release();
} }
@Test @Test
public void frameShouldSplitForMaxFrameSize() throws Http2Exception { public void frameShouldSplitForMaxFrameSize() throws Http2Exception {
when(frameWriter.maxFrameSize()).thenReturn(5); when(frameWriter.maxFrameSize()).thenReturn(5);
ByteBuf data = dummyData(10); ByteBuf data = dummyData(10, 0);
ByteBuf slice1 = data.slice(data.readerIndex(), 5); ByteBuf slice1 = data.slice(0, 5);
ByteBuf slice2 = data.slice(5, 5); ByteBuf slice2 = data.slice(5, 5);
send(STREAM_A, data.slice()); send(STREAM_A, data.slice(), 0);
verifyWrite(STREAM_A, slice1); verifyWrite(STREAM_A, slice1, 0);
verifyWrite(STREAM_A, slice2); verifyWrite(STREAM_A, slice2, 0);
assertEquals(2, data.refCnt()); assertEquals(2, data.refCnt());
data.release(2);
} }
@Test @Test
public void stalledStreamShouldQueueFrame() throws Http2Exception { public void stalledStreamShouldQueueFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0); controller.initialOutboundWindowSize(0);
ByteBuf data = dummyData(10); ByteBuf data = dummyData(10, 5);
send(STREAM_A, data); send(STREAM_A, data.slice(0, 10), 5);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
assertEquals(1, data.refCnt()); assertEquals(1, data.refCnt());
data.release();
} }
@Test @Test
public void nonZeroWindowShouldSendPartialFrame() throws Http2Exception { public void frameShouldSplit() throws Http2Exception {
controller.initialOutboundWindowSize(5); controller.initialOutboundWindowSize(5);
ByteBuf data = dummyData(10); ByteBuf data = dummyData(5, 5);
send(STREAM_A, data); send(STREAM_A, data.slice(0, 5), 5);
// Verify that a partial frame of 5 was sent. // Verify that a partial frame of 5 was sent.
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class); ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, false); // None of the padding should be sent in the frame.
captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue(); ByteBuf writtenBuf = argument.getValue();
assertEquals(5, writtenBuf.readableBytes()); assertEquals(5, writtenBuf.readableBytes());
assertEquals(data.slice(0, 5), writtenBuf); assertEquals(data.slice(0, 5), writtenBuf);
assertEquals(2, writtenBuf.refCnt()); assertEquals(2, writtenBuf.refCnt());
assertEquals(2, data.refCnt()); assertEquals(2, data.refCnt());
data.release(2); }
@Test
public void frameShouldSplitPadding() throws Http2Exception {
controller.initialOutboundWindowSize(5);
ByteBuf data = dummyData(3, 7);
send(STREAM_A, data.slice(0, 3), 7);
// Verify that a partial frame of 5 was sent.
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 2, false);
ByteBuf writtenBuf = argument.getValue();
assertEquals(3, writtenBuf.readableBytes());
assertEquals(data.slice(0, 3), writtenBuf);
assertEquals(2, writtenBuf.refCnt());
assertEquals(2, data.refCnt());
}
@Test
public void emptyFrameShouldSplitPadding() throws Http2Exception {
controller.initialOutboundWindowSize(5);
ByteBuf data = dummyData(0, 10);
send(STREAM_A, data.slice(0, 0), 10);
// Verify that a partial frame of 5 was sent.
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 5, false);
ByteBuf writtenBuf = argument.getValue();
assertEquals(0, writtenBuf.readableBytes());
assertEquals(1, writtenBuf.refCnt());
assertEquals(1, data.refCnt());
} }
@Test @Test
public void initialWindowUpdateShouldSendFrame() throws Http2Exception { public void initialWindowUpdateShouldSendFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0); controller.initialOutboundWindowSize(0);
ByteBuf data = dummyData(10); ByteBuf data = dummyData(10, 0);
send(STREAM_A, data.slice()); send(STREAM_A, data.slice(), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
// Verify that the entire frame was sent. // Verify that the entire frame was sent.
controller.initialOutboundWindowSize(10); controller.initialOutboundWindowSize(10);
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class); ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, false); captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue(); ByteBuf writtenBuf = argument.getValue();
assertEquals(data, writtenBuf); assertEquals(data, writtenBuf);
assertEquals(1, data.refCnt()); assertEquals(1, data.refCnt());
data.release();
} }
@Test @Test
public void initialWindowUpdateShouldSendPartialFrame() throws Http2Exception { public void initialWindowUpdateShouldSendPartialFrame() throws Http2Exception {
controller.initialOutboundWindowSize(0); controller.initialOutboundWindowSize(0);
ByteBuf data = dummyData(10); ByteBuf data = dummyData(10, 0);
send(STREAM_A, data); send(STREAM_A, data, 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
// Verify that a partial frame of 5 was sent. // Verify that a partial frame of 5 was sent.
controller.initialOutboundWindowSize(5); controller.initialOutboundWindowSize(5);
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class); ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, false); captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue(); ByteBuf writtenBuf = argument.getValue();
assertEquals(5, writtenBuf.readableBytes()); assertEquals(5, writtenBuf.readableBytes());
assertEquals(data.slice(0, 5), writtenBuf); assertEquals(data.slice(0, 5), writtenBuf);
assertEquals(2, writtenBuf.refCnt()); assertEquals(2, writtenBuf.refCnt());
assertEquals(2, data.refCnt()); assertEquals(2, data.refCnt());
data.release(2);
} }
@Test @Test
@ -183,18 +211,17 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Set the connection window size to zero. // Set the connection window size to zero.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
ByteBuf data = dummyData(10); ByteBuf data = dummyData(10, 0);
send(STREAM_A, data.slice()); send(STREAM_A, data.slice(), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
// Verify that the entire frame was sent. // Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10); controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class); ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, false); captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue(); ByteBuf writtenBuf = argument.getValue();
assertEquals(data, writtenBuf); assertEquals(data, writtenBuf);
assertEquals(1, data.refCnt()); assertEquals(1, data.refCnt());
data.release();
} }
@Test @Test
@ -202,20 +229,19 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Set the connection window size to zero. // Set the connection window size to zero.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
ByteBuf data = dummyData(10); ByteBuf data = dummyData(10, 0);
send(STREAM_A, data); send(STREAM_A, data, 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
// Verify that a partial frame of 5 was sent. // Verify that a partial frame of 5 was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 5); controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 5);
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class); ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, false); captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue(); ByteBuf writtenBuf = argument.getValue();
assertEquals(5, writtenBuf.readableBytes()); assertEquals(5, writtenBuf.readableBytes());
assertEquals(data.slice(0, 5), writtenBuf); assertEquals(data.slice(0, 5), writtenBuf);
assertEquals(2, writtenBuf.refCnt()); assertEquals(2, writtenBuf.refCnt());
assertEquals(2, data.refCnt()); assertEquals(2, data.refCnt());
data.release(2);
} }
@Test @Test
@ -223,18 +249,17 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Set the stream window size to zero. // Set the stream window size to zero.
controller.updateOutboundWindowSize(STREAM_A, -DEFAULT_WINDOW_SIZE); controller.updateOutboundWindowSize(STREAM_A, -DEFAULT_WINDOW_SIZE);
ByteBuf data = dummyData(10); ByteBuf data = dummyData(10, 0);
send(STREAM_A, data.slice()); send(STREAM_A, data.slice(), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
// Verify that the entire frame was sent. // Verify that the entire frame was sent.
controller.updateOutboundWindowSize(STREAM_A, 10); controller.updateOutboundWindowSize(STREAM_A, 10);
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class); ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, false); captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue(); ByteBuf writtenBuf = argument.getValue();
assertEquals(data, writtenBuf); assertEquals(data, writtenBuf);
assertEquals(1, data.refCnt()); assertEquals(1, data.refCnt());
data.release();
} }
@Test @Test
@ -242,19 +267,51 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Set the stream window size to zero. // Set the stream window size to zero.
controller.updateOutboundWindowSize(STREAM_A, -DEFAULT_WINDOW_SIZE); controller.updateOutboundWindowSize(STREAM_A, -DEFAULT_WINDOW_SIZE);
ByteBuf data = dummyData(10); ByteBuf data = dummyData(10, 0);
send(STREAM_A, data); send(STREAM_A, data, 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
// Verify that a partial frame of 5 was sent. // Verify that a partial frame of 5 was sent.
controller.updateOutboundWindowSize(STREAM_A, 5); controller.updateOutboundWindowSize(STREAM_A, 5);
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class); ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, false); captureWrite(STREAM_A, argument, 0, false);
ByteBuf writtenBuf = argument.getValue(); ByteBuf writtenBuf = argument.getValue();
assertEquals(5, writtenBuf.readableBytes()); assertEquals(5, writtenBuf.readableBytes());
assertEquals(2, writtenBuf.refCnt()); assertEquals(2, writtenBuf.refCnt());
assertEquals(2, data.refCnt()); assertEquals(2, data.refCnt());
data.release(2); }
/**
* In this test, we give stream A padding and verify that it's padding is properly split.
*
* <pre>
* 0
* / \
* A B
* </pre>
*/
@Test
public void multipleStreamsShouldSplitPadding() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
// Try sending 10 bytes on each stream. They will be pending until we free up the
// connection.
send(STREAM_A, dummyData(3, 0), 7);
send(STREAM_B, dummyData(10, 0), 0);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
// Open up the connection window.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that 5 bytes from A were written: 3 from data and 2 from padding.
captureWrite(STREAM_A, captor, 2, false);
assertEquals(3, captor.getValue().readableBytes());
captureWrite(STREAM_B, captor, 0, false);
assertEquals(5, captor.getValue().readableBytes());
} }
/** /**
@ -279,10 +336,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Try sending 10 bytes on each stream. They will be pending until we free up the // Try sending 10 bytes on each stream. They will be pending until we free up the
// connection. // connection.
send(STREAM_A, dummyData(10)); send(STREAM_A, dummyData(10, 0), 0);
send(STREAM_B, dummyData(10)); send(STREAM_B, dummyData(10, 0), 0);
send(STREAM_C, dummyData(10)); send(STREAM_C, dummyData(10, 0), 0);
send(STREAM_D, dummyData(10)); send(STREAM_D, dummyData(10, 0), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_C);
@ -295,14 +352,14 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Verify that no write was done for A, since it's blocked. // Verify that no write was done for A, since it's blocked.
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
captureWrite(STREAM_B, captor, false); captureWrite(STREAM_B, captor, 0, false);
assertEquals(5, captor.getValue().readableBytes()); assertEquals(5, captor.getValue().readableBytes());
// Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot // Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot
// be split evenly, one will get 3 and one will get 2. // be split evenly, one will get 3 and one will get 2.
captureWrite(STREAM_C, captor, false); captureWrite(STREAM_C, captor, 0, false);
int c = captor.getValue().readableBytes(); int c = captor.getValue().readableBytes();
captureWrite(STREAM_D, captor, false); captureWrite(STREAM_D, captor, 0, false);
int d = captor.getValue().readableBytes(); int d = captor.getValue().readableBytes();
assertEquals(5, c + d); assertEquals(5, c + d);
assertEquals(1, Math.abs(c - d)); assertEquals(1, Math.abs(c - d));
@ -329,10 +386,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE); controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE);
// Send 10 bytes to each. // Send 10 bytes to each.
send(STREAM_A, dummyData(10)); send(STREAM_A, dummyData(10, 0), 0);
send(STREAM_B, dummyData(10)); send(STREAM_B, dummyData(10, 0), 0);
send(STREAM_C, dummyData(10)); send(STREAM_C, dummyData(10, 0), 0);
send(STREAM_D, dummyData(10)); send(STREAM_D, dummyData(10, 0), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_C);
@ -343,7 +400,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class); ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that A received all the bytes. // Verify that A received all the bytes.
captureWrite(STREAM_A, captor, false); captureWrite(STREAM_A, captor, 0, false);
assertEquals(10, captor.getValue().readableBytes()); assertEquals(10, captor.getValue().readableBytes());
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_C);
@ -371,10 +428,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE); controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE);
// Only send 5 to A so that it will allow data from its children. // Only send 5 to A so that it will allow data from its children.
send(STREAM_A, dummyData(5)); send(STREAM_A, dummyData(5, 0), 0);
send(STREAM_B, dummyData(10)); send(STREAM_B, dummyData(10, 0), 0);
send(STREAM_C, dummyData(10)); send(STREAM_C, dummyData(10, 0), 0);
send(STREAM_D, dummyData(10)); send(STREAM_D, dummyData(10, 0), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_C);
@ -387,14 +444,14 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Verify that no write was done for B, since it's blocked. // Verify that no write was done for B, since it's blocked.
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
captureWrite(STREAM_A, captor, false); captureWrite(STREAM_A, captor, 0, false);
assertEquals(5, captor.getValue().readableBytes()); assertEquals(5, captor.getValue().readableBytes());
// Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot // Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot
// be split evenly, one will get 3 and one will get 2. // be split evenly, one will get 3 and one will get 2.
captureWrite(STREAM_C, captor, false); captureWrite(STREAM_C, captor, 0, false);
int c = captor.getValue().readableBytes(); int c = captor.getValue().readableBytes();
captureWrite(STREAM_D, captor, false); captureWrite(STREAM_D, captor, 0, false);
int d = captor.getValue().readableBytes(); int d = captor.getValue().readableBytes();
assertEquals(5, c + d); assertEquals(5, c + d);
assertEquals(1, Math.abs(c - d)); assertEquals(1, Math.abs(c - d));
@ -432,10 +489,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE); controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE);
// Send 10 bytes to each. // Send 10 bytes to each.
send(STREAM_A, dummyData(10)); send(STREAM_A, dummyData(10, 0), 0);
send(STREAM_B, dummyData(10)); send(STREAM_B, dummyData(10, 0), 0);
send(STREAM_C, dummyData(10)); send(STREAM_C, dummyData(10, 0), 0);
send(STREAM_D, dummyData(10)); send(STREAM_D, dummyData(10, 0), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_C);
@ -449,9 +506,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class); ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that A received all the bytes. // Verify that A received all the bytes.
captureWrite(STREAM_A, captor, false); captureWrite(STREAM_A, captor, 0, false);
assertEquals(5, captor.getValue().readableBytes()); assertEquals(5, captor.getValue().readableBytes());
captureWrite(STREAM_D, captor, false); captureWrite(STREAM_D, captor, 0, false);
assertEquals(5, captor.getValue().readableBytes()); assertEquals(5, captor.getValue().readableBytes());
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_C);
@ -479,10 +536,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
setPriority(STREAM_D, 0, (short) 100, false); setPriority(STREAM_D, 0, (short) 100, false);
// Send a bunch of data on each stream. // Send a bunch of data on each stream.
send(STREAM_A, dummyData(1000)); send(STREAM_A, dummyData(1000, 0), 0);
send(STREAM_B, dummyData(1000)); send(STREAM_B, dummyData(1000, 0), 0);
send(STREAM_C, dummyData(1000)); send(STREAM_C, dummyData(1000, 0), 0);
send(STREAM_D, dummyData(1000)); send(STREAM_D, dummyData(1000, 0), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_C);
@ -492,22 +549,22 @@ public class DefaultHttp2OutboundFlowControllerTest {
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 1000); controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 1000);
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class); ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, captor, false); captureWrite(STREAM_A, captor, 0, false);
int aWritten = captor.getValue().readableBytes(); int aWritten = captor.getValue().readableBytes();
int min = aWritten; int min = aWritten;
int max = aWritten; int max = aWritten;
captureWrite(STREAM_B, captor, false); captureWrite(STREAM_B, captor, 0, false);
int bWritten = captor.getValue().readableBytes(); int bWritten = captor.getValue().readableBytes();
min = Math.min(min, bWritten); min = Math.min(min, bWritten);
max = Math.max(max, bWritten); max = Math.max(max, bWritten);
captureWrite(STREAM_C, captor, false); captureWrite(STREAM_C, captor, 0, false);
int cWritten = captor.getValue().readableBytes(); int cWritten = captor.getValue().readableBytes();
min = Math.min(min, cWritten); min = Math.min(min, cWritten);
max = Math.max(max, cWritten); max = Math.max(max, cWritten);
captureWrite(STREAM_D, captor, false); captureWrite(STREAM_D, captor, 0, false);
int dWritten = captor.getValue().readableBytes(); int dWritten = captor.getValue().readableBytes();
min = Math.min(min, dWritten); min = Math.min(min, dWritten);
max = Math.max(max, dWritten); max = Math.max(max, dWritten);
@ -542,29 +599,29 @@ public class DefaultHttp2OutboundFlowControllerTest {
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false); setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
// Send a bunch of data on each stream. // Send a bunch of data on each stream.
send(STREAM_A, dummyData(400)); send(STREAM_A, dummyData(400, 0), 0);
send(STREAM_B, dummyData(500)); send(STREAM_B, dummyData(500, 0), 0);
send(STREAM_C, dummyData(0)); send(STREAM_C, dummyData(0, 0), 0);
send(STREAM_D, dummyData(700)); send(STREAM_D, dummyData(700, 0), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_D); verifyNoWrite(STREAM_D);
// The write will occur on C, because it's an empty frame. // The write will occur on C, because it's an empty frame.
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class); ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_C, captor, false); captureWrite(STREAM_C, captor, 0, false);
assertEquals(0, captor.getValue().readableBytes()); assertEquals(0, captor.getValue().readableBytes());
// Allow 1000 bytes to be sent. // Allow 1000 bytes to be sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 999); controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 999);
captureWrite(STREAM_A, captor, false); captureWrite(STREAM_A, captor, 0, false);
int aWritten = captor.getValue().readableBytes(); int aWritten = captor.getValue().readableBytes();
captureWrite(STREAM_B, captor, false); captureWrite(STREAM_B, captor, 0, false);
int bWritten = captor.getValue().readableBytes(); int bWritten = captor.getValue().readableBytes();
captureWrite(STREAM_D, captor, false); captureWrite(STREAM_D, captor, 0, false);
int dWritten = captor.getValue().readableBytes(); int dWritten = captor.getValue().readableBytes();
assertEquals(999, aWritten + bWritten + dWritten); assertEquals(999, aWritten + bWritten + dWritten);
@ -601,10 +658,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
streamSizes.put(STREAM_B, 500); streamSizes.put(STREAM_B, 500);
streamSizes.put(STREAM_C, 600); streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700); streamSizes.put(STREAM_D, 700);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A))); send(STREAM_A, dummyData(streamSizes.get(STREAM_A), 0), 0);
send(STREAM_B, dummyData(streamSizes.get(STREAM_B))); send(STREAM_B, dummyData(streamSizes.get(STREAM_B), 0), 0);
send(STREAM_C, dummyData(streamSizes.get(STREAM_C))); send(STREAM_C, dummyData(streamSizes.get(STREAM_C), 0), 0);
send(STREAM_D, dummyData(streamSizes.get(STREAM_D))); send(STREAM_D, dummyData(streamSizes.get(STREAM_D), 0), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_C);
@ -666,10 +723,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
streamSizes.put(STREAM_B, 500); streamSizes.put(STREAM_B, 500);
streamSizes.put(STREAM_C, 600); streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700); streamSizes.put(STREAM_D, 700);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A))); send(STREAM_A, dummyData(streamSizes.get(STREAM_A), 0), 0);
send(STREAM_B, dummyData(streamSizes.get(STREAM_B))); send(STREAM_B, dummyData(streamSizes.get(STREAM_B), 0), 0);
send(STREAM_C, dummyData(streamSizes.get(STREAM_C))); send(STREAM_C, dummyData(streamSizes.get(STREAM_C), 0), 0);
send(STREAM_D, dummyData(streamSizes.get(STREAM_D))); send(STREAM_D, dummyData(streamSizes.get(STREAM_D), 0), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_C);
@ -736,11 +793,11 @@ public class DefaultHttp2OutboundFlowControllerTest {
streamSizes.put(STREAM_C, 600); streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700); streamSizes.put(STREAM_D, 700);
streamSizes.put(STREAM_E, 900); streamSizes.put(STREAM_E, 900);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A))); send(STREAM_A, dummyData(streamSizes.get(STREAM_A), 0), 0);
send(STREAM_B, dummyData(streamSizes.get(STREAM_B))); send(STREAM_B, dummyData(streamSizes.get(STREAM_B), 0), 0);
send(STREAM_C, dummyData(streamSizes.get(STREAM_C))); send(STREAM_C, dummyData(streamSizes.get(STREAM_C), 0), 0);
send(STREAM_D, dummyData(streamSizes.get(STREAM_D))); send(STREAM_D, dummyData(streamSizes.get(STREAM_D), 0), 0);
send(STREAM_E, dummyData(streamSizes.get(STREAM_E))); send(STREAM_E, dummyData(streamSizes.get(STREAM_E), 0), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_C);
@ -802,10 +859,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
streamSizes.put(STREAM_B, 500); streamSizes.put(STREAM_B, 500);
streamSizes.put(STREAM_C, 600); streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700); streamSizes.put(STREAM_D, 700);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A))); send(STREAM_A, dummyData(streamSizes.get(STREAM_A), 0), 0);
send(STREAM_B, dummyData(streamSizes.get(STREAM_B))); send(STREAM_B, dummyData(streamSizes.get(STREAM_B), 0), 0);
send(STREAM_C, dummyData(streamSizes.get(STREAM_C))); send(STREAM_C, dummyData(streamSizes.get(STREAM_C), 0), 0);
send(STREAM_D, dummyData(streamSizes.get(STREAM_D))); send(STREAM_D, dummyData(streamSizes.get(STREAM_D), 0), 0);
verifyNoWrite(STREAM_A); verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C); verifyNoWrite(STREAM_C);
@ -844,12 +901,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
return sum; return sum;
} }
private void send(int streamId, ByteBuf data) throws Http2Exception { private void send(int streamId, ByteBuf data, int padding) throws Http2Exception {
controller.writeData(ctx, streamId, data, 0, false, promise); controller.writeData(ctx, streamId, data, padding, false, promise);
} }
private void verifyWrite(int streamId, ByteBuf data) { private void verifyWrite(int streamId, ByteBuf data, int padding) {
verify(frameWriter).writeData(eq(ctx), eq(streamId), eq(data), eq(0), eq(false), eq(promise)); verify(frameWriter).writeData(eq(ctx), eq(streamId), eq(data), eq(padding), eq(false), eq(promise));
} }
private void verifyNoWrite(int streamId) { private void verifyNoWrite(int streamId) {
@ -857,20 +914,22 @@ public class DefaultHttp2OutboundFlowControllerTest {
eq(promise)); eq(promise));
} }
private void captureWrite(int streamId, ArgumentCaptor<ByteBuf> captor, boolean endStream) { private void captureWrite(int streamId, ArgumentCaptor<ByteBuf> captor, int padding,
verify(frameWriter).writeData(eq(ctx), eq(streamId), captor.capture(), eq(0), eq(endStream), eq(promise)); boolean endStream) {
verify(frameWriter).writeData(eq(ctx), eq(streamId), captor.capture(), eq(padding), eq(endStream), eq(promise));
} }
private void setPriority(int stream, int parent, int weight, boolean exclusive) throws Http2Exception { private void setPriority(int stream, int parent, int weight, boolean exclusive) throws Http2Exception {
connection.stream(stream).setPriority(parent, (short) weight, exclusive); connection.stream(stream).setPriority(parent, (short) weight, exclusive);
} }
private static ByteBuf dummyData(int size) { private static ByteBuf dummyData(int size, int padding) {
String repeatedData = "0123456789"; String repeatedData = "0123456789";
ByteBuf buffer = Unpooled.buffer(size); ByteBuf buffer = Unpooled.buffer(size + padding);
for (int index = 0; index < size; ++index) { for (int index = 0; index < size; ++index) {
buffer.writeByte(repeatedData.charAt(index % repeatedData.length())); buffer.writeByte(repeatedData.charAt(index % repeatedData.length()));
} }
buffer.writeZero(padding);
return buffer; return buffer;
} }
} }