diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java
index 55505afa0e..449989674a 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java
@@ -28,6 +28,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.ThrowableUtil;
import java.net.SocketAddress;
@@ -67,10 +68,12 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
}
};
+ // Volatile, as parent and child channel may be on different eventloops.
+ private volatile int streamId = -1;
private boolean closed;
private boolean readInProgress;
- public AbstractHttp2StreamChannel(Channel parent) {
+ protected AbstractHttp2StreamChannel(Channel parent) {
super(parent);
}
@@ -91,7 +94,7 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
@Override
public boolean isActive() {
- return !closed;
+ return isOpen();
}
@Override
@@ -280,6 +283,20 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
}
}
+ /**
+ * This method must only be called within the parent channel's eventloop.
+ */
+ protected void streamId(int streamId) {
+ if (this.streamId != -1) {
+ throw new IllegalStateException("Stream identifier may only be set once.");
+ }
+ this.streamId = ObjectUtil.checkPositiveOrZero(streamId, "streamId");
+ }
+
+ protected int streamId() {
+ return streamId;
+ }
+
/**
* Returns whether reads should continue. The only reason reads shouldn't continue is that the
* channel was just closed.
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamFrame.java
index 68724b631b..9aa255bd35 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamFrame.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamFrame.java
@@ -24,10 +24,11 @@ import io.netty.util.internal.UnstableApi;
@UnstableApi
public abstract class AbstractHttp2StreamFrame implements Http2StreamFrame {
- private int streamId = -1;
+ // Volatile as parent and child channel may be on different eventloops.
+ private volatile int streamId = -1;
@Override
- public AbstractHttp2StreamFrame setStreamId(int streamId) {
+ public AbstractHttp2StreamFrame streamId(int streamId) {
if (this.streamId != -1) {
throw new IllegalStateException("Stream identifier may only be set once.");
}
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2DataFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2DataFrame.java
index 7ca5e40160..4ec0986d3e 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2DataFrame.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2DataFrame.java
@@ -76,8 +76,8 @@ public final class DefaultHttp2DataFrame extends AbstractHttp2StreamFrame implem
}
@Override
- public DefaultHttp2DataFrame setStreamId(int streamId) {
- super.setStreamId(streamId);
+ public DefaultHttp2DataFrame streamId(int streamId) {
+ super.streamId(streamId);
return this;
}
@@ -154,7 +154,7 @@ public final class DefaultHttp2DataFrame extends AbstractHttp2StreamFrame implem
@Override
public String toString() {
return "DefaultHttp2DataFrame(streamId=" + streamId() + ", content=" + content
- + ", endStream=" + endStream + ", padding=" + padding + ")";
+ + ", endStream=" + endStream + ", padding=" + padding + ")";
}
@Override
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2HeadersFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2HeadersFrame.java
index 76f0e57dd9..c6a00cedba 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2HeadersFrame.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2HeadersFrame.java
@@ -63,8 +63,8 @@ public final class DefaultHttp2HeadersFrame extends AbstractHttp2StreamFrame imp
}
@Override
- public DefaultHttp2HeadersFrame setStreamId(int streamId) {
- super.setStreamId(streamId);
+ public DefaultHttp2HeadersFrame streamId(int streamId) {
+ super.streamId(streamId);
return this;
}
@@ -91,7 +91,7 @@ public final class DefaultHttp2HeadersFrame extends AbstractHttp2StreamFrame imp
@Override
public String toString() {
return "DefaultHttp2HeadersFrame(streamId=" + streamId() + ", headers=" + headers
- + ", endStream=" + endStream + ", padding=" + padding + ")";
+ + ", endStream=" + endStream + ", padding=" + padding + ")";
}
@Override
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ResetFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ResetFrame.java
index 27e77e999b..5615e97ebf 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ResetFrame.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ResetFrame.java
@@ -45,8 +45,8 @@ public final class DefaultHttp2ResetFrame extends AbstractHttp2StreamFrame imple
}
@Override
- public DefaultHttp2ResetFrame setStreamId(int streamId) {
- super.setStreamId(streamId);
+ public DefaultHttp2ResetFrame streamId(int streamId) {
+ super.streamId(streamId);
return this;
}
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2WindowUpdateFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2WindowUpdateFrame.java
index 9edae85dad..f929e4a7e9 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2WindowUpdateFrame.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2WindowUpdateFrame.java
@@ -32,8 +32,8 @@ public class DefaultHttp2WindowUpdateFrame extends AbstractHttp2StreamFrame impl
}
@Override
- public DefaultHttp2WindowUpdateFrame setStreamId(int streamId) {
- super.setStreamId(streamId);
+ public DefaultHttp2WindowUpdateFrame streamId(int streamId) {
+ super.streamId(streamId);
return this;
}
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Codec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Codec.java
index bec7984772..a61428d96d 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Codec.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Codec.java
@@ -18,7 +18,6 @@ package io.netty.handler.codec.http2;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
import io.netty.util.internal.UnstableApi;
import static io.netty.handler.logging.LogLevel.INFO;
@@ -39,31 +38,27 @@ public final class Http2Codec extends ChannelDuplexHandler {
*
* @param server {@code true} this is a server
* @param streamHandler the handler added to channels for remotely-created streams. It must be
- * {@link ChannelHandler.Sharable}.
+ * {@link ChannelHandler.Sharable}. {@code null} if the event loop from the parent channel should be used.
*/
public Http2Codec(boolean server, ChannelHandler streamHandler) {
- this(server, streamHandler, null, HTTP2_FRAME_LOGGER);
+ this(server, new Http2StreamChannelBootstrap().handler(streamHandler), HTTP2_FRAME_LOGGER);
}
/**
* Construct a new handler whose child channels run in a different event loop.
*
* @param server {@code true} this is a server
- * @param streamHandler the handler added to channels for remotely-created streams. It must be
- * {@link ChannelHandler.Sharable}.
- * @param streamGroup event loop for registering child channels
+ * @param bootstrap bootstrap used to instantiate child channels for remotely-created streams.
*/
- public Http2Codec(boolean server, ChannelHandler streamHandler,
- EventLoopGroup streamGroup, Http2FrameLogger frameLogger) {
- this(server, streamHandler, streamGroup, new DefaultHttp2FrameWriter(), frameLogger);
+ public Http2Codec(boolean server, Http2StreamChannelBootstrap bootstrap, Http2FrameLogger frameLogger) {
+ this(server, bootstrap, new DefaultHttp2FrameWriter(), frameLogger);
}
// Visible for testing
- Http2Codec(boolean server, ChannelHandler streamHandler,
- EventLoopGroup streamGroup, Http2FrameWriter frameWriter,
+ Http2Codec(boolean server, Http2StreamChannelBootstrap bootstrap, Http2FrameWriter frameWriter,
Http2FrameLogger frameLogger) {
frameCodec = new Http2FrameCodec(server, frameWriter, frameLogger);
- multiplexCodec = new Http2MultiplexCodec(server, streamGroup, streamHandler);
+ multiplexCodec = new Http2MultiplexCodec(server, bootstrap);
}
Http2FrameCodec frameCodec() {
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java
index 5a4c91ee89..0c3776204a 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java
@@ -106,6 +106,24 @@ public final class Http2CodecUtil {
public static final int DEFAULT_MAX_HEADER_SIZE = 8192;
public static final int DEFAULT_MAX_FRAME_SIZE = MAX_FRAME_SIZE_LOWER_BOUND;
+ /**
+ * Returns {@code true} if the stream is an outbound stream.
+ *
+ * @param server {@code true} if the endpoint is a server, {@code false} otherwise.
+ * @param streamId the stream identifier
+ */
+ public static boolean isOutboundStream(boolean server, int streamId) {
+ boolean even = (streamId & 1) == 0;
+ return streamId > 0 && server == even;
+ }
+
+ /**
+ * Returns true if the {@code streamId} is a valid HTTP/2 stream identifier.
+ */
+ public static boolean isStreamIdValid(int streamId) {
+ return streamId >= 0;
+ }
+
/**
* The assumed minimum value for {@code SETTINGS_MAX_CONCURRENT_STREAMS} as
* recommended by the HTTP/2 spec.
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java
index 6e20e04c45..2a941866f6 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java
@@ -21,14 +21,17 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
+import io.netty.handler.codec.http2.Http2Connection.Endpoint;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.UnstableApi;
+import static io.netty.handler.codec.http2.Http2CodecUtil.isOutboundStream;
+import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.handler.logging.LogLevel.INFO;
/**
* An HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
- * frame a {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outgoing {@link Http2Frame}
+ * frame a {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
* objects received via {@link #write} are converted to the HTTP/2 wire format.
*
*
A change in stream state is propagated through the channel pipeline as a user event via
@@ -40,6 +43,63 @@ import static io.netty.handler.logging.LogLevel.INFO;
*
*
This API is very immature. The Http2Connection-based API is currently preferred over
* this API. This API is targeted to eventually replace or reduce the need for the Http2Connection-based API.
+ *
+ *
Opening and Closing Streams
+ *
+ *
When the remote side opens a new stream, the frame codec first emits a {@link Http2StreamActiveEvent} with the
+ * stream identifier set.
+ *
When a stream is closed either due to a reset frame by the remote side, or due to both sides having sent frames
+ * with the END_STREAM flag, then the frame codec emits a {@link Http2StreamClosedEvent}.
+ *
When the local side wants to close a stream, it has to write a {@link Http2ResetFrame} to which the frame codec
+ * will respond to with a {@link Http2StreamClosedEvent}.
+ *
Opening an outbound/local stream works by first sending the frame codec a {@link Http2HeadersFrame} with no
+ * stream identifier set (such that {@link Http2CodecUtil#isStreamIdValid} returns {@code false}). If opening the stream
+ * was successful, the frame codec responds with a {@link Http2StreamActiveEvent} that contains the stream's new
+ * identifier as well as the same {@link Http2HeadersFrame} object that opened the stream.
+ *
*/
@UnstableApi
public class Http2FrameCodec extends ChannelDuplexHandler {
@@ -47,6 +107,7 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
private static final Http2FrameLogger HTTP2_FRAME_LOGGER = new Http2FrameLogger(INFO, Http2FrameCodec.class);
private final Http2ConnectionHandler http2Handler;
+ private final boolean server;
private ChannelHandlerContext ctx;
private ChannelHandlerContext http2HandlerCtx;
@@ -80,6 +141,7 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
decoder.frameListener(new FrameListener());
http2Handler = new InternalHttp2ConnectionHandler(decoder, encoder, new Http2Settings());
http2Handler.connection().addListener(new ConnectionListener());
+ this.server = server;
}
Http2ConnectionHandler connectionHandler() {
@@ -144,10 +206,6 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
- if (!(msg instanceof Http2Frame)) {
- ctx.write(msg, promise);
- return;
- }
try {
if (msg instanceof Http2WindowUpdateFrame) {
Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
@@ -191,24 +249,39 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
}
private void writeStreamFrame(Http2StreamFrame frame, ChannelPromise promise) {
- int streamId = frame.streamId();
if (frame instanceof Http2DataFrame) {
Http2DataFrame dataFrame = (Http2DataFrame) frame;
- http2Handler.encoder().writeData(http2HandlerCtx, streamId, dataFrame.content().retain(),
+ http2Handler.encoder().writeData(http2HandlerCtx, frame.streamId(), dataFrame.content().retain(),
dataFrame.padding(), dataFrame.isEndStream(), promise);
} else if (frame instanceof Http2HeadersFrame) {
- Http2HeadersFrame headerFrame = (Http2HeadersFrame) frame;
- http2Handler.encoder().writeHeaders(
- http2HandlerCtx, streamId, headerFrame.headers(), headerFrame.padding(), headerFrame.isEndStream(),
- promise);
+ writeHeadersFrame((Http2HeadersFrame) frame, promise);
} else if (frame instanceof Http2ResetFrame) {
Http2ResetFrame rstFrame = (Http2ResetFrame) frame;
- http2Handler.resetStream(http2HandlerCtx, streamId, rstFrame.errorCode(), promise);
+ http2Handler.resetStream(http2HandlerCtx, frame.streamId(), rstFrame.errorCode(), promise);
} else {
throw new UnsupportedMessageTypeException(frame);
}
}
+ private void writeHeadersFrame(Http2HeadersFrame headersFrame, ChannelPromise promise) {
+ int streamId = headersFrame.streamId();
+ if (!isStreamIdValid(streamId)) {
+ final Endpoint localEndpoint = http2Handler.connection().local();
+ streamId = localEndpoint.incrementAndGetNextStreamId();
+ try {
+ // Try to create a stream in OPEN state before writing headers, to catch errors on stream creation
+ // early on i.e. max concurrent streams limit reached, stream id exhaustion, etc.
+ localEndpoint.createStream(streamId, false);
+ } catch (Http2Exception e) {
+ promise.setFailure(e);
+ return;
+ }
+ ctx.fireUserEventTriggered(new Http2StreamActiveEvent(streamId, headersFrame));
+ }
+ http2Handler.encoder().writeHeaders(http2HandlerCtx, streamId, headersFrame.headers(),
+ headersFrame.padding(), headersFrame.isEndStream(), promise);
+ }
+
private final class ConnectionListener extends Http2ConnectionAdapter {
@Override
public void onStreamActive(Http2Stream stream) {
@@ -216,6 +289,10 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
// UPGRADE stream is active before handlerAdded().
return;
}
+ if (isOutboundStream(server, stream.id())) {
+ // Creation of outbound streams is notified in writeHeadersFrame().
+ return;
+ }
ctx.fireUserEventTriggered(new Http2StreamActiveEvent(stream.id()));
}
@@ -251,11 +328,11 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
}
}
- private final class FrameListener extends Http2FrameAdapter {
+ private static final class FrameListener extends Http2FrameAdapter {
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
Http2ResetFrame rstFrame = new DefaultHttp2ResetFrame(errorCode);
- rstFrame.setStreamId(streamId);
+ rstFrame.streamId(streamId);
ctx.fireChannelRead(rstFrame);
}
@@ -270,7 +347,7 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endOfStream) {
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(headers, endOfStream, padding);
- headersFrame.setStreamId(streamId);
+ headersFrame.streamId(streamId);
ctx.fireChannelRead(headersFrame);
}
@@ -278,7 +355,7 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) {
Http2DataFrame dataFrame = new DefaultHttp2DataFrame(data.retain(), endOfStream, padding);
- dataFrame.setStreamId(streamId);
+ dataFrame.streamId(streamId);
ctx.fireChannelRead(dataFrame);
// We return the bytes in bytesConsumed() once the stream channel consumed the bytes.
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java
index bb970c499c..8b3ec622d8 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java
@@ -20,13 +20,16 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
@@ -38,18 +41,22 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CLOSE_MESSAGE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.isOutboundStream;
+import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static java.lang.String.format;
/**
- * An HTTP/2 handler that creates child channels for each stream. Creating outgoing streams is not
- * yet supported.
+ * An HTTP/2 handler that creates child channels for each stream.
*
*
When a new stream is created, a new {@link Channel} is created for it. Applications send and
* receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
- * all writes that reach the head of the pipeline must be an instance of {@link Http2Frame}. Writes that reach the
- * head of the pipeline are processed directly by this handler and cannot be intercepted.
+ * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
+ * the head of the pipeline are processed directly by this handler and cannot be intercepted.
*
*
The child channel will be notified of user events that impact the stream, such as {@link
* Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
@@ -59,6 +66,8 @@ import static java.lang.String.format;
* free to close the channel in response to such events if they don't have use for any queued
* messages.
*
+ *
Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
+ *
*
{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
*/
@UnstableApi
@@ -66,8 +75,8 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2MultiplexCodec.class);
- private final ChannelHandler streamHandler;
- private final EventLoopGroup streamGroup;
+ private final Http2StreamChannelBootstrap bootstrap;
+
private final List channelsToFireChildReadComplete = new ArrayList();
private final boolean server;
private ChannelHandlerContext ctx;
@@ -79,25 +88,20 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
* Construct a new handler whose child channels run in a different event loop.
*
* @param server {@code true} this is a server
- * @param streamHandler the handler added to channels for remotely-created streams. It must be
- * {@link ChannelHandler.Sharable}.
- * @param streamGroup event loop for registering child channels
+ * @param bootstrap bootstrap used to instantiate child channels for remotely-created streams.
*/
- public Http2MultiplexCodec(boolean server,
- EventLoopGroup streamGroup,
- ChannelHandler streamHandler) {
- if (!streamHandler.getClass().isAnnotationPresent(Sharable.class)) {
- throw new IllegalArgumentException("streamHandler must be Sharable");
+ public Http2MultiplexCodec(boolean server, Http2StreamChannelBootstrap bootstrap) {
+ if (bootstrap.parentChannel() != null) {
+ throw new IllegalStateException("The parent channel must not be set on the bootstrap.");
}
-
this.server = server;
- this.streamHandler = streamHandler;
- this.streamGroup = streamGroup;
+ this.bootstrap = new Http2StreamChannelBootstrap(bootstrap);
}
@Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
+ bootstrap.parentChannel(ctx.channel());
}
@Override
@@ -113,7 +117,8 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
if (childChannel != null) {
childChannel.pipeline().fireExceptionCaught(streamEx);
} else {
- logger.warn(format("Exception caught for unknown HTTP/2 stream '%d'", streamEx.streamId()), streamEx);
+ logger.warn(format("Exception caught for unknown HTTP/2 stream '%d'", streamEx.streamId()),
+ streamEx);
}
} finally {
onStreamClosed(streamEx.streamId());
@@ -148,7 +153,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
for (PrimitiveEntry entry : childChannels.entries()) {
Http2StreamChannel childChannel = entry.value();
int streamId = entry.key();
- if (streamId > goAwayFrame.lastStreamId() && isLocalStream(streamId)) {
+ if (streamId > goAwayFrame.lastStreamId() && isOutboundStream(server, streamId)) {
childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
}
}
@@ -172,30 +177,31 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (!(evt instanceof Http2StreamStateEvent)) {
+ if (evt instanceof Http2StreamActiveEvent) {
+ Http2StreamActiveEvent activeEvent = (Http2StreamActiveEvent) evt;
+ onStreamActive(activeEvent.streamId(), activeEvent.headers());
+ } else if (evt instanceof Http2StreamClosedEvent) {
+ onStreamClosed(((Http2StreamClosedEvent) evt).streamId());
+ } else {
ctx.fireUserEventTriggered(evt);
- return;
- }
-
- try {
- int streamId = ((Http2StreamStateEvent) evt).streamId();
- if (evt instanceof Http2StreamActiveEvent) {
- onStreamActive(streamId);
- } else if (evt instanceof Http2StreamClosedEvent) {
- onStreamClosed(streamId);
- } else {
- throw new UnsupportedMessageTypeException(evt);
- }
- } finally {
- ReferenceCountUtil.release(evt);
}
}
- private void onStreamActive(int streamId) {
- ChannelFuture future = createStreamChannel(ctx, streamId, streamHandler);
- Http2StreamChannel childChannel = (Http2StreamChannel) future.channel();
- Http2StreamChannel oldChannel = childChannels.put(streamId, childChannel);
- assert oldChannel == null;
+ private void onStreamActive(int streamId, Http2HeadersFrame headersFrame) {
+ final Http2StreamChannel childChannel;
+ if (isOutboundStream(server, streamId)) {
+ if (!(headersFrame instanceof ChannelCarryingHeadersFrame)) {
+ throw new IllegalArgumentException("needs to be wrapped");
+ }
+ childChannel = ((ChannelCarryingHeadersFrame) headersFrame).channel();
+ childChannel.streamId(streamId);
+ } else {
+ ChannelFuture future = bootstrap.connect(streamId);
+ childChannel = (Http2StreamChannel) future.channel();
+ }
+
+ Http2StreamChannel existing = childChannels.put(streamId, childChannel);
+ assert existing == null;
}
private void onStreamClosed(int streamId) {
@@ -219,7 +225,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
assert childChannel.eventLoop().inEventLoop();
childChannel.onStreamClosedFired = true;
- childChannel.fireChildRead(AbstractHttp2StreamChannel.CLOSE_MESSAGE);
+ childChannel.fireChildRead(CLOSE_MESSAGE);
}
void flushFromStreamChannel() {
@@ -240,8 +246,11 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
}
}
- void writeFromStreamChannel(final Object msg, final boolean flush) {
- final ChannelPromise promise = ctx.newPromise();
+ void writeFromStreamChannel(Object msg, boolean flush) {
+ writeFromStreamChannel(msg, ctx.newPromise(), flush);
+ }
+
+ void writeFromStreamChannel(final Object msg, final ChannelPromise promise, final boolean flush) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
writeFromStreamChannel0(msg, flush, promise);
@@ -270,25 +279,6 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
}
}
- private ChannelFuture createStreamChannel(ChannelHandlerContext ctx, int streamId,
- ChannelHandler handler) {
- EventLoopGroup group = streamGroup != null ? streamGroup : ctx.channel().eventLoop();
- Http2StreamChannel channel = new Http2StreamChannel(streamId);
- channel.pipeline().addLast(handler);
- ChannelFuture future = group.register(channel);
- // Handle any errors that occurred on the local thread while registering. Even though
- // failures can happen after this point, they will be handled by the channel by closing the
- // channel.
- if (future.cause() != null) {
- if (channel.isRegistered()) {
- channel.close();
- } else {
- channel.unsafe().closeForcibly();
- }
- }
- return future;
- }
-
/**
* Notifies any child streams of the read completion.
*/
@@ -303,8 +293,60 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
channelsToFireChildReadComplete.clear();
}
- final class Http2StreamChannel extends AbstractHttp2StreamChannel {
- private final int streamId;
+ ChannelFuture createStreamChannel(Channel parentChannel, EventLoopGroup group, ChannelHandler handler,
+ Map, Object> options,
+ Map, Object> attrs,
+ int streamId) {
+ final Http2StreamChannel channel = new Http2StreamChannel(parentChannel);
+ if (isStreamIdValid(streamId)) {
+ assert !isOutboundStream(server, streamId);
+ assert ctx.channel().eventLoop().inEventLoop();
+ channel.streamId(streamId);
+ }
+ channel.pipeline().addLast(handler);
+
+ initOpts(channel, options);
+ initAttrs(channel, attrs);
+
+ ChannelFuture future = group.register(channel);
+ // Handle any errors that occurred on the local thread while registering. Even though
+ // failures can happen after this point, they will be handled by the channel by closing the
+ // channel.
+ if (future.cause() != null) {
+ if (channel.isRegistered()) {
+ channel.close();
+ } else {
+ channel.unsafe().closeForcibly();
+ }
+ }
+ return future;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void initOpts(Channel channel, Map, Object> opts) {
+ if (opts != null) {
+ for (Entry, Object> e: opts.entrySet()) {
+ try {
+ if (!channel.config().setOption((ChannelOption