diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java
index cb98793db9..4281c4d41c 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java
@@ -19,10 +19,12 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
+import io.netty.util.ReferenceCountUtil;
import java.util.ArrayDeque;
@@ -85,7 +87,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
frameWriter = checkNotNull(builder.frameWriter, "frameWriter");
lifecycleManager = checkNotNull(builder.lifecycleManager, "lifecycleManager");
if (connection.remote().flowController() == null) {
- connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection, frameWriter));
+ connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
}
}
@@ -146,7 +148,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
final boolean endOfStream, ChannelPromise promise) {
- Http2Stream stream;
+ final Http2Stream stream;
try {
if (connection.isGoAway()) {
throw new IllegalStateException("Sending data after connection going away.");
@@ -181,23 +183,9 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
// Hand control of the frame to the flow controller.
- ChannelFuture future =
- flowController().sendFlowControlledFrame(ctx, stream, data, padding, endOfStream, promise);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- // The write failed, handle the error.
- lifecycleManager.onException(ctx, future.cause());
- } else if (endOfStream) {
- // Close the local side of the stream if this is the last frame
- Http2Stream stream = connection.stream(streamId);
- lifecycleManager.closeLocalSide(stream, ctx.newPromise());
- }
- }
- });
-
- return future;
+ flowController().sendFlowControlled(ctx, stream,
+ new FlowControlledData(ctx, stream, data, padding, endOfStream, promise));
+ return promise;
}
@Override
@@ -211,98 +199,49 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
final Http2Headers headers, final int streamDependency, final short weight,
final boolean exclusive, final int padding, final boolean endOfStream,
final ChannelPromise promise) {
- Http2Stream stream = connection.stream(streamId);
- ChannelFuture lastDataWrite = stream != null ? flowController().lastFlowControlledFrameSent(stream) : null;
try {
if (connection.isGoAway()) {
throw connectionError(PROTOCOL_ERROR, "Sending headers after connection going away.");
}
-
+ Http2Stream stream = connection.stream(streamId);
if (stream == null) {
- stream = connection.createLocalStream(streamId).open(endOfStream);
- } else {
- if (stream.isResetSent()) {
- throw new IllegalStateException("Sending headers after sending RST_STREAM.");
- }
- if (stream.isEndOfStreamSent()) {
- throw new IllegalStateException("Sending headers after sending END_STREAM.");
- }
-
- // An existing stream...
- switch (stream.state()) {
- case RESERVED_LOCAL:
- case IDLE:
- stream.open(endOfStream);
- break;
- case OPEN:
- case HALF_CLOSED_REMOTE:
- // Allowed sending headers in these states.
- break;
- default:
- throw new IllegalStateException(String.format(
- "Stream %d in unexpected state: %s", stream.id(), stream.state()));
- }
+ stream = connection.createLocalStream(streamId);
+ } else if (stream.isResetSent()) {
+ throw new IllegalStateException("Sending headers after sending RST_STREAM.");
+ } else if (stream.isEndOfStreamSent()) {
+ throw new IllegalStateException("Sending headers after sending END_STREAM.");
}
- if (lastDataWrite != null && !endOfStream) {
- throw new IllegalStateException(
- "Sending non-trailing headers after data has been sent for stream: "
- + streamId);
+ switch (stream.state()) {
+ case RESERVED_LOCAL:
+ case IDLE:
+ stream.open(endOfStream);
+ break;
+ case OPEN:
+ case HALF_CLOSED_REMOTE:
+ // Allowed sending headers in these states.
+ break;
+ default:
+ throw new IllegalStateException(String.format(
+ "Stream %d in unexpected state: %s", stream.id(), stream.state()));
}
+
+ // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
+ flowController().sendFlowControlled(ctx, stream,
+ new FlowControlledHeaders(ctx, stream, headers, streamDependency, weight,
+ exclusive, padding, endOfStream, promise));
+ if (endOfStream) {
+ // Flag delivery of EOS synchronously to prevent subsequent frames being enqueued in the flow
+ // controller.
+ stream.endOfStreamSent();
+ }
+ return promise;
} catch (Http2NoMoreStreamIdsException e) {
lifecycleManager.onException(ctx, e);
return promise.setFailure(e);
} catch (Throwable e) {
return promise.setFailure(e);
}
-
- if (lastDataWrite == null) {
- // No previous DATA frames to keep in sync with, just send it now.
- return writeHeaders(ctx, stream, headers, streamDependency, weight, exclusive, padding,
- endOfStream, promise);
- }
-
- // There were previous DATA frames sent. We need to send the HEADERS only after the most
- // recent DATA frame to keep them in sync...
-
- // Only write the HEADERS frame after the previous DATA frame has been written.
- final Http2Stream theStream = stream;
- lastDataWrite.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- // The DATA write failed, also fail this write.
- promise.setFailure(future.cause());
- return;
- }
-
- // Perform the write.
- writeHeaders(ctx, theStream, headers, streamDependency, weight, exclusive, padding,
- endOfStream, promise);
- }
- });
-
- return promise;
- }
-
- /**
- * Writes the given {@link Http2Headers} to the remote endpoint and updates stream state if appropriate.
- */
- private ChannelFuture writeHeaders(ChannelHandlerContext ctx, Http2Stream stream,
- Http2Headers headers, int streamDependency, short weight, boolean exclusive,
- int padding, boolean endOfStream, ChannelPromise promise) {
- ChannelFuture future =
- frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight,
- exclusive, padding, endOfStream, promise);
- ctx.flush();
-
- // If the headers are the end of the stream, close it now.
- if (endOfStream) {
- stream.endOfStreamSent();
- lifecycleManager.closeLocalSide(stream, promise);
- }
-
- return future;
}
@Override
@@ -464,4 +403,166 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
public Configuration configuration() {
return frameWriter.configuration();
}
+
+ /**
+ * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
+ * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
+ * {@link #size} calculation deterministic thereby greatly simplifying the implementation.
+ *
+ * If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that
+ * the passed promise is not completed until last frame write.
+ *
+ */
+ private final class FlowControlledData extends FlowControlledBase {
+ private ByteBuf data;
+ private int size;
+
+ private FlowControlledData(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding,
+ boolean endOfStream, ChannelPromise promise) {
+ super(ctx, stream, padding, endOfStream, promise);
+ this.data = data;
+ size = data.readableBytes() + padding;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public void error(Throwable cause) {
+ ReferenceCountUtil.safeRelease(data);
+ lifecycleManager.onException(ctx, cause);
+ data = null;
+ size = 0;
+ promise.tryFailure(cause);
+ }
+
+ @Override
+ public boolean write(int allowedBytes) {
+ if (data == null) {
+ return false;
+ }
+ if (allowedBytes == 0 && size() != 0) {
+ // No point writing an empty DATA frame, wait for a bigger allowance.
+ return false;
+ }
+ int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize();
+ try {
+ int bytesWritten = 0;
+ do {
+ int allowedFrameSize = Math.min(maxFrameSize, allowedBytes - bytesWritten);
+ ByteBuf toWrite;
+ // Let data consume the frame before padding.
+ int writeableData = data.readableBytes();
+ if (writeableData > allowedFrameSize) {
+ writeableData = allowedFrameSize;
+ toWrite = data.readSlice(writeableData).retain();
+ } else {
+ // We're going to write the full buffer which will cause it to be released, for subsequent
+ // writes just use empty buffer to avoid over-releasing. Have to use an empty buffer
+ // as we may continue to write padding in subsequent frames.
+ toWrite = data;
+ data = Unpooled.EMPTY_BUFFER;
+ }
+ int writeablePadding = Math.min(allowedFrameSize - writeableData, padding);
+ padding -= writeablePadding;
+ bytesWritten += writeableData + writeablePadding;
+ ChannelPromise writePromise;
+ if (size == bytesWritten) {
+ // Can use the original promise if it's the last write
+ writePromise = promise;
+ } else {
+ // Create a new promise and listen to it for failure
+ writePromise = ctx.newPromise();
+ writePromise.addListener(this);
+ }
+ frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
+ size == bytesWritten && endOfStream, writePromise);
+ } while (size != bytesWritten && allowedBytes > bytesWritten);
+ size -= bytesWritten;
+ return true;
+ } catch (Throwable e) {
+ error(e);
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Wrap headers so they can be written subject to flow-control. While headers do not have cost against the
+ * flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is
+ * blocked on flow-control a HEADER frame must wait until this frame has been written.
+ */
+ private final class FlowControlledHeaders extends FlowControlledBase {
+
+ private final Http2Headers headers;
+ private final int streamDependency;
+ private final short weight;
+ private final boolean exclusive;
+
+ private FlowControlledHeaders(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
+ int streamDependency, short weight, boolean exclusive, int padding,
+ boolean endOfStream, ChannelPromise promise) {
+ super(ctx, stream, padding, endOfStream, promise);
+ this.headers = headers;
+ this.streamDependency = streamDependency;
+ this.weight = weight;
+ this.exclusive = exclusive;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public void error(Throwable cause) {
+ lifecycleManager.onException(ctx, cause);
+ promise.tryFailure(cause);
+ }
+
+ @Override
+ public boolean write(int allowedBytes) {
+ frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
+ padding, endOfStream, promise);
+ return true;
+ }
+ }
+
+ /**
+ * Common base type for payloads to deliver via flow-control.
+ */
+ public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
+ ChannelFutureListener {
+ protected final ChannelHandlerContext ctx;
+ protected final Http2Stream stream;
+ protected final ChannelPromise promise;
+ protected final boolean endOfStream;
+ protected int padding;
+
+ public FlowControlledBase(final ChannelHandlerContext ctx, final Http2Stream stream, int padding,
+ boolean endOfStream, final ChannelPromise promise) {
+ this.ctx = ctx;
+ if (padding < 0) {
+ throw new IllegalArgumentException("padding must be >= 0");
+ }
+ this.padding = padding;
+ this.endOfStream = endOfStream;
+ this.stream = stream;
+ this.promise = promise;
+ promise.addListener(this);
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future == promise && endOfStream) {
+ // Special case where we're listening to the original promise and need to close the stream.
+ lifecycleManager.closeLocalSide(stream, promise);
+ }
+ if (!future.isSuccess()) {
+ error(future.cause());
+ }
+ }
+ }
}
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java
index c2d46d5809..ebb686741e 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
@@ -23,17 +22,12 @@ import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Basic implementation of {@link Http2RemoteFlowController}.
@@ -51,14 +45,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
};
private final Http2Connection connection;
- private final Http2FrameWriter frameWriter;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private ChannelHandlerContext ctx;
- private boolean frameSent;
+ private boolean needFlush;
- public DefaultHttp2RemoteFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
+ public DefaultHttp2RemoteFlowController(Http2Connection connection) {
this.connection = checkNotNull(connection, "connection");
- this.frameWriter = checkNotNull(frameWriter, "frameWriter");
// Add a flow state for the connection.
connection.connectionStream().setProperty(FlowState.class,
@@ -158,65 +150,29 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Update the stream window and write any pending frames for the stream.
FlowState state = state(stream);
state.incrementStreamWindow(delta);
- frameSent = false;
state.writeBytes(state.writableWindow());
- if (frameSent) {
- flush();
- }
+ flush();
}
}
@Override
- public ChannelFuture sendFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream,
- ByteBuf data, int padding, boolean endStream, ChannelPromise promise) {
+ public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream,
+ FlowControlled payload) {
checkNotNull(ctx, "ctx");
- checkNotNull(promise, "promise");
- checkNotNull(data, "data");
+ checkNotNull(payload, "payload");
if (this.ctx != null && this.ctx != ctx) {
throw new IllegalArgumentException("Writing data from multiple ChannelHandlerContexts is not supported");
}
- if (padding < 0) {
- throw new IllegalArgumentException("padding must be >= 0");
- }
-
// Save the context. We'll use this later when we write pending bytes.
this.ctx = ctx;
-
try {
FlowState state = state(stream);
-
- int window = state.writableWindow();
- boolean framesAlreadyQueued = state.hasFrame();
-
- FlowState.Frame frame = state.newFrame(promise, data, padding, endStream);
- if (!framesAlreadyQueued && window >= frame.size()) {
- // Window size is large enough to send entire data frame
- frame.write();
- ctx.flush();
- return promise;
- }
-
- // Enqueue the frame to be written when the window size permits.
- frame.enqueue();
-
- if (framesAlreadyQueued || window <= 0) {
- // Stream already has frames pending or is stalled, don't send anything now.
- return promise;
- }
-
- // Create and send a partial frame up to the window size.
- frame.split(window).write();
- ctx.flush();
+ state.newFrame(payload);
+ state.writeBytes(state.writableWindow());
+ flush();
} catch (Throwable e) {
- promise.setFailure(e);
+ payload.error(e);
}
- return promise;
- }
-
- @Override
- public ChannelFuture lastFlowControlledFrameSent(Http2Stream stream) {
- FlowState state = state(stream);
- return state != null ? state.lastNewFrame() : null;
}
/**
@@ -247,8 +203,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* Flushes the {@link ChannelHandlerContext} if we've received any data frames.
*/
private void flush() {
- if (ctx != null) {
+ if (needFlush) {
ctx.flush();
+ needFlush = false;
}
}
@@ -260,14 +217,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
int connectionWindow = state(connectionStream).window();
if (connectionWindow > 0) {
- frameSent = false;
writeChildren(connectionStream, connectionWindow);
for (Http2Stream stream : connection.activeStreams()) {
writeChildNode(state(stream));
}
- if (frameSent) {
- flush();
- }
+ flush();
}
}
@@ -373,7 +327,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
private int pendingBytes;
private int streamableBytesForTree;
private int allocated;
- private ChannelFuture lastNewFrame;
FlowState(Http2Stream stream, int initialWindowSize) {
this.stream = stream;
@@ -429,13 +382,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return window;
}
- /**
- * Returns the future for the last new frame created for this stream.
- */
- ChannelFuture lastNewFrame() {
- return lastNewFrame;
- }
-
/**
* Returns the maximum writable window (minimum of the stream and connection windows).
*/
@@ -458,12 +404,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
/**
- * Creates a new frame with the given values but does not add it to the pending queue.
+ * Creates a new payload with the given values and immediately enqueues it.
*/
- Frame newFrame(final ChannelPromise promise, ByteBuf data, int padding, boolean endStream) {
+ Frame newFrame(FlowControlled payload) {
// Store this as the future for the most recent write attempt.
- lastNewFrame = promise;
- return new Frame(new SimplePromiseAggregator(promise), data, padding, endStream);
+ Frame frame = new Frame(payload);
+ pendingWriteQueue.offer(frame);
+ return frame;
}
/**
@@ -500,32 +447,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* boundaries.
*/
int writeBytes(int bytes) {
- if (!stream.localSideOpen()) {
- return 0;
- }
-
int bytesAttempted = 0;
- int maxBytes = min(bytes, writableWindow());
while (hasFrame()) {
- Frame pendingWrite = peek();
- if (maxBytes >= pendingWrite.size()) {
- // Window size is large enough to send entire data frame
- bytesAttempted += pendingWrite.size();
- pendingWrite.write();
- } else if (maxBytes <= 0) {
- // No data from the current frame can be written - we're done.
- // We purposely check this after first testing the size of the
- // pending frame to properly handle zero-length frame.
- break;
- } else {
- // We can send a partial frame
- Frame partialFrame = pendingWrite.split(maxBytes);
- bytesAttempted += partialFrame.size();
- partialFrame.write();
+ int maxBytes = min(bytes - bytesAttempted, writableWindow());
+ bytesAttempted += peek().write(maxBytes);
+ if (bytes - bytesAttempted <= 0) {
+ break;
}
-
- // Update the threshold.
- maxBytes = min(bytes - bytesAttempted, writableWindow());
}
return bytesAttempted;
}
@@ -545,37 +473,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* A wrapper class around the content of a data frame.
*/
private final class Frame {
- final ByteBuf data;
- final boolean endStream;
- final SimplePromiseAggregator promiseAggregator;
- final ChannelPromise promise;
- int padding;
- boolean enqueued;
+ final FlowControlled payload;
- Frame(SimplePromiseAggregator promiseAggregator, ByteBuf data, int padding, boolean endStream) {
- this.data = data;
- this.padding = padding;
- this.endStream = endStream;
- this.promiseAggregator = promiseAggregator;
- promise = ctx.newPromise();
- promiseAggregator.add(promise);
- }
-
- /**
- * Gets the total size (in bytes) of this frame including the data and padding.
- */
- int size() {
- return data.readableBytes() + padding;
- }
-
- void enqueue() {
- if (!enqueued) {
- enqueued = true;
- pendingWriteQueue.offer(this);
-
- // Increment the number of pending bytes for this stream.
- incrementPendingBytes(size());
- }
+ Frame(FlowControlled payload) {
+ this.payload = payload;
+ // Increment the number of pending bytes for this stream.
+ incrementPendingBytes(payload.size());
}
/**
@@ -599,35 +502,21 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
*
* Note: this does not flush the {@link ChannelHandlerContext}.
*/
- void write() {
- // Using a do/while loop because if the buffer is empty we still need to call
- // the writer once to send the empty frame.
- final Http2FrameSizePolicy frameSizePolicy = frameWriter.configuration().frameSizePolicy();
- do {
- int bytesToWrite = size();
- int frameBytes = min(bytesToWrite, frameSizePolicy.maxFrameSize());
- if (frameBytes == bytesToWrite) {
- // All the bytes fit into a single HTTP/2 frame, just send it all.
- try {
- connectionState().incrementStreamWindow(-bytesToWrite);
- incrementStreamWindow(-bytesToWrite);
- } catch (Http2Exception e) { // Should never get here since we're decrementing.
- throw new RuntimeException("Invalid window state when writing frame: " + e.getMessage(), e);
- }
- frameWriter.writeData(ctx, stream.id(), data, padding, endStream, promise);
- frameSent = true;
- decrementPendingBytes(bytesToWrite);
- if (enqueued) {
- // It's enqueued - remove it from the head of the pending write queue.
- pendingWriteQueue.remove();
- }
- return;
- }
-
- // Split a chunk that will fit into a single HTTP/2 frame and write it.
- Frame frame = split(frameBytes);
- frame.write();
- } while (size() > 0);
+ int write(int allowedBytes) {
+ int before = payload.size();
+ needFlush |= payload.write(Math.max(0, allowedBytes));
+ int writtenBytes = before - payload.size();
+ try {
+ connectionState().incrementStreamWindow(-writtenBytes);
+ incrementStreamWindow(-writtenBytes);
+ } catch (Http2Exception e) { // Should never get here since we're decrementing.
+ throw new RuntimeException("Invalid window state when writing frame: " + e.getMessage(), e);
+ }
+ decrementPendingBytes(writtenBytes);
+ if (payload.size() == 0) {
+ pendingWriteQueue.remove();
+ }
+ return writtenBytes;
}
/**
@@ -635,76 +524,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* removed from this branch of the priority tree.
*/
void writeError(Http2Exception cause) {
- decrementPendingBytes(size());
- data.release();
- promise.setFailure(cause);
- }
-
- /**
- * Creates a new frame that is a view of this frame's data. The {@code maxBytes} are first split from the
- * data buffer. If not all the requested bytes are available, the remaining bytes are then split from the
- * padding (if available).
- *
- * @param maxBytes the maximum number of bytes that is allowed in the created frame.
- * @return the partial frame.
- */
- Frame split(int maxBytes) {
- // The requested maxBytes should always be less than the size of this frame.
- assert maxBytes < size() : "Attempting to split a frame for the full size.";
-
- // Get the portion of the data buffer to be split. Limit to the readable bytes.
- int dataSplit = min(maxBytes, data.readableBytes());
-
- // Split any remaining bytes from the padding.
- int paddingSplit = min(maxBytes - dataSplit, padding);
-
- ByteBuf splitSlice = data.readSlice(dataSplit).retain();
- padding -= paddingSplit;
-
- Frame frame = new Frame(promiseAggregator, splitSlice, paddingSplit, false);
-
- int totalBytesSplit = dataSplit + paddingSplit;
- decrementPendingBytes(totalBytesSplit);
- return frame;
+ decrementPendingBytes(payload.size());
+ payload.error(cause);
}
/**
* If this frame is in the pending queue, decrements the number of pending bytes for the stream.
*/
void decrementPendingBytes(int bytes) {
- if (enqueued) {
- incrementPendingBytes(-bytes);
- }
+ incrementPendingBytes(-bytes);
}
}
}
-
- /**
- * Lightweight promise aggregator.
- */
- private static final class SimplePromiseAggregator {
- final ChannelPromise promise;
- final AtomicInteger awaiting = new AtomicInteger();
- final ChannelFutureListener listener = new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- promise.tryFailure(future.cause());
- } else {
- if (awaiting.decrementAndGet() == 0) {
- promise.trySuccess();
- }
- }
- }
- };
-
- SimplePromiseAggregator(ChannelPromise promise) {
- this.promise = promise;
- }
-
- void add(ChannelPromise promise) {
- awaiting.incrementAndGet();
- promise.addListener(listener);
- }
- }
}
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java
index 839a5fe5ab..3b3d6f3211 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java
@@ -14,10 +14,7 @@
*/
package io.netty.handler.codec.http2;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
/**
* A {@link Http2FlowController} for controlling the flow of outbound {@code DATA} frames to the remote
@@ -26,36 +23,49 @@ import io.netty.channel.ChannelPromise;
public interface Http2RemoteFlowController extends Http2FlowController {
/**
- * Writes or queues a {@code DATA} frame for transmission to the remote endpoint. There is no
+ * Writes or queues a payload for transmission to the remote endpoint. There is no
* guarantee when the data will be written or whether it will be split into multiple frames
- * before sending. The returned future will only be completed once all of the data has been
- * successfully written to the remote endpoint.
+ * before sending.
*
* Manually flushing the {@link ChannelHandlerContext} is not required, since the flow
* controller will flush as appropriate.
*
* @param ctx the context from the handler.
* @param stream the subject stream. Must not be the connection stream object.
- * @param data payload buffer for the frame.
- * @param padding the number of padding bytes to be added at the end of the frame.
- * @param endStream Indicates whether this is the last frame to be sent to the remote endpoint
- * for this stream.
- * @param promise the promise to be completed when the data has been successfully written or a
- * failure occurs.
- * @return a future that is completed when the frame is sent to the remote endpoint.
+ * @param payload payload to write subject to flow-control accounting and ordering rules.
*/
- ChannelFuture sendFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream,
- ByteBuf data, int padding, boolean endStream, ChannelPromise promise);
+ void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload);
/**
- * Gets the {@link ChannelFuture} for the most recent frame that was sent for the given stream
- * via a call to
- * {@link #sendFlowControlledFrame(ChannelHandlerContext, Http2Stream, ByteBuf, int, boolean, ChannelPromise)}.
- * This is useful for cases such as ensuring that {@code HEADERS} frames maintain send order with {@code DATA}
- * frames.
- *
- * @param stream the subject stream. Must not be the connection stream object.
- * @return the most recent sent frame, or {@code null} if no frame has been sent for the stream.
+ * Implementations of this interface are used to progressively write chunks of the underlying
+ * payload to the stream. A payload is considered to be fully written if {@link #write} has
+ * been called at least once and it's {@link #size} is now zero.
*/
- ChannelFuture lastFlowControlledFrameSent(Http2Stream stream);
+ interface FlowControlled {
+ /**
+ * The size of the payload in terms of bytes applied to the flow-control window.
+ * Some payloads like {@code HEADER} frames have no cost against flow control and would
+ * return 0 for this value even though they produce a non-zero number of bytes on
+ * the wire. Other frames like {@code DATA} frames have both their payload and padding count
+ * against flow-control.
+ */
+ int size();
+
+ /**
+ * Signal an error and release any retained buffers.
+ * @param cause of the error.
+ */
+ void error(Throwable cause);
+
+ /**
+ * Writes up to {@code allowedBytes} of the encapsulated payload to the stream. Note that
+ * a value of 0 may be passed which will allow payloads with flow-control size == 0 to be
+ * written. The flow-controller may call this method multiple times with different values until
+ * the payload is fully written.
+ *
+ * @param allowedBytes an upper bound on the number of bytes the payload can write at this time.
+ * @return {@code true} if a flush is required, {@code false} otherwise.
+ */
+ boolean write(int allowedBytes);
+ }
}
diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java
index c29bcc1b6c..3834897742 100644
--- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java
+++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package io.netty.handler.codec.http2;
import static io.netty.buffer.Unpooled.wrappedBuffer;
@@ -20,21 +19,28 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGH
import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
+import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.util.CharsetUtil.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -42,9 +48,12 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import java.util.ArrayList;
import java.util.Collections;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Test;
@@ -98,14 +107,25 @@ public class DefaultHttp2ConnectionEncoderTest {
@Mock
private Http2FrameWriter writer;
+ @Mock
+ private Http2FrameWriter.Configuration writerConfig;
+
+ @Mock
+ private Http2FrameSizePolicy frameSizePolicy;
+
@Mock
private Http2LifecycleManager lifecycleManager;
+ private ArgumentCaptor payloadCaptor;
+ private List writtenData;
+ private List writtenPadding;
+ private boolean streamClosed;
+
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
- promise = new DefaultChannelPromise(channel);
+ promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
when(channel.isActive()).thenReturn(true);
when(stream.id()).thenReturn(STREAM_ID);
@@ -118,6 +138,9 @@ public class DefaultHttp2ConnectionEncoderTest {
when(connection.local()).thenReturn(local);
when(connection.remote()).thenReturn(remote);
when(remote.flowController()).thenReturn(remoteFlow);
+ when(writer.configuration()).thenReturn(writerConfig);
+ when(writerConfig.frameSizePolicy()).thenReturn(frameSizePolicy);
+ when(frameSizePolicy.maxFrameSize()).thenReturn(64);
doAnswer(new Answer() {
@Override
public Http2Stream answer(InvocationOnMock invocation) throws Throwable {
@@ -139,8 +162,35 @@ public class DefaultHttp2ConnectionEncoderTest {
when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future);
when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise)))
.thenReturn(future);
- when(remoteFlow.sendFlowControlledFrame(eq(ctx), any(Http2Stream.class),
- any(ByteBuf.class), anyInt(), anyBoolean(), eq(promise))).thenReturn(future);
+ writtenData = new ArrayList();
+ writtenPadding = new ArrayList();
+ when(writer.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), any(ChannelPromise.class)))
+ .then(new Answer