HTTP/2 Flow Controller should use Channel.isWritable()

Motivation:
See #3783

Modifications:
- The DefaultHttp2RemoteFlowController should use Channel.isWritable() before attempting to do any write operations.
- The Flow controller methods should no longer take ChannelHandlerContext. The concept of flow control is tied to a connection and we do not support 1 flow controller keeping track of multiple ChannelHandlerContext.

Result:
Writes are delayed until isWritable() is true. Flow controller interface methods are more clear as to ChannelHandlerContext restrictions.
This commit is contained in:
Scott Mitchell 2015-06-30 10:10:17 -07:00
parent f608a139cf
commit 9f422ed0f4
17 changed files with 400 additions and 201 deletions

View File

@ -192,8 +192,8 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
if (stream == null || stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
// Ignoring this frame. We still need to count the frame towards the connection flow control
// window, but we immediately mark all bytes as consumed.
flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream);
flowController.consumeBytes(ctx, stream, bytesToReturn);
flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
flowController.consumeBytes(stream, bytesToReturn);
// Verify that the stream may have existed after we apply flow control.
verifyStreamMayHaveExisted(streamId);
@ -220,7 +220,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
int unconsumedBytes = unconsumedBytes(stream);
try {
flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream);
flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
// Update the unconsumed bytes after flow control is applied.
unconsumedBytes = unconsumedBytes(stream);
@ -249,7 +249,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
throw e;
} finally {
// If appropriate, return the processed bytes to the flow controller.
flowController.consumeBytes(ctx, stream, bytesToReturn);
flowController.consumeBytes(stream, bytesToReturn);
if (endOfStream) {
lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
@ -514,7 +514,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
}
// Update the outbound flow control window.
encoder.flowController().incrementWindowSize(ctx, stream, windowSizeIncrement);
encoder.flowController().incrementWindowSize(stream, windowSizeIncrement);
listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
}

View File

@ -130,8 +130,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
// Hand control of the frame to the flow controller.
flowController().addFlowControlled(ctx, stream,
new FlowControlledData(ctx, stream, data, padding, endOfStream, promise));
flowController().addFlowControlled(stream,
new FlowControlledData(stream, data, padding, endOfStream, promise));
return promise;
}
@ -166,9 +166,9 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
// Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
flowController().addFlowControlled(ctx, stream,
new FlowControlledHeaders(ctx, stream, headers, streamDependency, weight,
exclusive, padding, endOfStream, promise));
flowController().addFlowControlled(stream,
new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding,
endOfStream, promise));
return promise;
} catch (Http2NoMoreStreamIdsException e) {
lifecycleManager.onException(ctx, e);
@ -318,10 +318,10 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
private final CoalescingBufferQueue queue;
private FlowControlledData(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf buf, int padding,
boolean endOfStream, ChannelPromise promise) {
super(ctx, stream, padding, endOfStream, promise);
queue = new CoalescingBufferQueue(ctx.channel());
private FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream,
ChannelPromise promise) {
super(stream, padding, endOfStream, promise);
queue = new CoalescingBufferQueue(promise.channel());
queue.add(buf, promise);
}
@ -331,14 +331,14 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
@Override
public void error(Throwable cause) {
public void error(ChannelHandlerContext ctx, Throwable cause) {
queue.releaseAndFailAll(cause);
lifecycleManager.onException(ctx, cause);
promise.tryFailure(cause);
}
@Override
public void write(int allowedBytes) {
public void write(ChannelHandlerContext ctx, int allowedBytes) {
if (!endOfStream && (queue.readableBytes() == 0 || allowedBytes == 0)) {
// Nothing to write and we don't have to force a write because of EOS.
return;
@ -360,7 +360,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
@Override
public boolean merge(Http2RemoteFlowController.FlowControlled next) {
public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
if (FlowControlledData.class != next.getClass()) {
return false;
}
@ -379,16 +379,14 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
* blocked on flow-control a HEADER frame must wait until this frame has been written.
*/
private final class FlowControlledHeaders extends FlowControlledBase {
private final Http2Headers headers;
private final int streamDependency;
private final short weight;
private final boolean exclusive;
private FlowControlledHeaders(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding,
boolean endOfStream, ChannelPromise promise) {
super(ctx, stream, padding, endOfStream, promise);
private FlowControlledHeaders(Http2Stream stream, Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endOfStream, ChannelPromise promise) {
super(stream, padding, endOfStream, promise);
this.headers = headers;
this.streamDependency = streamDependency;
this.weight = weight;
@ -401,23 +399,26 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
@Override
public void error(Throwable cause) {
public void error(ChannelHandlerContext ctx, Throwable cause) {
if (ctx != null) {
lifecycleManager.onException(ctx, cause);
}
promise.tryFailure(cause);
}
@Override
public void write(int allowedBytes) {
public void write(ChannelHandlerContext ctx, int allowedBytes) {
if (promise.isVoid()) {
promise = ctx.newPromise();
}
promise.addListener(this);
frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
padding, endOfStream, promise);
}
@Override
public boolean merge(Http2RemoteFlowController.FlowControlled next) {
public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
return false;
}
}
@ -427,15 +428,13 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
*/
public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
ChannelFutureListener {
protected final ChannelHandlerContext ctx;
protected final Http2Stream stream;
protected ChannelPromise promise;
protected boolean endOfStream;
protected int padding;
public FlowControlledBase(final ChannelHandlerContext ctx, final Http2Stream stream, int padding,
boolean endOfStream, final ChannelPromise promise) {
this.ctx = ctx;
public FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
final ChannelPromise promise) {
if (padding < 0) {
throw new IllegalArgumentException("padding must be >= 0");
}
@ -455,7 +454,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
error(future.cause());
error(flowController().channelHandlerContext(), future.cause());
}
}
}

View File

@ -26,7 +26,6 @@ import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
@ -89,8 +88,8 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
FlowState state = state(stream);
int unconsumedBytes = state.unconsumedBytes();
if (ctx != null && unconsumedBytes > 0) {
connectionState().consumeBytes(ctx, unconsumedBytes);
state.consumeBytes(ctx, unconsumedBytes);
connectionState().consumeBytes(unconsumedBytes);
state.consumeBytes(unconsumedBytes);
}
} catch (Http2Exception e) {
PlatformDependent.throwException(e);
@ -104,6 +103,11 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
});
}
@Override
public void channelHandlerContext(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void initialWindowSize(int newWindowSize) throws Http2Exception {
int delta = newWindowSize - initialWindowSize;
@ -130,32 +134,32 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
checkNotNull(ctx, "ctx");
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
FlowState state = state(stream);
// Just add the delta to the stream-specific initial window size so that the next time the window
// expands it will grow to the new initial size.
state.incrementInitialStreamWindow(delta);
state.writeWindowUpdateIfNeeded(ctx);
state.writeWindowUpdateIfNeeded();
}
@Override
public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
throws Http2Exception {
public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
if (numBytes < 0) {
throw new IllegalArgumentException("numBytes must not be negative");
}
if (numBytes == 0) {
return false;
}
// Streams automatically consume all remaining bytes when they are closed, so just ignore
// if already closed.
if (stream != null && !isClosed(stream)) {
if (stream.id() == CONNECTION_STREAM_ID) {
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
}
boolean windowUpdateSent = connectionState().consumeBytes(ctx, numBytes);
windowUpdateSent |= state(stream).consumeBytes(ctx, numBytes);
boolean windowUpdateSent = connectionState().consumeBytes(numBytes);
windowUpdateSent |= state(stream).consumeBytes(numBytes);
return windowUpdateSent;
}
return false;
@ -202,16 +206,15 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
* initial {@code SETTINGS} frame is sent before this is called. It would
* be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE}
* was generated by this method before the initial {@code SETTINGS} frame is sent.
* @param ctx the context to use if a {@code WINDOW_UPDATE} is determined necessary.
* @param stream the stream for which {@code ratio} applies to.
* @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary.
* @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames
*/
public void windowUpdateRatio(ChannelHandlerContext ctx, Http2Stream stream, float ratio) throws Http2Exception {
public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception {
checkValidRatio(ratio);
FlowState state = state(stream);
state.windowUpdateRatio(ratio);
state.writeWindowUpdateIfNeeded(ctx);
state.writeWindowUpdateIfNeeded();
}
/**
@ -225,9 +228,8 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
@Override
public void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data,
int padding, boolean endOfStream) throws Http2Exception {
this.ctx = checkNotNull(ctx, "ctx");
public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
int dataLength = data.readableBytes() + padding;
// Apply the connection-level flow control
@ -241,7 +243,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
state.receiveFlowControlledFrame(dataLength);
} else if (dataLength > 0) {
// Immediately consume the bytes for the connection window.
connectionState.consumeBytes(ctx, dataLength);
connectionState.consumeBytes(dataLength);
}
}
@ -378,10 +380,10 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
@Override
public boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
public boolean consumeBytes(int numBytes) throws Http2Exception {
// Return the bytes processed and update the window.
returnProcessedBytes(numBytes);
return writeWindowUpdateIfNeeded(ctx);
return writeWindowUpdateIfNeeded();
}
@Override
@ -390,14 +392,14 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
@Override
public boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception {
public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
if (endOfStream || initialStreamWindowSize <= 0) {
return false;
}
int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio);
if (processedWindow <= threshold) {
writeWindowUpdate(ctx);
writeWindowUpdate();
return true;
}
return false;
@ -407,7 +409,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
* Called to perform a window update for this stream (or connection). Updates the window size back
* to the size of the initial window and sends a window update frame to the remote endpoint.
*/
private void writeWindowUpdate(ChannelHandlerContext ctx) throws Http2Exception {
private void writeWindowUpdate() throws Http2Exception {
// Expand the window for this stream back to the size of the initial window.
int deltaWindowSize = initialStreamWindowSize - processedWindow;
try {
@ -450,12 +452,12 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
@Override
public boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception {
public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
throw new UnsupportedOperationException();
}
@Override
public boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
public boolean consumeBytes(int numBytes) throws Http2Exception {
return false;
}
@ -513,18 +515,17 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
*
* @return true if {@code WINDOW_UPDATE} was written, false otherwise.
*/
boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception;
boolean writeWindowUpdateIfNeeded() throws Http2Exception;
/**
* Indicates that the application has consumed {@code numBytes} from the connection or stream and is
* ready to receive more data.
*
* @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if appropriate
* @param numBytes the number of bytes to be returned to the flow control window.
* @return true if {@code WINDOW_UPDATE} was written, false otherwise.
* @throws Http2Exception
*/
boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception;
boolean consumeBytes(int numBytes) throws Http2Exception;
int unconsumedBytes();

View File

@ -23,7 +23,6 @@ import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Stream.State;
@ -35,6 +34,7 @@ import java.util.Deque;
* Basic implementation of {@link Http2RemoteFlowController}.
*/
public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
private final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) {
@ -139,6 +139,29 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
});
}
/**
* {@inheritDoc}
* <p>
* Any queued {@link FlowControlled} objects will be sent.
*/
@Override
public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
this.ctx = ctx;
// Don't worry about cleaning up queued frames here if ctx is null. It is expected that all streams will be
// closed and the queue cleanup will occur when the stream state transitions occur.
// If any frames have been queued up, we should send them now that we have a channel context.
if (ctx != null && ctx.channel().isWritable()) {
writePendingBytes();
}
}
@Override
public ChannelHandlerContext channelHandlerContext() {
return ctx;
}
@Override
public void initialWindowSize(int newWindowSize) throws Http2Exception {
if (newWindowSize < 0) {
@ -178,8 +201,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
// This call does not trigger any writes, all writes will occur when writePendingBytes is called.
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
if (stream.id() == CONNECTION_STREAM_ID) {
// Update the connection window
connectionState().incrementStreamWindow(delta);
@ -201,20 +223,15 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
@Override
public void addFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled frame) {
checkNotNull(ctx, "ctx");
public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
checkNotNull(frame, "frame");
if (this.ctx != null && this.ctx != ctx) {
throw new IllegalArgumentException("Writing data from multiple ChannelHandlerContexts is not supported");
}
// Save the context. We'll use this later when we write pending bytes.
this.ctx = ctx;
final AbstractState state;
try {
state = state(stream);
state.enqueueFrame(frame);
} catch (Throwable t) {
frame.error(t);
frame.error(ctx, t);
return;
}
}
@ -241,13 +258,40 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return connectionState().windowSize();
}
private int minUsableChannelBytes() {
// The current allocation algorithm values "fairness" and doesn't give any consideration to "goodput". It
// is possible that 1 byte will be allocated to many streams. In an effort to try to make "goodput"
// reasonable with the current allocation algorithm we have this "cheap" check up front to ensure there is
// an "adequate" amount of connection window before allocation is attempted. This is not foolproof as if the
// number of streams is >= this minimal number then we may still have the issue, but the idea is to narrow the
// circumstances in which this can happen without rewriting the allocation algorithm.
return Math.max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK);
}
private int maxUsableChannelBytes() {
if (ctx == null) {
return 0;
}
// If the channel isWritable, allow at least minUseableChannelBytes.
int channelWritableBytes = (int) Math.min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable());
int useableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
// Clip the usable bytes by the connection window.
return min(connectionState().windowSize(), useableBytes);
}
private int writableBytes(int requestedBytes) {
return Math.min(requestedBytes, maxUsableChannelBytes());
}
/**
* Writes as many pending bytes as possible, according to stream priority.
*/
@Override
public void writePendingBytes() throws Http2Exception {
Http2Stream connectionStream = connection.connectionStream();
int connectionWindowSize = state(connectionStream).windowSize();
int connectionWindowSize = writableBytes(state(connectionStream).windowSize());
if (connectionWindowSize > 0) {
// Allocate the bytes for the connection window to the streams, but do not write.
@ -395,10 +439,10 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Initial size is 1/4 the number of children. Clipping the minimum at 2, which will over allocate if
// maxSize == 1 but if this was true we shouldn't need to re-allocate because the 1 child should get
// all of the available connection window.
stillHungry = new Http2Stream[max(2, maxSize / 4)];
stillHungry = new Http2Stream[max(2, maxSize >>> 2)];
} else if (index == stillHungry.length) {
// Grow the array by a factor of 2.
stillHungry = Arrays.copyOf(stillHungry, min(maxSize, stillHungry.length * 2));
stillHungry = Arrays.copyOf(stillHungry, min(maxSize, stillHungry.length << 1));
}
}
}
@ -536,7 +580,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
void enqueueFrame(FlowControlled frame) {
incrementPendingBytes(frame.size());
FlowControlled last = pendingWriteQueue.peekLast();
if (last == null || !last.merge(frame)) {
if (last == null || !last.merge(ctx, frame)) {
pendingWriteQueue.offer(frame);
}
}
@ -580,19 +624,30 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override
int writeBytes(int bytes) {
boolean wrote = false;
int bytesAttempted = 0;
int writableBytes = min(bytes, writableWindow());
while (hasFrame() && (writableBytes > 0 || peek().size() == 0)) {
wrote = true;
bytesAttempted += write(peek(), writableBytes);
writableBytes = min(bytes - bytesAttempted, writableWindow());
}
if (wrote) {
return bytesAttempted;
} else {
if (!hasFrame()) {
return -1;
}
// Check if the first frame is a "writable" frame to get the "-1" return status out of the way
FlowControlled frame = peek();
int maxBytes = min(bytes, writableWindow());
if (maxBytes <= 0 && frame.size() != 0) {
// The frame had data and all of it was written.
return -1;
}
int originalBytes = bytes;
bytes -= write(frame, maxBytes);
// Write the remainder of frames that we are allowed to
while (hasFrame()) {
frame = peek();
maxBytes = min(bytes, writableWindow());
if (maxBytes <= 0 && frame.size() != 0) {
// The frame had data and all of it was written.
break;
}
bytes -= write(frame, maxBytes);
}
return originalBytes - bytes;
}
/**
@ -609,7 +664,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Write the portion of the frame.
writing = true;
frame.write(max(0, allowedBytes));
frame.write(ctx, max(0, allowedBytes));
if (!cancelled && frame.size() == 0) {
// This frame has been fully written, remove this frame and notify it. Since we remove this frame
// first, we're guaranteed that its error method will not be called when we call cancel.
@ -677,8 +732,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* the unwritten bytes are removed from this branch of the priority tree.
*/
private void writeError(FlowControlled frame, Http2Exception cause) {
assert ctx != null;
decrementPendingBytes(frame.size());
frame.error(cause);
frame.error(ctx, cause);
}
}

View File

@ -24,7 +24,6 @@ import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@ -295,6 +294,11 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
this.flowController = checkNotNull(flowController, "flowController");
}
@Override
public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
flowController.channelHandlerContext(ctx);
}
@Override
public void initialWindowSize(int newWindowSize) throws Http2Exception {
flowController.initialWindowSize(newWindowSize);
@ -316,20 +320,18 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
throws Http2Exception {
flowController.incrementWindowSize(ctx, stream, delta);
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
flowController.incrementWindowSize(stream, delta);
}
@Override
public void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream,
ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream);
public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
}
@Override
public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
throws Http2Exception {
public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
Http2Decompressor decompressor = decompressor(stream);
Http2Decompressor copy = null;
try {
@ -339,7 +341,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
// Convert the uncompressed consumed bytes to compressed (on the wire) bytes.
numBytes = decompressor.consumeProcessedBytes(numBytes);
}
return flowController.consumeBytes(ctx, stream, numBytes);
return flowController.consumeBytes(stream, numBytes);
} catch (Http2Exception e) {
if (copy != null) {
stream.setProperty(propertyKey, copy);

View File

@ -356,9 +356,11 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// Initialize the encoder and decoder.
// Initialize the encoder, decoder, flow controllers, and internal state.
encoder.lifecycleManager(this);
decoder.lifecycleManager(this);
encoder.flowController().channelHandlerContext(ctx);
decoder.flowController().channelHandlerContext(ctx);
byteDecoder = new PrefaceDecoder(ctx);
}
@ -382,12 +384,24 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (byteDecoder != null) {
encoder.flowController().channelHandlerContext(null);
decoder.flowController().channelHandlerContext(null);
byteDecoder.channelInactive(ctx);
super.channelInactive(ctx);
byteDecoder = null;
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// Writability is expected to change while we are writing. We cannot allow this event to trigger reentering
// the allocation and write loop. Reentering the event loop will lead to over or illegal allocation.
if (ctx.channel().isWritable()) {
encoder.flowController().writePendingBytes();
}
super.channelWritabilityChanged(ctx);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
byteDecoder.decode(ctx, in, out);

View File

@ -20,6 +20,15 @@ import io.netty.channel.ChannelHandlerContext;
* Base interface for all HTTP/2 flow controllers.
*/
public interface Http2FlowController {
/**
* Set the {@link ChannelHandlerContext} for which to apply flow control on.
* <p>
* This <strong>must</strong> be called to properly initialize the {@link Http2FlowController}.
* Not calling this is considered a programming error.
* @param ctx The {@link ChannelHandlerContext} for which to apply flow control on.
* @throws Http2Exception if any protocol-related error occurred.
*/
void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception;
/**
* Sets the connection-wide initial flow control window and updates all stream windows (but not the connection
@ -63,11 +72,10 @@ public interface Http2FlowController {
* window size published by this endpoint. It is up to the implementation, however, as to when a
* {@code WINDOW_UPDATE} is actually sent.
*
* @param ctx The context for the calling handler
* @param stream The subject stream. Use {@link Http2Connection#connectionStream()} for
* requesting the size of the connection window.
* @param delta the change in size of the flow control window.
* @throws Http2Exception thrown if a protocol-related error occurred.
*/
void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception;
void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception;
}

View File

@ -15,11 +15,9 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
/**
* A {@link Http2FlowController} for controlling the inbound flow of {@code DATA} frames from the remote
* endpoint.
* A {@link Http2FlowController} for controlling the inbound flow of {@code DATA} frames from the remote endpoint.
*/
public interface Http2LocalFlowController extends Http2FlowController {
@ -40,7 +38,7 @@ public interface Http2LocalFlowController extends Http2FlowController {
* @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint for this stream.
* @throws Http2Exception if any flow control errors are encountered.
*/
void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding,
void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception;
/**
@ -52,7 +50,6 @@ public interface Http2LocalFlowController extends Http2FlowController {
* If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link
* Http2Stream.State#CLOSED}), calling this method has no effect.
*
* @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if appropriate
* @param stream the stream for which window space should be freed. The connection stream object must not be used.
* If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link
* Http2Stream.State#CLOSED}), calling this method has no effect.
@ -61,7 +58,7 @@ public interface Http2LocalFlowController extends Http2FlowController {
* @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes(Http2Stream)} for the
* stream.
*/
boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception;
boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception;
/**
* The number of bytes for the given stream that have been received but not yet consumed by the

View File

@ -21,6 +21,13 @@ import io.netty.channel.ChannelHandlerContext;
* endpoint.
*/
public interface Http2RemoteFlowController extends Http2FlowController {
/**
* Get the {@link ChannelHandlerContext} for which to apply flow control on.
* <p>
* This is intended for us by {@link FlowControlled} implementations only. Use with caution.
* @return The {@link ChannelHandlerContext} for which to apply flow control on.
*/
ChannelHandlerContext channelHandlerContext();
/**
* Queues a payload for transmission to the remote endpoint. There is no guarantee as to when the data
@ -29,11 +36,10 @@ public interface Http2RemoteFlowController extends Http2FlowController {
* <p>
* Writes do not actually occur until {@link #writePendingBytes()} is called.
*
* @param ctx the context from the handler.
* @param stream the subject stream. Must not be the connection stream object.
* @param payload payload to write subject to flow-control accounting and ordering rules.
*/
void addFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload);
void addFlowControlled(Http2Stream stream, FlowControlled payload);
/**
* Write all data pending in the flow controller up to the flow-control limits.
@ -78,9 +84,11 @@ public interface Http2RemoteFlowController extends Http2FlowController {
* {@link #writeComplete()}.
* </p>
*
* @param ctx The context to use if any communication needs to occur as a result of the error.
* This may be {@code null} if an exception occurs when the connection has not been established yet.
* @param cause of the error.
*/
void error(Throwable cause);
void error(ChannelHandlerContext ctx, Throwable cause);
/**
* Called after this object has been successfully written.
@ -101,9 +109,10 @@ public interface Http2RemoteFlowController extends Http2FlowController {
* {@link #error(Throwable)}.
* </p>
*
* @param ctx The context to use for writing.
* @param allowedBytes an upper bound on the number of bytes the payload can write at this time.
*/
void write(int allowedBytes);
void write(ChannelHandlerContext ctx, int allowedBytes);
/**
* Merge the contents of the {@code next} message into this message so they can be written out as one unit.
@ -112,7 +121,7 @@ public interface Http2RemoteFlowController extends Http2FlowController {
* @return {@code true} if {@code next} was successfully merged and does not need to be enqueued,
* {@code false} otherwise.
*/
boolean merge(FlowControlled next);
boolean merge(ChannelHandlerContext ctx, FlowControlled next);
}
/**

View File

@ -69,6 +69,7 @@ public class DefaultHttp2ConnectionDecoderTest {
private static final int STREAM_DEPENDENCY_ID = 5;
private Http2ConnectionDecoder decoder;
private ChannelPromise promise;
@Mock
private Http2Connection connection;
@ -91,8 +92,6 @@ public class DefaultHttp2ConnectionDecoderTest {
@Mock
private Channel channel;
private ChannelPromise promise;
@Mock
private ChannelFuture future;
@ -177,8 +176,8 @@ public class DefaultHttp2ConnectionDecoderTest {
mockFlowControl(processedBytes);
try {
decode().onDataRead(ctx, STREAM_ID, data, padding, true);
verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(padding), eq(true));
verify(localFlow).consumeBytes(eq(ctx), eq(stream), eq(processedBytes));
verify(localFlow).receiveFlowControlledFrame(eq(stream), eq(data), eq(padding), eq(true));
verify(localFlow).consumeBytes(eq(stream), eq(processedBytes));
// Verify that the event was absorbed and not propagated to the observer.
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
@ -197,8 +196,8 @@ public class DefaultHttp2ConnectionDecoderTest {
mockFlowControl(processedBytes);
try {
decode().onDataRead(ctx, STREAM_ID, data, padding, true);
verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(padding), eq(true));
verify(localFlow).consumeBytes(eq(ctx), eq(stream), eq(processedBytes));
verify(localFlow).receiveFlowControlledFrame(eq(stream), eq(data), eq(padding), eq(true));
verify(localFlow).consumeBytes(eq(stream), eq(processedBytes));
// Verify that the event was absorbed and not propagated to the observer.
verify(listener).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
@ -219,8 +218,8 @@ public class DefaultHttp2ConnectionDecoderTest {
} finally {
try {
verify(localFlow)
.receiveFlowControlledFrame(eq(ctx), eq((Http2Stream) null), eq(data), eq(padding), eq(true));
verify(localFlow).consumeBytes(eq(ctx), eq((Http2Stream) null), eq(processedBytes));
.receiveFlowControlledFrame(eq((Http2Stream) null), eq(data), eq(padding), eq(true));
verify(localFlow).consumeBytes(eq((Http2Stream) null), eq(processedBytes));
verifyNoMoreInteractions(localFlow);
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
@ -238,8 +237,8 @@ public class DefaultHttp2ConnectionDecoderTest {
try {
decode().onDataRead(ctx, STREAM_ID, data, padding, true);
verify(localFlow)
.receiveFlowControlledFrame(eq(ctx), eq((Http2Stream) null), eq(data), eq(padding), eq(true));
verify(localFlow).consumeBytes(eq(ctx), eq((Http2Stream) null), eq(processedBytes));
.receiveFlowControlledFrame(eq((Http2Stream) null), eq(data), eq(padding), eq(true));
verify(localFlow).consumeBytes(eq((Http2Stream) null), eq(processedBytes));
verifyNoMoreInteractions(localFlow);
// Verify that the event was absorbed and not propagated to the observer.
@ -256,10 +255,10 @@ public class DefaultHttp2ConnectionDecoderTest {
mockFlowControl(0);
try {
decode().onDataRead(ctx, STREAM_ID, data, padding, true);
verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(padding), eq(true));
verify(localFlow).receiveFlowControlledFrame(eq(stream), eq(data), eq(padding), eq(true));
// Now we ignore the empty bytes inside consumeBytes method, so it will be called once.
verify(localFlow).consumeBytes(eq(ctx), eq(stream), eq(0));
verify(localFlow).consumeBytes(eq(stream), eq(0));
// Verify that the empty data event was propagated to the observer.
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true));
@ -288,7 +287,7 @@ public class DefaultHttp2ConnectionDecoderTest {
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true));
verify(localFlow).receiveFlowControlledFrame(eq(stream), eq(data), eq(10), eq(true));
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
@ -303,7 +302,7 @@ public class DefaultHttp2ConnectionDecoderTest {
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true));
verify(localFlow).receiveFlowControlledFrame(eq(stream), eq(data), eq(10), eq(true));
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
@ -315,7 +314,7 @@ public class DefaultHttp2ConnectionDecoderTest {
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true));
verify(localFlow).receiveFlowControlledFrame(eq(stream), eq(data), eq(10), eq(true));
verify(lifecycleManager).closeStreamRemote(eq(stream), eq(future));
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
} finally {
@ -337,19 +336,19 @@ public class DefaultHttp2ConnectionDecoderTest {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) throws Throwable {
int delta = (Integer) in.getArguments()[2];
int delta = (Integer) in.getArguments()[1];
int newValue = unprocessed.addAndGet(-delta);
if (newValue < 0) {
throw new RuntimeException("Returned too many bytes");
}
return null;
}
}).when(localFlow).consumeBytes(eq(ctx), eq(stream), anyInt());
}).when(localFlow).consumeBytes(eq(stream), anyInt());
// When the listener callback is called, process a few bytes and then throw.
doAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock in) throws Throwable {
localFlow.consumeBytes(ctx, stream, 4);
localFlow.consumeBytes(stream, 4);
throw new RuntimeException("Fake Exception");
}
}).when(listener).onDataRead(eq(ctx), eq(STREAM_ID), any(ByteBuf.class), eq(10), eq(true));
@ -358,7 +357,7 @@ public class DefaultHttp2ConnectionDecoderTest {
fail("Expected exception");
} catch (RuntimeException cause) {
verify(localFlow)
.receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(padding), eq(true));
.receiveFlowControlledFrame(eq(stream), eq(data), eq(padding), eq(true));
verify(lifecycleManager).closeStreamRemote(eq(stream), eq(future));
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true));
assertEquals(0, localFlow.unconsumedBytes(stream));
@ -559,7 +558,7 @@ public class DefaultHttp2ConnectionDecoderTest {
public void windowUpdateReadAfterGoAwaySentShouldBeIgnored() throws Exception {
mockGoAwaySent();
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(remoteFlow, never()).incrementWindowSize(eq(ctx), any(Http2Stream.class), anyInt());
verify(remoteFlow, never()).incrementWindowSize(any(Http2Stream.class), anyInt());
verify(listener, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt());
}
@ -567,7 +566,7 @@ public class DefaultHttp2ConnectionDecoderTest {
public void windowUpdateReadAfterGoAwaySentShouldAllowFramesForStreamCreatedByLocalEndpoint() throws Exception {
mockGoAwaySentShouldAllowFramesForStreamCreatedByLocalEndpoint();
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(remoteFlow).incrementWindowSize(eq(ctx), any(Http2Stream.class), anyInt());
verify(remoteFlow).incrementWindowSize(any(Http2Stream.class), anyInt());
verify(listener).onWindowUpdateRead(eq(ctx), anyInt(), anyInt());
}
@ -582,14 +581,14 @@ public class DefaultHttp2ConnectionDecoderTest {
public void windowUpdateReadForUnknownStreamShouldBeIgnored() throws Exception {
when(connection.stream(STREAM_ID)).thenReturn(null);
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(remoteFlow, never()).incrementWindowSize(eq(ctx), any(Http2Stream.class), anyInt());
verify(remoteFlow, never()).incrementWindowSize(any(Http2Stream.class), anyInt());
verify(listener, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt());
}
@Test
public void windowUpdateReadShouldSucceed() throws Exception {
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(remoteFlow).incrementWindowSize(eq(ctx), eq(stream), eq(10));
verify(remoteFlow).incrementWindowSize(eq(stream), eq(10));
verify(listener).onWindowUpdateRead(eq(ctx), eq(STREAM_ID), eq(10));
}

View File

@ -64,6 +64,7 @@ import java.util.ArrayList;
import java.util.List;
import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@ -81,6 +82,7 @@ public class DefaultHttp2ConnectionEncoderTest {
private static final int PUSH_STREAM_ID = 2;
private Http2ConnectionEncoder encoder;
private ChannelPromise promise;
@Mock
private Http2Connection connection;
@ -100,8 +102,6 @@ public class DefaultHttp2ConnectionEncoderTest {
@Mock
private Channel channel;
private ChannelPromise promise;
@Mock
private ChannelPromise voidPromise;
@ -134,6 +134,7 @@ public class DefaultHttp2ConnectionEncoderTest {
private List<Integer> writtenPadding;
private boolean streamClosed;
@SuppressWarnings("unchecked")
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
@ -143,6 +144,7 @@ public class DefaultHttp2ConnectionEncoderTest {
new AssertionFailedError());
when(voidPromise.addListeners(Matchers.<GenericFutureListener<Future<? super Void>>>any())).thenThrow(
new AssertionFailedError());
when(voidPromise.channel()).thenReturn(channel);
when(channel.isActive()).thenReturn(true);
when(stream.id()).thenReturn(STREAM_ID);
@ -213,7 +215,7 @@ public class DefaultHttp2ConnectionEncoderTest {
}
});
payloadCaptor = ArgumentCaptor.forClass(Http2RemoteFlowController.FlowControlled.class);
doNothing().when(remoteFlow).addFlowControlled(eq(ctx), eq(stream), payloadCaptor.capture());
doNothing().when(remoteFlow).addFlowControlled(eq(stream), payloadCaptor.capture());
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.channel()).thenReturn(channel);
when(ctx.newSucceededFuture()).thenReturn(future);
@ -231,7 +233,7 @@ public class DefaultHttp2ConnectionEncoderTest {
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
assertEquals(payloadCaptor.getValue().size(), 8);
payloadCaptor.getValue().write(8);
payloadCaptor.getValue().write(ctx, 8);
assertEquals(0, payloadCaptor.getValue().size());
assertEquals("abcdefgh", writtenData.get(0));
assertEquals(0, data.refCnt());
@ -245,11 +247,11 @@ public class DefaultHttp2ConnectionEncoderTest {
encoder.writeData(ctx, STREAM_ID, data, 0, true, secondPromise);
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
FlowControlled mergedPayload = capturedWrites.get(0);
mergedPayload.merge(capturedWrites.get(1));
mergedPayload.merge(ctx, capturedWrites.get(1));
assertEquals(16, mergedPayload.size());
assertFalse(secondPromise.isSuccess());
mergedPayload.write(16);
mergedPayload.write(ctx, 16);
assertEquals(0, mergedPayload.size());
assertEquals("abcdefghabcdefgh", writtenData.get(0));
assertEquals(0, data.refCnt());
@ -265,10 +267,10 @@ public class DefaultHttp2ConnectionEncoderTest {
encoder.writeData(ctx, STREAM_ID, data, 0, true, voidPromise);
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
FlowControlled mergedPayload = capturedWrites.get(0);
assertTrue(mergedPayload.merge(capturedWrites.get(1)));
assertTrue(mergedPayload.merge(ctx, capturedWrites.get(1)));
assertEquals(16, mergedPayload.size());
mergedPayload.write(16);
mergedPayload.write(ctx, 16);
assertEquals(0, mergedPayload.size());
assertEquals("abcdefghabcdefgh", writtenData.get(0));
assertEquals(0, data.refCnt());
@ -280,7 +282,7 @@ public class DefaultHttp2ConnectionEncoderTest {
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
assertFalse(capturedWrites.get(0).merge(capturedWrites.get(1)));
assertFalse(capturedWrites.get(0).merge(ctx, capturedWrites.get(1)));
}
@Test
@ -289,7 +291,7 @@ public class DefaultHttp2ConnectionEncoderTest {
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
assertEquals(payloadCaptor.getValue().size(), 8);
payloadCaptor.getValue().write(8);
payloadCaptor.getValue().write(ctx, 8);
// writer was called 3 times
assertEquals(3, writtenData.size());
assertEquals("abc", writtenData.get(0));
@ -304,7 +306,7 @@ public class DefaultHttp2ConnectionEncoderTest {
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 5, true, promise);
assertEquals(payloadCaptor.getValue().size(), 13);
payloadCaptor.getValue().write(13);
payloadCaptor.getValue().write(ctx, 13);
// writer was called 3 times
assertEquals(3, writtenData.size());
assertEquals("abcde", writtenData.get(0));
@ -322,7 +324,7 @@ public class DefaultHttp2ConnectionEncoderTest {
ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
assertEquals(payloadCaptor.getValue().size(), 18);
payloadCaptor.getValue().write(18);
payloadCaptor.getValue().write(ctx, 18);
// writer was called 4 times
assertEquals(4, writtenData.size());
assertEquals("abcde", writtenData.get(0));
@ -352,7 +354,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
assertEquals(payloadCaptor.getValue().size(), 10);
payloadCaptor.getValue().write(10);
payloadCaptor.getValue().write(ctx, 10);
// writer was called 2 times
assertEquals(2, writtenData.size());
assertEquals("", writtenData.get(0));
@ -371,7 +373,7 @@ public class DefaultHttp2ConnectionEncoderTest {
encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(local).createStream(eq(streamId), eq(false));
assertNotNull(payloadCaptor.getValue());
payloadCaptor.getValue().write(0);
payloadCaptor.getValue().write(ctx, 0);
verify(writer).writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@ -384,7 +386,7 @@ public class DefaultHttp2ConnectionEncoderTest {
verify(stream).open(false);
verify(stream, never()).closeLocalSide();
assertNotNull(payloadCaptor.getValue());
payloadCaptor.getValue().write(0);
payloadCaptor.getValue().write(ctx, 0);
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@ -501,7 +503,7 @@ public class DefaultHttp2ConnectionEncoderTest {
mockSendFlowControlledWriteEverything();
ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data.retain(), 0, true, promise);
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class));
verify(lifecycleManager).closeStreamLocal(stream, promise);
assertEquals(data.toString(UTF_8), writtenData.get(0));
data.release();
@ -522,7 +524,7 @@ public class DefaultHttp2ConnectionEncoderTest {
verify(local).createStream(eq(streamId), eq(true));
// Trigger the write and mark the promise successful to trigger listeners
assertNotNull(payloadCaptor.getValue());
payloadCaptor.getValue().write(0);
payloadCaptor.getValue().write(ctx, 0);
promise.trySuccess();
verify(lifecycleManager).closeStreamLocal(eq(stream), eq(promise));
}
@ -578,7 +580,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(remote.lastStreamKnownByPeer()).thenReturn(0);
ByteBuf data = mock(ByteBuf.class);
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class));
}
@Test
@ -586,7 +588,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(connection.goAwaySent()).thenReturn(true);
when(remote.lastStreamKnownByPeer()).thenReturn(0);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class));
}
@Test
@ -595,7 +597,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(local.lastStreamKnownByPeer()).thenReturn(STREAM_ID);
ByteBuf data = mock(ByteBuf.class);
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class));
}
@Test
@ -603,19 +605,19 @@ public class DefaultHttp2ConnectionEncoderTest {
when(connection.goAwayReceived()).thenReturn(true);
when(local.lastStreamKnownByPeer()).thenReturn(STREAM_ID);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(remoteFlow).addFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class));
}
private void mockSendFlowControlledWriteEverything() {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
FlowControlled flowControlled = (FlowControlled) invocationOnMock.getArguments()[2];
flowControlled.write(Integer.MAX_VALUE);
FlowControlled flowControlled = (FlowControlled) invocationOnMock.getArguments()[1];
flowControlled.write(ctx, Integer.MAX_VALUE);
flowControlled.writeComplete();
return null;
}
}).when(remoteFlow).addFlowControlled(eq(ctx), eq(stream), payloadCaptor.capture());
}).when(remoteFlow).addFlowControlled(eq(stream), payloadCaptor.capture());
}
private void mockFutureAddListener(boolean success) {

View File

@ -74,6 +74,7 @@ public class DefaultHttp2LocalFlowControllerTest {
connection.local().flowController(controller);
connection.local().createStream(STREAM_ID, false);
controller.channelHandlerContext(ctx);
}
@Test
@ -230,7 +231,7 @@ public class DefaultHttp2LocalFlowControllerTest {
@Test
public void consumeBytesForNullStreamShouldIgnore() throws Http2Exception {
controller.consumeBytes(ctx, null, 10);
controller.consumeBytes(null, 10);
assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
}
@ -249,23 +250,23 @@ public class DefaultHttp2LocalFlowControllerTest {
@Test
public void consumeBytesForZeroNumBytesShouldIgnore() throws Http2Exception {
assertFalse(controller.consumeBytes(ctx, connection.stream(STREAM_ID), 0));
assertFalse(controller.consumeBytes(connection.stream(STREAM_ID), 0));
}
@Test(expected = IllegalArgumentException.class)
public void consumeBytesForNegativeNumBytesShouldFail() throws Http2Exception {
assertFalse(controller.consumeBytes(ctx, connection.stream(STREAM_ID), -1));
assertFalse(controller.consumeBytes(connection.stream(STREAM_ID), -1));
}
private void testRatio(float ratio, int newDefaultWindowSize, int newStreamId, boolean setStreamRatio)
throws Http2Exception {
int delta = newDefaultWindowSize - DEFAULT_WINDOW_SIZE;
controller.incrementWindowSize(ctx, stream(0), delta);
controller.incrementWindowSize(stream(0), delta);
Http2Stream stream = connection.local().createStream(newStreamId, false);
if (setStreamRatio) {
controller.windowUpdateRatio(ctx, stream, ratio);
controller.windowUpdateRatio(stream, ratio);
}
controller.incrementWindowSize(ctx, stream, delta);
controller.incrementWindowSize(stream, delta);
reset(frameWriter);
try {
int data1 = (int) (newDefaultWindowSize * ratio) + 1;
@ -305,7 +306,7 @@ public class DefaultHttp2LocalFlowControllerTest {
boolean endOfStream) throws Http2Exception {
final ByteBuf buf = dummyData(dataSize);
try {
controller.receiveFlowControlledFrame(ctx, stream, buf, padding, endOfStream);
controller.receiveFlowControlledFrame(stream, buf, padding, endOfStream);
} finally {
buf.release();
}
@ -318,7 +319,7 @@ public class DefaultHttp2LocalFlowControllerTest {
}
private boolean consumeBytes(int streamId, int numBytes) throws Http2Exception {
return controller.consumeBytes(ctx, stream(streamId), numBytes);
return controller.consumeBytes(stream(streamId), numBytes);
}
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) {

View File

@ -36,6 +36,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2FrameWriter.Configuration;
@ -47,6 +49,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@ -79,6 +82,12 @@ public class DefaultHttp2RemoteFlowControllerTest {
@Mock
private ChannelHandlerContext ctx;
@Mock
private Channel channel;
@Mock
private ChannelConfig config;
@Mock
private ChannelPromise promise;
@ -93,7 +102,17 @@ public class DefaultHttp2RemoteFlowControllerTest {
when(ctx.newPromise()).thenReturn(promise);
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
setChannelWritability(true);
when(channel.config()).thenReturn(config);
initConnectionAndController();
resetCtx();
// This is intentionally left out of initConnectionAndController so it can be tested below.
controller.channelHandlerContext(ctx);
}
private void initConnectionAndController() throws Http2Exception {
connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2RemoteFlowController(connection);
controller.listener(listener);
@ -245,7 +264,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
data.assertPartiallyWritten(10);
moreData.assertNotWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
reset(ctx);
resetCtx();
// Update the window and verify that the rest of data and some of moreData are written
incrementWindowSize(STREAM_A, 15);
@ -1194,15 +1213,15 @@ public class DefaultHttp2RemoteFlowControllerTest {
stream.closeLocalSide();
return null;
}
}).when(flowControlled).error(any(Throwable.class));
}).when(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class));
int windowBefore = window(STREAM_A);
controller.addFlowControlled(ctx, stream, flowControlled);
controller.addFlowControlled(stream, flowControlled);
controller.writePendingBytes();
verify(flowControlled, times(3)).write(anyInt());
verify(flowControlled).error(any(Throwable.class));
verify(flowControlled, times(3)).write(any(ChannelHandlerContext.class), anyInt());
verify(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class));
verify(flowControlled, never()).writeComplete();
assertEquals(90, windowBefore - window(STREAM_A));
@ -1217,13 +1236,13 @@ public class DefaultHttp2RemoteFlowControllerTest {
public Void answer(InvocationOnMock invocationOnMock) {
throw new RuntimeException("error failed");
}
}).when(flowControlled).error(any(Throwable.class));
}).when(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class));
int windowBefore = window(STREAM_A);
boolean exceptionThrown = false;
try {
controller.addFlowControlled(ctx, stream, flowControlled);
controller.addFlowControlled(stream, flowControlled);
controller.writePendingBytes();
} catch (RuntimeException e) {
exceptionThrown = true;
@ -1231,8 +1250,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
assertTrue(exceptionThrown);
}
verify(flowControlled, times(3)).write(anyInt());
verify(flowControlled).error(any(Throwable.class));
verify(flowControlled, times(3)).write(any(ChannelHandlerContext.class), anyInt());
verify(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class));
verify(flowControlled, never()).writeComplete();
assertEquals(90, windowBefore - window(STREAM_A));
@ -1255,7 +1274,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
size.addAndGet(-50);
return null;
}
}).when(flowControlled).write(anyInt());
}).when(flowControlled).write(any(ChannelHandlerContext.class), anyInt());
final Http2Stream stream = stream(STREAM_A);
doAnswer(new Answer<Void>() {
@ -1268,14 +1287,14 @@ public class DefaultHttp2RemoteFlowControllerTest {
int windowBefore = window(STREAM_A);
try {
controller.addFlowControlled(ctx, stream, flowControlled);
controller.addFlowControlled(stream, flowControlled);
controller.writePendingBytes();
} catch (Exception e) {
fail();
}
verify(flowControlled, times(3)).write(anyInt());
verify(flowControlled, never()).error(any(Throwable.class));
verify(flowControlled, times(3)).write(any(ChannelHandlerContext.class), anyInt());
verify(flowControlled, never()).error(any(ChannelHandlerContext.class), any(Throwable.class));
verify(flowControlled).writeComplete();
assertEquals(150, windowBefore - window(STREAM_A));
@ -1287,23 +1306,75 @@ public class DefaultHttp2RemoteFlowControllerTest {
Mockito.mock(Http2RemoteFlowController.FlowControlled.class);
final Http2Stream stream = stream(STREAM_A);
when(flowControlled.size()).thenReturn(100);
doThrow(new RuntimeException("write failed")).when(flowControlled).write(anyInt());
doThrow(new RuntimeException("write failed"))
.when(flowControlled).write(any(ChannelHandlerContext.class), anyInt());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) {
stream.close();
return null;
}
}).when(flowControlled).error(any(Throwable.class));
}).when(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class));
controller.addFlowControlled(ctx, stream, flowControlled);
controller.addFlowControlled(stream, flowControlled);
controller.writePendingBytes();
verify(flowControlled).write(anyInt());
verify(flowControlled).error(any(Throwable.class));
verify(flowControlled).write(any(ChannelHandlerContext.class), anyInt());
verify(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class));
verify(flowControlled, never()).writeComplete();
}
@Test
public void nonWritableChannelDoesNotAttemptToWrite() throws Exception {
// Start the channel as not writable and exercise the public methods of the flow controller
// making sure no frames are written.
setChannelWritability(false);
FakeFlowControlled dataA = new FakeFlowControlled(1);
FakeFlowControlled dataB = new FakeFlowControlled(1);
final Http2Stream stream = stream(STREAM_A);
controller.addFlowControlled(stream, dataA);
controller.writePendingBytes();
dataA.assertNotWritten();
controller.incrementWindowSize(stream, 100);
controller.writePendingBytes();
dataA.assertNotWritten();
controller.addFlowControlled(stream, dataB);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
// Now change the channel to writable and make sure frames are written.
setChannelWritability(true);
controller.writePendingBytes();
dataA.assertFullyWritten();
dataB.assertFullyWritten();
}
@Test
public void contextShouldSendQueuedFramesWhenSet() throws Exception {
// Re-initialize the controller so we can ensure the context hasn't been set yet.
initConnectionAndController();
FakeFlowControlled dataA = new FakeFlowControlled(1);
final Http2Stream stream = stream(STREAM_A);
// Queue some frames
controller.addFlowControlled(stream, dataA);
controller.writePendingBytes();
dataA.assertNotWritten();
controller.incrementWindowSize(stream, 100);
controller.writePendingBytes();
dataA.assertNotWritten();
// Set the controller
controller.channelHandlerContext(ctx);
dataA.assertFullyWritten();
}
private static Http2RemoteFlowController.FlowControlled mockedFlowControlledThatThrowsOnWrite() throws Exception {
final Http2RemoteFlowController.FlowControlled flowControlled =
Mockito.mock(Http2RemoteFlowController.FlowControlled.class);
@ -1326,7 +1397,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
throw new RuntimeException("Write failed");
}
}
}).when(flowControlled).write(anyInt());
}).when(flowControlled).write(any(ChannelHandlerContext.class), anyInt());
return flowControlled;
}
@ -1343,7 +1414,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
private void sendData(int streamId, FakeFlowControlled data) throws Http2Exception {
Http2Stream stream = stream(streamId);
controller.addFlowControlled(ctx, stream, data);
controller.addFlowControlled(stream, data);
}
private void setPriority(int stream, int parent, int weight, boolean exclusive) throws Http2Exception {
@ -1359,7 +1430,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
}
private void incrementWindowSize(int streamId, int delta) throws Http2Exception {
controller.incrementWindowSize(ctx, stream(streamId), delta);
controller.incrementWindowSize(stream(streamId), delta);
}
private int streamableBytesForTree(Http2Stream stream) {
@ -1370,6 +1441,16 @@ public class DefaultHttp2RemoteFlowControllerTest {
return connection.stream(streamId);
}
private void resetCtx() {
reset(ctx);
when(ctx.channel()).thenReturn(channel);
}
private void setChannelWritability(boolean isWritable) {
when(channel.bytesBeforeUnwritable()).thenReturn(isWritable ? Long.MAX_VALUE : 0);
when(channel.isWritable()).thenReturn(isWritable);
}
private static final class FakeFlowControlled implements Http2RemoteFlowController.FlowControlled {
private int currentSize;
@ -1398,7 +1479,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
}
@Override
public void error(Throwable t) {
public void error(ChannelHandlerContext ctx, Throwable t) {
this.t = t;
}
@ -1407,7 +1488,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
}
@Override
public void write(int allowedBytes) {
public void write(ChannelHandlerContext ctx, int allowedBytes) {
if (allowedBytes <= 0 && currentSize != 0) {
// Write has been called but no data can be written
return;
@ -1418,7 +1499,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
}
@Override
public boolean merge(Http2RemoteFlowController.FlowControlled next) {
public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
if (mergeable && next instanceof FakeFlowControlled) {
this.originalSize += ((FakeFlowControlled) next).originalSize;
this.currentSize += ((FakeFlowControlled) next).originalSize;

View File

@ -74,6 +74,12 @@ public class Http2ConnectionHandlerTest {
@Mock
private Http2Connection connection;
@Mock
private Http2RemoteFlowController remoteFlow;
@Mock
private Http2LocalFlowController localFlow;
@Mock
private Http2Connection.Endpoint<Http2RemoteFlowController> remote;
@ -118,6 +124,8 @@ public class Http2ConnectionHandlerTest {
when(encoder.connection()).thenReturn(connection);
when(decoder.connection()).thenReturn(connection);
when(encoder.frameWriter()).thenReturn(frameWriter);
when(encoder.flowController()).thenReturn(remoteFlow);
when(decoder.flowController()).thenReturn(localFlow);
doAnswer(new Answer<ChannelFuture>() {
@Override
public ChannelFuture answer(InvocationOnMock invocation) throws Throwable {

View File

@ -32,11 +32,11 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
@ -45,6 +45,7 @@ import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedExc
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -73,6 +74,9 @@ public class StreamBufferingEncoderTest {
@Mock
private Channel channel;
@Mock
private ChannelConfig config;
@Mock
private ChannelPromise promise;
@ -111,7 +115,12 @@ public class StreamBufferingEncoderTest {
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(channel.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.newPromise()).thenReturn(promise);
when(promise.channel()).thenReturn(channel);
when(channel.isActive()).thenReturn(false);
when(channel.config()).thenReturn(config);
when(channel.isWritable()).thenReturn(true);
when(channel.bytesBeforeUnwritable()).thenReturn(Long.MAX_VALUE);
when(config.getWriteBufferHighWaterMark()).thenReturn(Integer.MAX_VALUE);
handler.handlerAdded(ctx);
}

View File

@ -47,17 +47,16 @@ public final class NoopHttp2LocalFlowController implements Http2LocalFlowControl
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
}
@Override
public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
}
@Override
public void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data,
int padding, boolean endOfStream) throws Http2Exception {
}
@Override
public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception {
public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
return false;
}
@ -65,4 +64,8 @@ public final class NoopHttp2LocalFlowController implements Http2LocalFlowControl
public int unconsumedBytes(Http2Stream stream) {
return 0;
}
@Override
public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
}
}

View File

@ -23,6 +23,7 @@ import io.netty.handler.codec.http2.Http2Stream;
public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowController {
public static final NoopHttp2RemoteFlowController INSTANCE = new NoopHttp2RemoteFlowController();
private ChannelHandlerContext ctx;
private NoopHttp2RemoteFlowController() { }
@ -46,8 +47,7 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
throws Http2Exception {
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
}
@Override
@ -64,10 +64,20 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
}
@Override
public void addFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled payload) {
public void addFlowControlled(Http2Stream stream, FlowControlled payload) {
// Don't check size beforehand because Headers payload returns 0 all the time.
do {
payload.write(MAX_INITIAL_WINDOW_SIZE);
payload.write(ctx, MAX_INITIAL_WINDOW_SIZE);
} while (payload.size() > 0);
}
@Override
public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
this.ctx = ctx;
}
@Override
public ChannelHandlerContext channelHandlerContext() {
return ctx;
}
}