From 2b7f344a015e2fed04989841e168703150775a69 Mon Sep 17 00:00:00 2001 From: nmittler Date: Thu, 25 Sep 2014 15:38:55 -0700 Subject: [PATCH] Fixing bugs in HTTP/2 pipeline exception handling Motivation: HTTP/2 codec does not properly test exception passed to exceptionCaught() for instanceof Http2Exception (since the exception will always be wrapped in a PipelineException), so it will never properly handle Http2Exceptions in the pipeline. Also if any streams are present, the connection close logic will execute twice when a pipeline exception. This is because the exception logic calls ctx.close() which then triggers the handleInActive() logic to execute. This clears all of the remaining streams and then attempts to run the closeListener logic (which has already been run). Modifications: Changed exceptionCaught logic to properly extract Http2Exception from the PipelineException. Also added logic to the closeListener so that is only run once. Changed Http2CodecUtil.toHttp2Exception() to avoid NPE when creating an exception with cause.getMessage(). Refactored Http2ConnectionHandler to more cleanly separate inbound and outbound flows (Http2ConnectionDecoder/Http2ConnectionEncoder). Added a test for verifying that a pipeline exception closes the connection. Result: Exception handling logic is tidied up. --- .../codec/http2/DefaultHttp2Connection.java | 20 +- ...ava => DefaultHttp2ConnectionDecoder.java} | 346 ++-------- ...ava => DefaultHttp2ConnectionEncoder.java} | 193 ++---- .../codec/http2/DefaultHttp2FrameWriter.java | 5 +- .../DefaultHttp2InboundFlowController.java | 11 +- .../DefaultHttp2OutboundFlowController.java | 40 +- .../codec/http2/Http2ClientUpgradeCodec.java | 32 +- .../handler/codec/http2/Http2CodecUtil.java | 31 +- .../handler/codec/http2/Http2Connection.java | 11 - .../codec/http2/Http2ConnectionDecoder.java | 50 ++ .../codec/http2/Http2ConnectionEncoder.java | 32 + .../codec/http2/Http2ConnectionHandler.java | 286 +++++++-- .../http2/Http2FrameListenerDecorator.java | 6 +- .../handler/codec/http2/Http2FrameLogger.java | 11 +- .../codec/http2/Http2InboundFrameLogger.java | 11 +- .../codec/http2/Http2LifecycleManager.java | 211 ++++++ .../codec/http2/Http2OrHttpChooser.java | 2 +- .../codec/http2/Http2OutboundFrameLogger.java | 11 +- .../codec/http2/Http2ServerUpgradeCodec.java | 18 +- .../handler/codec/http2/Http2Settings.java | 5 +- .../http2/Http2ToHttpConnectionHandler.java | 24 +- .../http2/InboundHttp2ToHttpAdapter.java | 5 +- .../DefaultHttp2ConnectionDecoderTest.java | 364 +++++++++++ .../DefaultHttp2ConnectionEncoderTest.java | 320 ++++++++++ .../http2/Http2ConnectionHandlerTest.java | 604 +----------------- .../http2/Http2ConnectionRoundtripTest.java | 94 ++- .../handler/codec/DefaultBinaryHeaders.java | 7 +- .../handler/codec/DefaultTextHeaders.java | 7 +- .../io/netty/handler/codec/HeaderMap.java | 9 +- .../io/netty/util/internal/ObjectUtil.java | 35 + .../http2/client/Http2ClientInitializer.java | 11 +- .../http2/server/HelloWorldHttp2Handler.java | 31 +- 32 files changed, 1576 insertions(+), 1267 deletions(-) rename codec-http2/src/main/java/io/netty/handler/codec/http2/{Http2InboundConnectionHandler.java => DefaultHttp2ConnectionDecoder.java} (55%) rename codec-http2/src/main/java/io/netty/handler/codec/http2/{Http2OutboundConnectionAdapter.java => DefaultHttp2ConnectionEncoder.java} (66%) create mode 100644 codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java create mode 100644 codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java create mode 100644 codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LifecycleManager.java create mode 100644 codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java create mode 100644 codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java create mode 100644 common/src/main/java/io/netty/util/internal/ObjectUtil.java diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index dc787cfa52..c69cc5bec2 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -29,8 +29,7 @@ import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action; import io.netty.util.collection.IntObjectHashMap; @@ -77,10 +76,8 @@ public class DefaultHttp2Connection implements Http2Connection { * the policy to be used for removal of closed stream. */ public DefaultHttp2Connection(boolean server, Http2StreamRemovalPolicy removalPolicy) { - if (removalPolicy == null) { - throw new NullPointerException("removalPolicy"); - } - this.removalPolicy = removalPolicy; + + this.removalPolicy = checkNotNull(removalPolicy, "removalPolicy"); localEndpoint = new DefaultEndpoint(server); remoteEndpoint = new DefaultEndpoint(!server); @@ -165,17 +162,6 @@ public class DefaultHttp2Connection implements Http2Connection { return remote().createStream(streamId, halfClosed); } - @Override - public void close(Http2Stream stream, ChannelFuture future, ChannelFutureListener closeListener) { - stream.close(); - - // If this connection is closing and there are no longer any - // active streams, close after the current operation completes. - if (closeListener != null && numActiveStreams() == 0) { - future.addListener(closeListener); - } - } - private void removeStream(DefaultStream stream) { // Notify the listeners of the event first. for (Listener listener : listeners) { diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java similarity index 55% rename from codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundConnectionHandler.java rename to codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index bcc8c91df1..3dea034e1b 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -15,8 +15,6 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; -import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID; -import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED; import static io.netty.handler.codec.http2.Http2Exception.protocolError; @@ -24,14 +22,10 @@ import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.ByteToMessageDecoder; -import java.util.Collection; import java.util.List; /** @@ -42,89 +36,54 @@ import java.util.List; *

* This interface enforces inbound flow control functionality through {@link Http2InboundFlowController} */ -public class Http2InboundConnectionHandler extends ByteToMessageDecoder { +public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { private final Http2FrameListener internalFrameListener = new FrameReadListener(); - protected final Http2OutboundConnectionAdapter outbound; - private final Http2FrameListener listener; + private final Http2Connection connection; + private final Http2LifecycleManager lifecycleManager; + private final Http2ConnectionEncoder encoder; private final Http2FrameReader frameReader; - protected final Http2Connection connection; private final Http2InboundFlowController inboundFlow; - private ByteBuf clientPrefaceString; - private boolean prefaceSent; + private final Http2FrameListener listener; private boolean prefaceReceived; - public Http2InboundConnectionHandler(Http2Connection connection, Http2FrameListener listener, - Http2FrameReader frameReader, Http2InboundFlowController inboundFlow, - Http2OutboundConnectionAdapter outbound) { - if (connection == null) { - throw new NullPointerException("connection"); - } - if (frameReader == null) { - throw new NullPointerException("frameReader"); - } - if (listener == null) { - throw new NullPointerException("listener"); - } - if (inboundFlow == null) { - throw new NullPointerException("inboundFlow"); - } - if (outbound == null) { - throw new NullPointerException("outbound"); - } - - this.connection = connection; - this.frameReader = frameReader; - this.listener = listener; - this.outbound = outbound; - this.inboundFlow = inboundFlow; - - // Set the expected client preface string. Only servers should receive this. - clientPrefaceString = connection.isServer() ? connectionPrefaceBuf() : null; + public DefaultHttp2ConnectionDecoder(Http2Connection connection, Http2FrameReader frameReader, + Http2InboundFlowController inboundFlow, Http2ConnectionEncoder encoder, + Http2LifecycleManager lifecycleManager, Http2FrameListener listener) { + this.connection = checkNotNull(connection, "connection"); + this.frameReader = checkNotNull(frameReader, "frameReader"); + this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager"); + this.encoder = checkNotNull(encoder, "encoder"); + this.inboundFlow = checkNotNull(inboundFlow, "inboundFlow"); + this.listener = checkNotNull(listener, "listener"); } - /** - * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2. - * Reserves local stream 1 for the HTTP/2 response. - */ - public void onHttpClientUpgrade() throws Http2Exception { - if (connection.isServer()) { - throw protocolError("Client-side HTTP upgrade requested for a server"); - } - if (prefaceSent || prefaceReceived) { - throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received"); - } - - // Create a local stream used for the HTTP cleartext upgrade. - connection.createLocalStream(HTTP_UPGRADE_STREAM_ID, true); + public Http2Connection connection() { + return connection; } - /** - * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2. - * @param settings the settings for the remote endpoint. - */ - public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception { - if (!connection.isServer()) { - throw protocolError("Server-side HTTP upgrade requested for a client"); - } - if (prefaceSent || prefaceReceived) { - throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received"); - } - - // Apply the settings but no ACK is necessary. - applyRemoteSettings(settings); - - // Create a stream in the half-closed state. - connection.createRemoteStream(HTTP_UPGRADE_STREAM_ID, true); + public Http2FrameListener listener() { + return listener; } - /** - * Gets the local settings for this endpoint of the HTTP/2 connection. - */ - public Http2Settings settings() { + public Http2LifecycleManager lifecycleManager() { + return lifecycleManager; + } + + public boolean prefaceReceived() { + return prefaceReceived; + } + + @Override + public void decodeFrame(ChannelHandlerContext ctx, ByteBuf in, List out) throws Http2Exception { + frameReader.readFrame(ctx, in, internalFrameListener); + } + + @Override + public Http2Settings localSettings() { Http2Settings settings = new Http2Settings(); - final Http2FrameReader.Configuration config = frameReader.configuration(); - final Http2HeaderTable headerTable = config.headerTable(); - final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy(); + Http2FrameReader.Configuration config = frameReader.configuration(); + Http2HeaderTable headerTable = config.headerTable(); + Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy(); settings.initialWindowSize(inboundFlow.initialInboundWindowSize()); settings.maxConcurrentStreams(connection.remote().maxStreams()); settings.headerTableSize(headerTable.maxHeaderTableSize()); @@ -137,212 +96,49 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder { return settings; } - /** - * Closes all closable resources and frees any allocated resources. - *

- * This does NOT close the {@link Http2OutboundConnectionAdapter} reference in this class - */ - public void close() { - frameReader.close(); - if (clientPrefaceString != null) { - clientPrefaceString.release(); - clientPrefaceString = null; - } - } - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - // The channel just became active - send the connection preface to the remote - // endpoint. - sendPreface(ctx); - super.channelActive(ctx); - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - // This handler was just added to the context. In case it was handled after - // the connection became active, send the connection preface now. - sendPreface(ctx); - } - - @Override - protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { - close(); - } - - @Override - public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - // Avoid NotYetConnectedException - if (!ctx.channel().isActive()) { - ctx.close(promise); - return; - } - - outbound.sendGoAway(ctx, promise, null); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - ChannelFuture future = ctx.newSucceededFuture(); - final Collection streams = connection.activeStreams(); - for (Http2Stream s : streams.toArray(new Http2Stream[streams.size()])) { - connection.close(s, future, outbound.closeListener()); - } - super.channelInactive(ctx); - } - - /** - * Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions. - */ - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (cause instanceof Http2Exception) { - onHttp2Exception(ctx, (Http2Exception) cause); - } - - super.exceptionCaught(ctx, cause); - } - - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - try { - // Read the remaining of the client preface string if we haven't already. - // If this is a client endpoint, always returns true. - if (!readClientPrefaceString(ctx, in)) { - // Still processing the client preface. - return; - } - - frameReader.readFrame(ctx, in, internalFrameListener); - } catch (Http2Exception e) { - onHttp2Exception(ctx, e); - } catch (Throwable e) { - onHttp2Exception(ctx, new Http2Exception(Http2Error.INTERNAL_ERROR, e.getMessage(), e)); - } - } - - /** - * Processes the given exception. Depending on the type of exception, delegates to either - * {@link #onConnectionError(ChannelHandlerContext, Http2Exception)} or - * {@link #onStreamError(ChannelHandlerContext, Http2StreamException)}. - */ - protected final void onHttp2Exception(ChannelHandlerContext ctx, Http2Exception e) { - if (e instanceof Http2StreamException) { - onStreamError(ctx, (Http2StreamException) e); - } else { - onConnectionError(ctx, e); - } - } - - /** - * Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint and waits until all streams are - * closed before shutting down the connection. - */ - protected void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) { - outbound.sendGoAway(ctx, ctx.newPromise(), cause); - } - - /** - * Handler for a stream error. Sends a RST_STREAM frame to the remote endpoint and closes the stream. - */ - protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) { - outbound.writeRstStream(ctx, cause.streamId(), cause.error().code(), ctx.newPromise(), true); - } - - /** - * Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent. - */ - private void sendPreface(final ChannelHandlerContext ctx) { - if (prefaceSent || !ctx.channel().isActive()) { - return; - } - - prefaceSent = true; - - if (!connection.isServer()) { - // Clients must send the preface string as the first bytes on the connection. - ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - } - - // Both client and server must send their initial settings. - outbound.writeSettings(ctx, settings(), ctx.newPromise()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - } - - /** - * Applies settings received from the remote endpoint. - */ - private void applyRemoteSettings(Http2Settings settings) throws Http2Exception { + public void localSettings(Http2Settings settings) throws Http2Exception { Boolean pushEnabled = settings.pushEnabled(); - final Http2FrameWriter.Configuration config = outbound.configuration(); - final Http2HeaderTable headerTable = config.headerTable(); - final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy(); + Http2FrameReader.Configuration config = frameReader.configuration(); + Http2HeaderTable inboundHeaderTable = config.headerTable(); + Http2FrameSizePolicy inboundFrameSizePolicy = config.frameSizePolicy(); if (pushEnabled != null) { - if (!connection.isServer()) { - throw protocolError("Client received SETTINGS frame with ENABLE_PUSH specified"); + if (connection.isServer()) { + throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified"); } - connection.remote().allowPushTo(pushEnabled); + connection.local().allowPushTo(pushEnabled); } Long maxConcurrentStreams = settings.maxConcurrentStreams(); if (maxConcurrentStreams != null) { int value = (int) Math.min(maxConcurrentStreams, Integer.MAX_VALUE); - connection.local().maxStreams(value); + connection.remote().maxStreams(value); } Long headerTableSize = settings.headerTableSize(); if (headerTableSize != null) { - headerTable.maxHeaderTableSize((int) Math.min(headerTableSize.intValue(), Integer.MAX_VALUE)); + inboundHeaderTable.maxHeaderTableSize((int) Math.min(headerTableSize, Integer.MAX_VALUE)); } Integer maxHeaderListSize = settings.maxHeaderListSize(); if (maxHeaderListSize != null) { - headerTable.maxHeaderListSize(maxHeaderListSize); + inboundHeaderTable.maxHeaderListSize(maxHeaderListSize); } Integer maxFrameSize = settings.maxFrameSize(); if (maxFrameSize != null) { - frameSizePolicy.maxFrameSize(maxFrameSize); + inboundFrameSizePolicy.maxFrameSize(maxFrameSize); } Integer initialWindowSize = settings.initialWindowSize(); if (initialWindowSize != null) { - outbound.initialOutboundWindowSize(initialWindowSize); + inboundFlow.initialInboundWindowSize(initialWindowSize); } } - /** - * Decodes the client connection preface string from the input buffer. - * - * @return {@code true} if processing of the client preface string is complete. Since client preface strings can - * only be received by servers, returns true immediately for client endpoints. - */ - private boolean readClientPrefaceString(ChannelHandlerContext ctx, ByteBuf in) { - if (clientPrefaceString == null) { - return true; - } - - int prefaceRemaining = clientPrefaceString.readableBytes(); - int bytesRead = Math.min(in.readableBytes(), prefaceRemaining); - - // Read the portion of the input up to the length of the preface, if reached. - ByteBuf sourceSlice = in.readSlice(bytesRead); - - // Read the same number of bytes from the preface buffer. - ByteBuf prefaceSlice = clientPrefaceString.readSlice(bytesRead); - - // If the input so far doesn't match the preface, break the connection. - if (bytesRead == 0 || !prefaceSlice.equals(sourceSlice)) { - ctx.close(); - return false; - } - - if (!clientPrefaceString.isReadable()) { - // Entire preface has been read. - clientPrefaceString.release(); - clientPrefaceString = null; - return true; - } - return false; + @Override + public void close() { + frameReader.close(); } /** @@ -372,7 +168,7 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder { listener.onDataRead(ctx, streamId, data, padding, endOfStream); if (endOfStream) { - closeRemoteSide(stream, ctx.newSucceededFuture()); + lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture()); } } @@ -385,25 +181,6 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder { } } - /** - * Closes the remote side of the given stream. If this causes the stream to be closed, adds a hook to close the - * channel after the given future completes. - * - * @param stream the stream to be half closed. - * @param future If closing, the future after which to close the channel. If {@code null}, ignored. - */ - private void closeRemoteSide(Http2Stream stream, ChannelFuture future) { - switch (stream.state()) { - case HALF_CLOSED_REMOTE: - case OPEN: - stream.closeRemoteSide(); - break; - default: - connection.close(stream, future, outbound.closeListener()); - break; - } - } - @Override public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream) throws Http2Exception { @@ -436,13 +213,14 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder { } } - listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream); + listener.onHeadersRead(ctx, streamId, headers, + streamDependency, weight, exclusive, padding, endStream); stream.setPriority(streamDependency, weight, exclusive); // If the headers completes this stream, close it. if (endStream) { - closeRemoteSide(stream, ctx.newSucceededFuture()); + lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture()); } } @@ -479,7 +257,7 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder { listener.onRstStreamRead(ctx, streamId, errorCode); - connection.close(stream, ctx.newSucceededFuture(), outbound.closeListener()); + lifecycleManager.closeStream(stream, ctx.newSucceededFuture()); } @Override @@ -487,7 +265,7 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder { verifyPrefaceReceived(); // Apply oldest outstanding local settings here. This is a synchronization point // between endpoints. - Http2Settings settings = outbound.pollSettings(); + Http2Settings settings = encoder.pollSentSettings(); if (settings != null) { applyLocalSettings(settings); @@ -540,10 +318,10 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder { @Override public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { - applyRemoteSettings(settings); + encoder.remoteSettings(settings); // Acknowledge receipt of the settings. - outbound.writeSettingsAck(ctx, ctx.newPromise()); + encoder.writeSettingsAck(ctx, ctx.newPromise()); ctx.flush(); // We've received at least one non-ack settings frame from the remote endpoint. @@ -558,7 +336,7 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder { // Send an ack back to the remote client. // Need to retain the buffer here since it will be released after the write completes. - outbound.writePing(ctx, true, data.retain(), ctx.newPromise()); + encoder.writePing(ctx, true, data.retain(), ctx.newPromise()); ctx.flush(); listener.onPingRead(ctx, data); @@ -613,7 +391,7 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder { } // Update the outbound flow controller. - outbound.updateOutboundWindowSize(streamId, windowSizeIncrement); + encoder.updateOutboundWindowSize(streamId, windowSizeIncrement); listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundConnectionAdapter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java similarity index 66% rename from codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundConnectionAdapter.java rename to codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 71f5e9a03e..9b0cad278c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundConnectionAdapter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -15,48 +15,78 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; -import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf; import static io.netty.handler.codec.http2.Http2CodecUtil.toHttp2Exception; -import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.protocolError; import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE; import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import java.io.Closeable; import java.util.ArrayDeque; /** - * Provides the ability to write HTTP/2 frames - *

- * This class provides write methods which turn java calls into HTTP/2 frames - *

- * This interface enforces outbound flow control functionality through {@link Http2OutboundFlowController} + * Default implementation of {@link Http2ConnectionEncoder}. */ -public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2OutboundFlowController, Closeable { +public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { private final Http2FrameWriter frameWriter; private final Http2Connection connection; private final Http2OutboundFlowController outboundFlow; + private final Http2LifecycleManager lifecycleManager; // We prefer ArrayDeque to LinkedList because later will produce more GC. // This initial capacity is plenty for SETTINGS traffic. private final ArrayDeque outstandingLocalSettingsQueue = new ArrayDeque(4); - private ChannelFutureListener closeListener; - public Http2OutboundConnectionAdapter(Http2Connection connection, Http2FrameWriter frameWriter) { - this(connection, frameWriter, new DefaultHttp2OutboundFlowController(connection, frameWriter)); + public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter, + Http2OutboundFlowController outboundFlow, Http2LifecycleManager lifecycleManager) { + this.frameWriter = checkNotNull(frameWriter, "frameWriter"); + this.connection = checkNotNull(connection, "connection"); + this.outboundFlow = checkNotNull(outboundFlow, "outboundFlow"); + this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager"); } - public Http2OutboundConnectionAdapter(Http2Connection connection, Http2FrameWriter frameWriter, - Http2OutboundFlowController outboundFlow) { - this.frameWriter = frameWriter; - this.connection = connection; - this.outboundFlow = outboundFlow; + @Override + public void remoteSettings(Http2Settings settings) throws Http2Exception { + Boolean pushEnabled = settings.pushEnabled(); + Http2FrameWriter.Configuration config = configuration(); + Http2HeaderTable outboundHeaderTable = config.headerTable(); + Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy(); + if (pushEnabled != null) { + if (!connection.isServer()) { + throw protocolError("Client received SETTINGS frame with ENABLE_PUSH specified"); + } + connection.remote().allowPushTo(pushEnabled); + } + + Long maxConcurrentStreams = settings.maxConcurrentStreams(); + if (maxConcurrentStreams != null) { + connection.local().maxStreams((int) Math.min(maxConcurrentStreams, Integer.MAX_VALUE)); + } + + Long headerTableSize = settings.headerTableSize(); + if (headerTableSize != null) { + outboundHeaderTable.maxHeaderTableSize((int) Math.min(headerTableSize, Integer.MAX_VALUE)); + } + + Integer maxHeaderListSize = settings.maxHeaderListSize(); + if (maxHeaderListSize != null) { + outboundHeaderTable.maxHeaderListSize(maxHeaderListSize); + } + + Integer maxFrameSize = settings.maxFrameSize(); + if (maxFrameSize != null) { + outboundFrameSizePolicy.maxFrameSize(maxFrameSize); + } + + Integer initialWindowSize = settings.initialWindowSize(); + if (initialWindowSize != null) { + initialOutboundWindowSize(initialWindowSize); + } } @Override @@ -79,11 +109,11 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { // The write failed, handle the error. - onHttp2Exception(ctx, toHttp2Exception(future.cause())); + lifecycleManager.onHttp2Exception(ctx, toHttp2Exception(future.cause())); } else if (endStream) { // Close the local side of the stream if this is the last frame Http2Stream stream = connection.stream(streamId); - closeLocalSide(stream, ctx.newPromise()); + lifecycleManager.closeLocalSide(stream, ctx.newPromise()); } } }); @@ -139,7 +169,7 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou // If the headers are the end of the stream, close it now. if (endStream) { - closeLocalSide(stream, promise); + lifecycleManager.closeLocalSide(stream, promise); } return future; @@ -171,7 +201,8 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou @Override public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) { - return writeRstStream(ctx, streamId, errorCode, promise, false); + // Delegate to the lifecycle manager for proper updating of connection state. + return lifecycleManager.writeRstStream(ctx, streamId, errorCode, promise); } /** @@ -201,7 +232,7 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou if (stream != null) { stream.terminateSent(); - connection.close(stream, promise, closeListener); + lifecycleManager.closeStream(stream, promise); } return future; @@ -274,42 +305,10 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou } } - /** - * Sends a GO_AWAY frame to the remote endpoint. Waits until all streams are closed before shutting down the - * connection. - * @param ctx the handler context - * @param promise the promise used to create the close listener. - * @param cause connection error that caused this GO_AWAY, or {@code null} if normal termination. - */ - public void sendGoAway(ChannelHandlerContext ctx, ChannelPromise promise, Http2Exception cause) { - ChannelFuture future = null; - ChannelPromise closePromise = promise; - if (!connection.isGoAway()) { - int errorCode = cause != null ? cause.error().code() : NO_ERROR.code(); - ByteBuf debugData = toByteBuf(ctx, cause); - - future = writeGoAway(ctx, connection.remote().lastStreamCreated(), errorCode, debugData, promise); - ctx.flush(); - closePromise = null; - } - - closeListener = getOrCreateCloseListener(ctx, closePromise); - - // If there are no active streams, close immediately after the send is complete. - // Otherwise wait until all streams are inactive. - if (cause != null || connection.numActiveStreams() == 0) { - if (future == null) { - future = ctx.newSucceededFuture(); - } - future.addListener(closeListener); - } - } - @Override public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, ChannelPromise promise) { - connection.remote().goAwayReceived(lastStreamId); - return frameWriter.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise); + return lifecycleManager.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise); } @Override @@ -324,85 +323,13 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise); } - /** - * Processes the given exception. Depending on the type of exception, delegates to either - * {@link #onConnectionError(ChannelHandlerContext, Http2Exception)} or - * {@link #onStreamError(ChannelHandlerContext, Http2StreamException)}. - */ - protected final void onHttp2Exception(ChannelHandlerContext ctx, Http2Exception e) { - if (e instanceof Http2StreamException) { - onStreamError(ctx, (Http2StreamException) e); - } else { - onConnectionError(ctx, e); - } - } - - /** - * Handler for a stream error. Sends a RST_STREAM frame to the remote endpoint and closes the stream. - */ - protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) { - writeRstStream(ctx, cause.streamId(), cause.error().code(), ctx.newPromise(), true); - } - - /** - * Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint and waits until all streams are - * closed before shutting down the connection. - */ - protected void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) { - sendGoAway(ctx, ctx.newPromise(), cause); - } - - /** - * If not already created, creates a new listener for the given promise which, when complete, closes the connection - * and frees any resources. - */ - private ChannelFutureListener getOrCreateCloseListener(final ChannelHandlerContext ctx, ChannelPromise promise) { - final ChannelPromise closePromise = promise == null ? ctx.newPromise() : promise; - if (closeListener == null) { - // If no promise was provided, create a new one. - closeListener = new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - ctx.close(closePromise); - close(); - } - }; - } else { - closePromise.setSuccess(); - } - - return closeListener; - } - - /** - * Closes the remote side of the given stream. If this causes the stream to be closed, adds a hook to close the - * channel after the given future completes. - * - * @param stream the stream to be half closed. - * @param future If closing, the future after which to close the channel. If {@code null}, ignored. - */ - private void closeLocalSide(Http2Stream stream, ChannelFuture future) { - switch (stream.state()) { - case HALF_CLOSED_LOCAL: - case OPEN: - stream.closeLocalSide(); - break; - default: - connection.close(stream, future, closeListener); - break; - } - } - @Override public void close() { frameWriter.close(); } - /** - * Get the {@link Http2Settings} object on the top of the queue that has been sent but not ACKed. - * This may return {@code null}. - */ - public Http2Settings pollSettings() { + @Override + public Http2Settings pollSentSettings() { return outstandingLocalSettingsQueue.poll(); } @@ -411,14 +338,6 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou return frameWriter.configuration(); } - /** - * Get the close listener associated with this object - * @return - */ - public ChannelFutureListener closeListener() { - return closeListener; - } - @Override public void initialOutboundWindowSize(int newWindowSize) throws Http2Exception { outboundFlow.initialOutboundWindowSize(newWindowSize); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java index a0c54b0269..5b950942f2 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java @@ -39,6 +39,7 @@ import static io.netty.handler.codec.http2.Http2FrameTypes.PUSH_PROMISE; import static io.netty.handler.codec.http2.Http2FrameTypes.RST_STREAM; import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS; import static io.netty.handler.codec.http2.Http2FrameTypes.WINDOW_UPDATE; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.channel.ChannelFuture; @@ -183,9 +184,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) { try { - if (settings == null) { - throw new NullPointerException("settings"); - } + checkNotNull(settings, "settings"); int payloadLength = SETTING_ENTRY_LENGTH * settings.size(); ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payloadLength); writeFrameHeader(frame, payloadLength, SETTINGS, new Http2Flags(), 0); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java index adbf64eba8..5fbde3ffab 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java @@ -19,6 +19,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2Exception.flowControlError; import static io.netty.handler.codec.http2.Http2Exception.protocolError; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -43,14 +44,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro private int initialWindowSize = DEFAULT_WINDOW_SIZE; public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { - if (connection == null) { - throw new NullPointerException("connection"); - } - if (frameWriter == null) { - throw new NullPointerException("frameWriter"); - } - this.connection = connection; - this.frameWriter = frameWriter; + this.connection = checkNotNull(connection, "connection"); + this.frameWriter = checkNotNull(frameWriter, "frameWriter"); // Add a flow state for the connection. connection.connectionStream().inboundFlow(new InboundFlowState(CONNECTION_STREAM_ID)); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java index 116af4c243..e3d55cc7ce 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java @@ -15,6 +15,15 @@ package io.netty.handler.codec.http2; +import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; +import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; +import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; +import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED; +import static io.netty.handler.codec.http2.Http2Exception.format; +import static io.netty.handler.codec.http2.Http2Exception.protocolError; +import static io.netty.util.internal.ObjectUtil.checkNotNull; +import static java.lang.Math.max; +import static java.lang.Math.min; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; @@ -28,15 +37,6 @@ import java.util.Comparator; import java.util.List; import java.util.Queue; -import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; -import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; -import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; -import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED; -import static io.netty.handler.codec.http2.Http2Exception.format; -import static io.netty.handler.codec.http2.Http2Exception.protocolError; -import static java.lang.Math.max; -import static java.lang.Math.min; - /** * Basic implementation of {@link Http2OutboundFlowController}. */ @@ -60,14 +60,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont private ChannelHandlerContext ctx; public DefaultHttp2OutboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { - if (connection == null) { - throw new NullPointerException("connection"); - } - if (frameWriter == null) { - throw new NullPointerException("frameWriter"); - } - this.connection = connection; - this.frameWriter = frameWriter; + this.connection = checkNotNull(connection, "connection"); + this.frameWriter = checkNotNull(frameWriter, "frameWriter"); // Add a flow state for the connection. connection.connectionStream().outboundFlow(new OutboundFlowState(connection.connectionStream())); @@ -161,15 +155,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont @Override public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise) { - if (ctx == null) { - throw new NullPointerException("ctx"); - } - if (promise == null) { - throw new NullPointerException("promise"); - } - if (data == null) { - throw new NullPointerException("data"); - } + checkNotNull(ctx, "ctx"); + checkNotNull(promise, "promise"); + checkNotNull(data, "data"); if (this.ctx != null && this.ctx != ctx) { throw new IllegalArgumentException("Writing data from multiple ChannelHandlerContexts is not supported"); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ClientUpgradeCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ClientUpgradeCodec.java index 4b0e29437b..1d57bacaf1 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ClientUpgradeCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ClientUpgradeCodec.java @@ -14,6 +14,15 @@ */ package io.netty.handler.codec.http2; +import static io.netty.handler.codec.base64.Base64Dialect.URL_SAFE; +import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME; +import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER; +import static io.netty.handler.codec.http2.Http2CodecUtil.SETTING_ENTRY_LENGTH; +import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedInt; +import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedShort; +import static io.netty.util.CharsetUtil.UTF_8; +import static io.netty.util.ReferenceCountUtil.release; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.base64.Base64; @@ -26,11 +35,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import static io.netty.handler.codec.base64.Base64Dialect.*; -import static io.netty.handler.codec.http2.Http2CodecUtil.*; -import static io.netty.util.CharsetUtil.*; -import static io.netty.util.ReferenceCountUtil.*; - /** * Client-side cleartext upgrade codec from HTTP to HTTP/2. */ @@ -39,7 +43,7 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade private static final List UPGRADE_HEADERS = Collections.singletonList(HTTP_UPGRADE_SETTINGS_HEADER); private final String handlerName; - private final Http2InboundConnectionHandler connectionHandler; + private final Http2ConnectionHandler connectionHandler; /** * Creates the codec using a default name for the connection handler when adding to the @@ -47,7 +51,7 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade * * @param connectionHandler the HTTP/2 connection handler. */ - public Http2ClientUpgradeCodec(Http2InboundConnectionHandler connectionHandler) { + public Http2ClientUpgradeCodec(Http2ConnectionHandler connectionHandler) { this("http2ConnectionHandler", connectionHandler); } @@ -58,15 +62,9 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade * @param connectionHandler the HTTP/2 connection handler. */ public Http2ClientUpgradeCodec(String handlerName, - Http2InboundConnectionHandler connectionHandler) { - if (handlerName == null) { - throw new NullPointerException("handlerName"); - } - if (connectionHandler == null) { - throw new NullPointerException("connectionHandler"); - } - this.handlerName = handlerName; - this.connectionHandler = connectionHandler; + Http2ConnectionHandler connectionHandler) { + this.handlerName = checkNotNull(handlerName, "handlerName"); + this.connectionHandler = checkNotNull(connectionHandler, "connectionHandler"); } @Override @@ -101,7 +99,7 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade ByteBuf encodedBuf = null; try { // Get the local settings for the handler. - Http2Settings settings = connectionHandler.settings(); + Http2Settings settings = connectionHandler.decoder().localSettings(); // Serialize the payload of the SETTINGS frame. int payloadLength = SETTING_ENTRY_LENGTH * settings.size(); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java index edbc5540eb..a22e8dc9df 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java @@ -16,8 +16,8 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; -import static io.netty.handler.codec.http2.Http2Exception.format; import static io.netty.util.CharsetUtil.UTF_8; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; @@ -105,10 +105,7 @@ public final class Http2CodecUtil { @Override public void setAction(Action action) { - if (action == null) { - throw new NullPointerException("action"); - } - this.action = action; + this.action = checkNotNull(action, "action"); } @Override @@ -135,11 +132,27 @@ public final class Http2CodecUtil { * Converts the given cause to a {@link Http2Exception} if it isn't already. */ public static Http2Exception toHttp2Exception(Throwable cause) { - if (cause instanceof Http2Exception) { - return (Http2Exception) cause; + // Look for an embedded Http2Exception. + Http2Exception httpException = getEmbeddedHttp2Exception(cause); + if (httpException != null) { + return httpException; } - String msg = cause != null ? cause.getMessage() : "Failed writing the data frame."; - return format(INTERNAL_ERROR, msg); + + return new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause); + } + + /** + * Iteratively looks through the causaility chain for the given exception and returns the first + * {@link Http2Exception} or {@code null} if none. + */ + public static Http2Exception getEmbeddedHttp2Exception(Throwable cause) { + while (cause != null) { + if (cause instanceof Http2Exception) { + return (Http2Exception) cause; + } + cause = cause.getCause(); + } + return null; } /** diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java index 49c6c8ed05..8d77c3ad13 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java @@ -15,9 +15,6 @@ package io.netty.handler.codec.http2; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; - import java.util.Collection; /** @@ -272,12 +269,4 @@ public interface Http2Connection { * Indicates whether or not either endpoint has received a GOAWAY. */ boolean isGoAway(); - - /** - * Closes the given stream and adds a hook to close the channel after the given future completes. - * @param stream the stream to be closed. - * @param future the future after which to close the channel. If {@code null}, ignored. - * @param closeListener the listener to add to the {@code future} if notification is expected - */ - void close(Http2Stream stream, ChannelFuture future, ChannelFutureListener closeListener); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java new file mode 100644 index 0000000000..899a5f6371 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java @@ -0,0 +1,50 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +import java.io.Closeable; +import java.util.List; + +/** + * Handler for inbound traffic on behalf of {@link Http2ConnectionHandler}. + */ +public interface Http2ConnectionDecoder extends Closeable { + + /** + * Called by the {@link Http2ConnectionHandler} to decode the next frame from the input buffer. + */ + void decodeFrame(ChannelHandlerContext ctx, ByteBuf in, List out) throws Http2Exception; + + /** + * Gets the local settings for this endpoint of the HTTP/2 connection. + */ + Http2Settings localSettings(); + + /** + * Sets the local settings for this endpoint of the HTTP/2 connection. + */ + void localSettings(Http2Settings settings) throws Http2Exception; + + /** + * Indicates whether or not the first initial {@code SETTINGS} frame was received from the remote endpoint. + */ + boolean prefaceReceived(); + + @Override + void close(); +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java new file mode 100644 index 0000000000..e6a42045c8 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java @@ -0,0 +1,32 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +/** + * Handler for outbound traffic on behalf of {@link Http2ConectionHandler}. + */ +public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundFlowController { + + /** + * Gets the local settings on the top of the queue that has been sent but not ACKed. This may + * return {@code null}. + */ + Http2Settings pollSentSettings(); + + /** + * Sets the settings for the remote endpoint of the HTTP/2 connection. + */ + void remoteSettings(Http2Settings settings) throws Http2Exception; +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 9c8d8f8379..6f939eadac 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -14,114 +14,266 @@ */ package io.netty.handler.codec.http2; +import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID; +import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; +import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception; +import static io.netty.handler.codec.http2.Http2Exception.protocolError; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.Collection; +import java.util.List; /** - * This class handles writing HTTP/2 frames, delegating responses to a {@link Http2FrameListener}, - * and can be inserted into a Netty pipeline. + * Provides the default implementation for processing inbound frame events + * and delegates to a {@link Http2FrameListener} + *

+ * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener} + *

+ * This interface enforces inbound flow control functionality through {@link Http2InboundFlowController} */ -public class Http2ConnectionHandler extends Http2InboundConnectionHandler implements Http2FrameWriter { +public class Http2ConnectionHandler extends ByteToMessageDecoder { + private final Http2LifecycleManager lifecycleManager; + private final Http2ConnectionDecoder decoder; + private final Http2ConnectionEncoder encoder; + private final Http2Connection connection; + private ByteBuf clientPrefaceString; + private boolean prefaceSent; + public Http2ConnectionHandler(boolean server, Http2FrameListener listener) { this(new DefaultHttp2Connection(server), listener); } public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener) { - this(connection, listener, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter()); + this(connection, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter(), listener); } - public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener, - Http2FrameReader frameReader, Http2FrameWriter frameWriter) { - this(connection, listener, frameReader, new DefaultHttp2InboundFlowController(connection, frameWriter), - new Http2OutboundConnectionAdapter(connection, frameWriter)); + public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader, + Http2FrameWriter frameWriter, Http2FrameListener listener) { + this(connection, frameReader, frameWriter, new DefaultHttp2InboundFlowController( + connection, frameWriter), new DefaultHttp2OutboundFlowController(connection, + frameWriter), listener); } - public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener, - Http2FrameReader frameReader, Http2InboundFlowController inboundFlow, - Http2OutboundConnectionAdapter outbound) { - super(connection, listener, frameReader, inboundFlow, outbound); + public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader, + Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow, + Http2OutboundFlowController outboundFlow, Http2FrameListener listener) { + checkNotNull(frameWriter, "frameWriter"); + checkNotNull(inboundFlow, "inboundFlow"); + checkNotNull(outboundFlow, "outboundFlow"); + checkNotNull(listener, "listener"); + this.connection = checkNotNull(connection, "connection"); + this.lifecycleManager = new Http2LifecycleManager(connection, frameWriter); + this.encoder = + new DefaultHttp2ConnectionEncoder(connection, frameWriter, outboundFlow, + lifecycleManager); + this.decoder = + new DefaultHttp2ConnectionDecoder(connection, frameReader, inboundFlow, encoder, + lifecycleManager, listener); + clientPrefaceString = clientPrefaceString(connection); + } + + public Http2ConnectionHandler(Http2Connection connection, Http2ConnectionDecoder decoder, + Http2ConnectionEncoder encoder, Http2LifecycleManager lifecycleManager) { + this.connection = checkNotNull(connection, "connection"); + this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager"); + this.encoder = checkNotNull(encoder, "encoder"); + this.decoder = checkNotNull(decoder, "decoder"); + clientPrefaceString = clientPrefaceString(connection); + } + + public Http2Connection connection() { + return connection; + } + + public Http2LifecycleManager lifecycleManager() { + return lifecycleManager; + } + + public Http2ConnectionDecoder decoder() { + return decoder; + } + + public Http2ConnectionEncoder encoder() { + return encoder; + } + + /** + * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2. + * Reserves local stream 1 for the HTTP/2 response. + */ + public void onHttpClientUpgrade() throws Http2Exception { + if (connection.isServer()) { + throw protocolError("Client-side HTTP upgrade requested for a server"); + } + if (prefaceSent || decoder.prefaceReceived()) { + throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received"); + } + + // Create a local stream used for the HTTP cleartext upgrade. + connection.createLocalStream(HTTP_UPGRADE_STREAM_ID, true); + } + + /** + * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2. + * @param settings the settings for the remote endpoint. + */ + public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception { + if (!connection.isServer()) { + throw protocolError("Server-side HTTP upgrade requested for a client"); + } + if (prefaceSent || decoder.prefaceReceived()) { + throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received"); + } + + // Apply the settings but no ACK is necessary. + encoder.remoteSettings(settings); + + // Create a stream in the half-closed state. + connection.createRemoteStream(HTTP_UPGRADE_STREAM_ID, true); } @Override - public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, - boolean endStream, ChannelPromise promise) { - return outbound.writeHeaders(ctx, streamId, headers, padding, endStream, promise); + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // The channel just became active - send the connection preface to the remote + // endpoint. + sendPreface(ctx); + super.channelActive(ctx); } @Override - public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, - int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, - ChannelPromise promise) { - return outbound.writeHeaders(ctx, streamId, headers, padding, endStream, promise); + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + // This handler was just added to the context. In case it was handled after + // the connection became active, send the connection preface now. + sendPreface(ctx); } @Override - public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, - boolean exclusive, ChannelPromise promise) { - return outbound.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise); + protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { + dispose(); } @Override - public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, - ChannelPromise promise) { - return outbound.writeRstStream(ctx, streamId, errorCode, promise); + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + lifecycleManager.close(ctx, promise); } @Override - public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) { - return outbound.writeSettings(ctx, settings, promise); + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + ChannelFuture future = ctx.newSucceededFuture(); + final Collection streams = connection.activeStreams(); + for (Http2Stream s : streams.toArray(new Http2Stream[streams.size()])) { + lifecycleManager.closeStream(s, future); + } + super.channelInactive(ctx); + } + + /** + * Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions. + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Http2Exception ex = getEmbeddedHttp2Exception(cause); + if (ex != null) { + lifecycleManager.onHttp2Exception(ctx, ex); + } else { + super.exceptionCaught(ctx, cause); + } } @Override - public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) { - return outbound.writeSettingsAck(ctx, promise); + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + try { + // Read the remaining of the client preface string if we haven't already. + // If this is a client endpoint, always returns true. + if (!readClientPrefaceString(ctx, in)) { + // Still processing the client preface. + return; + } + + decoder.decodeFrame(ctx, in, out); + } catch (Http2Exception e) { + lifecycleManager.onHttp2Exception(ctx, e); + } catch (Throwable e) { + lifecycleManager.onHttp2Exception(ctx, new Http2Exception(Http2Error.INTERNAL_ERROR, e.getMessage(), e)); + } } - @Override - public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) { - return outbound.writePing(ctx, ack, data, promise); + /** + * Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent. + */ + private void sendPreface(final ChannelHandlerContext ctx) { + if (prefaceSent || !ctx.channel().isActive()) { + return; + } + + prefaceSent = true; + + if (!connection.isServer()) { + // Clients must send the preface string as the first bytes on the connection. + ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + } + + // Both client and server must send their initial settings. + encoder.writeSettings(ctx, decoder.localSettings(), ctx.newPromise()).addListener( + ChannelFutureListener.CLOSE_ON_FAILURE); } - @Override - public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId, - Http2Headers headers, int padding, ChannelPromise promise) { - return outbound.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise); + /** + * Disposes of all resources. + */ + private void dispose() { + encoder.close(); + decoder.close(); + if (clientPrefaceString != null) { + clientPrefaceString.release(); + clientPrefaceString = null; + } } - @Override - public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, - ChannelPromise promise) { - return outbound.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise); + /** + * Decodes the client connection preface string from the input buffer. + * + * @return {@code true} if processing of the client preface string is complete. Since client preface strings can + * only be received by servers, returns true immediately for client endpoints. + */ + private boolean readClientPrefaceString(ChannelHandlerContext ctx, ByteBuf in) throws Http2Exception { + if (clientPrefaceString == null) { + return true; + } + + int prefaceRemaining = clientPrefaceString.readableBytes(); + int bytesRead = Math.min(in.readableBytes(), prefaceRemaining); + + // Read the portion of the input up to the length of the preface, if reached. + ByteBuf sourceSlice = in.readSlice(bytesRead); + + // Read the same number of bytes from the preface buffer. + ByteBuf prefaceSlice = clientPrefaceString.readSlice(bytesRead); + + // If the input so far doesn't match the preface, break the connection. + if (bytesRead == 0 || !prefaceSlice.equals(sourceSlice)) { + throw protocolError("HTTP/2 client preface string missing or corrupt."); + } + + if (!clientPrefaceString.isReadable()) { + // Entire preface has been read. + clientPrefaceString.release(); + clientPrefaceString = null; + return true; + } + return false; } - @Override - public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, - ChannelPromise promise) { - return outbound.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); - } - - @Override - public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, - ByteBuf payload, ChannelPromise promise) { - return outbound.writeFrame(ctx, frameType, streamId, flags, payload, promise); - } - - @Override - public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endStream, ChannelPromise promise) { - return outbound.writeData(ctx, streamId, data, padding, endStream, promise); - } - - @Override - public void close() { - outbound.close(); - super.close(); - } - - @Override - public Configuration configuration() { - return outbound.configuration(); + /** + * Returns the client preface string if this is a client connection, otherwise returns {@code null}. + */ + private static ByteBuf clientPrefaceString(Http2Connection connection) { + return connection.isServer() ? connectionPrefaceBuf() : null; } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListenerDecorator.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListenerDecorator.java index 453fc35a5e..8664d64fe2 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListenerDecorator.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListenerDecorator.java @@ -14,6 +14,7 @@ */ package io.netty.handler.codec.http2; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -24,10 +25,7 @@ public class Http2FrameListenerDecorator implements Http2FrameListener { protected final Http2FrameListener listener; public Http2FrameListenerDecorator(Http2FrameListener listener) { - if (listener == null) { - throw new NullPointerException("listener"); - } - this.listener = listener; + this.listener = checkNotNull(listener, "listener"); } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameLogger.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameLogger.java index f6e75c874d..3d0b49da21 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameLogger.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameLogger.java @@ -15,6 +15,7 @@ */ package io.netty.handler.codec.http2; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerAdapter; @@ -40,14 +41,8 @@ public class Http2FrameLogger extends ChannelHandlerAdapter { } public Http2FrameLogger(InternalLogLevel level, InternalLogger logger) { - if (level == null) { - throw new NullPointerException("level"); - } - if (logger == null) { - throw new NullPointerException("logger"); - } - this.level = level; - this.logger = logger; + this.level = checkNotNull(level, "level"); + this.logger = checkNotNull(logger, "logger"); } public void logData(Direction direction, int streamId, ByteBuf data, int padding, diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFrameLogger.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFrameLogger.java index a0ac1a7424..2b1942f9e3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFrameLogger.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFrameLogger.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2FrameLogger.Direction.INBOUND; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -28,14 +29,8 @@ public class Http2InboundFrameLogger implements Http2FrameReader { private final Http2FrameLogger logger; public Http2InboundFrameLogger(Http2FrameReader reader, Http2FrameLogger logger) { - if (reader == null) { - throw new NullPointerException("reader"); - } - if (logger == null) { - throw new NullPointerException("logger"); - } - this.reader = reader; - this.logger = logger; + this.reader = checkNotNull(reader, "reader"); + this.logger = checkNotNull(logger, "logger"); } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LifecycleManager.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LifecycleManager.java new file mode 100644 index 0000000000..d0eeef6634 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LifecycleManager.java @@ -0,0 +1,211 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; +import static io.netty.util.internal.ObjectUtil.checkNotNull; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; + +/** + * Manager for the life cycle of the HTTP/2 connection. Handles graceful shutdown of the channel, + * closing only after all of the streams have closed. + */ +public class Http2LifecycleManager { + + private final Http2Connection connection; + private final Http2FrameWriter frameWriter; + private ChannelFutureListener closeListener; + + public Http2LifecycleManager(Http2Connection connection, Http2FrameWriter frameWriter) { + this.connection = checkNotNull(connection, "connection"); + this.frameWriter = checkNotNull(frameWriter, "frameWriter"); + } + + /** + * Handles the close processing on behalf of the {@link DelegatingHttp2ConnectionHandler}. + */ + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + // Avoid NotYetConnectedException + if (!ctx.channel().isActive()) { + ctx.close(promise); + return; + } + + ChannelFuture future = writeGoAway(ctx, null); + + // If there are no active streams, close immediately after the send is complete. + // Otherwise wait until all streams are inactive. + if (connection.numActiveStreams() == 0) { + future.addListener(new ClosingChannelFutureListener(ctx, promise)); + } else { + closeListener = new ClosingChannelFutureListener(ctx, promise); + } + } + + /** + * Closes the remote side of the given stream. If this causes the stream to be closed, adds a + * hook to close the channel after the given future completes. + * + * @param stream the stream to be half closed. + * @param future If closing, the future after which to close the channel. + */ + public void closeLocalSide(Http2Stream stream, ChannelFuture future) { + switch (stream.state()) { + case HALF_CLOSED_LOCAL: + case OPEN: + stream.closeLocalSide(); + break; + default: + closeStream(stream, future); + break; + } + } + + /** + * Closes the remote side of the given stream. If this causes the stream to be closed, adds a + * hook to close the channel after the given future completes. + * + * @param stream the stream to be half closed. + * @param future If closing, the future after which to close the channel. + */ + public void closeRemoteSide(Http2Stream stream, ChannelFuture future) { + switch (stream.state()) { + case HALF_CLOSED_REMOTE: + case OPEN: + stream.closeRemoteSide(); + break; + default: + closeStream(stream, future); + break; + } + } + + /** + * Closes the given stream and adds a hook to close the channel after the given future + * completes. + * + * @param stream the stream to be closed. + * @param future the future after which to close the channel. + */ + public void closeStream(Http2Stream stream, ChannelFuture future) { + stream.close(); + + // If this connection is closing and there are no longer any + // active streams, close after the current operation completes. + if (closeListener != null && connection.numActiveStreams() == 0) { + future.addListener(closeListener); + } + } + + /** + * Processes the given exception. Depending on the type of exception, delegates to either + * {@link #onConnectionError(ChannelHandlerContext, Http2Exception)} or + * {@link #onStreamError(ChannelHandlerContext, Http2StreamException)}. + */ + public void onHttp2Exception(ChannelHandlerContext ctx, Http2Exception e) { + if (e instanceof Http2StreamException) { + onStreamError(ctx, (Http2StreamException) e); + } else { + onConnectionError(ctx, e); + } + } + + /** + * Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint and waits until + * all streams are closed before shutting down the connection. + */ + private void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) { + writeGoAway(ctx, cause).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise())); + } + + /** + * Handler for a stream error. Sends a RST_STREAM frame to the remote endpoint and closes the stream. + */ + private void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) { + writeRstStream(ctx, cause.streamId(), cause.error().code(), ctx.newPromise()); + } + + /** + * Writes a RST_STREAM frame to the remote endpoint and updates the connection state appropriately. + */ + public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, + ChannelPromise promise) { + Http2Stream stream = connection.stream(streamId); + ChannelFuture future = frameWriter.writeRstStream(ctx, streamId, errorCode, promise); + ctx.flush(); + + if (stream != null) { + stream.terminateSent(); + closeStream(stream, promise); + } + + return future; + } + + /** + * Sends a {@code GO_AWAY} frame to the remote endpoint and updates the connection state appropriately. + */ + public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, + ChannelPromise promise) { + if (connection.isGoAway()) { + debugData.release(); + return ctx.newSucceededFuture(); + } + + ChannelFuture future = frameWriter.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise); + ctx.flush(); + + connection.remote().goAwayReceived(lastStreamId); + return future; + } + + /** + * Sends a GO_AWAY frame appropriate for the given exception. + */ + private ChannelFuture writeGoAway(ChannelHandlerContext ctx, Http2Exception cause) { + if (connection.isGoAway()) { + return ctx.newSucceededFuture(); + } + + // The connection isn't alredy going away, send the GO_AWAY frame now to start + // the process. + int errorCode = cause != null ? cause.error().code() : NO_ERROR.code(); + ByteBuf debugData = Http2CodecUtil.toByteBuf(ctx, cause); + int lastKnownStream = connection.remote().lastStreamCreated(); + return writeGoAway(ctx, lastKnownStream, errorCode, debugData, ctx.newPromise()); + } + + /** + * Closes the channel when the future completes. + */ + private static final class ClosingChannelFutureListener implements ChannelFutureListener { + private final ChannelHandlerContext ctx; + private final ChannelPromise promise; + + ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) { + this.ctx = ctx; + this.promise = promise; + } + + @Override + public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception { + ctx.close(promise); + } + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OrHttpChooser.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OrHttpChooser.java index 57052d17c8..50d2e7ec90 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OrHttpChooser.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OrHttpChooser.java @@ -150,7 +150,7 @@ public abstract class Http2OrHttpChooser extends ByteToMessageDecoder { /** * Create the {@link io.netty.channel.ChannelHandler} that is responsible for handling the http * responses when the when the {@link SelectedProtocol} was {@link SelectedProtocol#HTTP_2}. The - * returned class should be a subclass of {@link Http2ConnectionHandler}. + * returned class should be a subclass of {@link DelegatingHttp2ConnectionHandler}. */ protected abstract ChannelHandler createHttp2RequestHandler(); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java index 543a8f3318..9e4fad3c22 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2FrameLogger.Direction.OUTBOUND; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; @@ -30,14 +31,8 @@ public class Http2OutboundFrameLogger implements Http2FrameWriter { private final Http2FrameLogger logger; public Http2OutboundFrameLogger(Http2FrameWriter writer, Http2FrameLogger logger) { - if (writer == null) { - throw new NullPointerException("writer"); - } - if (logger == null) { - throw new NullPointerException("logger"); - } - this.writer = writer; - this.logger = logger; + this.writer = checkNotNull(writer, "writer"); + this.logger = checkNotNull(logger, "logger"); } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodec.java index 4bdc05196f..a125121018 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodec.java @@ -21,6 +21,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_ import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER; import static io.netty.handler.codec.http2.Http2CodecUtil.writeFrameHeader; import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; @@ -43,7 +44,7 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade Collections.singletonList(HTTP_UPGRADE_SETTINGS_HEADER); private final String handlerName; - private final Http2InboundConnectionHandler connectionHandler; + private final Http2ConnectionHandler connectionHandler; private final Http2FrameReader frameReader; private Http2Settings settings; @@ -53,7 +54,7 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade * * @param connectionHandler the HTTP/2 connection handler. */ - public Http2ServerUpgradeCodec(Http2InboundConnectionHandler connectionHandler) { + public Http2ServerUpgradeCodec(Http2ConnectionHandler connectionHandler) { this("http2ConnectionHandler", connectionHandler); } @@ -63,16 +64,9 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade * @param handlerName the name of the HTTP/2 connection handler to be used in the pipeline. * @param connectionHandler the HTTP/2 connection handler. */ - public Http2ServerUpgradeCodec(String handlerName, - Http2InboundConnectionHandler connectionHandler) { - if (handlerName == null) { - throw new NullPointerException("handlerName"); - } - if (connectionHandler == null) { - throw new NullPointerException("connectionHandler"); - } - this.handlerName = handlerName; - this.connectionHandler = connectionHandler; + public Http2ServerUpgradeCodec(String handlerName, Http2ConnectionHandler connectionHandler) { + this.handlerName = checkNotNull(handlerName, "handlerName"); + this.connectionHandler = checkNotNull(connectionHandler, "connectionHandler"); frameReader = new DefaultHttp2FrameReader(); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Settings.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Settings.java index 7c243b7f54..df2b7e0cdd 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Settings.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Settings.java @@ -23,6 +23,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_CONCURREN import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_FRAME_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_HEADER_LIST_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.isMaxFrameSizeValid; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.util.collection.IntObjectHashMap; /** @@ -177,9 +178,7 @@ public final class Http2Settings extends IntObjectHashMap { } private void verifyStandardSetting(int key, Long value) { - if (value == null) { - throw new NullPointerException("value"); - } + checkNotNull(value, "value"); switch (key) { case SETTINGS_HEADER_TABLE_SIZE: if (value < 0L || value > MAX_UNSIGNED_INT) { diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ToHttpConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ToHttpConnectionHandler.java index b82598d65c..ad161e8e41 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ToHttpConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ToHttpConnectionHandler.java @@ -22,7 +22,7 @@ import io.netty.handler.codec.http.FullHttpMessage; import io.netty.handler.codec.http.HttpHeaders; /** - * Light weight wrapper around {@link Http2ConnectionHandler} to provide HTTP/1.x objects to HTTP/2 frames + * Light weight wrapper around {@link DelegatingHttp2ConnectionHandler} to provide HTTP/1.x objects to HTTP/2 frames *

* See {@link InboundHttp2ToHttpAdapter} to get translation from HTTP/2 frames to HTTP/1.x objects */ @@ -35,15 +35,15 @@ public class Http2ToHttpConnectionHandler extends Http2ConnectionHandler { super(connection, listener); } - public Http2ToHttpConnectionHandler(Http2Connection connection, Http2FrameListener listener, - Http2FrameReader frameReader, Http2FrameWriter frameWriter) { - super(connection, listener, frameReader, frameWriter); + public Http2ToHttpConnectionHandler(Http2Connection connection, Http2FrameReader frameReader, + Http2FrameWriter frameWriter, Http2FrameListener listener) { + super(connection, frameReader, frameWriter, listener); } - public Http2ToHttpConnectionHandler(Http2Connection connection, Http2FrameListener listener, - Http2FrameReader frameReader, Http2InboundFlowController inboundFlow, - Http2OutboundConnectionAdapter outbound) { - super(connection, listener, frameReader, inboundFlow, outbound); + public Http2ToHttpConnectionHandler(Http2Connection connection, Http2FrameReader frameReader, + Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow, + Http2OutboundFlowController outboundFlow, Http2FrameListener listener) { + super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, listener); } /** @@ -57,7 +57,7 @@ public class Http2ToHttpConnectionHandler extends Http2ConnectionHandler { int streamId = 0; String value = httpHeaders.get(HttpUtil.ExtensionHeaderNames.STREAM_ID.text()); if (value == null) { - streamId = connection.local().nextStreamId(); + streamId = connection().local().nextStreamId(); } else { try { streamId = Integer.parseInt(value); @@ -97,10 +97,10 @@ public class Http2ToHttpConnectionHandler extends Http2ConnectionHandler { ChannelPromise headerPromise = ctx.newPromise(); ChannelPromise dataPromise = ctx.newPromise(); promiseAggregator.add(headerPromise, dataPromise); - writeHeaders(ctx, streamId, http2Headers, 0, false, headerPromise); - writeData(ctx, streamId, httpMsg.content(), 0, true, dataPromise); + encoder().writeHeaders(ctx, streamId, http2Headers, 0, false, headerPromise); + encoder().writeData(ctx, streamId, httpMsg.content(), 0, true, dataPromise); } else { - writeHeaders(ctx, streamId, http2Headers, 0, true, promise); + encoder().writeHeaders(ctx, streamId, http2Headers, 0, true, promise); } } else { ctx.write(msg, promise); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java index 2612958b0e..51048943fd 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java @@ -14,6 +14,7 @@ */ package io.netty.handler.codec.http2; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.TooLongFrameException; @@ -124,9 +125,7 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter { */ protected InboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength, boolean validateHttpHeaders) { - if (connection == null) { - throw new NullPointerException("connection"); - } + checkNotNull(connection, "connection"); if (maxContentLength <= 0) { throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java new file mode 100644 index 0000000000..d9c671b5c9 --- /dev/null +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -0,0 +1,364 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import static io.netty.buffer.Unpooled.EMPTY_BUFFER; +import static io.netty.buffer.Unpooled.wrappedBuffer; +import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; +import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf; +import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; +import static io.netty.handler.codec.http2.Http2Exception.protocolError; +import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; +import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; +import static io.netty.util.CharsetUtil.UTF_8; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyShort; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; + +import java.util.Collections; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Tests for {@link DefaultHttp2ConnectionDecoder}. + */ +public class DefaultHttp2ConnectionDecoderTest { + private static final int STREAM_ID = 1; + private static final int PUSH_STREAM_ID = 2; + + private DefaultHttp2ConnectionDecoder decoder; + + @Mock + private Http2Connection connection; + + @Mock + private Http2Connection.Endpoint remote; + + @Mock + private Http2Connection.Endpoint local; + + @Mock + private Http2InboundFlowController inboundFlow; + + @Mock + private ChannelHandlerContext ctx; + + @Mock + private Channel channel; + + private ChannelPromise promise; + + @Mock + private ChannelFuture future; + + @Mock + private Http2Stream stream; + + @Mock + private Http2Stream pushStream; + + @Mock + private Http2FrameListener listener; + + @Mock + private Http2FrameReader reader; + + @Mock + private Http2ConnectionEncoder encoder; + + @Mock + private Http2LifecycleManager lifecycleManager; + + @Before + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + + promise = new DefaultChannelPromise(channel); + + when(channel.isActive()).thenReturn(true); + when(stream.id()).thenReturn(STREAM_ID); + when(stream.state()).thenReturn(OPEN); + when(pushStream.id()).thenReturn(PUSH_STREAM_ID); + when(connection.activeStreams()).thenReturn(Collections.singletonList(stream)); + when(connection.stream(STREAM_ID)).thenReturn(stream); + when(connection.requireStream(STREAM_ID)).thenReturn(stream); + when(connection.local()).thenReturn(local); + when(connection.remote()).thenReturn(remote); + doAnswer(new Answer() { + @Override + public Http2Stream answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return local.createStream((Integer) args[0], (Boolean) args[1]); + } + }).when(connection).createLocalStream(anyInt(), anyBoolean()); + doAnswer(new Answer() { + @Override + public Http2Stream answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return remote.createStream((Integer) args[0], (Boolean) args[1]); + } + }).when(connection).createRemoteStream(anyInt(), anyBoolean()); + when(local.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream); + when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); + when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream); + when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); + when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); + when(ctx.channel()).thenReturn(channel); + when(ctx.newSucceededFuture()).thenReturn(future); + when(ctx.newPromise()).thenReturn(promise); + when(ctx.write(any())).thenReturn(future); + + decoder = + new DefaultHttp2ConnectionDecoder(connection, reader, inboundFlow, encoder, + lifecycleManager, listener); + + // Simulate receiving the initial settings from the remote endpoint. + decode().onSettingsRead(ctx, new Http2Settings()); + verify(listener).onSettingsRead(eq(ctx), eq(new Http2Settings())); + assertTrue(decoder.prefaceReceived()); + verify(encoder).writeSettingsAck(eq(ctx), eq(promise)); + + // Simulate receiving the SETTINGS ACK for the initial settings. + decode().onSettingsAckRead(ctx); + } + + @Test + public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception { + when(remote.isGoAwayReceived()).thenReturn(true); + final ByteBuf data = dummyData(); + try { + decode().onDataRead(ctx, STREAM_ID, data, 10, true); + verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + + // Verify that the event was absorbed and not propagated to the oberver. + verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); + } finally { + data.release(); + } + } + + @Test + public void dataReadWithEndOfStreamShouldCloseRemoteSide() throws Exception { + final ByteBuf data = dummyData(); + try { + decode().onDataRead(ctx, STREAM_ID, data, 10, true); + verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future)); + verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + } finally { + data.release(); + } + } + + @Test + public void headersReadAfterGoAwayShouldBeIgnored() throws Exception { + when(remote.isGoAwayReceived()).thenReturn(true); + decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false); + verify(remote, never()).createStream(eq(STREAM_ID), eq(false)); + + // Verify that the event was absorbed and not propagated to the oberver. + verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean()); + verify(remote, never()).createStream(anyInt(), anyBoolean()); + } + + @Test + public void headersReadForUnknownStreamShouldCreateStream() throws Exception { + when(remote.createStream(eq(5), eq(false))).thenReturn(stream); + decode().onHeadersRead(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, false); + verify(remote).createStream(eq(5), eq(false)); + verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false)); + } + + @Test + public void headersReadForUnknownStreamShouldCreateHalfClosedStream() throws Exception { + when(remote.createStream(eq(5), eq(true))).thenReturn(stream); + decode().onHeadersRead(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, true); + verify(remote).createStream(eq(5), eq(true)); + verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true)); + } + + @Test + public void headersReadForPromisedStreamShouldHalfOpenStream() throws Exception { + when(stream.state()).thenReturn(RESERVED_REMOTE); + decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false); + verify(stream).openForPush(); + verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false)); + } + + @Test + public void headersReadForPromisedStreamShouldCloseStream() throws Exception { + when(stream.state()).thenReturn(RESERVED_REMOTE); + decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true); + verify(stream).openForPush(); + verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future)); + verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true)); + } + + @Test + public void pushPromiseReadAfterGoAwayShouldBeIgnored() throws Exception { + when(remote.isGoAwayReceived()).thenReturn(true); + decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0); + verify(remote, never()).reservePushStream(anyInt(), any(Http2Stream.class)); + verify(listener, never()).onPushPromiseRead(eq(ctx), anyInt(), anyInt(), any(Http2Headers.class), anyInt()); + } + + @Test + public void pushPromiseReadShouldSucceed() throws Exception { + decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0); + verify(remote).reservePushStream(eq(PUSH_STREAM_ID), eq(stream)); + verify(listener).onPushPromiseRead(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID), + eq(EmptyHttp2Headers.INSTANCE), eq(0)); + } + + @Test + public void priorityReadAfterGoAwayShouldBeIgnored() throws Exception { + when(remote.isGoAwayReceived()).thenReturn(true); + decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true); + verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean()); + verify(listener, never()).onPriorityRead(eq(ctx), anyInt(), anyInt(), anyShort(), anyBoolean()); + } + + @Test + public void priorityReadShouldSucceed() throws Exception { + decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true); + verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); + verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true)); + } + + @Test + public void windowUpdateReadAfterGoAwayShouldBeIgnored() throws Exception { + when(remote.isGoAwayReceived()).thenReturn(true); + decode().onWindowUpdateRead(ctx, STREAM_ID, 10); + verify(encoder, never()).updateOutboundWindowSize(anyInt(), anyInt()); + verify(listener, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt()); + } + + @Test(expected = Http2Exception.class) + public void windowUpdateReadForUnknownStreamShouldThrow() throws Exception { + when(connection.requireStream(5)).thenThrow(protocolError("")); + decode().onWindowUpdateRead(ctx, 5, 10); + } + + @Test + public void windowUpdateReadShouldSucceed() throws Exception { + decode().onWindowUpdateRead(ctx, STREAM_ID, 10); + verify(encoder).updateOutboundWindowSize(eq(STREAM_ID), eq(10)); + verify(listener).onWindowUpdateRead(eq(ctx), eq(STREAM_ID), eq(10)); + } + + @Test + public void rstStreamReadAfterGoAwayShouldSucceed() throws Exception { + when(remote.isGoAwayReceived()).thenReturn(true); + decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code()); + verify(lifecycleManager).closeStream(eq(stream), eq(future)); + verify(listener).onRstStreamRead(eq(ctx), anyInt(), anyLong()); + } + + @Test(expected = Http2Exception.class) + public void rstStreamReadForUnknownStreamShouldThrow() throws Exception { + when(connection.requireStream(5)).thenThrow(protocolError("")); + decode().onRstStreamRead(ctx, 5, PROTOCOL_ERROR.code()); + } + + @Test + public void rstStreamReadShouldCloseStream() throws Exception { + decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code()); + verify(lifecycleManager).closeStream(eq(stream), eq(future)); + verify(listener).onRstStreamRead(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code())); + } + + @Test + public void pingReadWithAckShouldNotifylistener() throws Exception { + decode().onPingAckRead(ctx, emptyPingBuf()); + verify(listener).onPingAckRead(eq(ctx), eq(emptyPingBuf())); + } + + @Test + public void pingReadShouldReplyWithAck() throws Exception { + decode().onPingRead(ctx, emptyPingBuf()); + verify(encoder).writePing(eq(ctx), eq(true), eq(emptyPingBuf()), eq(promise)); + verify(listener, never()).onPingAckRead(eq(ctx), any(ByteBuf.class)); + } + + @Test + public void settingsReadWithAckShouldNotifylistener() throws Exception { + decode().onSettingsAckRead(ctx); + // Take into account the time this was called during setup(). + verify(listener, times(2)).onSettingsAckRead(eq(ctx)); + } + + @Test + public void settingsReadShouldSetValues() throws Exception { + when(connection.isServer()).thenReturn(true); + Http2Settings settings = new Http2Settings(); + settings.pushEnabled(true); + settings.initialWindowSize(123); + settings.maxConcurrentStreams(456); + settings.headerTableSize(789); + decode().onSettingsRead(ctx, settings); + verify(encoder).remoteSettings(settings); + verify(listener).onSettingsRead(eq(ctx), eq(settings)); + } + + @Test + public void goAwayShouldReadShouldUpdateConnectionState() throws Exception { + decode().onGoAwayRead(ctx, 1, 2L, EMPTY_BUFFER); + verify(local).goAwayReceived(1); + verify(listener).onGoAwayRead(eq(ctx), eq(1), eq(2L), eq(EMPTY_BUFFER)); + } + + private static ByteBuf dummyData() { + // The buffer is purposely 8 bytes so it will even work for a ping frame. + return wrappedBuffer("abcdefgh".getBytes(UTF_8)); + } + + /** + * Calls the decode method on the handler and gets back the captured internal listener + */ + private Http2FrameListener decode() throws Exception { + ArgumentCaptor internallistener = ArgumentCaptor.forClass(Http2FrameListener.class); + doNothing().when(reader).readFrame(eq(ctx), any(ByteBuf.class), internallistener.capture()); + decoder.decodeFrame(ctx, EMPTY_BUFFER, Collections.emptyList()); + return internallistener.getValue(); + } +} + diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java new file mode 100644 index 0000000000..8c6cd33e52 --- /dev/null +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java @@ -0,0 +1,320 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.http2; + +import static io.netty.buffer.Unpooled.wrappedBuffer; +import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; +import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf; +import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; +import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; +import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; +import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; +import static io.netty.util.CharsetUtil.UTF_8; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; + +import java.util.Collections; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Tests for {@link DefaultHttp2ConnectionEncoder} + */ +public class DefaultHttp2ConnectionEncoderTest { + private static final int STREAM_ID = 1; + private static final int PUSH_STREAM_ID = 2; + + private DefaultHttp2ConnectionEncoder encoder; + + @Mock + private Http2Connection connection; + + @Mock + private Http2Connection.Endpoint remote; + + @Mock + private Http2Connection.Endpoint local; + + @Mock + private Http2OutboundFlowController outboundFlow; + + @Mock + private ChannelHandlerContext ctx; + + @Mock + private Channel channel; + + private ChannelPromise promise; + + @Mock + private ChannelFuture future; + + @Mock + private Http2Stream stream; + + @Mock + private Http2Stream pushStream; + + @Mock + private Http2FrameListener listener; + + @Mock + private Http2FrameWriter writer; + + @Mock + private Http2LifecycleManager lifecycleManager; + + @Before + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + + promise = new DefaultChannelPromise(channel); + + when(channel.isActive()).thenReturn(true); + when(stream.id()).thenReturn(STREAM_ID); + when(stream.state()).thenReturn(OPEN); + when(pushStream.id()).thenReturn(PUSH_STREAM_ID); + when(connection.activeStreams()).thenReturn(Collections.singletonList(stream)); + when(connection.stream(STREAM_ID)).thenReturn(stream); + when(connection.requireStream(STREAM_ID)).thenReturn(stream); + when(connection.local()).thenReturn(local); + when(connection.remote()).thenReturn(remote); + doAnswer(new Answer() { + @Override + public Http2Stream answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return local.createStream((Integer) args[0], (Boolean) args[1]); + } + }).when(connection).createLocalStream(anyInt(), anyBoolean()); + doAnswer(new Answer() { + @Override + public Http2Stream answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return remote.createStream((Integer) args[0], (Boolean) args[1]); + } + }).when(connection).createRemoteStream(anyInt(), anyBoolean()); + when(local.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream); + when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); + when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream); + when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); + when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future); + when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))).thenReturn(future); + when(outboundFlow.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), eq(promise))) + .thenReturn(future); + when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); + when(ctx.channel()).thenReturn(channel); + when(ctx.newSucceededFuture()).thenReturn(future); + when(ctx.newPromise()).thenReturn(promise); + when(ctx.write(any())).thenReturn(future); + + encoder = new DefaultHttp2ConnectionEncoder(connection, writer, outboundFlow, lifecycleManager); + } + + @Test + public void dataWriteAfterGoAwayShouldFail() throws Exception { + when(connection.isGoAway()).thenReturn(true); + final ByteBuf data = dummyData(); + try { + ChannelFuture future = encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); + assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); + } finally { + while (data.refCnt() > 0) { + data.release(); + } + } + } + + @Test + public void dataWriteShouldSucceed() throws Exception { + final ByteBuf data = dummyData(); + try { + encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); + verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(false), eq(promise)); + } finally { + data.release(); + } + } + + @Test + public void dataWriteShouldHalfCloseStream() throws Exception { + reset(future); + final ByteBuf data = dummyData(); + try { + encoder.writeData(ctx, STREAM_ID, data, 0, true, promise); + verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(true), eq(promise)); + + // Invoke the listener callback indicating that the write completed successfully. + ArgumentCaptor captor = ArgumentCaptor.forClass(ChannelFutureListener.class); + verify(future).addListener(captor.capture()); + when(future.isSuccess()).thenReturn(true); + captor.getValue().operationComplete(future); + verify(lifecycleManager).closeLocalSide(eq(stream), eq(promise)); + } finally { + data.release(); + } + } + + @Test + public void headersWriteAfterGoAwayShouldFail() throws Exception { + when(connection.isGoAway()).thenReturn(true); + ChannelFuture future = encoder.writeHeaders( + ctx, 5, EmptyHttp2Headers.INSTANCE, 0, (short) 255, false, 0, false, promise); + verify(local, never()).createStream(anyInt(), anyBoolean()); + verify(writer, never()).writeHeaders(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean(), + eq(promise)); + assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); + } + + @Test + public void headersWriteForUnknownStreamShouldCreateStream() throws Exception { + when(local.createStream(eq(5), eq(false))).thenReturn(stream); + encoder.writeHeaders(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, false, promise); + verify(local).createStream(eq(5), eq(false)); + verify(writer).writeHeaders(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise)); + } + + @Test + public void headersWriteShouldCreateHalfClosedStream() throws Exception { + when(local.createStream(eq(5), eq(true))).thenReturn(stream); + encoder.writeHeaders(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, true, promise); + verify(local).createStream(eq(5), eq(true)); + verify(writer).writeHeaders(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise)); + } + + @Test + public void headersWriteShouldOpenStreamForPush() throws Exception { + when(stream.state()).thenReturn(RESERVED_LOCAL); + encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise); + verify(stream).openForPush(); + verify(stream, never()).closeLocalSide(); + verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise)); + } + + @Test + public void headersWriteShouldClosePushStream() throws Exception { + when(stream.state()).thenReturn(RESERVED_LOCAL).thenReturn(HALF_CLOSED_LOCAL); + encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise); + verify(stream).openForPush(); + verify(lifecycleManager).closeLocalSide(eq(stream), eq(promise)); + verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise)); + } + + @Test + public void pushPromiseWriteAfterGoAwayShouldFail() throws Exception { + when(connection.isGoAway()).thenReturn(true); + ChannelFuture future = + encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, + EmptyHttp2Headers.INSTANCE, 0, promise); + assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); + } + + @Test + public void pushPromiseWriteShouldReserveStream() throws Exception { + encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, promise); + verify(local).reservePushStream(eq(PUSH_STREAM_ID), eq(stream)); + verify(writer).writePushPromise(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID), + eq(EmptyHttp2Headers.INSTANCE), eq(0), eq(promise)); + } + + @Test + public void priorityWriteAfterGoAwayShouldFail() throws Exception { + when(connection.isGoAway()).thenReturn(true); + ChannelFuture future = encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); + assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); + } + + @Test + public void priorityWriteShouldSetPriorityForStream() throws Exception { + encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); + verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); + verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); + } + + @Test + public void rstStreamWriteForUnknownStreamShouldIgnore() throws Exception { + encoder.writeRstStream(ctx, 5, PROTOCOL_ERROR.code(), promise); + verify(writer, never()).writeRstStream(eq(ctx), anyInt(), anyLong(), eq(promise)); + } + + @Test + public void rstStreamWriteShouldCloseStream() throws Exception { + encoder.writeRstStream(ctx, STREAM_ID, PROTOCOL_ERROR.code(), promise); + verify(lifecycleManager).writeRstStream(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), eq(promise)); + } + + @Test + public void pingWriteAfterGoAwayShouldFail() throws Exception { + when(connection.isGoAway()).thenReturn(true); + ChannelFuture future = encoder.writePing(ctx, false, emptyPingBuf(), promise); + assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); + } + + @Test + public void pingWriteShouldSucceed() throws Exception { + encoder.writePing(ctx, false, emptyPingBuf(), promise); + verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise)); + } + + @Test + public void settingsWriteAfterGoAwayShouldFail() throws Exception { + when(connection.isGoAway()).thenReturn(true); + ChannelFuture future = encoder.writeSettings(ctx, new Http2Settings(), promise); + assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); + } + + @Test + public void settingsWriteShouldNotUpdateSettings() throws Exception { + Http2Settings settings = new Http2Settings(); + settings.initialWindowSize(100); + settings.pushEnabled(false); + settings.maxConcurrentStreams(1000); + settings.headerTableSize(2000); + encoder.writeSettings(ctx, settings, promise); + verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise)); + } + + private static ByteBuf dummyData() { + // The buffer is purposely 8 bytes so it will even work for a ping frame. + return wrappedBuffer("abcdefgh".getBytes(UTF_8)); + } +} diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java index 86d374033c..6ac91e074d 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java @@ -15,43 +15,22 @@ package io.netty.handler.codec.http2; -import static io.netty.buffer.Unpooled.EMPTY_BUFFER; import static io.netty.buffer.Unpooled.copiedBuffer; -import static io.netty.buffer.Unpooled.wrappedBuffer; -import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE; -import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; -import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf; -import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; -import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; -import static io.netty.handler.codec.http2.Http2Exception.protocolError; -import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; -import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; -import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; -import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; import static io.netty.util.CharsetUtil.UTF_8; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyShort; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; @@ -62,7 +41,7 @@ import java.util.List; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentCaptor; +import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; @@ -73,7 +52,6 @@ import org.mockito.stubbing.Answer; */ public class Http2ConnectionHandlerTest { private static final int STREAM_ID = 1; - private static final int PUSH_STREAM_ID = 2; private Http2ConnectionHandler handler; @@ -86,12 +64,6 @@ public class Http2ConnectionHandlerTest { @Mock private Http2Connection.Endpoint local; - @Mock - private Http2InboundFlowController inboundFlow; - - @Mock - private Http2OutboundFlowController outboundFlow; - @Mock private ChannelHandlerContext ctx; @@ -107,34 +79,13 @@ public class Http2ConnectionHandlerTest { private Http2Stream stream; @Mock - private Http2Stream pushStream; + private Http2LifecycleManager lifecycleManager; @Mock - private Http2FrameListener listener; + private Http2ConnectionDecoder decoder; @Mock - private Http2FrameReader reader; - - @Mock - private Http2FrameWriter writer; - - @Mock - private Http2HeaderTable readerTable; - - @Mock - private Http2HeaderTable writerTable; - - @Mock - private Http2FrameSizePolicy readerFrameSizePolicy; - - @Mock - private Http2FrameSizePolicy writerFrameSizePolicy; - - @Mock - private Http2FrameReader.Configuration readerConfiguration; - - @Mock - private Http2FrameWriter.Configuration writerConfiguration; + private Http2ConnectionEncoder encoder; @Before public void setup() throws Exception { @@ -143,21 +94,7 @@ public class Http2ConnectionHandlerTest { promise = new DefaultChannelPromise(channel); when(channel.isActive()).thenReturn(true); - when(stream.id()).thenReturn(STREAM_ID); - when(stream.state()).thenReturn(OPEN); - when(pushStream.id()).thenReturn(PUSH_STREAM_ID); when(connection.activeStreams()).thenReturn(Collections.singletonList(stream)); - when(connection.stream(STREAM_ID)).thenReturn(stream); - when(connection.requireStream(STREAM_ID)).thenReturn(stream); - when(connection.local()).thenReturn(local); - when(connection.remote()).thenReturn(remote); - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - ((Http2Stream) invocation.getArguments()[0]).close(); - return null; - } - }).when(connection).close(any(Http2Stream.class), any(ChannelFuture.class), any(ChannelFutureListener.class)); doAnswer(new Answer() { @Override public Http2Stream answer(InvocationOnMock invocation) throws Throwable { @@ -172,59 +109,18 @@ public class Http2ConnectionHandlerTest { return remote.createStream((Integer) args[0], (Boolean) args[1]); } }).when(connection).createRemoteStream(anyInt(), anyBoolean()); - when(local.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream); - when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); - when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream); - when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); - when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future); - when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))).thenReturn(future); - when(outboundFlow.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), eq(promise))) - .thenReturn(future); - mockContext(); + when(encoder.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future); + when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); + when(ctx.channel()).thenReturn(channel); + when(ctx.newSucceededFuture()).thenReturn(future); + when(ctx.newPromise()).thenReturn(promise); + when(ctx.write(any())).thenReturn(future); - handler = newConnectionHandler(); - - // Simulate activation of the handler to force writing the initial settings. - Http2Settings settings = new Http2Settings(); - settings.initialWindowSize(10); - settings.pushEnabled(true); - settings.maxConcurrentStreams(100); - settings.headerTableSize(200); - settings.maxFrameSize(DEFAULT_MAX_FRAME_SIZE); - settings.maxHeaderListSize(Integer.MAX_VALUE); - when(inboundFlow.initialInboundWindowSize()).thenReturn(10); - when(local.allowPushTo()).thenReturn(true); - when(remote.maxStreams()).thenReturn(100); - when(reader.configuration()).thenReturn(readerConfiguration); - when(writer.configuration()).thenReturn(writerConfiguration); - when(readerConfiguration.frameSizePolicy()).thenReturn(readerFrameSizePolicy); - when(writerConfiguration.frameSizePolicy()).thenReturn(writerFrameSizePolicy); - when(readerFrameSizePolicy.maxFrameSize()).thenReturn(DEFAULT_MAX_FRAME_SIZE); - when(writerFrameSizePolicy.maxFrameSize()).thenReturn(DEFAULT_MAX_FRAME_SIZE); - when(readerConfiguration.headerTable()).thenReturn(readerTable); - when(writerConfiguration.headerTable()).thenReturn(writerTable); - when(readerTable.maxHeaderTableSize()).thenReturn(200); - when(readerTable.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE); - when(writerTable.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE); - handler.handlerAdded(ctx); - verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise)); - - // Simulate receiving the initial settings from the remote endpoint. - decode().onSettingsRead(ctx, new Http2Settings()); - verify(listener).onSettingsRead(eq(ctx), eq(new Http2Settings())); - verify(writer).writeSettingsAck(eq(ctx), eq(promise)); - - // Simulate receiving the SETTINGS ACK for the initial settings. - decode().onSettingsAckRead(ctx); - - // Re-mock the context so no calls are registered. - mockContext(); - handler.handlerAdded(ctx); + handler = newHandler(); } - private Http2ConnectionHandler newConnectionHandler() { - return new Http2ConnectionHandler(connection, listener, reader, inboundFlow, - new Http2OutboundConnectionAdapter(connection, writer, outboundFlow)); + private Http2ConnectionHandler newHandler() { + return new Http2ConnectionHandler(connection, decoder, encoder, lifecycleManager); } @After @@ -235,7 +131,6 @@ public class Http2ConnectionHandlerTest { @Test public void clientShouldSendClientPrefaceStringWhenActive() throws Exception { when(connection.isServer()).thenReturn(false); - handler = newConnectionHandler(); handler.channelActive(ctx); verify(ctx).write(eq(connectionPrefaceBuf())); } @@ -243,497 +138,42 @@ public class Http2ConnectionHandlerTest { @Test public void serverShouldNotSendClientPrefaceStringWhenActive() throws Exception { when(connection.isServer()).thenReturn(true); - handler = newConnectionHandler(); handler.channelActive(ctx); verify(ctx, never()).write(eq(connectionPrefaceBuf())); } @Test - public void serverReceivingInvalidClientPrefaceStringShouldCloseConnection() throws Exception { + public void serverReceivingInvalidClientPrefaceStringShouldHandleException() throws Exception { when(connection.isServer()).thenReturn(true); - handler = newConnectionHandler(); + handler = newHandler(); handler.channelRead(ctx, copiedBuffer("BAD_PREFACE", UTF_8)); - verify(ctx).close(); + verify(lifecycleManager).onHttp2Exception(eq(ctx), any(Http2Exception.class)); } @Test public void serverReceivingValidClientPrefaceStringShouldContinueReadingFrames() throws Exception { - reset(listener); when(connection.isServer()).thenReturn(true); - handler = newConnectionHandler(); handler.channelRead(ctx, connectionPrefaceBuf()); - verify(ctx, never()).close(); - decode().onSettingsRead(ctx, new Http2Settings()); - verify(listener).onSettingsRead(eq(ctx), eq(new Http2Settings())); + verify(decoder).decodeFrame(eq(ctx), any(ByteBuf.class), Matchers.>any()); } @Test - public void closeShouldSendGoAway() throws Exception { + public void closeShouldCallLifecycleManager() throws Exception { handler.close(ctx, promise); - verify(writer).writeGoAway(eq(ctx), eq(0), eq((long) NO_ERROR.code()), eq(EMPTY_BUFFER), eq(promise)); - verify(remote).goAwayReceived(0); + verify(lifecycleManager).close(eq(ctx), eq(promise)); } @Test public void channelInactiveShouldCloseStreams() throws Exception { handler.channelInactive(ctx); - verify(stream).close(); + verify(lifecycleManager).closeStream(eq(stream), eq(future)); } @Test - public void streamErrorShouldCloseStream() throws Exception { - Http2Exception e = new Http2StreamException(STREAM_ID, PROTOCOL_ERROR); - handler.exceptionCaught(ctx, e); - verify(stream).close(); - verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), eq(promise)); - } - - @Test - public void connectionErrorShouldSendGoAway() throws Exception { + public void http2ExceptionShouldCallLifecycleManager() throws Exception { Http2Exception e = new Http2Exception(PROTOCOL_ERROR); when(remote.lastStreamCreated()).thenReturn(STREAM_ID); handler.exceptionCaught(ctx, e); - verify(remote).goAwayReceived(STREAM_ID); - verify(writer).writeGoAway(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), eq(EMPTY_BUFFER), - eq(promise)); - } - - @Test - public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception { - when(remote.isGoAwayReceived()).thenReturn(true); - final ByteBuf data = dummyData(); - try { - decode().onDataRead(ctx, STREAM_ID, data, 10, true); - verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); - - // Verify that the event was absorbed and not propagated to the oberver. - verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); - } finally { - data.release(); - } - } - - @Test - public void dataReadWithEndOfStreamShouldCloseRemoteSide() throws Exception { - final ByteBuf data = dummyData(); - try { - decode().onDataRead(ctx, STREAM_ID, data, 10, true); - verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); - verify(stream).closeRemoteSide(); - verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); - } finally { - data.release(); - } - } - - @Test - public void headersReadAfterGoAwayShouldBeIgnored() throws Exception { - when(remote.isGoAwayReceived()).thenReturn(true); - decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false); - verify(remote, never()).createStream(eq(STREAM_ID), eq(false)); - - // Verify that the event was absorbed and not propagated to the oberver. - verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean()); - verify(remote, never()).createStream(anyInt(), anyBoolean()); - } - - @Test - public void headersReadForUnknownStreamShouldCreateStream() throws Exception { - when(remote.createStream(eq(5), eq(false))).thenReturn(stream); - decode().onHeadersRead(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, false); - verify(remote).createStream(eq(5), eq(false)); - verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false)); - } - - @Test - public void headersReadForUnknownStreamShouldCreateHalfClosedStream() throws Exception { - when(remote.createStream(eq(5), eq(true))).thenReturn(stream); - decode().onHeadersRead(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, true); - verify(remote).createStream(eq(5), eq(true)); - verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true)); - } - - @Test - public void headersReadForPromisedStreamShouldHalfOpenStream() throws Exception { - when(stream.state()).thenReturn(RESERVED_REMOTE); - decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false); - verify(stream).openForPush(); - verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false)); - } - - @Test - public void headersReadForPromisedStreamShouldCloseStream() throws Exception { - when(stream.state()).thenReturn(RESERVED_REMOTE); - decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true); - verify(stream).openForPush(); - verify(stream).close(); - verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true)); - } - - @Test - public void pushPromiseReadAfterGoAwayShouldBeIgnored() throws Exception { - when(remote.isGoAwayReceived()).thenReturn(true); - decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0); - verify(remote, never()).reservePushStream(anyInt(), any(Http2Stream.class)); - verify(listener, never()).onPushPromiseRead(eq(ctx), anyInt(), anyInt(), any(Http2Headers.class), anyInt()); - } - - @Test - public void pushPromiseReadShouldSucceed() throws Exception { - decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0); - verify(remote).reservePushStream(eq(PUSH_STREAM_ID), eq(stream)); - verify(listener).onPushPromiseRead(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID), - eq(EmptyHttp2Headers.INSTANCE), eq(0)); - } - - @Test - public void priorityReadAfterGoAwayShouldBeIgnored() throws Exception { - when(remote.isGoAwayReceived()).thenReturn(true); - decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true); - verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean()); - verify(listener, never()).onPriorityRead(eq(ctx), anyInt(), anyInt(), anyShort(), anyBoolean()); - } - - @Test - public void priorityReadShouldSucceed() throws Exception { - decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true); - verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); - verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true)); - } - - @Test - public void windowUpdateReadAfterGoAwayShouldBeIgnored() throws Exception { - when(remote.isGoAwayReceived()).thenReturn(true); - decode().onWindowUpdateRead(ctx, STREAM_ID, 10); - verify(outboundFlow, never()).updateOutboundWindowSize(anyInt(), anyInt()); - verify(listener, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt()); - } - - @Test(expected = Http2Exception.class) - public void windowUpdateReadForUnknownStreamShouldThrow() throws Exception { - when(connection.requireStream(5)).thenThrow(protocolError("")); - decode().onWindowUpdateRead(ctx, 5, 10); - } - - @Test - public void windowUpdateReadShouldSucceed() throws Exception { - decode().onWindowUpdateRead(ctx, STREAM_ID, 10); - verify(outboundFlow).updateOutboundWindowSize(eq(STREAM_ID), eq(10)); - verify(listener).onWindowUpdateRead(eq(ctx), eq(STREAM_ID), eq(10)); - } - - @Test - public void rstStreamReadAfterGoAwayShouldSucceed() throws Exception { - when(remote.isGoAwayReceived()).thenReturn(true); - decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code()); - verify(stream).close(); - verify(listener).onRstStreamRead(eq(ctx), anyInt(), anyLong()); - } - - @Test(expected = Http2Exception.class) - public void rstStreamReadForUnknownStreamShouldThrow() throws Exception { - when(connection.requireStream(5)).thenThrow(protocolError("")); - decode().onRstStreamRead(ctx, 5, PROTOCOL_ERROR.code()); - } - - @Test - public void rstStreamReadShouldCloseStream() throws Exception { - decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code()); - verify(stream).close(); - verify(listener).onRstStreamRead(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code())); - } - - @Test - public void pingReadWithAckShouldNotifylistener() throws Exception { - decode().onPingAckRead(ctx, emptyPingBuf()); - verify(listener).onPingAckRead(eq(ctx), eq(emptyPingBuf())); - } - - @Test - public void pingReadShouldReplyWithAck() throws Exception { - decode().onPingRead(ctx, emptyPingBuf()); - verify(writer).writePing(eq(ctx), eq(true), eq(emptyPingBuf()), eq(promise)); - verify(listener, never()).onPingAckRead(eq(ctx), any(ByteBuf.class)); - } - - @Test - public void settingsReadWithAckShouldNotifylistener() throws Exception { - decode().onSettingsAckRead(ctx); - // Take into account the time this was called during setup(). - verify(listener, times(2)).onSettingsAckRead(eq(ctx)); - } - - @Test(expected = Http2Exception.class) - public void clientSettingsReadWithPushShouldThrow() throws Exception { - when(connection.isServer()).thenReturn(false); - Http2Settings settings = new Http2Settings(); - settings.pushEnabled(true); - decode().onSettingsRead(ctx, settings); - } - - @Test - public void settingsReadShouldSetValues() throws Exception { - when(connection.isServer()).thenReturn(true); - Http2Settings settings = new Http2Settings(); - settings.pushEnabled(true); - settings.initialWindowSize(123); - settings.maxConcurrentStreams(456); - settings.headerTableSize(789); - decode().onSettingsRead(ctx, settings); - verify(remote).allowPushTo(true); - verify(outboundFlow).initialOutboundWindowSize(123); - verify(local).maxStreams(456); - verify(writerTable).maxHeaderTableSize(789); - // Take into account the time this was called during setup(). - verify(writer, times(2)).writeSettingsAck(eq(ctx), eq(promise)); - verify(listener).onSettingsRead(eq(ctx), eq(settings)); - } - - @Test - public void goAwayShouldReadShouldUpdateConnectionState() throws Exception { - decode().onGoAwayRead(ctx, 1, 2L, EMPTY_BUFFER); - verify(local).goAwayReceived(1); - verify(listener).onGoAwayRead(eq(ctx), eq(1), eq(2L), eq(EMPTY_BUFFER)); - } - - @Test - public void dataWriteAfterGoAwayShouldFail() throws Exception { - when(connection.isGoAway()).thenReturn(true); - final ByteBuf data = dummyData(); - try { - ChannelFuture future = handler.writeData(ctx, STREAM_ID, data, 0, false, promise); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); - } finally { - while (data.refCnt() > 0) { - data.release(); - } - } - } - - @Test - public void dataWriteShouldSucceed() throws Exception { - final ByteBuf data = dummyData(); - try { - handler.writeData(ctx, STREAM_ID, data, 0, false, promise); - verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(false), eq(promise)); - } finally { - data.release(); - } - } - - @Test - public void dataWriteShouldHalfCloseStream() throws Exception { - reset(future); - final ByteBuf data = dummyData(); - try { - handler.writeData(ctx, STREAM_ID, data, 0, true, promise); - verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(true), eq(promise)); - - // Invoke the listener callback indicating that the write completed successfully. - ArgumentCaptor captor = ArgumentCaptor.forClass(ChannelFutureListener.class); - verify(future).addListener(captor.capture()); - when(future.isSuccess()).thenReturn(true); - captor.getValue().operationComplete(future); - verify(stream).closeLocalSide(); - } finally { - data.release(); - } - } - - @Test - public void dataWriteWithFailureShouldHandleException() throws Exception { - reset(future); - final String msg = "fake exception"; - final ByteBuf exceptionData = Unpooled.copiedBuffer(msg.getBytes(UTF_8)); - final ByteBuf data = dummyData(); - List goAwayDataCapture = null; - try { - handler.writeData(ctx, STREAM_ID, data, 0, true, promise); - verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(true), eq(promise)); - - // Invoke the listener callback indicating that the write failed. - ArgumentCaptor captor = ArgumentCaptor.forClass(ChannelFutureListener.class); - verify(future).addListener(captor.capture()); - when(future.isSuccess()).thenReturn(false); - when(future.cause()).thenReturn(new RuntimeException(msg)); - captor.getValue().operationComplete(future); - final ArgumentCaptor bufferCaptor = ArgumentCaptor.forClass(ByteBuf.class); - verify(writer).writeGoAway(eq(ctx), eq(0), eq((long) INTERNAL_ERROR.code()), bufferCaptor.capture(), - eq(promise)); - goAwayDataCapture = bufferCaptor.getAllValues(); - assertEquals(exceptionData, goAwayDataCapture.get(0)); - verify(remote).goAwayReceived(0); - } finally { - data.release(); - exceptionData.release(); - if (goAwayDataCapture != null) { - for (int i = 0; i < goAwayDataCapture.size(); ++i) { - goAwayDataCapture.get(i).release(); - } - } - } - } - - @Test - public void headersWriteAfterGoAwayShouldFail() throws Exception { - when(connection.isGoAway()).thenReturn(true); - ChannelFuture future = handler.writeHeaders( - ctx, 5, EmptyHttp2Headers.INSTANCE, 0, (short) 255, false, 0, false, promise); - verify(local, never()).createStream(anyInt(), anyBoolean()); - verify(writer, never()).writeHeaders(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean(), - eq(promise)); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); - } - - @Test - public void headersWriteForUnknownStreamShouldCreateStream() throws Exception { - when(local.createStream(eq(5), eq(false))).thenReturn(stream); - handler.writeHeaders(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, false, promise); - verify(local).createStream(eq(5), eq(false)); - verify(writer).writeHeaders(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise)); - } - - @Test - public void headersWriteShouldCreateHalfClosedStream() throws Exception { - when(local.createStream(eq(5), eq(true))).thenReturn(stream); - handler.writeHeaders(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, true, promise); - verify(local).createStream(eq(5), eq(true)); - verify(writer).writeHeaders(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise)); - } - - @Test - public void headersWriteShouldOpenStreamForPush() throws Exception { - when(stream.state()).thenReturn(RESERVED_LOCAL); - handler.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise); - verify(stream).openForPush(); - verify(stream, never()).closeLocalSide(); - verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise)); - } - - @Test - public void headersWriteShouldClosePushStream() throws Exception { - when(stream.state()).thenReturn(RESERVED_LOCAL).thenReturn(HALF_CLOSED_LOCAL); - handler.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise); - verify(stream).openForPush(); - verify(stream).closeLocalSide(); - verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise)); - } - - @Test - public void pushPromiseWriteAfterGoAwayShouldFail() throws Exception { - when(connection.isGoAway()).thenReturn(true); - ChannelFuture future = - handler.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, - EmptyHttp2Headers.INSTANCE, 0, promise); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); - } - - @Test - public void pushPromiseWriteShouldReserveStream() throws Exception { - handler.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, promise); - verify(local).reservePushStream(eq(PUSH_STREAM_ID), eq(stream)); - verify(writer).writePushPromise(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID), - eq(EmptyHttp2Headers.INSTANCE), eq(0), eq(promise)); - } - - @Test - public void priorityWriteAfterGoAwayShouldFail() throws Exception { - when(connection.isGoAway()).thenReturn(true); - ChannelFuture future = handler.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); - } - - @Test - public void priorityWriteShouldSetPriorityForStream() throws Exception { - handler.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); - verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); - verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); - } - - @Test - public void rstStreamWriteForUnknownStreamShouldIgnore() throws Exception { - handler.writeRstStream(ctx, 5, PROTOCOL_ERROR.code(), promise); - verify(writer, never()).writeRstStream(eq(ctx), anyInt(), anyLong(), eq(promise)); - } - - @Test - public void rstStreamWriteShouldCloseStream() throws Exception { - handler.writeRstStream(ctx, STREAM_ID, PROTOCOL_ERROR.code(), promise); - verify(stream).close(); - verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), eq(promise)); - } - - @Test - public void pingWriteAfterGoAwayShouldFail() throws Exception { - when(connection.isGoAway()).thenReturn(true); - ChannelFuture future = handler.writePing(ctx, false, emptyPingBuf(), promise); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); - } - - @Test - public void pingWriteShouldSucceed() throws Exception { - handler.writePing(ctx, false, emptyPingBuf(), promise); - verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise)); - } - - @Test - public void settingsWriteAfterGoAwayShouldFail() throws Exception { - when(connection.isGoAway()).thenReturn(true); - ChannelFuture future = handler.writeSettings(ctx, new Http2Settings(), promise); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); - } - - @Test - public void settingsWriteShouldNotUpdateSettings() throws Exception { - Http2Settings settings = new Http2Settings(); - settings.initialWindowSize(100); - settings.pushEnabled(false); - settings.maxConcurrentStreams(1000); - settings.headerTableSize(2000); - handler.writeSettings(ctx, settings, promise); - verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise)); - // Verify that application of local settings must not be done when it is dispatched. - verify(inboundFlow, never()).initialInboundWindowSize(eq(100)); - verify(local, never()).allowPushTo(eq(false)); - verify(remote, never()).maxStreams(eq(1000)); - verify(readerTable, never()).maxHeaderTableSize(eq(2000)); - // Verify that settings values are applied on the reception of SETTINGS ACK - decode().onSettingsAckRead(ctx); - verify(inboundFlow).initialInboundWindowSize(eq(100)); - verify(local).allowPushTo(eq(false)); - verify(remote).maxStreams(eq(1000)); - verify(readerTable).maxHeaderTableSize(eq(2000)); - } - - private static ByteBuf dummyData() { - // The buffer is purposely 8 bytes so it will even work for a ping frame. - return wrappedBuffer("abcdefgh".getBytes(UTF_8)); - } - - private void mockContext() { - reset(ctx); - when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); - when(ctx.channel()).thenReturn(channel); - when(ctx.newSucceededFuture()).thenReturn(future); - when(ctx.newPromise()).thenReturn(promise); - when(ctx.write(any())).thenReturn(future); - } - - /** - * Calls the decode method on the handler and gets back the captured internal listener - */ - private Http2FrameListener decode() throws Exception { - ArgumentCaptor internallistener = ArgumentCaptor.forClass(Http2FrameListener.class); - doNothing().when(reader).readFrame(eq(ctx), any(ByteBuf.class), internallistener.capture()); - handler.decode(ctx, EMPTY_BUFFER, Collections.emptyList()); - return internallistener.getValue(); + verify(lifecycleManager).onHttp2Exception(eq(ctx), eq(e)); } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java index 0a0f80bb27..8970e4f532 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java @@ -22,6 +22,7 @@ import static io.netty.util.CharsetUtil.UTF_8; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -35,6 +36,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; @@ -139,6 +142,84 @@ public class Http2ConnectionRoundtripTest { clientGroup.sync(); } + @Test + public void http2ExceptionInPipelineShouldCloseConnection() throws Exception { + + // Create a latch to track when the close occurs. + final CountDownLatch closeLatch = new CountDownLatch(1); + clientChannel.closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + closeLatch.countDown(); + } + }); + + // Create a single stream by sending a HEADERS frame to the server. + final Http2Headers headers = dummyHeaders(); + requestLatch(new CountDownLatch(1)); + runInChannel(clientChannel, new Http2Runnable() { + @Override + public void run() { + http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false, + newPromise()); + } + }); + + // Wait for the server to create the stream. + assertTrue(requestLatch.await(5, TimeUnit.SECONDS)); + + // Add a handler that will immediately throw an exception. + clientChannel.pipeline().addFirst(new ChannelHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + throw Http2Exception.protocolError("Fake Exception"); + } + }); + + // Wait for the close to occur. + assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); + assertFalse(clientChannel.isOpen()); + } + + @Test + public void nonHttp2ExceptionInPipelineShouldNotCloseConnection() throws Exception { + + // Create a latch to track when the close occurs. + final CountDownLatch closeLatch = new CountDownLatch(1); + clientChannel.closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + closeLatch.countDown(); + } + }); + + // Create a single stream by sending a HEADERS frame to the server. + final Http2Headers headers = dummyHeaders(); + requestLatch(new CountDownLatch(1)); + runInChannel(clientChannel, new Http2Runnable() { + @Override + public void run() { + http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false, + newPromise()); + } + }); + + // Wait for the server to create the stream. + assertTrue(requestLatch.await(5, TimeUnit.SECONDS)); + + // Add a handler that will immediately throw an exception. + clientChannel.pipeline().addFirst(new ChannelHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + throw new RuntimeException("Fake Exception"); + } + }); + + // The close should NOT occur. + assertFalse(closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientChannel.isOpen()); + } + @Test public void flowControlProperlyChunksLargeMessage() throws Exception { final Http2Headers headers = dummyHeaders(); @@ -169,8 +250,9 @@ public class Http2ConnectionRoundtripTest { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - http2Client.writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false, newPromise()); - http2Client.writeData(ctx(), 3, data.retain(), 0, true, newPromise()); + http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, + false, newPromise()); + http2Client.encoder().writeData(ctx(), 3, data.retain(), 0, true, newPromise()); } }); @@ -222,10 +304,12 @@ public class Http2ConnectionRoundtripTest { @Override public void run() { for (int i = 0, nextStream = 3; i < NUM_STREAMS; ++i, nextStream += 2) { - http2Client.writeHeaders(ctx(), nextStream, headers, 0, (short) 16, false, 0, false, + http2Client.encoder().writeHeaders(ctx(), nextStream, headers, 0, + (short) 16, false, 0, false, newPromise()); + http2Client.encoder().writePing(ctx(), false, pingData.slice().retain(), newPromise()); - http2Client.writePing(ctx(), false, pingData.slice().retain(), newPromise()); - http2Client.writeData(ctx(), nextStream, data.slice().retain(), 0, true, newPromise()); + http2Client.encoder().writeData(ctx(), nextStream, data.slice().retain(), + 0, true, newPromise()); } } }); diff --git a/codec/src/main/java/io/netty/handler/codec/DefaultBinaryHeaders.java b/codec/src/main/java/io/netty/handler/codec/DefaultBinaryHeaders.java index 467956b3a8..4f9a1be054 100644 --- a/codec/src/main/java/io/netty/handler/codec/DefaultBinaryHeaders.java +++ b/codec/src/main/java/io/netty/handler/codec/DefaultBinaryHeaders.java @@ -14,6 +14,7 @@ */ package io.netty.handler.codec; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.util.internal.PlatformDependent; import java.util.Arrays; @@ -289,12 +290,6 @@ public class DefaultBinaryHeaders implements BinaryHeaders { return Utils.toStringUtf8(this); } - static void checkNotNull(T value, String name) { - if (value == null) { - throw new NullPointerException(name); - } - } - private static final class AsciiStringHeaderEntry implements Map.Entry { private final Entry entry; diff --git a/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java b/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java index d82ad42a70..be2a95bbfc 100644 --- a/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java +++ b/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.PlatformDependent; @@ -596,12 +597,6 @@ public class DefaultTextHeaders implements TextHeaders { return true; } - private static void checkNotNull(T value, String name) { - if (value == null) { - throw new NullPointerException(name); - } - } - private static final class StringHeaderEntry implements Entry { private final Entry entry; private String name; diff --git a/codec/src/main/java/io/netty/handler/codec/HeaderMap.java b/codec/src/main/java/io/netty/handler/codec/HeaderMap.java index 066515b7cf..f741e70ce4 100644 --- a/codec/src/main/java/io/netty/handler/codec/HeaderMap.java +++ b/codec/src/main/java/io/netty/handler/codec/HeaderMap.java @@ -14,6 +14,8 @@ */ package io.netty.handler.codec; +import static io.netty.util.internal.ObjectUtil.checkNotNull; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -747,13 +749,6 @@ public class HeaderMap implements Iterable> { return nameConverter.convertName(checkNotNull(name, "name")); } - private static T checkNotNull(T value, String name) { - if (value == null) { - throw new NullPointerException(name); - } - return value; - } - private static int hashCode(CharSequence name) { return AsciiString.caseInsensitiveHashCode(name); } diff --git a/common/src/main/java/io/netty/util/internal/ObjectUtil.java b/common/src/main/java/io/netty/util/internal/ObjectUtil.java new file mode 100644 index 0000000000..db127c109b --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/ObjectUtil.java @@ -0,0 +1,35 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.util.internal; + +/** + * A grab-bag of useful utility methods. + */ +public final class ObjectUtil { + + private ObjectUtil() { + } + + /** + * Checks that the given argument is not null. If it is, throws {@link NullPointerException}. + * Otherwise, returns the argument. + */ + public static T checkNotNull(T arg, String text) { + if (arg == null) { + throw new NullPointerException(text); + } + return arg; + } +} diff --git a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java index 4608e1be9f..ee1bdfb02d 100644 --- a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java +++ b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java @@ -29,7 +29,7 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; -import io.netty.handler.codec.http2.Http2OutboundConnectionAdapter; +import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener; import io.netty.handler.codec.http2.Http2ClientUpgradeCodec; import io.netty.handler.codec.http2.Http2Connection; @@ -38,8 +38,8 @@ import io.netty.handler.codec.http2.Http2FrameReader; import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2InboundFrameLogger; import io.netty.handler.codec.http2.Http2OutboundFrameLogger; -import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter; import io.netty.handler.codec.http2.Http2ToHttpConnectionHandler; +import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter; import io.netty.handler.ssl.SslContext; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -66,11 +66,12 @@ public class Http2ClientInitializer extends ChannelInitializer { final Http2Connection connection = new DefaultHttp2Connection(false); final Http2FrameWriter frameWriter = frameWriter(); connectionHandler = new Http2ToHttpConnectionHandler(connection, - new DelegatingDecompressorFrameListener(connection, - InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength)), frameReader(), + frameWriter, new DefaultHttp2InboundFlowController(connection, frameWriter), - new Http2OutboundConnectionAdapter(connection, frameWriter)); + new DefaultHttp2OutboundFlowController(connection, frameWriter), + new DelegatingDecompressorFrameListener(connection, + InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength))); responseHandler = new HttpResponseHandler(); settingsHandler = new Http2SettingsHandler(ch.newPromise()); if (sslCtx != null) { diff --git a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java index f75a2b3bfa..9de6724824 100644 --- a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java +++ b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java @@ -27,16 +27,15 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Headers; -import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2FrameAdapter; import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2FrameReader; import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2InboundFrameLogger; -import io.netty.handler.codec.http2.Http2OutboundConnectionAdapter; import io.netty.handler.codec.http2.Http2OutboundFrameLogger; import io.netty.util.CharsetUtil; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -51,18 +50,14 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler { static final ByteBuf RESPONSE_BYTES = unreleasableBuffer(copiedBuffer("Hello World", CharsetUtil.UTF_8)); public HelloWorldHttp2Handler() { - this(new DefaultHttp2Connection(true), new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), logger)); + this(new DefaultHttp2Connection(true), new Http2InboundFrameLogger( + new DefaultHttp2FrameReader(), logger), new Http2OutboundFrameLogger( + new DefaultHttp2FrameWriter(), logger)); } - private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameWriter frameWriter) { - this(connection, frameWriter, new Http2OutboundConnectionAdapter(connection, frameWriter)); - } - - private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameWriter frameWriter, - Http2OutboundConnectionAdapter outbound) { - super(connection, new SimpleHttp2FrameListener(outbound), - new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger), - new DefaultHttp2InboundFlowController(connection, frameWriter), outbound); + private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameReader frameReader, + Http2FrameWriter frameWriter) { + super(connection, frameReader, frameWriter, new SimpleHttp2FrameListener(frameWriter)); } /** @@ -76,7 +71,7 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler { Http2Headers headers = new DefaultHttp2Headers().status(new AsciiString("200")) .set(new AsciiString(UPGRADE_RESPONSE_HEADER), new AsciiString("true")); - writeHeaders(ctx, 1, headers, 0, true, ctx.newPromise()); + encoder().writeHeaders(ctx, 1, headers, 0, true, ctx.newPromise()); } super.userEventTriggered(ctx, evt); } @@ -88,10 +83,10 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler { } private static class SimpleHttp2FrameListener extends Http2FrameAdapter { - private Http2OutboundConnectionAdapter outbound; + private Http2FrameWriter frameWriter; - public SimpleHttp2FrameListener(Http2OutboundConnectionAdapter outbound) { - this.outbound = outbound; + public SimpleHttp2FrameListener(Http2FrameWriter frameWriter) { + this.frameWriter = frameWriter; } /** @@ -123,8 +118,8 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler { private void sendResponse(ChannelHandlerContext ctx, int streamId, ByteBuf payload) { // Send a frame for the response status Http2Headers headers = new DefaultHttp2Headers().status(new AsciiString("200")); - outbound.writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise()); - outbound.writeData(ctx, streamId, payload, 0, true, ctx.newPromise()); + frameWriter.writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise()); + frameWriter.writeData(ctx, streamId, payload, 0, true, ctx.newPromise()); } }; }