diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index c3963c48b4..a39b10af32 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -299,7 +299,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - int windowSize() { + public int windowSize() { return window; } @@ -389,11 +389,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return window; } - @Override - public int streamableBytes() { - return max(0, min(pendingBytes, window)); - } - /** * Returns the maximum writable window (minimum of the stream and connection windows). */ @@ -402,7 +397,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - int pendingBytes() { + public int pendingBytes() { return pendingBytes; } @@ -514,7 +509,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - int windowSize() { + public int windowSize() { return 0; } @@ -524,12 +519,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - public int streamableBytes() { - return 0; - } - - @Override - int pendingBytes() { + public int pendingBytes() { return 0; } @@ -604,13 +594,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll this.markedWritable = isWritable; } - @Override - public final boolean isWriteAllowed() { - return windowSize() >= 0; - } - - abstract int windowSize(); - abstract int initialWindowSize(); /** @@ -620,11 +603,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ abstract int writeAllocatedBytes(int allocated); - /** - * Get the number of bytes pending to be written. - */ - abstract int pendingBytes(); - /** * Any operations that may be pending are cleared and the status of these operations is failed. */ @@ -651,20 +629,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ private abstract class WritabilityMonitor { private long totalPendingBytes; + private final Writer writer; - /** - * Increment all windows by {@code newWindowSize} amount, and write data if streams change from not writable - * to writable. - * @param newWindowSize The new window size. - * @throws Http2Exception If an overflow occurs or an exception on write occurs. - */ - public abstract void initialWindowSize(int newWindowSize) throws Http2Exception; - - /** - * Attempt to allocate bytes to streams which have frames queued. - * @throws Http2Exception If a write occurs and an exception happens in the write operation. - */ - public abstract void writePendingBytes() throws Http2Exception; + protected WritabilityMonitor(Writer writer) { + this.writer = writer; + } /** * Called when the writability of the underlying channel changes. @@ -719,7 +688,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return isWritableConnection() && state.windowSize() - state.pendingBytes() > 0; } - protected final void writePendingBytes(Writer writer) throws Http2Exception { + protected final void writePendingBytes() throws Http2Exception { int bytesToWrite = writableBytes(); // Make sure we always write at least once, regardless if we have bytesToWrite or not. @@ -733,7 +702,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } } - protected final boolean initialWindowSize(int newWindowSize, Writer writer) throws Http2Exception { + protected void initialWindowSize(int newWindowSize) throws Http2Exception { if (newWindowSize < 0) { throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize); } @@ -750,10 +719,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll if (delta > 0) { // The window size increased, send any pending frames for all streams. - writePendingBytes(writer); - return false; + writePendingBytes(); } - return true; } protected final boolean isWritableConnection() { @@ -765,21 +732,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * Provides no notification or tracking of writablity changes. */ private final class DefaultWritabilityMonitor extends WritabilityMonitor { - private final Writer writer = new StreamByteDistributor.Writer() { - @Override - public void write(Http2Stream stream, int numBytes) { - state(stream).writeAllocatedBytes(numBytes); - } - }; - - @Override - public void writePendingBytes() throws Http2Exception { - writePendingBytes(writer); - } - - @Override - public void initialWindowSize(int newWindowSize) throws Http2Exception { - initialWindowSize(newWindowSize, writer); + DefaultWritabilityMonitor() { + super(new StreamByteDistributor.Writer() { + @Override + public void write(Http2Stream stream, int numBytes) { + state(stream).writeAllocatedBytes(numBytes); + } + }); } } @@ -803,32 +762,21 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return true; } }; - private final Writer initialWindowSizeWriter = new StreamByteDistributor.Writer() { - @Override - public void write(Http2Stream stream, int numBytes) { - AbstractState state = state(stream); - writeAllocatedBytes(state, numBytes); - if (isWritable(state) != state.markWritability()) { - notifyWritabilityChanged(state); + + ListenerWritabilityMonitor(final Listener listener) { + super(new StreamByteDistributor.Writer() { + @Override + public void write(Http2Stream stream, int numBytes) { + AbstractState state = state(stream); + int written = state.writeAllocatedBytes(numBytes); + if (written != -1) { + listener.streamWritten(state.stream(), written); + } } - } - }; - private final Writer writeAllocatedBytesWriter = new StreamByteDistributor.Writer() { - @Override - public void write(Http2Stream stream, int numBytes) { - writeAllocatedBytes(state(stream), numBytes); - } - }; - - ListenerWritabilityMonitor(Listener listener) { + }); this.listener = listener; } - @Override - public void writePendingBytes() throws Http2Exception { - writePendingBytes(writeAllocatedBytesWriter); - } - @Override public void incrementWindowSize(AbstractState state, int delta) throws Http2Exception { super.incrementWindowSize(state, delta); @@ -842,13 +790,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - public void initialWindowSize(int newWindowSize) throws Http2Exception { - if (initialWindowSize(newWindowSize, initialWindowSizeWriter)) { - if (isWritableConnection()) { - // If the write operation does not occur we still need to check all streams because they - // may have transitioned from writable to not writable. - checkAllWritabilityChanged(); - } + protected void initialWindowSize(int newWindowSize) throws Http2Exception { + super.initialWindowSize(newWindowSize); + if (isWritableConnection()) { + // If the write operation does not occur we still need to check all streams because they + // may have transitioned from writable to not writable. + checkAllWritabilityChanged(); } } @@ -897,12 +844,5 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll connectionState.markWritability(isWritableConnection()); connection.forEachActiveStream(checkStreamWritabilityVisitor); } - - private void writeAllocatedBytes(AbstractState state, int numBytes) { - int written = state.writeAllocatedBytes(numBytes); - if (written != -1) { - listener.streamWritten(state.stream(), written); - } - } } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java index c8d9877ab8..90051fc1b7 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java @@ -30,6 +30,8 @@ import static io.netty.buffer.Unpooled.directBuffer; import static io.netty.buffer.Unpooled.unmodifiableBuffer; import static io.netty.buffer.Unpooled.unreleasableBuffer; import static io.netty.util.CharsetUtil.UTF_8; +import static java.lang.Math.max; +import static java.lang.Math.min; /** * Constants and utility method used for encoding/decoding HTTP2 frames. @@ -189,6 +191,13 @@ public final class Http2CodecUtil { writeFrameHeaderInternal(out, payloadLength, type, flags, streamId); } + /** + * Calculate the amount of bytes that can be sent by {@code state}. The lower bound is {@code 0}. + */ + public static int streamableBytes(StreamByteDistributor.StreamState state) { + return max(0, min(state.pendingBytes(), state.windowSize())); + } + static void writeFrameHeaderInternal(ByteBuf out, int payloadLength, byte type, Http2Flags flags, int streamId) { out.writeMedium(payloadLength); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java index 04ab3e73dd..365cff7f73 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.http2; import java.util.Arrays; +import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.util.internal.ObjectUtil.checkNotNull; @@ -78,7 +79,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo @Override public void updateStreamableBytes(StreamState streamState) { - state(streamState.stream()).updateStreamableBytes(streamState.streamableBytes(), + state(streamState.stream()).updateStreamableBytes(streamableBytes(streamState), streamState.hasFrame()); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/StreamByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/StreamByteDistributor.java index 844e941f2d..9659975d83 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/StreamByteDistributor.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/StreamByteDistributor.java @@ -32,13 +32,12 @@ public interface StreamByteDistributor { Http2Stream stream(); /** - * Returns the number of pending bytes for this node that will fit within the stream flow - * control window. This is used for the priority algorithm to determine the aggregate number - * of bytes that can be written at each node. Each node only takes into account its stream - * window so that when a change occurs to the connection window, these values need not - * change (i.e. no tree traversal is required). + * Get the amount of bytes this stream has pending to send. The actual amount written must not exceed + * {@link #windowSize()}! + * @return The amount of bytes this stream has pending to send. + * @see {@link #io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes(StreamState)} */ - int streamableBytes(); + int pendingBytes(); /** * Indicates whether or not there are frames pending for this stream. @@ -46,11 +45,15 @@ public interface StreamByteDistributor { boolean hasFrame(); /** - * Determine if a write operation is allowed for this stream. This will typically take into account the - * stream's flow controller being non-negative. - * @return {@code true} if a write is allowed on this stream. {@code false} otherwise. + * The size (in bytes) of the stream's flow control window. The amount written must not exceed this amount! + *
A {@link StreamByteDistributor} needs to know the stream's window size in order to avoid allocating bytes
+ * if the window size is negative. The window size being {@code 0} may also be significant to determine when if
+ * an stream has been given a chance to write an empty frame, and also enables optimizations like not writing
+ * empty frames in some situations (don't write headers until data can also be written).
+ * @return the size of the stream's flow control window.
+ * @see {@link #io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes(StreamState)}
*/
- boolean isWriteAllowed();
+ int windowSize();
}
/**
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java
index 5ffd817114..9b3cd2dade 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/UniformStreamByteDistributor.java
@@ -14,15 +14,16 @@
*/
package io.netty.handler.codec.http2;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
-import java.util.ArrayDeque;
-import java.util.Deque;
-
/**
* A {@link StreamByteDistributor} that ignores stream priority and uniformly allocates bytes to all
* streams. This class uses a minimum chunk size that will be allocated to each stream. While
@@ -77,8 +78,9 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
@Override
public void updateStreamableBytes(StreamState streamState) {
- State state = state(streamState.stream());
- state.updateStreamableBytes(streamState.streamableBytes(), streamState.hasFrame());
+ state(streamState.stream()).updateStreamableBytes(streamableBytes(streamState),
+ streamState.hasFrame(),
+ streamState.windowSize());
}
@Override
@@ -119,6 +121,13 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
return checkNotNull(stream, "stream").getProperty(stateKey);
}
+ /**
+ * For testing only!
+ */
+ int streamableBytes0(Http2Stream stream) {
+ return state(stream).streamableBytes;
+ }
+
/**
* The remote flow control state for a single stream.
*/
@@ -126,12 +135,13 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
final Http2Stream stream;
int streamableBytes;
boolean enqueued;
+ boolean writing;
State(Http2Stream stream) {
this.stream = stream;
}
- void updateStreamableBytes(int newStreamableBytes, boolean hasFrame) {
+ void updateStreamableBytes(int newStreamableBytes, boolean hasFrame, int windowSize) {
assert hasFrame || newStreamableBytes == 0;
int delta = newStreamableBytes - streamableBytes;
@@ -139,7 +149,11 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
streamableBytes = newStreamableBytes;
totalStreamableBytes += delta;
}
- if (hasFrame) {
+ // We should queue this state if there is a frame. We don't want to queue this frame if the window
+ // size is <= 0 and we are writing this state. The rational being we already gave this state the chance to
+ // write, and if there were empty frames the expectation is they would have been sent. At this point there
+ // must be a call to updateStreamableBytes for this state to be able to write again.
+ if (hasFrame && (!writing || windowSize > 0)) {
// It's not in the queue but has data to send, add it.
addToQueue();
}
@@ -150,15 +164,14 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
* assuming all of the bytes will be written.
*/
void write(int numBytes, Writer writer) throws Http2Exception {
- // Update the streamable bytes, assuming that all the bytes will be written.
- int newStreamableBytes = streamableBytes - numBytes;
- updateStreamableBytes(newStreamableBytes, newStreamableBytes > 0);
-
+ writing = true;
try {
// Write the allocated bytes.
writer.write(stream, numBytes);
} catch (Throwable t) {
throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
+ } finally {
+ writing = false;
}
}
@@ -181,7 +194,7 @@ public final class UniformStreamByteDistributor implements StreamByteDistributor
removeFromQueue();
// Clear the streamable bytes.
- updateStreamableBytes(0, false);
+ updateStreamableBytes(0, false, 0);
}
}
}
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java
index 06267334f3..ee20b1793b 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java
@@ -21,6 +21,7 @@ import io.netty.util.internal.PriorityQueueNode;
import java.util.Queue;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
@@ -104,8 +105,8 @@ public final class WeightedFairQueueByteDistributor implements StreamByteDistrib
@Override
public void updateStreamableBytes(StreamState state) {
- state(state.stream()).updateStreamableBytes(state.streamableBytes(),
- state.hasFrame() && state.isWriteAllowed());
+ state(state.stream()).updateStreamableBytes(streamableBytes(state),
+ state.hasFrame() && state.windowSize() >= 0);
}
@Override
@@ -204,7 +205,7 @@ public final class WeightedFairQueueByteDistributor implements StreamByteDistrib
/**
* For testing only!
*/
- int streamableBytes(Http2Stream stream) {
+ int streamableBytes0(Http2Stream stream) {
return state(stream).streamableBytes;
}
diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java
index 2d93c63ccb..c98dfd0b7d 100644
--- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java
+++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java
@@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Tests for {@link DefaultHttp2RemoteFlowController}.
*/
-public class DefaultHttp2RemoteFlowControllerTest {
+public abstract class DefaultHttp2RemoteFlowControllerTest {
private static final int STREAM_A = 1;
private static final int STREAM_B = 3;
private static final int STREAM_C = 5;
@@ -114,9 +114,11 @@ public class DefaultHttp2RemoteFlowControllerTest {
reset(listener);
}
+ protected abstract StreamByteDistributor newDistributor(Http2Connection connection);
+
private void initConnectionAndController() throws Http2Exception {
connection = new DefaultHttp2Connection(false);
- controller = new DefaultHttp2RemoteFlowController(connection, listener);
+ controller = new DefaultHttp2RemoteFlowController(connection, newDistributor(connection), listener);
connection.remote().flowController(controller);
connection.local().createStream(STREAM_A, false);
@@ -926,7 +928,6 @@ public class DefaultHttp2RemoteFlowControllerTest {
mock(Http2RemoteFlowController.FlowControlled.class);
when(flowControlled.size()).thenReturn(100);
doAnswer(new Answer