Motivation:

Currently the DefaultHttp2InboundFlowController only supports the
ability to turn on and off "window maintenance" for a stream. This is
insufficient for true application-level flow control that may only want
to return a few bytes to flow control at a time.

Modifications:

Removing "window maintenance" interface from
DefaultHttp2InboundFlowController in favor of the new interface.

Created the Http2InboundFlowState interface which extends Http2FlowState
to add the ability to return bytes for a specific stream.

Changed the onDataRead method to return an integer number of bytes that
will be immediately returned to flow control, to support use cases that
want to opt-out of application-level inbound flow control.

Updated DefaultHttp2InboundFlowController to use 2 windows per stream.
The first, "window", is the actual flow control window that is
decremented as soon as data is received. The second "processedWindow"
is a delayed view of "window" that is only decremented after the
application returns the processed bytes. It is processedWindow that is
used when determining when to send a WINDOW_UPDATE to restore part of
the inbound flow control window for the stream/connection.

Result:

The HTTP/2 inbound flow control interfaces support application-level
flow control.
This commit is contained in:
nmittler 2014-11-13 16:04:33 -08:00
parent 543daa3a9b
commit 700ac93b15
20 changed files with 299 additions and 235 deletions

View File

@ -217,8 +217,8 @@ public class DefaultHttp2Connection implements Http2Connection {
private boolean resetReceived; private boolean resetReceived;
private boolean endOfStreamSent; private boolean endOfStreamSent;
private boolean endOfStreamReceived; private boolean endOfStreamReceived;
private FlowState inboundFlow; private Http2InboundFlowState inboundFlow;
private FlowState outboundFlow; private Http2FlowState outboundFlow;
private EmbeddedChannel decompressor; private EmbeddedChannel decompressor;
private EmbeddedChannel compressor; private EmbeddedChannel compressor;
private Object data; private Object data;
@ -324,22 +324,22 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
@Override @Override
public FlowState inboundFlow() { public Http2InboundFlowState inboundFlow() {
return inboundFlow; return inboundFlow;
} }
@Override @Override
public void inboundFlow(FlowState state) { public void inboundFlow(Http2InboundFlowState state) {
inboundFlow = state; inboundFlow = state;
} }
@Override @Override
public FlowState outboundFlow() { public Http2FlowState outboundFlow() {
return outboundFlow; return outboundFlow;
} }
@Override @Override
public void outboundFlow(FlowState state) { public void outboundFlow(Http2FlowState state) {
outboundFlow = state; outboundFlow = state;
} }

View File

@ -198,8 +198,8 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
private final class FrameReadListener implements Http2FrameListener { private final class FrameReadListener implements Http2FrameListener {
@Override @Override
public void onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data,
boolean endOfStream) throws Http2Exception { int padding, boolean endOfStream) throws Http2Exception {
verifyPrefaceReceived(); verifyPrefaceReceived();
// Check if we received a data frame for a stream which is half-closed // Check if we received a data frame for a stream which is half-closed
@ -211,9 +211,9 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a // We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a
// lower stream ID. // lower stream ID.
boolean shouldIgnore = shouldIgnoreFrame(stream);
boolean shouldApplyFlowControl = false; boolean shouldApplyFlowControl = false;
int processedBytes = data.readableBytes() + padding;
boolean shouldIgnore = shouldIgnoreFrame(stream);
Http2Exception error = null; Http2Exception error = null;
switch (stream.state()) { switch (stream.state()) {
case OPEN: case OPEN:
@ -240,26 +240,36 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
break; break;
} }
// If we should apply flow control, do so now. try {
if (shouldApplyFlowControl) { // If we should apply flow control, do so now.
inboundFlow.onDataRead(ctx, streamId, data, padding, endOfStream); if (shouldApplyFlowControl) {
} inboundFlow.applyFlowControl(ctx, streamId, data, padding, endOfStream);
}
// If we should ignore this frame, do so now. // If we should ignore this frame, do so now.
if (shouldIgnore) { if (shouldIgnore) {
return; return processedBytes;
} }
// If the stream was in an invalid state to receive the frame, throw the error. // If the stream was in an invalid state to receive the frame, throw the error.
if (error != null) { if (error != null) {
throw error; throw error;
} }
listener.onDataRead(ctx, streamId, data, padding, endOfStream); // Call back the application and retrieve the number of bytes that have been
// immediately processed.
processedBytes = listener.onDataRead(ctx, streamId, data, padding, endOfStream);
return processedBytes;
} finally {
// If appropriate, returned the processed bytes to the flow controller.
if (shouldApplyFlowControl && processedBytes > 0) {
stream.inboundFlow().returnProcessedBytes(ctx, processedBytes);
}
if (endOfStream) { if (endOfStream) {
stream.endOfStreamReceived(); stream.endOfStreamReceived();
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture()); lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
}
} }
} }

View File

@ -39,29 +39,33 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
*/ */
public static final int DEFAULT_MAX_CONNECTION_WINDOW_SIZE = 1048576 * 2; public static final int DEFAULT_MAX_CONNECTION_WINDOW_SIZE = 1048576 * 2;
/**
* A value for the window update ratio to be use in order to disable window updates for
* a stream (i.e. {@code 0}).
*/
public static final double WINDOW_UPDATE_OFF = 0.0;
private final Http2Connection connection; private final Http2Connection connection;
private final Http2FrameWriter frameWriter; private final Http2FrameWriter frameWriter;
private final double windowUpdateRatio;
private int maxConnectionWindowSize = DEFAULT_MAX_CONNECTION_WINDOW_SIZE; private int maxConnectionWindowSize = DEFAULT_MAX_CONNECTION_WINDOW_SIZE;
private int initialWindowSize = DEFAULT_WINDOW_SIZE; private int initialWindowSize = DEFAULT_WINDOW_SIZE;
public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO);
}
public DefaultHttp2InboundFlowController(Http2Connection connection,
Http2FrameWriter frameWriter, double windowUpdateRatio) {
this.connection = checkNotNull(connection, "connection"); this.connection = checkNotNull(connection, "connection");
this.frameWriter = checkNotNull(frameWriter, "frameWriter"); this.frameWriter = checkNotNull(frameWriter, "frameWriter");
if (Double.compare(windowUpdateRatio, 0.0) <= 0 || Double.compare(windowUpdateRatio, 1.0) >= 0) {
throw new IllegalArgumentException("Invalid ratio: " + windowUpdateRatio);
}
this.windowUpdateRatio = windowUpdateRatio;
// Add a flow state for the connection. // Add a flow state for the connection.
connection.connectionStream().inboundFlow(new InboundFlowState(CONNECTION_STREAM_ID)); connection.connectionStream().inboundFlow(new FlowState(CONNECTION_STREAM_ID));
// Register for notification of new streams. // Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() { connection.addListener(new Http2ConnectionAdapter() {
@Override @Override
public void streamAdded(Http2Stream stream) { public void streamAdded(Http2Stream stream) {
stream.inboundFlow(new InboundFlowState(stream.id())); stream.inboundFlow(new FlowState(stream.id()));
} }
}); });
} }
@ -80,7 +84,6 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
initialWindowSize = newWindowSize; initialWindowSize = newWindowSize;
// Apply the delta to all of the windows. // Apply the delta to all of the windows.
connectionState().addAndGet(deltaWindowSize);
for (Http2Stream stream : connection.activeStreams()) { for (Http2Stream stream : connection.activeStreams()) {
state(stream).updatedInitialWindowSize(deltaWindowSize); state(stream).updatedInitialWindowSize(deltaWindowSize);
} }
@ -91,125 +94,75 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
return initialWindowSize; return initialWindowSize;
} }
/**
* Sets the per-stream ratio used to control when window update is performed. The value
* specified is a ratio of the window size to the initial window size, below which a
* {@code WINDOW_UPDATE} should be sent to expand the window. If the given value is
* {@link #WINDOW_UPDATE_OFF} (i.e. {@code 0}) window updates will be disabled for the
* stream.
*
* @throws IllegalArgumentException if the stream does not exist or if the ratio value is
* outside of the range 0 (inclusive) to 1 (exclusive).
*/
public void setWindowUpdateRatio(ChannelHandlerContext ctx, int streamId, double ratio) {
if (ratio < WINDOW_UPDATE_OFF || ratio >= 1.0) {
throw new IllegalArgumentException("Invalid ratio: " + ratio);
}
InboundFlowState state = state(streamId);
if (state == null) {
throw new IllegalArgumentException("Stream does not exist: " + streamId);
}
// Set the ratio.
state.setWindowUpdateRatio(ratio);
// In the event of enabling window update, check to see if we need to update the window now.
try {
state.updateWindowIfAppropriate(ctx);
} catch (Http2Exception e) {
// Shouldn't happen since we only increase the window.
throw new RuntimeException(e);
}
}
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) public void applyFlowControl(ChannelHandlerContext ctx, int streamId, ByteBuf data,
throws Http2Exception { int padding, boolean endOfStream) throws Http2Exception {
int dataLength = data.readableBytes() + padding; int dataLength = data.readableBytes() + padding;
boolean windowUpdateSent = false; int delta = -dataLength;
try {
windowUpdateSent = applyConnectionFlowControl(ctx, dataLength);
// Apply the stream-level flow control. // Apply the connection-level flow control. Immediately return the bytes for the connection
windowUpdateSent |= applyStreamFlowControl(ctx, streamId, dataLength, endOfStream); // window so that data on this stream does not starve other stream.
} finally { FlowState connectionState = connectionState();
// Optimization: only flush once for any sent WINDOW_UPDATE frames. connectionState.addAndGet(delta);
if (windowUpdateSent) { connectionState.returnProcessedBytes(dataLength);
ctx.flush(); connectionState.updateWindowIfAppropriate(ctx);
}
} // Apply the stream-level flow control, but do not return the bytes immediately.
FlowState state = stateOrFail(streamId);
state.endOfStream(endOfStream);
state.addAndGet(delta);
} }
private InboundFlowState connectionState() { private FlowState connectionState() {
return state(connection.connectionStream()); return state(connection.connectionStream());
} }
private InboundFlowState state(int streamId) { private FlowState state(int streamId) {
return state(connection.stream(streamId)); Http2Stream stream = connection.stream(streamId);
return stream != null ? state(stream) : null;
} }
private InboundFlowState state(Http2Stream stream) { private FlowState state(Http2Stream stream) {
return stream != null ? (InboundFlowState) stream.inboundFlow() : null; return (FlowState) stream.inboundFlow();
} }
/** /**
* Gets the window for the stream or raises a {@code PROTOCOL_ERROR} if not found. * Gets the window for the stream or raises a {@code PROTOCOL_ERROR} if not found.
*/ */
private InboundFlowState stateOrFail(int streamId) throws Http2Exception { private FlowState stateOrFail(int streamId) throws Http2Exception {
InboundFlowState state = state(streamId); FlowState state = state(streamId);
if (state == null) { if (state == null) {
throw protocolError("Flow control window missing for stream: %d", streamId); throw protocolError("Flow control window missing for stream: %d", streamId);
} }
return state; return state;
} }
/**
* Apply connection-wide flow control to the incoming data frame.
*
* @return {@code true} if a {@code WINDOW_UPDATE} frame was sent for the connection.
*/
private boolean applyConnectionFlowControl(ChannelHandlerContext ctx, int dataLength)
throws Http2Exception {
// Remove the data length from the available window size. Throw if the lower bound
// was exceeded.
InboundFlowState connectionState = connectionState();
connectionState.addAndGet(-dataLength);
// If appropriate, send a WINDOW_UPDATE frame to open the window.
return connectionState.updateWindowIfAppropriate(ctx);
}
/**
* Apply stream-based flow control to the incoming data frame.
*
* @return {@code true} if a {@code WINDOW_UPDATE} frame was sent for the stream.
*/
private boolean applyStreamFlowControl(ChannelHandlerContext ctx, int streamId, int dataLength,
boolean endOfStream) throws Http2Exception {
// Remove the data length from the available window size. Throw if the lower bound
// was exceeded.
InboundFlowState state = stateOrFail(streamId);
state.addAndGet(-dataLength);
state.endOfStream(endOfStream);
// If appropriate, send a WINDOW_UPDATE frame to open the window.
return state.updateWindowIfAppropriate(ctx);
}
/** /**
* Flow control window state for an individual stream. * Flow control window state for an individual stream.
*/ */
private final class InboundFlowState implements FlowState { private final class FlowState implements Http2InboundFlowState {
private final int streamId; private final int streamId;
/**
* The actual flow control window that is decremented as soon as {@code DATA} arrives.
*/
private int window; private int window;
/**
* A view of {@link #window} that is used to determine when to send {@code WINDOW_UPDATE}
* frames. Decrementing this window for received {@code DATA} frames is delayed until the
* application has indicated that the data has been fully processed. This prevents sending
* a {@code WINDOW_UPDATE} until the number of processed bytes drops below the threshold.
*/
private int processedWindow;
private int lowerBound; private int lowerBound;
private boolean endOfStream; private boolean endOfStream;
private double windowUpdateRatio = DEFAULT_WINDOW_UPDATE_RATIO;
InboundFlowState(int streamId) { FlowState(int streamId) {
this.streamId = streamId; this.streamId = streamId;
window = initialWindowSize; window = initialWindowSize;
processedWindow = window;
} }
@Override @Override
@ -221,13 +174,6 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
this.endOfStream = endOfStream; this.endOfStream = endOfStream;
} }
/**
* Enables or disables window updates for this stream.
*/
void setWindowUpdateRatio(double ratio) {
windowUpdateRatio = ratio;
}
/** /**
* Returns the initial size of this window. * Returns the initial size of this window.
*/ */
@ -247,22 +193,44 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
return maxWindowSize; return maxWindowSize;
} }
@Override
public void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
if (streamId == CONNECTION_STREAM_ID) {
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
}
checkNotNull(ctx, "ctx");
if (numBytes <= 0) {
throw new IllegalArgumentException("numBytes must be positive");
}
// Return the bytes processed and update the window.
returnProcessedBytes(numBytes);
updateWindowIfAppropriate(ctx);
}
/** /**
* Updates the flow control window for this stream if it is appropriate. * Updates the flow control window for this stream if it is appropriate.
*
* @return {@code} true if a {@code WINDOW_UPDATE} frame was sent.
*/ */
boolean updateWindowIfAppropriate(ChannelHandlerContext ctx) throws Http2Exception { void updateWindowIfAppropriate(ChannelHandlerContext ctx) {
if (windowUpdateRatio == WINDOW_UPDATE_OFF || endOfStream || initialWindowSize <= 0) { if (endOfStream || initialWindowSize <= 0) {
return false; return;
} }
int threshold = (int) (initialWindowSize() * windowUpdateRatio); int threshold = (int) (initialWindowSize() * windowUpdateRatio);
if (window <= threshold) { if (processedWindow <= threshold) {
updateWindow(ctx); updateWindow(ctx);
return true;
} }
return false; }
/**
* Returns the processed bytes for this stream.
*/
void returnProcessedBytes(int delta) {
if (processedWindow - delta < window) {
throw new IllegalArgumentException(
"Attempting to return too many bytes for stream " + streamId);
}
processedWindow -= delta;
} }
/** /**
@ -284,7 +252,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
// The value is bounded by the length that SETTINGS frame decrease the window. // The value is bounded by the length that SETTINGS frame decrease the window.
// This difference is stored for the connection when writing the SETTINGS frame // This difference is stored for the connection when writing the SETTINGS frame
// and is cleared once we send a WINDOW_UPDATE frame. // and is cleared once we send a WINDOW_UPDATE frame.
if (window < lowerBound) { if (delta < 0 && window < lowerBound) {
if (streamId == CONNECTION_STREAM_ID) { if (streamId == CONNECTION_STREAM_ID) {
throw protocolError("Connection flow control window exceeded"); throw protocolError("Connection flow control window exceeded");
} else { } else {
@ -310,6 +278,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
throw flowControlError("Flow control window overflowed for stream: %d", streamId); throw flowControlError("Flow control window overflowed for stream: %d", streamId);
} }
window += delta; window += delta;
processedWindow += delta;
if (delta < 0) { if (delta < 0) {
lowerBound = delta; lowerBound = delta;
@ -321,11 +290,21 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
* size back to the size of the initial window and sends a window update frame to the remote * size back to the size of the initial window and sends a window update frame to the remote
* endpoint. * endpoint.
*/ */
void updateWindow(ChannelHandlerContext ctx) throws Http2Exception { void updateWindow(ChannelHandlerContext ctx) {
// Expand the window for this stream back to the size of the initial window. // Expand the window for this stream back to the size of the initial window.
int deltaWindowSize = initialWindowSize() - window; int deltaWindowSize = initialWindowSize() - window;
addAndGet(deltaWindowSize); processedWindow += deltaWindowSize;
try {
addAndGet(deltaWindowSize);
} catch (Http2Exception e) {
// This should never fail since we're adding.
throw new AssertionError("Caught exception while updating window with delta: "
+ deltaWindowSize);
}
// Send a window update for the stream/connection.
frameWriter.writeWindowUpdate(ctx, streamId, deltaWindowSize, ctx.newPromise()); frameWriter.writeWindowUpdate(ctx, streamId, deltaWindowSize, ctx.newPromise());
ctx.flush();
} }
} }
} }

View File

@ -353,7 +353,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
/** /**
* The outbound flow control state for a single stream. * The outbound flow control state for a single stream.
*/ */
final class OutboundFlowState implements FlowState { final class OutboundFlowState implements Http2FlowState {
private final Queue<Frame> pendingWriteQueue; private final Queue<Frame> pendingWriteQueue;
private final Http2Stream stream; private final Http2Stream stream;
private int window = initialWindowSize; private int window = initialWindowSize;

View File

@ -62,16 +62,18 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
} }
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception { throws Http2Exception {
final Http2Stream stream = connection.stream(streamId); final Http2Stream stream = connection.stream(streamId);
final EmbeddedChannel decompressor = stream == null ? null : stream.decompressor(); final EmbeddedChannel decompressor = stream == null ? null : stream.decompressor();
if (decompressor == null) { if (decompressor == null) {
// The decompressor may be null if no compatible encoding type was found in this stream's headers // The decompressor may be null if no compatible encoding type was found in this stream's headers
listener.onDataRead(ctx, streamId, data, padding, endOfStream); return listener.onDataRead(ctx, streamId, data, padding, endOfStream);
return;
} }
// When decompressing, always opt-out of application-level flow control.
// TODO: investigate how to apply application-level flow control when decompressing.
int processedBytes = data.readableBytes() + padding;
try { try {
// call retain here as it will call release after its written to the channel // call retain here as it will call release after its written to the channel
decompressor.writeInbound(data.retain()); decompressor.writeInbound(data.retain());
@ -96,6 +98,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
buf = nextBuf; buf = nextBuf;
} }
} }
return processedBytes;
} finally { } finally {
if (endOfStream) { if (endOfStream) {
cleanup(stream, decompressor); cleanup(stream, decompressor);

View File

@ -1,38 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
/**
* An listener of HTTP/2 {@code DATA} frames.
*/
public interface Http2DataListener {
/**
* Handles an inbound {@code DATA} frame.
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
* @param data payload buffer for the frame. If this buffer needs to be retained by the listener
* they must make a copy.
* @param padding the number of padding bytes found at the end of the frame.
* @param endOfStream Indicates whether this is the last frame to be sent from the remote
* endpoint for this stream.
*/
void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception;
}

View File

@ -23,8 +23,9 @@ import io.netty.channel.ChannelHandlerContext;
*/ */
public class Http2EventAdapter implements Http2Connection.Listener, Http2FrameListener { public class Http2EventAdapter implements Http2Connection.Listener, Http2FrameListener {
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception { throws Http2Exception {
return data.readableBytes() + padding;
} }
@Override @Override

View File

@ -17,7 +17,7 @@ package io.netty.handler.codec.http2;
/** /**
* Base interface for flow-control state for a particular stream. * Base interface for flow-control state for a particular stream.
*/ */
public interface FlowState { public interface Http2FlowState {
/** /**
* Returns the current remaining flow control window (in bytes) for the stream. Depending on the * Returns the current remaining flow control window (in bytes) for the stream. Depending on the

View File

@ -23,8 +23,9 @@ import io.netty.channel.ChannelHandlerContext;
public class Http2FrameAdapter implements Http2FrameListener { public class Http2FrameAdapter implements Http2FrameListener {
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception { boolean endOfStream) throws Http2Exception {
return data.readableBytes() + padding;
} }
@Override @Override

View File

@ -21,7 +21,30 @@ import io.netty.channel.ChannelHandlerContext;
/** /**
* An listener of HTTP/2 frames. * An listener of HTTP/2 frames.
*/ */
public interface Http2FrameListener extends Http2DataListener { public interface Http2FrameListener {
/**
* Handles an inbound {@code DATA} frame.
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
* @param data payload buffer for the frame. If this buffer will be released by the codec.
* @param padding the number of padding bytes found at the end of the frame.
* @param endOfStream Indicates whether this is the last frame to be sent from the remote
* endpoint for this stream.
* @return the number of bytes that have been processed by the application. The returned bytes
* are used by the inbound flow controller to determine the appropriate time to expand
* the inbound flow control window (i.e. send {@code WINDOW_UPDATE}). Returning a value
* equal to the length of {@code data} + {@code padding} will effectively opt-out of
* application-level flow control for this frame. Returning a value less than the length
* of {@code data} + {@code padding} will defer the returning of the processed bytes,
* which the application must later return via
* {@link Http2InboundFlowState#returnProcessedBytes(ChannelHandlerContext, int)}. The
* returned value must be >= {@code 0} and <= {@code data.readableBytes()} +
* {@code padding}.
*/
int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception;
/** /**
* Handles an inbound HEADERS frame. * Handles an inbound HEADERS frame.

View File

@ -29,9 +29,9 @@ public class Http2FrameListenerDecorator implements Http2FrameListener {
} }
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception { throws Http2Exception {
listener.onDataRead(ctx, streamId, data, padding, endOfStream); return listener.onDataRead(ctx, streamId, data, padding, endOfStream);
} }
@Override @Override

View File

@ -15,14 +15,30 @@
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
/** /**
* Controls the inbound flow of data frames from the remote endpoint. * Controls the inbound flow of data frames from the remote endpoint.
*/ */
public interface Http2InboundFlowController extends Http2DataListener { public interface Http2InboundFlowController {
/**
* Applies inbound flow control to the given {@code DATA} frame.
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
* @param data payload buffer for the frame.
* @param padding the number of padding bytes found at the end of the frame.
* @param endOfStream Indicates whether this is the last frame to be sent from the remote
* endpoint for this stream.
*/
void applyFlowControl(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception;
/** /**
* Sets the initial inbound flow control window size and updates all stream window sizes by the * Sets the initial inbound flow control window size and updates all stream window sizes by the
* delta. This is called as part of the processing for an outbound SETTINGS frame. * delta.
* *
* @param newWindowSize the new initial window size. * @param newWindowSize the new initial window size.
* @throws Http2Exception thrown if any protocol-related error occurred. * @throws Http2Exception thrown if any protocol-related error occurred.

View File

@ -0,0 +1,36 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.ChannelHandlerContext;
/**
* The inbound flow control state for a stream. This object is created and managed by the
* {@link Http2InboundFlowController}.
*/
public interface Http2InboundFlowState extends Http2FlowState {
/**
* Used by applications that participate in application-level inbound flow control. Allows the
* application to return a number of bytes that has been processed and thereby enabling the
* {@link Http2InboundFlowController} to send a {@code WINDOW_UPDATE} to restore at least part
* of the flow control window.
*
* @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if
* appropriate
* @param numBytes the number of bytes to be returned to the flow control window.
*/
void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception;
}

View File

@ -39,11 +39,11 @@ public class Http2InboundFrameLogger implements Http2FrameReader {
reader.readFrame(ctx, input, new Http2FrameListener() { reader.readFrame(ctx, input, new Http2FrameListener() {
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endOfStream) int padding, boolean endOfStream)
throws Http2Exception { throws Http2Exception {
logger.logData(INBOUND, streamId, data, padding, endOfStream); logger.logData(INBOUND, streamId, data, padding, endOfStream);
listener.onDataRead(ctx, streamId, data, padding, endOfStream); return listener.onDataRead(ctx, streamId, data, padding, endOfStream);
} }
@Override @Override

View File

@ -166,22 +166,22 @@ public interface Http2Stream {
/** /**
* Gets the in-bound flow control state for this stream. * Gets the in-bound flow control state for this stream.
*/ */
FlowState inboundFlow(); Http2InboundFlowState inboundFlow();
/** /**
* Sets the in-bound flow control state for this stream. * Sets the in-bound flow control state for this stream.
*/ */
void inboundFlow(FlowState state); void inboundFlow(Http2InboundFlowState state);
/** /**
* Gets the out-bound flow control window for this stream. * Gets the out-bound flow control window for this stream.
*/ */
FlowState outboundFlow(); Http2FlowState outboundFlow();
/** /**
* Sets the out-bound flow control window for this stream. * Sets the out-bound flow control window for this stream.
*/ */
void outboundFlow(FlowState state); void outboundFlow(Http2FlowState state);
/** /**
* Updates an priority for this stream. Calling this method may affect the straucture of the * Updates an priority for this stream. Calling this method may affect the straucture of the

View File

@ -252,7 +252,7 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
} }
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception { throws Http2Exception {
FullHttpMessage msg = messageMap.get(streamId); FullHttpMessage msg = messageMap.get(streamId);
if (msg == null) { if (msg == null) {
@ -271,6 +271,9 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
if (endOfStream) { if (endOfStream) {
fireChannelRead(ctx, msg, streamId); fireChannelRead(ctx, msg, streamId);
} }
// All bytes have been processed.
return dataReadableBytes + padding;
} }
@Override @Override

View File

@ -101,6 +101,9 @@ public class DefaultHttp2ConnectionDecoderTest {
@Mock @Mock
private Http2ConnectionEncoder encoder; private Http2ConnectionEncoder encoder;
@Mock
private Http2InboundFlowState inFlowState;
@Mock @Mock
private Http2LifecycleManager lifecycleManager; private Http2LifecycleManager lifecycleManager;
@ -113,6 +116,7 @@ public class DefaultHttp2ConnectionDecoderTest {
when(channel.isActive()).thenReturn(true); when(channel.isActive()).thenReturn(true);
when(stream.id()).thenReturn(STREAM_ID); when(stream.id()).thenReturn(STREAM_ID);
when(stream.state()).thenReturn(OPEN); when(stream.state()).thenReturn(OPEN);
when(stream.inboundFlow()).thenReturn(inFlowState);
when(pushStream.id()).thenReturn(PUSH_STREAM_ID); when(pushStream.id()).thenReturn(PUSH_STREAM_ID);
when(connection.activeStreams()).thenReturn(Collections.singletonList(stream)); when(connection.activeStreams()).thenReturn(Collections.singletonList(stream));
when(connection.stream(STREAM_ID)).thenReturn(stream); when(connection.stream(STREAM_ID)).thenReturn(stream);
@ -161,9 +165,13 @@ public class DefaultHttp2ConnectionDecoderTest {
public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception { public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception {
when(connection.goAwaySent()).thenReturn(true); when(connection.goAwaySent()).thenReturn(true);
final ByteBuf data = dummyData(); final ByteBuf data = dummyData();
int padding = 10;
int processedBytes = data.readableBytes() + padding;
mockFlowControl(processedBytes);
try { try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true); decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true));
verify(inFlowState).returnProcessedBytes(eq(ctx), eq(processedBytes));
// Verify that the event was absorbed and not propagated to the oberver. // Verify that the event was absorbed and not propagated to the oberver.
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
@ -192,7 +200,7 @@ public class DefaultHttp2ConnectionDecoderTest {
final ByteBuf data = dummyData(); final ByteBuf data = dummyData();
try { try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true); decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow, never()).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); verify(inboundFlow, never()).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally { } finally {
data.release(); data.release();
@ -207,7 +215,7 @@ public class DefaultHttp2ConnectionDecoderTest {
final ByteBuf data = dummyData(); final ByteBuf data = dummyData();
try { try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true); decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally { } finally {
data.release(); data.release();
@ -219,7 +227,7 @@ public class DefaultHttp2ConnectionDecoderTest {
final ByteBuf data = dummyData(); final ByteBuf data = dummyData();
try { try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true); decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future)); verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future));
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
} finally { } finally {
@ -402,5 +410,14 @@ public class DefaultHttp2ConnectionDecoderTest {
decoder.decodeFrame(ctx, EMPTY_BUFFER, Collections.emptyList()); decoder.decodeFrame(ctx, EMPTY_BUFFER, Collections.emptyList());
return internallistener.getValue(); return internallistener.getValue();
} }
}
private void mockFlowControl(final int processedBytes) throws Http2Exception {
doAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock invocation) throws Throwable {
return processedBytes;
}
}).when(listener).onDataRead(any(ChannelHandlerContext.class), anyInt(),
any(ByteBuf.class), anyInt(), anyBoolean());
}
}

View File

@ -15,7 +15,6 @@
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.DefaultHttp2InboundFlowController.WINDOW_UPDATE_OFF;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -76,6 +75,20 @@ public class DefaultHttp2InboundFlowControllerTest {
verifyWindowUpdateNotSent(); verifyWindowUpdateNotSent();
} }
@Test
public void windowUpdateShouldSendOnceBytesReturned() throws Http2Exception {
int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1;
applyFlowControl(STREAM_ID, dataSize, 0, false);
// Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent.
returnProcessedBytes(STREAM_ID, 10);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
// Return the rest and verify the WINDOW_UPDATE is sent.
returnProcessedBytes(STREAM_ID, dataSize - 10);
verifyWindowUpdateSent(STREAM_ID, dataSize);
}
@Test(expected = Http2Exception.class) @Test(expected = Http2Exception.class)
public void connectionFlowControlExceededShouldThrow() throws Http2Exception { public void connectionFlowControlExceededShouldThrow() throws Http2Exception {
// Window exceeded because of the padding. // Window exceeded because of the padding.
@ -83,7 +96,7 @@ public class DefaultHttp2InboundFlowControllerTest {
} }
@Test @Test
public void halfWindowRemainingShouldUpdateConnectionWindow() throws Http2Exception { public void windowUpdateShouldNotBeSentAfterEndOfStream() throws Http2Exception {
int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1;
int newWindow = DEFAULT_WINDOW_SIZE - dataSize; int newWindow = DEFAULT_WINDOW_SIZE - dataSize;
int windowDelta = DEFAULT_WINDOW_SIZE - newWindow; int windowDelta = DEFAULT_WINDOW_SIZE - newWindow;
@ -91,16 +104,10 @@ public class DefaultHttp2InboundFlowControllerTest {
// Set end-of-stream on the frame, so no window update will be sent for the stream. // Set end-of-stream on the frame, so no window update will be sent for the stream.
applyFlowControl(STREAM_ID, dataSize, 0, true); applyFlowControl(STREAM_ID, dataSize, 0, true);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
} verifyWindowUpdateNotSent(STREAM_ID);
@Test returnProcessedBytes(STREAM_ID, dataSize);
public void windowMaintenanceDisabledForConnectionShouldNotUpdateWindow() throws Http2Exception { verifyWindowUpdateNotSent(STREAM_ID);
controller.setWindowUpdateRatio(ctx, CONNECTION_STREAM_ID, WINDOW_UPDATE_OFF);
int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1;
// Set end-of-stream on the frame, so no window update will be sent for the stream.
applyFlowControl(STREAM_ID, dataSize, 0, true);
verifyWindowUpdateNotSent();
} }
@Test @Test
@ -111,40 +118,36 @@ public class DefaultHttp2InboundFlowControllerTest {
// Don't set end-of-stream so we'll get a window update for the stream as well. // Don't set end-of-stream so we'll get a window update for the stream as well.
applyFlowControl(STREAM_ID, dataSize, 0, false); applyFlowControl(STREAM_ID, dataSize, 0, false);
returnProcessedBytes(STREAM_ID, dataSize);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
verifyWindowUpdateSent(STREAM_ID, windowDelta); verifyWindowUpdateSent(STREAM_ID, windowDelta);
} }
@Test
public void windowMaintenanceDisabledForStreamShouldNotUpdateWindow() throws Http2Exception {
controller.setWindowUpdateRatio(ctx, STREAM_ID, WINDOW_UPDATE_OFF);
int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1;
int initialWindowSize = DEFAULT_WINDOW_SIZE;
int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize);
// Don't set end-of-stream so we'll get a window update for the stream as well.
applyFlowControl(STREAM_ID, dataSize, 0, false);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
verifyWindowUpdateNotSent(STREAM_ID);
}
@Test @Test
public void initialWindowUpdateShouldAllowMoreFrames() throws Http2Exception { public void initialWindowUpdateShouldAllowMoreFrames() throws Http2Exception {
// Send a frame that takes up the entire window. // Send a frame that takes up the entire window.
int initialWindowSize = DEFAULT_WINDOW_SIZE; int initialWindowSize = DEFAULT_WINDOW_SIZE;
applyFlowControl(STREAM_ID, initialWindowSize, 0, false); applyFlowControl(STREAM_ID, initialWindowSize, 0, false);
assertEquals(0, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
returnProcessedBytes(STREAM_ID, initialWindowSize);
assertEquals(initialWindowSize, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
// Update the initial window size to allow another frame. // Update the initial window size to allow another frame.
int newInitialWindowSize = 2 * initialWindowSize; int newInitialWindowSize = 2 * initialWindowSize;
controller.initialInboundWindowSize(newInitialWindowSize); controller.initialInboundWindowSize(newInitialWindowSize);
assertEquals(newInitialWindowSize, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
// Clear any previous calls to the writer. // Clear any previous calls to the writer.
reset(frameWriter); reset(frameWriter);
// Send the next frame and verify that the expected window updates were sent. // Send the next frame and verify that the expected window updates were sent.
applyFlowControl(STREAM_ID, initialWindowSize, 0, false); applyFlowControl(STREAM_ID, initialWindowSize, 0, false);
returnProcessedBytes(STREAM_ID, initialWindowSize);
int delta = newInitialWindowSize - initialWindowSize; int delta = newInitialWindowSize - initialWindowSize;
verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta); verifyWindowUpdateSent(CONNECTION_STREAM_ID, newInitialWindowSize);
verifyWindowUpdateSent(STREAM_ID, delta); verifyWindowUpdateSent(STREAM_ID, delta);
} }
@ -175,6 +178,7 @@ public class DefaultHttp2InboundFlowControllerTest {
// connection window continues collapsing. // connection window continues collapsing.
int data2 = window(STREAM_ID); int data2 = window(STREAM_ID);
applyFlowControl(STREAM_ID, data2, 0, false); applyFlowControl(STREAM_ID, data2, 0, false);
returnProcessedBytes(STREAM_ID, data2);
verifyWindowUpdateSent(STREAM_ID, data1 + data2); verifyWindowUpdateSent(STREAM_ID, data1 + data2);
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID));
@ -186,6 +190,7 @@ public class DefaultHttp2InboundFlowControllerTest {
// verify the new maximum of the connection window. // verify the new maximum of the connection window.
int data3 = window(STREAM_ID); int data3 = window(STREAM_ID);
applyFlowControl(STREAM_ID, data3, 0, false); applyFlowControl(STREAM_ID, data3, 0, false);
returnProcessedBytes(STREAM_ID, data3);
verifyWindowUpdateSent(STREAM_ID, DEFAULT_WINDOW_SIZE); verifyWindowUpdateSent(STREAM_ID, DEFAULT_WINDOW_SIZE);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE verifyWindowUpdateSent(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE
- (DEFAULT_WINDOW_SIZE * 2 - (data2 + data3))); - (DEFAULT_WINDOW_SIZE * 2 - (data2 + data3)));
@ -201,7 +206,7 @@ public class DefaultHttp2InboundFlowControllerTest {
private void applyFlowControl(int streamId, int dataSize, int padding, boolean endOfStream) throws Http2Exception { private void applyFlowControl(int streamId, int dataSize, int padding, boolean endOfStream) throws Http2Exception {
final ByteBuf buf = dummyData(dataSize); final ByteBuf buf = dummyData(dataSize);
try { try {
controller.onDataRead(ctx, streamId, buf, padding, endOfStream); controller.applyFlowControl(ctx, streamId, buf, padding, endOfStream);
} finally { } finally {
buf.release(); buf.release();
} }
@ -213,7 +218,11 @@ public class DefaultHttp2InboundFlowControllerTest {
return buffer; return buffer;
} }
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) { private void returnProcessedBytes(int streamId, int processedBytes) throws Http2Exception {
connection.requireStream(streamId).inboundFlow().returnProcessedBytes(ctx, processedBytes);
}
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) throws Http2Exception {
verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise)); verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise));
} }

View File

@ -140,14 +140,15 @@ final class Http2TestUtil {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
reader.readFrame(ctx, in, new Http2FrameListener() { reader.readFrame(ctx, in, new Http2FrameListener() {
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception { boolean endOfStream) throws Http2Exception {
Http2Stream stream = getOrCreateStream(streamId, endOfStream); Http2Stream stream = getOrCreateStream(streamId, endOfStream);
listener.onDataRead(ctx, streamId, data, padding, endOfStream); int processed = listener.onDataRead(ctx, streamId, data, padding, endOfStream);
if (endOfStream) { if (endOfStream) {
closeStream(stream, true); closeStream(stream, true);
} }
latch.countDown(); latch.countDown();
return processed;
} }
@Override @Override
@ -281,16 +282,17 @@ final class Http2TestUtil {
} }
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception { throws Http2Exception {
int numBytes = data.readableBytes(); int numBytes = data.readableBytes();
listener.onDataRead(ctx, streamId, data, padding, endOfStream); int processed = listener.onDataRead(ctx, streamId, data, padding, endOfStream);
messageLatch.countDown(); messageLatch.countDown();
if (dataLatch != null) { if (dataLatch != null) {
for (int i = 0; i < numBytes; ++i) { for (int i = 0; i < numBytes; ++i) {
dataLatch.countDown(); dataLatch.countDown();
} }
} }
return processed;
} }
@Override @Override

View File

@ -95,11 +95,13 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler {
* If receive a frame with end-of-stream set, send a pre-canned response. * If receive a frame with end-of-stream set, send a pre-canned response.
*/ */
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception { boolean endOfStream) throws Http2Exception {
int processed = data.readableBytes() + padding;
if (endOfStream) { if (endOfStream) {
sendResponse(ctx, streamId, data.retain()); sendResponse(ctx, streamId, data.retain());
} }
return processed;
} }
/** /**