diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index 2038ae69a6..b2253f82e0 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -14,22 +14,24 @@ */ package io.netty.handler.codec.http2; -import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.StreamByteDistributor.Writer; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.util.ArrayDeque; +import java.util.Deque; + import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.streamError; +import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Math.max; import static java.lang.Math.min; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http2.Http2Stream.State; - -import java.util.ArrayDeque; -import java.util.Deque; - /** * Basic implementation of {@link Http2RemoteFlowController}. *

@@ -37,39 +39,45 @@ import java.util.Deque; * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class. */ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController { + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class); private static final int MIN_WRITABLE_CHUNK = 32 * 1024; - - private final StreamByteDistributor.Writer writer = new StreamByteDistributor.Writer() { - @Override - public void write(Http2Stream stream, int numBytes) { - int written = state(stream).writeAllocatedBytes(numBytes); - if (written != -1 && listener != null) { - listener.streamWritten(stream, written); - } - } - }; private final Http2Connection connection; private final Http2Connection.PropertyKey stateKey; private final StreamByteDistributor streamByteDistributor; private final AbstractState connectionState; private int initialWindowSize = DEFAULT_WINDOW_SIZE; + private WritabilityMonitor monitor; private ChannelHandlerContext ctx; - private Listener listener; public DefaultHttp2RemoteFlowController(Http2Connection connection) { - this(connection, new PriorityStreamByteDistributor(connection)); + this(connection, (Listener) null); } public DefaultHttp2RemoteFlowController(Http2Connection connection, StreamByteDistributor streamByteDistributor) { + this(connection, streamByteDistributor, null); + } + + public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) { + this(connection, new PriorityStreamByteDistributor(connection), listener); + } + + public DefaultHttp2RemoteFlowController(Http2Connection connection, + StreamByteDistributor streamByteDistributor, + final Listener listener) { this.connection = checkNotNull(connection, "connection"); this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamWriteDistributor"); // Add a flow state for the connection. stateKey = connection.newKey(); - connectionState = new DefaultState(connection.connectionStream(), initialWindowSize); + connectionState = new DefaultState(connection.connectionStream(), initialWindowSize, + initialWindowSize > 0 && isChannelWritable()); connection.connectionStream().setProperty(stateKey, connectionState); + // Monitor may depend upon connectionState, and so initialize after connectionState + listener(listener); + // Register for notification of new streams. connection.addListener(new Http2ConnectionAdapter() { @Override @@ -78,7 +86,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // only requires the ReducedFlowState. Otherwise the full amount of memory is required. stream.setProperty(stateKey, stream.state() == IDLE ? new ReducedState(stream) : - new DefaultState(stream, 0)); + new DefaultState(stream, 0, + isWritable(DefaultHttp2RemoteFlowController.this.connection.connectionStream()))); } @Override @@ -104,13 +113,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // decrease the amount of memory required for this stream because no flow controlled frames can // be exchanged on this stream if (stream.prioritizableForTree() != 0) { - stream.setProperty(stateKey, new ReducedState(state)); + state = new ReducedState(state); + stream.setProperty(stateKey, state); } + // Tell the monitor after cancel has been called and after the new state is used. + monitor.stateCancelled(state); } @Override public void onStreamHalfClosed(Http2Stream stream) { - if (State.HALF_CLOSED_LOCAL.equals(stream.state())) { + if (HALF_CLOSED_LOCAL.equals(stream.state())) { /** * When this method is called there should not be any * pending frames left if the API is used correctly. However, @@ -122,7 +134,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * * This is to cancel any such illegal writes. */ - state(stream).cancel(); + AbstractState state = state(stream); + state.cancel(); + monitor.stateCancelled(state); } } }); @@ -137,11 +151,15 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception { this.ctx = ctx; + // Writing the pending bytes will not check writability change and instead a writability change notification + // to be provided by an explicit call. + channelWritabilityChanged(); + // Don't worry about cleaning up queued frames here if ctx is null. It is expected that all streams will be // closed and the queue cleanup will occur when the stream state transitions occur. // If any frames have been queued up, we should send them now that we have a channel context. - if (ctx != null && ctx.channel().isWritable()) { + if (isChannelWritable()) { writePendingBytes(); } } @@ -154,25 +172,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll @Override public void initialWindowSize(int newWindowSize) throws Http2Exception { assert ctx == null || ctx.executor().inEventLoop(); - if (newWindowSize < 0) { - throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize); - } - - final int delta = newWindowSize - initialWindowSize; - initialWindowSize = newWindowSize; - connection.forEachActiveStream(new Http2StreamVisitor() { - @Override - public boolean visit(Http2Stream stream) throws Http2Exception { - // Verify that the maximum value is not exceeded by this change. - state(stream).incrementStreamWindow(delta); - return true; - } - }); - - if (delta > 0) { - // The window size increased, send any pending frames for all streams. - writePendingBytes(); - } + monitor.initialWindowSize(newWindowSize); } @Override @@ -185,6 +185,29 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return state(stream).windowSize(); } + @Override + public boolean isWritable(Http2Stream stream) { + return monitor.isWritable(state(stream)); + } + + @Override + public void channelWritabilityChanged() throws Http2Exception { + monitor.channelWritabilityChange(); + } + + private boolean isChannelWritable() { + return ctx != null && isChannelWritable0(); + } + + private boolean isChannelWritable0() { + return ctx.channel().isWritable(); + } + + @Override + public void listener(Listener listener) { + monitor = listener == null ? new DefaultWritabilityMonitor() : new ListenerWritabilityMonitor(listener); + } + @Override public int initialWindowSize(Http2Stream stream) { return state(stream).initialWindowSize(); @@ -193,24 +216,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll @Override public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception { assert ctx == null || ctx.executor().inEventLoop(); - if (stream.id() == CONNECTION_STREAM_ID) { - // Update the connection window - connectionState.incrementStreamWindow(delta); - } else { - // Update the stream window - AbstractState state = state(stream); - state.incrementStreamWindow(delta); - } - } - - @Override - public void listener(Listener listener) { - this.listener = listener; - } - - @Override - public Listener listener() { - return this.listener; + monitor.incrementWindowSize(state(stream), delta); } @Override @@ -218,10 +224,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // The context can be null assuming the frame will be queued and send later when the context is set. assert ctx == null || ctx.executor().inEventLoop(); checkNotNull(frame, "frame"); - final AbstractState state; try { - state = state(stream); - state.enqueueFrame(frame); + monitor.enqueueFrame(state(stream), frame); } catch (Throwable t) { frame.error(ctx, t); } @@ -245,16 +249,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // an "adequate" amount of connection window before allocation is attempted. This is not foolproof as if the // number of streams is >= this minimal number then we may still have the issue, but the idea is to narrow the // circumstances in which this can happen without rewriting the allocation algorithm. - return Math.max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK); + return max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK); } private int maxUsableChannelBytes() { - if (ctx == null) { - return 0; - } - // If the channel isWritable, allow at least minUseableChannelBytes. - int channelWritableBytes = (int) Math.min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable()); + int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable()); int useableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0; // Clip the usable bytes by the connection window. @@ -262,29 +262,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } /** - * Package private for testing purposes only! - * - * @return The amount of bytes that can be supported by underlying {@link - * io.netty.channel.Channel} without queuing "too-much". + * The amount of bytes that can be supported by underlying {@link io.netty.channel.Channel} without + * queuing "too-much". */ private int writableBytes() { - return Math.min(connectionWindowSize(), maxUsableChannelBytes()); + return min(connectionWindowSize(), maxUsableChannelBytes()); } - /** - * Writes as many pending bytes as possible, according to stream priority. - */ @Override public void writePendingBytes() throws Http2Exception { - int bytesToWrite = writableBytes(); - boolean haveUnwrittenBytes; - - // Using a do-while loop so that we always write at least once, regardless if we have - // bytesToWrite or not. This ensures that zero-length frames will always be written. - do { - // Distribute the connection window across the streams and write the data. - haveUnwrittenBytes = streamByteDistributor.distribute(bytesToWrite, writer); - } while (haveUnwrittenBytes && (bytesToWrite = writableBytes()) > 0 && ctx.channel().isWritable()); + monitor.writePendingBytes(); } /** @@ -299,8 +286,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // Set to true if cancel() was called. private boolean cancelled; - DefaultState(Http2Stream stream, int initialWindowSize) { - super(stream); + DefaultState(Http2Stream stream, int initialWindowSize, boolean markedWritable) { + super(stream, markedWritable); window(initialWindowSize); pendingWriteQueue = new ArrayDeque(2); } @@ -348,13 +335,21 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return window; } - int writableWindow() { + @Override + public int streamableBytes() { + return max(0, min(pendingBytes, window)); + } + + /** + * Returns the maximum writable window (minimum of the stream and connection windows). + */ + private int writableWindow() { return min(window, connectionWindowSize()); } @Override - public int streamableBytes() { - return max(0, min(pendingBytes, window)); + int pendingBytes() { + return pendingBytes; } @Override @@ -401,6 +396,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll writeError(frame, streamError(stream.id(), INTERNAL_ERROR, cause, "Stream closed before write could take place")); } + streamByteDistributor.updateStreamableBytes(this); } @@ -412,7 +408,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll FlowControlled frame = peek(); int maxBytes = min(bytes, writableWindow()); if (maxBytes <= 0 && frame.size() != 0) { - // The frame had data and all of it was written. + // The frame still has data, but the amount of allocated bytes has been exhausted. return -1; } int originalBytes = bytes; @@ -423,7 +419,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll frame = peek(); maxBytes = min(bytes, writableWindow()); if (maxBytes <= 0 && frame.size() != 0) { - // The frame had data and all of it was written. + // The frame still has data, but the amount of allocated bytes has been exhausted. break; } bytes -= write(frame, maxBytes); @@ -477,7 +473,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ private void incrementPendingBytes(int numBytes) { pendingBytes += numBytes; + streamByteDistributor.updateStreamableBytes(this); + monitor.incrementPendingBytes(numBytes); } /** @@ -518,7 +516,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ private final class ReducedState extends AbstractState { ReducedState(Http2Stream stream) { - super(stream); + super(stream, false); } ReducedState(AbstractState existingState) { @@ -540,6 +538,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return 0; } + @Override + int pendingBytes() { + return 0; + } + @Override int writeAllocatedBytes(int allocated) { throw new UnsupportedOperationException(); @@ -577,13 +580,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ private abstract class AbstractState implements StreamByteDistributor.StreamState { protected final Http2Stream stream; + private boolean markedWritable; - AbstractState(Http2Stream stream) { + AbstractState(Http2Stream stream, boolean markedWritable) { this.stream = stream; + this.markedWritable = markedWritable; } AbstractState(AbstractState existingState) { - this.stream = existingState.stream(); + stream = existingState.stream(); + markedWritable = existingState.markWritability(); } /** @@ -594,6 +600,20 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return stream; } + /** + * Returns the parameter from the last call to {@link #markWritability(boolean)}. + */ + final boolean markWritability() { + return markedWritable; + } + + /** + * Save the state of writability. + */ + final void markWritability(boolean isWritable) { + this.markedWritable = isWritable; + } + abstract int windowSize(); abstract int initialWindowSize(); @@ -605,6 +625,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ abstract int writeAllocatedBytes(int allocated); + /** + * Get the number of bytes pending to be written. + */ + abstract int pendingBytes(); + /** * Any operations that may be pending are cleared and the status of these operations is failed. */ @@ -625,4 +650,265 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ abstract void enqueueFrame(FlowControlled frame); } + + /** + * Abstract class which provides common functionality for {@link WritabilityMonitorfoo} implementations. + */ + private abstract class WritabilityMonitor { + private long totalPendingBytes; + + /** + * Increment all windows by {@code newWindowSize} amount, and write data if streams change from not writable + * to writable. + * @param newWindowSize The new window size. + * @throws Http2Exception If an overflow occurs or an exception on write occurs. + */ + public abstract void initialWindowSize(int newWindowSize) throws Http2Exception; + + /** + * Attempt to allocate bytes to streams which have frames queued. + * @throws Http2Exception If a write occurs and an exception happens in the write operation. + */ + public abstract void writePendingBytes() throws Http2Exception; + + /** + * Called when the writability of the underlying channel changes. + * @throws Http2Exception If a write occurs and an exception happens in the write operation. + */ + public void channelWritabilityChange() throws Http2Exception { } + + /** + * Called when the state is cancelled outside of a write operation. + * @param state the state that was cancelled. + */ + public void stateCancelled(AbstractState state) { } + + /** + * Increment the window size for a particular stream. + * @param state the state associated with the stream whose window is being incremented. + * @param delta The amount to increment by. + * @throws Http2Exception If this operation overflows the window for {@code state}. + */ + public void incrementWindowSize(AbstractState state, int delta) throws Http2Exception { + state.incrementStreamWindow(delta); + } + + /** + * Add a frame to be sent via flow control. + * @param state The state associated with the stream which the {@code frame} is associated with. + * @param frame the frame to enqueue. + * @throws Http2Exception If a writability error occurs. + */ + public void enqueueFrame(AbstractState state, FlowControlled frame) throws Http2Exception { + state.enqueueFrame(frame); + } + + /** + * Increment the total amount of pending bytes for all streams. When any stream's pending bytes changes + * method should be called. + * @param delta The amount to increment by. + */ + public final void incrementPendingBytes(int delta) { + totalPendingBytes += delta; + + // Notification of writibilty change should be delayed until the end of the top level event. + // This is to ensure the flow controller is more consistent state before calling external listener methods. + } + + /** + * Determine if the stream associated with {@code state} is writable. + * @param state The state which is associated with the stream to test writability for. + * @return {@code true} if {@link AbstractState#stream()} is writable. {@code false} otherwise. + */ + public final boolean isWritable(AbstractState state) { + return isWritableConnection() && state.windowSize() - state.pendingBytes() > 0; + } + + protected final void writePendingBytes(Writer writer) throws Http2Exception { + int bytesToWrite = writableBytes(); + + // Make sure we always write at least once, regardless if we have bytesToWrite or not. + // This ensures that zero-length frames will always be written. + for (;;) { + if (!streamByteDistributor.distribute(bytesToWrite, writer) || + (bytesToWrite = writableBytes()) <= 0 || + !isChannelWritable0()) { + break; + } + } + } + + protected final boolean initialWindowSize(int newWindowSize, Writer writer) + throws Http2Exception { + if (newWindowSize < 0) { + throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize); + } + + final int delta = newWindowSize - initialWindowSize; + initialWindowSize = newWindowSize; + connection.forEachActiveStream(new Http2StreamVisitor() { + @Override + public boolean visit(Http2Stream stream) throws Http2Exception { + state(stream).incrementStreamWindow(delta); + return true; + } + }); + + if (delta > 0) { + // The window size increased, send any pending frames for all streams. + writePendingBytes(writer); + return false; + } + return true; + } + + protected final boolean isWritableConnection() { + return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable(); + } + } + + /** + * Provides no notification or tracking of writablity changes. + */ + private final class DefaultWritabilityMonitor extends WritabilityMonitor { + private final Writer writer = new StreamByteDistributor.Writer() { + @Override + public void write(Http2Stream stream, int numBytes) { + state(stream).writeAllocatedBytes(numBytes); + } + }; + + @Override + public void writePendingBytes() throws Http2Exception { + writePendingBytes(writer); + } + + @Override + public void initialWindowSize(int newWindowSize) throws Http2Exception { + initialWindowSize(newWindowSize, writer); + } + } + + /** + * Writability of a {@code stream} is calculated using the following: + *

+     * Connection Window - Total Queued Bytes > 0 &&
+     * Stream Window - Bytes Queued for Stream > 0 &&
+     * isChannelWritable()
+     * 
+ */ + private final class ListenerWritabilityMonitor extends WritabilityMonitor { + private final Listener listener; + private final Http2StreamVisitor checkStreamWritabilityVisitor = new Http2StreamVisitor() { + @Override + public boolean visit(Http2Stream stream) throws Http2Exception { + AbstractState state = state(stream); + if (isWritable(state) != state.markWritability()) { + notifyWritabilityChanged(state); + } + return true; + } + }; + private final Writer initialWindowSizeWriter = new StreamByteDistributor.Writer() { + @Override + public void write(Http2Stream stream, int numBytes) { + AbstractState state = state(stream); + writeAllocatedBytes(state, numBytes); + if (isWritable(state) != state.markWritability()) { + notifyWritabilityChanged(state); + } + } + }; + private final Writer writeAllocatedBytesWriter = new StreamByteDistributor.Writer() { + @Override + public void write(Http2Stream stream, int numBytes) { + writeAllocatedBytes(state(stream), numBytes); + } + }; + + ListenerWritabilityMonitor(Listener listener) { + this.listener = listener; + } + + @Override + public void writePendingBytes() throws Http2Exception { + writePendingBytes(writeAllocatedBytesWriter); + } + + @Override + public void incrementWindowSize(AbstractState state, int delta) throws Http2Exception { + super.incrementWindowSize(state, delta); + if (isWritable(state) != state.markWritability()) { + if (state == connectionState) { + checkAllWritabilityChanged(); + } else { + notifyWritabilityChanged(state); + } + } + } + + @Override + public void initialWindowSize(int newWindowSize) throws Http2Exception { + if (initialWindowSize(newWindowSize, initialWindowSizeWriter)) { + if (isWritableConnection()) { + // If the write operation does not occur we still need to check all streams because they + // may have transitioned from writable to not writable. + checkAllWritabilityChanged(); + } + } + } + + @Override + public void enqueueFrame(AbstractState state, FlowControlled frame) throws Http2Exception { + super.enqueueFrame(state, frame); + checkConnectionThenStreamWritabilityChanged(state); + } + + @Override + public void stateCancelled(AbstractState state) { + try { + checkConnectionThenStreamWritabilityChanged(state); + } catch (Http2Exception e) { + logger.error("Caught unexpected exception from checkAllWritabilityChanged", e); + } + } + + @Override + public void channelWritabilityChange() throws Http2Exception { + if (connectionState.markWritability() != isChannelWritable()) { + checkAllWritabilityChanged(); + } + } + + private void notifyWritabilityChanged(AbstractState state) { + state.markWritability(!state.markWritability()); + try { + listener.writabilityChanged(state.stream); + } catch (RuntimeException e) { + logger.error("Caught unexpected exception from listener.writabilityChanged", e); + } + } + + private void checkConnectionThenStreamWritabilityChanged(AbstractState state) throws Http2Exception { + // It is possible that the connection window and/or the individual stream writability could change. + if (isWritableConnection() != connectionState.markWritability()) { + checkAllWritabilityChanged(); + } else if (isWritable(state) != state.markWritability()) { + notifyWritabilityChanged(state); + } + } + + private void checkAllWritabilityChanged() throws Http2Exception { + // Make sure we mark that we have notified as a result of this change. + connectionState.markWritability(isWritableConnection()); + connection.forEachActiveStream(checkStreamWritabilityVisitor); + } + + private void writeAllocatedBytes(AbstractState state, int numBytes) { + int written = state.writeAllocatedBytes(numBytes); + if (written != -1) { + listener.streamWritten(state.stream(), written); + } + } + } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index a9bd220d95..e893b217f8 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -438,6 +438,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http if (ctx.channel().isWritable()) { flush(ctx); } + encoder.flowController().channelWritabilityChanged(); } finally { super.channelWritabilityChanged(ctx); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java index e54339f443..052146f644 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java @@ -56,11 +56,21 @@ public interface Http2RemoteFlowController extends Http2FlowController { void listener(Listener listener); /** - * Get the current listener to flow-control events. - * - * @return the current listener or {@code null} if one is not set. + * Determine if the {@code stream} has bytes remaining for use in the flow control window. + *

+ * Note that this only takes into account HTTP/2 flow control. It does not take into account + * the underlying {@link io.netty.channel.Channel#isWritable()}. + * @param stream The stream to test. + * @return {@code true} if if the {@code stream} has bytes remaining for use in the flow control window. + * {@code false} otherwise. */ - Listener listener(); + boolean isWritable(Http2Stream stream); + + /** + * Notification that the writability of {@link #channelHandlerContext()} has changed. + * @throws Http2Exception If any writes occur as a result of this call and encounter errors. + */ + void channelWritabilityChanged() throws Http2Exception; /** * Implementations of this interface are used to progressively write chunks of the underlying @@ -132,11 +142,20 @@ public interface Http2RemoteFlowController extends Http2FlowController { /** * Report the number of {@code writtenBytes} for a {@code stream}. Called after the * flow-controller has flushed bytes for the given stream. - * + *

+ * This method should not throw. Any thrown exceptions are considered a programming error and are ignored. * @param stream that had bytes written. * @param writtenBytes the number of bytes written for a stream, can be 0 in the case of an * empty DATA frame. */ void streamWritten(Http2Stream stream, int writtenBytes); + + /** + * Notification that {@link Http2RemoteFlowController#isWritable(Http2Stream)} has changed for {@code stream}. + *

+ * This method should not throw. Any thrown exceptions are considered a programming error and are ignored. + * @param stream The stream which writability has changed for. + */ + void writabilityChanged(Http2Stream stream); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java index c417a25714..05d645d2ae 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java @@ -54,7 +54,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) { Http2Stream parent = stream.parent(); if (parent != null) { - int delta = state(stream).unallocatedStreamableBytesForTree(); + long delta = state(stream).unallocatedStreamableBytesForTree(); if (delta != 0) { state(parent).unallocatedStreamableBytesForTreeChanged(delta); } @@ -65,7 +65,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) { Http2Stream parent = stream.parent(); if (parent != null) { - int delta = state(stream).unallocatedStreamableBytesForTree(); + long delta = state(stream).unallocatedStreamableBytesForTree(); if (delta != 0) { state(parent).unallocatedStreamableBytesForTreeChanged(-delta); } @@ -103,7 +103,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo /** * For testing only. */ - int unallocatedStreamableBytesForTree(Http2Stream stream) { + long unallocatedStreamableBytesForTree(Http2Stream stream) { return state(stream).unallocatedStreamableBytesForTree(); } @@ -307,7 +307,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo boolean hasFrame; int streamableBytes; int allocated; - int unallocatedStreamableBytesForTree; + long unallocatedStreamableBytesForTree; PriorityState(Http2Stream stream) { this.stream = stream; @@ -317,7 +317,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo * Recursively increments the {@link #unallocatedStreamableBytesForTree()} for this branch in * the priority tree starting at the current node. */ - void unallocatedStreamableBytesForTreeChanged(int delta) { + void unallocatedStreamableBytesForTreeChanged(long delta) { unallocatedStreamableBytesForTree += delta; if (!stream.isRoot()) { state(stream.parent()).unallocatedStreamableBytesForTreeChanged(delta); @@ -371,7 +371,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo return streamableBytes - allocated; } - int unallocatedStreamableBytesForTree() { + long unallocatedStreamableBytesForTree() { return unallocatedStreamableBytesForTree; } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java index 228a993d35..bee91aa016 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java @@ -108,12 +108,13 @@ public class DefaultHttp2RemoteFlowControllerTest { resetCtx(); // This is intentionally left out of initConnectionAndController so it can be tested below. controller.channelHandlerContext(ctx); + assertWritabilityChanged(1, true); + reset(listener); } private void initConnectionAndController() throws Http2Exception { connection = new DefaultHttp2Connection(false); - controller = new DefaultHttp2RemoteFlowController(connection); - controller.listener(listener); + controller = new DefaultHttp2RemoteFlowController(connection, listener); connection.remote().flowController(controller); connection.local().createStream(STREAM_A, false); @@ -132,6 +133,7 @@ public class DefaultHttp2RemoteFlowControllerTest { assertEquals(0, window(STREAM_B)); assertEquals(0, window(STREAM_C)); assertEquals(0, window(STREAM_D)); + assertWritabilityChanged(1, false); } @Test @@ -142,6 +144,7 @@ public class DefaultHttp2RemoteFlowControllerTest { assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D)); + verifyZeroInteractions(listener); } @Test @@ -152,6 +155,7 @@ public class DefaultHttp2RemoteFlowControllerTest { assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D)); + verifyZeroInteractions(listener); } @Test @@ -163,6 +167,7 @@ public class DefaultHttp2RemoteFlowControllerTest { controller.writePendingBytes(); data.assertFullyWritten(); verify(listener, times(1)).streamWritten(stream(STREAM_A), 5); + verifyZeroInteractions(listener); } @Test @@ -173,6 +178,7 @@ public class DefaultHttp2RemoteFlowControllerTest { controller.writePendingBytes(); data.assertFullyWritten(); verify(listener, times(1)).streamWritten(stream(STREAM_A), 0); + verifyZeroInteractions(listener); } @Test @@ -184,7 +190,7 @@ public class DefaultHttp2RemoteFlowControllerTest { data.assertNotWritten(); controller.writePendingBytes(); data.assertNotWritten(); - verifyZeroInteractions(listener); + verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); } @Test @@ -201,11 +207,16 @@ public class DefaultHttp2RemoteFlowControllerTest { data1.assertFullyWritten(); data2.assertNotWritten(); verify(listener, times(1)).streamWritten(stream(STREAM_A), 15); + verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); + assertFalse(controller.isWritable(stream(STREAM_A))); } @Test public void stalledStreamShouldQueuePayloads() throws Http2Exception { controller.initialWindowSize(0); + verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); + assertFalse(controller.isWritable(stream(STREAM_A))); + reset(listener); FakeFlowControlled data = new FakeFlowControlled(15); FakeFlowControlled moreData = new FakeFlowControlled(0); @@ -221,6 +232,9 @@ public class DefaultHttp2RemoteFlowControllerTest { @Test public void queuedPayloadsReceiveErrorOnStreamClose() throws Http2Exception { controller.initialWindowSize(0); + verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); + assertFalse(controller.isWritable(stream(STREAM_A))); + reset(listener); FakeFlowControlled data = new FakeFlowControlled(15); FakeFlowControlled moreData = new FakeFlowControlled(0); @@ -240,6 +254,9 @@ public class DefaultHttp2RemoteFlowControllerTest { @Test public void payloadLargerThanWindowShouldWritePartial() throws Http2Exception { controller.initialWindowSize(5); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + assertTrue(controller.isWritable(stream(STREAM_A))); + reset(listener); final FakeFlowControlled data = new FakeFlowControlled(10); sendData(STREAM_A, data); @@ -247,12 +264,16 @@ public class DefaultHttp2RemoteFlowControllerTest { // Verify that a partial frame of 5 remains to be sent data.assertPartiallyWritten(5); verify(listener, times(1)).streamWritten(stream(STREAM_A), 5); + verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); + assertFalse(controller.isWritable(stream(STREAM_A))); verifyNoMoreInteractions(listener); } @Test public void windowUpdateAndFlushShouldTriggerWrite() throws Http2Exception { controller.initialWindowSize(10); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + assertTrue(controller.isWritable(stream(STREAM_A))); FakeFlowControlled data = new FakeFlowControlled(20); FakeFlowControlled moreData = new FakeFlowControlled(10); @@ -262,16 +283,24 @@ public class DefaultHttp2RemoteFlowControllerTest { data.assertPartiallyWritten(10); moreData.assertNotWritten(); verify(listener, times(1)).streamWritten(stream(STREAM_A), 10); + verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); + assertFalse(controller.isWritable(stream(STREAM_A))); + reset(listener); resetCtx(); // Update the window and verify that the rest of data and some of moreData are written incrementWindowSize(STREAM_A, 15); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + assertFalse(controller.isWritable(stream(STREAM_A))); + reset(listener); + controller.writePendingBytes(); data.assertFullyWritten(); moreData.assertPartiallyWritten(5); verify(listener, times(1)).streamWritten(stream(STREAM_A), 15); - verifyNoMoreInteractions(listener); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + assertFalse(controller.isWritable(stream(STREAM_A))); assertEquals(DEFAULT_WINDOW_SIZE - 25, window(CONNECTION_STREAM_ID)); assertEquals(0, window(STREAM_A)); @@ -282,7 +311,13 @@ public class DefaultHttp2RemoteFlowControllerTest { @Test public void initialWindowUpdateShouldSendPayload() throws Http2Exception { + incrementWindowSize(CONNECTION_STREAM_ID, -window(CONNECTION_STREAM_ID) + 10); + assertWritabilityChanged(0, true); + reset(listener); + controller.initialWindowSize(0); + assertWritabilityChanged(1, false); + reset(listener); FakeFlowControlled data = new FakeFlowControlled(10); sendData(STREAM_A, data); @@ -292,6 +327,8 @@ public class DefaultHttp2RemoteFlowControllerTest { // Verify that the entire frame was sent. controller.initialWindowSize(10); data.assertFullyWritten(); + verify(listener, times(1)).streamWritten(stream(STREAM_A), 10); + assertWritabilityChanged(0, false); } @Test @@ -299,6 +336,8 @@ public class DefaultHttp2RemoteFlowControllerTest { // Collapse the connection window to force queueing. incrementWindowSize(CONNECTION_STREAM_ID, -window(CONNECTION_STREAM_ID)); assertEquals(0, window(CONNECTION_STREAM_ID)); + assertWritabilityChanged(1, false); + reset(listener); FakeFlowControlled dataA = new FakeFlowControlled(10); // Queue data for stream A and allow most of it to be written. @@ -306,11 +345,16 @@ public class DefaultHttp2RemoteFlowControllerTest { controller.writePendingBytes(); dataA.assertNotWritten(); incrementWindowSize(CONNECTION_STREAM_ID, 8); + assertWritabilityChanged(0, false); + reset(listener); + controller.writePendingBytes(); dataA.assertPartiallyWritten(8); assertEquals(65527, window(STREAM_A)); assertEquals(0, window(CONNECTION_STREAM_ID)); verify(listener, times(1)).streamWritten(stream(STREAM_A), 8); + assertWritabilityChanged(0, false); + reset(listener); // Queue data for stream B and allow the rest of A and all of B to be written. FakeFlowControlled dataB = new FakeFlowControlled(10); @@ -318,8 +362,12 @@ public class DefaultHttp2RemoteFlowControllerTest { controller.writePendingBytes(); dataB.assertNotWritten(); incrementWindowSize(CONNECTION_STREAM_ID, 12); + assertWritabilityChanged(0, false); + reset(listener); + controller.writePendingBytes(); assertEquals(0, window(CONNECTION_STREAM_ID)); + assertWritabilityChanged(0, false); // Verify the rest of A is written. dataA.assertFullyWritten(); @@ -337,6 +385,8 @@ public class DefaultHttp2RemoteFlowControllerTest { final int initWindow = 20; final int secondWindowSize = 10; controller.initialWindowSize(initWindow); + assertWritabilityChanged(0, true); + reset(listener); FakeFlowControlled data1 = new FakeFlowControlled(initWindow); FakeFlowControlled data2 = new FakeFlowControlled(5); @@ -346,38 +396,93 @@ public class DefaultHttp2RemoteFlowControllerTest { controller.writePendingBytes(); data1.assertFullyWritten(); verify(listener, times(1)).streamWritten(stream(STREAM_A), 20); + assertTrue(window(CONNECTION_STREAM_ID) > 0); + verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); + reset(listener); // Make the window size for stream A negative controller.initialWindowSize(initWindow - secondWindowSize); assertEquals(-secondWindowSize, window(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); + reset(listener); // Queue up a write. It should not be written now because the window is negative sendData(STREAM_A, data2); controller.writePendingBytes(); data2.assertNotWritten(); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); + reset(listener); // Open the window size back up a bit (no send should happen) incrementWindowSize(STREAM_A, 5); controller.writePendingBytes(); assertEquals(-5, window(STREAM_A)); data2.assertNotWritten(); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); + reset(listener); // Open the window size back up a bit (no send should happen) incrementWindowSize(STREAM_A, 5); controller.writePendingBytes(); assertEquals(0, window(STREAM_A)); data2.assertNotWritten(); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); + reset(listener); // Open the window size back up and allow the write to happen incrementWindowSize(STREAM_A, 5); controller.writePendingBytes(); data2.assertFullyWritten(); - verify(listener, times(1)).streamWritten(stream(STREAM_A), 5); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); } @Test public void initialWindowUpdateShouldSendEmptyFrame() throws Http2Exception { controller.initialWindowSize(0); + assertWritabilityChanged(1, false); + reset(listener); // First send a frame that will get buffered. FakeFlowControlled data = new FakeFlowControlled(10, false); @@ -393,15 +498,26 @@ public class DefaultHttp2RemoteFlowControllerTest { // Re-expand the window and verify that both frames were sent. controller.initialWindowSize(10); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + verify(listener, times(1)).writabilityChanged(stream(STREAM_B)); + verify(listener, times(1)).writabilityChanged(stream(STREAM_C)); + verify(listener, times(1)).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); data.assertFullyWritten(); data2.assertFullyWritten(); + verify(listener, times(1)).streamWritten(stream(STREAM_A), 10); } @Test public void initialWindowUpdateShouldSendPartialFrame() throws Http2Exception { controller.initialWindowSize(0); + assertWritabilityChanged(1, false); + reset(listener); FakeFlowControlled data = new FakeFlowControlled(10); sendData(STREAM_A, data); @@ -410,6 +526,15 @@ public class DefaultHttp2RemoteFlowControllerTest { // Verify that a partial frame of 5 was sent. controller.initialWindowSize(5); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + verify(listener, times(1)).writabilityChanged(stream(STREAM_B)); + verify(listener, times(1)).writabilityChanged(stream(STREAM_C)); + verify(listener, times(1)).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); + data.assertPartiallyWritten(5); verify(listener, times(1)).streamWritten(stream(STREAM_A), 5); } @@ -418,19 +543,26 @@ public class DefaultHttp2RemoteFlowControllerTest { public void connectionWindowUpdateShouldSendFrame() throws Http2Exception { // Set the connection window size to zero. exhaustStreamWindow(CONNECTION_STREAM_ID); + assertWritabilityChanged(1, false); + reset(listener); FakeFlowControlled data = new FakeFlowControlled(10); sendData(STREAM_A, data); controller.writePendingBytes(); data.assertNotWritten(); + assertWritabilityChanged(0, false); + reset(listener); // Verify that the entire frame was sent. incrementWindowSize(CONNECTION_STREAM_ID, 10); + assertWritabilityChanged(0, false); + reset(listener); data.assertNotWritten(); + controller.writePendingBytes(); data.assertFullyWritten(); verify(listener, times(1)).streamWritten(stream(STREAM_A), 10); - + assertWritabilityChanged(0, false); assertEquals(0, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - 10, window(STREAM_A)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B)); @@ -442,6 +574,8 @@ public class DefaultHttp2RemoteFlowControllerTest { public void connectionWindowUpdateShouldSendPartialFrame() throws Http2Exception { // Set the connection window size to zero. exhaustStreamWindow(CONNECTION_STREAM_ID); + assertWritabilityChanged(1, false); + reset(listener); FakeFlowControlled data = new FakeFlowControlled(10); sendData(STREAM_A, data); @@ -451,9 +585,13 @@ public class DefaultHttp2RemoteFlowControllerTest { // Verify that a partial frame of 5 was sent. incrementWindowSize(CONNECTION_STREAM_ID, 5); data.assertNotWritten(); + assertWritabilityChanged(0, false); + reset(listener); + controller.writePendingBytes(); data.assertPartiallyWritten(5); verify(listener, times(1)).streamWritten(stream(STREAM_A), 5); + assertWritabilityChanged(0, false); assertEquals(0, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B)); @@ -465,6 +603,15 @@ public class DefaultHttp2RemoteFlowControllerTest { public void streamWindowUpdateShouldSendFrame() throws Http2Exception { // Set the stream window size to zero. exhaustStreamWindow(STREAM_A); + verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); + reset(listener); FakeFlowControlled data = new FakeFlowControlled(10); sendData(STREAM_A, data); @@ -473,10 +620,28 @@ public class DefaultHttp2RemoteFlowControllerTest { // Verify that the entire frame was sent. incrementWindowSize(STREAM_A, 10); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); + reset(listener); + data.assertNotWritten(); controller.writePendingBytes(); data.assertFullyWritten(); verify(listener, times(1)).streamWritten(stream(STREAM_A), 10); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); assertEquals(DEFAULT_WINDOW_SIZE - 10, window(CONNECTION_STREAM_ID)); assertEquals(0, window(STREAM_A)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_B)); @@ -488,6 +653,15 @@ public class DefaultHttp2RemoteFlowControllerTest { public void streamWindowUpdateShouldSendPartialFrame() throws Http2Exception { // Set the stream window size to zero. exhaustStreamWindow(STREAM_A); + verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); + reset(listener); FakeFlowControlled data = new FakeFlowControlled(10); sendData(STREAM_A, data); @@ -496,6 +670,16 @@ public class DefaultHttp2RemoteFlowControllerTest { // Verify that a partial frame of 5 was sent. incrementWindowSize(STREAM_A, 5); + verify(listener, never()).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); + reset(listener); + data.assertNotWritten(); controller.writePendingBytes(); data.assertPartiallyWritten(5); @@ -529,6 +713,8 @@ public class DefaultHttp2RemoteFlowControllerTest { verify(flowControlled, never()).writeComplete(); assertEquals(90, windowBefore - window(STREAM_A)); + verify(listener, times(1)).streamWritten(stream(STREAM_A), 90); + assertWritabilityChanged(0, true); } @Test @@ -559,6 +745,7 @@ public class DefaultHttp2RemoteFlowControllerTest { verify(flowControlled, never()).writeComplete(); assertEquals(90, windowBefore - window(STREAM_A)); + verifyZeroInteractions(listener); } @Test @@ -602,6 +789,8 @@ public class DefaultHttp2RemoteFlowControllerTest { verify(flowControlled).writeComplete(); assertEquals(150, windowBefore - window(STREAM_A)); + verify(listener, times(1)).streamWritten(stream(STREAM_A), 150); + assertWritabilityChanged(0, true); } @Test @@ -626,6 +815,15 @@ public class DefaultHttp2RemoteFlowControllerTest { verify(flowControlled).write(any(ChannelHandlerContext.class), anyInt()); verify(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class)); verify(flowControlled, never()).writeComplete(); + verify(listener, times(1)).streamWritten(stream(STREAM_A), 0); + verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); + verify(listener, never()).writabilityChanged(stream(STREAM_B)); + verify(listener, never()).writabilityChanged(stream(STREAM_C)); + verify(listener, never()).writabilityChanged(stream(STREAM_D)); + assertFalse(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); } @Test @@ -633,6 +831,8 @@ public class DefaultHttp2RemoteFlowControllerTest { // Start the channel as not writable and exercise the public methods of the flow controller // making sure no frames are written. setChannelWritability(false); + assertWritabilityChanged(1, false); + reset(listener); FakeFlowControlled dataA = new FakeFlowControlled(1); FakeFlowControlled dataB = new FakeFlowControlled(1); final Http2Stream stream = stream(STREAM_A); @@ -649,9 +849,11 @@ public class DefaultHttp2RemoteFlowControllerTest { controller.writePendingBytes(); dataA.assertNotWritten(); dataB.assertNotWritten(); + assertWritabilityChanged(0, false); // Now change the channel to writable and make sure frames are written. setChannelWritability(true); + assertWritabilityChanged(1, true); controller.writePendingBytes(); dataA.assertFullyWritten(); dataB.assertFullyWritten(); @@ -667,11 +869,30 @@ public class DefaultHttp2RemoteFlowControllerTest { // Queue some frames controller.addFlowControlled(stream, dataA); - controller.writePendingBytes(); dataA.assertNotWritten(); controller.incrementWindowSize(stream, 100); - controller.writePendingBytes(); + dataA.assertNotWritten(); + + assertWritabilityChanged(0, false); + + // Set the controller + controller.channelHandlerContext(ctx); + dataA.assertFullyWritten(); + + assertWritabilityChanged(1, true); + } + + @Test + public void initialWindowSizeWithNoContextShouldNotThrow() throws Exception { + // Re-initialize the controller so we can ensure the context hasn't been set yet. + initConnectionAndController(); + + FakeFlowControlled dataA = new FakeFlowControlled(1); + final Http2Stream stream = stream(STREAM_A); + + // Queue some frames + controller.addFlowControlled(stream, dataA); dataA.assertNotWritten(); // Set the controller @@ -679,6 +900,24 @@ public class DefaultHttp2RemoteFlowControllerTest { dataA.assertFullyWritten(); } + private void assertWritabilityChanged(int amt, boolean writable) { + verify(listener, times(amt)).writabilityChanged(stream(STREAM_A)); + verify(listener, times(amt)).writabilityChanged(stream(STREAM_B)); + verify(listener, times(amt)).writabilityChanged(stream(STREAM_C)); + verify(listener, times(amt)).writabilityChanged(stream(STREAM_D)); + if (writable) { + assertTrue(controller.isWritable(stream(STREAM_A))); + assertTrue(controller.isWritable(stream(STREAM_B))); + assertTrue(controller.isWritable(stream(STREAM_C))); + assertTrue(controller.isWritable(stream(STREAM_D))); + } else { + assertFalse(controller.isWritable(stream(STREAM_A))); + assertFalse(controller.isWritable(stream(STREAM_B))); + assertFalse(controller.isWritable(stream(STREAM_C))); + assertFalse(controller.isWritable(stream(STREAM_D))); + } + } + private static Http2RemoteFlowController.FlowControlled mockedFlowControlledThatThrowsOnWrite() throws Exception { final Http2RemoteFlowController.FlowControlled flowControlled = Mockito.mock(Http2RemoteFlowController.FlowControlled.class); @@ -714,10 +953,6 @@ public class DefaultHttp2RemoteFlowControllerTest { incrementWindowSize(streamId, -window(streamId)); } - private void maxStreamWindow(int streamId) throws Http2Exception { - incrementWindowSize(streamId, Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE - window(streamId)); - } - private int window(int streamId) throws Http2Exception { return controller.windowSize(stream(streamId)); } @@ -736,9 +971,12 @@ public class DefaultHttp2RemoteFlowControllerTest { when(ctx.executor()).thenReturn(executor); } - private void setChannelWritability(boolean isWritable) { + private void setChannelWritability(boolean isWritable) throws Http2Exception { when(channel.bytesBeforeUnwritable()).thenReturn(isWritable ? Long.MAX_VALUE : 0); when(channel.isWritable()).thenReturn(isWritable); + if (controller != null) { + controller.channelWritabilityChanged(); + } } private static final class FakeFlowControlled implements Http2RemoteFlowController.FlowControlled { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/PriorityStreamByteDistributorTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/PriorityStreamByteDistributorTest.java index 12f1a759b2..1f8504a155 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/PriorityStreamByteDistributorTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/PriorityStreamByteDistributorTest.java @@ -15,6 +15,21 @@ package io.netty.handler.codec.http2; +import io.netty.util.collection.IntObjectHashMap; +import io.netty.util.collection.IntObjectMap; +import org.junit.Before; +import org.junit.Test; +import org.mockito.AdditionalMatchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.verification.VerificationMode; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -32,21 +47,6 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import io.netty.util.collection.IntObjectHashMap; -import io.netty.util.collection.IntObjectMap; -import org.junit.Before; -import org.junit.Test; -import org.mockito.AdditionalMatchers; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.mockito.verification.VerificationMode; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - /** * Tests for {@link PriorityStreamByteDistributor}. */ @@ -661,7 +661,7 @@ public class PriorityStreamByteDistributorTest { stream(streamId).setPriority(parent, (short) weight, exclusive); } - private int streamableBytesForTree(Http2Stream stream) { + private long streamableBytesForTree(Http2Stream stream) { return distributor.unallocatedStreamableBytesForTree(stream); } @@ -681,8 +681,8 @@ public class PriorityStreamByteDistributorTest { verify(writer).write(same(stream(streamId)), (int) AdditionalMatchers.eq(numBytes, delta)); } - private static int calculateStreamSizeSum(IntObjectMap streamSizes, List streamIds) { - int sum = 0; + private static long calculateStreamSizeSum(IntObjectMap streamSizes, List streamIds) { + long sum = 0; for (Integer streamId : streamIds) { Integer streamSize = streamSizes.get(streamId); if (streamSize != null) { diff --git a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java index 27e236da43..ede3a9b883 100644 --- a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java +++ b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java @@ -41,6 +41,11 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr return MAX_INITIAL_WINDOW_SIZE; } + @Override + public boolean isWritable(Http2Stream stream) { + return true; + } + @Override public int initialWindowSize(Http2Stream stream) { return MAX_INITIAL_WINDOW_SIZE; @@ -58,11 +63,6 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr public void listener(Listener listener) { } - @Override - public Listener listener() { - return null; - } - @Override public void addFlowControlled(Http2Stream stream, FlowControlled payload) { // Don't check size beforehand because Headers payload returns 0 all the time. @@ -80,4 +80,8 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr public ChannelHandlerContext channelHandlerContext() { return ctx; } + + @Override + public void channelWritabilityChanged() throws Http2Exception { + } }