Make the flow-controllers write fewer, fatter frames to improve throughput.

Motivation:

Coalescing many small writes into a larger DATA frame reduces framing overheads on the wire and reduces the number of calls to Http2FrameListeners on the remote side.
Delaying the write of WINDOW_UPDATE until flush allows for more consumed bytes to be returned as the aggregate of consumed bytes is returned and not the amount consumed when the threshold was crossed.

Modifications:
- Remote flow controller no longer immediately writes bytes when a flow-controlled payload is enqueued. Sequential data payloads are now merged into a single CompositeByteBuf which are written when 'writePendingBytes' is called.
- Listener added to remote flow-controller which observes written bytes per stream.
- Local flow-controller no longer immediately writes WINDOW_UPDATE when the ratio threshold is crossed. Now an explicit call to 'writeWindowUpdates' triggers the WINDOW_UPDATE for all streams who's ratio is exceeded at that time. This results in
  fewer window updates being sent and more bytes being returned.
- Http2ConnectionHandler.flush triggers 'writeWindowUpdates' on the local flow-controller followed by 'writePendingBytes' on the remote flow-controller so WINDOW_UPDATES preceed DATA frames on the wire.

Result:
- Better throughput for writing many small DATA chunks followed by a flush, saving 9-bytes per coalesced frame.
- Fewer WINDOW_UPDATES being written and more flow-control bytes returned to remote side more quickly, thereby improving throughput.
This commit is contained in:
Louis Ryan 2015-06-04 11:55:18 -07:00 committed by nmittler
parent 1ecc37fbb2
commit 05ce33f5ca
11 changed files with 483 additions and 97 deletions

View File

@ -19,6 +19,8 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.SlicedByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -130,7 +132,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
// Hand control of the frame to the flow controller.
flowController().sendFlowControlled(ctx, stream,
flowController().addFlowControlled(ctx, stream,
new FlowControlledData(ctx, stream, data, padding, endOfStream, promise));
return promise;
}
@ -166,7 +168,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
// Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
flowController().sendFlowControlled(ctx, stream,
flowController().addFlowControlled(ctx, stream,
new FlowControlledHeaders(ctx, stream, headers, streamDependency, weight,
exclusive, padding, endOfStream, promise));
return promise;
@ -318,10 +320,10 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
private ByteBuf data;
private int size;
private FlowControlledData(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding,
private FlowControlledData(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf buf, int padding,
boolean endOfStream, ChannelPromise promise) {
super(ctx, stream, padding, endOfStream, promise);
this.data = data;
this.data = buf;
size = data.readableBytes() + padding;
}
@ -367,7 +369,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
padding -= writeablePadding;
bytesWritten += writeableData + writeablePadding;
ChannelPromise writePromise;
if (size == bytesWritten) {
if (size == bytesWritten && !promise.isVoid()) {
// Can use the original promise if it's the last write
writePromise = promise;
} else {
@ -375,13 +377,67 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
writePromise = ctx.newPromise();
writePromise.addListener(this);
}
if (toWrite instanceof SlicedByteBuf && data instanceof CompositeByteBuf) {
// If we're writing a subset of a composite buffer then we want to release
// any underlying buffers that have been consumed. CompositeByteBuf only releases
// underlying buffers on write if all of its data has been consumed and its refCnt becomes
// 0.
final CompositeByteBuf toFree = (CompositeByteBuf) data;
writePromise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
toFree.discardReadComponents();
}
});
}
frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
size == bytesWritten && endOfStream, writePromise);
size == bytesWritten && endOfStream, writePromise);
} while (size != bytesWritten && allowedBytes > bytesWritten);
} finally {
size -= bytesWritten;
}
}
@Override
public boolean merge(Http2RemoteFlowController.FlowControlled next) {
if (FlowControlledData.class != next.getClass()) {
return false;
}
final FlowControlledData nextData = (FlowControlledData) next;
// Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
padding = Math.max(nextData.padding, padding);
endOfStream = nextData.endOfStream;
final CompositeByteBuf compositeByteBuf;
if (data instanceof CompositeByteBuf) {
compositeByteBuf = (CompositeByteBuf) data;
} else {
compositeByteBuf = ctx.alloc().compositeBuffer(Integer.MAX_VALUE);
compositeByteBuf.addComponent(data);
compositeByteBuf.writerIndex(data.readableBytes());
data = compositeByteBuf;
}
compositeByteBuf.addComponent(nextData.data);
compositeByteBuf.writerIndex(compositeByteBuf.writerIndex() + nextData.data.readableBytes());
size = data.readableBytes() + padding;
if (!nextData.promise.isVoid()) {
// Replace current promise if void otherwise chain them.
if (promise.isVoid()) {
promise = nextData.promise;
} else {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
nextData.promise.trySuccess();
} else {
nextData.promise.tryFailure(future.cause());
}
}
});
}
}
return true;
}
}
/**
@ -419,9 +475,18 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override
public void write(int allowedBytes) {
if (promise.isVoid()) {
promise = ctx.newPromise();
promise.addListener(this);
}
frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
padding, endOfStream, promise);
}
@Override
public boolean merge(Http2RemoteFlowController.FlowControlled next) {
return false;
}
}
/**
@ -431,8 +496,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
ChannelFutureListener {
protected final ChannelHandlerContext ctx;
protected final Http2Stream stream;
protected final ChannelPromise promise;
protected final boolean endOfStream;
protected ChannelPromise promise;
protected boolean endOfStream;
protected int padding;
public FlowControlledBase(final ChannelHandlerContext ctx, final Http2Stream stream, int padding,
@ -445,8 +510,9 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
this.endOfStream = endOfStream;
this.stream = stream;
this.promise = promise;
// Ensure error() gets called in case something goes wrong after the frame is passed to Netty.
promise.addListener(this);
if (!promise.isVoid()) {
promise.addListener(this);
}
}
@Override

View File

@ -38,7 +38,10 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
private final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) {
state(stream).writeAllocatedBytes();
int written = state(stream).writeAllocatedBytes();
if (written != -1 && listener != null) {
listener.streamWritten(stream, written);
}
return true;
}
};
@ -46,6 +49,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
private final Http2Connection.PropertyKey stateKey;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private ChannelHandlerContext ctx;
private Listener listener;
public DefaultHttp2RemoteFlowController(Http2Connection connection) {
this.connection = checkNotNull(connection, "connection");
@ -175,20 +179,29 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
// This call does not trigger any writes, all writes will occur when writePendingBytes is called.
if (stream.id() == CONNECTION_STREAM_ID) {
// Update the connection window and write any pending frames for all streams.
// Update the connection window
connectionState().incrementStreamWindow(delta);
writePendingBytes();
} else {
// Update the stream window and write any pending frames for the stream.
// Update the stream window
AbstractState state = state(stream);
state.incrementStreamWindow(delta);
state.writeBytes(state.writableWindow());
}
}
@Override
public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled frame) {
public void listener(Listener listener) {
this.listener = listener;
}
@Override
public Listener listener() {
return this.listener;
}
@Override
public void addFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled frame) {
checkNotNull(ctx, "ctx");
checkNotNull(frame, "frame");
if (this.ctx != null && this.ctx != ctx) {
@ -202,9 +215,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
state.enqueueFrame(frame);
} catch (Throwable t) {
frame.error(t);
return;
}
state.writeBytes(state.writableWindow());
}
/**
@ -233,17 +244,19 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
/**
* Writes as many pending bytes as possible, according to stream priority.
*/
private void writePendingBytes() throws Http2Exception {
@Override
public void writePendingBytes() throws Http2Exception {
Http2Stream connectionStream = connection.connectionStream();
int connectionWindowSize = state(connectionStream).windowSize();
if (connectionWindowSize > 0) {
// Allocate the bytes for the connection window to the streams, but do not write.
allocateBytesForTree(connectionStream, connectionWindowSize);
// Now write all of the allocated bytes.
connection.forEachActiveStream(WRITE_ALLOCATED_BYTES);
}
// Now write all of the allocated bytes, must write as there may be empty frames with
// EOS = true
connection.forEachActiveStream(WRITE_ALLOCATED_BYTES);
}
/**
@ -469,7 +482,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
@Override
void writeAllocatedBytes() {
int writeAllocatedBytes() {
int numBytes = allocated;
// Restore the number of streamable bytes to this branch.
@ -477,7 +490,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
resetAllocated();
// Perform the write.
writeBytes(numBytes);
return writeBytes(numBytes);
}
/**
@ -522,7 +535,10 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override
void enqueueFrame(FlowControlled frame) {
incrementPendingBytes(frame.size());
pendingWriteQueue.offer(frame);
FlowControlled last = pendingWriteQueue.peekLast();
if (last == null || !last.merge(frame)) {
pendingWriteQueue.offer(frame);
}
}
@Override
@ -564,23 +580,19 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override
int writeBytes(int bytes) {
boolean wrote = false;
int bytesAttempted = 0;
while (hasFrame()) {
int maxBytes = min(bytes - bytesAttempted, writableWindow());
bytesAttempted += write(peek(), maxBytes);
if (bytes - bytesAttempted <= 0 && !isNextFrameEmpty()) {
// The frame had data and all of it was written.
break;
}
int writableBytes = min(bytes, writableWindow());
while (hasFrame() && (writableBytes > 0 || peek().size() == 0)) {
wrote = true;
bytesAttempted += write(peek(), writableBytes);
writableBytes = min(bytes - bytesAttempted, writableWindow());
}
if (wrote) {
return bytesAttempted;
} else {
return -1;
}
return bytesAttempted;
}
/**
* @return {@code true} if there is a next frame and its size is zero.
*/
private boolean isNextFrameEmpty() {
return hasFrame() && peek().size() == 0;
}
/**
@ -709,7 +721,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
@Override
void writeAllocatedBytes() {
int writeAllocatedBytes() {
throw new UnsupportedOperationException();
}
@ -789,9 +801,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
abstract int initialWindowSize();
/**
* Write bytes allocated bytes for this stream.
* Write the allocated bytes for this stream.
*
* @return the number of bytes written for a stream or {@code -1} if no write occurred.
*/
abstract void writeAllocatedBytes();
abstract int writeAllocatedBytes();
/**
* Returns the number of pending bytes for this node that will fit within the
@ -830,7 +844,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
/**
* Writes up to the number of bytes from the pending queue. May write less if limited by the writable window, by
* the number of pending writes available, or because a frame does not support splitting on arbitrary
* boundaries.
* boundaries. Will return {@code -1} if there are no frames to write.
*/
abstract int writeBytes(int bytes);

View File

@ -167,6 +167,17 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Http2Exception {
// Trigger pending writes in the remote flow controller.
connection().remote().flowController().writePendingBytes();
try {
super.flush(ctx);
} catch (Throwable t) {
throw new Http2Exception(INTERNAL_ERROR, "Error flushing" , t);
}
}
private abstract class BaseDecoder {
public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
@ -449,7 +460,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that
// for flow-control the read may release window that causes data to be written that can now be flushed.
ctx.flush();
flush(ctx);
}
/**

View File

@ -23,18 +23,38 @@ import io.netty.channel.ChannelHandlerContext;
public interface Http2RemoteFlowController extends Http2FlowController {
/**
* Writes or queues a payload for transmission to the remote endpoint. There is no
* guarantee when the data will be written or whether it will be split into multiple frames
* Queues a payload for transmission to the remote endpoint. There is no guarantee as to when the data
* will be written or how it will be allocated to frames.
* before sending.
* <p>
* Manually flushing the {@link ChannelHandlerContext} is required for writes as the flow controller will
* <strong>not</strong> flush by itself.
* Writes do not actually occur until {@link #writePendingBytes()} is called.
*
* @param ctx the context from the handler.
* @param stream the subject stream. Must not be the connection stream object.
* @param payload payload to write subject to flow-control accounting and ordering rules.
*/
void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload);
void addFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload);
/**
* Write all data pending in the flow controller up to the flow-control limits.
*
* @throws Http2Exception throws if a protocol-related error occurred.
*/
void writePendingBytes() throws Http2Exception;
/**
* Set the active listener on the flow-controller.
*
* @param listener to notify when the a write occurs, can be {@code null}.
*/
void listener(Listener listener);
/**
* Get the current listener to flow-control events.
*
* @return the current listener or {@code null} if one is not set.
*/
Listener listener();
/**
* Implementations of this interface are used to progressively write chunks of the underlying
@ -84,5 +104,30 @@ public interface Http2RemoteFlowController extends Http2FlowController {
* @param allowedBytes an upper bound on the number of bytes the payload can write at this time.
*/
void write(int allowedBytes);
/**
* Merge the contents of the {@code next} message into this message so they can be written out as one unit.
* This allows many small messages to be written as a single DATA frame.
*
* @return {@code true} if {@code next} was successfully merged and does not need to be enqueued,
* {@code false} otherwise.
*/
boolean merge(FlowControlled next);
}
/**
* Listener to the number of flow-controlled bytes written per stream.
*/
interface Listener {
/**
* Report the number of {@code writtenBytes} for a {@code stream}. Called after the
* flow-controller has flushed bytes for the given stream.
*
* @param stream that had bytes written.
* @param writtenBytes the number of bytes written for a stream, can be 0 in the case of an
* empty DATA frame.
*/
void streamWritten(Http2Stream stream, int writtenBytes);
}
}

View File

@ -88,6 +88,7 @@ public class DataCompressionHttp2Test {
private CountDownLatch clientSettingsAckLatch;
private Http2Connection serverConnection;
private Http2Connection clientConnection;
private Http2ConnectionHandler clientHandler;
private ByteArrayOutputStream serverOut;
@Before
@ -123,9 +124,9 @@ public class DataCompressionHttp2Test {
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.flush(ctxClient());
}
});
awaitServer();
@ -148,10 +149,10 @@ public class DataCompressionHttp2Test {
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.flush(ctxClient());
}
});
awaitServer();
@ -177,10 +178,10 @@ public class DataCompressionHttp2Test {
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.flush(ctxClient());
}
});
awaitServer();
@ -206,13 +207,13 @@ public class DataCompressionHttp2Test {
// our {@link Http2TestUtil$FrameAdapter} does.
Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data1.retain(), 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data2.retain(), 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.flush(ctxClient());
}
});
awaitServer();
@ -242,10 +243,10 @@ public class DataCompressionHttp2Test {
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.flush(ctxClient());
}
});
awaitServer();
@ -330,8 +331,8 @@ public class DataCompressionHttp2Test {
new DefaultHttp2FrameReader(),
new DelegatingDecompressorFrameListener(clientConnection,
clientFrameCountDown));
Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler(decoder, clientEncoder);
p.addLast(connectionHandler);
clientHandler = new Http2ConnectionHandler(decoder, clientEncoder);
p.addLast(clientHandler);
}
});

View File

@ -55,6 +55,8 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import io.netty.handler.codec.http2.Http2RemoteFlowController.FlowControlled;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.ArrayList;
@ -64,6 +66,7 @@ import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
@ -98,6 +101,9 @@ public class DefaultHttp2ConnectionEncoderTest {
private ChannelPromise promise;
@Mock
private ChannelPromise voidPromise;
@Mock
private ChannelFuture future;
@ -132,6 +138,10 @@ public class DefaultHttp2ConnectionEncoderTest {
MockitoAnnotations.initMocks(this);
promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
when(voidPromise.addListener(Matchers.<GenericFutureListener<Future<? super Void>>>any())).thenThrow(
new AssertionFailedError());
when(voidPromise.addListeners(Matchers.<GenericFutureListener<Future<? super Void>>>any())).thenThrow(
new AssertionFailedError());
when(channel.isActive()).thenReturn(true);
when(stream.id()).thenReturn(STREAM_ID);
@ -202,7 +212,7 @@ public class DefaultHttp2ConnectionEncoderTest {
}
});
payloadCaptor = ArgumentCaptor.forClass(Http2RemoteFlowController.FlowControlled.class);
doNothing().when(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), payloadCaptor.capture());
doNothing().when(remoteFlow).addFlowControlled(eq(ctx), eq(stream), payloadCaptor.capture());
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.channel()).thenReturn(channel);
when(ctx.newSucceededFuture()).thenReturn(future);
@ -225,6 +235,52 @@ public class DefaultHttp2ConnectionEncoderTest {
assertEquals(0, data.refCnt());
}
@Test
public void dataFramesShouldMerge() throws Exception {
final ByteBuf data = dummyData().retain();
DefaultChannelPromise secondPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
encoder.writeData(ctx, STREAM_ID, data, 0, true, secondPromise);
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
FlowControlled mergedPayload = capturedWrites.get(0);
mergedPayload.merge(capturedWrites.get(1));
assertEquals(16, mergedPayload.size());
assertFalse(secondPromise.isSuccess());
mergedPayload.write(16);
assertEquals(0, mergedPayload.size());
assertEquals("abcdefghabcdefgh", writtenData.get(0));
assertEquals(0, data.refCnt());
// Second promise is notified after write of the merged payload completes
assertTrue(secondPromise.isSuccess());
}
@Test
public void dataFramesShouldMergeUseVoidPromise() throws Exception {
final ByteBuf data = dummyData().retain();
when(voidPromise.isVoid()).thenReturn(true);
encoder.writeData(ctx, STREAM_ID, data, 0, true, voidPromise);
encoder.writeData(ctx, STREAM_ID, data, 0, true, voidPromise);
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
FlowControlled mergedPayload = capturedWrites.get(0);
assertTrue(mergedPayload.merge(capturedWrites.get(1)));
assertEquals(16, mergedPayload.size());
mergedPayload.write(16);
assertEquals(0, mergedPayload.size());
assertEquals("abcdefghabcdefgh", writtenData.get(0));
assertEquals(0, data.refCnt());
}
@Test
public void dataFramesDontMergeWithHeaders() throws Exception {
final ByteBuf data = dummyData().retain();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
assertFalse(capturedWrites.get(0).merge(capturedWrites.get(1)));
}
@Test
public void dataLargerThanMaxFrameSizeShouldBeSplit() throws Exception {
when(frameSizePolicy.maxFrameSize()).thenReturn(3);
@ -443,7 +499,7 @@ public class DefaultHttp2ConnectionEncoderTest {
mockSendFlowControlledWriteEverything();
ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data.retain(), 0, true, promise);
verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(lifecycleManager).closeStreamLocal(stream, promise);
assertEquals(data.toString(UTF_8), writtenData.get(0));
data.release();
@ -520,7 +576,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(remote.lastStreamKnownByPeer()).thenReturn(0);
ByteBuf data = mock(ByteBuf.class);
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
}
@Test
@ -528,7 +584,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(connection.goAwaySent()).thenReturn(true);
when(remote.lastStreamKnownByPeer()).thenReturn(0);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
}
@Test
@ -537,7 +593,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(local.lastStreamKnownByPeer()).thenReturn(STREAM_ID);
ByteBuf data = mock(ByteBuf.class);
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
}
@Test
@ -545,7 +601,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(connection.goAwayReceived()).thenReturn(true);
when(local.lastStreamKnownByPeer()).thenReturn(STREAM_ID);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
}
private void mockSendFlowControlledWriteEverything() {
@ -557,7 +613,7 @@ public class DefaultHttp2ConnectionEncoderTest {
flowControlled.writeComplete();
return null;
}
}).when(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), payloadCaptor.capture());
}).when(remoteFlow).addFlowControlled(eq(ctx), eq(stream), payloadCaptor.capture());
}
private void mockFutureAddListener(boolean success) {

View File

@ -25,12 +25,15 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@ -79,6 +82,9 @@ public class DefaultHttp2RemoteFlowControllerTest {
@Mock
private ChannelPromise promise;
@Mock
private Http2RemoteFlowController.Listener listener;
private DefaultHttp2Connection connection;
@Before
@ -90,6 +96,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2RemoteFlowController(connection);
controller.listener(listener);
connection.remote().flowController(controller);
connection.local().createStream(STREAM_A, false);
@ -134,14 +141,49 @@ public class DefaultHttp2RemoteFlowControllerTest {
public void payloadSmallerThanWindowShouldBeWrittenImmediately() throws Http2Exception {
FakeFlowControlled data = new FakeFlowControlled(5);
sendData(STREAM_A, data);
data.assertNotWritten();
verifyZeroInteractions(listener);
controller.writePendingBytes();
data.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
}
@Test
public void emptyPayloadShouldBeWrittenImmediately() throws Http2Exception {
FakeFlowControlled data = new FakeFlowControlled(0);
sendData(STREAM_A, data);
data.assertNotWritten();
controller.writePendingBytes();
data.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 0);
}
@Test
public void unflushedPayloadsShouldBeDroppedOnCancel() throws Http2Exception {
FakeFlowControlled data = new FakeFlowControlled(5);
sendData(STREAM_A, data);
connection.stream(STREAM_A).close();
controller.writePendingBytes();
data.assertNotWritten();
controller.writePendingBytes();
data.assertNotWritten();
verifyZeroInteractions(listener);
}
@Test
public void payloadsShouldMerge() throws Http2Exception {
controller.initialWindowSize(15);
FakeFlowControlled data1 = new FakeFlowControlled(5, true);
FakeFlowControlled data2 = new FakeFlowControlled(10, true);
sendData(STREAM_A, data1);
sendData(STREAM_A, data2);
data1.assertNotWritten();
data1.assertNotWritten();
data2.assertMerged();
controller.writePendingBytes();
data1.assertFullyWritten();
data2.assertNotWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 15);
}
@Test
@ -151,9 +193,12 @@ public class DefaultHttp2RemoteFlowControllerTest {
FakeFlowControlled data = new FakeFlowControlled(15);
FakeFlowControlled moreData = new FakeFlowControlled(0);
sendData(STREAM_A, data);
controller.writePendingBytes();
data.assertNotWritten();
sendData(STREAM_A, moreData);
controller.writePendingBytes();
moreData.assertNotWritten();
verifyZeroInteractions(listener);
}
@Test
@ -163,13 +208,16 @@ public class DefaultHttp2RemoteFlowControllerTest {
FakeFlowControlled data = new FakeFlowControlled(15);
FakeFlowControlled moreData = new FakeFlowControlled(0);
sendData(STREAM_A, data);
controller.writePendingBytes();
data.assertNotWritten();
sendData(STREAM_A, moreData);
controller.writePendingBytes();
moreData.assertNotWritten();
connection.stream(STREAM_A).close();
data.assertError();
moreData.assertError();
verifyZeroInteractions(listener);
}
@Test
@ -178,26 +226,35 @@ public class DefaultHttp2RemoteFlowControllerTest {
final FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
controller.writePendingBytes();
// Verify that a partial frame of 5 remains to be sent
data.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
verifyNoMoreInteractions(listener);
}
@Test
public void windowUpdateShouldTriggerWrite() throws Http2Exception {
public void windowUpdateAndFlushShouldTriggerWrite() throws Http2Exception {
controller.initialWindowSize(10);
FakeFlowControlled data = new FakeFlowControlled(20);
FakeFlowControlled moreData = new FakeFlowControlled(10);
sendData(STREAM_A, data);
sendData(STREAM_A, moreData);
controller.writePendingBytes();
data.assertPartiallyWritten(10);
moreData.assertNotWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
reset(ctx);
// Update the window and verify that the rest of data and some of moreData are written
incrementWindowSize(STREAM_A, 15);
controller.writePendingBytes();
data.assertFullyWritten();
moreData.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 15);
verifyNoMoreInteractions(listener);
assertEquals(DEFAULT_WINDOW_SIZE - 25, window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
@ -212,6 +269,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
controller.writePendingBytes();
data.assertNotWritten();
// Verify that the entire frame was sent.
@ -228,25 +286,33 @@ public class DefaultHttp2RemoteFlowControllerTest {
FakeFlowControlled dataA = new FakeFlowControlled(10);
// Queue data for stream A and allow most of it to be written.
sendData(STREAM_A, dataA);
controller.writePendingBytes();
dataA.assertNotWritten();
incrementWindowSize(CONNECTION_STREAM_ID, 8);
controller.writePendingBytes();
dataA.assertPartiallyWritten(8);
assertEquals(65527, window(STREAM_A));
assertEquals(0, window(CONNECTION_STREAM_ID));
verify(listener, times(1)).streamWritten(stream(STREAM_A), 8);
// Queue data for stream B and allow the rest of A and all of B to be written.
FakeFlowControlled dataB = new FakeFlowControlled(10);
sendData(STREAM_B, dataB);
controller.writePendingBytes();
dataB.assertNotWritten();
incrementWindowSize(CONNECTION_STREAM_ID, 12);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
// Verify the rest of A is written.
dataA.assertFullyWritten();
assertEquals(65525, window(STREAM_A));
verify(listener, times(1)).streamWritten(stream(STREAM_A), 2);
dataB.assertFullyWritten();
assertEquals(65525, window(STREAM_B));
verify(listener, times(1)).streamWritten(stream(STREAM_B), 10);
verifyNoMoreInteractions(listener);
}
@Test
@ -260,7 +326,9 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Deplete the stream A window to 0
sendData(STREAM_A, data1);
controller.writePendingBytes();
data1.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 20);
// Make the window size for stream A negative
controller.initialWindowSize(initWindow - secondWindowSize);
@ -268,21 +336,26 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Queue up a write. It should not be written now because the window is negative
sendData(STREAM_A, data2);
controller.writePendingBytes();
data2.assertNotWritten();
// Open the window size back up a bit (no send should happen)
incrementWindowSize(STREAM_A, 5);
controller.writePendingBytes();
assertEquals(-5, window(STREAM_A));
data2.assertNotWritten();
// Open the window size back up a bit (no send should happen)
incrementWindowSize(STREAM_A, 5);
controller.writePendingBytes();
assertEquals(0, window(STREAM_A));
data2.assertNotWritten();
// Open the window size back up and allow the write to happen
incrementWindowSize(STREAM_A, 5);
controller.writePendingBytes();
data2.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
}
@Test
@ -290,13 +363,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
controller.initialWindowSize(0);
// First send a frame that will get buffered.
FakeFlowControlled data = new FakeFlowControlled(10);
FakeFlowControlled data = new FakeFlowControlled(10, false);
sendData(STREAM_A, data);
controller.writePendingBytes();
data.assertNotWritten();
// Now send an empty frame on the same stream and verify that it's also buffered.
FakeFlowControlled data2 = new FakeFlowControlled(0);
FakeFlowControlled data2 = new FakeFlowControlled(0, false);
sendData(STREAM_A, data2);
controller.writePendingBytes();
data2.assertNotWritten();
// Re-expand the window and verify that both frames were sent.
@ -304,6 +379,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
data.assertFullyWritten();
data2.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
}
@Test
@ -312,11 +388,13 @@ public class DefaultHttp2RemoteFlowControllerTest {
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
controller.writePendingBytes();
data.assertNotWritten();
// Verify that a partial frame of 5 was sent.
controller.initialWindowSize(5);
data.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
}
@Test
@ -326,11 +404,16 @@ public class DefaultHttp2RemoteFlowControllerTest {
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
controller.writePendingBytes();
data.assertNotWritten();
// Verify that the entire frame was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 10);
data.assertNotWritten();
controller.writePendingBytes();
data.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 10, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -345,11 +428,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
controller.writePendingBytes();
data.assertNotWritten();
// Verify that a partial frame of 5 was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 5);
data.assertNotWritten();
controller.writePendingBytes();
data.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -364,11 +451,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
controller.writePendingBytes();
data.assertNotWritten();
// Verify that the entire frame was sent.
incrementWindowSize(STREAM_A, 10);
data.assertNotWritten();
controller.writePendingBytes();
data.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
assertEquals(DEFAULT_WINDOW_SIZE - 10, window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -383,11 +474,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
controller.writePendingBytes();
data.assertNotWritten();
// Verify that a partial frame of 5 was sent.
incrementWindowSize(STREAM_A, 5);
data.assertNotWritten();
controller.writePendingBytes();
data.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -427,6 +522,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
dataC.assertNotWritten();
@ -434,6 +530,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that the entire frame was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 10);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
// A is not written
@ -443,12 +541,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
// B is partially written
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B), 2);
dataB.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_B), 5);
// Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot
// be split evenly, one will get 3 and one will get 2.
assertEquals(2 * DEFAULT_WINDOW_SIZE - 5, window(STREAM_C) + window(STREAM_D), 5);
dataC.assertPartiallyWritten(3);
verify(listener, times(1)).streamWritten(stream(STREAM_C), 3);
dataD.assertPartiallyWritten(2);
verify(listener, times(1)).streamWritten(stream(STREAM_D), 2);
}
/**
@ -480,6 +581,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
@ -488,6 +590,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that the entire frame was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 10);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 10, window(STREAM_A));
assertEquals(0, window(STREAM_B));
@ -495,6 +598,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
dataA.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
@ -530,6 +635,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
@ -538,6 +644,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that the entire frame was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 10);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
assertEquals(0, window(STREAM_B));
@ -546,9 +653,12 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot
// be split evenly, one will get 3 and one will get 2.
dataA.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
dataB.assertNotWritten();
dataC.assertPartiallyWritten(3);
verify(listener, times(1)).streamWritten(stream(STREAM_C), 3);
dataD.assertPartiallyWritten(2);
verify(listener, times(1)).streamWritten(stream(STREAM_D), 2);
}
/**
@ -592,6 +702,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
@ -603,6 +714,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that the entire frame was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 10);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A), 2);
assertEquals(0, window(STREAM_B));
@ -611,9 +723,11 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that A and D split the bytes.
dataA.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_D), 5);
}
/**
@ -646,6 +760,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
@ -654,6 +769,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Allow 1000 bytes to be sent.
incrementWindowSize(CONNECTION_STREAM_ID, 1000);
controller.writePendingBytes();
// All writes sum == 1000
assertEquals(1000, dataA.written() + dataB.written() + dataC.written() + dataD.written());
@ -662,6 +778,10 @@ public class DefaultHttp2RemoteFlowControllerTest {
dataB.assertPartiallyWritten(445, allowedError);
dataC.assertPartiallyWritten(223, allowedError);
dataD.assertPartiallyWritten(223, allowedError);
verify(listener, times(1)).streamWritten(eq(stream(STREAM_A)), anyInt());
verify(listener, times(1)).streamWritten(eq(stream(STREAM_B)), anyInt());
verify(listener, times(1)).streamWritten(eq(stream(STREAM_C)), anyInt());
verify(listener, times(1)).streamWritten(eq(stream(STREAM_D)), anyInt());
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - dataA.written(), window(STREAM_A));
@ -701,15 +821,19 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
// The write will occur on C, because it's an empty frame.
dataC.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_C), 0);
dataD.assertNotWritten();
// Allow 1000 bytes to be sent.
incrementWindowSize(CONNECTION_STREAM_ID, 999);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_A), 50);
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_B), 50);
@ -717,8 +841,11 @@ public class DefaultHttp2RemoteFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_D), 50);
dataA.assertPartiallyWritten(333);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 333);
dataB.assertPartiallyWritten(333);
verify(listener, times(1)).streamWritten(stream(STREAM_B), 333);
dataD.assertPartiallyWritten(333);
verify(listener, times(1)).streamWritten(stream(STREAM_D), 333);
}
/**
@ -759,6 +886,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
@ -828,6 +956,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
@ -905,6 +1034,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
sendData(STREAM_E, dataE);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
@ -966,6 +1096,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
@ -1033,6 +1164,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
@ -1065,7 +1197,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
int windowBefore = window(STREAM_A);
controller.sendFlowControlled(ctx, stream, flowControlled);
controller.addFlowControlled(ctx, stream, flowControlled);
controller.writePendingBytes();
verify(flowControlled, times(3)).write(anyInt());
verify(flowControlled).error(any(Throwable.class));
@ -1088,7 +1221,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
boolean exceptionThrown = false;
try {
controller.sendFlowControlled(ctx, stream, flowControlled);
controller.addFlowControlled(ctx, stream, flowControlled);
controller.writePendingBytes();
} catch (RuntimeException e) {
exceptionThrown = true;
} finally {
@ -1131,7 +1265,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
int windowBefore = window(STREAM_A);
try {
controller.sendFlowControlled(ctx, stream, flowControlled);
controller.addFlowControlled(ctx, stream, flowControlled);
controller.writePendingBytes();
} catch (Exception e) {
fail();
}
@ -1157,7 +1292,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
}
}).when(flowControlled).error(any(Throwable.class));
controller.sendFlowControlled(ctx, stream, flowControlled);
controller.addFlowControlled(ctx, stream, flowControlled);
controller.writePendingBytes();
verify(flowControlled).write(anyInt());
verify(flowControlled).error(any(Throwable.class));
@ -1203,7 +1339,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
private void sendData(int streamId, FakeFlowControlled data) throws Http2Exception {
Http2Stream stream = stream(streamId);
controller.sendFlowControlled(ctx, stream, data);
controller.addFlowControlled(ctx, stream, data);
}
private void setPriority(int stream, int parent, int weight, boolean exclusive) throws Http2Exception {
@ -1235,11 +1371,21 @@ public class DefaultHttp2RemoteFlowControllerTest {
private int currentSize;
private int originalSize;
private boolean writeCalled;
private final boolean mergeable;
private boolean merged;
private Throwable t;
private FakeFlowControlled(int size) {
this.currentSize = size;
this.originalSize = size;
this.mergeable = false;
}
private FakeFlowControlled(int size, boolean mergeable) {
this.currentSize = size;
this.originalSize = size;
this.mergeable = mergeable;
}
@Override
@ -1267,6 +1413,17 @@ public class DefaultHttp2RemoteFlowControllerTest {
currentSize -= written;
}
@Override
public boolean merge(Http2RemoteFlowController.FlowControlled next) {
if (mergeable && next instanceof FakeFlowControlled) {
this.originalSize += ((FakeFlowControlled) next).originalSize;
this.currentSize += ((FakeFlowControlled) next).originalSize;
((FakeFlowControlled) next).merged = true;
return true;
}
return false;
}
public int written() {
return originalSize - currentSize;
}
@ -1289,6 +1446,10 @@ public class DefaultHttp2RemoteFlowControllerTest {
assertEquals(0, currentSize);
}
public boolean assertMerged() {
return merged;
}
public void assertError() {
assertNotNull(t);
}

View File

@ -77,9 +77,15 @@ public class Http2ConnectionHandlerTest {
@Mock
private Http2Connection.Endpoint<Http2RemoteFlowController> remote;
@Mock
private Http2RemoteFlowController remoteFlowController;
@Mock
private Http2Connection.Endpoint<Http2LocalFlowController> local;
@Mock
private Http2LocalFlowController localFlowController;
@Mock
private ChannelHandlerContext ctx;
@ -135,7 +141,9 @@ public class Http2ConnectionHandlerTest {
when(future.channel()).thenReturn(channel);
when(channel.isActive()).thenReturn(true);
when(connection.remote()).thenReturn(remote);
when(remote.flowController()).thenReturn(remoteFlowController);
when(connection.local()).thenReturn(local);
when(local.flowController()).thenReturn(localFlowController);
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {

View File

@ -120,10 +120,10 @@ public class Http2ConnectionRoundtripTest {
final Http2Headers headers = dummyHeaders();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, weight, false, 0, true,
newPromise());
ctx().flush();
http2Client.flush(ctx());
}
});
@ -157,10 +157,10 @@ public class Http2ConnectionRoundtripTest {
final Http2Headers headers = dummyHeaders();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
newPromise());
ctx().flush();
http2Client.flush(ctx());
}
});
@ -202,10 +202,10 @@ public class Http2ConnectionRoundtripTest {
// Create a single stream by sending a HEADERS frame to the server.
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
newPromise());
ctx().flush();
http2Client.flush(ctx());
}
});
@ -235,10 +235,10 @@ public class Http2ConnectionRoundtripTest {
final Http2Headers headers = dummyHeaders();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
newPromise());
ctx().flush();
http2Client.flush(ctx());
}
});
@ -267,10 +267,10 @@ public class Http2ConnectionRoundtripTest {
final Http2Headers headers = dummyHeaders();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
true, newPromise());
ctx().flush();
http2Client.flush(ctx());
}
});
@ -278,10 +278,10 @@ public class Http2ConnectionRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), Integer.MAX_VALUE + 1, headers, 0, (short) 16, false, 0,
true, newPromise());
ctx().flush();
http2Client.flush(ctx());
}
});
@ -319,7 +319,7 @@ public class Http2ConnectionRoundtripTest {
// Create the stream and send all of the data at once.
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
false, newPromise());
http2Client.encoder().writeData(ctx(), 3, data.retain(), 0, false, newPromise());
@ -327,7 +327,7 @@ public class Http2ConnectionRoundtripTest {
// Write trailers.
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
true, newPromise());
ctx().flush();
http2Client.flush(ctx());
}
});
@ -399,7 +399,7 @@ public class Http2ConnectionRoundtripTest {
bootstrapEnv(numStreams * length, 1, numStreams * 4, numStreams);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
public void run() throws Http2Exception {
int upperLimit = 3 + 2 * numStreams;
for (int streamId = 3; streamId < upperLimit; streamId += 2) {
// Send a bunch of data on each stream.
@ -412,7 +412,7 @@ public class Http2ConnectionRoundtripTest {
// Write trailers.
http2Client.encoder().writeHeaders(ctx(), streamId, headers, 0, (short) 16,
false, 0, true, newPromise());
ctx().flush();
http2Client.flush(ctx());
}
}
});

View File

@ -27,6 +27,7 @@ import io.netty.handler.codec.http2.StreamBufferingEncoder.GoAwayException;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
@ -116,8 +117,11 @@ public class StreamBufferingEncoderTest {
encoderWriteHeaders(3, promise);
writeVerifyWriteHeaders(times(2), 3, promise);
verify(writer, times(3))
.writeData(eq(ctx), eq(3), any(ByteBuf.class), eq(0), eq(false), eq(promise));
// Contiguous data writes are coalesced
ArgumentCaptor<ByteBuf> bufCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(writer, times(1))
.writeData(eq(ctx), eq(3), bufCaptor.capture(), eq(0), eq(false), eq(promise));
assertEquals(data().readableBytes() * 3, bufCaptor.getValue().readableBytes());
}
@Test
@ -396,6 +400,8 @@ public class StreamBufferingEncoderTest {
private void setMaxConcurrentStreams(int newValue) {
try {
encoder.remoteSettings(new Http2Settings().maxConcurrentStreams(newValue));
// Flush the remote flow controller to write data
encoder.flowController().writePendingBytes();
} catch (Http2Exception e) {
throw new RuntimeException(e);
}
@ -404,6 +410,11 @@ public class StreamBufferingEncoderTest {
private void encoderWriteHeaders(int streamId, ChannelPromise promise) {
encoder.writeHeaders(ctx, streamId, new DefaultHttp2Headers(), 0, DEFAULT_PRIORITY_WEIGHT,
false, 0, false, promise);
try {
encoder.flowController().writePendingBytes();
} catch (Http2Exception e) {
throw new RuntimeException(e);
}
}
private void writeVerifyWriteHeaders(VerificationMode mode, int streamId,

View File

@ -51,7 +51,20 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
}
@Override
public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload) {
public void writePendingBytes() throws Http2Exception {
}
@Override
public void listener(Listener listener) {
}
@Override
public Listener listener() {
return null;
}
@Override
public void addFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload) {
// Don't check size beforehand because Headers payload returns 0 all the time.
do {
payload.write(MAX_INITIAL_WINDOW_SIZE);