Make the flow-controllers write fewer, fatter frames to improve throughput.
Motivation: Coalescing many small writes into a larger DATA frame reduces framing overheads on the wire and reduces the number of calls to Http2FrameListeners on the remote side. Delaying the write of WINDOW_UPDATE until flush allows for more consumed bytes to be returned as the aggregate of consumed bytes is returned and not the amount consumed when the threshold was crossed. Modifications: - Remote flow controller no longer immediately writes bytes when a flow-controlled payload is enqueued. Sequential data payloads are now merged into a single CompositeByteBuf which are written when 'writePendingBytes' is called. - Listener added to remote flow-controller which observes written bytes per stream. - Local flow-controller no longer immediately writes WINDOW_UPDATE when the ratio threshold is crossed. Now an explicit call to 'writeWindowUpdates' triggers the WINDOW_UPDATE for all streams who's ratio is exceeded at that time. This results in fewer window updates being sent and more bytes being returned. - Http2ConnectionHandler.flush triggers 'writeWindowUpdates' on the local flow-controller followed by 'writePendingBytes' on the remote flow-controller so WINDOW_UPDATES preceed DATA frames on the wire. Result: - Better throughput for writing many small DATA chunks followed by a flush, saving 9-bytes per coalesced frame. - Fewer WINDOW_UPDATES being written and more flow-control bytes returned to remote side more quickly, thereby improving throughput.
This commit is contained in:
parent
045602bd76
commit
bbf1829f2f
@ -19,6 +19,8 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.SlicedByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
@ -130,7 +132,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
}
|
||||
|
||||
// Hand control of the frame to the flow controller.
|
||||
flowController().sendFlowControlled(ctx, stream,
|
||||
flowController().addFlowControlled(ctx, stream,
|
||||
new FlowControlledData(ctx, stream, data, padding, endOfStream, promise));
|
||||
return promise;
|
||||
}
|
||||
@ -166,7 +168,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
}
|
||||
|
||||
// Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
|
||||
flowController().sendFlowControlled(ctx, stream,
|
||||
flowController().addFlowControlled(ctx, stream,
|
||||
new FlowControlledHeaders(ctx, stream, headers, streamDependency, weight,
|
||||
exclusive, padding, endOfStream, promise));
|
||||
return promise;
|
||||
@ -318,10 +320,10 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
private ByteBuf data;
|
||||
private int size;
|
||||
|
||||
private FlowControlledData(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding,
|
||||
private FlowControlledData(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf buf, int padding,
|
||||
boolean endOfStream, ChannelPromise promise) {
|
||||
super(ctx, stream, padding, endOfStream, promise);
|
||||
this.data = data;
|
||||
this.data = buf;
|
||||
size = data.readableBytes() + padding;
|
||||
}
|
||||
|
||||
@ -367,7 +369,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
padding -= writeablePadding;
|
||||
bytesWritten += writeableData + writeablePadding;
|
||||
ChannelPromise writePromise;
|
||||
if (size == bytesWritten) {
|
||||
if (size == bytesWritten && !promise.isVoid()) {
|
||||
// Can use the original promise if it's the last write
|
||||
writePromise = promise;
|
||||
} else {
|
||||
@ -375,13 +377,67 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
writePromise = ctx.newPromise();
|
||||
writePromise.addListener(this);
|
||||
}
|
||||
if (toWrite instanceof SlicedByteBuf && data instanceof CompositeByteBuf) {
|
||||
// If we're writing a subset of a composite buffer then we want to release
|
||||
// any underlying buffers that have been consumed. CompositeByteBuf only releases
|
||||
// underlying buffers on write if all of its data has been consumed and its refCnt becomes
|
||||
// 0.
|
||||
final CompositeByteBuf toFree = (CompositeByteBuf) data;
|
||||
writePromise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
toFree.discardReadComponents();
|
||||
}
|
||||
});
|
||||
}
|
||||
frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
|
||||
size == bytesWritten && endOfStream, writePromise);
|
||||
size == bytesWritten && endOfStream, writePromise);
|
||||
} while (size != bytesWritten && allowedBytes > bytesWritten);
|
||||
} finally {
|
||||
size -= bytesWritten;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean merge(Http2RemoteFlowController.FlowControlled next) {
|
||||
if (FlowControlledData.class != next.getClass()) {
|
||||
return false;
|
||||
}
|
||||
final FlowControlledData nextData = (FlowControlledData) next;
|
||||
// Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
|
||||
padding = Math.max(nextData.padding, padding);
|
||||
endOfStream = nextData.endOfStream;
|
||||
final CompositeByteBuf compositeByteBuf;
|
||||
if (data instanceof CompositeByteBuf) {
|
||||
compositeByteBuf = (CompositeByteBuf) data;
|
||||
} else {
|
||||
compositeByteBuf = ctx.alloc().compositeBuffer(Integer.MAX_VALUE);
|
||||
compositeByteBuf.addComponent(data);
|
||||
compositeByteBuf.writerIndex(data.readableBytes());
|
||||
data = compositeByteBuf;
|
||||
}
|
||||
compositeByteBuf.addComponent(nextData.data);
|
||||
compositeByteBuf.writerIndex(compositeByteBuf.writerIndex() + nextData.data.readableBytes());
|
||||
size = data.readableBytes() + padding;
|
||||
if (!nextData.promise.isVoid()) {
|
||||
// Replace current promise if void otherwise chain them.
|
||||
if (promise.isVoid()) {
|
||||
promise = nextData.promise;
|
||||
} else {
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
nextData.promise.trySuccess();
|
||||
} else {
|
||||
nextData.promise.tryFailure(future.cause());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -419,9 +475,18 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
|
||||
@Override
|
||||
public void write(int allowedBytes) {
|
||||
if (promise.isVoid()) {
|
||||
promise = ctx.newPromise();
|
||||
promise.addListener(this);
|
||||
}
|
||||
frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
|
||||
padding, endOfStream, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean merge(Http2RemoteFlowController.FlowControlled next) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -431,8 +496,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
ChannelFutureListener {
|
||||
protected final ChannelHandlerContext ctx;
|
||||
protected final Http2Stream stream;
|
||||
protected final ChannelPromise promise;
|
||||
protected final boolean endOfStream;
|
||||
protected ChannelPromise promise;
|
||||
protected boolean endOfStream;
|
||||
protected int padding;
|
||||
|
||||
public FlowControlledBase(final ChannelHandlerContext ctx, final Http2Stream stream, int padding,
|
||||
@ -445,8 +510,9 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
this.endOfStream = endOfStream;
|
||||
this.stream = stream;
|
||||
this.promise = promise;
|
||||
// Ensure error() gets called in case something goes wrong after the frame is passed to Netty.
|
||||
promise.addListener(this);
|
||||
if (!promise.isVoid()) {
|
||||
promise.addListener(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -38,7 +38,10 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
private final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() {
|
||||
@Override
|
||||
public boolean visit(Http2Stream stream) {
|
||||
state(stream).writeAllocatedBytes();
|
||||
int written = state(stream).writeAllocatedBytes();
|
||||
if (written != -1 && listener != null) {
|
||||
listener.streamWritten(stream, written);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
@ -46,6 +49,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
private final Http2Connection.PropertyKey stateKey;
|
||||
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
|
||||
private ChannelHandlerContext ctx;
|
||||
private Listener listener;
|
||||
|
||||
public DefaultHttp2RemoteFlowController(Http2Connection connection) {
|
||||
this.connection = checkNotNull(connection, "connection");
|
||||
@ -175,20 +179,29 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
|
||||
@Override
|
||||
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
|
||||
// This call does not trigger any writes, all writes will occur when writePendingBytes is called.
|
||||
if (stream.id() == CONNECTION_STREAM_ID) {
|
||||
// Update the connection window and write any pending frames for all streams.
|
||||
// Update the connection window
|
||||
connectionState().incrementStreamWindow(delta);
|
||||
writePendingBytes();
|
||||
} else {
|
||||
// Update the stream window and write any pending frames for the stream.
|
||||
// Update the stream window
|
||||
AbstractState state = state(stream);
|
||||
state.incrementStreamWindow(delta);
|
||||
state.writeBytes(state.writableWindow());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled frame) {
|
||||
public void listener(Listener listener) {
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Listener listener() {
|
||||
return this.listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled frame) {
|
||||
checkNotNull(ctx, "ctx");
|
||||
checkNotNull(frame, "frame");
|
||||
if (this.ctx != null && this.ctx != ctx) {
|
||||
@ -202,9 +215,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
state.enqueueFrame(frame);
|
||||
} catch (Throwable t) {
|
||||
frame.error(t);
|
||||
return;
|
||||
}
|
||||
state.writeBytes(state.writableWindow());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -233,17 +244,19 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
/**
|
||||
* Writes as many pending bytes as possible, according to stream priority.
|
||||
*/
|
||||
private void writePendingBytes() throws Http2Exception {
|
||||
@Override
|
||||
public void writePendingBytes() throws Http2Exception {
|
||||
Http2Stream connectionStream = connection.connectionStream();
|
||||
int connectionWindowSize = state(connectionStream).windowSize();
|
||||
|
||||
if (connectionWindowSize > 0) {
|
||||
// Allocate the bytes for the connection window to the streams, but do not write.
|
||||
allocateBytesForTree(connectionStream, connectionWindowSize);
|
||||
|
||||
// Now write all of the allocated bytes.
|
||||
connection.forEachActiveStream(WRITE_ALLOCATED_BYTES);
|
||||
}
|
||||
|
||||
// Now write all of the allocated bytes, must write as there may be empty frames with
|
||||
// EOS = true
|
||||
connection.forEachActiveStream(WRITE_ALLOCATED_BYTES);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -469,7 +482,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
void writeAllocatedBytes() {
|
||||
int writeAllocatedBytes() {
|
||||
int numBytes = allocated;
|
||||
|
||||
// Restore the number of streamable bytes to this branch.
|
||||
@ -477,7 +490,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
resetAllocated();
|
||||
|
||||
// Perform the write.
|
||||
writeBytes(numBytes);
|
||||
return writeBytes(numBytes);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -522,7 +535,10 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
@Override
|
||||
void enqueueFrame(FlowControlled frame) {
|
||||
incrementPendingBytes(frame.size());
|
||||
pendingWriteQueue.offer(frame);
|
||||
FlowControlled last = pendingWriteQueue.peekLast();
|
||||
if (last == null || !last.merge(frame)) {
|
||||
pendingWriteQueue.offer(frame);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -564,23 +580,19 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
|
||||
@Override
|
||||
int writeBytes(int bytes) {
|
||||
boolean wrote = false;
|
||||
int bytesAttempted = 0;
|
||||
while (hasFrame()) {
|
||||
int maxBytes = min(bytes - bytesAttempted, writableWindow());
|
||||
bytesAttempted += write(peek(), maxBytes);
|
||||
if (bytes - bytesAttempted <= 0 && !isNextFrameEmpty()) {
|
||||
// The frame had data and all of it was written.
|
||||
break;
|
||||
}
|
||||
int writableBytes = min(bytes, writableWindow());
|
||||
while (hasFrame() && (writableBytes > 0 || peek().size() == 0)) {
|
||||
wrote = true;
|
||||
bytesAttempted += write(peek(), writableBytes);
|
||||
writableBytes = min(bytes - bytesAttempted, writableWindow());
|
||||
}
|
||||
if (wrote) {
|
||||
return bytesAttempted;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
return bytesAttempted;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@code true} if there is a next frame and its size is zero.
|
||||
*/
|
||||
private boolean isNextFrameEmpty() {
|
||||
return hasFrame() && peek().size() == 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -709,7 +721,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
void writeAllocatedBytes() {
|
||||
int writeAllocatedBytes() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@ -789,9 +801,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
abstract int initialWindowSize();
|
||||
|
||||
/**
|
||||
* Write bytes allocated bytes for this stream.
|
||||
* Write the allocated bytes for this stream.
|
||||
*
|
||||
* @return the number of bytes written for a stream or {@code -1} if no write occurred.
|
||||
*/
|
||||
abstract void writeAllocatedBytes();
|
||||
abstract int writeAllocatedBytes();
|
||||
|
||||
/**
|
||||
* Returns the number of pending bytes for this node that will fit within the
|
||||
@ -830,7 +844,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
/**
|
||||
* Writes up to the number of bytes from the pending queue. May write less if limited by the writable window, by
|
||||
* the number of pending writes available, or because a frame does not support splitting on arbitrary
|
||||
* boundaries.
|
||||
* boundaries. Will return {@code -1} if there are no frames to write.
|
||||
*/
|
||||
abstract int writeBytes(int bytes);
|
||||
|
||||
|
@ -164,6 +164,17 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
||||
connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(ChannelHandlerContext ctx) throws Http2Exception {
|
||||
// Trigger pending writes in the remote flow controller.
|
||||
connection().remote().flowController().writePendingBytes();
|
||||
try {
|
||||
super.flush(ctx);
|
||||
} catch (Throwable t) {
|
||||
throw new Http2Exception(INTERNAL_ERROR, "Error flushing" , t);
|
||||
}
|
||||
}
|
||||
|
||||
private abstract class BaseDecoder {
|
||||
public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
|
||||
@ -410,7 +421,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
// Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that
|
||||
// for flow-control the read may release window that causes data to be written that can now be flushed.
|
||||
ctx.flush();
|
||||
flush(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -23,18 +23,38 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
public interface Http2RemoteFlowController extends Http2FlowController {
|
||||
|
||||
/**
|
||||
* Writes or queues a payload for transmission to the remote endpoint. There is no
|
||||
* guarantee when the data will be written or whether it will be split into multiple frames
|
||||
* Queues a payload for transmission to the remote endpoint. There is no guarantee as to when the data
|
||||
* will be written or how it will be allocated to frames.
|
||||
* before sending.
|
||||
* <p>
|
||||
* Manually flushing the {@link ChannelHandlerContext} is required for writes as the flow controller will
|
||||
* <strong>not</strong> flush by itself.
|
||||
* Writes do not actually occur until {@link #writePendingBytes()} is called.
|
||||
*
|
||||
* @param ctx the context from the handler.
|
||||
* @param stream the subject stream. Must not be the connection stream object.
|
||||
* @param payload payload to write subject to flow-control accounting and ordering rules.
|
||||
*/
|
||||
void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload);
|
||||
void addFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload);
|
||||
|
||||
/**
|
||||
* Write all data pending in the flow controller up to the flow-control limits.
|
||||
*
|
||||
* @throws Http2Exception throws if a protocol-related error occurred.
|
||||
*/
|
||||
void writePendingBytes() throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Set the active listener on the flow-controller.
|
||||
*
|
||||
* @param listener to notify when the a write occurs, can be {@code null}.
|
||||
*/
|
||||
void listener(Listener listener);
|
||||
|
||||
/**
|
||||
* Get the current listener to flow-control events.
|
||||
*
|
||||
* @return the current listener or {@code null} if one is not set.
|
||||
*/
|
||||
Listener listener();
|
||||
|
||||
/**
|
||||
* Implementations of this interface are used to progressively write chunks of the underlying
|
||||
@ -84,5 +104,30 @@ public interface Http2RemoteFlowController extends Http2FlowController {
|
||||
* @param allowedBytes an upper bound on the number of bytes the payload can write at this time.
|
||||
*/
|
||||
void write(int allowedBytes);
|
||||
|
||||
/**
|
||||
* Merge the contents of the {@code next} message into this message so they can be written out as one unit.
|
||||
* This allows many small messages to be written as a single DATA frame.
|
||||
*
|
||||
* @return {@code true} if {@code next} was successfully merged and does not need to be enqueued,
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
boolean merge(FlowControlled next);
|
||||
}
|
||||
|
||||
/**
|
||||
* Listener to the number of flow-controlled bytes written per stream.
|
||||
*/
|
||||
interface Listener {
|
||||
|
||||
/**
|
||||
* Report the number of {@code writtenBytes} for a {@code stream}. Called after the
|
||||
* flow-controller has flushed bytes for the given stream.
|
||||
*
|
||||
* @param stream that had bytes written.
|
||||
* @param writtenBytes the number of bytes written for a stream, can be 0 in the case of an
|
||||
* empty DATA frame.
|
||||
*/
|
||||
void streamWritten(Http2Stream stream, int writtenBytes);
|
||||
}
|
||||
}
|
||||
|
@ -88,6 +88,7 @@ public class DataCompressionHttp2Test {
|
||||
private CountDownLatch clientSettingsAckLatch;
|
||||
private Http2Connection serverConnection;
|
||||
private Http2Connection clientConnection;
|
||||
private Http2ConnectionHandler clientHandler;
|
||||
private ByteArrayOutputStream serverOut;
|
||||
|
||||
@Before
|
||||
@ -123,9 +124,9 @@ public class DataCompressionHttp2Test {
|
||||
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, true, newPromiseClient());
|
||||
ctxClient().flush();
|
||||
clientHandler.flush(ctxClient());
|
||||
}
|
||||
});
|
||||
awaitServer();
|
||||
@ -148,10 +149,10 @@ public class DataCompressionHttp2Test {
|
||||
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
|
||||
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
|
||||
ctxClient().flush();
|
||||
clientHandler.flush(ctxClient());
|
||||
}
|
||||
});
|
||||
awaitServer();
|
||||
@ -177,10 +178,10 @@ public class DataCompressionHttp2Test {
|
||||
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
|
||||
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
|
||||
ctxClient().flush();
|
||||
clientHandler.flush(ctxClient());
|
||||
}
|
||||
});
|
||||
awaitServer();
|
||||
@ -206,13 +207,13 @@ public class DataCompressionHttp2Test {
|
||||
// our {@link Http2TestUtil$FrameAdapter} does.
|
||||
Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
|
||||
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
|
||||
clientEncoder.writeData(ctxClient(), 3, data1.retain(), 0, false, newPromiseClient());
|
||||
clientEncoder.writeData(ctxClient(), 3, data2.retain(), 0, true, newPromiseClient());
|
||||
ctxClient().flush();
|
||||
clientHandler.flush(ctxClient());
|
||||
}
|
||||
});
|
||||
awaitServer();
|
||||
@ -242,10 +243,10 @@ public class DataCompressionHttp2Test {
|
||||
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
|
||||
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
|
||||
ctxClient().flush();
|
||||
clientHandler.flush(ctxClient());
|
||||
}
|
||||
});
|
||||
awaitServer();
|
||||
@ -330,8 +331,8 @@ public class DataCompressionHttp2Test {
|
||||
new DefaultHttp2FrameReader(),
|
||||
new DelegatingDecompressorFrameListener(clientConnection,
|
||||
clientFrameCountDown));
|
||||
Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler(decoder, clientEncoder);
|
||||
p.addLast(connectionHandler);
|
||||
clientHandler = new Http2ConnectionHandler(decoder, clientEncoder);
|
||||
p.addLast(clientHandler);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -55,6 +55,8 @@ import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultChannelPromise;
|
||||
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
|
||||
import io.netty.handler.codec.http2.Http2RemoteFlowController.FlowControlled;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -64,6 +66,7 @@ import junit.framework.AssertionFailedError;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Matchers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
@ -98,6 +101,9 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
|
||||
private ChannelPromise promise;
|
||||
|
||||
@Mock
|
||||
private ChannelPromise voidPromise;
|
||||
|
||||
@Mock
|
||||
private ChannelFuture future;
|
||||
|
||||
@ -132,6 +138,10 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
|
||||
promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
|
||||
when(voidPromise.addListener(Matchers.<GenericFutureListener<Future<? super Void>>>any())).thenThrow(
|
||||
new AssertionFailedError());
|
||||
when(voidPromise.addListeners(Matchers.<GenericFutureListener<Future<? super Void>>>any())).thenThrow(
|
||||
new AssertionFailedError());
|
||||
|
||||
when(channel.isActive()).thenReturn(true);
|
||||
when(stream.id()).thenReturn(STREAM_ID);
|
||||
@ -202,7 +212,7 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
}
|
||||
});
|
||||
payloadCaptor = ArgumentCaptor.forClass(Http2RemoteFlowController.FlowControlled.class);
|
||||
doNothing().when(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), payloadCaptor.capture());
|
||||
doNothing().when(remoteFlow).addFlowControlled(eq(ctx), eq(stream), payloadCaptor.capture());
|
||||
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
|
||||
when(ctx.channel()).thenReturn(channel);
|
||||
when(ctx.newSucceededFuture()).thenReturn(future);
|
||||
@ -225,6 +235,52 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
assertEquals(0, data.refCnt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataFramesShouldMerge() throws Exception {
|
||||
final ByteBuf data = dummyData().retain();
|
||||
DefaultChannelPromise secondPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
|
||||
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
|
||||
encoder.writeData(ctx, STREAM_ID, data, 0, true, secondPromise);
|
||||
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
|
||||
FlowControlled mergedPayload = capturedWrites.get(0);
|
||||
mergedPayload.merge(capturedWrites.get(1));
|
||||
|
||||
assertEquals(16, mergedPayload.size());
|
||||
assertFalse(secondPromise.isSuccess());
|
||||
mergedPayload.write(16);
|
||||
assertEquals(0, mergedPayload.size());
|
||||
assertEquals("abcdefghabcdefgh", writtenData.get(0));
|
||||
assertEquals(0, data.refCnt());
|
||||
// Second promise is notified after write of the merged payload completes
|
||||
assertTrue(secondPromise.isSuccess());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataFramesShouldMergeUseVoidPromise() throws Exception {
|
||||
final ByteBuf data = dummyData().retain();
|
||||
when(voidPromise.isVoid()).thenReturn(true);
|
||||
encoder.writeData(ctx, STREAM_ID, data, 0, true, voidPromise);
|
||||
encoder.writeData(ctx, STREAM_ID, data, 0, true, voidPromise);
|
||||
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
|
||||
FlowControlled mergedPayload = capturedWrites.get(0);
|
||||
assertTrue(mergedPayload.merge(capturedWrites.get(1)));
|
||||
|
||||
assertEquals(16, mergedPayload.size());
|
||||
mergedPayload.write(16);
|
||||
assertEquals(0, mergedPayload.size());
|
||||
assertEquals("abcdefghabcdefgh", writtenData.get(0));
|
||||
assertEquals(0, data.refCnt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataFramesDontMergeWithHeaders() throws Exception {
|
||||
final ByteBuf data = dummyData().retain();
|
||||
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
|
||||
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
|
||||
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
|
||||
assertFalse(capturedWrites.get(0).merge(capturedWrites.get(1)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataLargerThanMaxFrameSizeShouldBeSplit() throws Exception {
|
||||
when(frameSizePolicy.maxFrameSize()).thenReturn(3);
|
||||
@ -443,7 +499,7 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
mockSendFlowControlledWriteEverything();
|
||||
ByteBuf data = dummyData();
|
||||
encoder.writeData(ctx, STREAM_ID, data.retain(), 0, true, promise);
|
||||
verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
|
||||
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
|
||||
verify(lifecycleManager).closeStreamLocal(stream, promise);
|
||||
assertEquals(data.toString(UTF_8), writtenData.get(0));
|
||||
data.release();
|
||||
@ -520,7 +576,7 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
when(remote.lastStreamKnownByPeer()).thenReturn(0);
|
||||
ByteBuf data = mock(ByteBuf.class);
|
||||
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
|
||||
verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
|
||||
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -528,7 +584,7 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
when(connection.goAwaySent()).thenReturn(true);
|
||||
when(remote.lastStreamKnownByPeer()).thenReturn(0);
|
||||
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
|
||||
verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
|
||||
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -537,7 +593,7 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
when(local.lastStreamKnownByPeer()).thenReturn(STREAM_ID);
|
||||
ByteBuf data = mock(ByteBuf.class);
|
||||
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
|
||||
verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
|
||||
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -545,7 +601,7 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
when(connection.goAwayReceived()).thenReturn(true);
|
||||
when(local.lastStreamKnownByPeer()).thenReturn(STREAM_ID);
|
||||
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
|
||||
verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
|
||||
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
|
||||
}
|
||||
|
||||
private void mockSendFlowControlledWriteEverything() {
|
||||
@ -557,7 +613,7 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
flowControlled.writeComplete();
|
||||
return null;
|
||||
}
|
||||
}).when(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), payloadCaptor.capture());
|
||||
}).when(remoteFlow).addFlowControlled(eq(ctx), eq(stream), payloadCaptor.capture());
|
||||
}
|
||||
|
||||
private void mockFutureAddListener(boolean success) {
|
||||
|
@ -25,12 +25,15 @@ import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
@ -79,6 +82,9 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
@Mock
|
||||
private ChannelPromise promise;
|
||||
|
||||
@Mock
|
||||
private Http2RemoteFlowController.Listener listener;
|
||||
|
||||
private DefaultHttp2Connection connection;
|
||||
|
||||
@Before
|
||||
@ -90,6 +96,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
connection = new DefaultHttp2Connection(false);
|
||||
controller = new DefaultHttp2RemoteFlowController(connection);
|
||||
controller.listener(listener);
|
||||
connection.remote().flowController(controller);
|
||||
|
||||
connection.local().createStream(STREAM_A, false);
|
||||
@ -134,14 +141,49 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
public void payloadSmallerThanWindowShouldBeWrittenImmediately() throws Http2Exception {
|
||||
FakeFlowControlled data = new FakeFlowControlled(5);
|
||||
sendData(STREAM_A, data);
|
||||
data.assertNotWritten();
|
||||
verifyZeroInteractions(listener);
|
||||
controller.writePendingBytes();
|
||||
data.assertFullyWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyPayloadShouldBeWrittenImmediately() throws Http2Exception {
|
||||
FakeFlowControlled data = new FakeFlowControlled(0);
|
||||
sendData(STREAM_A, data);
|
||||
data.assertNotWritten();
|
||||
controller.writePendingBytes();
|
||||
data.assertFullyWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void unflushedPayloadsShouldBeDroppedOnCancel() throws Http2Exception {
|
||||
FakeFlowControlled data = new FakeFlowControlled(5);
|
||||
sendData(STREAM_A, data);
|
||||
connection.stream(STREAM_A).close();
|
||||
controller.writePendingBytes();
|
||||
data.assertNotWritten();
|
||||
controller.writePendingBytes();
|
||||
data.assertNotWritten();
|
||||
verifyZeroInteractions(listener);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void payloadsShouldMerge() throws Http2Exception {
|
||||
controller.initialWindowSize(15);
|
||||
FakeFlowControlled data1 = new FakeFlowControlled(5, true);
|
||||
FakeFlowControlled data2 = new FakeFlowControlled(10, true);
|
||||
sendData(STREAM_A, data1);
|
||||
sendData(STREAM_A, data2);
|
||||
data1.assertNotWritten();
|
||||
data1.assertNotWritten();
|
||||
data2.assertMerged();
|
||||
controller.writePendingBytes();
|
||||
data1.assertFullyWritten();
|
||||
data2.assertNotWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 15);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -151,9 +193,12 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
FakeFlowControlled data = new FakeFlowControlled(15);
|
||||
FakeFlowControlled moreData = new FakeFlowControlled(0);
|
||||
sendData(STREAM_A, data);
|
||||
controller.writePendingBytes();
|
||||
data.assertNotWritten();
|
||||
sendData(STREAM_A, moreData);
|
||||
controller.writePendingBytes();
|
||||
moreData.assertNotWritten();
|
||||
verifyZeroInteractions(listener);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -163,13 +208,16 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
FakeFlowControlled data = new FakeFlowControlled(15);
|
||||
FakeFlowControlled moreData = new FakeFlowControlled(0);
|
||||
sendData(STREAM_A, data);
|
||||
controller.writePendingBytes();
|
||||
data.assertNotWritten();
|
||||
sendData(STREAM_A, moreData);
|
||||
controller.writePendingBytes();
|
||||
moreData.assertNotWritten();
|
||||
|
||||
connection.stream(STREAM_A).close();
|
||||
data.assertError();
|
||||
moreData.assertError();
|
||||
verifyZeroInteractions(listener);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -178,26 +226,35 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
final FakeFlowControlled data = new FakeFlowControlled(10);
|
||||
sendData(STREAM_A, data);
|
||||
controller.writePendingBytes();
|
||||
// Verify that a partial frame of 5 remains to be sent
|
||||
data.assertPartiallyWritten(5);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
|
||||
verifyNoMoreInteractions(listener);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void windowUpdateShouldTriggerWrite() throws Http2Exception {
|
||||
public void windowUpdateAndFlushShouldTriggerWrite() throws Http2Exception {
|
||||
controller.initialWindowSize(10);
|
||||
|
||||
FakeFlowControlled data = new FakeFlowControlled(20);
|
||||
FakeFlowControlled moreData = new FakeFlowControlled(10);
|
||||
sendData(STREAM_A, data);
|
||||
sendData(STREAM_A, moreData);
|
||||
controller.writePendingBytes();
|
||||
data.assertPartiallyWritten(10);
|
||||
moreData.assertNotWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
|
||||
reset(ctx);
|
||||
|
||||
// Update the window and verify that the rest of data and some of moreData are written
|
||||
incrementWindowSize(STREAM_A, 15);
|
||||
controller.writePendingBytes();
|
||||
|
||||
data.assertFullyWritten();
|
||||
moreData.assertPartiallyWritten(5);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 15);
|
||||
verifyNoMoreInteractions(listener);
|
||||
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 25, window(CONNECTION_STREAM_ID));
|
||||
assertEquals(0, window(STREAM_A));
|
||||
@ -212,6 +269,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
FakeFlowControlled data = new FakeFlowControlled(10);
|
||||
sendData(STREAM_A, data);
|
||||
controller.writePendingBytes();
|
||||
data.assertNotWritten();
|
||||
|
||||
// Verify that the entire frame was sent.
|
||||
@ -228,25 +286,33 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
FakeFlowControlled dataA = new FakeFlowControlled(10);
|
||||
// Queue data for stream A and allow most of it to be written.
|
||||
sendData(STREAM_A, dataA);
|
||||
controller.writePendingBytes();
|
||||
dataA.assertNotWritten();
|
||||
incrementWindowSize(CONNECTION_STREAM_ID, 8);
|
||||
controller.writePendingBytes();
|
||||
dataA.assertPartiallyWritten(8);
|
||||
assertEquals(65527, window(STREAM_A));
|
||||
assertEquals(0, window(CONNECTION_STREAM_ID));
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 8);
|
||||
|
||||
// Queue data for stream B and allow the rest of A and all of B to be written.
|
||||
FakeFlowControlled dataB = new FakeFlowControlled(10);
|
||||
sendData(STREAM_B, dataB);
|
||||
controller.writePendingBytes();
|
||||
dataB.assertNotWritten();
|
||||
incrementWindowSize(CONNECTION_STREAM_ID, 12);
|
||||
controller.writePendingBytes();
|
||||
assertEquals(0, window(CONNECTION_STREAM_ID));
|
||||
|
||||
// Verify the rest of A is written.
|
||||
dataA.assertFullyWritten();
|
||||
assertEquals(65525, window(STREAM_A));
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 2);
|
||||
|
||||
dataB.assertFullyWritten();
|
||||
assertEquals(65525, window(STREAM_B));
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_B), 10);
|
||||
verifyNoMoreInteractions(listener);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -260,7 +326,9 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
// Deplete the stream A window to 0
|
||||
sendData(STREAM_A, data1);
|
||||
controller.writePendingBytes();
|
||||
data1.assertFullyWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 20);
|
||||
|
||||
// Make the window size for stream A negative
|
||||
controller.initialWindowSize(initWindow - secondWindowSize);
|
||||
@ -268,21 +336,26 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
// Queue up a write. It should not be written now because the window is negative
|
||||
sendData(STREAM_A, data2);
|
||||
controller.writePendingBytes();
|
||||
data2.assertNotWritten();
|
||||
|
||||
// Open the window size back up a bit (no send should happen)
|
||||
incrementWindowSize(STREAM_A, 5);
|
||||
controller.writePendingBytes();
|
||||
assertEquals(-5, window(STREAM_A));
|
||||
data2.assertNotWritten();
|
||||
|
||||
// Open the window size back up a bit (no send should happen)
|
||||
incrementWindowSize(STREAM_A, 5);
|
||||
controller.writePendingBytes();
|
||||
assertEquals(0, window(STREAM_A));
|
||||
data2.assertNotWritten();
|
||||
|
||||
// Open the window size back up and allow the write to happen
|
||||
incrementWindowSize(STREAM_A, 5);
|
||||
controller.writePendingBytes();
|
||||
data2.assertFullyWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -290,13 +363,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
controller.initialWindowSize(0);
|
||||
|
||||
// First send a frame that will get buffered.
|
||||
FakeFlowControlled data = new FakeFlowControlled(10);
|
||||
FakeFlowControlled data = new FakeFlowControlled(10, false);
|
||||
sendData(STREAM_A, data);
|
||||
controller.writePendingBytes();
|
||||
data.assertNotWritten();
|
||||
|
||||
// Now send an empty frame on the same stream and verify that it's also buffered.
|
||||
FakeFlowControlled data2 = new FakeFlowControlled(0);
|
||||
FakeFlowControlled data2 = new FakeFlowControlled(0, false);
|
||||
sendData(STREAM_A, data2);
|
||||
controller.writePendingBytes();
|
||||
data2.assertNotWritten();
|
||||
|
||||
// Re-expand the window and verify that both frames were sent.
|
||||
@ -304,6 +379,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
data.assertFullyWritten();
|
||||
data2.assertFullyWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -312,11 +388,13 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
FakeFlowControlled data = new FakeFlowControlled(10);
|
||||
sendData(STREAM_A, data);
|
||||
controller.writePendingBytes();
|
||||
data.assertNotWritten();
|
||||
|
||||
// Verify that a partial frame of 5 was sent.
|
||||
controller.initialWindowSize(5);
|
||||
data.assertPartiallyWritten(5);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -326,11 +404,16 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
FakeFlowControlled data = new FakeFlowControlled(10);
|
||||
sendData(STREAM_A, data);
|
||||
controller.writePendingBytes();
|
||||
data.assertNotWritten();
|
||||
|
||||
// Verify that the entire frame was sent.
|
||||
incrementWindowSize(CONNECTION_STREAM_ID, 10);
|
||||
data.assertNotWritten();
|
||||
controller.writePendingBytes();
|
||||
data.assertFullyWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
|
||||
|
||||
assertEquals(0, window(CONNECTION_STREAM_ID));
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 10, window(STREAM_A));
|
||||
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
|
||||
@ -345,11 +428,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
FakeFlowControlled data = new FakeFlowControlled(10);
|
||||
sendData(STREAM_A, data);
|
||||
controller.writePendingBytes();
|
||||
data.assertNotWritten();
|
||||
|
||||
// Verify that a partial frame of 5 was sent.
|
||||
incrementWindowSize(CONNECTION_STREAM_ID, 5);
|
||||
data.assertNotWritten();
|
||||
controller.writePendingBytes();
|
||||
data.assertPartiallyWritten(5);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
|
||||
assertEquals(0, window(CONNECTION_STREAM_ID));
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
|
||||
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
|
||||
@ -364,11 +451,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
FakeFlowControlled data = new FakeFlowControlled(10);
|
||||
sendData(STREAM_A, data);
|
||||
controller.writePendingBytes();
|
||||
data.assertNotWritten();
|
||||
|
||||
// Verify that the entire frame was sent.
|
||||
incrementWindowSize(STREAM_A, 10);
|
||||
data.assertNotWritten();
|
||||
controller.writePendingBytes();
|
||||
data.assertFullyWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 10, window(CONNECTION_STREAM_ID));
|
||||
assertEquals(0, window(STREAM_A));
|
||||
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
|
||||
@ -383,11 +474,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
FakeFlowControlled data = new FakeFlowControlled(10);
|
||||
sendData(STREAM_A, data);
|
||||
controller.writePendingBytes();
|
||||
data.assertNotWritten();
|
||||
|
||||
// Verify that a partial frame of 5 was sent.
|
||||
incrementWindowSize(STREAM_A, 5);
|
||||
data.assertNotWritten();
|
||||
controller.writePendingBytes();
|
||||
data.assertPartiallyWritten(5);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(CONNECTION_STREAM_ID));
|
||||
assertEquals(0, window(STREAM_A));
|
||||
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
|
||||
@ -427,6 +522,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
sendData(STREAM_B, dataB);
|
||||
sendData(STREAM_C, dataC);
|
||||
sendData(STREAM_D, dataD);
|
||||
controller.writePendingBytes();
|
||||
dataA.assertNotWritten();
|
||||
dataB.assertNotWritten();
|
||||
dataC.assertNotWritten();
|
||||
@ -434,6 +530,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
// Verify that the entire frame was sent.
|
||||
incrementWindowSize(CONNECTION_STREAM_ID, 10);
|
||||
controller.writePendingBytes();
|
||||
|
||||
assertEquals(0, window(CONNECTION_STREAM_ID));
|
||||
|
||||
// A is not written
|
||||
@ -443,12 +541,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
// B is partially written
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B), 2);
|
||||
dataB.assertPartiallyWritten(5);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_B), 5);
|
||||
|
||||
// Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot
|
||||
// be split evenly, one will get 3 and one will get 2.
|
||||
assertEquals(2 * DEFAULT_WINDOW_SIZE - 5, window(STREAM_C) + window(STREAM_D), 5);
|
||||
dataC.assertPartiallyWritten(3);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_C), 3);
|
||||
dataD.assertPartiallyWritten(2);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_D), 2);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -480,6 +581,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
sendData(STREAM_B, dataB);
|
||||
sendData(STREAM_C, dataC);
|
||||
sendData(STREAM_D, dataD);
|
||||
controller.writePendingBytes();
|
||||
|
||||
dataA.assertNotWritten();
|
||||
dataB.assertNotWritten();
|
||||
@ -488,6 +590,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
// Verify that the entire frame was sent.
|
||||
incrementWindowSize(CONNECTION_STREAM_ID, 10);
|
||||
controller.writePendingBytes();
|
||||
assertEquals(0, window(CONNECTION_STREAM_ID));
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 10, window(STREAM_A));
|
||||
assertEquals(0, window(STREAM_B));
|
||||
@ -495,6 +598,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
|
||||
|
||||
dataA.assertFullyWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
|
||||
|
||||
dataB.assertNotWritten();
|
||||
dataC.assertNotWritten();
|
||||
dataD.assertNotWritten();
|
||||
@ -530,6 +635,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
sendData(STREAM_B, dataB);
|
||||
sendData(STREAM_C, dataC);
|
||||
sendData(STREAM_D, dataD);
|
||||
controller.writePendingBytes();
|
||||
|
||||
dataA.assertNotWritten();
|
||||
dataB.assertNotWritten();
|
||||
@ -538,6 +644,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
// Verify that the entire frame was sent.
|
||||
incrementWindowSize(CONNECTION_STREAM_ID, 10);
|
||||
controller.writePendingBytes();
|
||||
assertEquals(0, window(CONNECTION_STREAM_ID));
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
|
||||
assertEquals(0, window(STREAM_B));
|
||||
@ -546,9 +653,12 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
// Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot
|
||||
// be split evenly, one will get 3 and one will get 2.
|
||||
dataA.assertFullyWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
|
||||
dataB.assertNotWritten();
|
||||
dataC.assertPartiallyWritten(3);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_C), 3);
|
||||
dataD.assertPartiallyWritten(2);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_D), 2);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -592,6 +702,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
sendData(STREAM_B, dataB);
|
||||
sendData(STREAM_C, dataC);
|
||||
sendData(STREAM_D, dataD);
|
||||
controller.writePendingBytes();
|
||||
|
||||
dataA.assertNotWritten();
|
||||
dataB.assertNotWritten();
|
||||
@ -603,6 +714,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
// Verify that the entire frame was sent.
|
||||
incrementWindowSize(CONNECTION_STREAM_ID, 10);
|
||||
controller.writePendingBytes();
|
||||
assertEquals(0, window(CONNECTION_STREAM_ID));
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A), 2);
|
||||
assertEquals(0, window(STREAM_B));
|
||||
@ -611,9 +723,11 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
// Verify that A and D split the bytes.
|
||||
dataA.assertPartiallyWritten(5);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
|
||||
dataB.assertNotWritten();
|
||||
dataC.assertNotWritten();
|
||||
dataD.assertPartiallyWritten(5);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_D), 5);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -646,6 +760,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
sendData(STREAM_B, dataB);
|
||||
sendData(STREAM_C, dataC);
|
||||
sendData(STREAM_D, dataD);
|
||||
controller.writePendingBytes();
|
||||
|
||||
dataA.assertNotWritten();
|
||||
dataB.assertNotWritten();
|
||||
@ -654,6 +769,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
// Allow 1000 bytes to be sent.
|
||||
incrementWindowSize(CONNECTION_STREAM_ID, 1000);
|
||||
controller.writePendingBytes();
|
||||
|
||||
// All writes sum == 1000
|
||||
assertEquals(1000, dataA.written() + dataB.written() + dataC.written() + dataD.written());
|
||||
@ -662,6 +778,10 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
dataB.assertPartiallyWritten(445, allowedError);
|
||||
dataC.assertPartiallyWritten(223, allowedError);
|
||||
dataD.assertPartiallyWritten(223, allowedError);
|
||||
verify(listener, times(1)).streamWritten(eq(stream(STREAM_A)), anyInt());
|
||||
verify(listener, times(1)).streamWritten(eq(stream(STREAM_B)), anyInt());
|
||||
verify(listener, times(1)).streamWritten(eq(stream(STREAM_C)), anyInt());
|
||||
verify(listener, times(1)).streamWritten(eq(stream(STREAM_D)), anyInt());
|
||||
|
||||
assertEquals(0, window(CONNECTION_STREAM_ID));
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - dataA.written(), window(STREAM_A));
|
||||
@ -701,15 +821,19 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
sendData(STREAM_B, dataB);
|
||||
sendData(STREAM_C, dataC);
|
||||
sendData(STREAM_D, dataD);
|
||||
controller.writePendingBytes();
|
||||
|
||||
dataA.assertNotWritten();
|
||||
dataB.assertNotWritten();
|
||||
// The write will occur on C, because it's an empty frame.
|
||||
dataC.assertFullyWritten();
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_C), 0);
|
||||
dataD.assertNotWritten();
|
||||
|
||||
// Allow 1000 bytes to be sent.
|
||||
incrementWindowSize(CONNECTION_STREAM_ID, 999);
|
||||
controller.writePendingBytes();
|
||||
|
||||
assertEquals(0, window(CONNECTION_STREAM_ID));
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_A), 50);
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_B), 50);
|
||||
@ -717,8 +841,11 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_D), 50);
|
||||
|
||||
dataA.assertPartiallyWritten(333);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_A), 333);
|
||||
dataB.assertPartiallyWritten(333);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_B), 333);
|
||||
dataD.assertPartiallyWritten(333);
|
||||
verify(listener, times(1)).streamWritten(stream(STREAM_D), 333);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -759,6 +886,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
sendData(STREAM_B, dataB);
|
||||
sendData(STREAM_C, dataC);
|
||||
sendData(STREAM_D, dataD);
|
||||
controller.writePendingBytes();
|
||||
|
||||
dataA.assertNotWritten();
|
||||
dataB.assertNotWritten();
|
||||
@ -828,6 +956,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
sendData(STREAM_B, dataB);
|
||||
sendData(STREAM_C, dataC);
|
||||
sendData(STREAM_D, dataD);
|
||||
controller.writePendingBytes();
|
||||
|
||||
dataA.assertNotWritten();
|
||||
dataB.assertNotWritten();
|
||||
@ -905,6 +1034,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
sendData(STREAM_C, dataC);
|
||||
sendData(STREAM_D, dataD);
|
||||
sendData(STREAM_E, dataE);
|
||||
controller.writePendingBytes();
|
||||
|
||||
dataA.assertNotWritten();
|
||||
dataB.assertNotWritten();
|
||||
@ -966,6 +1096,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
sendData(STREAM_B, dataB);
|
||||
sendData(STREAM_C, dataC);
|
||||
sendData(STREAM_D, dataD);
|
||||
controller.writePendingBytes();
|
||||
|
||||
dataA.assertNotWritten();
|
||||
dataB.assertNotWritten();
|
||||
@ -1033,6 +1164,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
sendData(STREAM_B, dataB);
|
||||
sendData(STREAM_C, dataC);
|
||||
sendData(STREAM_D, dataD);
|
||||
controller.writePendingBytes();
|
||||
|
||||
dataA.assertNotWritten();
|
||||
dataB.assertNotWritten();
|
||||
@ -1065,7 +1197,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
int windowBefore = window(STREAM_A);
|
||||
|
||||
controller.sendFlowControlled(ctx, stream, flowControlled);
|
||||
controller.addFlowControlled(ctx, stream, flowControlled);
|
||||
controller.writePendingBytes();
|
||||
|
||||
verify(flowControlled, times(3)).write(anyInt());
|
||||
verify(flowControlled).error(any(Throwable.class));
|
||||
@ -1088,7 +1221,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
boolean exceptionThrown = false;
|
||||
try {
|
||||
controller.sendFlowControlled(ctx, stream, flowControlled);
|
||||
controller.addFlowControlled(ctx, stream, flowControlled);
|
||||
controller.writePendingBytes();
|
||||
} catch (RuntimeException e) {
|
||||
exceptionThrown = true;
|
||||
} finally {
|
||||
@ -1131,7 +1265,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
int windowBefore = window(STREAM_A);
|
||||
|
||||
try {
|
||||
controller.sendFlowControlled(ctx, stream, flowControlled);
|
||||
controller.addFlowControlled(ctx, stream, flowControlled);
|
||||
controller.writePendingBytes();
|
||||
} catch (Exception e) {
|
||||
fail();
|
||||
}
|
||||
@ -1157,7 +1292,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
}
|
||||
}).when(flowControlled).error(any(Throwable.class));
|
||||
|
||||
controller.sendFlowControlled(ctx, stream, flowControlled);
|
||||
controller.addFlowControlled(ctx, stream, flowControlled);
|
||||
controller.writePendingBytes();
|
||||
|
||||
verify(flowControlled).write(anyInt());
|
||||
verify(flowControlled).error(any(Throwable.class));
|
||||
@ -1203,7 +1339,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
|
||||
private void sendData(int streamId, FakeFlowControlled data) throws Http2Exception {
|
||||
Http2Stream stream = stream(streamId);
|
||||
controller.sendFlowControlled(ctx, stream, data);
|
||||
controller.addFlowControlled(ctx, stream, data);
|
||||
}
|
||||
|
||||
private void setPriority(int stream, int parent, int weight, boolean exclusive) throws Http2Exception {
|
||||
@ -1235,11 +1371,21 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
private int currentSize;
|
||||
private int originalSize;
|
||||
private boolean writeCalled;
|
||||
private final boolean mergeable;
|
||||
private boolean merged;
|
||||
|
||||
private Throwable t;
|
||||
|
||||
private FakeFlowControlled(int size) {
|
||||
this.currentSize = size;
|
||||
this.originalSize = size;
|
||||
this.mergeable = false;
|
||||
}
|
||||
|
||||
private FakeFlowControlled(int size, boolean mergeable) {
|
||||
this.currentSize = size;
|
||||
this.originalSize = size;
|
||||
this.mergeable = mergeable;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1267,6 +1413,17 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
currentSize -= written;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean merge(Http2RemoteFlowController.FlowControlled next) {
|
||||
if (mergeable && next instanceof FakeFlowControlled) {
|
||||
this.originalSize += ((FakeFlowControlled) next).originalSize;
|
||||
this.currentSize += ((FakeFlowControlled) next).originalSize;
|
||||
((FakeFlowControlled) next).merged = true;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public int written() {
|
||||
return originalSize - currentSize;
|
||||
}
|
||||
@ -1289,6 +1446,10 @@ public class DefaultHttp2RemoteFlowControllerTest {
|
||||
assertEquals(0, currentSize);
|
||||
}
|
||||
|
||||
public boolean assertMerged() {
|
||||
return merged;
|
||||
}
|
||||
|
||||
public void assertError() {
|
||||
assertNotNull(t);
|
||||
}
|
||||
|
@ -77,9 +77,15 @@ public class Http2ConnectionHandlerTest {
|
||||
@Mock
|
||||
private Http2Connection.Endpoint<Http2RemoteFlowController> remote;
|
||||
|
||||
@Mock
|
||||
private Http2RemoteFlowController remoteFlowController;
|
||||
|
||||
@Mock
|
||||
private Http2Connection.Endpoint<Http2LocalFlowController> local;
|
||||
|
||||
@Mock
|
||||
private Http2LocalFlowController localFlowController;
|
||||
|
||||
@Mock
|
||||
private ChannelHandlerContext ctx;
|
||||
|
||||
@ -135,7 +141,9 @@ public class Http2ConnectionHandlerTest {
|
||||
when(future.channel()).thenReturn(channel);
|
||||
when(channel.isActive()).thenReturn(true);
|
||||
when(connection.remote()).thenReturn(remote);
|
||||
when(remote.flowController()).thenReturn(remoteFlowController);
|
||||
when(connection.local()).thenReturn(local);
|
||||
when(local.flowController()).thenReturn(localFlowController);
|
||||
doAnswer(new Answer<Http2Stream>() {
|
||||
@Override
|
||||
public Http2Stream answer(InvocationOnMock in) throws Throwable {
|
||||
|
@ -120,10 +120,10 @@ public class Http2ConnectionRoundtripTest {
|
||||
final Http2Headers headers = dummyHeaders();
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, weight, false, 0, true,
|
||||
newPromise());
|
||||
ctx().flush();
|
||||
http2Client.flush(ctx());
|
||||
}
|
||||
});
|
||||
|
||||
@ -157,10 +157,10 @@ public class Http2ConnectionRoundtripTest {
|
||||
final Http2Headers headers = dummyHeaders();
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
|
||||
newPromise());
|
||||
ctx().flush();
|
||||
http2Client.flush(ctx());
|
||||
}
|
||||
});
|
||||
|
||||
@ -202,10 +202,10 @@ public class Http2ConnectionRoundtripTest {
|
||||
// Create a single stream by sending a HEADERS frame to the server.
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
|
||||
newPromise());
|
||||
ctx().flush();
|
||||
http2Client.flush(ctx());
|
||||
}
|
||||
});
|
||||
|
||||
@ -235,10 +235,10 @@ public class Http2ConnectionRoundtripTest {
|
||||
final Http2Headers headers = dummyHeaders();
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
|
||||
newPromise());
|
||||
ctx().flush();
|
||||
http2Client.flush(ctx());
|
||||
}
|
||||
});
|
||||
|
||||
@ -267,10 +267,10 @@ public class Http2ConnectionRoundtripTest {
|
||||
final Http2Headers headers = dummyHeaders();
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
|
||||
true, newPromise());
|
||||
ctx().flush();
|
||||
http2Client.flush(ctx());
|
||||
}
|
||||
});
|
||||
|
||||
@ -278,10 +278,10 @@ public class Http2ConnectionRoundtripTest {
|
||||
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
http2Client.encoder().writeHeaders(ctx(), Integer.MAX_VALUE + 1, headers, 0, (short) 16, false, 0,
|
||||
true, newPromise());
|
||||
ctx().flush();
|
||||
http2Client.flush(ctx());
|
||||
}
|
||||
});
|
||||
|
||||
@ -319,7 +319,7 @@ public class Http2ConnectionRoundtripTest {
|
||||
// Create the stream and send all of the data at once.
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
|
||||
false, newPromise());
|
||||
http2Client.encoder().writeData(ctx(), 3, data.retain(), 0, false, newPromise());
|
||||
@ -327,7 +327,7 @@ public class Http2ConnectionRoundtripTest {
|
||||
// Write trailers.
|
||||
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
|
||||
true, newPromise());
|
||||
ctx().flush();
|
||||
http2Client.flush(ctx());
|
||||
}
|
||||
});
|
||||
|
||||
@ -399,7 +399,7 @@ public class Http2ConnectionRoundtripTest {
|
||||
bootstrapEnv(numStreams * length, 1, numStreams * 4, numStreams);
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
public void run() throws Http2Exception {
|
||||
int upperLimit = 3 + 2 * numStreams;
|
||||
for (int streamId = 3; streamId < upperLimit; streamId += 2) {
|
||||
// Send a bunch of data on each stream.
|
||||
@ -412,7 +412,7 @@ public class Http2ConnectionRoundtripTest {
|
||||
// Write trailers.
|
||||
http2Client.encoder().writeHeaders(ctx(), streamId, headers, 0, (short) 16,
|
||||
false, 0, true, newPromise());
|
||||
ctx().flush();
|
||||
http2Client.flush(ctx());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -27,6 +27,7 @@ import io.netty.handler.codec.http2.StreamBufferingEncoder.GoAwayException;
|
||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
@ -116,8 +117,11 @@ public class StreamBufferingEncoderTest {
|
||||
encoderWriteHeaders(3, promise);
|
||||
|
||||
writeVerifyWriteHeaders(times(2), 3, promise);
|
||||
verify(writer, times(3))
|
||||
.writeData(eq(ctx), eq(3), any(ByteBuf.class), eq(0), eq(false), eq(promise));
|
||||
// Contiguous data writes are coalesced
|
||||
ArgumentCaptor<ByteBuf> bufCaptor = ArgumentCaptor.forClass(ByteBuf.class);
|
||||
verify(writer, times(1))
|
||||
.writeData(eq(ctx), eq(3), bufCaptor.capture(), eq(0), eq(false), eq(promise));
|
||||
assertEquals(data().readableBytes() * 3, bufCaptor.getValue().readableBytes());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -396,6 +400,8 @@ public class StreamBufferingEncoderTest {
|
||||
private void setMaxConcurrentStreams(int newValue) {
|
||||
try {
|
||||
encoder.remoteSettings(new Http2Settings().maxConcurrentStreams(newValue));
|
||||
// Flush the remote flow controller to write data
|
||||
encoder.flowController().writePendingBytes();
|
||||
} catch (Http2Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@ -404,6 +410,11 @@ public class StreamBufferingEncoderTest {
|
||||
private void encoderWriteHeaders(int streamId, ChannelPromise promise) {
|
||||
encoder.writeHeaders(ctx, streamId, new DefaultHttp2Headers(), 0, DEFAULT_PRIORITY_WEIGHT,
|
||||
false, 0, false, promise);
|
||||
try {
|
||||
encoder.flowController().writePendingBytes();
|
||||
} catch (Http2Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void writeVerifyWriteHeaders(VerificationMode mode, int streamId,
|
||||
|
@ -51,7 +51,20 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload) {
|
||||
public void writePendingBytes() throws Http2Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void listener(Listener listener) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Listener listener() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload) {
|
||||
// Don't check size beforehand because Headers payload returns 0 all the time.
|
||||
do {
|
||||
payload.write(MAX_INITIAL_WINDOW_SIZE);
|
||||
|
Loading…
Reference in New Issue
Block a user