Split Http2MultiplexCodec into Frame- and MultiplexCodec + Tests. Fixes #4914

Motivation:

Quote from issue 4914:
"Http2MultiplexCodec currently does two things: mapping the existing h2 API to frames and managing the child channels.

It would be better if the two parts were separated. This would allow less-coupled development of the HTTP/2 handlers (flow control could be its own handler, for instance) and allow applications to insert themselves between all streams and the codec, which permits custom logic and could be used, in part, to implement custom frame types.

It would also greatly ease testing, as the child channel could be tested by itself without dealing with how frames are encoded on the wire."

Modifications:

- Split the Http2MultiplexCodec into Http2FrameCodec and Http2MultiplexCodec. The Http2FrameCodec interacts with the existing HTTP/2 callback-based API, while the Http2MulitplexCodec is completely independent of it and simply multiplexes Http2StreamFrames to the child channels. Additionally, the Http2Codec handler is introduced, which is a convenience class that simply sets up the Http2FrameCodec and Http2MultiplexCodec in the channel pipeline and removes itself.

- Improved test coverage quite a bit.

Result:

- The original Http2MultiplexCodec is split into Http2FrameCodec and Http2MultiplexCodec.
- More tests for higher confidence in the code.
This commit is contained in:
buchgr 2016-05-27 11:53:00 +02:00 committed by Norman Maurer
parent 804e058e27
commit 3613d15bca
25 changed files with 1518 additions and 529 deletions

View File

@ -15,6 +15,7 @@
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.UnstableApi;
/**
@ -22,17 +23,21 @@ import io.netty.util.internal.UnstableApi;
*/
@UnstableApi
public abstract class AbstractHttp2StreamFrame implements Http2StreamFrame {
private Object stream;
private int streamId = -1;
@Override
public AbstractHttp2StreamFrame setStream(Object stream) {
this.stream = stream;
public AbstractHttp2StreamFrame setStreamId(int streamId) {
if (this.streamId != -1) {
throw new IllegalStateException("Stream identifier may only be set once.");
}
this.streamId = ObjectUtil.checkPositiveOrZero(streamId, "streamId");
return this;
}
@Override
public Object stream() {
return stream;
public int streamId() {
return streamId;
}
/**
@ -44,17 +49,11 @@ public abstract class AbstractHttp2StreamFrame implements Http2StreamFrame {
return false;
}
Http2StreamFrame other = (Http2StreamFrame) o;
if (stream == null) {
return other.stream() == null;
}
return stream.equals(other.stream());
return streamId == other.streamId();
}
@Override
public int hashCode() {
if (stream == null) {
return 61432814;
}
return stream.hashCode();
return streamId;
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.UnstableApi;
@UnstableApi
public abstract class AbstractHttp2StreamStateEvent implements Http2StreamStateEvent {
private final int streamId;
protected AbstractHttp2StreamStateEvent(int streamId) {
this.streamId = ObjectUtil.checkPositiveOrZero(streamId, "streamId");
}
@Override
public int streamId() {
return streamId;
}
}

View File

@ -76,9 +76,14 @@ public final class DefaultHttp2DataFrame extends AbstractHttp2StreamFrame implem
}
@Override
public DefaultHttp2DataFrame setStream(Object stream) {
super.setStream(stream);
return this;
public DefaultHttp2DataFrame setStreamId(int streamId) {
super.setStreamId(streamId);
return this;
}
@Override
public String name() {
return "DATA";
}
@Override
@ -148,7 +153,7 @@ public final class DefaultHttp2DataFrame extends AbstractHttp2StreamFrame implem
@Override
public String toString() {
return "DefaultHttp2DataFrame(stream=" + stream() + ", content=" + content
return "DefaultHttp2DataFrame(streamId=" + streamId() + ", content=" + content
+ ", endStream=" + endStream + ", padding=" + padding + ")";
}

View File

@ -25,7 +25,9 @@ import io.netty.util.internal.UnstableApi;
*/
@UnstableApi
public final class DefaultHttp2GoAwayFrame extends DefaultByteBufHolder implements Http2GoAwayFrame {
private final long errorCode;
private int lastStreamId;
private int extraStreamIds;
/**
@ -47,7 +49,7 @@ public final class DefaultHttp2GoAwayFrame extends DefaultByteBufHolder implemen
}
/**
* Equivalent to {@code new DefaultHttp2GoAwayFrame(error.code(), content)}.
*
*
* @param error non-{@code null} reason for the go away
* @param content non-{@code null} debug data
@ -63,8 +65,24 @@ public final class DefaultHttp2GoAwayFrame extends DefaultByteBufHolder implemen
* @param content non-{@code null} debug data
*/
public DefaultHttp2GoAwayFrame(long errorCode, ByteBuf content) {
this(-1, errorCode, content);
}
/**
* Construct a new GOAWAY message.
*
* This constructor is for internal use only. A user should not have to specify a specific last stream identifier,
* but use {@link #setExtraStreamIds(int)} instead.
*/
DefaultHttp2GoAwayFrame(int lastStreamId, long errorCode, ByteBuf content) {
super(content);
this.errorCode = errorCode;
this.lastStreamId = lastStreamId;
}
@Override
public String name() {
return "GOAWAY";
}
@Override
@ -86,9 +104,14 @@ public final class DefaultHttp2GoAwayFrame extends DefaultByteBufHolder implemen
return this;
}
@Override
public int lastStreamId() {
return lastStreamId;
}
@Override
public Http2GoAwayFrame copy() {
return (Http2GoAwayFrame) super.copy();
return new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, content().copy());
}
@Override
@ -152,6 +175,6 @@ public final class DefaultHttp2GoAwayFrame extends DefaultByteBufHolder implemen
@Override
public String toString() {
return "DefaultHttp2GoAwayFrame(errorCode=" + errorCode + ", content=" + content()
+ ", extraStreamIds=" + extraStreamIds + ")";
+ ", extraStreamIds=" + extraStreamIds + ", lastStreamId=" + lastStreamId + ")";
}
}

View File

@ -63,9 +63,14 @@ public final class DefaultHttp2HeadersFrame extends AbstractHttp2StreamFrame imp
}
@Override
public DefaultHttp2HeadersFrame setStream(Object stream) {
super.setStream(stream);
return this;
public DefaultHttp2HeadersFrame setStreamId(int streamId) {
super.setStreamId(streamId);
return this;
}
@Override
public String name() {
return "HEADERS";
}
@Override
@ -85,7 +90,7 @@ public final class DefaultHttp2HeadersFrame extends AbstractHttp2StreamFrame imp
@Override
public String toString() {
return "DefaultHttp2HeadersFrame(stream=" + stream() + ", headers=" + headers
return "DefaultHttp2HeadersFrame(streamId=" + streamId() + ", headers=" + headers
+ ", endStream=" + endStream + ", padding=" + padding + ")";
}

View File

@ -45,9 +45,14 @@ public final class DefaultHttp2ResetFrame extends AbstractHttp2StreamFrame imple
}
@Override
public DefaultHttp2ResetFrame setStream(Object stream) {
super.setStream(stream);
return this;
public DefaultHttp2ResetFrame setStreamId(int streamId) {
super.setStreamId(streamId);
return this;
}
@Override
public String name() {
return "RST_STREAM";
}
@Override
@ -57,7 +62,7 @@ public final class DefaultHttp2ResetFrame extends AbstractHttp2StreamFrame imple
@Override
public String toString() {
return "DefaultHttp2ResetFrame(stream=" + stream() + "errorCode=" + errorCode + ")";
return "DefaultHttp2ResetFrame(stream=" + streamId() + "errorCode=" + errorCode + ")";
}
@Override

View File

@ -0,0 +1,49 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
import static io.netty.util.internal.ObjectUtil.checkPositive;
/**
* The default {@link Http2WindowUpdateFrame} implementation.
*/
@UnstableApi
public class DefaultHttp2WindowUpdateFrame extends AbstractHttp2StreamFrame implements Http2WindowUpdateFrame {
private final int windowUpdateIncrement;
public DefaultHttp2WindowUpdateFrame(int windowUpdateIncrement) {
this.windowUpdateIncrement = checkPositive(windowUpdateIncrement, "windowUpdateIncrement");
}
@Override
public DefaultHttp2WindowUpdateFrame setStreamId(int streamId) {
super.setStreamId(streamId);
return this;
}
@Override
public String name() {
return "WINDOW_UPDATE";
}
@Override
public int windowSizeIncrement() {
return windowUpdateIncrement;
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.util.internal.UnstableApi;
/**
* An HTTP/2 channel handler that adds a {@link Http2FrameCodec} and {@link Http2MultiplexCodec} to the pipeline before
* removing itself.
*/
@UnstableApi
public final class Http2Codec extends ChannelDuplexHandler {
private final Http2FrameCodec frameCodec;
private final Http2MultiplexCodec multiplexCodec;
/**
* Construct a new handler whose child channels run in the same event loop as this handler.
*
* @param server {@code true} this is a server
* @param streamHandler the handler added to channels for remotely-created streams. It must be
* {@link ChannelHandler.Sharable}.
*/
public Http2Codec(boolean server, ChannelHandler streamHandler) {
this(server, streamHandler, null);
}
/**
* Construct a new handler whose child channels run in a different event loop.
*
* @param server {@code true} this is a server
* @param streamHandler the handler added to channels for remotely-created streams. It must be
* {@link ChannelHandler.Sharable}.
* @param streamGroup event loop for registering child channels
*/
public Http2Codec(boolean server, ChannelHandler streamHandler,
EventLoopGroup streamGroup) {
this(server, streamHandler, streamGroup, new DefaultHttp2FrameWriter());
}
// Visible for testing
Http2Codec(boolean server, ChannelHandler streamHandler,
EventLoopGroup streamGroup, Http2FrameWriter frameWriter) {
frameCodec = new Http2FrameCodec(server, frameWriter);
multiplexCodec = new Http2MultiplexCodec(server, streamGroup, streamHandler);
}
Http2FrameCodec frameCodec() {
return frameCodec;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().addBefore(ctx.executor(), ctx.name(), null, frameCodec);
ctx.pipeline().addBefore(ctx.executor(), ctx.name(), null, multiplexCodec);
ctx.pipeline().remove(this);
}
}

View File

@ -24,8 +24,6 @@ import io.netty.util.internal.UnstableApi;
*/
@UnstableApi
public interface Http2DataFrame extends Http2StreamFrame, ByteBufHolder {
@Override
Http2DataFrame setStream(Object stream);
/**
* {@code true} if this frame is the last one in this direction of the stream.

View File

@ -19,4 +19,11 @@ import io.netty.util.internal.UnstableApi;
/** An HTTP/2 frame. */
@UnstableApi
public interface Http2Frame { }
public interface Http2Frame {
/**
* Returns the name of the HTTP/2 frame e.g. DATA, GOAWAY, etc.
*/
String name();
}

View File

@ -0,0 +1,277 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.UnstableApi;
import static io.netty.handler.logging.LogLevel.INFO;
/**
* An HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
* frame a {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outgoing {@link Http2Frame}
* objects received via {@link #write} are converted to the HTTP/2 wire format.
*
* <p>A change in stream state is propagated through the channel pipeline as a user event via
* {@link Http2StreamStateEvent} objects. When a HTTP/2 stream first becomes active a {@link Http2StreamActiveEvent}
* and when it gets closed a {@link Http2StreamClosedEvent} is emitted.
*
* <p>Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
* HTTP-to-HTTP/2 conversion is performed automatically.
*
* <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over
* this API. This API is targeted to eventually replace or reduce the need for the Http2Connection-based API.
*/
@UnstableApi
public class Http2FrameCodec extends ChannelDuplexHandler {
private static final Http2FrameLogger HTTP2_FRAME_LOGGER = new Http2FrameLogger(INFO, Http2FrameCodec.class);
private final Http2ConnectionHandler http2Handler;
private ChannelHandlerContext ctx;
private ChannelHandlerContext http2HandlerCtx;
/**
* Construct a new handler.
*
* @param server {@code true} this is a server
*/
public Http2FrameCodec(boolean server) {
this(server, new DefaultHttp2FrameWriter());
}
// Visible for testing
Http2FrameCodec(boolean server, Http2FrameWriter frameWriter) {
Http2Connection connection = new DefaultHttp2Connection(server);
frameWriter = new Http2OutboundFrameLogger(frameWriter, HTTP2_FRAME_LOGGER);
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
Http2FrameReader reader = new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), HTTP2_FRAME_LOGGER);
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, reader);
decoder.frameListener(new FrameListener());
http2Handler = new InternalHttp2ConnectionHandler(decoder, encoder, new Http2Settings());
http2Handler.connection().addListener(new ConnectionListener());
}
Http2ConnectionHandler connectionHandler() {
return http2Handler;
}
/**
* Load any dependencies.
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
ctx.pipeline().addBefore(ctx.executor(), ctx.name(), null, http2Handler);
http2HandlerCtx = ctx.pipeline().context(http2Handler);
}
/**
* Clean up any dependencies.
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(http2Handler);
}
/**
* Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
* HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (!(evt instanceof UpgradeEvent)) {
super.userEventTriggered(ctx, evt);
return;
}
UpgradeEvent upgrade = (UpgradeEvent) evt;
ctx.fireUserEventTriggered(upgrade.retain());
try {
Http2Stream stream = http2Handler.connection().stream(Http2CodecUtil.HTTP_UPGRADE_STREAM_ID);
// TODO: improve handler/stream lifecycle so that stream isn't active before handler added.
// The stream was already made active, but ctx may have been null so it wasn't initialized.
// https://github.com/netty/netty/issues/4942
new ConnectionListener().onStreamActive(stream);
upgrade.upgradeRequest().headers().setInt(
HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Http2CodecUtil.HTTP_UPGRADE_STREAM_ID);
new InboundHttpToHttp2Adapter(http2Handler.connection(), http2Handler.decoder().frameListener())
.channelRead(ctx, upgrade.upgradeRequest().retain());
} finally {
upgrade.release();
}
}
// Override this to signal it will never throw an exception.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}
/**
* Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
* streams.
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (!(msg instanceof Http2Frame)) {
ctx.write(msg, promise);
return;
}
try {
if (msg instanceof Http2WindowUpdateFrame) {
Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
consumeBytes(frame.streamId(), frame.windowSizeIncrement());
} else if (msg instanceof Http2StreamFrame) {
writeStreamFrame((Http2StreamFrame) msg, promise);
} else if (msg instanceof Http2GoAwayFrame) {
writeGoAwayFrame((Http2GoAwayFrame) msg, promise);
} else {
throw new UnsupportedMessageTypeException(msg);
}
} finally {
ReferenceCountUtil.release(msg);
}
}
private void consumeBytes(int streamId, int bytes) {
try {
Http2Stream stream = http2Handler.connection().stream(streamId);
http2Handler.connection().local().flowController()
.consumeBytes(stream, bytes);
} catch (Throwable t) {
exceptionCaught(ctx, t);
}
}
private void writeGoAwayFrame(Http2GoAwayFrame frame, ChannelPromise promise) {
if (frame.lastStreamId() > -1) {
throw new IllegalArgumentException("Last stream id must not be set on GOAWAY frame");
}
int lastStreamCreated = http2Handler.connection().remote().lastStreamCreated();
int lastStreamId = lastStreamCreated + frame.extraStreamIds() * 2;
// Check if the computation overflowed.
if (lastStreamId < lastStreamCreated) {
lastStreamId = Integer.MAX_VALUE;
}
http2Handler.goAway(
http2HandlerCtx, lastStreamId, frame.errorCode(), frame.content().retain(), promise);
}
private void writeStreamFrame(Http2StreamFrame frame, ChannelPromise promise) {
int streamId = frame.streamId();
if (frame instanceof Http2DataFrame) {
Http2DataFrame dataFrame = (Http2DataFrame) frame;
http2Handler.encoder().writeData(http2HandlerCtx, streamId, dataFrame.content().retain(),
dataFrame.padding(), dataFrame.isEndStream(), promise);
} else if (frame instanceof Http2HeadersFrame) {
Http2HeadersFrame headerFrame = (Http2HeadersFrame) frame;
http2Handler.encoder().writeHeaders(
http2HandlerCtx, streamId, headerFrame.headers(), headerFrame.padding(), headerFrame.isEndStream(),
promise);
} else if (frame instanceof Http2ResetFrame) {
Http2ResetFrame rstFrame = (Http2ResetFrame) frame;
http2Handler.resetStream(http2HandlerCtx, streamId, rstFrame.errorCode(), promise);
} else {
throw new UnsupportedMessageTypeException(frame);
}
}
private final class ConnectionListener extends Http2ConnectionAdapter {
@Override
public void onStreamActive(Http2Stream stream) {
if (ctx == null) {
// UPGRADE stream is active before handlerAdded().
return;
}
ctx.fireUserEventTriggered(new Http2StreamActiveEvent(stream.id()));
}
@Override
public void onStreamClosed(Http2Stream stream) {
ctx.fireUserEventTriggered(new Http2StreamClosedEvent(stream.id()));
}
@Override
public void onGoAwayReceived(final int lastStreamId, long errorCode, ByteBuf debugData) {
ctx.fireChannelRead(new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData));
}
}
private static final class InternalHttp2ConnectionHandler extends Http2ConnectionHandler {
InternalHttp2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
super(decoder, encoder, initialSettings);
}
@Override
protected void onStreamError(ChannelHandlerContext ctx, Throwable cause,
Http2Exception.StreamException http2Ex) {
try {
Http2Stream stream = connection().stream(http2Ex.streamId());
if (stream == null) {
return;
}
ctx.fireExceptionCaught(http2Ex);
} finally {
super.onStreamError(ctx, cause, http2Ex);
}
}
}
private final class FrameListener extends Http2FrameAdapter {
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
Http2ResetFrame rstFrame = new DefaultHttp2ResetFrame(errorCode);
rstFrame.setStreamId(streamId);
ctx.fireChannelRead(rstFrame);
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight, boolean
exclusive, int padding, boolean endStream) {
onHeadersRead(ctx, streamId, headers, padding, endStream);
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endOfStream) {
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(headers, endOfStream, padding);
headersFrame.setStreamId(streamId);
ctx.fireChannelRead(headersFrame);
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) {
Http2DataFrame dataFrame = new DefaultHttp2DataFrame(data.retain(), endOfStream, padding);
dataFrame.setStreamId(streamId);
ctx.fireChannelRead(dataFrame);
// We return the bytes in bytesConsumed() once the stream channel consumed the bytes.
return 0;
}
}
}

View File

@ -20,8 +20,14 @@ import io.netty.buffer.ByteBufHolder;
import io.netty.util.internal.UnstableApi;
/**
* HTTP/2 GOAWAY frame. Last-Stream-Id is not exposed directly, but instead via the relative {@link
* #extraStreamIds()}.
* HTTP/2 GOAWAY frame.
*
* <p>The last stream identifier <em>must not</em> be set by the application, but instead the
* relative {@link #extraStreamIds()} should be used. The {@link #lastStreamId()} will only be
* set for incoming GOAWAY frames by the HTTP/2 codec.
*
* <p>Graceful shutdown as described in the HTTP/2 spec can be accomplished by calling
* {@code #setExtraStreamIds(Integer.MAX_VALUE)}.
*/
@UnstableApi
public interface Http2GoAwayFrame extends Http2Frame, ByteBufHolder {
@ -45,6 +51,11 @@ public interface Http2GoAwayFrame extends Http2Frame, ByteBufHolder {
*/
Http2GoAwayFrame setExtraStreamIds(int extraStreamIds);
/**
* Returns the last stream identifier if set, or {@code -1} else.
*/
int lastStreamId();
/**
* Optional debugging information describing cause the GOAWAY. Will not be {@code null}, but may
* be empty.

View File

@ -22,8 +22,6 @@ import io.netty.util.internal.UnstableApi;
*/
@UnstableApi
public interface Http2HeadersFrame extends Http2StreamFrame {
@Override
Http2HeadersFrame setStream(Object stream);
/**
* A complete header list. CONTINUATION frames are automatically handled.

View File

@ -15,8 +15,6 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.logging.LogLevel.INFO;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
@ -28,35 +26,30 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.IntObjectMap.PrimitiveEntry;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.List;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static java.lang.String.format;
/**
* An HTTP/2 handler that creates child channels for each stream. Creating outgoing streams is not
* yet supported. Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link
* Http2ServerUpgradeCodec}; the necessary HTTP-to-HTTP/2 conversion is performed automatically.
*
* <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over
* this API. This API is targeted to eventually replace or reduce the need for the
* Http2Connection-based API.
*
* <p>This handler notifies the pipeline of channel events, such as {@link Http2GoAwayFrame}. It
* is also capable of writing such messages. Directly writing {@link Http2StreamFrame}s for this
* handler is unsupported.
*
* <h3>Child Channels</h3>
* yet supported.
*
* <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
* receive {@link Http2StreamFrame}s on the created channel. The {@link Http2StreamFrame#stream} is
* expected to be {@code null}, but the channel can use the field for its own bookkeeping. {@link
* ByteBuf}s cannot be processed by the channel; all writes that reach the head of the pipeline must
* be an instance of {@link Http2StreamFrame}. Writes that reach the head of the pipeline are
* processed directly by this handler and cannot be intercepted.
* receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
* all writes that reach the head of the pipeline must be an instance of {@link Http2Frame}. Writes that reach the
* head of the pipeline are processed directly by this handler and cannot be intercepted.
*
* <p>The child channel will be notified of user events that impact the stream, such as {@link
* Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
@ -66,32 +59,21 @@ import java.util.List;
* free to close the channel in response to such events if they don't have use for any queued
* messages.
*
* <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link
* ChannelConfig#setAutoRead(boolean)} are supported.
* <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
*/
@UnstableApi
public final class Http2MultiplexCodec extends ChannelDuplexHandler {
private static final Http2FrameLogger HTTP2_FRAME_LOGGER = new Http2FrameLogger(INFO, Http2MultiplexCodec.class);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2MultiplexCodec.class);
private final ChannelHandler streamHandler;
private final EventLoopGroup streamGroup;
private final Http2ConnectionHandler http2Handler;
private final Http2Connection.PropertyKey streamInfoKey;
private final List<StreamInfo> streamsToFireChildReadComplete = new ArrayList<StreamInfo>();
private final List<Http2StreamChannel> channelsToFireChildReadComplete = new ArrayList<Http2StreamChannel>();
private final boolean server;
private ChannelHandlerContext ctx;
private ChannelHandlerContext http2HandlerCtx;
private volatile Runnable flushTask;
/**
* Construct a new handler whose child channels run in the same event loop as this handler.
*
* @param server {@code true} this is a server
* @param streamHandler the handler added to channels for remotely-created streams. It must be
* {@link ChannelHandler.Sharable}.
*/
public Http2MultiplexCodec(boolean server, ChannelHandler streamHandler) {
this(server, streamHandler, null);
}
private final IntObjectMap<Http2StreamChannel> childChannels = new IntObjectHashMap<Http2StreamChannel>();
/**
* Construct a new handler whose child channels run in a different event loop.
@ -101,91 +83,145 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
* {@link ChannelHandler.Sharable}.
* @param streamGroup event loop for registering child channels
*/
public Http2MultiplexCodec(boolean server, ChannelHandler streamHandler,
EventLoopGroup streamGroup) {
this(server, streamHandler, streamGroup, new DefaultHttp2FrameWriter());
}
Http2MultiplexCodec(boolean server, ChannelHandler streamHandler,
EventLoopGroup streamGroup, Http2FrameWriter frameWriter) {
public Http2MultiplexCodec(boolean server,
EventLoopGroup streamGroup,
ChannelHandler streamHandler) {
if (!streamHandler.getClass().isAnnotationPresent(Sharable.class)) {
throw new IllegalArgumentException("streamHandler must be Sharable");
}
this.server = server;
this.streamHandler = streamHandler;
this.streamGroup = streamGroup;
Http2Connection connection = new DefaultHttp2Connection(server);
frameWriter = new Http2OutboundFrameLogger(frameWriter, HTTP2_FRAME_LOGGER);
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
Http2FrameReader reader = new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), HTTP2_FRAME_LOGGER);
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, reader);
decoder.frameListener(new FrameListener());
http2Handler = new InternalHttp2ConnectionHandler(decoder, encoder, new Http2Settings());
http2Handler.connection().addListener(new ConnectionListener());
streamInfoKey = http2Handler.connection().newKey();
}
Http2ConnectionHandler connectionHandler() {
return http2Handler;
}
/**
* Save context and load any dependencies.
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
ctx.pipeline().addBefore(ctx.executor(), ctx.name(), null, http2Handler);
http2HandlerCtx = ctx.pipeline().context(http2Handler);
}
/**
* Clean up any dependencies.
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(http2Handler);
}
/**
* Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
* HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (!(evt instanceof UpgradeEvent)) {
super.userEventTriggered(ctx, evt);
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (!(cause instanceof StreamException)) {
ctx.fireExceptionCaught(cause);
return;
}
UpgradeEvent upgrade = (UpgradeEvent) evt;
ctx.fireUserEventTriggered(upgrade.retain());
StreamException streamEx = (StreamException) cause;
try {
Http2Stream stream = http2Handler.connection().stream(Http2CodecUtil.HTTP_UPGRADE_STREAM_ID);
// TODO: improve handler/stream lifecycle so that stream isn't active before handler added.
// The stream was already made active, but ctx may have been null so it wasn't initialized.
// https://github.com/netty/netty/issues/4942
new ConnectionListener().onStreamActive(stream);
upgrade.upgradeRequest().headers().setInt(
HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Http2CodecUtil.HTTP_UPGRADE_STREAM_ID);
new InboundHttpToHttp2Adapter(http2Handler.connection(), http2Handler.decoder().frameListener())
.channelRead(ctx, upgrade.upgradeRequest().retain());
Http2StreamChannel childChannel = childChannels.get(streamEx.streamId());
if (childChannel != null) {
childChannel.pipeline().fireExceptionCaught(streamEx);
} else {
logger.warn(format("Exception caught for unknown HTTP/2 stream '%d'", streamEx.streamId()), streamEx);
}
} finally {
upgrade.release();
onStreamClosed(streamEx.streamId());
}
}
// Override this to signal it will never throw an exception.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}
// Override this to signal it will never throw an exception.
@Override
public void flush(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof Http2Frame)) {
ctx.fireChannelRead(msg);
return;
}
if (msg instanceof Http2StreamFrame) {
Http2StreamFrame frame = (Http2StreamFrame) msg;
int streamId = frame.streamId();
Http2StreamChannel childChannel = childChannels.get(streamId);
if (childChannel == null) {
// TODO: Combine with DefaultHttp2ConnectionDecoder.shouldIgnoreHeadersOrDataFrame logic.
ReferenceCountUtil.release(msg);
throw new StreamException(streamId, STREAM_CLOSED, format("Received %s frame for an unknown stream %d",
frame.name(), streamId));
}
fireChildReadAndRegister(childChannel, frame);
} else if (msg instanceof Http2GoAwayFrame) {
Http2GoAwayFrame goAwayFrame = (Http2GoAwayFrame) msg;
for (PrimitiveEntry<Http2StreamChannel> entry : childChannels.entries()) {
Http2StreamChannel childChannel = entry.value();
int streamId = entry.key();
if (streamId > goAwayFrame.lastStreamId() && isLocalStream(streamId)) {
childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
}
}
goAwayFrame.release();
} else {
// It's safe to release, as UnsupportedMessageTypeException just calls msg.getClass()
ReferenceCountUtil.release(msg);
throw new UnsupportedMessageTypeException(msg);
}
}
private void fireChildReadAndRegister(Http2StreamChannel childChannel, Http2StreamFrame frame) {
// Can't use childChannel.fireChannelRead() as it would fire independent of whether
// channel.read() had been called.
childChannel.fireChildRead(frame);
if (!childChannel.inStreamsToFireChildReadComplete) {
channelsToFireChildReadComplete.add(childChannel);
childChannel.inStreamsToFireChildReadComplete = true;
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (!(evt instanceof Http2StreamStateEvent)) {
ctx.fireUserEventTriggered(evt);
return;
}
try {
int streamId = ((Http2StreamStateEvent) evt).streamId();
if (evt instanceof Http2StreamActiveEvent) {
onStreamActive(streamId);
} else if (evt instanceof Http2StreamClosedEvent) {
onStreamClosed(streamId);
} else {
throw new UnsupportedMessageTypeException(evt);
}
} finally {
ReferenceCountUtil.release(evt);
}
}
private void onStreamActive(int streamId) {
ChannelFuture future = createStreamChannel(ctx, streamId, streamHandler);
Http2StreamChannel childChannel = (Http2StreamChannel) future.channel();
Http2StreamChannel oldChannel = childChannels.put(streamId, childChannel);
assert oldChannel == null;
}
private void onStreamClosed(int streamId) {
final Http2StreamChannel childChannel = childChannels.remove(streamId);
if (childChannel != null) {
final EventLoop eventLoop = childChannel.eventLoop();
if (eventLoop.inEventLoop()) {
onStreamClosed0(childChannel);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
onStreamClosed0(childChannel);
}
});
}
}
}
private void onStreamClosed0(Http2StreamChannel childChannel) {
assert childChannel.eventLoop().inEventLoop();
childChannel.onStreamClosedFired = true;
childChannel.fireChildRead(AbstractHttp2StreamChannel.CLOSE_MESSAGE);
}
void flushFromStreamChannel() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
@ -234,52 +270,10 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
}
}
/**
* Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
* streams.
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (!(msg instanceof Http2Frame)) {
ctx.write(msg, promise);
return;
}
try {
if (msg instanceof Http2StreamFrame) {
Object streamObject = ((Http2StreamFrame) msg).stream();
int streamId = ((Http2StreamChannel) streamObject).stream.id();
if (msg instanceof Http2DataFrame) {
Http2DataFrame frame = (Http2DataFrame) msg;
http2Handler.encoder().writeData(http2HandlerCtx, streamId, frame.content().retain(),
frame.padding(), frame.isEndStream(), promise);
} else if (msg instanceof Http2HeadersFrame) {
Http2HeadersFrame frame = (Http2HeadersFrame) msg;
http2Handler.encoder().writeHeaders(
http2HandlerCtx, streamId, frame.headers(), frame.padding(), frame.isEndStream(), promise);
} else if (msg instanceof Http2ResetFrame) {
Http2ResetFrame frame = (Http2ResetFrame) msg;
http2Handler.resetStream(http2HandlerCtx, streamId, frame.errorCode(), promise);
} else {
throw new UnsupportedMessageTypeException(msg);
}
} else if (msg instanceof Http2GoAwayFrame) {
Http2GoAwayFrame frame = (Http2GoAwayFrame) msg;
int lastStreamId = http2Handler.connection().remote().lastStreamCreated()
+ frame.extraStreamIds() * 2;
http2Handler.goAway(
http2HandlerCtx, lastStreamId, frame.errorCode(), frame.content().retain(), promise);
} else {
throw new UnsupportedMessageTypeException(msg);
}
} finally {
ReferenceCountUtil.release(msg);
}
}
ChannelFuture createStreamChannel(ChannelHandlerContext ctx, Http2Stream stream,
ChannelHandler handler) {
private ChannelFuture createStreamChannel(ChannelHandlerContext ctx, int streamId,
ChannelHandler handler) {
EventLoopGroup group = streamGroup != null ? streamGroup : ctx.channel().eventLoop();
Http2StreamChannel channel = new Http2StreamChannel(stream);
Http2StreamChannel channel = new Http2StreamChannel(streamId);
channel.pipeline().addLast(handler);
ChannelFuture future = group.register(channel);
// Handle any errors that occurred on the local thread while registering. Even though
@ -300,186 +294,33 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
for (int i = 0; i < streamsToFireChildReadComplete.size(); i++) {
final StreamInfo streamInfo = streamsToFireChildReadComplete.get(i);
for (int i = 0; i < channelsToFireChildReadComplete.size(); i++) {
Http2StreamChannel childChannel = channelsToFireChildReadComplete.get(i);
// Clear early in case fireChildReadComplete() causes it to need to be re-processed
streamInfo.inStreamsToFireChildReadComplete = false;
streamInfo.childChannel.fireChildReadComplete();
childChannel.inStreamsToFireChildReadComplete = false;
childChannel.fireChildReadComplete();
}
streamsToFireChildReadComplete.clear();
channelsToFireChildReadComplete.clear();
}
void fireChildReadAndRegister(StreamInfo streamInfo, Object msg) {
// Can't use childChannel.fireChannelRead() as it would fire independent of whether
// channel.read() had been called.
streamInfo.childChannel.fireChildRead(msg);
if (!streamInfo.inStreamsToFireChildReadComplete) {
streamsToFireChildReadComplete.add(streamInfo);
streamInfo.inStreamsToFireChildReadComplete = true;
}
}
final class Http2StreamChannel extends AbstractHttp2StreamChannel {
private final int streamId;
boolean onStreamClosedFired;
final class ConnectionListener extends Http2ConnectionAdapter {
@Override
public void onStreamActive(Http2Stream stream) {
if (ctx == null) {
// UPGRADE stream is active before handlerAdded().
return;
}
// If it is an outgoing stream, then we already created the channel.
// TODO: support outgoing streams. https://github.com/netty/netty/issues/4913
if (stream.getProperty(streamInfoKey) != null) {
return;
}
ChannelFuture future = createStreamChannel(ctx, stream, streamHandler);
stream.setProperty(streamInfoKey, new StreamInfo((Http2StreamChannel) future.channel()));
}
@Override
public void onStreamClosed(Http2Stream stream) {
final StreamInfo streamInfo = stream.getProperty(streamInfoKey);
if (streamInfo != null) {
final EventLoop eventLoop = streamInfo.childChannel.eventLoop();
if (eventLoop.inEventLoop()) {
onStreamClosed0(streamInfo);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
onStreamClosed0(streamInfo);
}
});
}
}
}
private void onStreamClosed0(StreamInfo streamInfo) {
streamInfo.childChannel.onStreamClosedFired = true;
streamInfo.childChannel.fireChildRead(AbstractHttp2StreamChannel.CLOSE_MESSAGE);
}
@Override
public void onGoAwayReceived(final int lastStreamId, long errorCode, ByteBuf debugData) {
final Http2GoAwayFrame goAway = new DefaultHttp2GoAwayFrame(errorCode, debugData);
try {
http2Handler.connection().forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) {
if (stream.id() > lastStreamId
&& http2Handler.connection().local().isValidStreamId(stream.id())) {
StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey);
// TODO: Can we force a user interaction pattern that doesn't require us to duplicate()?
// https://github.com/netty/netty/issues/4943
streamInfo.childChannel.pipeline().fireUserEventTriggered(goAway.retainedDuplicate());
}
return true;
}
});
} catch (final Throwable t) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
exceptionCaught(ctx, t);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
exceptionCaught(ctx, t);
}
});
}
}
ctx.fireUserEventTriggered(goAway.retainedDuplicate());
}
}
class InternalHttp2ConnectionHandler extends Http2ConnectionHandler {
InternalHttp2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
super(decoder, encoder, initialSettings);
}
@Override
protected void onStreamError(ChannelHandlerContext ctx, Throwable cause,
Http2Exception.StreamException http2Ex) {
try {
Http2Stream stream = http2Handler.connection().stream(http2Ex.streamId());
if (stream == null) {
return;
}
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
if (streamInfo == null) {
return;
}
streamInfo.childChannel.pipeline().fireExceptionCaught(http2Ex);
} finally {
super.onStreamError(ctx, cause, http2Ex);
}
}
}
class FrameListener extends Http2FrameAdapter {
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
Http2Stream stream = http2Handler.connection().stream(streamId);
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
// Use a user event in order to circumvent read queue.
streamInfo.childChannel.pipeline().fireUserEventTriggered(new DefaultHttp2ResetFrame(errorCode));
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight, boolean
exclusive, int padding, boolean endStream) throws Http2Exception {
onHeadersRead(ctx, streamId, headers, padding, endStream);
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endOfStream) throws Http2Exception {
Http2Stream stream = http2Handler.connection().stream(streamId);
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
fireChildReadAndRegister(streamInfo, new DefaultHttp2HeadersFrame(headers, endOfStream, padding));
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
Http2Stream stream = http2Handler.connection().stream(streamId);
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
fireChildReadAndRegister(streamInfo, new DefaultHttp2DataFrame(data.retain(), endOfStream, padding));
// We return the bytes in bytesConsumed() once the stream channel consumed the bytes.
return 0;
}
}
static final class StreamInfo {
final Http2StreamChannel childChannel;
/**
* {@code true} if stream is in {@link Http2MultiplexCodec#streamsToFireChildReadComplete}.
* {@code true} if stream is in {@link Http2MultiplexCodec#channelsToFireChildReadComplete}.
*/
boolean inStreamsToFireChildReadComplete;
StreamInfo(Http2StreamChannel childChannel) {
this.childChannel = childChannel;
}
}
// This class uses ctx.invoker().invoke* instead of ctx.* to send to the ctx's handler instead
// of the 'next' handler.
final class Http2StreamChannel extends AbstractHttp2StreamChannel {
private final Http2Stream stream;
boolean onStreamClosedFired;
Http2StreamChannel(Http2Stream stream) {
Http2StreamChannel(int streamId) {
super(ctx.channel());
this.stream = stream;
this.streamId = streamId;
}
@Override
protected void doClose() throws Exception {
if (!onStreamClosedFired) {
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).setStream(this);
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).setStreamId(streamId);
writeFromStreamChannel(resetFrame, true);
}
super.doClose();
@ -491,12 +332,13 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
ReferenceCountUtil.release(msg);
throw new IllegalArgumentException("Message must be an Http2StreamFrame: " + msg);
}
Http2StreamFrame frame = (Http2StreamFrame) msg;
if (frame.stream() != null) {
if (frame.streamId() != -1) {
ReferenceCountUtil.release(frame);
throw new IllegalArgumentException("Stream must be null on the frame");
throw new IllegalArgumentException("Stream must not be set on the frame");
}
frame.setStream(this);
frame.setStreamId(streamId);
writeFromStreamChannel(msg, false);
}
@ -513,25 +355,12 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
@Override
protected void bytesConsumed(final int bytes) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
bytesConsumed0(bytes);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
bytesConsumed0(bytes);
}
});
}
}
private void bytesConsumed0(int bytes) {
try {
http2Handler.connection().local().flowController().consumeBytes(stream, bytes);
} catch (Throwable t) {
exceptionCaught(ctx, t);
}
ctx.write(new DefaultHttp2WindowUpdateFrame(bytes).setStreamId(streamId));
}
}
private boolean isLocalStream(int streamId) {
boolean even = (streamId & 1) == 0;
return streamId > 0 && server == even;
}
}

View File

@ -20,8 +20,6 @@ import io.netty.util.internal.UnstableApi;
/** HTTP/2 RST_STREAM frame. */
@UnstableApi
public interface Http2ResetFrame extends Http2StreamFrame {
@Override
Http2ResetFrame setStream(Object stream);
/**
* The reason for resetting the stream. Represented as an HTTP/2 error code.

View File

@ -69,10 +69,10 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade
* Creates the codec using a default name for the connection handler when adding to the
* pipeline.
*
* @param multiplexCodec the HTTP/2 multiplexing handler.
* @param http2Codec the HTTP/2 multiplexing handler.
*/
public Http2ServerUpgradeCodec(Http2MultiplexCodec multiplexCodec) {
this(null, multiplexCodec);
public Http2ServerUpgradeCodec(Http2Codec http2Codec) {
this(null, http2Codec);
}
/**
@ -90,10 +90,10 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade
* Creates the codec providing an upgrade to the given handler for HTTP/2.
*
* @param handlerName the name of the HTTP/2 connection handler to be used in the pipeline.
* @param multiplexCodec the HTTP/2 multiplexing handler.
* @param http2Codec the HTTP/2 multiplexing handler.
*/
public Http2ServerUpgradeCodec(String handlerName, Http2MultiplexCodec multiplexCodec) {
this(handlerName, multiplexCodec.connectionHandler(), multiplexCodec);
public Http2ServerUpgradeCodec(String handlerName, Http2Codec http2Codec) {
this(handlerName, http2Codec.frameCodec().connectionHandler(), http2Codec);
}
Http2ServerUpgradeCodec(String handlerName, Http2ConnectionHandler connectionHandler,

View File

@ -0,0 +1,25 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
@UnstableApi
public class Http2StreamActiveEvent extends AbstractHttp2StreamStateEvent {
public Http2StreamActiveEvent(int streamId) {
super(streamId);
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
@UnstableApi
public class Http2StreamClosedEvent extends AbstractHttp2StreamStateEvent {
public Http2StreamClosedEvent(int streamId) {
super(streamId);
}
}

View File

@ -19,24 +19,26 @@ import io.netty.util.internal.UnstableApi;
/**
* A frame whose meaning <em>may</em> apply to a particular stream, instead of the entire
* connection. It is still possibly for this frame type to apply to the entire connection. In such
* cases, the {@code stream} reference should be {@code null} or an object referring to the
* connection.
*
* <p>The meaning of {@code stream} is context-dependent and may change as a frame is processed in
* the pipeline.
* connection. It is still possible for this frame type to apply to the entire connection. In such
* cases, the {@link #streamId()} must return {@code 0}. If the frame applies to a stream, the
* {@link #streamId()} must be greater than zero.
*/
@UnstableApi
public interface Http2StreamFrame extends Http2Frame {
/**
* Set the stream identifier for this message.
* Sets the identifier of the stream this frame applies to.
*
* @return {@code this}
*/
Http2StreamFrame setStream(Object stream);
Http2StreamFrame setStreamId(int streamId);
/**
* The stream this frame applies to.
* The identifier of the stream this frame applies to.
*
* @return {@code 0} if the frame applies to the entire connection, a value greater than {@code 0} if the frame
* applies to a particular stream, or a value less than {@code 0} if the frame has yet to be associated with
* the connection or a stream.
*/
Object stream();
int streamId();
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
/**
* An event describing a state change of a particular HTTP/2 stream. Such events
* are typically emitted by channel handlers to exchange stream state information.
*/
@UnstableApi
public interface Http2StreamStateEvent {
/**
* Returns the HTTP/2 stream identifier for this event.
*/
int streamId();
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
/**
* HTTP/2 WINDOW_UPDATE frame.
*/
@UnstableApi
public interface Http2WindowUpdateFrame extends Http2StreamFrame {
/**
* Number of bytes to increment the HTTP/2 stream's or connection's flow control window.
*/
int windowSizeIncrement();
}

View File

@ -0,0 +1,526 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.PlatformDependent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Queue;
import static io.netty.util.ReferenceCountUtil.releaseLater;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.*;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
/**
* Unit tests for {@link Http2FrameCodec}.
*/
public class Http2FrameCodecTest {
// For verifying outbound frames
private Http2FrameWriter frameWriter;
private Http2FrameCodec framingCodec;
private EmbeddedChannel channel;
// For injecting inbound frames
private Http2FrameListener frameListener;
private ChannelHandlerContext http2HandlerCtx;
private LastInboundHandler inboundHandler;
private Http2Headers request = new DefaultHttp2Headers()
.method(HttpMethod.GET.asciiName()).scheme(HttpScheme.HTTPS.name())
.authority(new AsciiString("example.org")).path(new AsciiString("/foo"));
private Http2Headers response = new DefaultHttp2Headers()
.status(HttpResponseStatus.OK.codeAsText());
@Before
public void setUp() throws Exception {
frameWriter = spy(new VerifiableHttp2FrameWriter());
framingCodec = new Http2FrameCodec(true, frameWriter);
frameListener = ((DefaultHttp2ConnectionDecoder) framingCodec.connectionHandler().decoder())
.internalFrameListener();
inboundHandler = new LastInboundHandler();
channel = new EmbeddedChannel();
channel.connect(new InetSocketAddress(0));
channel.pipeline().addLast(framingCodec);
channel.pipeline().addLast(inboundHandler);
http2HandlerCtx = channel.pipeline().context(framingCodec.connectionHandler());
// Handshake
verify(frameWriter).writeSettings(eq(http2HandlerCtx),
anyHttp2Settings(), anyChannelPromise());
verifyNoMoreInteractions(frameWriter);
channel.writeInbound(Http2CodecUtil.connectionPrefaceBuf());
frameListener.onSettingsRead(http2HandlerCtx, new Http2Settings());
verify(frameWriter).writeSettingsAck(eq(http2HandlerCtx), anyChannelPromise());
frameListener.onSettingsAckRead(http2HandlerCtx);
}
@After
public void tearDown() throws Exception {
inboundHandler.finishAndReleaseAll();
channel.finishAndReleaseAll();
}
@Test
public void connectionHandlerShouldBeAddedBeforeFramingHandler() {
Iterator<Entry<String, ChannelHandler>> iter = channel.pipeline().iterator();
while (iter.hasNext()) {
ChannelHandler handler = iter.next().getValue();
if (handler instanceof Http2ConnectionHandler) {
break;
}
}
assertTrue(iter.hasNext());
assertThat(iter.next().getValue(), instanceOf(Http2FrameCodec.class));
}
@Test
public void headerRequestHeaderResponse() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 1, request, 31, true);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(1);
assertNotNull(stream);
assertEquals(State.HALF_CLOSED_REMOTE, stream.state());
assertEquals(new DefaultHttp2HeadersFrame(request, true, 31).setStreamId(stream.id()),
inboundHandler.readInbound());
assertNull(inboundHandler.readInbound());
inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).setStreamId(stream.id()));
verify(frameWriter).writeHeaders(
eq(http2HandlerCtx), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(),
eq(27), eq(true), anyChannelPromise());
verify(frameWriter, never()).writeRstStream(
any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise());
assertEquals(State.CLOSED, stream.state());
assertTrue(channel.isActive());
}
@Test
public void entityRequestEntityResponse() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 1, request, 0, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(1);
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
assertEquals(new DefaultHttp2HeadersFrame(request, false).setStreamId(stream.id()),
inboundHandler.readInbound());
assertNull(inboundHandler.readInbound());
ByteBuf hello = bb("hello");
frameListener.onDataRead(http2HandlerCtx, 1, hello, 31, true);
// Release hello to emulate ByteToMessageDecoder
hello.release();
Http2DataFrame inboundData = inboundHandler.readInbound();
assertEquals(releaseLater(new DefaultHttp2DataFrame(bb("hello"), true, 31).setStreamId(stream.id())),
releaseLater(inboundData));
assertEquals(1, inboundData.refCnt());
assertNull(inboundHandler.readInbound());
inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, false).setStreamId(stream.id()));
verify(frameWriter).writeHeaders(eq(http2HandlerCtx), eq(1), eq(response), anyInt(),
anyShort(), anyBoolean(), eq(0), eq(false), anyChannelPromise());
inboundHandler.writeOutbound(releaseLater(new DefaultHttp2DataFrame(bb("world"), true, 27)
.setStreamId(stream.id())));
ArgumentCaptor<ByteBuf> outboundData = ArgumentCaptor.forClass(ByteBuf.class);
verify(frameWriter).writeData(eq(http2HandlerCtx), eq(1), outboundData.capture(), eq(27),
eq(true), anyChannelPromise());
assertEquals(releaseLater(bb("world")), outboundData.getValue());
assertEquals(1, outboundData.getValue().refCnt());
verify(frameWriter, never()).writeRstStream(
any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise());
assertTrue(channel.isActive());
}
@Test
public void sendRstStream() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, true);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
assertEquals(State.HALF_CLOSED_REMOTE, stream.state());
inboundHandler.writeOutbound(new DefaultHttp2ResetFrame(314 /* non-standard error */).setStreamId(stream.id()));
verify(frameWriter).writeRstStream(
eq(http2HandlerCtx), eq(3), eq(314L), anyChannelPromise());
assertEquals(State.CLOSED, stream.state());
assertTrue(channel.isActive());
}
@Test
public void receiveRstStream() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessagesAndEvents();
assertNotNull(activeEvent);
assertEquals(stream.id(), activeEvent.streamId());
Http2HeadersFrame expectedHeaders = new DefaultHttp2HeadersFrame(request, false, 31).setStreamId(stream.id());
Http2HeadersFrame actualHeaders = inboundHandler.readInboundMessagesAndEvents();
assertEquals(expectedHeaders, actualHeaders);
frameListener.onRstStreamRead(http2HandlerCtx, 3, Http2Error.NO_ERROR.code());
Http2ResetFrame expectedRst = new DefaultHttp2ResetFrame(Http2Error.NO_ERROR).setStreamId(stream.id());
Http2ResetFrame actualRst = inboundHandler.readInboundMessagesAndEvents();
assertEquals(expectedRst, actualRst);
Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessagesAndEvents();
assertNotNull(closedEvent);
assertEquals(stream.id(), closedEvent.streamId());
assertNull(inboundHandler.readInboundMessagesAndEvents());
}
@Test
public void sendGoAway() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
ByteBuf debugData = bb("debug");
Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR.code(), debugData.slice());
goAwayFrame.setExtraStreamIds(2);
inboundHandler.writeOutbound(releaseLater(goAwayFrame));
verify(frameWriter).writeGoAway(
eq(http2HandlerCtx), eq(7), eq(Http2Error.NO_ERROR.code()), eq(debugData), anyChannelPromise());
assertEquals(1, debugData.refCnt());
assertEquals(State.OPEN, stream.state());
assertTrue(channel.isActive());
}
@Test
public void receiveGoaway() throws Exception {
frameListener.onGoAwayRead(http2HandlerCtx, 2, Http2Error.NO_ERROR.code(), bb("foo"));
Http2GoAwayFrame expectedFrame = new DefaultHttp2GoAwayFrame(2, Http2Error.NO_ERROR.code(), bb("foo"));
Http2GoAwayFrame actualFrame = inboundHandler.readInbound();
assertEquals(releaseLater(expectedFrame), releaseLater(actualFrame));
assertNull(inboundHandler.readInbound());
}
@Test
public void unknownFrameTypeShouldThrowAndBeReleased() throws Exception {
class UnknownHttp2Frame extends AbstractReferenceCounted implements Http2Frame {
@Override
public String name() {
return "UNKNOWN";
}
@Override
protected void deallocate() {
}
@Override
public ReferenceCounted touch(Object hint) {
return this;
}
}
UnknownHttp2Frame frame = new UnknownHttp2Frame();
assertEquals(1, frame.refCnt());
ChannelFuture f = channel.write(frame);
f.await();
assertTrue(f.isDone());
assertFalse(f.isSuccess());
assertThat(f.cause(), instanceOf(UnsupportedMessageTypeException.class));
assertEquals(0, frame.refCnt());
}
@Test
public void incomingStreamActiveShouldFireUserEvent() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
Http2HeadersFrame frame = inboundHandler.readInbound();
assertNotNull(frame);
Http2StreamActiveEvent streamActiveEvent = inboundHandler.readUserEvent();
assertEquals(stream.id(), streamActiveEvent.streamId());
assertNull(inboundHandler.readInbound());
assertNull(inboundHandler.readUserEvent());
}
@Test
public void goAwayLastStreamIdOverflowed() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 5, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(5);
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
ByteBuf debugData = bb("debug");
Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR.code(), debugData.slice());
goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE);
inboundHandler.writeOutbound(releaseLater(goAwayFrame));
// When the last stream id computation overflows, the last stream id should just be set to 2^31 - 1.
verify(frameWriter).writeGoAway(eq(http2HandlerCtx), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()),
eq(debugData), anyChannelPromise());
assertEquals(1, debugData.refCnt());
assertEquals(State.OPEN, stream.state());
assertTrue(channel.isActive());
}
@Test
public void outgoingStreamActiveShouldFireUserEvent() throws Exception {
Http2ConnectionEncoder encoder = framingCodec.connectionHandler().encoder();
encoder.writeHeaders(http2HandlerCtx, 2, request, 31, false, channel.newPromise());
Http2Stream stream = framingCodec.connectionHandler().connection().stream(2);
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
Http2StreamActiveEvent streamActiveEvent = inboundHandler.readUserEvent();
assertEquals(stream.id(), streamActiveEvent.streamId());
assertNull(inboundHandler.readInbound());
assertNull(inboundHandler.readUserEvent());
}
@Test
public void streamClosedShouldFireUserEvent() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
frameListener.onRstStreamRead(http2HandlerCtx, 3, Http2Error.INTERNAL_ERROR.code());
assertThat(inboundHandler.readInbound(), instanceOf(Http2HeadersFrame.class));
assertThat(inboundHandler.readInbound(), instanceOf(Http2ResetFrame.class));
assertEquals(State.CLOSED, stream.state());
Http2StreamActiveEvent activeEvent = inboundHandler.readUserEvent();
assertEquals(stream.id(), activeEvent.streamId());
Http2StreamClosedEvent closedEvent = inboundHandler.readUserEvent();
assertEquals(stream.id(), closedEvent.streamId());
assertNull(inboundHandler.readInbound());
assertNull(inboundHandler.readUserEvent());
}
@Test
public void streamErrorShouldFireUserEvent() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessagesAndEvents();
assertNotNull(activeEvent);
assertEquals(stream.id(), activeEvent.streamId());
StreamException streamEx = new StreamException(3, Http2Error.INTERNAL_ERROR, "foo");
framingCodec.connectionHandler().onError(http2HandlerCtx, streamEx);
Http2HeadersFrame headersFrame = inboundHandler.readInboundMessagesAndEvents();
assertNotNull(headersFrame);
try {
inboundHandler.checkException();
fail("stream exception expected");
} catch (StreamException e) {
assertEquals(streamEx, e);
}
Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessagesAndEvents();
assertNotNull(closedEvent);
assertEquals(stream.id(), closedEvent.streamId());
assertNull(inboundHandler.readInboundMessagesAndEvents());
}
@Test
public void windowUpdateFrameDecrementsConsumedBytes() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Connection connection = framingCodec.connectionHandler().connection();
Http2Stream stream = connection.stream(3);
assertNotNull(stream);
ByteBuf data = Unpooled.buffer(100).writeZero(100);
frameListener.onDataRead(http2HandlerCtx, 3, releaseLater(data), 0, true);
int before = connection.local().flowController().unconsumedBytes(stream);
channel.writeOutbound(new DefaultHttp2WindowUpdateFrame(100).setStreamId(stream.id()));
int after = connection.local().flowController().unconsumedBytes(stream);
assertEquals(100, before - after);
}
private static ChannelPromise anyChannelPromise() {
return any(ChannelPromise.class);
}
private static Http2Settings anyHttp2Settings() {
return any(Http2Settings.class);
}
private static ByteBuf bb(String s) {
return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s);
}
static final class LastInboundHandler extends ChannelDuplexHandler {
private final Queue<Object> inboundMessages = new ArrayDeque<Object>();
private final Queue<Object> userEvents = new ArrayDeque<Object>();
private final Queue<Object> inboundMessagesAndUserEvents = new ArrayDeque<Object>();
private Throwable lastException;
private ChannelHandlerContext ctx;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
inboundMessages.add(msg);
inboundMessagesAndUserEvents.add(msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
userEvents.add(evt);
inboundMessagesAndUserEvents.add(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (lastException != null) {
cause.printStackTrace();
} else {
lastException = cause;
}
}
public void checkException() throws Exception {
if (lastException == null) {
return;
}
Throwable t = lastException;
lastException = null;
PlatformDependent.throwException(t);
}
@SuppressWarnings("unchecked")
public <T> T readInbound() {
T message = (T) inboundMessages.poll();
if (message == inboundMessagesAndUserEvents.peek()) {
inboundMessagesAndUserEvents.poll();
}
return message;
}
@SuppressWarnings("unchecked")
public <T> T readUserEvent() {
T message = (T) userEvents.poll();
if (message == inboundMessagesAndUserEvents.peek()) {
inboundMessagesAndUserEvents.poll();
}
return message;
}
/**
* Useful to test order of events and messages.
*/
@SuppressWarnings("unchecked")
public <T> T readInboundMessagesAndEvents() {
T message = (T) inboundMessagesAndUserEvents.poll();
if (message == inboundMessages.peek()) {
inboundMessages.poll();
} else if (message == userEvents.peek()) {
userEvents.poll();
}
return message;
}
public void writeOutbound(Object... msgs) throws Exception {
for (Object msg : msgs) {
ctx.write(msg);
}
ctx.flush();
EmbeddedChannel ch = (EmbeddedChannel) ctx.channel();
ch.runPendingTasks();
ch.checkException();
checkException();
}
public void finishAndReleaseAll() throws Exception {
checkException();
Object o;
while ((o = readInboundMessagesAndEvents()) != null) {
ReferenceCountUtil.release(o);
}
}
}
public static class VerifiableHttp2FrameWriter extends DefaultHttp2FrameWriter {
@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endStream, ChannelPromise promise) {
// duplicate 'data' to prevent readerIndex from being changed, to ease verification
return super.writeData(ctx, streamId, data.duplicate(), padding, endStream, promise);
}
}
}

View File

@ -12,30 +12,22 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.util.ReferenceCountUtil.releaseLater;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.PlatformDependent;
import java.net.InetSocketAddress;
@ -45,160 +37,197 @@ import java.util.Queue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import static io.netty.util.ReferenceCountUtil.release;
import static io.netty.util.ReferenceCountUtil.releaseLater;
import static org.junit.Assert.*;
/**
* Unit tests for {@link Http2MultiplexCodec}.
*/
public class Http2MultiplexCodecTest {
private final TestChannelInitializer streamInit = new TestChannelInitializer();
// For verifying outbound frames
private final Http2FrameWriter frameWriter = spy(new VerifiableHttp2FrameWriter());
private final Http2MultiplexCodec serverCodec = new Http2MultiplexCodec(true, streamInit, null, frameWriter);
private final EmbeddedChannel channel = new EmbeddedChannel();
// For injecting inbound frames
private final Http2FrameListener frameListener
= ((DefaultHttp2ConnectionDecoder) serverCodec.connectionHandler().decoder())
.internalFrameListener();
private ChannelHandlerContext http2HandlerCtx;
private Http2Headers request = new DefaultHttp2Headers()
private EmbeddedChannel parentChannel;
private TestChannelInitializer childChannelInitializer;
private static final Http2Headers request = new DefaultHttp2Headers()
.method(HttpMethod.GET.asciiName()).scheme(HttpScheme.HTTPS.name())
.authority(new AsciiString("example.org")).path(new AsciiString("/foo"));
private Http2Headers response = new DefaultHttp2Headers()
.status(HttpResponseStatus.OK.codeAsText());
private static final int streamId = 3;
@Before
public void setUp() throws Exception {
channel.connect(new InetSocketAddress(0));
channel.pipeline().addLast(serverCodec);
http2HandlerCtx = channel.pipeline().context(serverCodec.connectionHandler());
// Handshake
verify(frameWriter).writeSettings(eq(http2HandlerCtx),
anyHttp2Settings(), anyChannelPromise());
verifyNoMoreInteractions(frameWriter);
channel.writeInbound(Http2CodecUtil.connectionPrefaceBuf());
frameListener.onSettingsRead(http2HandlerCtx, new Http2Settings());
verify(frameWriter).writeSettingsAck(eq(http2HandlerCtx), anyChannelPromise());
frameListener.onSettingsAckRead(http2HandlerCtx);
public void setUp() {
childChannelInitializer = new TestChannelInitializer();
parentChannel = new EmbeddedChannel();
parentChannel.connect(new InetSocketAddress(0));
parentChannel.pipeline().addLast(new Http2MultiplexCodec(true, null, childChannelInitializer));
}
@After
public void tearDown() {
Object o;
while ((o = channel.readOutbound()) != null) {
ReferenceCountUtil.release(o);
public void tearDown() throws Exception {
if (childChannelInitializer.handler != null) {
((LastInboundHandler) childChannelInitializer.handler).finishAndReleaseAll();
}
parentChannel.finishAndReleaseAll();
}
// TODO(buchgr): Thread model of child channel
// TODO(buchgr): Flush from child channel
// TODO(buchgr): ChildChannel.childReadComplete()
// TODO(buchgr): GOAWAY Logic
// TODO(buchgr): Reset frame on close
// TODO(buchgr): Test ChannelConfig.setMaxMessagesPerRead
@Test
public void headerAndDataFramesShouldBeDelivered() {
LastInboundHandler inboundHandler = new LastInboundHandler();
childChannelInitializer.handler = inboundHandler;
Http2StreamActiveEvent streamActive = new Http2StreamActiveEvent(streamId);
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(request).setStreamId(streamId);
Http2DataFrame dataFrame1 = releaseLater(new DefaultHttp2DataFrame(bb("hello")).setStreamId(streamId));
Http2DataFrame dataFrame2 = releaseLater(new DefaultHttp2DataFrame(bb("world")).setStreamId(streamId));
assertFalse(inboundHandler.channelActive);
parentChannel.pipeline().fireUserEventTriggered(streamActive);
assertTrue(inboundHandler.channelActive);
// Make sure the stream active event is not delivered as a user event on the child channel.
assertNull(inboundHandler.readUserEvent());
parentChannel.pipeline().fireChannelRead(headersFrame);
parentChannel.pipeline().fireChannelRead(dataFrame1);
parentChannel.pipeline().fireChannelRead(dataFrame2);
assertEquals(headersFrame, inboundHandler.readInbound());
assertEquals(dataFrame1, inboundHandler.readInbound());
assertEquals(dataFrame2, inboundHandler.readInbound());
assertNull(inboundHandler.readInbound());
}
@Test
public void startStop() throws Exception {
assertTrue(channel.isActive());
channel.close();
verify(frameWriter).writeGoAway(
eq(http2HandlerCtx), eq(0), eq(0L), eq(Unpooled.EMPTY_BUFFER), anyChannelPromise());
assertTrue(!channel.isActive());
public void framesShouldBeMultiplexed() {
LastInboundHandler inboundHandler3 = streamActiveAndWriteHeaders(3);
LastInboundHandler inboundHandler11 = streamActiveAndWriteHeaders(11);
LastInboundHandler inboundHandler5 = streamActiveAndWriteHeaders(5);
verifyFramesMultiplexedToCorrectChannel(3, inboundHandler3, 1);
verifyFramesMultiplexedToCorrectChannel(5, inboundHandler5, 1);
verifyFramesMultiplexedToCorrectChannel(11, inboundHandler11, 1);
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("hello"), false).setStreamId(5));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("foo"), true).setStreamId(3));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("world"), true).setStreamId(5));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("bar"), true).setStreamId(11));
verifyFramesMultiplexedToCorrectChannel(5, inboundHandler5, 2);
verifyFramesMultiplexedToCorrectChannel(3, inboundHandler3, 1);
verifyFramesMultiplexedToCorrectChannel(11, inboundHandler11, 1);
}
@Test
public void headerRequestHeaderResponse() throws Exception {
LastInboundHandler stream = new LastInboundHandler();
streamInit.handler = stream;
frameListener.onHeadersRead(http2HandlerCtx, 1, request, 31, true);
assertNull(streamInit.handler);
assertEquals(new DefaultHttp2HeadersFrame(request, true, 31), stream.readInbound());
assertNull(stream.readInbound());
assertTrue(stream.channel().isActive());
public void inboundDataFrameShouldEmitWindowUpdateFrame() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
ByteBuf tenBytes = bb("0123456789");
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(tenBytes, true).setStreamId(streamId));
parentChannel.pipeline().flush();
stream.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27));
verify(frameWriter).writeHeaders(
eq(http2HandlerCtx), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(),
eq(27), eq(true), anyChannelPromise());
verify(frameWriter, never()).writeRstStream(
any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise());
assertFalse(stream.channel().isActive());
assertTrue(channel.isActive());
Http2WindowUpdateFrame windowUpdate = parentChannel.readOutbound();
assertNotNull(windowUpdate);
assertEquals(streamId, windowUpdate.streamId());
assertEquals(10, windowUpdate.windowSizeIncrement());
// headers and data frame
verifyFramesMultiplexedToCorrectChannel(streamId, inboundHandler, 2);
}
@Test
public void entityRequestEntityResponse() throws Exception {
LastInboundHandler stream = new LastInboundHandler();
streamInit.handler = stream;
frameListener.onHeadersRead(http2HandlerCtx, 1, request, 0, false);
assertEquals(new DefaultHttp2HeadersFrame(request, false), stream.readInbound());
assertNull(stream.readInbound());
assertTrue(stream.channel().isActive());
public void channelReadShouldRespectAutoRead() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
Channel childChannel = inboundHandler.channel();
assertTrue(childChannel.config().isAutoRead());
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
assertNotNull(headersFrame);
ByteBuf hello = bb("hello");
frameListener.onDataRead(http2HandlerCtx, 1, hello, 31, true);
// Release hello to emulate ByteToMessageDecoder
hello.release();
Http2DataFrame inboundData = stream.readInbound();
assertEquals(releaseLater(new DefaultHttp2DataFrame(bb("hello"), true, 31)), inboundData);
assertEquals(1, inboundData.refCnt());
assertNull(stream.readInbound());
assertTrue(stream.channel().isActive());
childChannel.config().setAutoRead(false);
parentChannel.pipeline().fireChannelRead(
new DefaultHttp2DataFrame(bb("hello world"), false).setStreamId(streamId));
parentChannel.pipeline().fireChannelReadComplete();
Http2DataFrame dataFrame0 = inboundHandler.readInbound();
assertNotNull(dataFrame0);
release(dataFrame0);
stream.writeOutbound(new DefaultHttp2HeadersFrame(response, false));
verify(frameWriter).writeHeaders(eq(http2HandlerCtx), eq(1), eq(response), anyInt(),
anyShort(), anyBoolean(), eq(0), eq(false), anyChannelPromise());
assertTrue(stream.channel().isActive());
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("foo"), false).setStreamId(streamId));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("bar"), true).setStreamId(streamId));
parentChannel.pipeline().fireChannelReadComplete();
stream.writeOutbound(new DefaultHttp2DataFrame(bb("world"), true, 27));
ArgumentCaptor<ByteBuf> outboundData = ArgumentCaptor.forClass(ByteBuf.class);
verify(frameWriter).writeData(eq(http2HandlerCtx), eq(1), outboundData.capture(), eq(27),
eq(true), anyChannelPromise());
assertEquals(releaseLater(bb("world")), outboundData.getValue());
assertEquals(1, outboundData.getValue().refCnt());
verify(frameWriter, never()).writeRstStream(
any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise());
assertFalse(stream.channel().isActive());
assertTrue(channel.isActive());
dataFrame0 = inboundHandler.readInbound();
assertNull(dataFrame0);
childChannel.config().setAutoRead(true);
verifyFramesMultiplexedToCorrectChannel(streamId, inboundHandler, 2);
}
@Test
public void closeCausesReset() throws Exception {
LastInboundHandler stream = new LastInboundHandler();
streamInit.handler = stream;
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, true);
public void streamClosedShouldFireChannelInactive() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
assertTrue(inboundHandler.channelActive);
stream.channel().close();
channel.runPendingTasks();
channel.checkException();
stream.checkException();
verify(frameWriter).writeRstStream(
eq(http2HandlerCtx), eq(3), eq(8L), anyChannelPromise());
assertFalse(stream.channel().isActive());
assertTrue(channel.isActive());
parentChannel.pipeline().fireUserEventTriggered(new Http2StreamClosedEvent(streamId));
parentChannel.runPendingTasks();
parentChannel.checkException();
assertFalse(inboundHandler.channelActive);
}
@Test(expected = StreamException.class)
public void streamExceptionTriggersChildChannelExceptionCaught() throws Exception {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
StreamException e = new StreamException(streamId, Http2Error.PROTOCOL_ERROR, "baaam!");
parentChannel.pipeline().fireExceptionCaught(e);
inboundHandler.checkException();
}
@Test
public void sendRstStream() throws Exception {
LastInboundHandler stream = new LastInboundHandler();
streamInit.handler = stream;
frameListener.onHeadersRead(http2HandlerCtx, 5, request, 31, true);
public void streamExceptionClosesChildChannel() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
stream.writeOutbound(new DefaultHttp2ResetFrame(314 /* non-standard error */));
verify(frameWriter).writeRstStream(
eq(http2HandlerCtx), eq(5), eq(314L), anyChannelPromise());
assertFalse(stream.channel().isActive());
assertTrue(channel.isActive());
assertTrue(inboundHandler.channelActive);
StreamException e = new StreamException(streamId, Http2Error.PROTOCOL_ERROR, "baaam!");
parentChannel.pipeline().fireExceptionCaught(e);
parentChannel.runPendingTasks();
parentChannel.checkException();
assertFalse(inboundHandler.channelActive);
}
private static ChannelPromise anyChannelPromise() {
return any(ChannelPromise.class);
private LastInboundHandler streamActiveAndWriteHeaders(int streamId) {
LastInboundHandler inboundHandler = new LastInboundHandler();
childChannelInitializer.handler = inboundHandler;
assertFalse(inboundHandler.channelActive);
parentChannel.pipeline().fireUserEventTriggered(new Http2StreamActiveEvent(streamId));
assertTrue(inboundHandler.channelActive);
parentChannel.pipeline().fireChannelRead(new DefaultHttp2HeadersFrame(request).setStreamId(streamId));
parentChannel.pipeline().fireChannelReadComplete();
return inboundHandler;
}
private static Http2Settings anyHttp2Settings() {
return any(Http2Settings.class);
}
private static ByteBuf bb(String s) {
ByteBuf buf = Unpooled.buffer(s.length() * 4);
ByteBufUtil.writeUtf8(buf, s);
return buf;
private static void verifyFramesMultiplexedToCorrectChannel(int streamId, LastInboundHandler inboundHandler,
int numFrames) {
for (int i = 0; i < numFrames; i++) {
Http2StreamFrame frame = inboundHandler.readInbound();
assertNotNull(frame);
assertEquals(streamId, frame.streamId());
release(frame);
}
assertNull(inboundHandler.readInbound());
}
@Sharable
static class TestChannelInitializer extends ChannelInitializer<Channel> {
ChannelHandler handler;
@ -211,11 +240,28 @@ public class Http2MultiplexCodecTest {
}
}
static class LastInboundHandler extends ChannelDuplexHandler {
private static ByteBuf bb(String s) {
return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s);
}
static final class LastInboundHandler extends ChannelDuplexHandler {
private final Queue<Object> inboundMessages = new ArrayDeque<Object>();
private final Queue<Object> userEvents = new ArrayDeque<Object>();
private Throwable lastException;
private ChannelHandlerContext ctx;
private boolean channelActive;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channelActive = true;
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channelActive = false;
super.channelInactive(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
@ -260,28 +306,19 @@ public class Http2MultiplexCodecTest {
return (T) userEvents.poll();
}
public void writeOutbound(Object... msgs) throws Exception {
for (Object msg : msgs) {
ctx.write(msg);
}
ctx.flush();
EmbeddedChannel parent = (EmbeddedChannel) ctx.channel().parent();
parent.runPendingTasks();
parent.checkException();
checkException();
}
public Channel channel() {
return ctx.channel();
}
}
public static class VerifiableHttp2FrameWriter extends DefaultHttp2FrameWriter {
@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endStream, ChannelPromise promise) {
// duplicate 'data' to prevent readerIndex from being changed, to ease verification
return super.writeData(ctx, streamId, data.duplicate(), padding, endStream, promise);
public void finishAndReleaseAll() throws Exception {
checkException();
Object o;
while ((o = readInbound()) != null) {
release(o);
}
while ((o = readUserEvent()) != null) {
release(o);
}
}
}
}

View File

@ -18,7 +18,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.example.http2.helloworld.server.HelloWorldHttp1Handler;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http2.Http2MultiplexCodec;
import io.netty.handler.codec.http2.Http2Codec;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
@ -37,7 +37,7 @@ public class Http2OrHttpHandler extends ApplicationProtocolNegotiationHandler {
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
ctx.pipeline().addLast(new Http2MultiplexCodec(true, new HelloWorldHttp2Handler()));
ctx.pipeline().addLast(new Http2Codec(true, new HelloWorldHttp2Handler()));
return;
}

View File

@ -29,8 +29,8 @@ import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodec;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodecFactory;
import io.netty.handler.codec.http2.Http2Codec;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2MultiplexCodec;
import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty.handler.ssl.SslContext;
import io.netty.util.AsciiString;
@ -45,8 +45,7 @@ public class Http2ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public UpgradeCodec newUpgradeCodec(CharSequence protocol) {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec(new Http2MultiplexCodec(true,
new HelloWorldHttp2Handler()));
return new Http2ServerUpgradeCodec(new Http2Codec(true, new HelloWorldHttp2Handler()));
} else {
return null;
}