Http2RemoteFlowController stream writibility listener

Motivation:
For implementations that want to manage flow control down to the stream level it is useful to be notified when stream writability changes.

Modifications:
- Add writabilityChanged to Http2RemoteFlowController.Listener
- Add isWritable to Http2RemoteFlowController

Result:
The Http2RemoteFlowController provides notification when writability of a stream changes.
This commit is contained in:
Scott Mitchell 2015-09-25 15:49:05 -07:00
parent 7ab132f28a
commit 0e9545e94d
7 changed files with 693 additions and 145 deletions

View File

@ -14,22 +14,24 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.StreamByteDistributor.Writer;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayDeque;
import java.util.Deque;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Stream.State;
import java.util.ArrayDeque;
import java.util.Deque;
/**
* Basic implementation of {@link Http2RemoteFlowController}.
* <p>
@ -37,39 +39,45 @@ import java.util.Deque;
* Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
*/
public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class);
private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
private final StreamByteDistributor.Writer writer = new StreamByteDistributor.Writer() {
@Override
public void write(Http2Stream stream, int numBytes) {
int written = state(stream).writeAllocatedBytes(numBytes);
if (written != -1 && listener != null) {
listener.streamWritten(stream, written);
}
}
};
private final Http2Connection connection;
private final Http2Connection.PropertyKey stateKey;
private final StreamByteDistributor streamByteDistributor;
private final AbstractState connectionState;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private WritabilityMonitor monitor;
private ChannelHandlerContext ctx;
private Listener listener;
public DefaultHttp2RemoteFlowController(Http2Connection connection) {
this(connection, new PriorityStreamByteDistributor(connection));
this(connection, (Listener) null);
}
public DefaultHttp2RemoteFlowController(Http2Connection connection,
StreamByteDistributor streamByteDistributor) {
this(connection, streamByteDistributor, null);
}
public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) {
this(connection, new PriorityStreamByteDistributor(connection), listener);
}
public DefaultHttp2RemoteFlowController(Http2Connection connection,
StreamByteDistributor streamByteDistributor,
final Listener listener) {
this.connection = checkNotNull(connection, "connection");
this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamWriteDistributor");
// Add a flow state for the connection.
stateKey = connection.newKey();
connectionState = new DefaultState(connection.connectionStream(), initialWindowSize);
connectionState = new DefaultState(connection.connectionStream(), initialWindowSize,
initialWindowSize > 0 && isChannelWritable());
connection.connectionStream().setProperty(stateKey, connectionState);
// Monitor may depend upon connectionState, and so initialize after connectionState
listener(listener);
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@Override
@ -78,7 +86,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// only requires the ReducedFlowState. Otherwise the full amount of memory is required.
stream.setProperty(stateKey, stream.state() == IDLE ?
new ReducedState(stream) :
new DefaultState(stream, 0));
new DefaultState(stream, 0,
isWritable(DefaultHttp2RemoteFlowController.this.connection.connectionStream())));
}
@Override
@ -104,13 +113,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// decrease the amount of memory required for this stream because no flow controlled frames can
// be exchanged on this stream
if (stream.prioritizableForTree() != 0) {
stream.setProperty(stateKey, new ReducedState(state));
state = new ReducedState(state);
stream.setProperty(stateKey, state);
}
// Tell the monitor after cancel has been called and after the new state is used.
monitor.stateCancelled(state);
}
@Override
public void onStreamHalfClosed(Http2Stream stream) {
if (State.HALF_CLOSED_LOCAL.equals(stream.state())) {
if (HALF_CLOSED_LOCAL.equals(stream.state())) {
/**
* When this method is called there should not be any
* pending frames left if the API is used correctly. However,
@ -122,7 +134,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
*
* This is to cancel any such illegal writes.
*/
state(stream).cancel();
AbstractState state = state(stream);
state.cancel();
monitor.stateCancelled(state);
}
}
});
@ -137,11 +151,15 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
this.ctx = ctx;
// Writing the pending bytes will not check writability change and instead a writability change notification
// to be provided by an explicit call.
channelWritabilityChanged();
// Don't worry about cleaning up queued frames here if ctx is null. It is expected that all streams will be
// closed and the queue cleanup will occur when the stream state transitions occur.
// If any frames have been queued up, we should send them now that we have a channel context.
if (ctx != null && ctx.channel().isWritable()) {
if (isChannelWritable()) {
writePendingBytes();
}
}
@ -154,25 +172,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override
public void initialWindowSize(int newWindowSize) throws Http2Exception {
assert ctx == null || ctx.executor().inEventLoop();
if (newWindowSize < 0) {
throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
}
final int delta = newWindowSize - initialWindowSize;
initialWindowSize = newWindowSize;
connection.forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
// Verify that the maximum value is not exceeded by this change.
state(stream).incrementStreamWindow(delta);
return true;
}
});
if (delta > 0) {
// The window size increased, send any pending frames for all streams.
writePendingBytes();
}
monitor.initialWindowSize(newWindowSize);
}
@Override
@ -185,6 +185,29 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return state(stream).windowSize();
}
@Override
public boolean isWritable(Http2Stream stream) {
return monitor.isWritable(state(stream));
}
@Override
public void channelWritabilityChanged() throws Http2Exception {
monitor.channelWritabilityChange();
}
private boolean isChannelWritable() {
return ctx != null && isChannelWritable0();
}
private boolean isChannelWritable0() {
return ctx.channel().isWritable();
}
@Override
public void listener(Listener listener) {
monitor = listener == null ? new DefaultWritabilityMonitor() : new ListenerWritabilityMonitor(listener);
}
@Override
public int initialWindowSize(Http2Stream stream) {
return state(stream).initialWindowSize();
@ -193,24 +216,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
assert ctx == null || ctx.executor().inEventLoop();
if (stream.id() == CONNECTION_STREAM_ID) {
// Update the connection window
connectionState.incrementStreamWindow(delta);
} else {
// Update the stream window
AbstractState state = state(stream);
state.incrementStreamWindow(delta);
}
}
@Override
public void listener(Listener listener) {
this.listener = listener;
}
@Override
public Listener listener() {
return this.listener;
monitor.incrementWindowSize(state(stream), delta);
}
@Override
@ -218,10 +224,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// The context can be null assuming the frame will be queued and send later when the context is set.
assert ctx == null || ctx.executor().inEventLoop();
checkNotNull(frame, "frame");
final AbstractState state;
try {
state = state(stream);
state.enqueueFrame(frame);
monitor.enqueueFrame(state(stream), frame);
} catch (Throwable t) {
frame.error(ctx, t);
}
@ -245,16 +249,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// an "adequate" amount of connection window before allocation is attempted. This is not foolproof as if the
// number of streams is >= this minimal number then we may still have the issue, but the idea is to narrow the
// circumstances in which this can happen without rewriting the allocation algorithm.
return Math.max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK);
return max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK);
}
private int maxUsableChannelBytes() {
if (ctx == null) {
return 0;
}
// If the channel isWritable, allow at least minUseableChannelBytes.
int channelWritableBytes = (int) Math.min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable());
int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable());
int useableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
// Clip the usable bytes by the connection window.
@ -262,29 +262,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
/**
* Package private for testing purposes only!
*
* @return The amount of bytes that can be supported by underlying {@link
* io.netty.channel.Channel} without queuing "too-much".
* The amount of bytes that can be supported by underlying {@link io.netty.channel.Channel} without
* queuing "too-much".
*/
private int writableBytes() {
return Math.min(connectionWindowSize(), maxUsableChannelBytes());
return min(connectionWindowSize(), maxUsableChannelBytes());
}
/**
* Writes as many pending bytes as possible, according to stream priority.
*/
@Override
public void writePendingBytes() throws Http2Exception {
int bytesToWrite = writableBytes();
boolean haveUnwrittenBytes;
// Using a do-while loop so that we always write at least once, regardless if we have
// bytesToWrite or not. This ensures that zero-length frames will always be written.
do {
// Distribute the connection window across the streams and write the data.
haveUnwrittenBytes = streamByteDistributor.distribute(bytesToWrite, writer);
} while (haveUnwrittenBytes && (bytesToWrite = writableBytes()) > 0 && ctx.channel().isWritable());
monitor.writePendingBytes();
}
/**
@ -299,8 +286,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Set to true if cancel() was called.
private boolean cancelled;
DefaultState(Http2Stream stream, int initialWindowSize) {
super(stream);
DefaultState(Http2Stream stream, int initialWindowSize, boolean markedWritable) {
super(stream, markedWritable);
window(initialWindowSize);
pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
}
@ -348,13 +335,21 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return window;
}
int writableWindow() {
@Override
public int streamableBytes() {
return max(0, min(pendingBytes, window));
}
/**
* Returns the maximum writable window (minimum of the stream and connection windows).
*/
private int writableWindow() {
return min(window, connectionWindowSize());
}
@Override
public int streamableBytes() {
return max(0, min(pendingBytes, window));
int pendingBytes() {
return pendingBytes;
}
@Override
@ -401,6 +396,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
writeError(frame, streamError(stream.id(), INTERNAL_ERROR, cause,
"Stream closed before write could take place"));
}
streamByteDistributor.updateStreamableBytes(this);
}
@ -412,7 +408,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
FlowControlled frame = peek();
int maxBytes = min(bytes, writableWindow());
if (maxBytes <= 0 && frame.size() != 0) {
// The frame had data and all of it was written.
// The frame still has data, but the amount of allocated bytes has been exhausted.
return -1;
}
int originalBytes = bytes;
@ -423,7 +419,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
frame = peek();
maxBytes = min(bytes, writableWindow());
if (maxBytes <= 0 && frame.size() != 0) {
// The frame had data and all of it was written.
// The frame still has data, but the amount of allocated bytes has been exhausted.
break;
}
bytes -= write(frame, maxBytes);
@ -477,7 +473,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
*/
private void incrementPendingBytes(int numBytes) {
pendingBytes += numBytes;
streamByteDistributor.updateStreamableBytes(this);
monitor.incrementPendingBytes(numBytes);
}
/**
@ -518,7 +516,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
*/
private final class ReducedState extends AbstractState {
ReducedState(Http2Stream stream) {
super(stream);
super(stream, false);
}
ReducedState(AbstractState existingState) {
@ -540,6 +538,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return 0;
}
@Override
int pendingBytes() {
return 0;
}
@Override
int writeAllocatedBytes(int allocated) {
throw new UnsupportedOperationException();
@ -577,13 +580,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
*/
private abstract class AbstractState implements StreamByteDistributor.StreamState {
protected final Http2Stream stream;
private boolean markedWritable;
AbstractState(Http2Stream stream) {
AbstractState(Http2Stream stream, boolean markedWritable) {
this.stream = stream;
this.markedWritable = markedWritable;
}
AbstractState(AbstractState existingState) {
this.stream = existingState.stream();
stream = existingState.stream();
markedWritable = existingState.markWritability();
}
/**
@ -594,6 +600,20 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return stream;
}
/**
* Returns the parameter from the last call to {@link #markWritability(boolean)}.
*/
final boolean markWritability() {
return markedWritable;
}
/**
* Save the state of writability.
*/
final void markWritability(boolean isWritable) {
this.markedWritable = isWritable;
}
abstract int windowSize();
abstract int initialWindowSize();
@ -605,6 +625,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
*/
abstract int writeAllocatedBytes(int allocated);
/**
* Get the number of bytes pending to be written.
*/
abstract int pendingBytes();
/**
* Any operations that may be pending are cleared and the status of these operations is failed.
*/
@ -625,4 +650,265 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
*/
abstract void enqueueFrame(FlowControlled frame);
}
/**
* Abstract class which provides common functionality for {@link WritabilityMonitorfoo} implementations.
*/
private abstract class WritabilityMonitor {
private long totalPendingBytes;
/**
* Increment all windows by {@code newWindowSize} amount, and write data if streams change from not writable
* to writable.
* @param newWindowSize The new window size.
* @throws Http2Exception If an overflow occurs or an exception on write occurs.
*/
public abstract void initialWindowSize(int newWindowSize) throws Http2Exception;
/**
* Attempt to allocate bytes to streams which have frames queued.
* @throws Http2Exception If a write occurs and an exception happens in the write operation.
*/
public abstract void writePendingBytes() throws Http2Exception;
/**
* Called when the writability of the underlying channel changes.
* @throws Http2Exception If a write occurs and an exception happens in the write operation.
*/
public void channelWritabilityChange() throws Http2Exception { }
/**
* Called when the state is cancelled outside of a write operation.
* @param state the state that was cancelled.
*/
public void stateCancelled(AbstractState state) { }
/**
* Increment the window size for a particular stream.
* @param state the state associated with the stream whose window is being incremented.
* @param delta The amount to increment by.
* @throws Http2Exception If this operation overflows the window for {@code state}.
*/
public void incrementWindowSize(AbstractState state, int delta) throws Http2Exception {
state.incrementStreamWindow(delta);
}
/**
* Add a frame to be sent via flow control.
* @param state The state associated with the stream which the {@code frame} is associated with.
* @param frame the frame to enqueue.
* @throws Http2Exception If a writability error occurs.
*/
public void enqueueFrame(AbstractState state, FlowControlled frame) throws Http2Exception {
state.enqueueFrame(frame);
}
/**
* Increment the total amount of pending bytes for all streams. When any stream's pending bytes changes
* method should be called.
* @param delta The amount to increment by.
*/
public final void incrementPendingBytes(int delta) {
totalPendingBytes += delta;
// Notification of writibilty change should be delayed until the end of the top level event.
// This is to ensure the flow controller is more consistent state before calling external listener methods.
}
/**
* Determine if the stream associated with {@code state} is writable.
* @param state The state which is associated with the stream to test writability for.
* @return {@code true} if {@link AbstractState#stream()} is writable. {@code false} otherwise.
*/
public final boolean isWritable(AbstractState state) {
return isWritableConnection() && state.windowSize() - state.pendingBytes() > 0;
}
protected final void writePendingBytes(Writer writer) throws Http2Exception {
int bytesToWrite = writableBytes();
// Make sure we always write at least once, regardless if we have bytesToWrite or not.
// This ensures that zero-length frames will always be written.
for (;;) {
if (!streamByteDistributor.distribute(bytesToWrite, writer) ||
(bytesToWrite = writableBytes()) <= 0 ||
!isChannelWritable0()) {
break;
}
}
}
protected final boolean initialWindowSize(int newWindowSize, Writer writer)
throws Http2Exception {
if (newWindowSize < 0) {
throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
}
final int delta = newWindowSize - initialWindowSize;
initialWindowSize = newWindowSize;
connection.forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
state(stream).incrementStreamWindow(delta);
return true;
}
});
if (delta > 0) {
// The window size increased, send any pending frames for all streams.
writePendingBytes(writer);
return false;
}
return true;
}
protected final boolean isWritableConnection() {
return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
}
}
/**
* Provides no notification or tracking of writablity changes.
*/
private final class DefaultWritabilityMonitor extends WritabilityMonitor {
private final Writer writer = new StreamByteDistributor.Writer() {
@Override
public void write(Http2Stream stream, int numBytes) {
state(stream).writeAllocatedBytes(numBytes);
}
};
@Override
public void writePendingBytes() throws Http2Exception {
writePendingBytes(writer);
}
@Override
public void initialWindowSize(int newWindowSize) throws Http2Exception {
initialWindowSize(newWindowSize, writer);
}
}
/**
* Writability of a {@code stream} is calculated using the following:
* <pre>
* Connection Window - Total Queued Bytes > 0 &&
* Stream Window - Bytes Queued for Stream > 0 &&
* isChannelWritable()
* </pre>
*/
private final class ListenerWritabilityMonitor extends WritabilityMonitor {
private final Listener listener;
private final Http2StreamVisitor checkStreamWritabilityVisitor = new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
AbstractState state = state(stream);
if (isWritable(state) != state.markWritability()) {
notifyWritabilityChanged(state);
}
return true;
}
};
private final Writer initialWindowSizeWriter = new StreamByteDistributor.Writer() {
@Override
public void write(Http2Stream stream, int numBytes) {
AbstractState state = state(stream);
writeAllocatedBytes(state, numBytes);
if (isWritable(state) != state.markWritability()) {
notifyWritabilityChanged(state);
}
}
};
private final Writer writeAllocatedBytesWriter = new StreamByteDistributor.Writer() {
@Override
public void write(Http2Stream stream, int numBytes) {
writeAllocatedBytes(state(stream), numBytes);
}
};
ListenerWritabilityMonitor(Listener listener) {
this.listener = listener;
}
@Override
public void writePendingBytes() throws Http2Exception {
writePendingBytes(writeAllocatedBytesWriter);
}
@Override
public void incrementWindowSize(AbstractState state, int delta) throws Http2Exception {
super.incrementWindowSize(state, delta);
if (isWritable(state) != state.markWritability()) {
if (state == connectionState) {
checkAllWritabilityChanged();
} else {
notifyWritabilityChanged(state);
}
}
}
@Override
public void initialWindowSize(int newWindowSize) throws Http2Exception {
if (initialWindowSize(newWindowSize, initialWindowSizeWriter)) {
if (isWritableConnection()) {
// If the write operation does not occur we still need to check all streams because they
// may have transitioned from writable to not writable.
checkAllWritabilityChanged();
}
}
}
@Override
public void enqueueFrame(AbstractState state, FlowControlled frame) throws Http2Exception {
super.enqueueFrame(state, frame);
checkConnectionThenStreamWritabilityChanged(state);
}
@Override
public void stateCancelled(AbstractState state) {
try {
checkConnectionThenStreamWritabilityChanged(state);
} catch (Http2Exception e) {
logger.error("Caught unexpected exception from checkAllWritabilityChanged", e);
}
}
@Override
public void channelWritabilityChange() throws Http2Exception {
if (connectionState.markWritability() != isChannelWritable()) {
checkAllWritabilityChanged();
}
}
private void notifyWritabilityChanged(AbstractState state) {
state.markWritability(!state.markWritability());
try {
listener.writabilityChanged(state.stream);
} catch (RuntimeException e) {
logger.error("Caught unexpected exception from listener.writabilityChanged", e);
}
}
private void checkConnectionThenStreamWritabilityChanged(AbstractState state) throws Http2Exception {
// It is possible that the connection window and/or the individual stream writability could change.
if (isWritableConnection() != connectionState.markWritability()) {
checkAllWritabilityChanged();
} else if (isWritable(state) != state.markWritability()) {
notifyWritabilityChanged(state);
}
}
private void checkAllWritabilityChanged() throws Http2Exception {
// Make sure we mark that we have notified as a result of this change.
connectionState.markWritability(isWritableConnection());
connection.forEachActiveStream(checkStreamWritabilityVisitor);
}
private void writeAllocatedBytes(AbstractState state, int numBytes) {
int written = state.writeAllocatedBytes(numBytes);
if (written != -1) {
listener.streamWritten(state.stream(), written);
}
}
}
}

View File

@ -441,6 +441,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
if (ctx.channel().isWritable()) {
flush(ctx);
}
encoder.flowController().channelWritabilityChanged();
} finally {
super.channelWritabilityChanged(ctx);
}

View File

@ -56,11 +56,21 @@ public interface Http2RemoteFlowController extends Http2FlowController {
void listener(Listener listener);
/**
* Get the current listener to flow-control events.
*
* @return the current listener or {@code null} if one is not set.
* Determine if the {@code stream} has bytes remaining for use in the flow control window.
* <p>
* Note that this only takes into account HTTP/2 flow control. It does <strong>not</strong> take into account
* the underlying {@link io.netty.channel.Channel#isWritable()}.
* @param stream The stream to test.
* @return {@code true} if if the {@code stream} has bytes remaining for use in the flow control window.
* {@code false} otherwise.
*/
Listener listener();
boolean isWritable(Http2Stream stream);
/**
* Notification that the writability of {@link #channelHandlerContext()} has changed.
* @throws Http2Exception If any writes occur as a result of this call and encounter errors.
*/
void channelWritabilityChanged() throws Http2Exception;
/**
* Implementations of this interface are used to progressively write chunks of the underlying
@ -132,11 +142,20 @@ public interface Http2RemoteFlowController extends Http2FlowController {
/**
* Report the number of {@code writtenBytes} for a {@code stream}. Called after the
* flow-controller has flushed bytes for the given stream.
*
* <p>
* This method should not throw. Any thrown exceptions are considered a programming error and are ignored.
* @param stream that had bytes written.
* @param writtenBytes the number of bytes written for a stream, can be 0 in the case of an
* empty DATA frame.
*/
void streamWritten(Http2Stream stream, int writtenBytes);
/**
* Notification that {@link Http2RemoteFlowController#isWritable(Http2Stream)} has changed for {@code stream}.
* <p>
* This method should not throw. Any thrown exceptions are considered a programming error and are ignored.
* @param stream The stream which writability has changed for.
*/
void writabilityChanged(Http2Stream stream);
}
}

View File

@ -54,7 +54,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo
public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
int delta = state(stream).unallocatedStreamableBytesForTree();
long delta = state(stream).unallocatedStreamableBytesForTree();
if (delta != 0) {
state(parent).unallocatedStreamableBytesForTreeChanged(delta);
}
@ -65,7 +65,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo
public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
int delta = state(stream).unallocatedStreamableBytesForTree();
long delta = state(stream).unallocatedStreamableBytesForTree();
if (delta != 0) {
state(parent).unallocatedStreamableBytesForTreeChanged(-delta);
}
@ -103,7 +103,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo
/**
* For testing only.
*/
int unallocatedStreamableBytesForTree(Http2Stream stream) {
long unallocatedStreamableBytesForTree(Http2Stream stream) {
return state(stream).unallocatedStreamableBytesForTree();
}
@ -307,7 +307,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo
boolean hasFrame;
int streamableBytes;
int allocated;
int unallocatedStreamableBytesForTree;
long unallocatedStreamableBytesForTree;
PriorityState(Http2Stream stream) {
this.stream = stream;
@ -317,7 +317,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo
* Recursively increments the {@link #unallocatedStreamableBytesForTree()} for this branch in
* the priority tree starting at the current node.
*/
void unallocatedStreamableBytesForTreeChanged(int delta) {
void unallocatedStreamableBytesForTreeChanged(long delta) {
unallocatedStreamableBytesForTree += delta;
if (!stream.isRoot()) {
state(stream.parent()).unallocatedStreamableBytesForTreeChanged(delta);
@ -371,7 +371,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo
return streamableBytes - allocated;
}
int unallocatedStreamableBytesForTree() {
long unallocatedStreamableBytesForTree() {
return unallocatedStreamableBytesForTree;
}
}

View File

@ -108,12 +108,13 @@ public class DefaultHttp2RemoteFlowControllerTest {
resetCtx();
// This is intentionally left out of initConnectionAndController so it can be tested below.
controller.channelHandlerContext(ctx);
assertWritabilityChanged(1, true);
reset(listener);
}
private void initConnectionAndController() throws Http2Exception {
connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2RemoteFlowController(connection);
controller.listener(listener);
controller = new DefaultHttp2RemoteFlowController(connection, listener);
connection.remote().flowController(controller);
connection.local().createStream(STREAM_A, false);
@ -132,6 +133,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
assertEquals(0, window(STREAM_B));
assertEquals(0, window(STREAM_C));
assertEquals(0, window(STREAM_D));
assertWritabilityChanged(1, false);
}
@Test
@ -142,6 +144,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
verifyZeroInteractions(listener);
}
@Test
@ -152,6 +155,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
verifyZeroInteractions(listener);
}
@Test
@ -163,6 +167,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
controller.writePendingBytes();
data.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
verifyZeroInteractions(listener);
}
@Test
@ -173,6 +178,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
controller.writePendingBytes();
data.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 0);
verifyZeroInteractions(listener);
}
@Test
@ -184,7 +190,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
data.assertNotWritten();
controller.writePendingBytes();
data.assertNotWritten();
verifyZeroInteractions(listener);
verify(listener, times(1)).writabilityChanged(stream(STREAM_A));
}
@Test
@ -201,11 +207,16 @@ public class DefaultHttp2RemoteFlowControllerTest {
data1.assertFullyWritten();
data2.assertNotWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 15);
verify(listener, times(1)).writabilityChanged(stream(STREAM_A));
assertFalse(controller.isWritable(stream(STREAM_A)));
}
@Test
public void stalledStreamShouldQueuePayloads() throws Http2Exception {
controller.initialWindowSize(0);
verify(listener, times(1)).writabilityChanged(stream(STREAM_A));
assertFalse(controller.isWritable(stream(STREAM_A)));
reset(listener);
FakeFlowControlled data = new FakeFlowControlled(15);
FakeFlowControlled moreData = new FakeFlowControlled(0);
@ -221,6 +232,9 @@ public class DefaultHttp2RemoteFlowControllerTest {
@Test
public void queuedPayloadsReceiveErrorOnStreamClose() throws Http2Exception {
controller.initialWindowSize(0);
verify(listener, times(1)).writabilityChanged(stream(STREAM_A));
assertFalse(controller.isWritable(stream(STREAM_A)));
reset(listener);
FakeFlowControlled data = new FakeFlowControlled(15);
FakeFlowControlled moreData = new FakeFlowControlled(0);
@ -240,6 +254,9 @@ public class DefaultHttp2RemoteFlowControllerTest {
@Test
public void payloadLargerThanWindowShouldWritePartial() throws Http2Exception {
controller.initialWindowSize(5);
verify(listener, never()).writabilityChanged(stream(STREAM_A));
assertTrue(controller.isWritable(stream(STREAM_A)));
reset(listener);
final FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
@ -247,12 +264,16 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that a partial frame of 5 remains to be sent
data.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
verify(listener, times(1)).writabilityChanged(stream(STREAM_A));
assertFalse(controller.isWritable(stream(STREAM_A)));
verifyNoMoreInteractions(listener);
}
@Test
public void windowUpdateAndFlushShouldTriggerWrite() throws Http2Exception {
controller.initialWindowSize(10);
verify(listener, never()).writabilityChanged(stream(STREAM_A));
assertTrue(controller.isWritable(stream(STREAM_A)));
FakeFlowControlled data = new FakeFlowControlled(20);
FakeFlowControlled moreData = new FakeFlowControlled(10);
@ -262,16 +283,24 @@ public class DefaultHttp2RemoteFlowControllerTest {
data.assertPartiallyWritten(10);
moreData.assertNotWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
verify(listener, times(1)).writabilityChanged(stream(STREAM_A));
assertFalse(controller.isWritable(stream(STREAM_A)));
reset(listener);
resetCtx();
// Update the window and verify that the rest of data and some of moreData are written
incrementWindowSize(STREAM_A, 15);
verify(listener, never()).writabilityChanged(stream(STREAM_A));
assertFalse(controller.isWritable(stream(STREAM_A)));
reset(listener);
controller.writePendingBytes();
data.assertFullyWritten();
moreData.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 15);
verifyNoMoreInteractions(listener);
verify(listener, never()).writabilityChanged(stream(STREAM_A));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertEquals(DEFAULT_WINDOW_SIZE - 25, window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
@ -282,7 +311,13 @@ public class DefaultHttp2RemoteFlowControllerTest {
@Test
public void initialWindowUpdateShouldSendPayload() throws Http2Exception {
incrementWindowSize(CONNECTION_STREAM_ID, -window(CONNECTION_STREAM_ID) + 10);
assertWritabilityChanged(0, true);
reset(listener);
controller.initialWindowSize(0);
assertWritabilityChanged(1, false);
reset(listener);
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
@ -292,6 +327,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that the entire frame was sent.
controller.initialWindowSize(10);
data.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
assertWritabilityChanged(0, false);
}
@Test
@ -299,6 +336,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Collapse the connection window to force queueing.
incrementWindowSize(CONNECTION_STREAM_ID, -window(CONNECTION_STREAM_ID));
assertEquals(0, window(CONNECTION_STREAM_ID));
assertWritabilityChanged(1, false);
reset(listener);
FakeFlowControlled dataA = new FakeFlowControlled(10);
// Queue data for stream A and allow most of it to be written.
@ -306,11 +345,16 @@ public class DefaultHttp2RemoteFlowControllerTest {
controller.writePendingBytes();
dataA.assertNotWritten();
incrementWindowSize(CONNECTION_STREAM_ID, 8);
assertWritabilityChanged(0, false);
reset(listener);
controller.writePendingBytes();
dataA.assertPartiallyWritten(8);
assertEquals(65527, window(STREAM_A));
assertEquals(0, window(CONNECTION_STREAM_ID));
verify(listener, times(1)).streamWritten(stream(STREAM_A), 8);
assertWritabilityChanged(0, false);
reset(listener);
// Queue data for stream B and allow the rest of A and all of B to be written.
FakeFlowControlled dataB = new FakeFlowControlled(10);
@ -318,8 +362,12 @@ public class DefaultHttp2RemoteFlowControllerTest {
controller.writePendingBytes();
dataB.assertNotWritten();
incrementWindowSize(CONNECTION_STREAM_ID, 12);
assertWritabilityChanged(0, false);
reset(listener);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
assertWritabilityChanged(0, false);
// Verify the rest of A is written.
dataA.assertFullyWritten();
@ -337,6 +385,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
final int initWindow = 20;
final int secondWindowSize = 10;
controller.initialWindowSize(initWindow);
assertWritabilityChanged(0, true);
reset(listener);
FakeFlowControlled data1 = new FakeFlowControlled(initWindow);
FakeFlowControlled data2 = new FakeFlowControlled(5);
@ -346,38 +396,93 @@ public class DefaultHttp2RemoteFlowControllerTest {
controller.writePendingBytes();
data1.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 20);
assertTrue(window(CONNECTION_STREAM_ID) > 0);
verify(listener, times(1)).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
reset(listener);
// Make the window size for stream A negative
controller.initialWindowSize(initWindow - secondWindowSize);
assertEquals(-secondWindowSize, window(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
reset(listener);
// Queue up a write. It should not be written now because the window is negative
sendData(STREAM_A, data2);
controller.writePendingBytes();
data2.assertNotWritten();
verify(listener, never()).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
reset(listener);
// Open the window size back up a bit (no send should happen)
incrementWindowSize(STREAM_A, 5);
controller.writePendingBytes();
assertEquals(-5, window(STREAM_A));
data2.assertNotWritten();
verify(listener, never()).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
reset(listener);
// Open the window size back up a bit (no send should happen)
incrementWindowSize(STREAM_A, 5);
controller.writePendingBytes();
assertEquals(0, window(STREAM_A));
data2.assertNotWritten();
verify(listener, never()).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
reset(listener);
// Open the window size back up and allow the write to happen
incrementWindowSize(STREAM_A, 5);
controller.writePendingBytes();
data2.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
verify(listener, never()).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
}
@Test
public void initialWindowUpdateShouldSendEmptyFrame() throws Http2Exception {
controller.initialWindowSize(0);
assertWritabilityChanged(1, false);
reset(listener);
// First send a frame that will get buffered.
FakeFlowControlled data = new FakeFlowControlled(10, false);
@ -393,15 +498,26 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Re-expand the window and verify that both frames were sent.
controller.initialWindowSize(10);
verify(listener, never()).writabilityChanged(stream(STREAM_A));
verify(listener, times(1)).writabilityChanged(stream(STREAM_B));
verify(listener, times(1)).writabilityChanged(stream(STREAM_C));
verify(listener, times(1)).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
data.assertFullyWritten();
data2.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
}
@Test
public void initialWindowUpdateShouldSendPartialFrame() throws Http2Exception {
controller.initialWindowSize(0);
assertWritabilityChanged(1, false);
reset(listener);
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
@ -410,6 +526,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that a partial frame of 5 was sent.
controller.initialWindowSize(5);
verify(listener, never()).writabilityChanged(stream(STREAM_A));
verify(listener, times(1)).writabilityChanged(stream(STREAM_B));
verify(listener, times(1)).writabilityChanged(stream(STREAM_C));
verify(listener, times(1)).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
data.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
}
@ -418,19 +543,26 @@ public class DefaultHttp2RemoteFlowControllerTest {
public void connectionWindowUpdateShouldSendFrame() throws Http2Exception {
// Set the connection window size to zero.
exhaustStreamWindow(CONNECTION_STREAM_ID);
assertWritabilityChanged(1, false);
reset(listener);
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
controller.writePendingBytes();
data.assertNotWritten();
assertWritabilityChanged(0, false);
reset(listener);
// Verify that the entire frame was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 10);
assertWritabilityChanged(0, false);
reset(listener);
data.assertNotWritten();
controller.writePendingBytes();
data.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
assertWritabilityChanged(0, false);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 10, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -442,6 +574,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
public void connectionWindowUpdateShouldSendPartialFrame() throws Http2Exception {
// Set the connection window size to zero.
exhaustStreamWindow(CONNECTION_STREAM_ID);
assertWritabilityChanged(1, false);
reset(listener);
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
@ -451,9 +585,13 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that a partial frame of 5 was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 5);
data.assertNotWritten();
assertWritabilityChanged(0, false);
reset(listener);
controller.writePendingBytes();
data.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
assertWritabilityChanged(0, false);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -465,6 +603,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
public void streamWindowUpdateShouldSendFrame() throws Http2Exception {
// Set the stream window size to zero.
exhaustStreamWindow(STREAM_A);
verify(listener, times(1)).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
reset(listener);
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
@ -473,10 +620,28 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that the entire frame was sent.
incrementWindowSize(STREAM_A, 10);
verify(listener, never()).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
reset(listener);
data.assertNotWritten();
controller.writePendingBytes();
data.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
verify(listener, never()).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
assertEquals(DEFAULT_WINDOW_SIZE - 10, window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B));
@ -488,6 +653,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
public void streamWindowUpdateShouldSendPartialFrame() throws Http2Exception {
// Set the stream window size to zero.
exhaustStreamWindow(STREAM_A);
verify(listener, times(1)).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
reset(listener);
FakeFlowControlled data = new FakeFlowControlled(10);
sendData(STREAM_A, data);
@ -496,6 +670,16 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Verify that a partial frame of 5 was sent.
incrementWindowSize(STREAM_A, 5);
verify(listener, never()).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
reset(listener);
data.assertNotWritten();
controller.writePendingBytes();
data.assertPartiallyWritten(5);
@ -529,6 +713,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
verify(flowControlled, never()).writeComplete();
assertEquals(90, windowBefore - window(STREAM_A));
verify(listener, times(1)).streamWritten(stream(STREAM_A), 90);
assertWritabilityChanged(0, true);
}
@Test
@ -559,6 +745,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
verify(flowControlled, never()).writeComplete();
assertEquals(90, windowBefore - window(STREAM_A));
verifyZeroInteractions(listener);
}
@Test
@ -602,6 +789,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
verify(flowControlled).writeComplete();
assertEquals(150, windowBefore - window(STREAM_A));
verify(listener, times(1)).streamWritten(stream(STREAM_A), 150);
assertWritabilityChanged(0, true);
}
@Test
@ -626,6 +815,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
verify(flowControlled).write(any(ChannelHandlerContext.class), anyInt());
verify(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class));
verify(flowControlled, never()).writeComplete();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 0);
verify(listener, times(1)).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
}
@Test
@ -633,6 +831,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Start the channel as not writable and exercise the public methods of the flow controller
// making sure no frames are written.
setChannelWritability(false);
assertWritabilityChanged(1, false);
reset(listener);
FakeFlowControlled dataA = new FakeFlowControlled(1);
FakeFlowControlled dataB = new FakeFlowControlled(1);
final Http2Stream stream = stream(STREAM_A);
@ -649,9 +849,11 @@ public class DefaultHttp2RemoteFlowControllerTest {
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
assertWritabilityChanged(0, false);
// Now change the channel to writable and make sure frames are written.
setChannelWritability(true);
assertWritabilityChanged(1, true);
controller.writePendingBytes();
dataA.assertFullyWritten();
dataB.assertFullyWritten();
@ -667,11 +869,30 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Queue some frames
controller.addFlowControlled(stream, dataA);
controller.writePendingBytes();
dataA.assertNotWritten();
controller.incrementWindowSize(stream, 100);
controller.writePendingBytes();
dataA.assertNotWritten();
assertWritabilityChanged(0, false);
// Set the controller
controller.channelHandlerContext(ctx);
dataA.assertFullyWritten();
assertWritabilityChanged(1, true);
}
@Test
public void initialWindowSizeWithNoContextShouldNotThrow() throws Exception {
// Re-initialize the controller so we can ensure the context hasn't been set yet.
initConnectionAndController();
FakeFlowControlled dataA = new FakeFlowControlled(1);
final Http2Stream stream = stream(STREAM_A);
// Queue some frames
controller.addFlowControlled(stream, dataA);
dataA.assertNotWritten();
// Set the controller
@ -679,6 +900,24 @@ public class DefaultHttp2RemoteFlowControllerTest {
dataA.assertFullyWritten();
}
private void assertWritabilityChanged(int amt, boolean writable) {
verify(listener, times(amt)).writabilityChanged(stream(STREAM_A));
verify(listener, times(amt)).writabilityChanged(stream(STREAM_B));
verify(listener, times(amt)).writabilityChanged(stream(STREAM_C));
verify(listener, times(amt)).writabilityChanged(stream(STREAM_D));
if (writable) {
assertTrue(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
} else {
assertFalse(controller.isWritable(stream(STREAM_A)));
assertFalse(controller.isWritable(stream(STREAM_B)));
assertFalse(controller.isWritable(stream(STREAM_C)));
assertFalse(controller.isWritable(stream(STREAM_D)));
}
}
private static Http2RemoteFlowController.FlowControlled mockedFlowControlledThatThrowsOnWrite() throws Exception {
final Http2RemoteFlowController.FlowControlled flowControlled =
Mockito.mock(Http2RemoteFlowController.FlowControlled.class);
@ -714,10 +953,6 @@ public class DefaultHttp2RemoteFlowControllerTest {
incrementWindowSize(streamId, -window(streamId));
}
private void maxStreamWindow(int streamId) throws Http2Exception {
incrementWindowSize(streamId, Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE - window(streamId));
}
private int window(int streamId) throws Http2Exception {
return controller.windowSize(stream(streamId));
}
@ -736,9 +971,12 @@ public class DefaultHttp2RemoteFlowControllerTest {
when(ctx.executor()).thenReturn(executor);
}
private void setChannelWritability(boolean isWritable) {
private void setChannelWritability(boolean isWritable) throws Http2Exception {
when(channel.bytesBeforeUnwritable()).thenReturn(isWritable ? Long.MAX_VALUE : 0);
when(channel.isWritable()).thenReturn(isWritable);
if (controller != null) {
controller.channelWritabilityChanged();
}
}
private static final class FakeFlowControlled implements Http2RemoteFlowController.FlowControlled {

View File

@ -15,6 +15,21 @@
package io.netty.handler.codec.http2;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import org.junit.Before;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -32,21 +47,6 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import org.junit.Before;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Tests for {@link PriorityStreamByteDistributor}.
*/
@ -661,7 +661,7 @@ public class PriorityStreamByteDistributorTest {
stream(streamId).setPriority(parent, (short) weight, exclusive);
}
private int streamableBytesForTree(Http2Stream stream) {
private long streamableBytesForTree(Http2Stream stream) {
return distributor.unallocatedStreamableBytesForTree(stream);
}
@ -681,8 +681,8 @@ public class PriorityStreamByteDistributorTest {
verify(writer).write(same(stream(streamId)), (int) AdditionalMatchers.eq(numBytes, delta));
}
private static int calculateStreamSizeSum(IntObjectMap<Integer> streamSizes, List<Integer> streamIds) {
int sum = 0;
private static long calculateStreamSizeSum(IntObjectMap<Integer> streamSizes, List<Integer> streamIds) {
long sum = 0;
for (Integer streamId : streamIds) {
Integer streamSize = streamSizes.get(streamId);
if (streamSize != null) {

View File

@ -41,6 +41,11 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
return MAX_INITIAL_WINDOW_SIZE;
}
@Override
public boolean isWritable(Http2Stream stream) {
return true;
}
@Override
public int initialWindowSize(Http2Stream stream) {
return MAX_INITIAL_WINDOW_SIZE;
@ -58,11 +63,6 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
public void listener(Listener listener) {
}
@Override
public Listener listener() {
return null;
}
@Override
public void addFlowControlled(Http2Stream stream, FlowControlled payload) {
// Don't check size beforehand because Headers payload returns 0 all the time.
@ -80,4 +80,8 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
public ChannelHandlerContext channelHandlerContext() {
return ctx;
}
@Override
public void channelWritabilityChanged() throws Http2Exception {
}
}