Make flow-controller a write-queue for HEADERS and DATA

Motivation:

Previously flow-controller had to know the implementation details of each frame type in order to write it correctly. That concern is more correctly handled by the encoder. By encapsulating the payload types to be flow-controlled it will be easier to add support for extension types later. This change also fixes #3353.

Modifications:

Add interface FlowControlled which is now delivered to flow-controller.
Implement this interface for HEADERS and DATA
Refactor and improve tests for flow-control.

Result:

Flow control semantics are more cleanly separated for data encoding and implementation is simpler overall.
This commit is contained in:
louiscryan 2015-01-23 11:32:17 -08:00 committed by scottmitch
parent b19a12b952
commit 8bbfcb05a0
5 changed files with 926 additions and 1147 deletions

View File

@ -19,10 +19,12 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayDeque;
@ -85,7 +87,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
frameWriter = checkNotNull(builder.frameWriter, "frameWriter");
lifecycleManager = checkNotNull(builder.lifecycleManager, "lifecycleManager");
if (connection.remote().flowController() == null) {
connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection, frameWriter));
connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
}
}
@ -146,7 +148,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
final boolean endOfStream, ChannelPromise promise) {
Http2Stream stream;
final Http2Stream stream;
try {
if (connection.isGoAway()) {
throw new IllegalStateException("Sending data after connection going away.");
@ -181,23 +183,9 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
// Hand control of the frame to the flow controller.
ChannelFuture future =
flowController().sendFlowControlledFrame(ctx, stream, data, padding, endOfStream, promise);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// The write failed, handle the error.
lifecycleManager.onException(ctx, future.cause());
} else if (endOfStream) {
// Close the local side of the stream if this is the last frame
Http2Stream stream = connection.stream(streamId);
lifecycleManager.closeLocalSide(stream, ctx.newPromise());
}
}
});
return future;
flowController().sendFlowControlled(ctx, stream,
new FlowControlledData(ctx, stream, data, padding, endOfStream, promise));
return promise;
}
@Override
@ -211,98 +199,49 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
final Http2Headers headers, final int streamDependency, final short weight,
final boolean exclusive, final int padding, final boolean endOfStream,
final ChannelPromise promise) {
Http2Stream stream = connection.stream(streamId);
ChannelFuture lastDataWrite = stream != null ? flowController().lastFlowControlledFrameSent(stream) : null;
try {
if (connection.isGoAway()) {
throw connectionError(PROTOCOL_ERROR, "Sending headers after connection going away.");
}
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
stream = connection.createLocalStream(streamId).open(endOfStream);
} else {
if (stream.isResetSent()) {
throw new IllegalStateException("Sending headers after sending RST_STREAM.");
}
if (stream.isEndOfStreamSent()) {
throw new IllegalStateException("Sending headers after sending END_STREAM.");
}
// An existing stream...
switch (stream.state()) {
case RESERVED_LOCAL:
case IDLE:
stream.open(endOfStream);
break;
case OPEN:
case HALF_CLOSED_REMOTE:
// Allowed sending headers in these states.
break;
default:
throw new IllegalStateException(String.format(
"Stream %d in unexpected state: %s", stream.id(), stream.state()));
}
stream = connection.createLocalStream(streamId);
} else if (stream.isResetSent()) {
throw new IllegalStateException("Sending headers after sending RST_STREAM.");
} else if (stream.isEndOfStreamSent()) {
throw new IllegalStateException("Sending headers after sending END_STREAM.");
}
if (lastDataWrite != null && !endOfStream) {
throw new IllegalStateException(
"Sending non-trailing headers after data has been sent for stream: "
+ streamId);
switch (stream.state()) {
case RESERVED_LOCAL:
case IDLE:
stream.open(endOfStream);
break;
case OPEN:
case HALF_CLOSED_REMOTE:
// Allowed sending headers in these states.
break;
default:
throw new IllegalStateException(String.format(
"Stream %d in unexpected state: %s", stream.id(), stream.state()));
}
// Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
flowController().sendFlowControlled(ctx, stream,
new FlowControlledHeaders(ctx, stream, headers, streamDependency, weight,
exclusive, padding, endOfStream, promise));
if (endOfStream) {
// Flag delivery of EOS synchronously to prevent subsequent frames being enqueued in the flow
// controller.
stream.endOfStreamSent();
}
return promise;
} catch (Http2NoMoreStreamIdsException e) {
lifecycleManager.onException(ctx, e);
return promise.setFailure(e);
} catch (Throwable e) {
return promise.setFailure(e);
}
if (lastDataWrite == null) {
// No previous DATA frames to keep in sync with, just send it now.
return writeHeaders(ctx, stream, headers, streamDependency, weight, exclusive, padding,
endOfStream, promise);
}
// There were previous DATA frames sent. We need to send the HEADERS only after the most
// recent DATA frame to keep them in sync...
// Only write the HEADERS frame after the previous DATA frame has been written.
final Http2Stream theStream = stream;
lastDataWrite.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// The DATA write failed, also fail this write.
promise.setFailure(future.cause());
return;
}
// Perform the write.
writeHeaders(ctx, theStream, headers, streamDependency, weight, exclusive, padding,
endOfStream, promise);
}
});
return promise;
}
/**
* Writes the given {@link Http2Headers} to the remote endpoint and updates stream state if appropriate.
*/
private ChannelFuture writeHeaders(ChannelHandlerContext ctx, Http2Stream stream,
Http2Headers headers, int streamDependency, short weight, boolean exclusive,
int padding, boolean endOfStream, ChannelPromise promise) {
ChannelFuture future =
frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight,
exclusive, padding, endOfStream, promise);
ctx.flush();
// If the headers are the end of the stream, close it now.
if (endOfStream) {
stream.endOfStreamSent();
lifecycleManager.closeLocalSide(stream, promise);
}
return future;
}
@Override
@ -464,4 +403,166 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
public Configuration configuration() {
return frameWriter.configuration();
}
/**
* Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
* only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
* {@link #size} calculation deterministic thereby greatly simplifying the implementation.
* <p>
* If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that
* the passed promise is not completed until last frame write.
* </p>
*/
private final class FlowControlledData extends FlowControlledBase {
private ByteBuf data;
private int size;
private FlowControlledData(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding,
boolean endOfStream, ChannelPromise promise) {
super(ctx, stream, padding, endOfStream, promise);
this.data = data;
size = data.readableBytes() + padding;
}
@Override
public int size() {
return size;
}
@Override
public void error(Throwable cause) {
ReferenceCountUtil.safeRelease(data);
lifecycleManager.onException(ctx, cause);
data = null;
size = 0;
promise.tryFailure(cause);
}
@Override
public boolean write(int allowedBytes) {
if (data == null) {
return false;
}
if (allowedBytes == 0 && size() != 0) {
// No point writing an empty DATA frame, wait for a bigger allowance.
return false;
}
int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize();
try {
int bytesWritten = 0;
do {
int allowedFrameSize = Math.min(maxFrameSize, allowedBytes - bytesWritten);
ByteBuf toWrite;
// Let data consume the frame before padding.
int writeableData = data.readableBytes();
if (writeableData > allowedFrameSize) {
writeableData = allowedFrameSize;
toWrite = data.readSlice(writeableData).retain();
} else {
// We're going to write the full buffer which will cause it to be released, for subsequent
// writes just use empty buffer to avoid over-releasing. Have to use an empty buffer
// as we may continue to write padding in subsequent frames.
toWrite = data;
data = Unpooled.EMPTY_BUFFER;
}
int writeablePadding = Math.min(allowedFrameSize - writeableData, padding);
padding -= writeablePadding;
bytesWritten += writeableData + writeablePadding;
ChannelPromise writePromise;
if (size == bytesWritten) {
// Can use the original promise if it's the last write
writePromise = promise;
} else {
// Create a new promise and listen to it for failure
writePromise = ctx.newPromise();
writePromise.addListener(this);
}
frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
size == bytesWritten && endOfStream, writePromise);
} while (size != bytesWritten && allowedBytes > bytesWritten);
size -= bytesWritten;
return true;
} catch (Throwable e) {
error(e);
return false;
}
}
}
/**
* Wrap headers so they can be written subject to flow-control. While headers do not have cost against the
* flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is
* blocked on flow-control a HEADER frame must wait until this frame has been written.
*/
private final class FlowControlledHeaders extends FlowControlledBase {
private final Http2Headers headers;
private final int streamDependency;
private final short weight;
private final boolean exclusive;
private FlowControlledHeaders(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding,
boolean endOfStream, ChannelPromise promise) {
super(ctx, stream, padding, endOfStream, promise);
this.headers = headers;
this.streamDependency = streamDependency;
this.weight = weight;
this.exclusive = exclusive;
}
@Override
public int size() {
return 0;
}
@Override
public void error(Throwable cause) {
lifecycleManager.onException(ctx, cause);
promise.tryFailure(cause);
}
@Override
public boolean write(int allowedBytes) {
frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
padding, endOfStream, promise);
return true;
}
}
/**
* Common base type for payloads to deliver via flow-control.
*/
public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
ChannelFutureListener {
protected final ChannelHandlerContext ctx;
protected final Http2Stream stream;
protected final ChannelPromise promise;
protected final boolean endOfStream;
protected int padding;
public FlowControlledBase(final ChannelHandlerContext ctx, final Http2Stream stream, int padding,
boolean endOfStream, final ChannelPromise promise) {
this.ctx = ctx;
if (padding < 0) {
throw new IllegalArgumentException("padding must be >= 0");
}
this.padding = padding;
this.endOfStream = endOfStream;
this.stream = stream;
this.promise = promise;
promise.addListener(this);
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future == promise && endOfStream) {
// Special case where we're listening to the original promise and need to close the stream.
lifecycleManager.closeLocalSide(stream, promise);
}
if (!future.isSuccess()) {
error(future.cause());
}
}
}
}

View File

@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
@ -23,17 +22,12 @@ import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Basic implementation of {@link Http2RemoteFlowController}.
@ -51,14 +45,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
};
private final Http2Connection connection;
private final Http2FrameWriter frameWriter;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private ChannelHandlerContext ctx;
private boolean frameSent;
private boolean needFlush;
public DefaultHttp2RemoteFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
public DefaultHttp2RemoteFlowController(Http2Connection connection) {
this.connection = checkNotNull(connection, "connection");
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
// Add a flow state for the connection.
connection.connectionStream().setProperty(FlowState.class,
@ -158,65 +150,29 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Update the stream window and write any pending frames for the stream.
FlowState state = state(stream);
state.incrementStreamWindow(delta);
frameSent = false;
state.writeBytes(state.writableWindow());
if (frameSent) {
flush();
}
flush();
}
}
@Override
public ChannelFuture sendFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream,
ByteBuf data, int padding, boolean endStream, ChannelPromise promise) {
public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream,
FlowControlled payload) {
checkNotNull(ctx, "ctx");
checkNotNull(promise, "promise");
checkNotNull(data, "data");
checkNotNull(payload, "payload");
if (this.ctx != null && this.ctx != ctx) {
throw new IllegalArgumentException("Writing data from multiple ChannelHandlerContexts is not supported");
}
if (padding < 0) {
throw new IllegalArgumentException("padding must be >= 0");
}
// Save the context. We'll use this later when we write pending bytes.
this.ctx = ctx;
try {
FlowState state = state(stream);
int window = state.writableWindow();
boolean framesAlreadyQueued = state.hasFrame();
FlowState.Frame frame = state.newFrame(promise, data, padding, endStream);
if (!framesAlreadyQueued && window >= frame.size()) {
// Window size is large enough to send entire data frame
frame.write();
ctx.flush();
return promise;
}
// Enqueue the frame to be written when the window size permits.
frame.enqueue();
if (framesAlreadyQueued || window <= 0) {
// Stream already has frames pending or is stalled, don't send anything now.
return promise;
}
// Create and send a partial frame up to the window size.
frame.split(window).write();
ctx.flush();
state.newFrame(payload);
state.writeBytes(state.writableWindow());
flush();
} catch (Throwable e) {
promise.setFailure(e);
payload.error(e);
}
return promise;
}
@Override
public ChannelFuture lastFlowControlledFrameSent(Http2Stream stream) {
FlowState state = state(stream);
return state != null ? state.lastNewFrame() : null;
}
/**
@ -247,8 +203,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* Flushes the {@link ChannelHandlerContext} if we've received any data frames.
*/
private void flush() {
if (ctx != null) {
if (needFlush) {
ctx.flush();
needFlush = false;
}
}
@ -260,14 +217,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
int connectionWindow = state(connectionStream).window();
if (connectionWindow > 0) {
frameSent = false;
writeChildren(connectionStream, connectionWindow);
for (Http2Stream stream : connection.activeStreams()) {
writeChildNode(state(stream));
}
if (frameSent) {
flush();
}
flush();
}
}
@ -373,7 +327,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
private int pendingBytes;
private int streamableBytesForTree;
private int allocated;
private ChannelFuture lastNewFrame;
FlowState(Http2Stream stream, int initialWindowSize) {
this.stream = stream;
@ -429,13 +382,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return window;
}
/**
* Returns the future for the last new frame created for this stream.
*/
ChannelFuture lastNewFrame() {
return lastNewFrame;
}
/**
* Returns the maximum writable window (minimum of the stream and connection windows).
*/
@ -458,12 +404,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
/**
* Creates a new frame with the given values but does not add it to the pending queue.
* Creates a new payload with the given values and immediately enqueues it.
*/
Frame newFrame(final ChannelPromise promise, ByteBuf data, int padding, boolean endStream) {
Frame newFrame(FlowControlled payload) {
// Store this as the future for the most recent write attempt.
lastNewFrame = promise;
return new Frame(new SimplePromiseAggregator(promise), data, padding, endStream);
Frame frame = new Frame(payload);
pendingWriteQueue.offer(frame);
return frame;
}
/**
@ -500,32 +447,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* boundaries.
*/
int writeBytes(int bytes) {
if (!stream.localSideOpen()) {
return 0;
}
int bytesAttempted = 0;
int maxBytes = min(bytes, writableWindow());
while (hasFrame()) {
Frame pendingWrite = peek();
if (maxBytes >= pendingWrite.size()) {
// Window size is large enough to send entire data frame
bytesAttempted += pendingWrite.size();
pendingWrite.write();
} else if (maxBytes <= 0) {
// No data from the current frame can be written - we're done.
// We purposely check this after first testing the size of the
// pending frame to properly handle zero-length frame.
break;
} else {
// We can send a partial frame
Frame partialFrame = pendingWrite.split(maxBytes);
bytesAttempted += partialFrame.size();
partialFrame.write();
int maxBytes = min(bytes - bytesAttempted, writableWindow());
bytesAttempted += peek().write(maxBytes);
if (bytes - bytesAttempted <= 0) {
break;
}
// Update the threshold.
maxBytes = min(bytes - bytesAttempted, writableWindow());
}
return bytesAttempted;
}
@ -545,37 +473,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* A wrapper class around the content of a data frame.
*/
private final class Frame {
final ByteBuf data;
final boolean endStream;
final SimplePromiseAggregator promiseAggregator;
final ChannelPromise promise;
int padding;
boolean enqueued;
final FlowControlled payload;
Frame(SimplePromiseAggregator promiseAggregator, ByteBuf data, int padding, boolean endStream) {
this.data = data;
this.padding = padding;
this.endStream = endStream;
this.promiseAggregator = promiseAggregator;
promise = ctx.newPromise();
promiseAggregator.add(promise);
}
/**
* Gets the total size (in bytes) of this frame including the data and padding.
*/
int size() {
return data.readableBytes() + padding;
}
void enqueue() {
if (!enqueued) {
enqueued = true;
pendingWriteQueue.offer(this);
// Increment the number of pending bytes for this stream.
incrementPendingBytes(size());
}
Frame(FlowControlled payload) {
this.payload = payload;
// Increment the number of pending bytes for this stream.
incrementPendingBytes(payload.size());
}
/**
@ -599,35 +502,21 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* <p>
* Note: this does not flush the {@link ChannelHandlerContext}.
*/
void write() {
// Using a do/while loop because if the buffer is empty we still need to call
// the writer once to send the empty frame.
final Http2FrameSizePolicy frameSizePolicy = frameWriter.configuration().frameSizePolicy();
do {
int bytesToWrite = size();
int frameBytes = min(bytesToWrite, frameSizePolicy.maxFrameSize());
if (frameBytes == bytesToWrite) {
// All the bytes fit into a single HTTP/2 frame, just send it all.
try {
connectionState().incrementStreamWindow(-bytesToWrite);
incrementStreamWindow(-bytesToWrite);
} catch (Http2Exception e) { // Should never get here since we're decrementing.
throw new RuntimeException("Invalid window state when writing frame: " + e.getMessage(), e);
}
frameWriter.writeData(ctx, stream.id(), data, padding, endStream, promise);
frameSent = true;
decrementPendingBytes(bytesToWrite);
if (enqueued) {
// It's enqueued - remove it from the head of the pending write queue.
pendingWriteQueue.remove();
}
return;
}
// Split a chunk that will fit into a single HTTP/2 frame and write it.
Frame frame = split(frameBytes);
frame.write();
} while (size() > 0);
int write(int allowedBytes) {
int before = payload.size();
needFlush |= payload.write(Math.max(0, allowedBytes));
int writtenBytes = before - payload.size();
try {
connectionState().incrementStreamWindow(-writtenBytes);
incrementStreamWindow(-writtenBytes);
} catch (Http2Exception e) { // Should never get here since we're decrementing.
throw new RuntimeException("Invalid window state when writing frame: " + e.getMessage(), e);
}
decrementPendingBytes(writtenBytes);
if (payload.size() == 0) {
pendingWriteQueue.remove();
}
return writtenBytes;
}
/**
@ -635,76 +524,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* removed from this branch of the priority tree.
*/
void writeError(Http2Exception cause) {
decrementPendingBytes(size());
data.release();
promise.setFailure(cause);
}
/**
* Creates a new frame that is a view of this frame's data. The {@code maxBytes} are first split from the
* data buffer. If not all the requested bytes are available, the remaining bytes are then split from the
* padding (if available).
*
* @param maxBytes the maximum number of bytes that is allowed in the created frame.
* @return the partial frame.
*/
Frame split(int maxBytes) {
// The requested maxBytes should always be less than the size of this frame.
assert maxBytes < size() : "Attempting to split a frame for the full size.";
// Get the portion of the data buffer to be split. Limit to the readable bytes.
int dataSplit = min(maxBytes, data.readableBytes());
// Split any remaining bytes from the padding.
int paddingSplit = min(maxBytes - dataSplit, padding);
ByteBuf splitSlice = data.readSlice(dataSplit).retain();
padding -= paddingSplit;
Frame frame = new Frame(promiseAggregator, splitSlice, paddingSplit, false);
int totalBytesSplit = dataSplit + paddingSplit;
decrementPendingBytes(totalBytesSplit);
return frame;
decrementPendingBytes(payload.size());
payload.error(cause);
}
/**
* If this frame is in the pending queue, decrements the number of pending bytes for the stream.
*/
void decrementPendingBytes(int bytes) {
if (enqueued) {
incrementPendingBytes(-bytes);
}
incrementPendingBytes(-bytes);
}
}
}
/**
* Lightweight promise aggregator.
*/
private static final class SimplePromiseAggregator {
final ChannelPromise promise;
final AtomicInteger awaiting = new AtomicInteger();
final ChannelFutureListener listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
} else {
if (awaiting.decrementAndGet() == 0) {
promise.trySuccess();
}
}
}
};
SimplePromiseAggregator(ChannelPromise promise) {
this.promise = promise;
}
void add(ChannelPromise promise) {
awaiting.incrementAndGet();
promise.addListener(listener);
}
}
}

View File

@ -14,10 +14,7 @@
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
/**
* A {@link Http2FlowController} for controlling the flow of outbound {@code DATA} frames to the remote
@ -26,36 +23,49 @@ import io.netty.channel.ChannelPromise;
public interface Http2RemoteFlowController extends Http2FlowController {
/**
* Writes or queues a {@code DATA} frame for transmission to the remote endpoint. There is no
* Writes or queues a payload for transmission to the remote endpoint. There is no
* guarantee when the data will be written or whether it will be split into multiple frames
* before sending. The returned future will only be completed once all of the data has been
* successfully written to the remote endpoint.
* before sending.
* <p>
* Manually flushing the {@link ChannelHandlerContext} is not required, since the flow
* controller will flush as appropriate.
*
* @param ctx the context from the handler.
* @param stream the subject stream. Must not be the connection stream object.
* @param data payload buffer for the frame.
* @param padding the number of padding bytes to be added at the end of the frame.
* @param endStream Indicates whether this is the last frame to be sent to the remote endpoint
* for this stream.
* @param promise the promise to be completed when the data has been successfully written or a
* failure occurs.
* @return a future that is completed when the frame is sent to the remote endpoint.
* @param payload payload to write subject to flow-control accounting and ordering rules.
*/
ChannelFuture sendFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream,
ByteBuf data, int padding, boolean endStream, ChannelPromise promise);
void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload);
/**
* Gets the {@link ChannelFuture} for the most recent frame that was sent for the given stream
* via a call to
* {@link #sendFlowControlledFrame(ChannelHandlerContext, Http2Stream, ByteBuf, int, boolean, ChannelPromise)}.
* This is useful for cases such as ensuring that {@code HEADERS} frames maintain send order with {@code DATA}
* frames.
*
* @param stream the subject stream. Must not be the connection stream object.
* @return the most recent sent frame, or {@code null} if no frame has been sent for the stream.
* Implementations of this interface are used to progressively write chunks of the underlying
* payload to the stream. A payload is considered to be fully written if {@link #write} has
* been called at least once and it's {@link #size} is now zero.
*/
ChannelFuture lastFlowControlledFrameSent(Http2Stream stream);
interface FlowControlled {
/**
* The size of the payload in terms of bytes applied to the flow-control window.
* Some payloads like {@code HEADER} frames have no cost against flow control and would
* return 0 for this value even though they produce a non-zero number of bytes on
* the wire. Other frames like {@code DATA} frames have both their payload and padding count
* against flow-control.
*/
int size();
/**
* Signal an error and release any retained buffers.
* @param cause of the error.
*/
void error(Throwable cause);
/**
* Writes up to {@code allowedBytes} of the encapsulated payload to the stream. Note that
* a value of 0 may be passed which will allow payloads with flow-control size == 0 to be
* written. The flow-controller may call this method multiple times with different values until
* the payload is fully written.
*
* @param allowedBytes an upper bound on the number of bytes the payload can write at this time.
* @return {@code true} if a flush is required, {@code false} otherwise.
*/
boolean write(int allowedBytes);
}
}

View File

@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.buffer.Unpooled.wrappedBuffer;
@ -20,21 +19,28 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGH
import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -42,9 +48,12 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Test;
@ -98,14 +107,25 @@ public class DefaultHttp2ConnectionEncoderTest {
@Mock
private Http2FrameWriter writer;
@Mock
private Http2FrameWriter.Configuration writerConfig;
@Mock
private Http2FrameSizePolicy frameSizePolicy;
@Mock
private Http2LifecycleManager lifecycleManager;
private ArgumentCaptor<Http2RemoteFlowController.FlowControlled> payloadCaptor;
private List<String> writtenData;
private List<Integer> writtenPadding;
private boolean streamClosed;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
promise = new DefaultChannelPromise(channel);
promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
when(channel.isActive()).thenReturn(true);
when(stream.id()).thenReturn(STREAM_ID);
@ -118,6 +138,9 @@ public class DefaultHttp2ConnectionEncoderTest {
when(connection.local()).thenReturn(local);
when(connection.remote()).thenReturn(remote);
when(remote.flowController()).thenReturn(remoteFlow);
when(writer.configuration()).thenReturn(writerConfig);
when(writerConfig.frameSizePolicy()).thenReturn(frameSizePolicy);
when(frameSizePolicy.maxFrameSize()).thenReturn(64);
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock invocation) throws Throwable {
@ -139,8 +162,35 @@ public class DefaultHttp2ConnectionEncoderTest {
when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future);
when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise)))
.thenReturn(future);
when(remoteFlow.sendFlowControlledFrame(eq(ctx), any(Http2Stream.class),
any(ByteBuf.class), anyInt(), anyBoolean(), eq(promise))).thenReturn(future);
writtenData = new ArrayList<String>();
writtenPadding = new ArrayList<Integer>();
when(writer.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), any(ChannelPromise.class)))
.then(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
// Make sure we only receive stream closure on the last frame and that void promises are used for
// all writes except the last one.
ChannelPromise receivedPromise = (ChannelPromise) invocationOnMock.getArguments()[5];
if (streamClosed) {
fail("Stream already closed");
} else {
streamClosed = (Boolean) invocationOnMock.getArguments()[4];
if (streamClosed) {
assertSame(promise, receivedPromise);
}
}
writtenPadding.add((Integer) invocationOnMock.getArguments()[3]);
ByteBuf data = (ByteBuf) invocationOnMock.getArguments()[2];
writtenData.add(data.toString(UTF_8));
// Release the buffer just as DefaultHttp2FrameWriter does
data.release();
// Let the promise succeed to trigger listeners.
receivedPromise.trySuccess();
return future;
}
});
payloadCaptor = ArgumentCaptor.forClass(Http2RemoteFlowController.FlowControlled.class);
doNothing().when(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), payloadCaptor.capture());
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.channel()).thenReturn(channel);
when(ctx.newSucceededFuture()).thenReturn(future);
@ -156,7 +206,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(connection.isGoAway()).thenReturn(true);
final ByteBuf data = dummyData();
try {
ChannelFuture future = encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
ChannelFuture future = encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof IllegalStateException);
} finally {
while (data.refCnt() > 0) {
@ -168,31 +218,102 @@ public class DefaultHttp2ConnectionEncoderTest {
@Test
public void dataWriteShouldSucceed() throws Exception {
final ByteBuf data = dummyData();
try {
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
verify(remoteFlow).sendFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(0), eq(false), eq(promise));
} finally {
data.release();
}
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
assertEquals(payloadCaptor.getValue().size(), 8);
assertTrue(payloadCaptor.getValue().write(8));
assertEquals(0, payloadCaptor.getValue().size());
assertEquals("abcdefgh", writtenData.get(0));
assertEquals(0, data.refCnt());
}
@Test
public void dataWriteShouldHalfCloseStream() throws Exception {
reset(future);
final ByteBuf data = dummyData();
try {
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
verify(remoteFlow).sendFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(0), eq(true), eq(promise));
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
assertEquals(1, payloadCaptor.getAllValues().size());
// Write the DATA frame completely
assertTrue(payloadCaptor.getValue().write(Integer.MAX_VALUE));
verify(lifecycleManager).closeLocalSide(eq(stream), eq(promise));
assertEquals(0, data.refCnt());
}
// Invoke the listener callback indicating that the write completed successfully.
ArgumentCaptor<ChannelFutureListener> captor = ArgumentCaptor.forClass(ChannelFutureListener.class);
verify(future).addListener(captor.capture());
when(future.isSuccess()).thenReturn(true);
captor.getValue().operationComplete(future);
verify(lifecycleManager).closeLocalSide(eq(stream), eq(promise));
} finally {
data.release();
}
@Test
public void dataLargerThanMaxFrameSizeShouldBeSplit() throws Http2Exception {
when(frameSizePolicy.maxFrameSize()).thenReturn(3);
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
assertEquals(payloadCaptor.getValue().size(), 8);
assertTrue(payloadCaptor.getValue().write(8));
// writer was called 3 times
assertEquals(3, writtenData.size());
assertEquals("abc", writtenData.get(0));
assertEquals("def", writtenData.get(1));
assertEquals("gh", writtenData.get(2));
assertEquals(0, data.refCnt());
}
@Test
public void paddingSplitOverFrame() throws Http2Exception {
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 5, true, promise);
assertEquals(payloadCaptor.getValue().size(), 13);
assertTrue(payloadCaptor.getValue().write(13));
// writer was called 3 times
assertEquals(3, writtenData.size());
assertEquals("abcde", writtenData.get(0));
assertEquals(0, (int) writtenPadding.get(0));
assertEquals("fgh", writtenData.get(1));
assertEquals(2, (int) writtenPadding.get(1));
assertEquals("", writtenData.get(2));
assertEquals(3, (int) writtenPadding.get(2));
assertEquals(0, data.refCnt());
}
@Test
public void frameShouldSplitPadding() throws Http2Exception {
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
assertEquals(payloadCaptor.getValue().size(), 18);
assertTrue(payloadCaptor.getValue().write(18));
// writer was called 4 times
assertEquals(4, writtenData.size());
assertEquals("abcde", writtenData.get(0));
assertEquals(0, (int) writtenPadding.get(0));
assertEquals("fgh", writtenData.get(1));
assertEquals(2, (int) writtenPadding.get(1));
assertEquals("", writtenData.get(2));
assertEquals(5, (int) writtenPadding.get(2));
assertEquals("", writtenData.get(3));
assertEquals(3, (int) writtenPadding.get(3));
assertEquals(0, data.refCnt());
}
@Test
public void emptyFrameShouldSplitPadding() throws Http2Exception {
ByteBuf data = Unpooled.buffer(0);
assertSplitPaddingOnEmptyBuffer(data);
assertEquals(0, data.refCnt());
}
@Test
public void singletonEmptyBufferShouldSplitPadding() throws Http2Exception {
assertSplitPaddingOnEmptyBuffer(Unpooled.EMPTY_BUFFER);
}
private void assertSplitPaddingOnEmptyBuffer(ByteBuf data) throws Http2Exception {
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
assertEquals(payloadCaptor.getValue().size(), 10);
assertTrue(payloadCaptor.getValue().write(10));
// writer was called 2 times
assertEquals(2, writtenData.size());
assertEquals("", writtenData.get(0));
assertEquals(5, (int) writtenPadding.get(0));
assertEquals("", writtenData.get(1));
assertEquals(5, (int) writtenPadding.get(1));
}
@Test
@ -211,11 +332,14 @@ public class DefaultHttp2ConnectionEncoderTest {
public void headersWriteForUnknownStreamShouldCreateStream() throws Exception {
int streamId = 5;
when(stream.id()).thenReturn(streamId);
when(stream.state()).thenReturn(IDLE);
mockFutureAddListener(true);
when(local.createStream(eq(streamId))).thenReturn(stream);
encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(local).createStream(eq(streamId));
verify(stream).open(eq(false));
assertNotNull(payloadCaptor.getValue());
payloadCaptor.getValue().write(0);
verify(writer).writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@ -224,37 +348,18 @@ public class DefaultHttp2ConnectionEncoderTest {
public void headersWriteShouldCreateHalfClosedStream() throws Exception {
int streamId = 5;
when(stream.id()).thenReturn(streamId);
when(stream.state()).thenReturn(IDLE);
mockFutureAddListener(true);
when(local.createStream(eq(streamId))).thenReturn(stream);
when(writer.writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise))).thenReturn(future);
encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, true, promise);
verify(local).createStream(eq(streamId));
verify(stream).open(eq(true));
verify(writer).writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
}
@Test
public void headersWriteAfterDataShouldWait() throws Exception {
final AtomicReference<ChannelFutureListener> listener = new AtomicReference<ChannelFutureListener>();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
listener.set((ChannelFutureListener) invocation.getArguments()[0]);
return null;
}
}).when(future).addListener(any(ChannelFutureListener.class));
// Indicate that there was a previous data write operation that the headers must wait for.
when(remoteFlow.lastFlowControlledFrameSent(any(Http2Stream.class))).thenReturn(future);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise);
verify(writer, never()).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
// Now complete the previous data write operation and verify that the headers were written.
when(future.isSuccess()).thenReturn(true);
listener.get().operationComplete(future);
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
// Trigger the write and mark the promise successful to trigger listeners
payloadCaptor.getValue().write(0);
promise.trySuccess();
verify(lifecycleManager).closeLocalSide(eq(stream), eq(promise));
}
@Test
@ -264,6 +369,8 @@ public class DefaultHttp2ConnectionEncoderTest {
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(stream).open(false);
verify(stream, never()).closeLocalSide();
assertNotNull(payloadCaptor.getValue());
payloadCaptor.getValue().write(0);
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@ -272,11 +379,14 @@ public class DefaultHttp2ConnectionEncoderTest {
public void headersWriteShouldClosePushStream() throws Exception {
mockFutureAddListener(true);
when(stream.state()).thenReturn(RESERVED_LOCAL).thenReturn(HALF_CLOSED_LOCAL);
when(writer.writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise))).thenReturn(future);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise);
verify(stream).open(true);
// Trigger the write and mark the promise successful to trigger listeners
payloadCaptor.getValue().write(0);
promise.trySuccess();
verify(lifecycleManager).closeLocalSide(eq(stream), eq(promise));
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
}
@Test