Refactoring outbound flow controller to use frame writer.

Motivation:

This is addressing a TODO in the outbound flow controller. We currently
have a separate writer interface passed into the outbound flow
controller. This is confusing and limiting as to how the flow controller
can perform its writes (e.g. no control over flushing). Instead it would
be better to just let the flow controller use the Http2FrameWriter
directly.

Modifications:

- Added a new Http2DataWriter interface, which is extended by
Http2FrameWriter and Http2OutboundFlowController.

- Removed automatic flushing from Http2DataWriter in order to facilitate
optimizing the case where there are multiple writes.

- Updated DefaultHttp2OutboundFlowController to properly optimize
flushing of the ChannelHandlerContext when multiple writes occur.

Result:

Code is greatly simplified WRT outbound flow control and flushes are
optimized for flow-controlled DATA frames.
This commit is contained in:
nmittler 2014-08-21 09:30:29 -07:00
parent d332c00e2f
commit 21bc279700
18 changed files with 555 additions and 537 deletions

View File

@ -19,7 +19,6 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGH
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.toHttp2Exception;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
@ -38,7 +37,6 @@ import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -77,8 +75,14 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
}
protected AbstractHttp2ConnectionHandler(Http2Connection connection) {
this(connection, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter(),
new DefaultHttp2InboundFlowController(connection), new DefaultHttp2OutboundFlowController(connection));
this(connection, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter());
}
protected AbstractHttp2ConnectionHandler(Http2Connection connection,
Http2FrameReader frameReader, Http2FrameWriter frameWriter) {
this(connection, frameReader, frameWriter,
new DefaultHttp2InboundFlowController(connection),
new DefaultHttp2OutboundFlowController(connection, frameWriter));
}
protected AbstractHttp2ConnectionHandler(Http2Connection connection,
@ -338,9 +342,11 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
return connection.local().nextStreamId();
}
protected ChannelFuture writeData(final ChannelHandlerContext ctx,
final ChannelPromise promise, int streamId, final ByteBuf data, int padding,
boolean endStream) {
/**
* Writes (and flushes) the given data to the remote endpoint.
*/
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, final ByteBuf data,
int padding, boolean endStream, ChannelPromise promise) {
try {
if (connection.isGoAway()) {
throw protocolError("Sending data after connection going away.");
@ -350,24 +356,28 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE);
// Hand control of the frame to the flow controller.
outboundFlow.sendFlowControlled(streamId, data, padding, endStream,
new FlowControlWriter(ctx, data, promise));
return promise;
return outboundFlow.writeData(ctx, streamId, data, padding, endStream, promise);
} catch (Http2Exception e) {
return promise.setFailure(e);
promise.setFailure(e);
return promise;
}
}
protected ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, Http2Headers headers, int padding, boolean endStream) {
return writeHeaders(ctx, promise, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false,
padding, endStream);
/**
* Writes (and flushes) the given headers to the remote endpoint.
*/
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) {
return writeHeaders(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false,
padding, endStream, promise);
}
protected ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endStream) {
/**
* Writes (and flushes) the given headers to the remote endpoint.
*/
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight, boolean exclusive,
int padding, boolean endStream, ChannelPromise promise) {
try {
if (connection.isGoAway()) {
throw protocolError("Sending headers after connection going away.");
@ -394,8 +404,9 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
}
}
ChannelFuture future = frameWriter.writeHeaders(ctx, promise, streamId, headers, streamDependency,
weight, exclusive, padding, endStream);
ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
weight, exclusive, padding, endStream, promise);
ctx.flush();
// If the headers are the end of the stream, close it now.
if (endStream) {
@ -408,8 +419,11 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
}
}
protected ChannelFuture writePriority(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, int streamDependency, short weight, boolean exclusive) {
/**
* Writes (and flushes) the given priority to the remote endpoint.
*/
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId,
int streamDependency, short weight, boolean exclusive, ChannelPromise promise) {
try {
if (connection.isGoAway()) {
throw protocolError("Sending priority after connection going away.");
@ -418,15 +432,20 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
// Update the priority on this stream.
connection.requireStream(streamId).setPriority(streamDependency, weight, exclusive);
return frameWriter.writePriority(ctx, promise, streamId, streamDependency, weight,
exclusive);
ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight,
exclusive, promise);
ctx.flush();
return future;
} catch (Http2Exception e) {
return promise.setFailure(e);
}
}
protected ChannelFuture writeRstStream(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, long errorCode) {
/**
* Writes (and flushes) the a {@code RST_STREAM} frame to the remote endpoint.
*/
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise) {
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
// The stream may already have been closed ... ignore.
@ -434,7 +453,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
return promise;
}
ChannelFuture future = frameWriter.writeRstStream(ctx, promise, streamId, errorCode);
ChannelFuture future = frameWriter.writeRstStream(ctx, streamId, errorCode, promise);
ctx.flush();
stream.terminateSent();
close(stream, promise);
@ -442,8 +462,11 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
return future;
}
protected ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise,
Http2Settings settings) {
/**
* Writes (and flushes) the given settings to the remote endpoint.
*/
public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
ChannelPromise promise) {
outstandingLocalSettingsQueue.add(settings);
try {
if (connection.isGoAway()) {
@ -455,28 +478,37 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified");
}
return frameWriter.writeSettings(ctx, promise, settings);
frameWriter.writeSettings(ctx, settings, promise);
ctx.flush();
return promise;
} catch (Http2Exception e) {
return promise.setFailure(e);
}
}
protected ChannelFuture writePing(ChannelHandlerContext ctx, ChannelPromise promise,
ByteBuf data) {
/**
* Writes (and flushes) the given {@code PING} frame to the remote endpoint.
*/
public ChannelFuture writePing(ChannelHandlerContext ctx, ByteBuf data, ChannelPromise promise) {
try {
if (connection.isGoAway()) {
throw protocolError("Sending ping after connection going away.");
}
// Just pass the frame through.
return frameWriter.writePing(ctx, promise, false, data);
frameWriter.writePing(ctx, false, data, promise);
ctx.flush();
return promise;
} catch (Http2Exception e) {
return promise.setFailure(e);
}
}
protected ChannelFuture writePushPromise(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, int promisedStreamId, Http2Headers headers, int padding) {
/**
* Writes (and flushes) the given {@code PUSH_PROMISE} to the remote endpoint.
*/
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) {
try {
if (connection.isGoAway()) {
throw protocolError("Sending push promise after connection going away.");
@ -487,8 +519,10 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
connection.local().reservePushStream(promisedStreamId, stream);
// Write the frame.
return frameWriter.writePushPromise(ctx, promise, streamId, promisedStreamId, headers,
padding);
frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers,
padding, promise);
ctx.flush();
return promise;
} catch (Http2Exception e) {
return promise.setFailure(e);
}
@ -540,7 +574,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) {
// Send the RST_STREAM frame to the remote endpoint.
int streamId = cause.streamId();
frameWriter.writeRstStream(ctx, ctx.newPromise(), streamId, cause.error().code());
frameWriter.writeRstStream(ctx, streamId, cause.error().code(), ctx.newPromise());
ctx.flush();
// Mark the stream as terminated and close it.
Http2Stream stream = connection.stream(streamId);
@ -568,7 +603,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
ByteBuf debugData = toByteBuf(ctx, cause);
int lastKnownStream = connection.remote().lastStreamCreated();
future = frameWriter.writeGoAway(ctx, promise, lastKnownStream, errorCode, debugData);
future = frameWriter.writeGoAway(ctx, lastKnownStream, errorCode, debugData, promise);
ctx.flush();
closePromise = null;
connection.remote().goAwayReceived(lastKnownStream);
}
@ -710,8 +746,9 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
// Both client and server must send their initial settings.
Http2Settings settings = settings();
outstandingLocalSettingsQueue.add(settings);
frameWriter.writeSettings(ctx, ctx.newPromise(), settings).addListener(
frameWriter.writeSettings(ctx, settings, ctx.newPromise()).addListener(
ChannelFutureListener.CLOSE_ON_FAILURE);
ctx.flush();
}
/**
@ -792,8 +829,9 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
@Override
public void writeFrame(int streamId, int windowSizeIncrement)
throws Http2Exception {
frameWriter.writeWindowUpdate(ctx, ctx.newPromise(), streamId,
windowSizeIncrement);
frameWriter.writeWindowUpdate(ctx, streamId, windowSizeIncrement,
ctx.newPromise());
ctx.flush();
}
});
@ -994,6 +1032,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
// Acknowledge receipt of the settings.
frameWriter.writeSettingsAck(ctx, ctx.newPromise());
ctx.flush();
// We've received at least one non-ack settings frame from the remote endpoint.
prefaceReceived = true;
@ -1007,7 +1046,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
// Send an ack back to the remote client.
// Need to retain the buffer here since it will be released after the write completes.
frameWriter.writePing(ctx, ctx.newPromise(), true, data.retain());
frameWriter.writePing(ctx, true, data.retain(), ctx.newPromise());
ctx.flush();
AbstractHttp2ConnectionHandler.this.onPingRead(ctx, data);
}
@ -1109,103 +1149,4 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
}
}
}
/**
* Controls the write for a single outbound DATA frame. This writer is passed to the outbound flow
* controller, which may break the frame into chunks as dictated by the flow control window. If
* the write of any chunk fails, the original promise fails as well. Success occurs after the last
* chunk is written successfully.
*/
private final class FlowControlWriter implements Http2OutboundFlowController.FrameWriter {
private final ChannelHandlerContext ctx;
private final ChannelPromise promise;
private final List<ChannelPromise> promises;
private int remaining;
FlowControlWriter(ChannelHandlerContext ctx, ByteBuf data, ChannelPromise promise) {
this.ctx = ctx;
this.promise = promise;
promises = new ArrayList<ChannelPromise>(4);
promises.add(promise);
remaining = data.readableBytes();
}
@Override
public void writeFrame(int streamId, ByteBuf data, int padding, boolean endStream) {
if (promise.isDone()) {
// Most likely the write already failed. Just release the
// buffer.
data.release();
return;
}
remaining -= data.readableBytes();
// The flow controller may split the write into chunks. Use a new
// promise for intermediate writes.
final ChannelPromise chunkPromise =
remaining == 0 ? promise : ctx.newPromise();
// The original promise is already in the list, so don't add again.
if (chunkPromise != promise) {
promises.add(chunkPromise);
}
// TODO: consider adding a flush() method to this interface. The
// frameWriter flushes on each write which isn't optimal
// for the case of the outbound flow controller, which sends a batch
// of frames when the flow control window changes. We should let
// the flow controller manually flush after all writes are.
// complete.
// Write the frame.
ChannelFuture future =
frameWriter.writeData(ctx, chunkPromise, streamId, data, padding, endStream);
// Close the connection on write failures that leave the outbound
// flow control window in a corrupt state.
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (!future.isSuccess()) {
// If any of the chunk writes fail, also fail the
// original
// future that was returned to the caller.
failAllPromises(future.cause());
onHttp2Exception(ctx,
toHttp2Exception(future.cause()));
}
}
});
// Close the local side of the stream if this is the last frame
if (endStream) {
Http2Stream stream = connection.stream(streamId);
closeLocalSide(stream, ctx.newPromise());
}
}
@Override
public void setFailure(Throwable cause) {
failAllPromises(cause);
}
@Override
public int maxFrameSize() {
return frameWriter.maxFrameSize();
}
/**
* Called when the write for any chunk fails. Fails all promises including
* the one returned to the caller.
*/
private void failAllPromises(Throwable cause) {
for (ChannelPromise chunkPromise : promises) {
if (!chunkPromise.isDone()) {
chunkPromise.setFailure(cause);
}
}
}
}
}

View File

@ -100,8 +100,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
}
@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, ChannelPromise promise, int streamId,
ByteBuf data, int padding, boolean endStream) {
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endStream, ChannelPromise promise) {
try {
verifyStreamId(streamId, "Stream ID");
verifyPadding(padding);
@ -122,7 +122,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
// Write the required padding.
out.writeZero(padding);
return ctx.writeAndFlush(out, promise);
return ctx.write(out, promise);
} catch (RuntimeException e) {
return promise.setFailure(e);
} finally {
@ -131,23 +131,23 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
}
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, Http2Headers headers, int padding, boolean endStream) {
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) {
return writeHeadersInternal(ctx, promise, streamId, headers, padding, endStream,
false, 0, (short) 0, false);
}
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endStream) {
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight, boolean exclusive,
int padding, boolean endStream, ChannelPromise promise) {
return writeHeadersInternal(ctx, promise, streamId, headers, padding, endStream,
true, streamDependency, weight, exclusive);
}
@Override
public ChannelFuture writePriority(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, int streamDependency, short weight, boolean exclusive) {
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId,
int streamDependency, short weight, boolean exclusive, ChannelPromise promise) {
try {
verifyStreamId(streamId, "Stream ID");
verifyStreamId(streamDependency, "Stream Dependency");
@ -161,15 +161,15 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
// Adjust the weight so that it fits into a single byte on the wire.
frame.writeByte(weight - 1);
return ctx.writeAndFlush(frame, promise);
return ctx.write(frame, promise);
} catch (RuntimeException e) {
return promise.setFailure(e);
}
}
@Override
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, long errorCode) {
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise) {
try {
verifyStreamId(streamId, "Stream ID");
verifyErrorCode(errorCode);
@ -178,15 +178,15 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
writeFrameHeader(frame, INT_FIELD_LENGTH, RST_STREAM, new Http2Flags(),
streamId);
writeUnsignedInt(errorCode, frame);
return ctx.writeAndFlush(frame, promise);
return ctx.write(frame, promise);
} catch (RuntimeException e) {
return promise.setFailure(e);
}
}
@Override
public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise,
Http2Settings settings) {
public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
ChannelPromise promise) {
try {
if (settings == null) {
throw new NullPointerException("settings");
@ -198,7 +198,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
writeUnsignedShort(entry.key(), frame);
writeUnsignedInt(entry.value(), frame);
}
return ctx.writeAndFlush(frame, promise);
return ctx.write(frame, promise);
} catch (RuntimeException e) {
return promise.setFailure(e);
}
@ -209,15 +209,15 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
try {
ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH);
writeFrameHeader(frame, 0, SETTINGS, new Http2Flags().ack(true), 0);
return ctx.writeAndFlush(frame, promise);
return ctx.write(frame, promise);
} catch (RuntimeException e) {
return promise.setFailure(e);
}
}
@Override
public ChannelFuture writePing(ChannelHandlerContext ctx, ChannelPromise promise, boolean ack,
ByteBuf data) {
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data,
ChannelPromise promise) {
try {
ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + data.readableBytes());
Http2Flags flags = ack ? new Http2Flags().ack(true) : new Http2Flags();
@ -225,7 +225,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
// Write the debug data.
frame.writeBytes(data, data.readerIndex(), data.readableBytes());
return ctx.writeAndFlush(frame, promise);
return ctx.write(frame, promise);
} catch (RuntimeException e) {
return promise.setFailure(e);
} finally {
@ -234,8 +234,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
}
@Override
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, int promisedStreamId, Http2Headers headers, int padding) {
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) {
ByteBuf headerBlock = null;
try {
verifyStreamId(streamId, "Stream ID");
@ -273,7 +273,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
firstFrame.writeZero(padding);
if (headerBlock.readableBytes() == 0) {
return ctx.writeAndFlush(firstFrame, promise);
return ctx.write(firstFrame, promise);
}
// Create a composite buffer wrapping the first frame and any continuation frames.
@ -288,8 +288,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
}
@Override
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, ChannelPromise promise,
int lastStreamId, long errorCode, ByteBuf debugData) {
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData, ChannelPromise promise) {
try {
verifyStreamOrConnectionId(lastStreamId, "Last Stream ID");
verifyErrorCode(errorCode);
@ -300,7 +300,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
frame.writeInt(lastStreamId);
writeUnsignedInt(errorCode, frame);
frame.writeBytes(debugData, debugData.readerIndex(), debugData.readableBytes());
return ctx.writeAndFlush(frame, promise);
return ctx.write(frame, promise);
} catch (RuntimeException e) {
return promise.setFailure(e);
} finally {
@ -309,8 +309,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
}
@Override
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, int windowSizeIncrement) {
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId,
int windowSizeIncrement, ChannelPromise promise) {
try {
verifyStreamOrConnectionId(streamId, "Stream ID");
verifyWindowSizeIncrement(windowSizeIncrement);
@ -319,21 +319,21 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
writeFrameHeader(frame, INT_FIELD_LENGTH, WINDOW_UPDATE,
new Http2Flags(), streamId);
frame.writeInt(windowSizeIncrement);
return ctx.writeAndFlush(frame, promise);
return ctx.write(frame, promise);
} catch (RuntimeException e) {
return promise.setFailure(e);
}
}
@Override
public ChannelFuture writeFrame(ChannelHandlerContext ctx, ChannelPromise promise,
byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload, ChannelPromise promise) {
try {
verifyStreamOrConnectionId(streamId, "Stream ID");
ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payload.readableBytes());
writeFrameHeader(frame, payload.readableBytes(), frameType, flags, streamId);
frame.writeBytes(payload);
return ctx.writeAndFlush(frame, promise);
return ctx.write(frame, promise);
} catch (RuntimeException e) {
return promise.setFailure(e);
}
@ -392,7 +392,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
firstFrame.writeZero(padding);
if (flags.endOfHeaders()) {
return ctx.writeAndFlush(firstFrame, promise);
return ctx.write(firstFrame, promise);
}
// Create a composite buffer wrapping the first frame and any continuation frames.
@ -425,7 +425,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
}
out.writerIndex(numBytes);
return ctx.writeAndFlush(out, promise);
return ctx.write(out, promise);
}
/**

View File

@ -16,6 +16,10 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseAggregator;
import java.util.ArrayDeque;
import java.util.ArrayList;
@ -57,13 +61,19 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
};
private final Http2Connection connection;
private final Http2FrameWriter frameWriter;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private ChannelHandlerContext ctx;
public DefaultHttp2OutboundFlowController(Http2Connection connection) {
public DefaultHttp2OutboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
if (connection == null) {
throw new NullPointerException("connection");
}
if (frameWriter == null) {
throw new NullPointerException("frameWriter");
}
this.connection = connection;
this.frameWriter = frameWriter;
// Add a flow state for the connection.
connection.connectionStream().outboundFlow(
@ -144,36 +154,53 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
OutboundFlowState state = stateOrFail(streamId);
state.incrementStreamWindow(delta);
state.writeBytes(state.writableWindow());
flush();
}
}
@Override
public void sendFlowControlled(int streamId, ByteBuf data, int padding, boolean endStream,
FrameWriter frameWriter) throws Http2Exception {
OutboundFlowState state = stateOrFail(streamId);
OutboundFlowState.Frame frame =
state.newFrame(data, padding, endStream, frameWriter);
// Limit the window for this write by the maximum frame size.
int window = state.writableWindow();
int dataLength = data.readableBytes();
if (window >= dataLength) {
// Window size is large enough to send entire data frame
frame.write();
return;
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endStream, ChannelPromise promise) {
if (ctx == null) {
throw new NullPointerException("ctx");
}
if (promise == null) {
throw new NullPointerException("promise");
}
if (data == null) {
throw new NullPointerException("data");
}
// Enqueue the frame to be written when the window size permits.
frame.enqueue();
// Save the context. We'll use this later when we write pending bytes.
this.ctx = ctx;
if (window <= 0) {
// Stream is stalled, don't send anything now.
return;
try {
OutboundFlowState state = stateOrFail(streamId);
int window = state.writableWindow();
OutboundFlowState.Frame frame = state.newFrame(ctx, promise, data, padding, endStream);
if (window >= data.readableBytes()) {
// Window size is large enough to send entire data frame
frame.write();
ctx.flush();
return promise;
}
// Enqueue the frame to be written when the window size permits.
frame.enqueue();
if (window <= 0) {
// Stream is stalled, don't send anything now.
return promise;
}
// Create and send a partial frame up to the window size.
frame.split(window).write();
ctx.flush();
} catch (Http2Exception e) {
promise.setFailure(e);
}
// Create and send a partial frame up to the window size.
frame.split(window).write();
return promise;
}
private static OutboundFlowState state(Http2Stream stream) {
@ -207,6 +234,15 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
return connectionState().window();
}
/**
* Flushes the {@link ChannelHandlerContext} if we've received any data frames.
*/
private void flush() {
if (ctx != null) {
ctx.flush();
}
}
/**
* Resets the priority bytes for the given subtree following a restructuring of the priority
* tree.
@ -236,6 +272,10 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
Http2Stream connectionStream = connection.connectionStream();
int totalAllowance = state(connectionStream).priorityBytes();
writeAllowedBytes(connectionStream, totalAllowance);
// Optimization: only flush once for all written frames. If it's null, there are no
// data frames to send anyway.
flush();
}
/**
@ -444,8 +484,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
/**
* Creates a new frame with the given values but does not add it to the pending queue.
*/
Frame newFrame(ByteBuf data, int padding, boolean endStream, FrameWriter writer) {
return new Frame(data, padding, endStream, writer);
Frame newFrame(ChannelHandlerContext ctx, ChannelPromise promise, ByteBuf data,
int padding, boolean endStream) {
return new Frame(ctx, new ChannelPromiseAggregator(promise), data, padding, endStream);
}
/**
@ -528,17 +569,24 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* A wrapper class around the content of a data frame.
*/
private final class Frame {
private final ByteBuf data;
private final int padding;
private final boolean endStream;
private final FrameWriter writer;
private boolean enqueued;
final ByteBuf data;
final int padding;
final boolean endStream;
final ChannelHandlerContext ctx;
final ChannelPromiseAggregator promiseAggregator;
final ChannelPromise promise;
boolean enqueued;
Frame(ByteBuf data, int padding, boolean endStream, FrameWriter writer) {
Frame(ChannelHandlerContext ctx,
ChannelPromiseAggregator promiseAggregator, ByteBuf data, int padding,
boolean endStream) {
this.ctx = ctx;
this.data = data;
this.padding = padding;
this.endStream = endStream;
this.writer = writer;
this.promiseAggregator = promiseAggregator;
this.promise = ctx.newPromise();
promiseAggregator.add(promise);
}
int size() {
@ -572,19 +620,21 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* Writes the frame and decrements the stream and connection window sizes. If the frame
* is in the pending queue, the written bytes are removed from this branch of the
* priority tree.
* <p>
* Note: this does not flush the {@link ChannelHandlerContext}.
*/
void write() throws Http2Exception {
// Using a do/while loop because if the buffer is empty we still need to call
// the writer once to send the empty frame.
do {
int bytesToWrite = data.readableBytes();
int frameBytes = Math.min(bytesToWrite, writer.maxFrameSize());
int frameBytes = Math.min(bytesToWrite, frameWriter.maxFrameSize());
if (frameBytes == bytesToWrite) {
// All the bytes fit into a single HTTP/2 frame, just send it all.
connectionState().incrementStreamWindow(-bytesToWrite);
incrementStreamWindow(-bytesToWrite);
ByteBuf slice = data.readSlice(bytesToWrite);
writer.writeFrame(stream.id(), slice, padding, endStream);
frameWriter.writeData(ctx, stream.id(), slice, padding, endStream, promise);
decrementPendingBytes(bytesToWrite);
return;
}
@ -602,7 +652,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
void writeError(Http2Exception cause) {
decrementPendingBytes(data.readableBytes());
data.release();
writer.setFailure(cause);
promise.setFailure(cause);
}
/**
@ -617,7 +667,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
Frame split(int maxBytes) {
// TODO: Should padding be included in the chunks or only the last frame?
maxBytes = min(maxBytes, data.readableBytes());
Frame frame = new Frame(data.readSlice(maxBytes).retain(), 0, false, writer);
Frame frame = new Frame(ctx, promiseAggregator, data.readSlice(maxBytes).retain(), 0, false);
decrementPendingBytes(maxBytes);
return frame;
}

View File

@ -52,52 +52,52 @@ public class DelegatingHttp2ConnectionHandler extends AbstractHttp2ConnectionHan
}
@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, ChannelPromise promise, int streamId,
ByteBuf data, int padding, boolean endStream) {
return super.writeData(ctx, promise, streamId, data, padding, endStream);
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endStream, ChannelPromise promise) {
return super.writeData(ctx, streamId, data, padding, endStream, promise);
}
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, Http2Headers headers, int padding, boolean endStream) {
return super.writeHeaders(ctx, promise, streamId, headers, padding, endStream);
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) {
return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
}
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endStream) {
return super.writeHeaders(ctx, promise, streamId, headers, streamDependency, weight,
exclusive, padding, endStream);
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight, boolean exclusive,
int padding, boolean endStream, ChannelPromise promise) {
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endStream, promise);
}
@Override
public ChannelFuture writePriority(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, int streamDependency, short weight, boolean exclusive) {
return super.writePriority(ctx, promise, streamId, streamDependency, weight, exclusive);
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId,
int streamDependency, short weight, boolean exclusive, ChannelPromise promise) {
return super.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
}
@Override
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, long errorCode) {
return super.writeRstStream(ctx, promise, streamId, errorCode);
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise) {
return super.writeRstStream(ctx, streamId, errorCode, promise);
}
@Override
public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise,
Http2Settings settings) {
return super.writeSettings(ctx, promise, settings);
public ChannelFuture writeSettings(ChannelHandlerContext ctx,
Http2Settings settings, ChannelPromise promise) {
return super.writeSettings(ctx, settings, promise);
}
@Override
public ChannelFuture writePing(ChannelHandlerContext ctx, ChannelPromise promise, ByteBuf data) {
return super.writePing(ctx, promise, data);
public ChannelFuture writePing(ChannelHandlerContext ctx, ByteBuf data, ChannelPromise promise) {
return super.writePing(ctx, data, promise);
}
@Override
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, int promisedStreamId, Http2Headers headers, int padding) {
return super.writePushPromise(ctx, promise, streamId, promisedStreamId, headers, padding);
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) {
return super.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
}
@Override

View File

@ -177,10 +177,10 @@ public class DelegatingHttp2HttpConnectionHandler extends DelegatingHttp2Connect
ChannelPromise headerPromise = ctx.newPromise();
ChannelPromise dataPromise = ctx.newPromise();
promiseAggregator.add(headerPromise, dataPromise);
writeHeaders(ctx, headerPromise, streamId, http2Headers.build(), 0, false);
writeData(ctx, dataPromise, streamId, httpMsg.content(), 0, true);
writeHeaders(ctx, streamId, http2Headers.build(), 0, false, headerPromise);
writeData(ctx, streamId, httpMsg.content(), 0, true, dataPromise);
} else {
writeHeaders(ctx, promise, streamId, http2Headers.build(), 0, true);
writeHeaders(ctx, streamId, http2Headers.build(), 0, true, promise);
}
} else {
ctx.write(msg, promise);

View File

@ -0,0 +1,40 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
/**
* Interface that defines an object capable of writing HTTP/2 data frames.
*/
public interface Http2DataWriter {
/**
* Writes a {@code DATA} frame to the remote endpoint.
*
* @param ctx the context to use for writing.
* @param streamId the stream for which to send the frame.
* @param data the payload of the frame.
* @param padding the amount of padding to be added to the end of the frame
* @param endStream indicates if this is the last frame to be sent for the stream.
* @param promise the promise for the write.
* @return the future for the write.
*/
ChannelFuture writeData(ChannelHandlerContext ctx, int streamId,
ByteBuf data, int padding, boolean endStream, ChannelPromise promise);
}

View File

@ -23,43 +23,30 @@ import io.netty.channel.ChannelPromise;
import java.io.Closeable;
/**
* A writer responsible for marshalling HTTP/2 frames to the channel.
* A writer responsible for marshalling HTTP/2 frames to the channel. All of the write methods in
* this interface write to the context, but DO NOT FLUSH. To perform a flush, you must separately
* call {@link ChannelHandlerContext#flush()}.
*/
public interface Http2FrameWriter extends Closeable {
/**
* Writes a DATA frame to the remote endpoint.
*
* @param ctx the context to use for writing.
* @param promise the promise for the write.
* @param streamId the stream for which to send the frame.
* @param data the payload of the frame.
* @param padding the amount of padding to be added to the end of the frame
* @param endStream indicates if this is the last frame to be sent for the stream.
* @return the future for the write.
*/
ChannelFuture writeData(ChannelHandlerContext ctx, ChannelPromise promise, int streamId,
ByteBuf data, int padding, boolean endStream);
public interface Http2FrameWriter extends Http2DataWriter, Closeable {
/**
* Writes a HEADERS frame to the remote endpoint.
*
* @param ctx the context to use for writing.
* @param promise the promise for the write.
* @param streamId the stream for which to send the frame.
* @param headers the headers to be sent.
* @param padding the amount of padding to be added to the end of the frame
* @param endStream indicates if this is the last frame to be sent for the stream.
* @param promise the promise for the write.
* @return the future for the write.
*/
ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, int streamId,
Http2Headers headers, int padding, boolean endStream);
ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endStream, ChannelPromise promise);
/**
* Writes a HEADERS frame with priority specified to the remote endpoint.
*
* @param ctx the context to use for writing.
* @param promise the promise for the write.
* @param streamId the stream for which to send the frame.
* @param headers the headers to be sent.
* @param streamDependency the stream on which this stream should depend, or 0 if it should
@ -68,49 +55,50 @@ public interface Http2FrameWriter extends Closeable {
* @param exclusive whether this stream should be the exclusive dependant of its parent.
* @param padding the amount of padding to be added to the end of the frame
* @param endStream indicates if this is the last frame to be sent for the stream.
* @param promise the promise for the write.
* @return the future for the write.
*/
ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, int streamId,
Http2Headers headers, int streamDependency, short weight, boolean exclusive,
int padding, boolean endStream);
ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
ChannelPromise promise);
/**
* Writes a PRIORITY frame to the remote endpoint.
*
* @param ctx the context to use for writing.
* @param promise the promise for the write.
* @param streamId the stream for which to send the frame.
* @param streamDependency the stream on which this stream should depend, or 0 if it should
* depend on the connection.
* @param weight the weight for this stream.
* @param exclusive whether this stream should be the exclusive dependant of its parent.
* @param promise the promise for the write.
* @return the future for the write.
*/
ChannelFuture writePriority(ChannelHandlerContext ctx, ChannelPromise promise, int streamId,
int streamDependency, short weight, boolean exclusive);
ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive, ChannelPromise promise);
/**
* Writes a RST_STREAM frame to the remote endpoint.
*
* @param ctx the context to use for writing.
* @param promise the promise for the write.
* @param streamId the stream for which to send the frame.
* @param errorCode the error code indicating the nature of the failure.
* @param promise the promise for the write.
* @return the future for the write.
*/
ChannelFuture writeRstStream(ChannelHandlerContext ctx, ChannelPromise promise, int streamId,
long errorCode);
ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise);
/**
* Writes a SETTINGS frame to the remote endpoint.
*
* @param ctx the context to use for writing.
* @param promise the promise for the write.
* @param settings the settings to be sent.
* @param promise the promise for the write.
* @return the future for the write.
*/
ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise,
Http2Settings settings);
ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
ChannelPromise promise);
/**
* Writes a SETTINGS acknowledgment to the remote endpoint.
@ -125,68 +113,68 @@ public interface Http2FrameWriter extends Closeable {
* Writes a PING frame to the remote endpoint.
*
* @param ctx the context to use for writing.
* @param promise the promise for the write.
* @param ack indicates whether this is an ack of a PING frame previously received from the
* remote endpoint.
* @param data the payload of the frame.
* @param promise the promise for the write.
* @return the future for the write.
*/
ChannelFuture writePing(ChannelHandlerContext ctx, ChannelPromise promise, boolean ack,
ByteBuf data);
ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data,
ChannelPromise promise);
/**
* Writes a PUSH_PROMISE frame to the remote endpoint.
*
* @param ctx the context to use for writing.
* @param promise the promise for the write.
* @param streamId the stream for which to send the frame.
* @param promisedStreamId the ID of the promised stream.
* @param headers the headers to be sent.
* @param padding the amount of padding to be added to the end of the frame
* @param promise the promise for the write.
* @return the future for the write.
*/
ChannelFuture writePushPromise(ChannelHandlerContext ctx, ChannelPromise promise, int streamId,
int promisedStreamId, Http2Headers headers, int padding);
ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding, ChannelPromise promise);
/**
* Writes a GO_AWAY frame to the remote endpoint.
*
* @param ctx the context to use for writing.
* @param promise the promise for the write.
* @param lastStreamId the last known stream of this endpoint.
* @param errorCode the error code, if the connection was abnormally terminated.
* @param debugData application-defined debug data.
* @param promise the promise for the write.
* @return the future for the write.
*/
ChannelFuture writeGoAway(ChannelHandlerContext ctx, ChannelPromise promise, int lastStreamId,
long errorCode, ByteBuf debugData);
ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData, ChannelPromise promise);
/**
* Writes a WINDOW_UPDATE frame to the remote endpoint.
*
* @param ctx the context to use for writing.
* @param promise the promise for the write.
* @param streamId the stream for which to send the frame.
* @param windowSizeIncrement the number of bytes by which the local inbound flow control window
* is increasing.
* @param promise the promise for the write.
* @return the future for the write.
*/
ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, int windowSizeIncrement);
ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId,
int windowSizeIncrement, ChannelPromise promise);
/**
* Generic write method for any HTTP/2 frame. This allows writing of non-standard frames.
*
* @param ctx the context to use for writing.
* @param promise the promise for the write.
* @param frameType the frame type identifier.
* @param streamId the stream for which to send the frame.
* @param flags the flags to write for this frame.
* @param payload the payload to write for this frame.
* @param promise the promise for the write.
* @return the future for the write.
*/
ChannelFuture writeFrame(ChannelHandlerContext ctx, ChannelPromise promise, byte frameType,
int streamId, Http2Flags flags, ByteBuf payload);
ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload, ChannelPromise promise);
/**
* Closes this writer and frees any allocated resources.

View File

@ -37,8 +37,6 @@ import javax.net.ssl.SSLEngine;
*/
public abstract class Http2OrHttpChooser extends ByteToMessageDecoder {
// TODO: Replace with generic NPN handler
public enum SelectedProtocol {
/** Must be updated to match the HTTP/2 draft number. */
HTTP_2(TLS_UPGRADE_PROTOCOL_NAME),

View File

@ -16,33 +16,28 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
/**
* Controls the outbound flow of data frames to the remote endpoint.
*/
public interface Http2OutboundFlowController {
public interface Http2OutboundFlowController extends Http2DataWriter {
/**
* Interface that abstracts the writing of frames to the remote endpoint.
* Controls the flow-controlled writing of a DATA frame to the remote endpoint. There is no
* guarantee when the data will be written or whether it will be split into multiple frames
* before sending. The returned future will only be completed once all of the data has been
* successfully written to the remote endpoint.
* <p>
* Manually flushing the {@link ChannelHandlerContext} is not required, since the flow
* controller will flush as appropriate.
*/
interface FrameWriter {
/**
* Writes a single data frame to the remote endpoint.
*/
void writeFrame(int streamId, ByteBuf data, int padding, boolean endStream);
/**
* Called if an error occurred before the write could take place. Sets the failure on the
* channel promise.
*/
void setFailure(Throwable cause);
/**
* Gets the maximum allowed frame size.
*/
int maxFrameSize();
}
@Override
ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endStream, ChannelPromise promise);
/**
* Sets the initial size of the connection's outbound flow control window. The outbound flow
@ -67,25 +62,4 @@ public interface Http2OutboundFlowController {
* @throws Http2Exception thrown if a protocol-related error occurred.
*/
void updateOutboundWindowSize(int streamId, int deltaWindowSize) throws Http2Exception;
/**
* Sends the frame with outbound flow control applied. The frame may be written at a later time,
* depending on whether the remote endpoint can receive the frame now.
* <p/>
* Data frame flow control processing requirements:
* <p/>
* Sender must not send a data frame with data length greater than the transfer window size.
* After sending each data frame, the stream's transfer window size is decremented by the amount
* of data transmitted. When the window size becomes less than or equal to 0, the sender must
* pause transmitting data frames.
*
* @param streamId the ID of the stream on which the data is to be sent.
* @param data the data be be sent to the remote endpoint.
* @param padding the number of bytes of padding to be added to the frame.
* @param endStream indicates whether this frames is to be the last sent on this stream.
* @param frameWriter peforms to the write of the frame to the remote endpoint.
* @throws Http2Exception thrown if a protocol-related error occurred.
*/
void sendFlowControlled(int streamId, ByteBuf data, int padding, boolean endStream,
FrameWriter frameWriter) throws Http2Exception;
}

View File

@ -42,47 +42,47 @@ public class Http2OutboundFrameLogger implements Http2FrameWriter {
}
@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, ChannelPromise promise, int streamId,
ByteBuf data, int padding, boolean endStream) {
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endStream, ChannelPromise promise) {
logger.logData(OUTBOUND, streamId, data, padding, endStream);
return writer.writeData(ctx, promise, streamId, data, padding, endStream);
return writer.writeData(ctx, streamId, data, padding, endStream, promise);
}
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, Http2Headers headers, int padding, boolean endStream) {
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) {
logger.logHeaders(OUTBOUND, streamId, headers, padding, endStream);
return writer.writeHeaders(ctx, promise, streamId, headers, padding, endStream);
return writer.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
}
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endStream) {
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight, boolean exclusive,
int padding, boolean endStream, ChannelPromise promise) {
logger.logHeaders(OUTBOUND, streamId, headers, streamDependency, weight, exclusive,
padding, endStream);
return writer.writeHeaders(ctx, promise, streamId, headers, streamDependency, weight,
exclusive, padding, endStream);
return writer.writeHeaders(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endStream, promise);
}
@Override
public ChannelFuture writePriority(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, int streamDependency, short weight, boolean exclusive) {
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId,
int streamDependency, short weight, boolean exclusive, ChannelPromise promise) {
logger.logPriority(OUTBOUND, streamId, streamDependency, weight, exclusive);
return writer.writePriority(ctx, promise, streamId, streamDependency, weight, exclusive);
return writer.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
}
@Override
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, long errorCode) {
return writer.writeRstStream(ctx, promise, streamId, errorCode);
public ChannelFuture writeRstStream(ChannelHandlerContext ctx,
int streamId, long errorCode, ChannelPromise promise) {
return writer.writeRstStream(ctx, streamId, errorCode, promise);
}
@Override
public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise,
Http2Settings settings) {
public ChannelFuture writeSettings(ChannelHandlerContext ctx,
Http2Settings settings, ChannelPromise promise) {
logger.logSettings(OUTBOUND, settings);
return writer.writeSettings(ctx, promise, settings);
return writer.writeSettings(ctx, settings, promise);
}
@Override
@ -92,38 +92,38 @@ public class Http2OutboundFrameLogger implements Http2FrameWriter {
}
@Override
public ChannelFuture writePing(ChannelHandlerContext ctx, ChannelPromise promise, boolean ack,
ByteBuf data) {
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack,
ByteBuf data, ChannelPromise promise) {
logger.logPing(OUTBOUND, data);
return writer.writePing(ctx, promise, ack, data);
return writer.writePing(ctx, ack, data, promise);
}
@Override
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, int promisedStreamId, Http2Headers headers, int padding) {
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) {
logger.logPushPromise(OUTBOUND, streamId, promisedStreamId, headers, padding);
return writer.writePushPromise(ctx, promise, streamId, promisedStreamId, headers, padding);
return writer.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
}
@Override
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, ChannelPromise promise,
int lastStreamId, long errorCode, ByteBuf debugData) {
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData, ChannelPromise promise) {
logger.logGoAway(OUTBOUND, lastStreamId, errorCode, debugData);
return writer.writeGoAway(ctx, promise, lastStreamId, errorCode, debugData);
return writer.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
}
@Override
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, int windowSizeIncrement) {
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx,
int streamId, int windowSizeIncrement, ChannelPromise promise) {
logger.logWindowsUpdate(OUTBOUND, streamId, windowSizeIncrement);
return writer.writeWindowUpdate(ctx, promise, streamId, windowSizeIncrement);
return writer.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}
@Override
public ChannelFuture writeFrame(ChannelHandlerContext ctx, ChannelPromise promise,
byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload, ChannelPromise promise) {
logger.logUnknownFrame(OUTBOUND, frameType, streamId, flags, payload);
return writer.writeFrame(ctx, promise, frameType, streamId, flags, payload);
return writer.writeFrame(ctx, frameType, streamId, flags, payload, promise);
}
@Override

View File

@ -67,7 +67,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void emptyDataShouldRoundtrip() throws Exception {
ByteBuf data = Unpooled.EMPTY_BUFFER;
writer.writeData(ctx, promise, 1000, data, 0, false);
writer.writeData(ctx, 1000, data, 0, false, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
@ -78,7 +78,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void dataShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
writer.writeData(ctx, promise, 1000, data.retain().duplicate(), 0, false);
writer.writeData(ctx, 1000, data.retain().duplicate(), 0, false, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
@ -89,7 +89,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void dataWithPaddingShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
writer.writeData(ctx, promise, 1, data.retain().duplicate(), 0xFF, true);
writer.writeData(ctx, 1, data.retain().duplicate(), 0xFF, true, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
@ -99,7 +99,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void priorityShouldRoundtrip() throws Exception {
writer.writePriority(ctx, promise, 1, 2, (short) 255, true);
writer.writePriority(ctx, 1, 2, (short) 255, true, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
@ -109,7 +109,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void rstStreamShouldRoundtrip() throws Exception {
writer.writeRstStream(ctx, promise, 1, MAX_UNSIGNED_INT);
writer.writeRstStream(ctx, 1, MAX_UNSIGNED_INT, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
@ -119,7 +119,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void emptySettingsShouldRoundtrip() throws Exception {
writer.writeSettings(ctx, promise, new Http2Settings());
writer.writeSettings(ctx, new Http2Settings(), promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
@ -135,7 +135,7 @@ public class DefaultHttp2FrameIOTest {
settings.initialWindowSize(123);
settings.maxConcurrentStreams(456);
writer.writeSettings(ctx, promise, settings);
writer.writeSettings(ctx, settings, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
@ -156,7 +156,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void pingShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
writer.writePing(ctx, promise, false, data.retain().duplicate());
writer.writePing(ctx, false, data.retain().duplicate(), promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
@ -167,7 +167,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void pingAckShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
writer.writePing(ctx, promise, true, data.retain().duplicate());
writer.writePing(ctx, true, data.retain().duplicate(), promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
@ -178,7 +178,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void goAwayShouldRoundtrip() throws Exception {
ByteBuf data = dummyData();
writer.writeGoAway(ctx, promise, 1, MAX_UNSIGNED_INT, data.retain().duplicate());
writer.writeGoAway(ctx, 1, MAX_UNSIGNED_INT, data.retain().duplicate(), promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onGoAwayRead(eq(ctx), eq(1), eq(MAX_UNSIGNED_INT), eq(data));
@ -187,7 +187,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void windowUpdateShouldRoundtrip() throws Exception {
writer.writeWindowUpdate(ctx, promise, 1, Integer.MAX_VALUE);
writer.writeWindowUpdate(ctx, 1, Integer.MAX_VALUE, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onWindowUpdateRead(eq(ctx), eq(1), eq(Integer.MAX_VALUE));
@ -197,7 +197,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void emptyHeadersShouldRoundtrip() throws Exception {
Http2Headers headers = Http2Headers.EMPTY_HEADERS;
writer.writeHeaders(ctx, promise, 1, headers, 0, true);
writer.writeHeaders(ctx, 1, headers, 0, true, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true));
@ -207,7 +207,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void emptyHeadersWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = Http2Headers.EMPTY_HEADERS;
writer.writeHeaders(ctx, promise, 1, headers, 0xFF, true);
writer.writeHeaders(ctx, 1, headers, 0xFF, true, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true));
@ -217,7 +217,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void headersWithoutPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, promise, 1, headers, 0, true);
writer.writeHeaders(ctx, 1, headers, 0, true, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true));
@ -227,7 +227,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void headersWithPaddingWithoutPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, promise, 1, headers, 0xFF, true);
writer.writeHeaders(ctx, 1, headers, 0xFF, true, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true));
@ -237,7 +237,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void headersWithPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, promise, 1, headers, 2, (short) 3, true, 0, true);
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0, true, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0),
@ -248,7 +248,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void headersWithPaddingWithPriorityShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writeHeaders(ctx, promise, 1, headers, 2, (short) 3, true, 0xFF, true);
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0xFF, true, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF),
@ -259,7 +259,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void continuedHeadersShouldRoundtrip() throws Exception {
Http2Headers headers = largeHeaders();
writer.writeHeaders(ctx, promise, 1, headers, 2, (short) 3, true, 0, true);
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0, true, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0),
@ -270,7 +270,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void continuedHeadersWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = largeHeaders();
writer.writeHeaders(ctx, promise, 1, headers, 2, (short) 3, true, 0xFF, true);
writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0xFF, true, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF),
@ -281,7 +281,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void emptypushPromiseShouldRoundtrip() throws Exception {
Http2Headers headers = Http2Headers.EMPTY_HEADERS;
writer.writePushPromise(ctx, promise, 1, 2, headers, 0);
writer.writePushPromise(ctx, 1, 2, headers, 0, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0));
@ -291,7 +291,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void pushPromiseShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writePushPromise(ctx, promise, 1, 2, headers, 0);
writer.writePushPromise(ctx, 1, 2, headers, 0, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0));
@ -301,7 +301,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void pushPromiseWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = dummyHeaders();
writer.writePushPromise(ctx, promise, 1, 2, headers, 0xFF);
writer.writePushPromise(ctx, 1, 2, headers, 0xFF, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF));
@ -311,7 +311,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void continuedPushPromiseShouldRoundtrip() throws Exception {
Http2Headers headers = largeHeaders();
writer.writePushPromise(ctx, promise, 1, 2, headers, 0);
writer.writePushPromise(ctx, 1, 2, headers, 0, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0));
@ -321,7 +321,7 @@ public class DefaultHttp2FrameIOTest {
@Test
public void continuedPushPromiseWithPaddingShouldRoundtrip() throws Exception {
Http2Headers headers = largeHeaders();
writer.writePushPromise(ctx, promise, 1, 2, headers, 0xFF);
writer.writePushPromise(ctx, 1, 2, headers, 0xFF, promise);
ByteBuf frame = captureWrite();
reader.readFrame(ctx, frame, observer);
verify(observer).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF));
@ -330,7 +330,7 @@ public class DefaultHttp2FrameIOTest {
private ByteBuf captureWrite() {
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
verify(ctx).writeAndFlush(captor.capture(), eq(promise));
verify(ctx).write(captor.capture(), eq(promise));
return captor.getValue();
}

View File

@ -15,10 +15,22 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.Http2OutboundFlowController.FrameWriter;
import io.netty.util.CharsetUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import org.junit.Before;
import org.junit.Test;
@ -26,11 +38,6 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static io.netty.handler.codec.http2.Http2CodecUtil.*;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Tests for {@link DefaultHttp2OutboundFlowController}.
*/
@ -46,7 +53,13 @@ public class DefaultHttp2OutboundFlowControllerTest {
private ByteBuf buffer;
@Mock
private FrameWriter frameWriter;
private Http2FrameWriter frameWriter;
@Mock
private ChannelHandlerContext ctx;
@Mock
private ChannelPromise promise;
private DefaultHttp2Connection connection;
@ -54,8 +67,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this);
when(ctx.newPromise()).thenReturn(promise);
connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2OutboundFlowController(connection);
controller = new DefaultHttp2OutboundFlowController(connection, frameWriter);
connection.local().createStream(STREAM_A, false);
connection.local().createStream(STREAM_B, false);
@ -159,8 +174,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void connectionWindowUpdateShouldSendFrame() throws Http2Exception {
// Set the connection window size to zero.
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
ByteBuf data = dummyData(10);
send(STREAM_A, data.slice());
@ -252,8 +266,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void blockedStreamShouldSpreadDataToChildren() throws Http2Exception {
// Block the connection
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
// Block stream A
controller.updateOutboundWindowSize(STREAM_A, -DEFAULT_WINDOW_SIZE);
@ -304,8 +317,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void childrenShouldNotSendDataUntilParentBlocked() throws Http2Exception {
// Block the connection
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
// Block stream B
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE);
@ -347,8 +359,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void parentShouldWaterFallDataToChildren() throws Http2Exception {
// Block the connection
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
// Block stream B
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE);
@ -409,8 +420,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception {
// Block the connection
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
// Block stream B
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE);
@ -454,8 +464,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void writeShouldPreferHighestWeight() throws Http2Exception {
// Block the connection
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
// Root the streams at the connection and assign weights.
setPriority(STREAM_A, 0, (short) 50, false);
@ -518,8 +527,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void samePriorityShouldWriteEqualData() throws Http2Exception {
// Block the connection
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
// Root the streams at the connection with the same weights.
setPriority(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false);
@ -560,20 +568,20 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
private void send(int streamId, ByteBuf data) throws Http2Exception {
controller.sendFlowControlled(streamId, data, 0, false, frameWriter);
controller.writeData(ctx, streamId, data, 0, false, promise);
}
private void verifyWrite(int streamId, ByteBuf data) {
verify(frameWriter).writeFrame(eq(streamId), eq(data), eq(0), eq(false));
verify(frameWriter).writeData(eq(ctx), eq(streamId), eq(data), eq(0), eq(false), eq(promise));
}
private void verifyNoWrite(int streamId) {
verify(frameWriter, never()).writeFrame(eq(streamId), any(ByteBuf.class), anyInt(),
anyBoolean());
verify(frameWriter, never()).writeData(eq(ctx), eq(streamId), any(ByteBuf.class), anyInt(),
anyBoolean(), eq(promise));
}
private void captureWrite(int streamId, ArgumentCaptor<ByteBuf> captor, boolean endStream) {
verify(frameWriter).writeFrame(eq(streamId), captor.capture(), eq(0), eq(endStream));
verify(frameWriter).writeData(eq(ctx), eq(streamId), captor.capture(), eq(0), eq(endStream), eq(promise));
}
private void setPriority(int stream, int parent, int weight, boolean exclusive)

View File

@ -131,9 +131,9 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream);
when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(writer.writeSettings(eq(ctx), eq(promise), any(Http2Settings.class))).thenReturn(
when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(
future);
when(writer.writeGoAway(eq(ctx), eq(promise), anyInt(), anyInt(), any(ByteBuf.class)))
when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise)))
.thenReturn(future);
mockContext();
@ -158,7 +158,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(reader.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE);
when(writer.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE);
handler.handlerAdded(ctx);
verify(writer).writeSettings(eq(ctx), eq(promise), eq(settings));
verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise));
// Simulate receiving the initial settings from the remote endpoint.
decode().onSettingsRead(ctx, new Http2Settings());
@ -220,8 +220,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void closeShouldSendGoAway() throws Exception {
handler.close(ctx, promise);
verify(writer).writeGoAway(eq(ctx), eq(promise), eq(0), eq((long) NO_ERROR.code()),
eq(EMPTY_BUFFER));
verify(writer).writeGoAway(eq(ctx), eq(0), eq((long) NO_ERROR.code()),
eq(EMPTY_BUFFER), eq(promise));
verify(remote).goAwayReceived(0);
}
@ -236,8 +236,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
Http2Exception e = new Http2StreamException(STREAM_ID, PROTOCOL_ERROR);
handler.exceptionCaught(ctx, e);
verify(stream).close();
verify(writer).writeRstStream(eq(ctx), eq(promise), eq(STREAM_ID),
eq((long) PROTOCOL_ERROR.code()));
verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID),
eq((long) PROTOCOL_ERROR.code()), eq(promise));
}
@Test
@ -246,8 +246,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(remote.lastStreamCreated()).thenReturn(STREAM_ID);
handler.exceptionCaught(ctx, e);
verify(remote).goAwayReceived(STREAM_ID);
verify(writer).writeGoAway(eq(ctx), eq(promise), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()),
eq(EMPTY_BUFFER));
verify(writer).writeGoAway(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()),
eq(EMPTY_BUFFER), eq(promise));
}
@Test
@ -403,7 +403,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void pingReadShouldReplyWithAck() throws Exception {
decode().onPingRead(ctx, emptyPingBuf());
verify(writer).writePing(eq(ctx), eq(promise), eq(true), eq(emptyPingBuf()));
verify(writer).writePing(eq(ctx), eq(true), eq(emptyPingBuf()), eq(promise));
verify(observer, never()).onPingAckRead(eq(ctx), any(ByteBuf.class));
}
@ -450,127 +450,126 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void dataWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writeData(ctx, promise, STREAM_ID, dummyData(), 0, false);
ChannelFuture future = handler.writeData(ctx, STREAM_ID, dummyData(), 0, false, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void dataWriteShouldSucceed() throws Exception {
handler.writeData(ctx, promise, STREAM_ID, dummyData(), 0, false);
verify(outboundFlow).sendFlowControlled(eq(STREAM_ID), eq(dummyData()), eq(0),
eq(false), any(Http2OutboundFlowController.FrameWriter.class));
handler.writeData(ctx, STREAM_ID, dummyData(), 0, false, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(0), eq(false), eq(promise));
}
@Test
public void headersWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writeHeaders(
ctx, promise, 5, EMPTY_HEADERS, 0, (short) 255, false, 0, false);
ctx, 5, EMPTY_HEADERS, 0, (short) 255, false, 0, false, promise);
verify(local, never()).createStream(anyInt(), anyBoolean());
verify(writer, never()).writeHeaders(eq(ctx), eq(promise), anyInt(),
any(Http2Headers.class), anyInt(), anyBoolean());
verify(writer, never()).writeHeaders(eq(ctx), anyInt(),
any(Http2Headers.class), anyInt(), anyBoolean(), eq(promise));
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void headersWriteForUnknownStreamShouldCreateStream() throws Exception {
when(local.createStream(eq(5), eq(false))).thenReturn(stream);
handler.writeHeaders(ctx, promise, 5, EMPTY_HEADERS, 0, false);
handler.writeHeaders(ctx, 5, EMPTY_HEADERS, 0, false, promise);
verify(local).createStream(eq(5), eq(false));
verify(writer).writeHeaders(eq(ctx), eq(promise), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false));
verify(writer).writeHeaders(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@Test
public void headersWriteShouldCreateHalfClosedStream() throws Exception {
when(local.createStream(eq(5), eq(true))).thenReturn(stream);
handler.writeHeaders(ctx, promise, 5, EMPTY_HEADERS, 0, true);
handler.writeHeaders(ctx, 5, EMPTY_HEADERS, 0, true, promise);
verify(local).createStream(eq(5), eq(true));
verify(writer).writeHeaders(eq(ctx), eq(promise), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true));
verify(writer).writeHeaders(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
}
@Test
public void headersWriteShouldOpenStreamForPush() throws Exception {
when(stream.state()).thenReturn(RESERVED_LOCAL);
handler.writeHeaders(ctx, promise, STREAM_ID, EMPTY_HEADERS, 0, false);
handler.writeHeaders(ctx, STREAM_ID, EMPTY_HEADERS, 0, false, promise);
verify(stream).openForPush();
verify(stream, never()).closeLocalSide();
verify(writer).writeHeaders(eq(ctx), eq(promise), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false));
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@Test
public void headersWriteShouldClosePushStream() throws Exception {
when(stream.state()).thenReturn(RESERVED_LOCAL).thenReturn(HALF_CLOSED_LOCAL);
handler.writeHeaders(ctx, promise, STREAM_ID, EMPTY_HEADERS, 0, true);
handler.writeHeaders(ctx, STREAM_ID, EMPTY_HEADERS, 0, true, promise);
verify(stream).openForPush();
verify(stream).closeLocalSide();
verify(writer).writeHeaders(eq(ctx), eq(promise), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true));
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
}
@Test
public void pushPromiseWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writePushPromise(ctx, promise, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0);
ChannelFuture future = handler.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void pushPromiseWriteShouldReserveStream() throws Exception {
handler.writePushPromise(ctx, promise, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0);
handler.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0, promise);
verify(local).reservePushStream(eq(PUSH_STREAM_ID), eq(stream));
verify(writer).writePushPromise(eq(ctx), eq(promise), eq(STREAM_ID), eq(PUSH_STREAM_ID),
eq(EMPTY_HEADERS), eq(0));
verify(writer).writePushPromise(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID),
eq(EMPTY_HEADERS), eq(0), eq(promise));
}
@Test
public void priorityWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writePriority(ctx, promise, STREAM_ID, 0, (short) 255, true);
ChannelFuture future = handler.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void priorityWriteShouldSetPriorityForStream() throws Exception {
handler.writePriority(ctx, promise, STREAM_ID, 0, (short) 255, true);
handler.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(writer).writePriority(eq(ctx), eq(promise), eq(STREAM_ID), eq(0), eq((short) 255),
eq(true));
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255),
eq(true), eq(promise));
}
@Test
public void rstStreamWriteForUnknownStreamShouldIgnore() throws Exception {
handler.writeRstStream(ctx, promise, 5, PROTOCOL_ERROR.code());
verify(writer, never()).writeRstStream(eq(ctx), eq(promise), anyInt(), anyLong());
handler.writeRstStream(ctx, 5, PROTOCOL_ERROR.code(), promise);
verify(writer, never()).writeRstStream(eq(ctx), anyInt(), anyLong(), eq(promise));
}
@Test
public void rstStreamWriteShouldCloseStream() throws Exception {
handler.writeRstStream(ctx, promise, STREAM_ID, PROTOCOL_ERROR.code());
handler.writeRstStream(ctx, STREAM_ID, PROTOCOL_ERROR.code(), promise);
verify(stream).close();
verify(writer).writeRstStream(eq(ctx), eq(promise), eq(STREAM_ID),
eq((long) PROTOCOL_ERROR.code()));
verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID),
eq((long) PROTOCOL_ERROR.code()), eq(promise));
}
@Test
public void pingWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writePing(ctx, promise, emptyPingBuf());
ChannelFuture future = handler.writePing(ctx, emptyPingBuf(), promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void pingWriteShouldSucceed() throws Exception {
handler.writePing(ctx, promise, emptyPingBuf());
verify(writer).writePing(eq(ctx), eq(promise), eq(false), eq(emptyPingBuf()));
handler.writePing(ctx, emptyPingBuf(), promise);
verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise));
}
@Test
public void settingsWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writeSettings(ctx, promise, new Http2Settings());
ChannelFuture future = handler.writeSettings(ctx, new Http2Settings(), promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@ -581,8 +580,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
settings.pushEnabled(false);
settings.maxConcurrentStreams(1000);
settings.headerTableSize(2000);
handler.writeSettings(ctx, promise, settings);
verify(writer).writeSettings(eq(ctx), eq(promise), eq(settings));
handler.writeSettings(ctx, settings, promise);
verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise));
// Verify that application of local settings must not be done when it is dispatched.
verify(inboundFlow, never()).initialInboundWindowSize(eq(100));
verify(local, never()).allowPushTo(eq(false));

View File

@ -118,12 +118,12 @@ public class Http2ConnectionRoundtripTest {
@Override
public void run() {
for (int i = 0, nextStream = 3; i < NUM_STREAMS; ++i, nextStream += 2) {
http2Client.writeHeaders(
ctx(), newPromise(), nextStream, headers, 0, (short) 16, false, 0, false);
http2Client.writePing(ctx(), newPromise(), Unpooled.copiedBuffer(pingMsg.getBytes()));
http2Client.writeData(
ctx(), newPromise(), nextStream,
Unpooled.copiedBuffer(text.getBytes()), 0, true);
http2Client.writeHeaders(ctx(), nextStream, headers, 0, (short) 16, false, 0,
false, newPromise());
http2Client.writePing(ctx(), Unpooled.copiedBuffer(pingMsg.getBytes()),
newPromise());
http2Client.writeData(ctx(), nextStream,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
}
}
});

View File

@ -118,8 +118,9 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeData(ctx(), newPromise(), 0x7FFFFFFF,
Unpooled.copiedBuffer(text.getBytes()), 100, true);
frameWriter.writeData(ctx(), 0x7FFFFFFF,
Unpooled.copiedBuffer(text.getBytes()), 100, true, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -135,7 +136,8 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 0x7FFFFFFF, headers, 0, true);
frameWriter.writeHeaders(ctx(), 0x7FFFFFFF, headers, 0, true, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -151,8 +153,9 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 0x7FFFFFFF, headers, 4, (short) 255,
true, 0, true);
frameWriter.writeHeaders(ctx(), 0x7FFFFFFF, headers, 4, (short) 255,
true, 0, true, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -166,8 +169,9 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeGoAway(ctx(), newPromise(), 0x7FFFFFFF, 0xFFFFFFFFL,
Unpooled.copiedBuffer(text.getBytes()));
frameWriter.writeGoAway(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL,
Unpooled.copiedBuffer(text.getBytes()), newPromise());
ctx().flush();
}
});
awaitRequests();
@ -181,7 +185,8 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writePing(ctx(), ctx().newPromise(), true, buf);
frameWriter.writePing(ctx(), true, buf, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -194,7 +199,8 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writePriority(ctx(), newPromise(), 0x7FFFFFFF, 1, (short) 1, true);
frameWriter.writePriority(ctx(), 0x7FFFFFFF, 1, (short) 1, true, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -210,7 +216,8 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writePushPromise(ctx(), newPromise(), 0x7FFFFFFF, 1, headers, 5);
frameWriter.writePushPromise(ctx(), 0x7FFFFFFF, 1, headers, 5, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -223,7 +230,8 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeRstStream(ctx(), newPromise(), 0x7FFFFFFF, 0xFFFFFFFFL);
frameWriter.writeRstStream(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -240,7 +248,8 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeSettings(ctx(), newPromise(), settings);
frameWriter.writeSettings(ctx(), settings, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -252,7 +261,8 @@ public class Http2FrameRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeWindowUpdate(ctx(), newPromise(), 0x7FFFFFFF, 0x7FFFFFFF);
frameWriter.writeWindowUpdate(ctx(), 0x7FFFFFFF, 0x7FFFFFFF, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -273,10 +283,11 @@ public class Http2FrameRoundtripTest {
@Override
public void run() {
for (int i = 1; i < numStreams + 1; ++i) {
frameWriter.writeHeaders(ctx(), newPromise(), i, headers, 0, (short) 16, false,
0, false);
frameWriter.writeData(ctx(), newPromise(), i,
Unpooled.copiedBuffer(text.getBytes()), 0, true);
frameWriter.writeHeaders(ctx(), i, headers, 0, (short) 16, false,
0, false, newPromise());
frameWriter.writeData(ctx(), i,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
ctx().flush();
}
}
});

View File

@ -145,7 +145,8 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, true);
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, true, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -168,9 +169,10 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true);
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -199,11 +201,12 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text2.getBytes()), 0, true);
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text2.getBytes()), 0, true, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -232,13 +235,13 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true);
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeData(ctx(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, false,
newPromise());
frameWriter.writeData(ctx(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, false,
newPromise());
frameWriter.writeData(ctx(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -272,10 +275,11 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false);
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers2, 0, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true);
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeHeaders(ctx(), 3, http2Headers2, 0, false, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -309,10 +313,11 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false);
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers2, 0, true);
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false, newPromise());
frameWriter.writeHeaders(ctx(), 3, http2Headers2, 0, true, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -353,12 +358,13 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false);
frameWriter.writePushPromise(ctx(), newPromise(), 3, 5, http2Headers2, 0);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true);
frameWriter.writeData(ctx(), newPromise(), 5,
Unpooled.copiedBuffer(text2.getBytes()), 0, true);
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writePushPromise(ctx(), 3, 5, http2Headers2, 0, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
frameWriter.writeData(ctx(), 5,
Unpooled.copiedBuffer(text2.getBytes()), 0, true, newPromise());
ctx().flush();
}
});
awaitRequests();
@ -399,13 +405,14 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false);
frameWriter.writeHeaders(ctx(), newPromise(), 5, http2Headers2, 0, false);
frameWriter.writePriority(ctx(), newPromise(), 5, 3, (short) 256, true);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true);
frameWriter.writeData(ctx(), newPromise(), 5,
Unpooled.copiedBuffer(text2.getBytes()), 0, true);
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeHeaders(ctx(), 5, http2Headers2, 0, false, newPromise());
frameWriter.writePriority(ctx(), 5, 3, (short) 256, true, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
frameWriter.writeData(ctx(), 5,
Unpooled.copiedBuffer(text2.getBytes()), 0, true, newPromise());
ctx().flush();
}
});
awaitRequests();

View File

@ -67,9 +67,10 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
Http2Connection connection = new DefaultHttp2Connection(false);
Http2FrameWriter frameWriter = frameWriter();
connectionHandler = new DelegatingHttp2HttpConnectionHandler(connection,
frameReader(), frameWriter(), new DefaultHttp2InboundFlowController(connection),
new DefaultHttp2OutboundFlowController(connection),
frameReader(), frameWriter, new DefaultHttp2InboundFlowController(connection),
new DefaultHttp2OutboundFlowController(connection, frameWriter),
InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength));
responseHandler = new HttpResponseHandler();
settingsHandler = new Http2SettingsHandler(ch.newPromise());

View File

@ -32,6 +32,7 @@ import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
@ -48,14 +49,14 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
static final ByteBuf RESPONSE_BYTES = unreleasableBuffer(copiedBuffer("Hello World", CharsetUtil.UTF_8));
public HelloWorldHttp2Handler() {
this(new DefaultHttp2Connection(true));
this(new DefaultHttp2Connection(true), new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), logger));
}
private HelloWorldHttp2Handler(Http2Connection connection) {
private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameWriter frameWriter) {
super(connection, new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger),
new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), logger),
frameWriter,
new DefaultHttp2InboundFlowController(connection),
new DefaultHttp2OutboundFlowController(connection));
new DefaultHttp2OutboundFlowController(connection, frameWriter));
}
/**
@ -69,7 +70,7 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
Http2Headers headers =
DefaultHttp2Headers.newBuilder().status("200")
.set(UPGRADE_RESPONSE_HEADER, "true").build();
writeHeaders(ctx, ctx.newPromise(), 1, headers, 0, true);
writeHeaders(ctx, 1, headers, 0, true, ctx.newPromise());
}
super.userEventTriggered(ctx, evt);
}
@ -109,8 +110,8 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
private void sendResponse(ChannelHandlerContext ctx, int streamId, ByteBuf payload) {
// Send a frame for the response status
Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200").build();
writeHeaders(ctx(), ctx().newPromise(), streamId, headers, 0, false);
writeHeaders(ctx(), streamId, headers, 0, false, ctx().newPromise());
writeData(ctx(), ctx().newPromise(), streamId, payload, 0, true);
writeData(ctx(), streamId, payload, 0, true, ctx().newPromise());
}
}