From 21bc279700928a568f1c75940f88fb27c43b9421 Mon Sep 17 00:00:00 2001 From: nmittler Date: Thu, 21 Aug 2014 09:30:29 -0700 Subject: [PATCH] Refactoring outbound flow controller to use frame writer. Motivation: This is addressing a TODO in the outbound flow controller. We currently have a separate writer interface passed into the outbound flow controller. This is confusing and limiting as to how the flow controller can perform its writes (e.g. no control over flushing). Instead it would be better to just let the flow controller use the Http2FrameWriter directly. Modifications: - Added a new Http2DataWriter interface, which is extended by Http2FrameWriter and Http2OutboundFlowController. - Removed automatic flushing from Http2DataWriter in order to facilitate optimizing the case where there are multiple writes. - Updated DefaultHttp2OutboundFlowController to properly optimize flushing of the ChannelHandlerContext when multiple writes occur. Result: Code is greatly simplified WRT outbound flow control and flushes are optimized for flow-controlled DATA frames. --- .../http2/AbstractHttp2ConnectionHandler.java | 227 +++++++----------- .../codec/http2/DefaultHttp2FrameWriter.java | 70 +++--- .../DefaultHttp2OutboundFlowController.java | 122 +++++++--- .../DelegatingHttp2ConnectionHandler.java | 50 ++-- .../DelegatingHttp2HttpConnectionHandler.java | 6 +- .../handler/codec/http2/Http2DataWriter.java | 40 +++ .../handler/codec/http2/Http2FrameWriter.java | 82 +++---- .../codec/http2/Http2OrHttpChooser.java | 2 - .../http2/Http2OutboundFlowController.java | 56 ++--- .../codec/http2/Http2OutboundFrameLogger.java | 70 +++--- .../codec/http2/DefaultHttp2FrameIOTest.java | 50 ++-- ...efaultHttp2OutboundFlowControllerTest.java | 64 ++--- .../DelegatingHttp2ConnectionHandlerTest.java | 95 ++++---- .../http2/Http2ConnectionRoundtripTest.java | 12 +- .../codec/http2/Http2FrameRoundtripTest.java | 45 ++-- .../http2/InboundHttp2ToHttpAdapterTest.java | 81 ++++--- .../http2/client/Http2ClientInitializer.java | 5 +- .../http2/server/HelloWorldHttp2Handler.java | 15 +- 18 files changed, 555 insertions(+), 537 deletions(-) create mode 100644 codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataWriter.java diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java index c05b49a71a..5745a8a1ba 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java @@ -19,7 +19,6 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGH import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf; -import static io.netty.handler.codec.http2.Http2CodecUtil.toHttp2Exception; import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED; @@ -38,7 +37,6 @@ import io.netty.channel.ChannelPromise; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -77,8 +75,14 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode } protected AbstractHttp2ConnectionHandler(Http2Connection connection) { - this(connection, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter(), - new DefaultHttp2InboundFlowController(connection), new DefaultHttp2OutboundFlowController(connection)); + this(connection, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter()); + } + + protected AbstractHttp2ConnectionHandler(Http2Connection connection, + Http2FrameReader frameReader, Http2FrameWriter frameWriter) { + this(connection, frameReader, frameWriter, + new DefaultHttp2InboundFlowController(connection), + new DefaultHttp2OutboundFlowController(connection, frameWriter)); } protected AbstractHttp2ConnectionHandler(Http2Connection connection, @@ -338,9 +342,11 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode return connection.local().nextStreamId(); } - protected ChannelFuture writeData(final ChannelHandlerContext ctx, - final ChannelPromise promise, int streamId, final ByteBuf data, int padding, - boolean endStream) { + /** + * Writes (and flushes) the given data to the remote endpoint. + */ + public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, final ByteBuf data, + int padding, boolean endStream, ChannelPromise promise) { try { if (connection.isGoAway()) { throw protocolError("Sending data after connection going away."); @@ -350,24 +356,28 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE); // Hand control of the frame to the flow controller. - outboundFlow.sendFlowControlled(streamId, data, padding, endStream, - new FlowControlWriter(ctx, data, promise)); - - return promise; + return outboundFlow.writeData(ctx, streamId, data, padding, endStream, promise); } catch (Http2Exception e) { - return promise.setFailure(e); + promise.setFailure(e); + return promise; } } - protected ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, Http2Headers headers, int padding, boolean endStream) { - return writeHeaders(ctx, promise, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, - padding, endStream); + /** + * Writes (and flushes) the given headers to the remote endpoint. + */ + public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) { + return writeHeaders(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, + padding, endStream, promise); } - protected ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, Http2Headers headers, int streamDependency, short weight, - boolean exclusive, int padding, boolean endStream) { + /** + * Writes (and flushes) the given headers to the remote endpoint. + */ + public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int streamDependency, short weight, boolean exclusive, + int padding, boolean endStream, ChannelPromise promise) { try { if (connection.isGoAway()) { throw protocolError("Sending headers after connection going away."); @@ -394,8 +404,9 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode } } - ChannelFuture future = frameWriter.writeHeaders(ctx, promise, streamId, headers, streamDependency, - weight, exclusive, padding, endStream); + ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency, + weight, exclusive, padding, endStream, promise); + ctx.flush(); // If the headers are the end of the stream, close it now. if (endStream) { @@ -408,8 +419,11 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode } } - protected ChannelFuture writePriority(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, int streamDependency, short weight, boolean exclusive) { + /** + * Writes (and flushes) the given priority to the remote endpoint. + */ + public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, + int streamDependency, short weight, boolean exclusive, ChannelPromise promise) { try { if (connection.isGoAway()) { throw protocolError("Sending priority after connection going away."); @@ -418,15 +432,20 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode // Update the priority on this stream. connection.requireStream(streamId).setPriority(streamDependency, weight, exclusive); - return frameWriter.writePriority(ctx, promise, streamId, streamDependency, weight, - exclusive); + ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, + exclusive, promise); + ctx.flush(); + return future; } catch (Http2Exception e) { return promise.setFailure(e); } } - protected ChannelFuture writeRstStream(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, long errorCode) { + /** + * Writes (and flushes) the a {@code RST_STREAM} frame to the remote endpoint. + */ + public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, + ChannelPromise promise) { Http2Stream stream = connection.stream(streamId); if (stream == null) { // The stream may already have been closed ... ignore. @@ -434,7 +453,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode return promise; } - ChannelFuture future = frameWriter.writeRstStream(ctx, promise, streamId, errorCode); + ChannelFuture future = frameWriter.writeRstStream(ctx, streamId, errorCode, promise); + ctx.flush(); stream.terminateSent(); close(stream, promise); @@ -442,8 +462,11 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode return future; } - protected ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise, - Http2Settings settings) { + /** + * Writes (and flushes) the given settings to the remote endpoint. + */ + public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, + ChannelPromise promise) { outstandingLocalSettingsQueue.add(settings); try { if (connection.isGoAway()) { @@ -455,28 +478,37 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified"); } - return frameWriter.writeSettings(ctx, promise, settings); + frameWriter.writeSettings(ctx, settings, promise); + ctx.flush(); + return promise; } catch (Http2Exception e) { return promise.setFailure(e); } } - protected ChannelFuture writePing(ChannelHandlerContext ctx, ChannelPromise promise, - ByteBuf data) { + /** + * Writes (and flushes) the given {@code PING} frame to the remote endpoint. + */ + public ChannelFuture writePing(ChannelHandlerContext ctx, ByteBuf data, ChannelPromise promise) { try { if (connection.isGoAway()) { throw protocolError("Sending ping after connection going away."); } // Just pass the frame through. - return frameWriter.writePing(ctx, promise, false, data); + frameWriter.writePing(ctx, false, data, promise); + ctx.flush(); + return promise; } catch (Http2Exception e) { return promise.setFailure(e); } } - protected ChannelFuture writePushPromise(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, int promisedStreamId, Http2Headers headers, int padding) { + /** + * Writes (and flushes) the given {@code PUSH_PROMISE} to the remote endpoint. + */ + public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, + int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) { try { if (connection.isGoAway()) { throw protocolError("Sending push promise after connection going away."); @@ -487,8 +519,10 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode connection.local().reservePushStream(promisedStreamId, stream); // Write the frame. - return frameWriter.writePushPromise(ctx, promise, streamId, promisedStreamId, headers, - padding); + frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, + padding, promise); + ctx.flush(); + return promise; } catch (Http2Exception e) { return promise.setFailure(e); } @@ -540,7 +574,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) { // Send the RST_STREAM frame to the remote endpoint. int streamId = cause.streamId(); - frameWriter.writeRstStream(ctx, ctx.newPromise(), streamId, cause.error().code()); + frameWriter.writeRstStream(ctx, streamId, cause.error().code(), ctx.newPromise()); + ctx.flush(); // Mark the stream as terminated and close it. Http2Stream stream = connection.stream(streamId); @@ -568,7 +603,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode ByteBuf debugData = toByteBuf(ctx, cause); int lastKnownStream = connection.remote().lastStreamCreated(); - future = frameWriter.writeGoAway(ctx, promise, lastKnownStream, errorCode, debugData); + future = frameWriter.writeGoAway(ctx, lastKnownStream, errorCode, debugData, promise); + ctx.flush(); closePromise = null; connection.remote().goAwayReceived(lastKnownStream); } @@ -710,8 +746,9 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode // Both client and server must send their initial settings. Http2Settings settings = settings(); outstandingLocalSettingsQueue.add(settings); - frameWriter.writeSettings(ctx, ctx.newPromise(), settings).addListener( + frameWriter.writeSettings(ctx, settings, ctx.newPromise()).addListener( ChannelFutureListener.CLOSE_ON_FAILURE); + ctx.flush(); } /** @@ -792,8 +829,9 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode @Override public void writeFrame(int streamId, int windowSizeIncrement) throws Http2Exception { - frameWriter.writeWindowUpdate(ctx, ctx.newPromise(), streamId, - windowSizeIncrement); + frameWriter.writeWindowUpdate(ctx, streamId, windowSizeIncrement, + ctx.newPromise()); + ctx.flush(); } }); @@ -994,6 +1032,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode // Acknowledge receipt of the settings. frameWriter.writeSettingsAck(ctx, ctx.newPromise()); + ctx.flush(); // We've received at least one non-ack settings frame from the remote endpoint. prefaceReceived = true; @@ -1007,7 +1046,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode // Send an ack back to the remote client. // Need to retain the buffer here since it will be released after the write completes. - frameWriter.writePing(ctx, ctx.newPromise(), true, data.retain()); + frameWriter.writePing(ctx, true, data.retain(), ctx.newPromise()); + ctx.flush(); AbstractHttp2ConnectionHandler.this.onPingRead(ctx, data); } @@ -1109,103 +1149,4 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode } } } - - /** - * Controls the write for a single outbound DATA frame. This writer is passed to the outbound flow - * controller, which may break the frame into chunks as dictated by the flow control window. If - * the write of any chunk fails, the original promise fails as well. Success occurs after the last - * chunk is written successfully. - */ - private final class FlowControlWriter implements Http2OutboundFlowController.FrameWriter { - private final ChannelHandlerContext ctx; - private final ChannelPromise promise; - private final List promises; - private int remaining; - - FlowControlWriter(ChannelHandlerContext ctx, ByteBuf data, ChannelPromise promise) { - this.ctx = ctx; - this.promise = promise; - promises = new ArrayList(4); - promises.add(promise); - remaining = data.readableBytes(); - } - - @Override - public void writeFrame(int streamId, ByteBuf data, int padding, boolean endStream) { - if (promise.isDone()) { - // Most likely the write already failed. Just release the - // buffer. - data.release(); - return; - } - - remaining -= data.readableBytes(); - - // The flow controller may split the write into chunks. Use a new - // promise for intermediate writes. - final ChannelPromise chunkPromise = - remaining == 0 ? promise : ctx.newPromise(); - - // The original promise is already in the list, so don't add again. - if (chunkPromise != promise) { - promises.add(chunkPromise); - } - - // TODO: consider adding a flush() method to this interface. The - // frameWriter flushes on each write which isn't optimal - // for the case of the outbound flow controller, which sends a batch - // of frames when the flow control window changes. We should let - // the flow controller manually flush after all writes are. - // complete. - - // Write the frame. - ChannelFuture future = - frameWriter.writeData(ctx, chunkPromise, streamId, data, padding, endStream); - - // Close the connection on write failures that leave the outbound - // flow control window in a corrupt state. - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) - throws Exception { - if (!future.isSuccess()) { - // If any of the chunk writes fail, also fail the - // original - // future that was returned to the caller. - failAllPromises(future.cause()); - onHttp2Exception(ctx, - toHttp2Exception(future.cause())); - } - } - }); - - // Close the local side of the stream if this is the last frame - if (endStream) { - Http2Stream stream = connection.stream(streamId); - closeLocalSide(stream, ctx.newPromise()); - } - } - - @Override - public void setFailure(Throwable cause) { - failAllPromises(cause); - } - - @Override - public int maxFrameSize() { - return frameWriter.maxFrameSize(); - } - - /** - * Called when the write for any chunk fails. Fails all promises including - * the one returned to the caller. - */ - private void failAllPromises(Throwable cause) { - for (ChannelPromise chunkPromise : promises) { - if (!chunkPromise.isDone()) { - chunkPromise.setFailure(cause); - } - } - } - } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java index 4e2fdc5945..be272e009a 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java @@ -100,8 +100,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { } @Override - public ChannelFuture writeData(ChannelHandlerContext ctx, ChannelPromise promise, int streamId, - ByteBuf data, int padding, boolean endStream) { + public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, + int padding, boolean endStream, ChannelPromise promise) { try { verifyStreamId(streamId, "Stream ID"); verifyPadding(padding); @@ -122,7 +122,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { // Write the required padding. out.writeZero(padding); - return ctx.writeAndFlush(out, promise); + return ctx.write(out, promise); } catch (RuntimeException e) { return promise.setFailure(e); } finally { @@ -131,23 +131,23 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { } @Override - public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, Http2Headers headers, int padding, boolean endStream) { + public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) { return writeHeadersInternal(ctx, promise, streamId, headers, padding, endStream, false, 0, (short) 0, false); } @Override - public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, Http2Headers headers, int streamDependency, short weight, - boolean exclusive, int padding, boolean endStream) { + public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int streamDependency, short weight, boolean exclusive, + int padding, boolean endStream, ChannelPromise promise) { return writeHeadersInternal(ctx, promise, streamId, headers, padding, endStream, true, streamDependency, weight, exclusive); } @Override - public ChannelFuture writePriority(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, int streamDependency, short weight, boolean exclusive) { + public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, + int streamDependency, short weight, boolean exclusive, ChannelPromise promise) { try { verifyStreamId(streamId, "Stream ID"); verifyStreamId(streamDependency, "Stream Dependency"); @@ -161,15 +161,15 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { // Adjust the weight so that it fits into a single byte on the wire. frame.writeByte(weight - 1); - return ctx.writeAndFlush(frame, promise); + return ctx.write(frame, promise); } catch (RuntimeException e) { return promise.setFailure(e); } } @Override - public ChannelFuture writeRstStream(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, long errorCode) { + public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, + ChannelPromise promise) { try { verifyStreamId(streamId, "Stream ID"); verifyErrorCode(errorCode); @@ -178,15 +178,15 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { writeFrameHeader(frame, INT_FIELD_LENGTH, RST_STREAM, new Http2Flags(), streamId); writeUnsignedInt(errorCode, frame); - return ctx.writeAndFlush(frame, promise); + return ctx.write(frame, promise); } catch (RuntimeException e) { return promise.setFailure(e); } } @Override - public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise, - Http2Settings settings) { + public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, + ChannelPromise promise) { try { if (settings == null) { throw new NullPointerException("settings"); @@ -198,7 +198,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { writeUnsignedShort(entry.key(), frame); writeUnsignedInt(entry.value(), frame); } - return ctx.writeAndFlush(frame, promise); + return ctx.write(frame, promise); } catch (RuntimeException e) { return promise.setFailure(e); } @@ -209,15 +209,15 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { try { ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH); writeFrameHeader(frame, 0, SETTINGS, new Http2Flags().ack(true), 0); - return ctx.writeAndFlush(frame, promise); + return ctx.write(frame, promise); } catch (RuntimeException e) { return promise.setFailure(e); } } @Override - public ChannelFuture writePing(ChannelHandlerContext ctx, ChannelPromise promise, boolean ack, - ByteBuf data) { + public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, + ChannelPromise promise) { try { ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + data.readableBytes()); Http2Flags flags = ack ? new Http2Flags().ack(true) : new Http2Flags(); @@ -225,7 +225,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { // Write the debug data. frame.writeBytes(data, data.readerIndex(), data.readableBytes()); - return ctx.writeAndFlush(frame, promise); + return ctx.write(frame, promise); } catch (RuntimeException e) { return promise.setFailure(e); } finally { @@ -234,8 +234,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { } @Override - public ChannelFuture writePushPromise(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, int promisedStreamId, Http2Headers headers, int padding) { + public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, + int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) { ByteBuf headerBlock = null; try { verifyStreamId(streamId, "Stream ID"); @@ -273,7 +273,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { firstFrame.writeZero(padding); if (headerBlock.readableBytes() == 0) { - return ctx.writeAndFlush(firstFrame, promise); + return ctx.write(firstFrame, promise); } // Create a composite buffer wrapping the first frame and any continuation frames. @@ -288,8 +288,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { } @Override - public ChannelFuture writeGoAway(ChannelHandlerContext ctx, ChannelPromise promise, - int lastStreamId, long errorCode, ByteBuf debugData) { + public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, + ByteBuf debugData, ChannelPromise promise) { try { verifyStreamOrConnectionId(lastStreamId, "Last Stream ID"); verifyErrorCode(errorCode); @@ -300,7 +300,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { frame.writeInt(lastStreamId); writeUnsignedInt(errorCode, frame); frame.writeBytes(debugData, debugData.readerIndex(), debugData.readableBytes()); - return ctx.writeAndFlush(frame, promise); + return ctx.write(frame, promise); } catch (RuntimeException e) { return promise.setFailure(e); } finally { @@ -309,8 +309,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { } @Override - public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, int windowSizeIncrement) { + public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, + int windowSizeIncrement, ChannelPromise promise) { try { verifyStreamOrConnectionId(streamId, "Stream ID"); verifyWindowSizeIncrement(windowSizeIncrement); @@ -319,21 +319,21 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { writeFrameHeader(frame, INT_FIELD_LENGTH, WINDOW_UPDATE, new Http2Flags(), streamId); frame.writeInt(windowSizeIncrement); - return ctx.writeAndFlush(frame, promise); + return ctx.write(frame, promise); } catch (RuntimeException e) { return promise.setFailure(e); } } @Override - public ChannelFuture writeFrame(ChannelHandlerContext ctx, ChannelPromise promise, - byte frameType, int streamId, Http2Flags flags, ByteBuf payload) { + public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, + Http2Flags flags, ByteBuf payload, ChannelPromise promise) { try { verifyStreamOrConnectionId(streamId, "Stream ID"); ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payload.readableBytes()); writeFrameHeader(frame, payload.readableBytes(), frameType, flags, streamId); frame.writeBytes(payload); - return ctx.writeAndFlush(frame, promise); + return ctx.write(frame, promise); } catch (RuntimeException e) { return promise.setFailure(e); } @@ -392,7 +392,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { firstFrame.writeZero(padding); if (flags.endOfHeaders()) { - return ctx.writeAndFlush(firstFrame, promise); + return ctx.write(firstFrame, promise); } // Create a composite buffer wrapping the first frame and any continuation frames. @@ -425,7 +425,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { } out.writerIndex(numBytes); - return ctx.writeAndFlush(out, promise); + return ctx.write(out, promise); } /** diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java index ec8a67b685..7fe9527f42 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java @@ -16,6 +16,10 @@ package io.netty.handler.codec.http2; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelPromiseAggregator; import java.util.ArrayDeque; import java.util.ArrayList; @@ -57,13 +61,19 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont }; private final Http2Connection connection; + private final Http2FrameWriter frameWriter; private int initialWindowSize = DEFAULT_WINDOW_SIZE; + private ChannelHandlerContext ctx; - public DefaultHttp2OutboundFlowController(Http2Connection connection) { + public DefaultHttp2OutboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { if (connection == null) { throw new NullPointerException("connection"); } + if (frameWriter == null) { + throw new NullPointerException("frameWriter"); + } this.connection = connection; + this.frameWriter = frameWriter; // Add a flow state for the connection. connection.connectionStream().outboundFlow( @@ -144,36 +154,53 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont OutboundFlowState state = stateOrFail(streamId); state.incrementStreamWindow(delta); state.writeBytes(state.writableWindow()); + flush(); } } @Override - public void sendFlowControlled(int streamId, ByteBuf data, int padding, boolean endStream, - FrameWriter frameWriter) throws Http2Exception { - OutboundFlowState state = stateOrFail(streamId); - OutboundFlowState.Frame frame = - state.newFrame(data, padding, endStream, frameWriter); - - // Limit the window for this write by the maximum frame size. - int window = state.writableWindow(); - - int dataLength = data.readableBytes(); - if (window >= dataLength) { - // Window size is large enough to send entire data frame - frame.write(); - return; + public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, + int padding, boolean endStream, ChannelPromise promise) { + if (ctx == null) { + throw new NullPointerException("ctx"); + } + if (promise == null) { + throw new NullPointerException("promise"); + } + if (data == null) { + throw new NullPointerException("data"); } - // Enqueue the frame to be written when the window size permits. - frame.enqueue(); + // Save the context. We'll use this later when we write pending bytes. + this.ctx = ctx; - if (window <= 0) { - // Stream is stalled, don't send anything now. - return; + try { + OutboundFlowState state = stateOrFail(streamId); + int window = state.writableWindow(); + + OutboundFlowState.Frame frame = state.newFrame(ctx, promise, data, padding, endStream); + if (window >= data.readableBytes()) { + // Window size is large enough to send entire data frame + frame.write(); + ctx.flush(); + return promise; + } + + // Enqueue the frame to be written when the window size permits. + frame.enqueue(); + + if (window <= 0) { + // Stream is stalled, don't send anything now. + return promise; + } + + // Create and send a partial frame up to the window size. + frame.split(window).write(); + ctx.flush(); + } catch (Http2Exception e) { + promise.setFailure(e); } - - // Create and send a partial frame up to the window size. - frame.split(window).write(); + return promise; } private static OutboundFlowState state(Http2Stream stream) { @@ -207,6 +234,15 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont return connectionState().window(); } + /** + * Flushes the {@link ChannelHandlerContext} if we've received any data frames. + */ + private void flush() { + if (ctx != null) { + ctx.flush(); + } + } + /** * Resets the priority bytes for the given subtree following a restructuring of the priority * tree. @@ -236,6 +272,10 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont Http2Stream connectionStream = connection.connectionStream(); int totalAllowance = state(connectionStream).priorityBytes(); writeAllowedBytes(connectionStream, totalAllowance); + + // Optimization: only flush once for all written frames. If it's null, there are no + // data frames to send anyway. + flush(); } /** @@ -444,8 +484,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont /** * Creates a new frame with the given values but does not add it to the pending queue. */ - Frame newFrame(ByteBuf data, int padding, boolean endStream, FrameWriter writer) { - return new Frame(data, padding, endStream, writer); + Frame newFrame(ChannelHandlerContext ctx, ChannelPromise promise, ByteBuf data, + int padding, boolean endStream) { + return new Frame(ctx, new ChannelPromiseAggregator(promise), data, padding, endStream); } /** @@ -528,17 +569,24 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont * A wrapper class around the content of a data frame. */ private final class Frame { - private final ByteBuf data; - private final int padding; - private final boolean endStream; - private final FrameWriter writer; - private boolean enqueued; + final ByteBuf data; + final int padding; + final boolean endStream; + final ChannelHandlerContext ctx; + final ChannelPromiseAggregator promiseAggregator; + final ChannelPromise promise; + boolean enqueued; - Frame(ByteBuf data, int padding, boolean endStream, FrameWriter writer) { + Frame(ChannelHandlerContext ctx, + ChannelPromiseAggregator promiseAggregator, ByteBuf data, int padding, + boolean endStream) { + this.ctx = ctx; this.data = data; this.padding = padding; this.endStream = endStream; - this.writer = writer; + this.promiseAggregator = promiseAggregator; + this.promise = ctx.newPromise(); + promiseAggregator.add(promise); } int size() { @@ -572,19 +620,21 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont * Writes the frame and decrements the stream and connection window sizes. If the frame * is in the pending queue, the written bytes are removed from this branch of the * priority tree. + *

+ * Note: this does not flush the {@link ChannelHandlerContext}. */ void write() throws Http2Exception { // Using a do/while loop because if the buffer is empty we still need to call // the writer once to send the empty frame. do { int bytesToWrite = data.readableBytes(); - int frameBytes = Math.min(bytesToWrite, writer.maxFrameSize()); + int frameBytes = Math.min(bytesToWrite, frameWriter.maxFrameSize()); if (frameBytes == bytesToWrite) { // All the bytes fit into a single HTTP/2 frame, just send it all. connectionState().incrementStreamWindow(-bytesToWrite); incrementStreamWindow(-bytesToWrite); ByteBuf slice = data.readSlice(bytesToWrite); - writer.writeFrame(stream.id(), slice, padding, endStream); + frameWriter.writeData(ctx, stream.id(), slice, padding, endStream, promise); decrementPendingBytes(bytesToWrite); return; } @@ -602,7 +652,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont void writeError(Http2Exception cause) { decrementPendingBytes(data.readableBytes()); data.release(); - writer.setFailure(cause); + promise.setFailure(cause); } /** @@ -617,7 +667,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont Frame split(int maxBytes) { // TODO: Should padding be included in the chunks or only the last frame? maxBytes = min(maxBytes, data.readableBytes()); - Frame frame = new Frame(data.readSlice(maxBytes).retain(), 0, false, writer); + Frame frame = new Frame(ctx, promiseAggregator, data.readSlice(maxBytes).retain(), 0, false); decrementPendingBytes(maxBytes); return frame; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandler.java index 6105e54876..7e92f4c532 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandler.java @@ -52,52 +52,52 @@ public class DelegatingHttp2ConnectionHandler extends AbstractHttp2ConnectionHan } @Override - public ChannelFuture writeData(ChannelHandlerContext ctx, ChannelPromise promise, int streamId, - ByteBuf data, int padding, boolean endStream) { - return super.writeData(ctx, promise, streamId, data, padding, endStream); + public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, + int padding, boolean endStream, ChannelPromise promise) { + return super.writeData(ctx, streamId, data, padding, endStream, promise); } @Override - public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, Http2Headers headers, int padding, boolean endStream) { - return super.writeHeaders(ctx, promise, streamId, headers, padding, endStream); + public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) { + return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise); } @Override - public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, Http2Headers headers, int streamDependency, short weight, - boolean exclusive, int padding, boolean endStream) { - return super.writeHeaders(ctx, promise, streamId, headers, streamDependency, weight, - exclusive, padding, endStream); + public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int streamDependency, short weight, boolean exclusive, + int padding, boolean endStream, ChannelPromise promise) { + return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, + exclusive, padding, endStream, promise); } @Override - public ChannelFuture writePriority(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, int streamDependency, short weight, boolean exclusive) { - return super.writePriority(ctx, promise, streamId, streamDependency, weight, exclusive); + public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, + int streamDependency, short weight, boolean exclusive, ChannelPromise promise) { + return super.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise); } @Override - public ChannelFuture writeRstStream(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, long errorCode) { - return super.writeRstStream(ctx, promise, streamId, errorCode); + public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, + ChannelPromise promise) { + return super.writeRstStream(ctx, streamId, errorCode, promise); } @Override - public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise, - Http2Settings settings) { - return super.writeSettings(ctx, promise, settings); + public ChannelFuture writeSettings(ChannelHandlerContext ctx, + Http2Settings settings, ChannelPromise promise) { + return super.writeSettings(ctx, settings, promise); } @Override - public ChannelFuture writePing(ChannelHandlerContext ctx, ChannelPromise promise, ByteBuf data) { - return super.writePing(ctx, promise, data); + public ChannelFuture writePing(ChannelHandlerContext ctx, ByteBuf data, ChannelPromise promise) { + return super.writePing(ctx, data, promise); } @Override - public ChannelFuture writePushPromise(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, int promisedStreamId, Http2Headers headers, int padding) { - return super.writePushPromise(ctx, promise, streamId, promisedStreamId, headers, padding); + public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, + int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) { + return super.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise); } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandler.java index 78f2148cb2..8fbf114743 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandler.java @@ -177,10 +177,10 @@ public class DelegatingHttp2HttpConnectionHandler extends DelegatingHttp2Connect ChannelPromise headerPromise = ctx.newPromise(); ChannelPromise dataPromise = ctx.newPromise(); promiseAggregator.add(headerPromise, dataPromise); - writeHeaders(ctx, headerPromise, streamId, http2Headers.build(), 0, false); - writeData(ctx, dataPromise, streamId, httpMsg.content(), 0, true); + writeHeaders(ctx, streamId, http2Headers.build(), 0, false, headerPromise); + writeData(ctx, streamId, httpMsg.content(), 0, true, dataPromise); } else { - writeHeaders(ctx, promise, streamId, http2Headers.build(), 0, true); + writeHeaders(ctx, streamId, http2Headers.build(), 0, true, promise); } } else { ctx.write(msg, promise); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataWriter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataWriter.java new file mode 100644 index 0000000000..c2e5837011 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataWriter.java @@ -0,0 +1,40 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; + +/** + * Interface that defines an object capable of writing HTTP/2 data frames. + */ +public interface Http2DataWriter { + + /** + * Writes a {@code DATA} frame to the remote endpoint. + * + * @param ctx the context to use for writing. + * @param streamId the stream for which to send the frame. + * @param data the payload of the frame. + * @param padding the amount of padding to be added to the end of the frame + * @param endStream indicates if this is the last frame to be sent for the stream. + * @param promise the promise for the write. + * @return the future for the write. + */ + ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, + ByteBuf data, int padding, boolean endStream, ChannelPromise promise); +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameWriter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameWriter.java index 07067e9b82..3f94d13e9f 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameWriter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameWriter.java @@ -23,43 +23,30 @@ import io.netty.channel.ChannelPromise; import java.io.Closeable; /** - * A writer responsible for marshalling HTTP/2 frames to the channel. + * A writer responsible for marshalling HTTP/2 frames to the channel. All of the write methods in + * this interface write to the context, but DO NOT FLUSH. To perform a flush, you must separately + * call {@link ChannelHandlerContext#flush()}. */ -public interface Http2FrameWriter extends Closeable { - - /** - * Writes a DATA frame to the remote endpoint. - * - * @param ctx the context to use for writing. - * @param promise the promise for the write. - * @param streamId the stream for which to send the frame. - * @param data the payload of the frame. - * @param padding the amount of padding to be added to the end of the frame - * @param endStream indicates if this is the last frame to be sent for the stream. - * @return the future for the write. - */ - ChannelFuture writeData(ChannelHandlerContext ctx, ChannelPromise promise, int streamId, - ByteBuf data, int padding, boolean endStream); +public interface Http2FrameWriter extends Http2DataWriter, Closeable { /** * Writes a HEADERS frame to the remote endpoint. * * @param ctx the context to use for writing. - * @param promise the promise for the write. * @param streamId the stream for which to send the frame. * @param headers the headers to be sent. * @param padding the amount of padding to be added to the end of the frame * @param endStream indicates if this is the last frame to be sent for the stream. + * @param promise the promise for the write. * @return the future for the write. */ - ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, int streamId, - Http2Headers headers, int padding, boolean endStream); + ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, + int padding, boolean endStream, ChannelPromise promise); /** * Writes a HEADERS frame with priority specified to the remote endpoint. * * @param ctx the context to use for writing. - * @param promise the promise for the write. * @param streamId the stream for which to send the frame. * @param headers the headers to be sent. * @param streamDependency the stream on which this stream should depend, or 0 if it should @@ -68,49 +55,50 @@ public interface Http2FrameWriter extends Closeable { * @param exclusive whether this stream should be the exclusive dependant of its parent. * @param padding the amount of padding to be added to the end of the frame * @param endStream indicates if this is the last frame to be sent for the stream. + * @param promise the promise for the write. * @return the future for the write. */ - ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, int streamId, - Http2Headers headers, int streamDependency, short weight, boolean exclusive, - int padding, boolean endStream); + ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, + int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, + ChannelPromise promise); /** * Writes a PRIORITY frame to the remote endpoint. * * @param ctx the context to use for writing. - * @param promise the promise for the write. * @param streamId the stream for which to send the frame. * @param streamDependency the stream on which this stream should depend, or 0 if it should * depend on the connection. * @param weight the weight for this stream. * @param exclusive whether this stream should be the exclusive dependant of its parent. + * @param promise the promise for the write. * @return the future for the write. */ - ChannelFuture writePriority(ChannelHandlerContext ctx, ChannelPromise promise, int streamId, - int streamDependency, short weight, boolean exclusive); + ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, + short weight, boolean exclusive, ChannelPromise promise); /** * Writes a RST_STREAM frame to the remote endpoint. * * @param ctx the context to use for writing. - * @param promise the promise for the write. * @param streamId the stream for which to send the frame. * @param errorCode the error code indicating the nature of the failure. + * @param promise the promise for the write. * @return the future for the write. */ - ChannelFuture writeRstStream(ChannelHandlerContext ctx, ChannelPromise promise, int streamId, - long errorCode); + ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, + ChannelPromise promise); /** * Writes a SETTINGS frame to the remote endpoint. * * @param ctx the context to use for writing. - * @param promise the promise for the write. * @param settings the settings to be sent. + * @param promise the promise for the write. * @return the future for the write. */ - ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise, - Http2Settings settings); + ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, + ChannelPromise promise); /** * Writes a SETTINGS acknowledgment to the remote endpoint. @@ -125,68 +113,68 @@ public interface Http2FrameWriter extends Closeable { * Writes a PING frame to the remote endpoint. * * @param ctx the context to use for writing. - * @param promise the promise for the write. * @param ack indicates whether this is an ack of a PING frame previously received from the * remote endpoint. * @param data the payload of the frame. + * @param promise the promise for the write. * @return the future for the write. */ - ChannelFuture writePing(ChannelHandlerContext ctx, ChannelPromise promise, boolean ack, - ByteBuf data); + ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, + ChannelPromise promise); /** * Writes a PUSH_PROMISE frame to the remote endpoint. * * @param ctx the context to use for writing. - * @param promise the promise for the write. * @param streamId the stream for which to send the frame. * @param promisedStreamId the ID of the promised stream. * @param headers the headers to be sent. * @param padding the amount of padding to be added to the end of the frame + * @param promise the promise for the write. * @return the future for the write. */ - ChannelFuture writePushPromise(ChannelHandlerContext ctx, ChannelPromise promise, int streamId, - int promisedStreamId, Http2Headers headers, int padding); + ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId, + Http2Headers headers, int padding, ChannelPromise promise); /** * Writes a GO_AWAY frame to the remote endpoint. * * @param ctx the context to use for writing. - * @param promise the promise for the write. * @param lastStreamId the last known stream of this endpoint. * @param errorCode the error code, if the connection was abnormally terminated. * @param debugData application-defined debug data. + * @param promise the promise for the write. * @return the future for the write. */ - ChannelFuture writeGoAway(ChannelHandlerContext ctx, ChannelPromise promise, int lastStreamId, - long errorCode, ByteBuf debugData); + ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, + ByteBuf debugData, ChannelPromise promise); /** * Writes a WINDOW_UPDATE frame to the remote endpoint. * * @param ctx the context to use for writing. - * @param promise the promise for the write. * @param streamId the stream for which to send the frame. * @param windowSizeIncrement the number of bytes by which the local inbound flow control window * is increasing. + * @param promise the promise for the write. * @return the future for the write. */ - ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, int windowSizeIncrement); + ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, + int windowSizeIncrement, ChannelPromise promise); /** * Generic write method for any HTTP/2 frame. This allows writing of non-standard frames. * * @param ctx the context to use for writing. - * @param promise the promise for the write. * @param frameType the frame type identifier. * @param streamId the stream for which to send the frame. * @param flags the flags to write for this frame. * @param payload the payload to write for this frame. + * @param promise the promise for the write. * @return the future for the write. */ - ChannelFuture writeFrame(ChannelHandlerContext ctx, ChannelPromise promise, byte frameType, - int streamId, Http2Flags flags, ByteBuf payload); + ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, + Http2Flags flags, ByteBuf payload, ChannelPromise promise); /** * Closes this writer and frees any allocated resources. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OrHttpChooser.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OrHttpChooser.java index 99b9d72a8f..e6d839c6ae 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OrHttpChooser.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OrHttpChooser.java @@ -37,8 +37,6 @@ import javax.net.ssl.SSLEngine; */ public abstract class Http2OrHttpChooser extends ByteToMessageDecoder { - // TODO: Replace with generic NPN handler - public enum SelectedProtocol { /** Must be updated to match the HTTP/2 draft number. */ HTTP_2(TLS_UPGRADE_PROTOCOL_NAME), diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFlowController.java index 6629512d59..d9b17b8f8c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFlowController.java @@ -16,33 +16,28 @@ package io.netty.handler.codec.http2; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; + /** * Controls the outbound flow of data frames to the remote endpoint. */ -public interface Http2OutboundFlowController { +public interface Http2OutboundFlowController extends Http2DataWriter { /** - * Interface that abstracts the writing of frames to the remote endpoint. + * Controls the flow-controlled writing of a DATA frame to the remote endpoint. There is no + * guarantee when the data will be written or whether it will be split into multiple frames + * before sending. The returned future will only be completed once all of the data has been + * successfully written to the remote endpoint. + *

+ * Manually flushing the {@link ChannelHandlerContext} is not required, since the flow + * controller will flush as appropriate. */ - interface FrameWriter { - - /** - * Writes a single data frame to the remote endpoint. - */ - void writeFrame(int streamId, ByteBuf data, int padding, boolean endStream); - - /** - * Called if an error occurred before the write could take place. Sets the failure on the - * channel promise. - */ - void setFailure(Throwable cause); - - /** - * Gets the maximum allowed frame size. - */ - int maxFrameSize(); - } + @Override + ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endStream, ChannelPromise promise); /** * Sets the initial size of the connection's outbound flow control window. The outbound flow @@ -67,25 +62,4 @@ public interface Http2OutboundFlowController { * @throws Http2Exception thrown if a protocol-related error occurred. */ void updateOutboundWindowSize(int streamId, int deltaWindowSize) throws Http2Exception; - - /** - * Sends the frame with outbound flow control applied. The frame may be written at a later time, - * depending on whether the remote endpoint can receive the frame now. - *

- * Data frame flow control processing requirements: - *

- * Sender must not send a data frame with data length greater than the transfer window size. - * After sending each data frame, the stream's transfer window size is decremented by the amount - * of data transmitted. When the window size becomes less than or equal to 0, the sender must - * pause transmitting data frames. - * - * @param streamId the ID of the stream on which the data is to be sent. - * @param data the data be be sent to the remote endpoint. - * @param padding the number of bytes of padding to be added to the frame. - * @param endStream indicates whether this frames is to be the last sent on this stream. - * @param frameWriter peforms to the write of the frame to the remote endpoint. - * @throws Http2Exception thrown if a protocol-related error occurred. - */ - void sendFlowControlled(int streamId, ByteBuf data, int padding, boolean endStream, - FrameWriter frameWriter) throws Http2Exception; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java index 911e6eaf74..d84df02de0 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java @@ -42,47 +42,47 @@ public class Http2OutboundFrameLogger implements Http2FrameWriter { } @Override - public ChannelFuture writeData(ChannelHandlerContext ctx, ChannelPromise promise, int streamId, - ByteBuf data, int padding, boolean endStream) { + public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, + int padding, boolean endStream, ChannelPromise promise) { logger.logData(OUTBOUND, streamId, data, padding, endStream); - return writer.writeData(ctx, promise, streamId, data, padding, endStream); + return writer.writeData(ctx, streamId, data, padding, endStream, promise); } @Override - public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, Http2Headers headers, int padding, boolean endStream) { + public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) { logger.logHeaders(OUTBOUND, streamId, headers, padding, endStream); - return writer.writeHeaders(ctx, promise, streamId, headers, padding, endStream); + return writer.writeHeaders(ctx, streamId, headers, padding, endStream, promise); } @Override - public ChannelFuture writeHeaders(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, Http2Headers headers, int streamDependency, short weight, - boolean exclusive, int padding, boolean endStream) { + public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int streamDependency, short weight, boolean exclusive, + int padding, boolean endStream, ChannelPromise promise) { logger.logHeaders(OUTBOUND, streamId, headers, streamDependency, weight, exclusive, padding, endStream); - return writer.writeHeaders(ctx, promise, streamId, headers, streamDependency, weight, - exclusive, padding, endStream); + return writer.writeHeaders(ctx, streamId, headers, streamDependency, weight, + exclusive, padding, endStream, promise); } @Override - public ChannelFuture writePriority(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, int streamDependency, short weight, boolean exclusive) { + public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, + int streamDependency, short weight, boolean exclusive, ChannelPromise promise) { logger.logPriority(OUTBOUND, streamId, streamDependency, weight, exclusive); - return writer.writePriority(ctx, promise, streamId, streamDependency, weight, exclusive); + return writer.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise); } @Override - public ChannelFuture writeRstStream(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, long errorCode) { - return writer.writeRstStream(ctx, promise, streamId, errorCode); + public ChannelFuture writeRstStream(ChannelHandlerContext ctx, + int streamId, long errorCode, ChannelPromise promise) { + return writer.writeRstStream(ctx, streamId, errorCode, promise); } @Override - public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise, - Http2Settings settings) { + public ChannelFuture writeSettings(ChannelHandlerContext ctx, + Http2Settings settings, ChannelPromise promise) { logger.logSettings(OUTBOUND, settings); - return writer.writeSettings(ctx, promise, settings); + return writer.writeSettings(ctx, settings, promise); } @Override @@ -92,38 +92,38 @@ public class Http2OutboundFrameLogger implements Http2FrameWriter { } @Override - public ChannelFuture writePing(ChannelHandlerContext ctx, ChannelPromise promise, boolean ack, - ByteBuf data) { + public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, + ByteBuf data, ChannelPromise promise) { logger.logPing(OUTBOUND, data); - return writer.writePing(ctx, promise, ack, data); + return writer.writePing(ctx, ack, data, promise); } @Override - public ChannelFuture writePushPromise(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, int promisedStreamId, Http2Headers headers, int padding) { + public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, + int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) { logger.logPushPromise(OUTBOUND, streamId, promisedStreamId, headers, padding); - return writer.writePushPromise(ctx, promise, streamId, promisedStreamId, headers, padding); + return writer.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise); } @Override - public ChannelFuture writeGoAway(ChannelHandlerContext ctx, ChannelPromise promise, - int lastStreamId, long errorCode, ByteBuf debugData) { + public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, + ByteBuf debugData, ChannelPromise promise) { logger.logGoAway(OUTBOUND, lastStreamId, errorCode, debugData); - return writer.writeGoAway(ctx, promise, lastStreamId, errorCode, debugData); + return writer.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise); } @Override - public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, ChannelPromise promise, - int streamId, int windowSizeIncrement) { + public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, + int streamId, int windowSizeIncrement, ChannelPromise promise) { logger.logWindowsUpdate(OUTBOUND, streamId, windowSizeIncrement); - return writer.writeWindowUpdate(ctx, promise, streamId, windowSizeIncrement); + return writer.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); } @Override - public ChannelFuture writeFrame(ChannelHandlerContext ctx, ChannelPromise promise, - byte frameType, int streamId, Http2Flags flags, ByteBuf payload) { + public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, + Http2Flags flags, ByteBuf payload, ChannelPromise promise) { logger.logUnknownFrame(OUTBOUND, frameType, streamId, flags, payload); - return writer.writeFrame(ctx, promise, frameType, streamId, flags, payload); + return writer.writeFrame(ctx, frameType, streamId, flags, payload, promise); } @Override diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2FrameIOTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2FrameIOTest.java index 187a74b631..992f8f4b11 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2FrameIOTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2FrameIOTest.java @@ -67,7 +67,7 @@ public class DefaultHttp2FrameIOTest { @Test public void emptyDataShouldRoundtrip() throws Exception { ByteBuf data = Unpooled.EMPTY_BUFFER; - writer.writeData(ctx, promise, 1000, data, 0, false); + writer.writeData(ctx, 1000, data, 0, false, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); @@ -78,7 +78,7 @@ public class DefaultHttp2FrameIOTest { @Test public void dataShouldRoundtrip() throws Exception { ByteBuf data = dummyData(); - writer.writeData(ctx, promise, 1000, data.retain().duplicate(), 0, false); + writer.writeData(ctx, 1000, data.retain().duplicate(), 0, false, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); @@ -89,7 +89,7 @@ public class DefaultHttp2FrameIOTest { @Test public void dataWithPaddingShouldRoundtrip() throws Exception { ByteBuf data = dummyData(); - writer.writeData(ctx, promise, 1, data.retain().duplicate(), 0xFF, true); + writer.writeData(ctx, 1, data.retain().duplicate(), 0xFF, true, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); @@ -99,7 +99,7 @@ public class DefaultHttp2FrameIOTest { @Test public void priorityShouldRoundtrip() throws Exception { - writer.writePriority(ctx, promise, 1, 2, (short) 255, true); + writer.writePriority(ctx, 1, 2, (short) 255, true, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); @@ -109,7 +109,7 @@ public class DefaultHttp2FrameIOTest { @Test public void rstStreamShouldRoundtrip() throws Exception { - writer.writeRstStream(ctx, promise, 1, MAX_UNSIGNED_INT); + writer.writeRstStream(ctx, 1, MAX_UNSIGNED_INT, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); @@ -119,7 +119,7 @@ public class DefaultHttp2FrameIOTest { @Test public void emptySettingsShouldRoundtrip() throws Exception { - writer.writeSettings(ctx, promise, new Http2Settings()); + writer.writeSettings(ctx, new Http2Settings(), promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); @@ -135,7 +135,7 @@ public class DefaultHttp2FrameIOTest { settings.initialWindowSize(123); settings.maxConcurrentStreams(456); - writer.writeSettings(ctx, promise, settings); + writer.writeSettings(ctx, settings, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); @@ -156,7 +156,7 @@ public class DefaultHttp2FrameIOTest { @Test public void pingShouldRoundtrip() throws Exception { ByteBuf data = dummyData(); - writer.writePing(ctx, promise, false, data.retain().duplicate()); + writer.writePing(ctx, false, data.retain().duplicate(), promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); @@ -167,7 +167,7 @@ public class DefaultHttp2FrameIOTest { @Test public void pingAckShouldRoundtrip() throws Exception { ByteBuf data = dummyData(); - writer.writePing(ctx, promise, true, data.retain().duplicate()); + writer.writePing(ctx, true, data.retain().duplicate(), promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); @@ -178,7 +178,7 @@ public class DefaultHttp2FrameIOTest { @Test public void goAwayShouldRoundtrip() throws Exception { ByteBuf data = dummyData(); - writer.writeGoAway(ctx, promise, 1, MAX_UNSIGNED_INT, data.retain().duplicate()); + writer.writeGoAway(ctx, 1, MAX_UNSIGNED_INT, data.retain().duplicate(), promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onGoAwayRead(eq(ctx), eq(1), eq(MAX_UNSIGNED_INT), eq(data)); @@ -187,7 +187,7 @@ public class DefaultHttp2FrameIOTest { @Test public void windowUpdateShouldRoundtrip() throws Exception { - writer.writeWindowUpdate(ctx, promise, 1, Integer.MAX_VALUE); + writer.writeWindowUpdate(ctx, 1, Integer.MAX_VALUE, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onWindowUpdateRead(eq(ctx), eq(1), eq(Integer.MAX_VALUE)); @@ -197,7 +197,7 @@ public class DefaultHttp2FrameIOTest { @Test public void emptyHeadersShouldRoundtrip() throws Exception { Http2Headers headers = Http2Headers.EMPTY_HEADERS; - writer.writeHeaders(ctx, promise, 1, headers, 0, true); + writer.writeHeaders(ctx, 1, headers, 0, true, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true)); @@ -207,7 +207,7 @@ public class DefaultHttp2FrameIOTest { @Test public void emptyHeadersWithPaddingShouldRoundtrip() throws Exception { Http2Headers headers = Http2Headers.EMPTY_HEADERS; - writer.writeHeaders(ctx, promise, 1, headers, 0xFF, true); + writer.writeHeaders(ctx, 1, headers, 0xFF, true, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true)); @@ -217,7 +217,7 @@ public class DefaultHttp2FrameIOTest { @Test public void headersWithoutPriorityShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); - writer.writeHeaders(ctx, promise, 1, headers, 0, true); + writer.writeHeaders(ctx, 1, headers, 0, true, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0), eq(true)); @@ -227,7 +227,7 @@ public class DefaultHttp2FrameIOTest { @Test public void headersWithPaddingWithoutPriorityShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); - writer.writeHeaders(ctx, promise, 1, headers, 0xFF, true); + writer.writeHeaders(ctx, 1, headers, 0xFF, true, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(0xFF), eq(true)); @@ -237,7 +237,7 @@ public class DefaultHttp2FrameIOTest { @Test public void headersWithPriorityShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); - writer.writeHeaders(ctx, promise, 1, headers, 2, (short) 3, true, 0, true); + writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0, true, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0), @@ -248,7 +248,7 @@ public class DefaultHttp2FrameIOTest { @Test public void headersWithPaddingWithPriorityShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); - writer.writeHeaders(ctx, promise, 1, headers, 2, (short) 3, true, 0xFF, true); + writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0xFF, true, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF), @@ -259,7 +259,7 @@ public class DefaultHttp2FrameIOTest { @Test public void continuedHeadersShouldRoundtrip() throws Exception { Http2Headers headers = largeHeaders(); - writer.writeHeaders(ctx, promise, 1, headers, 2, (short) 3, true, 0, true); + writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0, true, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0), @@ -270,7 +270,7 @@ public class DefaultHttp2FrameIOTest { @Test public void continuedHeadersWithPaddingShouldRoundtrip() throws Exception { Http2Headers headers = largeHeaders(); - writer.writeHeaders(ctx, promise, 1, headers, 2, (short) 3, true, 0xFF, true); + writer.writeHeaders(ctx, 1, headers, 2, (short) 3, true, 0xFF, true, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onHeadersRead(eq(ctx), eq(1), eq(headers), eq(2), eq((short) 3), eq(true), eq(0xFF), @@ -281,7 +281,7 @@ public class DefaultHttp2FrameIOTest { @Test public void emptypushPromiseShouldRoundtrip() throws Exception { Http2Headers headers = Http2Headers.EMPTY_HEADERS; - writer.writePushPromise(ctx, promise, 1, 2, headers, 0); + writer.writePushPromise(ctx, 1, 2, headers, 0, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0)); @@ -291,7 +291,7 @@ public class DefaultHttp2FrameIOTest { @Test public void pushPromiseShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); - writer.writePushPromise(ctx, promise, 1, 2, headers, 0); + writer.writePushPromise(ctx, 1, 2, headers, 0, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0)); @@ -301,7 +301,7 @@ public class DefaultHttp2FrameIOTest { @Test public void pushPromiseWithPaddingShouldRoundtrip() throws Exception { Http2Headers headers = dummyHeaders(); - writer.writePushPromise(ctx, promise, 1, 2, headers, 0xFF); + writer.writePushPromise(ctx, 1, 2, headers, 0xFF, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF)); @@ -311,7 +311,7 @@ public class DefaultHttp2FrameIOTest { @Test public void continuedPushPromiseShouldRoundtrip() throws Exception { Http2Headers headers = largeHeaders(); - writer.writePushPromise(ctx, promise, 1, 2, headers, 0); + writer.writePushPromise(ctx, 1, 2, headers, 0, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0)); @@ -321,7 +321,7 @@ public class DefaultHttp2FrameIOTest { @Test public void continuedPushPromiseWithPaddingShouldRoundtrip() throws Exception { Http2Headers headers = largeHeaders(); - writer.writePushPromise(ctx, promise, 1, 2, headers, 0xFF); + writer.writePushPromise(ctx, 1, 2, headers, 0xFF, promise); ByteBuf frame = captureWrite(); reader.readFrame(ctx, frame, observer); verify(observer).onPushPromiseRead(eq(ctx), eq(1), eq(2), eq(headers), eq(0xFF)); @@ -330,7 +330,7 @@ public class DefaultHttp2FrameIOTest { private ByteBuf captureWrite() { ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuf.class); - verify(ctx).writeAndFlush(captor.capture(), eq(promise)); + verify(ctx).write(captor.capture(), eq(promise)); return captor.getValue(); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowControllerTest.java index 5cf1fcc01a..efa647443e 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowControllerTest.java @@ -15,10 +15,22 @@ package io.netty.handler.codec.http2; +import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; +import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; +import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.handler.codec.http2.Http2OutboundFlowController.FrameWriter; -import io.netty.util.CharsetUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import org.junit.Before; import org.junit.Test; @@ -26,11 +38,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import static io.netty.handler.codec.http2.Http2CodecUtil.*; -import static io.netty.util.CharsetUtil.UTF_8; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - /** * Tests for {@link DefaultHttp2OutboundFlowController}. */ @@ -46,7 +53,13 @@ public class DefaultHttp2OutboundFlowControllerTest { private ByteBuf buffer; @Mock - private FrameWriter frameWriter; + private Http2FrameWriter frameWriter; + + @Mock + private ChannelHandlerContext ctx; + + @Mock + private ChannelPromise promise; private DefaultHttp2Connection connection; @@ -54,8 +67,10 @@ public class DefaultHttp2OutboundFlowControllerTest { public void setup() throws Http2Exception { MockitoAnnotations.initMocks(this); + when(ctx.newPromise()).thenReturn(promise); + connection = new DefaultHttp2Connection(false); - controller = new DefaultHttp2OutboundFlowController(connection); + controller = new DefaultHttp2OutboundFlowController(connection, frameWriter); connection.local().createStream(STREAM_A, false); connection.local().createStream(STREAM_B, false); @@ -159,8 +174,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void connectionWindowUpdateShouldSendFrame() throws Http2Exception { // Set the connection window size to zero. - controller - .updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); + controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); ByteBuf data = dummyData(10); send(STREAM_A, data.slice()); @@ -252,8 +266,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void blockedStreamShouldSpreadDataToChildren() throws Http2Exception { // Block the connection - controller - .updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); + controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); // Block stream A controller.updateOutboundWindowSize(STREAM_A, -DEFAULT_WINDOW_SIZE); @@ -304,8 +317,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void childrenShouldNotSendDataUntilParentBlocked() throws Http2Exception { // Block the connection - controller - .updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); + controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); // Block stream B controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE); @@ -347,8 +359,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void parentShouldWaterFallDataToChildren() throws Http2Exception { // Block the connection - controller - .updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); + controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); // Block stream B controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE); @@ -409,8 +420,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception { // Block the connection - controller - .updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); + controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); // Block stream B controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_WINDOW_SIZE); @@ -454,8 +464,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void writeShouldPreferHighestWeight() throws Http2Exception { // Block the connection - controller - .updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); + controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); // Root the streams at the connection and assign weights. setPriority(STREAM_A, 0, (short) 50, false); @@ -518,8 +527,7 @@ public class DefaultHttp2OutboundFlowControllerTest { @Test public void samePriorityShouldWriteEqualData() throws Http2Exception { // Block the connection - controller - .updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); + controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE); // Root the streams at the connection with the same weights. setPriority(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false); @@ -560,20 +568,20 @@ public class DefaultHttp2OutboundFlowControllerTest { } private void send(int streamId, ByteBuf data) throws Http2Exception { - controller.sendFlowControlled(streamId, data, 0, false, frameWriter); + controller.writeData(ctx, streamId, data, 0, false, promise); } private void verifyWrite(int streamId, ByteBuf data) { - verify(frameWriter).writeFrame(eq(streamId), eq(data), eq(0), eq(false)); + verify(frameWriter).writeData(eq(ctx), eq(streamId), eq(data), eq(0), eq(false), eq(promise)); } private void verifyNoWrite(int streamId) { - verify(frameWriter, never()).writeFrame(eq(streamId), any(ByteBuf.class), anyInt(), - anyBoolean()); + verify(frameWriter, never()).writeData(eq(ctx), eq(streamId), any(ByteBuf.class), anyInt(), + anyBoolean(), eq(promise)); } private void captureWrite(int streamId, ArgumentCaptor captor, boolean endStream) { - verify(frameWriter).writeFrame(eq(streamId), captor.capture(), eq(0), eq(endStream)); + verify(frameWriter).writeData(eq(ctx), eq(streamId), captor.capture(), eq(0), eq(endStream), eq(promise)); } private void setPriority(int stream, int parent, int weight, boolean exclusive) diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java index 95d26e6d4e..9aa5f69ea0 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java @@ -131,9 +131,9 @@ public class DelegatingHttp2ConnectionHandlerTest { when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream); when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); - when(writer.writeSettings(eq(ctx), eq(promise), any(Http2Settings.class))).thenReturn( + when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn( future); - when(writer.writeGoAway(eq(ctx), eq(promise), anyInt(), anyInt(), any(ByteBuf.class))) + when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))) .thenReturn(future); mockContext(); @@ -158,7 +158,7 @@ public class DelegatingHttp2ConnectionHandlerTest { when(reader.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE); when(writer.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE); handler.handlerAdded(ctx); - verify(writer).writeSettings(eq(ctx), eq(promise), eq(settings)); + verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise)); // Simulate receiving the initial settings from the remote endpoint. decode().onSettingsRead(ctx, new Http2Settings()); @@ -220,8 +220,8 @@ public class DelegatingHttp2ConnectionHandlerTest { @Test public void closeShouldSendGoAway() throws Exception { handler.close(ctx, promise); - verify(writer).writeGoAway(eq(ctx), eq(promise), eq(0), eq((long) NO_ERROR.code()), - eq(EMPTY_BUFFER)); + verify(writer).writeGoAway(eq(ctx), eq(0), eq((long) NO_ERROR.code()), + eq(EMPTY_BUFFER), eq(promise)); verify(remote).goAwayReceived(0); } @@ -236,8 +236,8 @@ public class DelegatingHttp2ConnectionHandlerTest { Http2Exception e = new Http2StreamException(STREAM_ID, PROTOCOL_ERROR); handler.exceptionCaught(ctx, e); verify(stream).close(); - verify(writer).writeRstStream(eq(ctx), eq(promise), eq(STREAM_ID), - eq((long) PROTOCOL_ERROR.code())); + verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID), + eq((long) PROTOCOL_ERROR.code()), eq(promise)); } @Test @@ -246,8 +246,8 @@ public class DelegatingHttp2ConnectionHandlerTest { when(remote.lastStreamCreated()).thenReturn(STREAM_ID); handler.exceptionCaught(ctx, e); verify(remote).goAwayReceived(STREAM_ID); - verify(writer).writeGoAway(eq(ctx), eq(promise), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), - eq(EMPTY_BUFFER)); + verify(writer).writeGoAway(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), + eq(EMPTY_BUFFER), eq(promise)); } @Test @@ -403,7 +403,7 @@ public class DelegatingHttp2ConnectionHandlerTest { @Test public void pingReadShouldReplyWithAck() throws Exception { decode().onPingRead(ctx, emptyPingBuf()); - verify(writer).writePing(eq(ctx), eq(promise), eq(true), eq(emptyPingBuf())); + verify(writer).writePing(eq(ctx), eq(true), eq(emptyPingBuf()), eq(promise)); verify(observer, never()).onPingAckRead(eq(ctx), any(ByteBuf.class)); } @@ -450,127 +450,126 @@ public class DelegatingHttp2ConnectionHandlerTest { @Test public void dataWriteAfterGoAwayShouldFail() throws Exception { when(connection.isGoAway()).thenReturn(true); - ChannelFuture future = handler.writeData(ctx, promise, STREAM_ID, dummyData(), 0, false); + ChannelFuture future = handler.writeData(ctx, STREAM_ID, dummyData(), 0, false, promise); assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); } @Test public void dataWriteShouldSucceed() throws Exception { - handler.writeData(ctx, promise, STREAM_ID, dummyData(), 0, false); - verify(outboundFlow).sendFlowControlled(eq(STREAM_ID), eq(dummyData()), eq(0), - eq(false), any(Http2OutboundFlowController.FrameWriter.class)); + handler.writeData(ctx, STREAM_ID, dummyData(), 0, false, promise); + verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(0), eq(false), eq(promise)); } @Test public void headersWriteAfterGoAwayShouldFail() throws Exception { when(connection.isGoAway()).thenReturn(true); ChannelFuture future = handler.writeHeaders( - ctx, promise, 5, EMPTY_HEADERS, 0, (short) 255, false, 0, false); + ctx, 5, EMPTY_HEADERS, 0, (short) 255, false, 0, false, promise); verify(local, never()).createStream(anyInt(), anyBoolean()); - verify(writer, never()).writeHeaders(eq(ctx), eq(promise), anyInt(), - any(Http2Headers.class), anyInt(), anyBoolean()); + verify(writer, never()).writeHeaders(eq(ctx), anyInt(), + any(Http2Headers.class), anyInt(), anyBoolean(), eq(promise)); assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); } @Test public void headersWriteForUnknownStreamShouldCreateStream() throws Exception { when(local.createStream(eq(5), eq(false))).thenReturn(stream); - handler.writeHeaders(ctx, promise, 5, EMPTY_HEADERS, 0, false); + handler.writeHeaders(ctx, 5, EMPTY_HEADERS, 0, false, promise); verify(local).createStream(eq(5), eq(false)); - verify(writer).writeHeaders(eq(ctx), eq(promise), eq(5), eq(EMPTY_HEADERS), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false)); + verify(writer).writeHeaders(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise)); } @Test public void headersWriteShouldCreateHalfClosedStream() throws Exception { when(local.createStream(eq(5), eq(true))).thenReturn(stream); - handler.writeHeaders(ctx, promise, 5, EMPTY_HEADERS, 0, true); + handler.writeHeaders(ctx, 5, EMPTY_HEADERS, 0, true, promise); verify(local).createStream(eq(5), eq(true)); - verify(writer).writeHeaders(eq(ctx), eq(promise), eq(5), eq(EMPTY_HEADERS), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true)); + verify(writer).writeHeaders(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise)); } @Test public void headersWriteShouldOpenStreamForPush() throws Exception { when(stream.state()).thenReturn(RESERVED_LOCAL); - handler.writeHeaders(ctx, promise, STREAM_ID, EMPTY_HEADERS, 0, false); + handler.writeHeaders(ctx, STREAM_ID, EMPTY_HEADERS, 0, false, promise); verify(stream).openForPush(); verify(stream, never()).closeLocalSide(); - verify(writer).writeHeaders(eq(ctx), eq(promise), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false)); + verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise)); } @Test public void headersWriteShouldClosePushStream() throws Exception { when(stream.state()).thenReturn(RESERVED_LOCAL).thenReturn(HALF_CLOSED_LOCAL); - handler.writeHeaders(ctx, promise, STREAM_ID, EMPTY_HEADERS, 0, true); + handler.writeHeaders(ctx, STREAM_ID, EMPTY_HEADERS, 0, true, promise); verify(stream).openForPush(); verify(stream).closeLocalSide(); - verify(writer).writeHeaders(eq(ctx), eq(promise), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true)); + verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise)); } @Test public void pushPromiseWriteAfterGoAwayShouldFail() throws Exception { when(connection.isGoAway()).thenReturn(true); - ChannelFuture future = handler.writePushPromise(ctx, promise, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0); + ChannelFuture future = handler.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0, promise); assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); } @Test public void pushPromiseWriteShouldReserveStream() throws Exception { - handler.writePushPromise(ctx, promise, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0); + handler.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EMPTY_HEADERS, 0, promise); verify(local).reservePushStream(eq(PUSH_STREAM_ID), eq(stream)); - verify(writer).writePushPromise(eq(ctx), eq(promise), eq(STREAM_ID), eq(PUSH_STREAM_ID), - eq(EMPTY_HEADERS), eq(0)); + verify(writer).writePushPromise(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID), + eq(EMPTY_HEADERS), eq(0), eq(promise)); } @Test public void priorityWriteAfterGoAwayShouldFail() throws Exception { when(connection.isGoAway()).thenReturn(true); - ChannelFuture future = handler.writePriority(ctx, promise, STREAM_ID, 0, (short) 255, true); + ChannelFuture future = handler.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); } @Test public void priorityWriteShouldSetPriorityForStream() throws Exception { - handler.writePriority(ctx, promise, STREAM_ID, 0, (short) 255, true); + handler.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); - verify(writer).writePriority(eq(ctx), eq(promise), eq(STREAM_ID), eq(0), eq((short) 255), - eq(true)); + verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), + eq(true), eq(promise)); } @Test public void rstStreamWriteForUnknownStreamShouldIgnore() throws Exception { - handler.writeRstStream(ctx, promise, 5, PROTOCOL_ERROR.code()); - verify(writer, never()).writeRstStream(eq(ctx), eq(promise), anyInt(), anyLong()); + handler.writeRstStream(ctx, 5, PROTOCOL_ERROR.code(), promise); + verify(writer, never()).writeRstStream(eq(ctx), anyInt(), anyLong(), eq(promise)); } @Test public void rstStreamWriteShouldCloseStream() throws Exception { - handler.writeRstStream(ctx, promise, STREAM_ID, PROTOCOL_ERROR.code()); + handler.writeRstStream(ctx, STREAM_ID, PROTOCOL_ERROR.code(), promise); verify(stream).close(); - verify(writer).writeRstStream(eq(ctx), eq(promise), eq(STREAM_ID), - eq((long) PROTOCOL_ERROR.code())); + verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID), + eq((long) PROTOCOL_ERROR.code()), eq(promise)); } @Test public void pingWriteAfterGoAwayShouldFail() throws Exception { when(connection.isGoAway()).thenReturn(true); - ChannelFuture future = handler.writePing(ctx, promise, emptyPingBuf()); + ChannelFuture future = handler.writePing(ctx, emptyPingBuf(), promise); assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); } @Test public void pingWriteShouldSucceed() throws Exception { - handler.writePing(ctx, promise, emptyPingBuf()); - verify(writer).writePing(eq(ctx), eq(promise), eq(false), eq(emptyPingBuf())); + handler.writePing(ctx, emptyPingBuf(), promise); + verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise)); } @Test public void settingsWriteAfterGoAwayShouldFail() throws Exception { when(connection.isGoAway()).thenReturn(true); - ChannelFuture future = handler.writeSettings(ctx, promise, new Http2Settings()); + ChannelFuture future = handler.writeSettings(ctx, new Http2Settings(), promise); assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); } @@ -581,8 +580,8 @@ public class DelegatingHttp2ConnectionHandlerTest { settings.pushEnabled(false); settings.maxConcurrentStreams(1000); settings.headerTableSize(2000); - handler.writeSettings(ctx, promise, settings); - verify(writer).writeSettings(eq(ctx), eq(promise), eq(settings)); + handler.writeSettings(ctx, settings, promise); + verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise)); // Verify that application of local settings must not be done when it is dispatched. verify(inboundFlow, never()).initialInboundWindowSize(eq(100)); verify(local, never()).allowPushTo(eq(false)); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java index 308f7ca752..8175831000 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java @@ -118,12 +118,12 @@ public class Http2ConnectionRoundtripTest { @Override public void run() { for (int i = 0, nextStream = 3; i < NUM_STREAMS; ++i, nextStream += 2) { - http2Client.writeHeaders( - ctx(), newPromise(), nextStream, headers, 0, (short) 16, false, 0, false); - http2Client.writePing(ctx(), newPromise(), Unpooled.copiedBuffer(pingMsg.getBytes())); - http2Client.writeData( - ctx(), newPromise(), nextStream, - Unpooled.copiedBuffer(text.getBytes()), 0, true); + http2Client.writeHeaders(ctx(), nextStream, headers, 0, (short) 16, false, 0, + false, newPromise()); + http2Client.writePing(ctx(), Unpooled.copiedBuffer(pingMsg.getBytes()), + newPromise()); + http2Client.writeData(ctx(), nextStream, + Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise()); } } }); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameRoundtripTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameRoundtripTest.java index 598e8dfef1..c28f332532 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameRoundtripTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameRoundtripTest.java @@ -118,8 +118,9 @@ public class Http2FrameRoundtripTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeData(ctx(), newPromise(), 0x7FFFFFFF, - Unpooled.copiedBuffer(text.getBytes()), 100, true); + frameWriter.writeData(ctx(), 0x7FFFFFFF, + Unpooled.copiedBuffer(text.getBytes()), 100, true, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -135,7 +136,8 @@ public class Http2FrameRoundtripTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeHeaders(ctx(), newPromise(), 0x7FFFFFFF, headers, 0, true); + frameWriter.writeHeaders(ctx(), 0x7FFFFFFF, headers, 0, true, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -151,8 +153,9 @@ public class Http2FrameRoundtripTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeHeaders(ctx(), newPromise(), 0x7FFFFFFF, headers, 4, (short) 255, - true, 0, true); + frameWriter.writeHeaders(ctx(), 0x7FFFFFFF, headers, 4, (short) 255, + true, 0, true, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -166,8 +169,9 @@ public class Http2FrameRoundtripTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeGoAway(ctx(), newPromise(), 0x7FFFFFFF, 0xFFFFFFFFL, - Unpooled.copiedBuffer(text.getBytes())); + frameWriter.writeGoAway(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL, + Unpooled.copiedBuffer(text.getBytes()), newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -181,7 +185,8 @@ public class Http2FrameRoundtripTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writePing(ctx(), ctx().newPromise(), true, buf); + frameWriter.writePing(ctx(), true, buf, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -194,7 +199,8 @@ public class Http2FrameRoundtripTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writePriority(ctx(), newPromise(), 0x7FFFFFFF, 1, (short) 1, true); + frameWriter.writePriority(ctx(), 0x7FFFFFFF, 1, (short) 1, true, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -210,7 +216,8 @@ public class Http2FrameRoundtripTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writePushPromise(ctx(), newPromise(), 0x7FFFFFFF, 1, headers, 5); + frameWriter.writePushPromise(ctx(), 0x7FFFFFFF, 1, headers, 5, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -223,7 +230,8 @@ public class Http2FrameRoundtripTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeRstStream(ctx(), newPromise(), 0x7FFFFFFF, 0xFFFFFFFFL); + frameWriter.writeRstStream(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -240,7 +248,8 @@ public class Http2FrameRoundtripTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeSettings(ctx(), newPromise(), settings); + frameWriter.writeSettings(ctx(), settings, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -252,7 +261,8 @@ public class Http2FrameRoundtripTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeWindowUpdate(ctx(), newPromise(), 0x7FFFFFFF, 0x7FFFFFFF); + frameWriter.writeWindowUpdate(ctx(), 0x7FFFFFFF, 0x7FFFFFFF, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -273,10 +283,11 @@ public class Http2FrameRoundtripTest { @Override public void run() { for (int i = 1; i < numStreams + 1; ++i) { - frameWriter.writeHeaders(ctx(), newPromise(), i, headers, 0, (short) 16, false, - 0, false); - frameWriter.writeData(ctx(), newPromise(), i, - Unpooled.copiedBuffer(text.getBytes()), 0, true); + frameWriter.writeHeaders(ctx(), i, headers, 0, (short) 16, false, + 0, false, newPromise()); + frameWriter.writeData(ctx(), i, + Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise()); + ctx().flush(); } } }); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapterTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapterTest.java index 0772a3e93e..4b58719878 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapterTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapterTest.java @@ -145,7 +145,8 @@ public class InboundHttp2ToHttpAdapterTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, true); + frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, true, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -168,9 +169,10 @@ public class InboundHttp2ToHttpAdapterTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false); - frameWriter.writeData(ctx(), newPromise(), 3, - Unpooled.copiedBuffer(text.getBytes()), 0, true); + frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise()); + frameWriter.writeData(ctx(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -199,11 +201,12 @@ public class InboundHttp2ToHttpAdapterTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false); - frameWriter.writeData(ctx(), newPromise(), 3, - Unpooled.copiedBuffer(text.getBytes()), 0, false); - frameWriter.writeData(ctx(), newPromise(), 3, - Unpooled.copiedBuffer(text2.getBytes()), 0, true); + frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise()); + frameWriter.writeData(ctx(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, false, newPromise()); + frameWriter.writeData(ctx(), 3, + Unpooled.copiedBuffer(text2.getBytes()), 0, true, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -232,13 +235,13 @@ public class InboundHttp2ToHttpAdapterTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false); - frameWriter.writeData(ctx(), newPromise(), 3, - Unpooled.copiedBuffer(text.getBytes()), 0, false); - frameWriter.writeData(ctx(), newPromise(), 3, - Unpooled.copiedBuffer(text.getBytes()), 0, false); - frameWriter.writeData(ctx(), newPromise(), 3, - Unpooled.copiedBuffer(text.getBytes()), 0, true); + frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise()); + frameWriter.writeData(ctx(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, false, + newPromise()); + frameWriter.writeData(ctx(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, false, + newPromise()); + frameWriter.writeData(ctx(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -272,10 +275,11 @@ public class InboundHttp2ToHttpAdapterTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false); - frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers2, 0, false); - frameWriter.writeData(ctx(), newPromise(), 3, - Unpooled.copiedBuffer(text.getBytes()), 0, true); + frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise()); + frameWriter.writeHeaders(ctx(), 3, http2Headers2, 0, false, newPromise()); + frameWriter.writeData(ctx(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -309,10 +313,11 @@ public class InboundHttp2ToHttpAdapterTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false); - frameWriter.writeData(ctx(), newPromise(), 3, - Unpooled.copiedBuffer(text.getBytes()), 0, false); - frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers2, 0, true); + frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise()); + frameWriter.writeData(ctx(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, false, newPromise()); + frameWriter.writeHeaders(ctx(), 3, http2Headers2, 0, true, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -353,12 +358,13 @@ public class InboundHttp2ToHttpAdapterTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false); - frameWriter.writePushPromise(ctx(), newPromise(), 3, 5, http2Headers2, 0); - frameWriter.writeData(ctx(), newPromise(), 3, - Unpooled.copiedBuffer(text.getBytes()), 0, true); - frameWriter.writeData(ctx(), newPromise(), 5, - Unpooled.copiedBuffer(text2.getBytes()), 0, true); + frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise()); + frameWriter.writePushPromise(ctx(), 3, 5, http2Headers2, 0, newPromise()); + frameWriter.writeData(ctx(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise()); + frameWriter.writeData(ctx(), 5, + Unpooled.copiedBuffer(text2.getBytes()), 0, true, newPromise()); + ctx().flush(); } }); awaitRequests(); @@ -399,13 +405,14 @@ public class InboundHttp2ToHttpAdapterTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false); - frameWriter.writeHeaders(ctx(), newPromise(), 5, http2Headers2, 0, false); - frameWriter.writePriority(ctx(), newPromise(), 5, 3, (short) 256, true); - frameWriter.writeData(ctx(), newPromise(), 3, - Unpooled.copiedBuffer(text.getBytes()), 0, true); - frameWriter.writeData(ctx(), newPromise(), 5, - Unpooled.copiedBuffer(text2.getBytes()), 0, true); + frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise()); + frameWriter.writeHeaders(ctx(), 5, http2Headers2, 0, false, newPromise()); + frameWriter.writePriority(ctx(), 5, 3, (short) 256, true, newPromise()); + frameWriter.writeData(ctx(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise()); + frameWriter.writeData(ctx(), 5, + Unpooled.copiedBuffer(text2.getBytes()), 0, true, newPromise()); + ctx().flush(); } }); awaitRequests(); diff --git a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java index 6db6b31c2f..51f72deea8 100644 --- a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java +++ b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java @@ -67,9 +67,10 @@ public class Http2ClientInitializer extends ChannelInitializer { @Override public void initChannel(SocketChannel ch) throws Exception { Http2Connection connection = new DefaultHttp2Connection(false); + Http2FrameWriter frameWriter = frameWriter(); connectionHandler = new DelegatingHttp2HttpConnectionHandler(connection, - frameReader(), frameWriter(), new DefaultHttp2InboundFlowController(connection), - new DefaultHttp2OutboundFlowController(connection), + frameReader(), frameWriter, new DefaultHttp2InboundFlowController(connection), + new DefaultHttp2OutboundFlowController(connection, frameWriter), InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength)); responseHandler = new HttpResponseHandler(); settingsHandler = new Http2SettingsHandler(ch.newPromise()); diff --git a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java index 3d91ba9cf1..e63c9002bb 100644 --- a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java +++ b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java @@ -32,6 +32,7 @@ import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2InboundFrameLogger; import io.netty.handler.codec.http2.Http2OutboundFrameLogger; @@ -48,14 +49,14 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { static final ByteBuf RESPONSE_BYTES = unreleasableBuffer(copiedBuffer("Hello World", CharsetUtil.UTF_8)); public HelloWorldHttp2Handler() { - this(new DefaultHttp2Connection(true)); + this(new DefaultHttp2Connection(true), new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), logger)); } - private HelloWorldHttp2Handler(Http2Connection connection) { + private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameWriter frameWriter) { super(connection, new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger), - new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), logger), + frameWriter, new DefaultHttp2InboundFlowController(connection), - new DefaultHttp2OutboundFlowController(connection)); + new DefaultHttp2OutboundFlowController(connection, frameWriter)); } /** @@ -69,7 +70,7 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200") .set(UPGRADE_RESPONSE_HEADER, "true").build(); - writeHeaders(ctx, ctx.newPromise(), 1, headers, 0, true); + writeHeaders(ctx, 1, headers, 0, true, ctx.newPromise()); } super.userEventTriggered(ctx, evt); } @@ -109,8 +110,8 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { private void sendResponse(ChannelHandlerContext ctx, int streamId, ByteBuf payload) { // Send a frame for the response status Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200").build(); - writeHeaders(ctx(), ctx().newPromise(), streamId, headers, 0, false); + writeHeaders(ctx(), streamId, headers, 0, false, ctx().newPromise()); - writeData(ctx(), ctx().newPromise(), streamId, payload, 0, true); + writeData(ctx(), streamId, payload, 0, true, ctx().newPromise()); } }