h2childchan: Ability to open outbound/local streams. Fixes #4913

Motivation:

The HTTP/2 child channel API does not allow to create local/outbound HTTP/2 streams.

Modifications:

Add a Http2StreamChannelBootstrap that allows to create outbound streams.

Result:

The HTTP/2 child channel API now supports outbound streams.
This commit is contained in:
buchgr 2016-08-16 15:22:39 +02:00 committed by Scott Mitchell
parent d1d954da35
commit f010033590
18 changed files with 1141 additions and 373 deletions

View File

@ -28,6 +28,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.ThrowableUtil;
import java.net.SocketAddress;
@ -67,10 +68,12 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
}
};
// Volatile, as parent and child channel may be on different eventloops.
private volatile int streamId = -1;
private boolean closed;
private boolean readInProgress;
public AbstractHttp2StreamChannel(Channel parent) {
protected AbstractHttp2StreamChannel(Channel parent) {
super(parent);
}
@ -91,7 +94,7 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
@Override
public boolean isActive() {
return !closed;
return isOpen();
}
@Override
@ -280,6 +283,20 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
}
}
/**
* This method must only be called within the parent channel's eventloop.
*/
protected void streamId(int streamId) {
if (this.streamId != -1) {
throw new IllegalStateException("Stream identifier may only be set once.");
}
this.streamId = ObjectUtil.checkPositiveOrZero(streamId, "streamId");
}
protected int streamId() {
return streamId;
}
/**
* Returns whether reads should continue. The only reason reads shouldn't continue is that the
* channel was just closed.

View File

@ -24,10 +24,11 @@ import io.netty.util.internal.UnstableApi;
@UnstableApi
public abstract class AbstractHttp2StreamFrame implements Http2StreamFrame {
private int streamId = -1;
// Volatile as parent and child channel may be on different eventloops.
private volatile int streamId = -1;
@Override
public AbstractHttp2StreamFrame setStreamId(int streamId) {
public AbstractHttp2StreamFrame streamId(int streamId) {
if (this.streamId != -1) {
throw new IllegalStateException("Stream identifier may only be set once.");
}

View File

@ -76,8 +76,8 @@ public final class DefaultHttp2DataFrame extends AbstractHttp2StreamFrame implem
}
@Override
public DefaultHttp2DataFrame setStreamId(int streamId) {
super.setStreamId(streamId);
public DefaultHttp2DataFrame streamId(int streamId) {
super.streamId(streamId);
return this;
}

View File

@ -63,8 +63,8 @@ public final class DefaultHttp2HeadersFrame extends AbstractHttp2StreamFrame imp
}
@Override
public DefaultHttp2HeadersFrame setStreamId(int streamId) {
super.setStreamId(streamId);
public DefaultHttp2HeadersFrame streamId(int streamId) {
super.streamId(streamId);
return this;
}

View File

@ -45,8 +45,8 @@ public final class DefaultHttp2ResetFrame extends AbstractHttp2StreamFrame imple
}
@Override
public DefaultHttp2ResetFrame setStreamId(int streamId) {
super.setStreamId(streamId);
public DefaultHttp2ResetFrame streamId(int streamId) {
super.streamId(streamId);
return this;
}

View File

@ -32,8 +32,8 @@ public class DefaultHttp2WindowUpdateFrame extends AbstractHttp2StreamFrame impl
}
@Override
public DefaultHttp2WindowUpdateFrame setStreamId(int streamId) {
super.setStreamId(streamId);
public DefaultHttp2WindowUpdateFrame streamId(int streamId) {
super.streamId(streamId);
return this;
}

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec.http2;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.util.internal.UnstableApi;
import static io.netty.handler.logging.LogLevel.INFO;
@ -39,31 +38,27 @@ public final class Http2Codec extends ChannelDuplexHandler {
*
* @param server {@code true} this is a server
* @param streamHandler the handler added to channels for remotely-created streams. It must be
* {@link ChannelHandler.Sharable}.
* {@link ChannelHandler.Sharable}. {@code null} if the event loop from the parent channel should be used.
*/
public Http2Codec(boolean server, ChannelHandler streamHandler) {
this(server, streamHandler, null, HTTP2_FRAME_LOGGER);
this(server, new Http2StreamChannelBootstrap().handler(streamHandler), HTTP2_FRAME_LOGGER);
}
/**
* Construct a new handler whose child channels run in a different event loop.
*
* @param server {@code true} this is a server
* @param streamHandler the handler added to channels for remotely-created streams. It must be
* {@link ChannelHandler.Sharable}.
* @param streamGroup event loop for registering child channels
* @param bootstrap bootstrap used to instantiate child channels for remotely-created streams.
*/
public Http2Codec(boolean server, ChannelHandler streamHandler,
EventLoopGroup streamGroup, Http2FrameLogger frameLogger) {
this(server, streamHandler, streamGroup, new DefaultHttp2FrameWriter(), frameLogger);
public Http2Codec(boolean server, Http2StreamChannelBootstrap bootstrap, Http2FrameLogger frameLogger) {
this(server, bootstrap, new DefaultHttp2FrameWriter(), frameLogger);
}
// Visible for testing
Http2Codec(boolean server, ChannelHandler streamHandler,
EventLoopGroup streamGroup, Http2FrameWriter frameWriter,
Http2Codec(boolean server, Http2StreamChannelBootstrap bootstrap, Http2FrameWriter frameWriter,
Http2FrameLogger frameLogger) {
frameCodec = new Http2FrameCodec(server, frameWriter, frameLogger);
multiplexCodec = new Http2MultiplexCodec(server, streamGroup, streamHandler);
multiplexCodec = new Http2MultiplexCodec(server, bootstrap);
}
Http2FrameCodec frameCodec() {

View File

@ -106,6 +106,24 @@ public final class Http2CodecUtil {
public static final int DEFAULT_MAX_HEADER_SIZE = 8192;
public static final int DEFAULT_MAX_FRAME_SIZE = MAX_FRAME_SIZE_LOWER_BOUND;
/**
* Returns {@code true} if the stream is an outbound stream.
*
* @param server {@code true} if the endpoint is a server, {@code false} otherwise.
* @param streamId the stream identifier
*/
public static boolean isOutboundStream(boolean server, int streamId) {
boolean even = (streamId & 1) == 0;
return streamId > 0 && server == even;
}
/**
* Returns true if the {@code streamId} is a valid HTTP/2 stream identifier.
*/
public static boolean isStreamIdValid(int streamId) {
return streamId >= 0;
}
/**
* The assumed minimum value for {@code SETTINGS_MAX_CONCURRENT_STREAMS} as
* recommended by the HTTP/2 spec.

View File

@ -21,14 +21,17 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
import io.netty.handler.codec.http2.Http2Connection.Endpoint;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.UnstableApi;
import static io.netty.handler.codec.http2.Http2CodecUtil.isOutboundStream;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.handler.logging.LogLevel.INFO;
/**
* An HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
* frame a {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outgoing {@link Http2Frame}
* frame a {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
* objects received via {@link #write} are converted to the HTTP/2 wire format.
*
* <p>A change in stream state is propagated through the channel pipeline as a user event via
@ -40,6 +43,63 @@ import static io.netty.handler.logging.LogLevel.INFO;
*
* <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over
* this API. This API is targeted to eventually replace or reduce the need for the Http2Connection-based API.
*
* <h3>Opening and Closing Streams</h3>
*
* <p>When the remote side opens a new stream, the frame codec first emits a {@link Http2StreamActiveEvent} with the
* stream identifier set.
* <pre>
* {@link Http2FrameCodec} {@link Http2MultiplexCodec}
* + +
* | Http2StreamActiveEvent(streamId=3, headers=null) |
* +------------------------------------------------------------->
* | |
* | Http2HeadersFrame(streamId=3) |
* +------------------------------------------------------------->
* | |
* + +
* </pre>
*
* <p>When a stream is closed either due to a reset frame by the remote side, or due to both sides having sent frames
* with the END_STREAM flag, then the frame codec emits a {@link Http2StreamClosedEvent}.
* <pre>
* {@link Http2FrameCodec} {@link Http2MultiplexCodec}
* + +
* | Http2StreamClosedEvent(streamId=3) |
* +--------------------------------------------------------->
* | |
* + +
* </pre>
*
* <p>When the local side wants to close a stream, it has to write a {@link Http2ResetFrame} to which the frame codec
* will respond to with a {@link Http2StreamClosedEvent}.
* <pre>
* {@link Http2FrameCodec} {@link Http2MultiplexCodec}
* + +
* | Http2ResetFrame(streamId=3) |
* <---------------------------------------------------------+
* | |
* | Http2StreamClosedEvent(streamId=3) |
* +--------------------------------------------------------->
* | |
* + +
* </pre>
*
* <p>Opening an outbound/local stream works by first sending the frame codec a {@link Http2HeadersFrame} with no
* stream identifier set (such that {@link Http2CodecUtil#isStreamIdValid} returns {@code false}). If opening the stream
* was successful, the frame codec responds with a {@link Http2StreamActiveEvent} that contains the stream's new
* identifier as well as the <em>same</em> {@link Http2HeadersFrame} object that opened the stream.
* <pre>
* {@link Http2FrameCodec} {@link Http2MultiplexCodec}
* + +
* | Http2HeadersFrame(streamId=-1) |
* <-----------------------------------------------------------------------------------------------+
* | |
* | Http2StreamActiveEvent(streamId=2, headers=Http2HeadersFrame(streamId=-1)) |
* +----------------------------------------------------------------------------------------------->
* | |
* + +
* </pre>
*/
@UnstableApi
public class Http2FrameCodec extends ChannelDuplexHandler {
@ -47,6 +107,7 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
private static final Http2FrameLogger HTTP2_FRAME_LOGGER = new Http2FrameLogger(INFO, Http2FrameCodec.class);
private final Http2ConnectionHandler http2Handler;
private final boolean server;
private ChannelHandlerContext ctx;
private ChannelHandlerContext http2HandlerCtx;
@ -80,6 +141,7 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
decoder.frameListener(new FrameListener());
http2Handler = new InternalHttp2ConnectionHandler(decoder, encoder, new Http2Settings());
http2Handler.connection().addListener(new ConnectionListener());
this.server = server;
}
Http2ConnectionHandler connectionHandler() {
@ -144,10 +206,6 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (!(msg instanceof Http2Frame)) {
ctx.write(msg, promise);
return;
}
try {
if (msg instanceof Http2WindowUpdateFrame) {
Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
@ -191,24 +249,39 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
}
private void writeStreamFrame(Http2StreamFrame frame, ChannelPromise promise) {
int streamId = frame.streamId();
if (frame instanceof Http2DataFrame) {
Http2DataFrame dataFrame = (Http2DataFrame) frame;
http2Handler.encoder().writeData(http2HandlerCtx, streamId, dataFrame.content().retain(),
http2Handler.encoder().writeData(http2HandlerCtx, frame.streamId(), dataFrame.content().retain(),
dataFrame.padding(), dataFrame.isEndStream(), promise);
} else if (frame instanceof Http2HeadersFrame) {
Http2HeadersFrame headerFrame = (Http2HeadersFrame) frame;
http2Handler.encoder().writeHeaders(
http2HandlerCtx, streamId, headerFrame.headers(), headerFrame.padding(), headerFrame.isEndStream(),
promise);
writeHeadersFrame((Http2HeadersFrame) frame, promise);
} else if (frame instanceof Http2ResetFrame) {
Http2ResetFrame rstFrame = (Http2ResetFrame) frame;
http2Handler.resetStream(http2HandlerCtx, streamId, rstFrame.errorCode(), promise);
http2Handler.resetStream(http2HandlerCtx, frame.streamId(), rstFrame.errorCode(), promise);
} else {
throw new UnsupportedMessageTypeException(frame);
}
}
private void writeHeadersFrame(Http2HeadersFrame headersFrame, ChannelPromise promise) {
int streamId = headersFrame.streamId();
if (!isStreamIdValid(streamId)) {
final Endpoint<Http2LocalFlowController> localEndpoint = http2Handler.connection().local();
streamId = localEndpoint.incrementAndGetNextStreamId();
try {
// Try to create a stream in OPEN state before writing headers, to catch errors on stream creation
// early on i.e. max concurrent streams limit reached, stream id exhaustion, etc.
localEndpoint.createStream(streamId, false);
} catch (Http2Exception e) {
promise.setFailure(e);
return;
}
ctx.fireUserEventTriggered(new Http2StreamActiveEvent(streamId, headersFrame));
}
http2Handler.encoder().writeHeaders(http2HandlerCtx, streamId, headersFrame.headers(),
headersFrame.padding(), headersFrame.isEndStream(), promise);
}
private final class ConnectionListener extends Http2ConnectionAdapter {
@Override
public void onStreamActive(Http2Stream stream) {
@ -216,6 +289,10 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
// UPGRADE stream is active before handlerAdded().
return;
}
if (isOutboundStream(server, stream.id())) {
// Creation of outbound streams is notified in writeHeadersFrame().
return;
}
ctx.fireUserEventTriggered(new Http2StreamActiveEvent(stream.id()));
}
@ -251,11 +328,11 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
}
}
private final class FrameListener extends Http2FrameAdapter {
private static final class FrameListener extends Http2FrameAdapter {
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
Http2ResetFrame rstFrame = new DefaultHttp2ResetFrame(errorCode);
rstFrame.setStreamId(streamId);
rstFrame.streamId(streamId);
ctx.fireChannelRead(rstFrame);
}
@ -270,7 +347,7 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endOfStream) {
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(headers, endOfStream, padding);
headersFrame.setStreamId(streamId);
headersFrame.streamId(streamId);
ctx.fireChannelRead(headersFrame);
}
@ -278,7 +355,7 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) {
Http2DataFrame dataFrame = new DefaultHttp2DataFrame(data.retain(), endOfStream, padding);
dataFrame.setStreamId(streamId);
dataFrame.streamId(streamId);
ctx.fireChannelRead(dataFrame);
// We return the bytes in bytesConsumed() once the stream channel consumed the bytes.

View File

@ -20,13 +20,16 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
@ -38,18 +41,22 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CLOSE_MESSAGE;
import static io.netty.handler.codec.http2.Http2CodecUtil.isOutboundStream;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static java.lang.String.format;
/**
* An HTTP/2 handler that creates child channels for each stream. Creating outgoing streams is not
* yet supported.
* An HTTP/2 handler that creates child channels for each stream.
*
* <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
* receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
* all writes that reach the head of the pipeline must be an instance of {@link Http2Frame}. Writes that reach the
* head of the pipeline are processed directly by this handler and cannot be intercepted.
* all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
* the head of the pipeline are processed directly by this handler and cannot be intercepted.
*
* <p>The child channel will be notified of user events that impact the stream, such as {@link
* Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
@ -59,6 +66,8 @@ import static java.lang.String.format;
* free to close the channel in response to such events if they don't have use for any queued
* messages.
*
* <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
*
* <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
*/
@UnstableApi
@ -66,8 +75,8 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2MultiplexCodec.class);
private final ChannelHandler streamHandler;
private final EventLoopGroup streamGroup;
private final Http2StreamChannelBootstrap bootstrap;
private final List<Http2StreamChannel> channelsToFireChildReadComplete = new ArrayList<Http2StreamChannel>();
private final boolean server;
private ChannelHandlerContext ctx;
@ -79,25 +88,20 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
* Construct a new handler whose child channels run in a different event loop.
*
* @param server {@code true} this is a server
* @param streamHandler the handler added to channels for remotely-created streams. It must be
* {@link ChannelHandler.Sharable}.
* @param streamGroup event loop for registering child channels
* @param bootstrap bootstrap used to instantiate child channels for remotely-created streams.
*/
public Http2MultiplexCodec(boolean server,
EventLoopGroup streamGroup,
ChannelHandler streamHandler) {
if (!streamHandler.getClass().isAnnotationPresent(Sharable.class)) {
throw new IllegalArgumentException("streamHandler must be Sharable");
public Http2MultiplexCodec(boolean server, Http2StreamChannelBootstrap bootstrap) {
if (bootstrap.parentChannel() != null) {
throw new IllegalStateException("The parent channel must not be set on the bootstrap.");
}
this.server = server;
this.streamHandler = streamHandler;
this.streamGroup = streamGroup;
this.bootstrap = new Http2StreamChannelBootstrap(bootstrap);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
bootstrap.parentChannel(ctx.channel());
}
@Override
@ -113,7 +117,8 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
if (childChannel != null) {
childChannel.pipeline().fireExceptionCaught(streamEx);
} else {
logger.warn(format("Exception caught for unknown HTTP/2 stream '%d'", streamEx.streamId()), streamEx);
logger.warn(format("Exception caught for unknown HTTP/2 stream '%d'", streamEx.streamId()),
streamEx);
}
} finally {
onStreamClosed(streamEx.streamId());
@ -148,7 +153,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
for (PrimitiveEntry<Http2StreamChannel> entry : childChannels.entries()) {
Http2StreamChannel childChannel = entry.value();
int streamId = entry.key();
if (streamId > goAwayFrame.lastStreamId() && isLocalStream(streamId)) {
if (streamId > goAwayFrame.lastStreamId() && isOutboundStream(server, streamId)) {
childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
}
}
@ -172,30 +177,31 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (!(evt instanceof Http2StreamStateEvent)) {
ctx.fireUserEventTriggered(evt);
return;
}
try {
int streamId = ((Http2StreamStateEvent) evt).streamId();
if (evt instanceof Http2StreamActiveEvent) {
onStreamActive(streamId);
Http2StreamActiveEvent activeEvent = (Http2StreamActiveEvent) evt;
onStreamActive(activeEvent.streamId(), activeEvent.headers());
} else if (evt instanceof Http2StreamClosedEvent) {
onStreamClosed(streamId);
onStreamClosed(((Http2StreamClosedEvent) evt).streamId());
} else {
throw new UnsupportedMessageTypeException(evt);
}
} finally {
ReferenceCountUtil.release(evt);
ctx.fireUserEventTriggered(evt);
}
}
private void onStreamActive(int streamId) {
ChannelFuture future = createStreamChannel(ctx, streamId, streamHandler);
Http2StreamChannel childChannel = (Http2StreamChannel) future.channel();
Http2StreamChannel oldChannel = childChannels.put(streamId, childChannel);
assert oldChannel == null;
private void onStreamActive(int streamId, Http2HeadersFrame headersFrame) {
final Http2StreamChannel childChannel;
if (isOutboundStream(server, streamId)) {
if (!(headersFrame instanceof ChannelCarryingHeadersFrame)) {
throw new IllegalArgumentException("needs to be wrapped");
}
childChannel = ((ChannelCarryingHeadersFrame) headersFrame).channel();
childChannel.streamId(streamId);
} else {
ChannelFuture future = bootstrap.connect(streamId);
childChannel = (Http2StreamChannel) future.channel();
}
Http2StreamChannel existing = childChannels.put(streamId, childChannel);
assert existing == null;
}
private void onStreamClosed(int streamId) {
@ -219,7 +225,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
assert childChannel.eventLoop().inEventLoop();
childChannel.onStreamClosedFired = true;
childChannel.fireChildRead(AbstractHttp2StreamChannel.CLOSE_MESSAGE);
childChannel.fireChildRead(CLOSE_MESSAGE);
}
void flushFromStreamChannel() {
@ -240,8 +246,11 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
}
}
void writeFromStreamChannel(final Object msg, final boolean flush) {
final ChannelPromise promise = ctx.newPromise();
void writeFromStreamChannel(Object msg, boolean flush) {
writeFromStreamChannel(msg, ctx.newPromise(), flush);
}
void writeFromStreamChannel(final Object msg, final ChannelPromise promise, final boolean flush) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
writeFromStreamChannel0(msg, flush, promise);
@ -270,25 +279,6 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
}
}
private ChannelFuture createStreamChannel(ChannelHandlerContext ctx, int streamId,
ChannelHandler handler) {
EventLoopGroup group = streamGroup != null ? streamGroup : ctx.channel().eventLoop();
Http2StreamChannel channel = new Http2StreamChannel(streamId);
channel.pipeline().addLast(handler);
ChannelFuture future = group.register(channel);
// Handle any errors that occurred on the local thread while registering. Even though
// failures can happen after this point, they will be handled by the channel by closing the
// channel.
if (future.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return future;
}
/**
* Notifies any child streams of the read completion.
*/
@ -303,8 +293,60 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
channelsToFireChildReadComplete.clear();
}
final class Http2StreamChannel extends AbstractHttp2StreamChannel {
private final int streamId;
ChannelFuture createStreamChannel(Channel parentChannel, EventLoopGroup group, ChannelHandler handler,
Map<ChannelOption<?>, Object> options,
Map<AttributeKey<?>, Object> attrs,
int streamId) {
final Http2StreamChannel channel = new Http2StreamChannel(parentChannel);
if (isStreamIdValid(streamId)) {
assert !isOutboundStream(server, streamId);
assert ctx.channel().eventLoop().inEventLoop();
channel.streamId(streamId);
}
channel.pipeline().addLast(handler);
initOpts(channel, options);
initAttrs(channel, attrs);
ChannelFuture future = group.register(channel);
// Handle any errors that occurred on the local thread while registering. Even though
// failures can happen after this point, they will be handled by the channel by closing the
// channel.
if (future.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return future;
}
@SuppressWarnings("unchecked")
private static void initOpts(Channel channel, Map<ChannelOption<?>, Object> opts) {
if (opts != null) {
for (Entry<ChannelOption<?>, Object> e: opts.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
}
}
@SuppressWarnings("unchecked")
private static void initAttrs(Channel channel, Map<AttributeKey<?>, Object> attrs) {
if (attrs != null) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
final class Http2StreamChannel extends AbstractHttp2StreamChannel implements ChannelFutureListener {
boolean onStreamClosedFired;
/**
@ -312,15 +354,14 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
*/
boolean inStreamsToFireChildReadComplete;
Http2StreamChannel(int streamId) {
super(ctx.channel());
this.streamId = streamId;
Http2StreamChannel(Channel parentChannel) {
super(parentChannel);
}
@Override
protected void doClose() throws Exception {
if (!onStreamClosedFired) {
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).setStreamId(streamId);
if (!onStreamClosedFired && isStreamIdValid(streamId())) {
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).streamId(streamId());
writeFromStreamChannel(resetFrame, true);
}
super.doClose();
@ -332,15 +373,23 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
ReferenceCountUtil.release(msg);
throw new IllegalArgumentException("Message must be an Http2StreamFrame: " + msg);
}
Http2StreamFrame frame = (Http2StreamFrame) msg;
if (frame.streamId() != -1) {
ChannelPromise promise = ctx.newPromise();
if (isStreamIdValid(frame.streamId())) {
ReferenceCountUtil.release(frame);
throw new IllegalArgumentException("Stream must not be set on the frame");
throw new IllegalArgumentException("Stream id must not be set on the frame. Was: " + frame.streamId());
}
frame.setStreamId(streamId);
writeFromStreamChannel(msg, false);
if (!isStreamIdValid(streamId())) {
if (!(frame instanceof Http2HeadersFrame)) {
throw new IllegalArgumentException("The first frame must be a headers frame. Was: " + frame.name());
}
frame = new ChannelCarryingHeadersFrame((Http2HeadersFrame) frame, this);
// Handle errors on stream creation
promise.addListener(this);
} else {
frame.streamId(streamId());
}
writeFromStreamChannel(frame, promise, false);
}
@Override
@ -355,12 +404,65 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
@Override
protected void bytesConsumed(final int bytes) {
ctx.write(new DefaultHttp2WindowUpdateFrame(bytes).setStreamId(streamId));
ctx.write(new DefaultHttp2WindowUpdateFrame(bytes).streamId(streamId()));
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
pipeline().fireExceptionCaught(cause);
close();
}
}
}
private boolean isLocalStream(int streamId) {
boolean even = (streamId & 1) == 0;
return streamId > 0 && server == even;
/**
* Wraps the first {@link Http2HeadersFrame} of local/outbound stream. This allows us to get to the child channel
* when receiving the {@link Http2StreamActiveEvent} from the frame codec. See {@link #onStreamActive}.
*/
private static final class ChannelCarryingHeadersFrame implements Http2HeadersFrame {
private final Http2HeadersFrame frame;
private final Http2StreamChannel childChannel;
ChannelCarryingHeadersFrame(Http2HeadersFrame frame, Http2StreamChannel childChannel) {
this.frame = frame;
this.childChannel = childChannel;
}
@Override
public Http2Headers headers() {
return frame.headers();
}
@Override
public boolean isEndStream() {
return frame.isEndStream();
}
@Override
public int padding() {
return frame.padding();
}
@Override
public Http2StreamFrame streamId(int streamId) {
return frame.streamId(streamId);
}
@Override
public int streamId() {
return frame.streamId();
}
@Override
public String name() {
return frame.name();
}
Http2StreamChannel channel() {
return childChannel;
}
}
}

View File

@ -13,13 +13,33 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
/**
* This event is emitted by the {@link Http2FrameCodec} when a stream becomes active.
*/
@UnstableApi
public class Http2StreamActiveEvent extends AbstractHttp2StreamStateEvent {
private final Http2HeadersFrame headers;
public Http2StreamActiveEvent(int streamId) {
this(streamId, null);
}
public Http2StreamActiveEvent(int streamId, Http2HeadersFrame headers) {
super(streamId);
this.headers = headers;
}
/**
* For outbound streams, this method returns the <em>same</em> {@link Http2HeadersFrame} object as the one that
* made the stream active. For inbound streams, this method returns {@code null}.
*/
public Http2HeadersFrame headers() {
return headers;
}
}

View File

@ -0,0 +1,225 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.AttributeKey;
import io.netty.util.internal.UnstableApi;
import java.util.LinkedHashMap;
import java.util.Map;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.util.Collections.synchronizedMap;
import static java.util.Collections.unmodifiableMap;
/**
* A class that makes it easy to bootstrap a new HTTP/2 stream as a {@link Channel}.
*
* <p>The bootstrap requires a registered parent {@link Channel} with a {@link ChannelPipeline} that contains the
* {@link Http2MultiplexCodec}.
*
* <p>A child channel becomes active as soon as it is registered to an eventloop. Therefore, an active channel does not
* map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been sent or received, does
* the channel map to an active HTTP/2 stream. In case it was not possible to open a new HTTP/2 stream (i.e. due to
* the maximum number of active streams being exceeded), the child channel receives an exception indicating the reason
* and is closed immediately thereafter.
*
* <p>This class is thread-safe.
*/
// TODO(buchgr): Should we deliver a user event when the stream becomes active? For all stream states?
@UnstableApi
public class Http2StreamChannelBootstrap {
private volatile ParentChannelAndMultiplexCodec channelAndCodec;
private volatile ChannelHandler handler;
private volatile EventLoopGroup group;
private final Map<ChannelOption<?>, Object> options;
private final Map<AttributeKey<?>, Object> attributes;
public Http2StreamChannelBootstrap() {
options = synchronizedMap(new LinkedHashMap<ChannelOption<?>, Object>());
attributes = synchronizedMap(new LinkedHashMap<AttributeKey<?>, Object>());
}
// Copy constructor
Http2StreamChannelBootstrap(Http2StreamChannelBootstrap bootstrap0) {
checkNotNull(bootstrap0, "bootstrap must not be null");
channelAndCodec = bootstrap0.channelAndCodec;
handler = bootstrap0.handler;
group = bootstrap0.group;
options = synchronizedMap(new LinkedHashMap<ChannelOption<?>, Object>(bootstrap0.options));
attributes = synchronizedMap(new LinkedHashMap<AttributeKey<?>, Object>(bootstrap0.attributes));
}
/**
* Creates a new channel that will eventually map to a local/outbound HTTP/2 stream.
*/
public ChannelFuture connect() {
return connect(-1);
}
/**
* Used by the {@link Http2MultiplexCodec} to instantiate incoming/remotely-created streams.
*/
ChannelFuture connect(int streamId) {
validateState();
ParentChannelAndMultiplexCodec channelAndCodec0 = channelAndCodec;
Channel parentChannel = channelAndCodec0.parentChannel;
Http2MultiplexCodec multiplexCodec = channelAndCodec0.multiplexCodec;
EventLoopGroup group0 = group;
group0 = group0 == null ? parentChannel.eventLoop() : group0;
return multiplexCodec.createStreamChannel(parentChannel, group0, handler, options, attributes, streamId);
}
/**
* Sets the parent channel that must have the {@link Http2MultiplexCodec} in its pipeline.
*
* @param parent a registered channel with the {@link Http2MultiplexCodec} in its pipeline. This channel will
* be the {@link Channel#parent()} of all channels created via {@link #connect()}.
* @return {@code this}
*/
public Http2StreamChannelBootstrap parentChannel(Channel parent) {
channelAndCodec = new ParentChannelAndMultiplexCodec(parent);
return this;
}
/**
* Sets the channel handler that should be added to the channels's pipeline.
*
* @param handler the channel handler to add to the channel's pipeline. The handler must be
* {@link Sharable}.
* @return {@code this}
*/
public Http2StreamChannelBootstrap handler(ChannelHandler handler) {
this.handler = checkSharable(checkNotNull(handler, "handler"));
return this;
}
/**
* Sets the {@link EventLoop} to which channels created with this bootstrap are registered.
*
* @param group the eventloop or {@code null} if the eventloop of the parent channel should be used.
* @return {@code this}
*/
public Http2StreamChannelBootstrap group(EventLoopGroup group) {
this.group = group;
return this;
}
/**
* Specify {@link ChannelOption}s to be set on newly created channels. An option can be removed by specifying a
* value of {@code null}.
*/
public <T> Http2StreamChannelBootstrap option(ChannelOption<T> option, T value) {
checkNotNull(option, "option must not be null");
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
return this;
}
/**
* Specify attributes with an initial value to be set on newly created channels. An attribute can be removed by
* specifying a value of {@code null}.
*/
public <T> Http2StreamChannelBootstrap attr(AttributeKey<T> key, T value) {
checkNotNull(key, "key must not be null");
if (value == null) {
attributes.remove(key);
} else {
attributes.put(key, value);
}
return this;
}
public Channel parentChannel() {
ParentChannelAndMultiplexCodec channelAndCodec0 = channelAndCodec;
if (channelAndCodec0 != null) {
return channelAndCodec0.parentChannel;
}
return null;
}
public ChannelHandler handler() {
return handler;
}
public EventLoopGroup group() {
return group;
}
public Map<ChannelOption<?>, Object> options() {
return unmodifiableMap(new LinkedHashMap<ChannelOption<?>, Object>(options));
}
public Map<AttributeKey<?>, Object> attributes() {
return unmodifiableMap(new LinkedHashMap<AttributeKey<?>, Object>(attributes));
}
private void validateState() {
checkNotNull(handler, "handler must be set");
checkNotNull(channelAndCodec, "parent channel must be set");
}
private static ChannelHandler checkSharable(ChannelHandler handler) {
if (!handler.getClass().isAnnotationPresent(Sharable.class)) {
throw new IllegalArgumentException("The handler must be Sharable");
}
return handler;
}
private static class ParentChannelAndMultiplexCodec {
final Channel parentChannel;
final Http2MultiplexCodec multiplexCodec;
ParentChannelAndMultiplexCodec(Channel parentChannel) {
this.parentChannel = checkRegistered(checkNotNull(parentChannel, "parentChannel"));
this.multiplexCodec = requireMultiplexCodec(parentChannel.pipeline());
}
private static Http2MultiplexCodec requireMultiplexCodec(ChannelPipeline pipeline) {
ChannelHandlerContext ctx = pipeline.context(Http2MultiplexCodec.class);
if (ctx == null) {
throw new IllegalArgumentException(Http2MultiplexCodec.class.getSimpleName()
+ " was not found in the channel pipeline.");
}
return (Http2MultiplexCodec) ctx.handler();
}
private static Channel checkRegistered(Channel channel) {
if (!channel.isRegistered()) {
throw new IllegalArgumentException("The channel must be registered to an eventloop.");
}
return channel;
}
}
}

View File

@ -23,15 +23,19 @@ import io.netty.util.internal.UnstableApi;
* cases, the {@link #streamId()} must return {@code 0}. If the frame applies to a stream, the
* {@link #streamId()} must be greater than zero.
*/
//TODO(buchgr): Do we REALLY need the flexibility of supporting stream id 0? It seems confusing.
@UnstableApi
public interface Http2StreamFrame extends Http2Frame {
/**
* Sets the identifier of the stream this frame applies to.
* Sets the identifier of the stream this frame applies to. This method may be called at most once.
*
* <p><em>NOTE:</em> This method is supposed to be called by the HTTP/2 transport only. It must not be called by
* users.
*
* @return {@code this}
*/
Http2StreamFrame setStreamId(int streamId);
Http2StreamFrame streamId(int streamId);
/**
* The identifier of the stream this frame applies to.

View File

@ -0,0 +1,165 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.ReferenceCountUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static junit.framework.TestCase.assertFalse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link Http2Codec}.
*/
public class Http2CodecTest {
private static EventLoopGroup group;
private Channel serverChannel;
private Channel clientChannel;
private LastInboundHandler serverLastInboundHandler;
@BeforeClass
public static void init() {
group = new DefaultEventLoop();
}
@Before
public void setUp() throws InterruptedException {
LocalAddress serverAddress = new LocalAddress(getClass().getName());
serverLastInboundHandler = new SharableLastInboundHandler();
ServerBootstrap sb = new ServerBootstrap()
.channel(LocalServerChannel.class)
.group(group)
.childHandler(new Http2Codec(true, serverLastInboundHandler));
serverChannel = sb.bind(serverAddress).sync().channel();
Bootstrap cb = new Bootstrap()
.channel(LocalChannel.class)
.group(group)
.handler(new Http2Codec(false, new TestChannelInitializer()));
clientChannel = cb.connect(serverAddress).sync().channel();
}
@AfterClass
public static void shutdown() {
group.shutdownGracefully();
}
@After
public void tearDown() throws Exception {
clientChannel.close().sync();
serverChannel.close().sync();
}
@Test
public void multipleOutboundStreams() {
Http2StreamChannelBootstrap b = new Http2StreamChannelBootstrap();
b.parentChannel(clientChannel).handler(new TestChannelInitializer());
Channel childChannel1 = b.connect().syncUninterruptibly().channel();
assertTrue(childChannel1.isActive());
assertFalse(isStreamIdValid(((AbstractHttp2StreamChannel) childChannel1).streamId()));
Channel childChannel2 = b.connect().channel();
assertTrue(childChannel2.isActive());
assertFalse(isStreamIdValid(((AbstractHttp2StreamChannel) childChannel2).streamId()));
Http2Headers headers1 = new DefaultHttp2Headers();
Http2Headers headers2 = new DefaultHttp2Headers();
// Test that streams can be made active (headers sent) in different order than the corresponding channels
// have been created.
childChannel2.writeAndFlush(new DefaultHttp2HeadersFrame(headers2));
childChannel1.writeAndFlush(new DefaultHttp2HeadersFrame(headers1));
Http2HeadersFrame headersFrame2 = serverLastInboundHandler.blockingReadInbound();
assertNotNull(headersFrame2);
assertEquals(3, headersFrame2.streamId());
Http2HeadersFrame headersFrame1 = serverLastInboundHandler.blockingReadInbound();
assertNotNull(headersFrame1);
assertEquals(5, headersFrame1.streamId());
assertEquals(3, ((AbstractHttp2StreamChannel) childChannel2).streamId());
assertEquals(5, ((AbstractHttp2StreamChannel) childChannel1).streamId());
childChannel1.close();
childChannel2.close();
}
@Test
public void createOutboundStream() {
Http2StreamChannelBootstrap b = new Http2StreamChannelBootstrap();
Channel childChannel = b.parentChannel(clientChannel).handler(new TestChannelInitializer())
.connect().syncUninterruptibly().channel();
assertTrue(childChannel.isRegistered());
assertTrue(childChannel.isActive());
Http2Headers headers = new DefaultHttp2Headers();
childChannel.write(new DefaultHttp2HeadersFrame(headers));
ByteBuf data = Unpooled.buffer(100).writeZero(100);
childChannel.writeAndFlush(ReferenceCountUtil.releaseLater(new DefaultHttp2DataFrame(data, true)));
Http2HeadersFrame headersFrame = serverLastInboundHandler.blockingReadInbound();
assertNotNull(headersFrame);
assertEquals(3, headersFrame.streamId());
assertEquals(headers, headersFrame.headers());
Http2DataFrame dataFrame = serverLastInboundHandler.blockingReadInbound();
assertNotNull(dataFrame);
ReferenceCountUtil.releaseLater(dataFrame);
assertEquals(3, dataFrame.streamId());
assertEquals(data.resetReaderIndex(), dataFrame.content());
assertTrue(dataFrame.isEndStream());
childChannel.close();
Http2ResetFrame rstFrame = serverLastInboundHandler.blockingReadInbound();
assertNotNull(rstFrame);
assertEquals(3, rstFrame.streamId());
}
@Sharable
private static class SharableLastInboundHandler extends LastInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
}
}

View File

@ -18,7 +18,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@ -33,19 +32,15 @@ import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.handler.logging.LogLevel;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.PlatformDependent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Queue;
import static io.netty.util.ReferenceCountUtil.releaseLater;
import static org.hamcrest.Matchers.instanceOf;
@ -123,11 +118,11 @@ public class Http2FrameCodecTest {
assertNotNull(stream);
assertEquals(State.HALF_CLOSED_REMOTE, stream.state());
assertEquals(new DefaultHttp2HeadersFrame(request, true, 31).setStreamId(stream.id()),
assertEquals(new DefaultHttp2HeadersFrame(request, true, 31).streamId(stream.id()),
inboundHandler.readInbound());
assertNull(inboundHandler.readInbound());
inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).setStreamId(stream.id()));
inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).streamId(stream.id()));
verify(frameWriter).writeHeaders(
eq(http2HandlerCtx), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(),
eq(27), eq(true), anyChannelPromise());
@ -146,7 +141,7 @@ public class Http2FrameCodecTest {
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
assertEquals(new DefaultHttp2HeadersFrame(request, false).setStreamId(stream.id()),
assertEquals(new DefaultHttp2HeadersFrame(request, false).streamId(stream.id()),
inboundHandler.readInbound());
assertNull(inboundHandler.readInbound());
@ -155,17 +150,17 @@ public class Http2FrameCodecTest {
// Release hello to emulate ByteToMessageDecoder
hello.release();
Http2DataFrame inboundData = inboundHandler.readInbound();
assertEquals(releaseLater(new DefaultHttp2DataFrame(bb("hello"), true, 31).setStreamId(stream.id())),
assertEquals(releaseLater(new DefaultHttp2DataFrame(bb("hello"), true, 31).streamId(stream.id())),
releaseLater(inboundData));
assertEquals(1, inboundData.refCnt());
assertNull(inboundHandler.readInbound());
inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, false).setStreamId(stream.id()));
inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, false).streamId(stream.id()));
verify(frameWriter).writeHeaders(eq(http2HandlerCtx), eq(1), eq(response), anyInt(),
anyShort(), anyBoolean(), eq(0), eq(false), anyChannelPromise());
inboundHandler.writeOutbound(releaseLater(new DefaultHttp2DataFrame(bb("world"), true, 27)
.setStreamId(stream.id())));
.streamId(stream.id())));
ArgumentCaptor<ByteBuf> outboundData = ArgumentCaptor.forClass(ByteBuf.class);
verify(frameWriter).writeData(eq(http2HandlerCtx), eq(1), outboundData.capture(), eq(27),
eq(true), anyChannelPromise());
@ -184,7 +179,7 @@ public class Http2FrameCodecTest {
assertNotNull(stream);
assertEquals(State.HALF_CLOSED_REMOTE, stream.state());
inboundHandler.writeOutbound(new DefaultHttp2ResetFrame(314 /* non-standard error */).setStreamId(stream.id()));
inboundHandler.writeOutbound(new DefaultHttp2ResetFrame(314 /* non-standard error */).streamId(stream.id()));
verify(frameWriter).writeRstStream(
eq(http2HandlerCtx), eq(3), eq(314L), anyChannelPromise());
assertEquals(State.CLOSED, stream.state());
@ -203,13 +198,13 @@ public class Http2FrameCodecTest {
assertNotNull(activeEvent);
assertEquals(stream.id(), activeEvent.streamId());
Http2HeadersFrame expectedHeaders = new DefaultHttp2HeadersFrame(request, false, 31).setStreamId(stream.id());
Http2HeadersFrame expectedHeaders = new DefaultHttp2HeadersFrame(request, false, 31).streamId(stream.id());
Http2HeadersFrame actualHeaders = inboundHandler.readInboundMessagesAndEvents();
assertEquals(expectedHeaders, actualHeaders);
frameListener.onRstStreamRead(http2HandlerCtx, 3, Http2Error.NO_ERROR.code());
Http2ResetFrame expectedRst = new DefaultHttp2ResetFrame(Http2Error.NO_ERROR).setStreamId(stream.id());
Http2ResetFrame expectedRst = new DefaultHttp2ResetFrame(Http2Error.NO_ERROR).streamId(stream.id());
Http2ResetFrame actualRst = inboundHandler.readInboundMessagesAndEvents();
assertEquals(expectedRst, actualRst);
@ -321,7 +316,7 @@ public class Http2FrameCodecTest {
}
@Test
public void outgoingStreamActiveShouldFireUserEvent() throws Exception {
public void outboundStreamShouldNotFireStreamActiveEvent() throws Exception {
Http2ConnectionEncoder encoder = framingCodec.connectionHandler().encoder();
encoder.writeHeaders(http2HandlerCtx, 2, request, 31, false, channel.newPromise());
@ -330,9 +325,6 @@ public class Http2FrameCodecTest {
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
Http2StreamActiveEvent streamActiveEvent = inboundHandler.readUserEvent();
assertEquals(stream.id(), streamActiveEvent.streamId());
assertNull(inboundHandler.readInbound());
assertNull(inboundHandler.readUserEvent());
}
@ -404,7 +396,7 @@ public class Http2FrameCodecTest {
frameListener.onDataRead(http2HandlerCtx, 3, releaseLater(data), 0, true);
int before = connection.local().flowController().unconsumedBytes(stream);
ChannelFuture f = channel.write(new DefaultHttp2WindowUpdateFrame(100).setStreamId(stream.id()));
ChannelFuture f = channel.write(new DefaultHttp2WindowUpdateFrame(100).streamId(stream.id()));
int after = connection.local().flowController().unconsumedBytes(stream);
assertEquals(100, before - after);
assertTrue(f.isSuccess());
@ -418,7 +410,7 @@ public class Http2FrameCodecTest {
assertNotNull(stream);
// Fails, cause trying to return too many bytes to the flow controller
ChannelFuture f = channel.write(new DefaultHttp2WindowUpdateFrame(100).setStreamId(stream.id()));
ChannelFuture f = channel.write(new DefaultHttp2WindowUpdateFrame(100).streamId(stream.id()));
assertTrue(f.isDone());
assertFalse(f.isSuccess());
assertThat(f.cause(), instanceOf(Http2Exception.class));
@ -436,103 +428,7 @@ public class Http2FrameCodecTest {
return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s);
}
static final class LastInboundHandler extends ChannelDuplexHandler {
private final Queue<Object> inboundMessages = new ArrayDeque<Object>();
private final Queue<Object> userEvents = new ArrayDeque<Object>();
private final Queue<Object> inboundMessagesAndUserEvents = new ArrayDeque<Object>();
private Throwable lastException;
private ChannelHandlerContext ctx;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
inboundMessages.add(msg);
inboundMessagesAndUserEvents.add(msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
userEvents.add(evt);
inboundMessagesAndUserEvents.add(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (lastException != null) {
cause.printStackTrace();
} else {
lastException = cause;
}
}
public void checkException() throws Exception {
if (lastException == null) {
return;
}
Throwable t = lastException;
lastException = null;
PlatformDependent.throwException(t);
}
@SuppressWarnings("unchecked")
public <T> T readInbound() {
T message = (T) inboundMessages.poll();
if (message == inboundMessagesAndUserEvents.peek()) {
inboundMessagesAndUserEvents.poll();
}
return message;
}
@SuppressWarnings("unchecked")
public <T> T readUserEvent() {
T message = (T) userEvents.poll();
if (message == inboundMessagesAndUserEvents.peek()) {
inboundMessagesAndUserEvents.poll();
}
return message;
}
/**
* Useful to test order of events and messages.
*/
@SuppressWarnings("unchecked")
public <T> T readInboundMessagesAndEvents() {
T message = (T) inboundMessagesAndUserEvents.poll();
if (message == inboundMessages.peek()) {
inboundMessages.poll();
} else if (message == userEvents.peek()) {
userEvents.poll();
}
return message;
}
public void writeOutbound(Object... msgs) throws Exception {
for (Object msg : msgs) {
ctx.write(msg);
}
ctx.flush();
EmbeddedChannel ch = (EmbeddedChannel) ctx.channel();
ch.runPendingTasks();
ch.checkException();
checkException();
}
public void finishAndReleaseAll() throws Exception {
checkException();
Object o;
while ((o = readInboundMessagesAndEvents()) != null) {
ReferenceCountUtil.release(o);
}
}
}
public static class VerifiableHttp2FrameWriter extends DefaultHttp2FrameWriter {
private static class VerifiableHttp2FrameWriter extends DefaultHttp2FrameWriter {
@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endStream, ChannelPromise promise) {

View File

@ -18,32 +18,38 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.AsciiString;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static io.netty.util.ReferenceCountUtil.release;
import static io.netty.util.ReferenceCountUtil.releaseLater;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link Http2MultiplexCodec}.
* Unit tests for {@link Http2MultiplexCodec} and {@link Http2StreamChannelBootstrap}.
*/
public class Http2MultiplexCodecTest {
@ -60,9 +66,10 @@ public class Http2MultiplexCodecTest {
@Before
public void setUp() {
childChannelInitializer = new TestChannelInitializer();
Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap().handler(childChannelInitializer);
parentChannel = new EmbeddedChannel();
parentChannel.connect(new InetSocketAddress(0));
parentChannel.pipeline().addLast(new Http2MultiplexCodec(true, null, childChannelInitializer));
parentChannel.pipeline().addLast(new Http2MultiplexCodec(true, bootstrap));
}
@After
@ -77,7 +84,6 @@ public class Http2MultiplexCodecTest {
// TODO(buchgr): Flush from child channel
// TODO(buchgr): ChildChannel.childReadComplete()
// TODO(buchgr): GOAWAY Logic
// TODO(buchgr): Reset frame on close
// TODO(buchgr): Test ChannelConfig.setMaxMessagesPerRead
@Test
@ -86,13 +92,13 @@ public class Http2MultiplexCodecTest {
childChannelInitializer.handler = inboundHandler;
Http2StreamActiveEvent streamActive = new Http2StreamActiveEvent(streamId);
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(request).setStreamId(streamId);
Http2DataFrame dataFrame1 = releaseLater(new DefaultHttp2DataFrame(bb("hello")).setStreamId(streamId));
Http2DataFrame dataFrame2 = releaseLater(new DefaultHttp2DataFrame(bb("world")).setStreamId(streamId));
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(request).streamId(streamId);
Http2DataFrame dataFrame1 = releaseLater(new DefaultHttp2DataFrame(bb("hello")).streamId(streamId));
Http2DataFrame dataFrame2 = releaseLater(new DefaultHttp2DataFrame(bb("world")).streamId(streamId));
assertFalse(inboundHandler.channelActive);
assertFalse(inboundHandler.isChannelActive());
parentChannel.pipeline().fireUserEventTriggered(streamActive);
assertTrue(inboundHandler.channelActive);
assertTrue(inboundHandler.isChannelActive());
// Make sure the stream active event is not delivered as a user event on the child channel.
assertNull(inboundHandler.readUserEvent());
parentChannel.pipeline().fireChannelRead(headersFrame);
@ -115,10 +121,10 @@ public class Http2MultiplexCodecTest {
verifyFramesMultiplexedToCorrectChannel(5, inboundHandler5, 1);
verifyFramesMultiplexedToCorrectChannel(11, inboundHandler11, 1);
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("hello"), false).setStreamId(5));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("foo"), true).setStreamId(3));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("world"), true).setStreamId(5));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("bar"), true).setStreamId(11));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("hello"), false).streamId(5));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("foo"), true).streamId(3));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("world"), true).streamId(5));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("bar"), true).streamId(11));
verifyFramesMultiplexedToCorrectChannel(5, inboundHandler5, 2);
verifyFramesMultiplexedToCorrectChannel(3, inboundHandler3, 1);
verifyFramesMultiplexedToCorrectChannel(11, inboundHandler11, 1);
@ -128,7 +134,7 @@ public class Http2MultiplexCodecTest {
public void inboundDataFrameShouldEmitWindowUpdateFrame() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
ByteBuf tenBytes = bb("0123456789");
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(tenBytes, true).setStreamId(streamId));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(tenBytes, true).streamId(streamId));
parentChannel.pipeline().flush();
Http2WindowUpdateFrame windowUpdate = parentChannel.readOutbound();
@ -150,14 +156,14 @@ public class Http2MultiplexCodecTest {
childChannel.config().setAutoRead(false);
parentChannel.pipeline().fireChannelRead(
new DefaultHttp2DataFrame(bb("hello world"), false).setStreamId(streamId));
new DefaultHttp2DataFrame(bb("hello world"), false).streamId(streamId));
parentChannel.pipeline().fireChannelReadComplete();
Http2DataFrame dataFrame0 = inboundHandler.readInbound();
assertNotNull(dataFrame0);
release(dataFrame0);
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("foo"), false).setStreamId(streamId));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("bar"), true).setStreamId(streamId));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("foo"), false).streamId(streamId));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("bar"), true).streamId(streamId));
parentChannel.pipeline().fireChannelReadComplete();
dataFrame0 = inboundHandler.readInbound();
@ -167,50 +173,181 @@ public class Http2MultiplexCodecTest {
verifyFramesMultiplexedToCorrectChannel(streamId, inboundHandler, 2);
}
/**
* A child channel for a HTTP/2 stream in IDLE state (that is no headers sent or received),
* should not emit a RST_STREAM frame on close, as this is a connection error of type protocol error.
*/
@Test
public void streamClosedShouldFireChannelInactive() {
public void idleOutboundStreamShouldNotWriteResetFrameOnClose() {
childChannelInitializer.handler = new LastInboundHandler();
Http2StreamChannelBootstrap b = new Http2StreamChannelBootstrap();
b.parentChannel(parentChannel).handler(childChannelInitializer);
Channel childChannel = b.connect().channel();
assertTrue(childChannel.isActive());
childChannel.close();
parentChannel.runPendingTasks();
assertFalse(childChannel.isOpen());
assertFalse(childChannel.isActive());
assertNull(parentChannel.readOutbound());
}
@Test
public void outboundStreamShouldWriteResetFrameOnClose_headersSent() {
childChannelInitializer.handler = new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
ctx.fireChannelActive();
}
};
Http2StreamChannelBootstrap b = new Http2StreamChannelBootstrap();
b.parentChannel(parentChannel).handler(childChannelInitializer);
Channel childChannel = b.connect().channel();
assertTrue(childChannel.isActive());
Http2HeadersFrame headersFrame = parentChannel.readOutbound();
assertNotNull(headersFrame);
assertFalse(Http2CodecUtil.isStreamIdValid(headersFrame.streamId()));
parentChannel.pipeline().fireUserEventTriggered(new Http2StreamActiveEvent(2, headersFrame));
childChannel.close();
parentChannel.runPendingTasks();
Http2ResetFrame reset = parentChannel.readOutbound();
assertEquals(2, reset.streamId());
assertEquals(Http2Error.CANCEL.code(), reset.errorCode());
}
@Test
public void inboundStreamClosedShouldFireChannelInactive() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
assertTrue(inboundHandler.channelActive);
assertTrue(inboundHandler.isChannelActive());
parentChannel.pipeline().fireUserEventTriggered(new Http2StreamClosedEvent(streamId));
parentChannel.runPendingTasks();
parentChannel.checkException();
parentChannel.flush();
assertFalse(inboundHandler.channelActive);
assertFalse(inboundHandler.isChannelActive());
// A RST_STREAM frame should NOT be emitted, as we received the close.
assertNull(parentChannel.readOutbound());
}
@Test(expected = StreamException.class)
public void streamExceptionTriggersChildChannelExceptionCaught() throws Exception {
public void streamExceptionClosesChildChannel() throws Exception {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
assertTrue(inboundHandler.isChannelActive());
StreamException e = new StreamException(streamId, Http2Error.PROTOCOL_ERROR, "baaam!");
parentChannel.pipeline().fireExceptionCaught(e);
parentChannel.runPendingTasks();
assertFalse(inboundHandler.isChannelActive());
inboundHandler.checkException();
}
@Test
public void creatingWritingReadingAndClosingOutboundStreamShouldWork() {
LastInboundHandler inboundHandler = new LastInboundHandler();
childChannelInitializer.handler = inboundHandler;
Http2StreamChannelBootstrap b = new Http2StreamChannelBootstrap();
b.parentChannel(parentChannel).handler(childChannelInitializer);
AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel) b.connect().channel();
assertThat(childChannel, Matchers.instanceOf(Http2MultiplexCodec.Http2StreamChannel.class));
assertTrue(childChannel.isActive());
assertTrue(inboundHandler.isChannelActive());
// Write to the child channel
Http2Headers headers = new DefaultHttp2Headers().scheme("https").method("GET").path("/foo.txt");
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers));
Http2HeadersFrame headersFrame = parentChannel.readOutbound();
assertNotNull(headersFrame);
assertSame(headers, headersFrame.headers());
assertFalse(Http2CodecUtil.isStreamIdValid(headersFrame.streamId()));
parentChannel.pipeline().fireUserEventTriggered(new Http2StreamActiveEvent(2, headersFrame));
// Read from the child channel
headers = new DefaultHttp2Headers().scheme("https").status("200");
parentChannel.pipeline().fireChannelRead(new DefaultHttp2HeadersFrame(headers).streamId(
childChannel.streamId()));
parentChannel.pipeline().fireChannelReadComplete();
headersFrame = inboundHandler.readInbound();
assertNotNull(headersFrame);
assertSame(headers, headersFrame.headers());
// Close the child channel.
childChannel.close();
parentChannel.runPendingTasks();
// An active outbound stream should emit a RST_STREAM frame.
Http2ResetFrame rstFrame = parentChannel.readOutbound();
assertNotNull(rstFrame);
assertEquals(childChannel.streamId(), rstFrame.streamId());
assertFalse(childChannel.isOpen());
assertFalse(childChannel.isActive());
assertFalse(inboundHandler.isChannelActive());
}
/**
* Test failing the promise of the first headers frame of an outbound stream. In practice this error case would most
* likely happen due to the max concurrent streams limit being hit or the channel running out of stream identifiers.
*/
@Test(expected = Http2NoMoreStreamIdsException.class)
public void failedOutboundStreamCreationThrowsAndClosesChannel() throws Exception {
parentChannel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
promise.tryFailure(new Http2NoMoreStreamIdsException());
}
});
LastInboundHandler inboundHandler = new LastInboundHandler();
childChannelInitializer.handler = inboundHandler;
Http2StreamChannelBootstrap b = new Http2StreamChannelBootstrap();
Channel childChannel = b.parentChannel(parentChannel).handler(childChannelInitializer).connect().channel();
assertTrue(childChannel.isActive());
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
parentChannel.flush();
assertFalse(childChannel.isActive());
assertFalse(childChannel.isOpen());
inboundHandler.checkException();
}
@Test
public void streamExceptionClosesChildChannel() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
public void settingChannelOptsAndAttrsOnBootstrap() {
AttributeKey<String> key = AttributeKey.newInstance("foo");
WriteBufferWaterMark mark = new WriteBufferWaterMark(1024, 4096);
Http2StreamChannelBootstrap b = new Http2StreamChannelBootstrap();
b.parentChannel(parentChannel).handler(childChannelInitializer)
.option(ChannelOption.AUTO_READ, false).option(ChannelOption.WRITE_BUFFER_WATER_MARK, mark)
.attr(key, "bar");
assertTrue(inboundHandler.channelActive);
StreamException e = new StreamException(streamId, Http2Error.PROTOCOL_ERROR, "baaam!");
parentChannel.pipeline().fireExceptionCaught(e);
Channel channel = b.connect().channel();
parentChannel.runPendingTasks();
parentChannel.checkException();
assertFalse(inboundHandler.channelActive);
assertFalse(channel.config().isAutoRead());
assertSame(mark, channel.config().getWriteBufferWaterMark());
assertEquals("bar", channel.attr(key).get());
}
private LastInboundHandler streamActiveAndWriteHeaders(int streamId) {
LastInboundHandler inboundHandler = new LastInboundHandler();
childChannelInitializer.handler = inboundHandler;
assertFalse(inboundHandler.channelActive);
assertFalse(inboundHandler.isChannelActive());
parentChannel.pipeline().fireUserEventTriggered(new Http2StreamActiveEvent(streamId));
assertTrue(inboundHandler.channelActive);
parentChannel.pipeline().fireChannelRead(new DefaultHttp2HeadersFrame(request).setStreamId(streamId));
assertTrue(inboundHandler.isChannelActive());
parentChannel.pipeline().fireChannelRead(new DefaultHttp2HeadersFrame(request).streamId(streamId));
parentChannel.pipeline().fireChannelReadComplete();
return inboundHandler;
@ -227,98 +364,7 @@ public class Http2MultiplexCodecTest {
assertNull(inboundHandler.readInbound());
}
@Sharable
static class TestChannelInitializer extends ChannelInitializer<Channel> {
ChannelHandler handler;
@Override
public void initChannel(Channel channel) {
if (handler != null) {
channel.pipeline().addLast(handler);
handler = null;
}
}
}
private static ByteBuf bb(String s) {
return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s);
}
static final class LastInboundHandler extends ChannelDuplexHandler {
private final Queue<Object> inboundMessages = new ArrayDeque<Object>();
private final Queue<Object> userEvents = new ArrayDeque<Object>();
private Throwable lastException;
private ChannelHandlerContext ctx;
private boolean channelActive;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channelActive = true;
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channelActive = false;
super.channelInactive(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
inboundMessages.add(msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
userEvents.add(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (lastException != null) {
cause.printStackTrace();
} else {
lastException = cause;
}
}
public void checkException() throws Exception {
if (lastException == null) {
return;
}
Throwable t = lastException;
lastException = null;
PlatformDependent.throwException(t);
}
@SuppressWarnings("unchecked")
public <T> T readInbound() {
return (T) inboundMessages.poll();
}
@SuppressWarnings("unchecked")
public <T> T readUserEvent() {
return (T) userEvents.poll();
}
public Channel channel() {
return ctx.channel();
}
public void finishAndReleaseAll() throws Exception {
checkException();
Object o;
while ((o = readInbound()) != null) {
release(o);
}
while ((o = readUserEvent()) != null) {
release(o);
}
}
}
}

View File

@ -0,0 +1,164 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.PlatformDependent;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.LockSupport;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* Channel handler that allows to easily access inbound messages.
*/
public class LastInboundHandler extends ChannelDuplexHandler {
private final Queue<Object> inboundMessages = new ArrayDeque<Object>();
private final Queue<Object> userEvents = new ArrayDeque<Object>();
private final Queue<Object> inboundMessagesAndUserEvents = new ArrayDeque<Object>();
private Throwable lastException;
private ChannelHandlerContext ctx;
private boolean channelActive;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (channelActive) {
throw new IllegalStateException("channelActive may only be fired once.");
}
channelActive = true;
super.channelActive(ctx);
}
public boolean isChannelActive() {
return channelActive;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (!channelActive) {
throw new IllegalStateException("channelInactive may only be fired once after channelActive.");
}
channelActive = false;
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
inboundMessages.add(msg);
inboundMessagesAndUserEvents.add(msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
userEvents.add(evt);
inboundMessagesAndUserEvents.add(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (lastException != null) {
cause.printStackTrace();
} else {
lastException = cause;
}
}
public void checkException() throws Exception {
if (lastException == null) {
return;
}
Throwable t = lastException;
lastException = null;
PlatformDependent.throwException(t);
}
@SuppressWarnings("unchecked")
public <T> T readInbound() {
T message = (T) inboundMessages.poll();
if (message == inboundMessagesAndUserEvents.peek()) {
inboundMessagesAndUserEvents.poll();
}
return message;
}
public <T> T blockingReadInbound() {
T msg;
while ((msg = readInbound()) == null) {
LockSupport.parkNanos(MILLISECONDS.toNanos(10));
}
return msg;
}
@SuppressWarnings("unchecked")
public <T> T readUserEvent() {
T message = (T) userEvents.poll();
if (message == inboundMessagesAndUserEvents.peek()) {
inboundMessagesAndUserEvents.poll();
}
return message;
}
/**
* Useful to test order of events and messages.
*/
@SuppressWarnings("unchecked")
public <T> T readInboundMessagesAndEvents() {
T message = (T) inboundMessagesAndUserEvents.poll();
if (message == inboundMessages.peek()) {
inboundMessages.poll();
} else if (message == userEvents.peek()) {
userEvents.poll();
}
return message;
}
public void writeOutbound(Object... msgs) throws Exception {
for (Object msg : msgs) {
ctx.write(msg);
}
ctx.flush();
EmbeddedChannel ch = (EmbeddedChannel) ctx.channel();
ch.runPendingTasks();
ch.checkException();
checkException();
}
public void finishAndReleaseAll() throws Exception {
checkException();
Object o;
while ((o = readInboundMessagesAndEvents()) != null) {
ReferenceCountUtil.release(o);
}
}
public Channel channel() {
return ctx.channel();
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelInitializer;
/**
* Channel initializer useful in tests.
*/
@Sharable
public class TestChannelInitializer extends ChannelInitializer<Channel> {
ChannelHandler handler;
@Override
public void initChannel(Channel channel) {
if (handler != null) {
channel.pipeline().addLast(handler);
handler = null;
}
}
}