HTTP/2 Child Channel and FrameCodec Feature Parity.

Motivation:

This PR (unfortunately) does 4 things:
1) Add outbound flow control to the Http2MultiplexCodec:
   The HTTP/2 child channel API should interact with HTTP/2 outbound/remote flow control. That is,
   if a H2 stream used up all its flow control window, the corresponding child channel should be
   marked unwritable and a writability-changed event should be fired. Similarly, a unwritable
   child channel should be marked writable and a writability-event should be fired, once a
   WINDOW_UPDATE frame has been received. The changes are (mostly) contained in ChannelOutboundBuffer,
   AbstractHttp2StreamChannel and Http2MultiplexCodec.

2) Introduce a Http2Stream2 object, that is used instead of stream identifiers on stream frames. A
   Http2Stream2 object allows an application to attach state to it, and so a application handler
   no longer needs to maintain stream state (i.e. in a map(id -> state)) himself.

3) Remove stream state events, which are no longer necessary due to the introduction of Http2Stream2.
   Also those stream state events have been found hard and complex to work with, when porting gRPC
   to the Http2FrameCodec.

4) Add support for HTTP/2 frames that have not yet been implemented, like PING and SETTINGS. Also add
   a Http2FrameCodecBuilder that exposes options from the Http2ConnectionHandler API that couldn't else
   be used with the frame codec, like buffering outbound streams, window update ratio, frame logger, etc.

Modifications:

1) A child channel's writability and a H2 stream's outbound flow control window interact, as described
   in the motivation. A channel handler is free to ignore the channel's writability, in which case the
   parent channel is reponsible for buffering writes until a WINDOW_UPDATE is received.

   The connection-level flow control window is ignored for now. That is, a child channel's writability
   is only affected by the stream-level flow control window. So a child channel could be marked writable,
   even though the connection-level flow control window is zero.

2) Modify Http2StreamFrame and the Http2FrameCodec to take a Http2Stream2 object intstead of a primitive
   integer. Introduce a special Http2ChannelDuplexHandler that has newStream() and forEachActiveStream()
   methods. It's recommended for a user to extend from this handler, to use those advanced features.

3) As explained in the documentation, a new inbound stream active can be detected by checking if the
   Http2Stream2.managedState() of a Http2HeadersFrame is null. An outbound stream active can be detected
   by adding a listener to the ChannelPromise of the write of the first Http2HeadersFrame. A stream
   closed event can be listened to by adding a listener to the Http2Stream2.closeFuture().

4) Add a simple Http2FrameCodecBuilder and implement the missing frame types.

Result:

1) The Http2MultiplexCodec supports outbound flow control.
2) The Http2FrameCodec API makes it easy for a user to manage custom stream specific state and to create
   new outbound streams.
3) The Http2FrameCodec API is much cleaner and easier to work with. Hacks like the ChannelCarryingHeadersFrame
   are no longer necessary.
4) The Http2FrameCodec now also supports PING and SETTINGS frames. The Http2FrameCodecBuilder allows the Http2FrameCodec
   to use some of the rich features of the Http2ConnectionHandler API.
This commit is contained in:
buchgr 2016-08-23 13:03:39 +02:00 committed by Norman Maurer
parent 8cb5d0fa8c
commit 5380c7c3e3
33 changed files with 2337 additions and 731 deletions

View File

@ -78,13 +78,12 @@ import static java.util.concurrent.TimeUnit.SECONDS;
public abstract class AbstractHttp2ConnectionHandlerBuilder<T extends Http2ConnectionHandler,
B extends AbstractHttp2ConnectionHandlerBuilder<T, B>> {
private static final long DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS = MILLISECONDS.convert(30, SECONDS);
private static final SensitivityDetector DEFAULT_HEADER_SENSITIVITY_DETECTOR = Http2HeadersEncoder.NEVER_SENSITIVE;
// The properties that can always be set.
private Http2Settings initialSettings = Http2Settings.defaultSettings();
private Http2FrameListener frameListener;
private long gracefulShutdownTimeoutMillis = DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS;
private long gracefulShutdownTimeoutMillis = Http2CodecUtil.DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS;
// The property that will prohibit connection() and codec() if set by server(),
// because this property is used only when this builder creates a Http2Connection.

View File

@ -15,37 +15,52 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.ThrowableUtil;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
/**
* Child {@link Channel} of another channel, for use for modeling streams as channels.
*/
abstract class AbstractHttp2StreamChannel extends AbstractChannel {
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> OUTBOUND_FLOW_CONTROL_WINDOW_UPDATER;
/**
* Used by subclasses to queue a close channel within the read queue. When read, it will close
* the channel (using Unsafe) instead of notifying handlers of the message with {@code
* channelRead()}. Additional inbound messages must not arrive after this one.
*/
protected static final Object CLOSE_MESSAGE = new Object();
/**
* Used to add a message to the {@link ChannelOutboundBuffer}, so as to have it re-evaluate its writability state.
*/
private static final Object REEVALUATE_WRITABILITY_MESSAGE = new Object();
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractHttp2StreamChannel.class, "doWrite(...)");
@ -55,7 +70,7 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
*/
private static final int ARBITRARY_MESSAGE_SIZE = 9;
private final ChannelConfig config = new DefaultChannelConfig(this);
private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
private final Queue<Object> inboundBuffer = new ArrayDeque<Object>(4);
private final Runnable fireChildReadCompleteTask = new Runnable() {
@Override
@ -68,13 +83,35 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
}
};
// Volatile, as parent and child channel may be on different eventloops.
private volatile int streamId = -1;
private final Http2Stream2 stream;
private boolean closed;
private boolean readInProgress;
protected AbstractHttp2StreamChannel(Channel parent) {
/**
* The flow control window of the remote side i.e. the number of bytes this channel is allowed to send to the remote
* peer. The window can become negative if a channel handler ignores the channel's writability. We are using a long
* so that we realistically don't have to worry about underflow.
*/
@SuppressWarnings("UnusedDeclaration")
private volatile long outboundFlowControlWindow;
static {
@SuppressWarnings("rawtypes")
AtomicLongFieldUpdater<AbstractHttp2StreamChannel> updater = AtomicLongFieldUpdater.newUpdater(
AbstractHttp2StreamChannel.class, "outboundFlowControlWindow");
if (updater == null) {
updater = AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "outboundFlowControlWindow");
}
OUTBOUND_FLOW_CONTROL_WINDOW_UPDATER = updater;
}
protected AbstractHttp2StreamChannel(Channel parent, Http2Stream2 stream) {
super(parent);
this.stream = stream;
}
protected Http2Stream2 stream() {
return stream;
}
@Override
@ -97,6 +134,16 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
return isOpen();
}
@Override
public boolean isWritable() {
return isStreamIdValid(stream.id())
// So that the channel doesn't become active before the initial flow control window has been set.
&& outboundFlowControlWindow > 0
// Could be null if channel closed.
&& unsafe().outboundBuffer() != null
&& unsafe().outboundBuffer().isWritable();
}
@Override
protected AbstractUnsafe newUnsafe() {
return new Unsafe();
@ -168,71 +215,53 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
if (closed) {
throw CLOSED_CHANNEL_EXCEPTION;
}
EventExecutor preferredExecutor = preferredEventExecutor();
// TODO: this is pretty broken; futures should only be completed after they are processed on
// the parent channel. However, it isn't currently possible due to ChannelOutboundBuffer's
// behavior which requires completing the current future before getting the next message. It
// should become easier once we have outbound flow control support.
// https://github.com/netty/netty/issues/4941
if (preferredExecutor.inEventLoop()) {
for (;;) {
Object msg = in.current();
if (msg == null) {
break;
}
try {
doWrite(ReferenceCountUtil.retain(msg));
} catch (Throwable t) {
// It would be nice to fail the future, but we can't do that if not on the event
// loop. So we instead opt for a solution that is consistent.
pipeline().fireExceptionCaught(t);
}
in.remove();
final MessageSizeEstimator.Handle sizeEstimator = config().getMessageSizeEstimator().newHandle();
for (;;) {
final Object msg = in.current();
if (msg == null) {
break;
}
doWriteComplete();
} else {
// Use a copy because the original msgs will be recycled by AbstractChannel.
final Object[] msgsCopy = new Object[in.size()];
for (int i = 0; i < msgsCopy.length; i ++) {
msgsCopy[i] = ReferenceCountUtil.retain(in.current());
// TODO(buchgr): Detecting cancellation relies on ChannelOutboundBuffer internals. NOT COOL!
if (msg == Unpooled.EMPTY_BUFFER /* The write was cancelled. */
|| msg == REEVALUATE_WRITABILITY_MESSAGE /* Write to trigger writability after window update. */) {
in.remove();
continue;
}
final int bytes = sizeEstimator.size(msg);
/**
* The flow control window needs to be decrement before stealing the message from the buffer (and thereby
* decrementing the number of pending bytes). Else, when calling steal() the number of pending bytes could
* be less than the writebuffer watermark (=flow control window) and thus trigger a writability change.
*
* This code must never trigger a writability change. Only reading window updates or channel writes may
* change the channel's writability.
*/
incrementOutboundFlowControlWindow(-bytes);
final ChannelPromise promise = in.steal();
if (bytes > 0) {
promise.addListener(new ReturnFlowControlWindowOnFailureListener(bytes));
}
// TODO(buchgr): Should we also the change the writability if END_STREAM is set?
try {
doWrite(msg, promise);
} catch (Throwable t) {
promise.tryFailure(t);
}
preferredExecutor.execute(new Runnable() {
@Override
public void run() {
for (Object msg : msgsCopy) {
try {
doWrite(msg);
} catch (Throwable t) {
pipeline().fireExceptionCaught(t);
}
}
doWriteComplete();
}
});
}
doWriteComplete();
}
/**
* Process a single write. Guaranteed to eventually be followed by a {@link #doWriteComplete()},
* which denotes the end of the batch of writes. May be called from any thread.
*/
protected abstract void doWrite(Object msg) throws Exception;
protected abstract void doWrite(Object msg, ChannelPromise promise) throws Exception;
/**
* Process end of batch of {@link #doWrite(ChannelOutboundBuffer)}s. May be called from any thread.
*/
protected abstract void doWriteComplete();
/**
* The ideal thread for events like {@link #doWrite(ChannelOutboundBuffer)} to be processed on. May be used for
* efficient batching, but not required.
*/
protected abstract EventExecutor preferredEventExecutor();
/**
* {@code bytes}-count of bytes provided to {@link #fireChildRead} have been read. May be called
* from any thread. Must not throw an exception.
@ -283,18 +312,16 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
}
}
/**
* This method must only be called within the parent channel's eventloop.
*/
protected void streamId(int streamId) {
if (this.streamId != -1) {
throw new IllegalStateException("Stream identifier may only be set once.");
protected void incrementOutboundFlowControlWindow(int bytes) {
if (bytes == 0) {
return;
}
this.streamId = ObjectUtil.checkPositiveOrZero(streamId, "streamId");
OUTBOUND_FLOW_CONTROL_WINDOW_UPDATER.addAndGet(this, bytes);
}
protected int streamId() {
return streamId;
// Visible for testing
long getOutboundFlowControlWindow() {
return outboundFlowControlWindow;
}
/**
@ -305,13 +332,18 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
if (msg == CLOSE_MESSAGE) {
allocHandle.readComplete();
pipeline().fireChannelReadComplete();
unsafe().close(voidPromise());
close();
return false;
}
if (msg instanceof Http2WindowUpdateFrame) {
Http2WindowUpdateFrame windowUpdate = (Http2WindowUpdateFrame) msg;
incrementOutboundFlowControlWindow(windowUpdate.windowSizeIncrement());
reevaluateWritability();
return true;
}
int numBytesToBeConsumed = 0;
if (msg instanceof Http2DataFrame) {
Http2DataFrame data = (Http2DataFrame) msg;
numBytesToBeConsumed = data.content().readableBytes() + data.padding();
numBytesToBeConsumed = dataFrameFlowControlBytes((Http2DataFrame) msg);
allocHandle.lastBytesRead(numBytesToBeConsumed);
} else {
allocHandle.lastBytesRead(ARBITRARY_MESSAGE_SIZE);
@ -324,6 +356,23 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
return true;
}
private void reevaluateWritability() {
ChannelOutboundBuffer buffer = unsafe().outboundBuffer();
// If the buffer is not writable but should be writable, then write and flush a dummy object
// to trigger a writability change.
if (!buffer.isWritable() && buffer.totalPendingWriteBytes() < config.getWriteBufferHighWaterMark()) {
unsafe().outboundBuffer().addMessage(REEVALUATE_WRITABILITY_MESSAGE, 1, voidPromise());
unsafe().flush();
}
}
private static int dataFrameFlowControlBytes(Http2DataFrame frame) {
return frame.content().readableBytes()
+ frame.padding()
// +1 to account for the pad length field. See http://httpwg.org/specs/rfc7540.html#DATA
+ (frame.padding() & 1);
}
private final class Unsafe extends AbstractUnsafe {
@Override
public void connect(final SocketAddress remoteAddress,
@ -331,4 +380,109 @@ abstract class AbstractHttp2StreamChannel extends AbstractChannel {
promise.setFailure(new UnsupportedOperationException());
}
}
/**
* Returns the flow-control size for DATA frames, and 0 for all other frames.
*/
private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
private static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
private static final class EstimatorHandle implements MessageSizeEstimator.Handle {
private static final EstimatorHandle INSTANCE = new EstimatorHandle();
@Override
public int size(Object msg) {
if (msg instanceof Http2DataFrame) {
return dataFrameFlowControlBytes((Http2DataFrame) msg);
}
return 0;
}
}
@Override
public Handle newHandle() {
return EstimatorHandle.INSTANCE;
}
}
/**
* {@link ChannelConfig} so that the high and low writebuffer watermarks can reflect the outbound flow control
* window, without having to create a new {@link WriteBufferWaterMark} object whenever the flow control window
* changes.
*/
private final class Http2StreamChannelConfig extends DefaultChannelConfig {
// TODO(buchgr): Overwrite the RecvByteBufAllocator. We only need it to implement max messages per read.
Http2StreamChannelConfig(Channel channel) {
super(channel);
}
@Override
@Deprecated
public int getWriteBufferHighWaterMark() {
int window = (int) min(Integer.MAX_VALUE, outboundFlowControlWindow);
return max(0, window);
}
@Override
@Deprecated
public int getWriteBufferLowWaterMark() {
return getWriteBufferHighWaterMark();
}
@Override
public MessageSizeEstimator getMessageSizeEstimator() {
return FlowControlledFrameSizeEstimator.INSTANCE;
}
// TODO(buchgr): Throwing exceptions is not ideal. Maybe NO-OP and log a warning?
@Override
public WriteBufferWaterMark getWriteBufferWaterMark() {
throw new UnsupportedOperationException();
}
@Override
public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
throw new UnsupportedOperationException();
}
@Override
@Deprecated
public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
throw new UnsupportedOperationException();
}
@Override
@Deprecated
public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
throw new UnsupportedOperationException();
}
@Override
@Deprecated
public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
throw new UnsupportedOperationException();
}
}
private class ReturnFlowControlWindowOnFailureListener implements ChannelFutureListener {
private final int bytes;
ReturnFlowControlWindowOnFailureListener(int bytes) {
this.bytes = bytes;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
/**
* Return the flow control window of the failed data frame. We expect this code to be rarely executed
* and by implementing it as a window update, we don't have to worry about thread-safety.
*/
fireChildRead(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
}
}
}
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.UnstableApi;
/**
@ -24,21 +23,17 @@ import io.netty.util.internal.UnstableApi;
@UnstableApi
public abstract class AbstractHttp2StreamFrame implements Http2StreamFrame {
// Volatile as parent and child channel may be on different eventloops.
private volatile int streamId = -1;
private volatile Http2Stream2 stream;
@Override
public AbstractHttp2StreamFrame streamId(int streamId) {
if (this.streamId != -1) {
throw new IllegalStateException("Stream identifier may only be set once.");
}
this.streamId = ObjectUtil.checkPositiveOrZero(streamId, "streamId");
public AbstractHttp2StreamFrame stream(Http2Stream2 stream) {
this.stream = stream;
return this;
}
@Override
public int streamId() {
return streamId;
public Http2Stream2 stream() {
return stream;
}
/**
@ -50,11 +45,11 @@ public abstract class AbstractHttp2StreamFrame implements Http2StreamFrame {
return false;
}
Http2StreamFrame other = (Http2StreamFrame) o;
return streamId == other.streamId();
return stream == other.stream() || (stream != null && stream.equals(other.stream()));
}
@Override
public int hashCode() {
return streamId;
return stream.hashCode();
}
}

View File

@ -76,8 +76,8 @@ public final class DefaultHttp2DataFrame extends AbstractHttp2StreamFrame implem
}
@Override
public DefaultHttp2DataFrame streamId(int streamId) {
super.streamId(streamId);
public DefaultHttp2DataFrame stream(Http2Stream2 stream) {
super.stream(stream);
return this;
}
@ -87,7 +87,7 @@ public final class DefaultHttp2DataFrame extends AbstractHttp2StreamFrame implem
}
@Override
public boolean isEndStream() {
public boolean endStream() {
return endStream;
}
@ -153,7 +153,7 @@ public final class DefaultHttp2DataFrame extends AbstractHttp2StreamFrame implem
@Override
public String toString() {
return "DefaultHttp2DataFrame(streamId=" + streamId() + ", content=" + content
return "DefaultHttp2DataFrame(stream=" + stream() + ", content=" + content
+ ", endStream=" + endStream + ", padding=" + padding + ")";
}

View File

@ -19,6 +19,7 @@ import io.netty.util.internal.UnstableApi;
import static io.netty.handler.codec.http2.Http2CodecUtil.verifyPadding;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static io.netty.util.internal.ObjectUtil.checkPositive;
/**
* The default {@link Http2HeadersFrame} implementation.
@ -63,8 +64,8 @@ public final class DefaultHttp2HeadersFrame extends AbstractHttp2StreamFrame imp
}
@Override
public DefaultHttp2HeadersFrame streamId(int streamId) {
super.streamId(streamId);
public DefaultHttp2HeadersFrame stream(Http2Stream2 stream) {
super.stream(stream);
return this;
}
@ -78,8 +79,7 @@ public final class DefaultHttp2HeadersFrame extends AbstractHttp2StreamFrame imp
return headers;
}
@Override
public boolean isEndStream() {
public boolean endStream() {
return endStream;
}
@ -90,7 +90,7 @@ public final class DefaultHttp2HeadersFrame extends AbstractHttp2StreamFrame imp
@Override
public String toString() {
return "DefaultHttp2HeadersFrame(streamId=" + streamId() + ", headers=" + headers
return "DefaultHttp2HeadersFrame(stream=" + stream() + ", headers=" + headers
+ ", endStream=" + endStream + ", padding=" + padding + ")";
}

View File

@ -0,0 +1,127 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultByteBufHolder;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.UnstableApi;
/**
* The default {@link Http2PingFrame} implementation.
*/
@UnstableApi
public class DefaultHttp2PingFrame extends DefaultByteBufHolder implements Http2PingFrame {
private final boolean ack;
public DefaultHttp2PingFrame(ByteBuf content) {
this(content, false);
}
/**
* A user cannot send a ping ack, as this is done automatically when a ping is received.
*/
DefaultHttp2PingFrame(ByteBuf content, boolean ack) {
super(mustBeEightBytes(content));
this.ack = ack;
}
@Override
public boolean ack() {
return ack;
}
@Override
public String name() {
return "PING";
}
@Override
public DefaultHttp2PingFrame copy() {
return new DefaultHttp2PingFrame(content().copy(), ack);
}
@Override
public DefaultHttp2PingFrame duplicate() {
return (DefaultHttp2PingFrame) super.duplicate();
}
@Override
public DefaultHttp2PingFrame retainedDuplicate() {
return (DefaultHttp2PingFrame) super.retainedDuplicate();
}
@Override
public DefaultHttp2PingFrame replace(ByteBuf content) {
return new DefaultHttp2PingFrame(content, ack);
}
@Override
public DefaultHttp2PingFrame retain() {
super.retain();
return this;
}
@Override
public DefaultHttp2PingFrame retain(int increment) {
super.retain(increment);
return this;
}
@Override
public DefaultHttp2PingFrame touch() {
super.touch();
return this;
}
@Override
public DefaultHttp2PingFrame touch(Object hint) {
super.touch(hint);
return this;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof Http2PingFrame)) {
return false;
}
Http2PingFrame other = (Http2PingFrame) o;
return super.equals(o) && ack == other.ack();
}
@Override
public int hashCode() {
int hash = super.hashCode();
hash = hash * 31 + (ack ? 1 : 0);
return hash;
}
private static ByteBuf mustBeEightBytes(ByteBuf content) {
ObjectUtil.checkNotNull(content, "content must not be null.");
if (content.readableBytes() != 8) {
throw new IllegalArgumentException("PING frames require 8 bytes of content. Was " +
content.readableBytes() + " bytes.");
}
return content;
}
@Override
public String toString() {
return "DefaultHttp2PingFrame(content=" + contentToString() + ", ack=" + ack + ')';
}
}

View File

@ -18,12 +18,14 @@ package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static io.netty.util.internal.ObjectUtil.checkPositive;
/**
* The default {@link Http2ResetFrame} implementation.
*/
@UnstableApi
public final class DefaultHttp2ResetFrame extends AbstractHttp2StreamFrame implements Http2ResetFrame {
private final long errorCode;
/**
@ -45,8 +47,8 @@ public final class DefaultHttp2ResetFrame extends AbstractHttp2StreamFrame imple
}
@Override
public DefaultHttp2ResetFrame streamId(int streamId) {
super.streamId(streamId);
public DefaultHttp2ResetFrame stream(Http2Stream2 stream) {
super.stream(stream);
return this;
}
@ -62,7 +64,7 @@ public final class DefaultHttp2ResetFrame extends AbstractHttp2StreamFrame imple
@Override
public String toString() {
return "DefaultHttp2ResetFrame(stream=" + streamId() + "errorCode=" + errorCode + ")";
return "DefaultHttp2ResetFrame(stream=" + stream() + ", errorCode=" + errorCode + ')';
}
@Override

View File

@ -13,22 +13,32 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.UnstableApi;
/**
* The default {@link Http2SettingsFrame} implementation.
*/
public class DefaultHttp2SettingsFrame implements Http2SettingsFrame {
@UnstableApi
public abstract class AbstractHttp2StreamStateEvent implements Http2StreamStateEvent {
private final Http2Settings settings;
private final int streamId;
protected AbstractHttp2StreamStateEvent(int streamId) {
this.streamId = ObjectUtil.checkPositiveOrZero(streamId, "streamId");
public DefaultHttp2SettingsFrame(Http2Settings settings) {
this.settings = settings;
}
@Override
public int streamId() {
return streamId;
public Http2Settings settings() {
return settings;
}
@Override
public String name() {
return "SETTINGS";
}
@Override
public String toString() {
return "DefaultHttp2SettingsFrame(settings=" + settings + ')';
}
}

View File

@ -32,8 +32,8 @@ public class DefaultHttp2WindowUpdateFrame extends AbstractHttp2StreamFrame impl
}
@Override
public DefaultHttp2WindowUpdateFrame streamId(int streamId) {
super.streamId(streamId);
public DefaultHttp2WindowUpdateFrame stream(Http2Stream2 stream) {
super.stream(stream);
return this;
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
/**
* A {@link ChannelDuplexHandler} providing additional functionality for HTTP/2. Specifically it allows to:
* <ul>
* <li>Create new outbound streams using {@link #newStream()}.</li>
* <li>Iterate over all active streams using {@link #forEachActiveStream(Http2Stream2Visitor)}.</li>
* </ul>
*
* <p>The {@link Http2FrameCodec} is required to be part of the {@link ChannelPipeline} before this handler is added,
* or else an {@link IllegalStateException} will be thrown.
*/
public class Http2ChannelDuplexHandler extends ChannelDuplexHandler {
private Http2FrameCodec frameCodec;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
frameCodec = requireHttp2FrameCodec(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
frameCodec = null;
}
/**
* Creates a new {@link Http2Stream2} object.
*
* <p>This method is <em>thread-safe</em>.
*/
public final Http2Stream2 newStream() {
return newStream0();
}
/**
* Allows to iterate over all currently active streams.
*
* <p>This method may only be called from the eventloop thread.
*/
protected final void forEachActiveStream(Http2Stream2Visitor streamVisitor) throws Http2Exception {
forEachActiveStream0(streamVisitor);
}
// So that it can be overwritten by tests, without being visible to the public.
void forEachActiveStream0(Http2Stream2Visitor streamVisitor) throws Http2Exception {
frameCodec.forEachActiveStream(streamVisitor);
}
// So that it can be overwritten by tests, without being visible to the public.
Http2Stream2 newStream0() {
if (frameCodec == null) {
throw new IllegalStateException("Frame codec not found. Has the handler been added to a pipeline?");
}
return frameCodec.newStream();
}
private static Http2FrameCodec requireHttp2FrameCodec(ChannelHandlerContext ctx) {
ChannelHandlerContext frameCodecCtx = ctx.pipeline().context(Http2FrameCodec.class);
if (frameCodecCtx == null) {
throw new IllegalArgumentException(Http2FrameCodec.class.getSimpleName()
+ " was not found in the channel pipeline.");
}
return (Http2FrameCodec) frameCodecCtx.handler();
}
}

View File

@ -15,6 +15,7 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.FullHttpResponse;
@ -45,6 +46,15 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade
private final String handlerName;
private final Http2ConnectionHandler connectionHandler;
private final ChannelHandler upgradeToHandler;
public Http2ClientUpgradeCodec(Http2FrameCodec frameCodec, ChannelHandler upgradeToHandler) {
this(null, frameCodec, upgradeToHandler);
}
public Http2ClientUpgradeCodec(String handlerName, Http2FrameCodec frameCodec, ChannelHandler upgradeToHandler) {
this(handlerName, frameCodec.connectionHandler(), upgradeToHandler);
}
/**
* Creates the codec using a default name for the connection handler when adding to the
@ -53,7 +63,7 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade
* @param connectionHandler the HTTP/2 connection handler
*/
public Http2ClientUpgradeCodec(Http2ConnectionHandler connectionHandler) {
this(null, connectionHandler);
this((String) null, connectionHandler);
}
/**
@ -64,8 +74,14 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade
* @param connectionHandler the HTTP/2 connection handler
*/
public Http2ClientUpgradeCodec(String handlerName, Http2ConnectionHandler connectionHandler) {
this(handlerName, connectionHandler, connectionHandler);
}
private Http2ClientUpgradeCodec(String handlerName, Http2ConnectionHandler connectionHandler, ChannelHandler
upgradeToHandler) {
this.handlerName = handlerName;
this.connectionHandler = checkNotNull(connectionHandler, "connectionHandler");
this.upgradeToHandler = checkNotNull(upgradeToHandler, "upgradeToHandler");
}
@Override
@ -88,7 +104,7 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade
connectionHandler.onHttpClientUpgrade();
// Add the handler to the pipeline.
ctx.pipeline().addAfter(ctx.name(), handlerName, connectionHandler);
ctx.pipeline().addAfter(ctx.name(), handlerName, upgradeToHandler);
}
/**

View File

@ -30,7 +30,11 @@ public final class Http2Codec extends ChannelDuplexHandler {
Http2Codec(boolean server, Http2StreamChannelBootstrap bootstrap, Http2FrameWriter frameWriter,
Http2FrameLogger frameLogger, Http2Settings initialSettings) {
frameCodec = new Http2FrameCodec(server, frameWriter, frameLogger, initialSettings);
Http2FrameCodecBuilder frameBuilder = server
? Http2FrameCodecBuilder.forServer()
: Http2FrameCodecBuilder.forClient();
frameBuilder.frameWriter(frameWriter).frameLogger(frameLogger).initialSettings(initialSettings);
frameCodec = frameBuilder.build();
multiplexCodec = new Http2MultiplexCodec(server, bootstrap);
}

View File

@ -35,6 +35,8 @@ import static io.netty.handler.codec.http2.Http2Exception.headerListSizeError;
import static io.netty.util.CharsetUtil.UTF_8;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Constants and utility method used for encoding/decoding HTTP2 frames.
@ -132,6 +134,8 @@ public final class Http2CodecUtil {
return maxHeaderListSize + (maxHeaderListSize >>> 2);
}
public static final long DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS = MILLISECONDS.convert(30, SECONDS);
/**
* Returns {@code true} if the stream is an outbound stream.
*

View File

@ -25,11 +25,6 @@ import io.netty.util.internal.UnstableApi;
@UnstableApi
public interface Http2DataFrame extends Http2StreamFrame, ByteBufHolder {
/**
* {@code true} if this frame is the last one in this direction of the stream.
*/
boolean isEndStream();
/**
* Frame padding to use. Will be non-negative and less than 256.
*/
@ -41,6 +36,11 @@ public interface Http2DataFrame extends Http2StreamFrame, ByteBufHolder {
@Override
ByteBuf content();
/**
* Returns {@code true} if the END_STREAM flag ist set.
*/
boolean endStream();
@Override
Http2DataFrame copy();

View File

@ -16,115 +16,202 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
import io.netty.handler.codec.http2.Http2Connection.Endpoint;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import static io.netty.handler.codec.http2.Http2CodecUtil.isOutboundStream;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.handler.logging.LogLevel.INFO;
/**
* An HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
* frame a {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
* objects received via {@link #write} are converted to the HTTP/2 wire format.
* <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over this API.
* This API is targeted to eventually replace or reduce the need for the {@link Http2ConnectionHandler} API.
*
* <p>A change in stream state is propagated through the channel pipeline as a user event via
* {@link Http2StreamStateEvent} objects. When a HTTP/2 stream first becomes active a {@link Http2StreamActiveEvent}
* and when it gets closed a {@link Http2StreamClosedEvent} is emitted.
* <p>A HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
* frame, a {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
* objects received via {@link #write} are converted to the HTTP/2 wire format. HTTP/2 frames specific to a stream
* implement the {@link Http2StreamFrame} interface. The {@link Http2FrameCodec} is instantiated using the
* {@link Http2FrameCodecBuilder}. It's recommended for channel handlers to inherit from the
* {@link Http2ChannelDuplexHandler}, as it provides additional functionality like iterating over all active streams or
* creating outbound streams.
*
* <p>Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
* <h3>Stream Lifecycle</h3>
*
* The frame codec delivers and writes frames for active streams. An active stream is closed when either side sends a
* {@code RST_STREAM} frame or both sides send a frame with the {@code END_STREAM} flag set. Each
* {@link Http2StreamFrame} has a {@link Http2Stream2} object attached that uniquely identifies a particular stream.
*
* <p>Application specific state can be maintained by attaching a custom object to a stream via
* {@link Http2Stream2#managedState(Object)}. As the name suggests, the state object is cleaned up automatically when a
* stream or the channel is closed.
*
* <p>{@link Http2StreamFrame}s read from the channel always a {@link Http2Stream2} object set, while when writing a
* {@link Http2StreamFrame} the application code needs to set a {@link Http2Stream2} object using
* {@link Http2StreamFrame#stream(Http2Stream2)}.
*
* <h3>Flow control</h3>
*
* The frame codec automatically increments stream and connection flow control windows. It's possible to customize
* when flow control windows are updated via {@link Http2FrameCodecBuilder#windowUpdateRatio(float)}.
*
* <p>Incoming flow controlled frames need to be consumed by writing a {@link Http2WindowUpdateFrame} with the consumed
* number of bytes and the corresponding stream identifier set to the frame codec.
*
* <p>The local stream-level flow control window can be changed by writing a {@link Http2SettingsFrame} with the
* {@link Http2Settings#initialWindowSize()} set to the targeted value.
*
* <p>The connection-level flow control window can be changed by writing a {@link Http2WindowUpdateFrame} with the
* desired window size <em>increment</em> in bytes and the stream identifier set to {@code 0}. By default the initial
* connection-level flow control window is the same as initial stream-level flow control window.
*
* <h3>New inbound Streams</h3>
*
* The first frame of a HTTP/2 stream must be a {@link Http2HeadersFrame}, which will have a {@link Http2Stream2} object
* attached. An application can detect if it's a new stream by inspecting the {@link Http2Stream2#managedState()} for
* {@code null}, and if so attach application specific state via {@link Http2Stream2#managedState(Object)}.
*
* <pre>
* public class MyChannelHandler extends Http2ChannelDuplexHandler {
*
* @Override
* public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
* if (msg instanceof Http2HeadersFrame) {
* Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg;
* if (msg.stream().managedState() == null) {
* // A new inbound stream.
* msg.stream().managedState(new ApplicationState());
* }
* }
* }
* }
* </pre>
*
* <h3>New outbound Streams</h3>
*
* A outbound HTTP/2 stream can be created by first instantiating a new {@link Http2Stream2} object via
* {@link Http2ChannelDuplexHandler#newStream()}, and then writing a {@link Http2HeadersFrame} object with the stream
* attached.
*
* <pre>
* final Http2Stream2 stream = handler.newStream();
* ctx.write(headersFrame.stream(stream)).addListener(new ChannelFutureListener() {
*
* @Override
* public void operationComplete(ChannelFuture f) {
* if (f.isSuccess()) {
* // Stream is active and stream.id() returns a valid stream identifier.
* System.out.println("New stream with id " + stream.id() + " created.");
* } else {
* // Stream failed to become active. Handle error.
* if (f.cause() instanceof Http2NoMoreStreamIdsException) {
*
* } else if (f.cause() instanceof Http2GoAwayException) {
*
* } else {
*
* }
* }
* }
* }
* </pre>
*
* <p>If a new stream cannot be created due to stream id exhaustion of the endpoint, the {@link ChannelPromise} of the
* HEADERS frame will fail with a {@link Http2NoMoreStreamIdsException}.
*
* <p>The HTTP/2 standard allows for an endpoint to limit the maximum number of concurrently active streams via the
* {@code SETTINGS_MAX_CONCURRENT_STREAMS} setting. When this limit is reached, no new streams can be created. However,
* the {@link Http2FrameCodec} can be build with {@link Http2FrameCodecBuilder#bufferOutgoingStreams} enabled, in which
* case a new stream and its associated frames will be buffered until either the limit is increased or an active
* stream is closed. It's, however, possible that a buffered stream will never become active. That is, the channel might
* get closed or a GO_AWAY frame might be received. In the first case, all writes of buffered streams will fail with a
* {@link Http2ChannelClosedException}. In the second case, all writes of buffered streams with an identifier less than
* the last stream identifier of the GO_AWAY frame will fail with a {@link Http2GoAwayException}.
*
* <h3>Error Handling</h3>
*
* Exceptions and errors are propagated via {@link ChannelInboundHandler#exceptionCaught}. Exceptions that apply to
* a specific HTTP/2 stream are wrapped in a {@link Http2Stream2Exception} and have the corresponding
* {@link Http2Stream2} object attached.
*
* <h3>Reference Counting</h3>
*
* Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
* reference counted objects (e.g. {@link ByteBuf}s). The frame codec will call {@link ReferenceCounted#retain()} before
* propagating a reference counted object through the pipeline, and thus an application handler needs to release such
* an object after having consumed it. For more information on reference counting take a look at
* http://netty.io/wiki/reference-counted-objects.html
*
* <h3>HTTP Upgrade</h3>
*
* Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
* HTTP-to-HTTP/2 conversion is performed automatically.
*
* <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over
* this API. This API is targeted to eventually replace or reduce the need for the Http2Connection-based API.
*
* <h3>Opening and Closing Streams</h3>
*
* <p>When the remote side opens a new stream, the frame codec first emits a {@link Http2StreamActiveEvent} with the
* stream identifier set.
* <pre>
* {@link Http2FrameCodec} {@link Http2MultiplexCodec}
* + +
* | Http2StreamActiveEvent(streamId=3, headers=null) |
* +------------------------------------------------------------->
* | |
* | Http2HeadersFrame(streamId=3) |
* +------------------------------------------------------------->
* | |
* + +
* </pre>
*
* <p>When a stream is closed either due to a reset frame by the remote side, or due to both sides having sent frames
* with the END_STREAM flag, then the frame codec emits a {@link Http2StreamClosedEvent}.
* <pre>
* {@link Http2FrameCodec} {@link Http2MultiplexCodec}
* + +
* | Http2StreamClosedEvent(streamId=3) |
* +--------------------------------------------------------->
* | |
* + +
* </pre>
*
* <p>When the local side wants to close a stream, it has to write a {@link Http2ResetFrame} to which the frame codec
* will respond to with a {@link Http2StreamClosedEvent}.
* <pre>
* {@link Http2FrameCodec} {@link Http2MultiplexCodec}
* + +
* | Http2ResetFrame(streamId=3) |
* <---------------------------------------------------------+
* | |
* | Http2StreamClosedEvent(streamId=3) |
* +--------------------------------------------------------->
* | |
* + +
* </pre>
*
* <p>Opening an outbound/local stream works by first sending the frame codec a {@link Http2HeadersFrame} with no
* stream identifier set (such that {@link Http2CodecUtil#isStreamIdValid} returns {@code false}). If opening the stream
* was successful, the frame codec responds with a {@link Http2StreamActiveEvent} that contains the stream's new
* identifier as well as the <em>same</em> {@link Http2HeadersFrame} object that opened the stream.
* <pre>
* {@link Http2FrameCodec} {@link Http2MultiplexCodec}
* + +
* | Http2HeadersFrame(streamId=-1) |
* <-----------------------------------------------------------------------------------------------+
* | |
* | Http2StreamActiveEvent(streamId=2, headers=Http2HeadersFrame(streamId=-1)) |
* +----------------------------------------------------------------------------------------------->
* | |
* + +
* </pre>
*/
@UnstableApi
public class Http2FrameCodec extends ChannelDuplexHandler {
private static final Http2FrameLogger HTTP2_FRAME_LOGGER = new Http2FrameLogger(INFO, Http2FrameCodec.class);
private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2FrameCodec.class);
private final Http2ConnectionHandler http2Handler;
private final boolean server;
private final PropertyKey streamKey;
// Used to adjust flow control window on channel active. Set to null afterwards.
private Integer initialLocalConnectionWindow;
private ChannelHandlerContext ctx;
private ChannelHandlerContext http2HandlerCtx;
private Http2Stream2Impl pendingOutboundStreamsTail;
/** Lock protecting modifications to idle outbound streams. **/
private final Object lock = new Object();
/** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/
private int numBufferedStreams;
/**
* Construct a new handler.
*
* @param server {@code true} this is a server
* Create a new handler. Use {@link Http2FrameCodecBuilder}.
*/
public Http2FrameCodec(boolean server) {
this(server, HTTP2_FRAME_LOGGER);
Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings,
long gracefulShutdownTimeoutMillis) {
decoder.frameListener(new FrameListener());
http2Handler = new InternalHttp2ConnectionHandler(decoder, encoder, initialSettings);
http2Handler.connection().addListener(new ConnectionListener());
http2Handler.gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis);
server = http2Handler.connection().isServer();
streamKey = connection().newKey();
initialLocalConnectionWindow = initialSettings.initialWindowSize();
}
Http2ConnectionHandler connectionHandler() {
return http2Handler;
}
/**
* Construct a new handler.
* Creates a new outbound/local stream.
*
* @param server {@code true} this is a server
* <p>The object is added to a list of idle streams, so that in case the stream object is never made active, the
* {@link Http2Stream2#closeFuture()} still completes.
*
* <p>This method may only be called after the handler has been added to a {@link io.netty.channel.ChannelPipeline}.
*
* <p>This method is thread-safe.
*/
public Http2FrameCodec(boolean server, Http2FrameLogger frameLogger) {
this(server, new DefaultHttp2FrameWriter(), frameLogger, Http2Settings.defaultSettings());
@ -146,11 +233,55 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
decoder.frameListener(new FrameListener());
http2Handler = new InternalHttp2ConnectionHandler(decoder, encoder, initialSettings);
http2Handler.connection().addListener(new ConnectionListener());
streamKey = connection().newKey();
this.server = server;
}
Http2ConnectionHandler connectionHandler() {
return http2Handler;
// TODO(buchgr): Discuss: Should this method be thread safe?
Http2Stream2 newStream() {
ChannelHandlerContext ctx0 = ctx;
if (ctx0 == null) {
throw new IllegalStateException("Channel handler not added to a channel pipeline.");
}
Http2Stream2Impl stream = new Http2Stream2Impl(ctx0.channel());
addPendingStream(stream);
return stream;
}
/**
* Iterates over all active HTTP/2 streams.
*
* <p>This method must not be called outside of the event loop.
*/
void forEachActiveStream(final Http2Stream2Visitor streamVisitor) throws Http2Exception {
assert ctx.channel().eventLoop().inEventLoop();
connection().forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) {
Http2Stream2 stream2 = stream.getProperty(streamKey);
if (stream2 == null) {
/**
* This code is expected to almost never execute. However, in rare cases it's possible that a
* stream is active without a {@link Http2Stream2} object attached, as it's set in a listener of
* the HEADERS frame write.
*/
stream2 = findPendingStream(stream.id());
if (stream2 == null) {
throw new AssertionError("All active streams must have a stream object attached.");
}
}
try {
return streamVisitor.visit(stream2);
} catch (Throwable cause) {
connectionHandler().onError(http2HandlerCtx, cause);
return false;
}
}
});
}
/**
@ -161,6 +292,13 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
this.ctx = ctx;
ctx.pipeline().addBefore(ctx.executor(), ctx.name(), null, http2Handler);
http2HandlerCtx = ctx.pipeline().context(http2Handler);
sendInitialConnectionWindow();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
sendInitialConnectionWindow();
super.channelActive(ctx);
}
/**
@ -168,9 +306,31 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
cleanupPendingStreams();
ctx.pipeline().remove(http2Handler);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
cleanupPendingStreams();
super.channelInactive(ctx);
}
private void sendInitialConnectionWindow() throws Http2Exception {
if (ctx.channel().isActive() && initialLocalConnectionWindow != null) {
Http2Stream connectionStream = http2Handler.connection().connectionStream();
int currentSize = connection().local().flowController().windowSize(connectionStream);
int delta = initialLocalConnectionWindow - currentSize;
http2Handler.decoder().flowController().incrementWindowSize(connectionStream, delta);
initialLocalConnectionWindow = null;
ctx.flush();
}
}
private Http2Connection connection() {
return http2Handler.connection();
}
/**
* Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
* HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
@ -199,24 +359,26 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
}
}
// Override this to signal it will never throw an exception.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}
/**
* Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
* streams.
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (!(msg instanceof Http2Frame)) {
ctx.write(msg, promise);
return;
}
try {
if (msg instanceof Http2WindowUpdateFrame) {
Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
consumeBytes(frame.streamId(), frame.windowSizeIncrement(), promise);
writeWindowUpdate(frame.stream().id(), frame.windowSizeIncrement(), promise);
} else if (msg instanceof Http2StreamFrame) {
writeStreamFrame((Http2StreamFrame) msg, promise);
} else if (msg instanceof Http2PingFrame) {
writePingFrame((Http2PingFrame) msg, promise);
} else if (msg instanceof Http2SettingsFrame) {
writeSettingsFrame((Http2SettingsFrame) msg, promise);
} else if (msg instanceof Http2GoAwayFrame) {
writeGoAwayFrame((Http2GoAwayFrame) msg, promise);
} else {
@ -227,17 +389,40 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
}
}
private void consumeBytes(int streamId, int bytes, ChannelPromise promise) {
private void writePingFrame(Http2PingFrame pingFrame, ChannelPromise promise) {
http2Handler.encoder().writePing(http2HandlerCtx, pingFrame.ack(), pingFrame.content().retain(), promise);
}
private void writeWindowUpdate(int streamId, int bytes, ChannelPromise promise) {
try {
Http2Stream stream = http2Handler.connection().stream(streamId);
http2Handler.connection().local().flowController()
.consumeBytes(stream, bytes);
if (streamId == 0) {
increaseInitialConnectionWindow(bytes);
} else {
consumeBytes(streamId, bytes);
}
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
}
private void increaseInitialConnectionWindow(int deltaBytes) throws Http2Exception {
Http2LocalFlowController localFlow = connection().local().flowController();
int targetConnectionWindow = localFlow.initialWindowSize() + deltaBytes;
localFlow.incrementWindowSize(connection().connectionStream(), deltaBytes);
localFlow.initialWindowSize(targetConnectionWindow);
}
private void consumeBytes(int streamId, int bytes) throws Http2Exception {
Http2Stream stream = http2Handler.connection().stream(streamId);
http2Handler.connection().local().flowController()
.consumeBytes(stream, bytes);
}
private void writeSettingsFrame(Http2SettingsFrame frame, ChannelPromise promise) {
http2Handler.encoder().writeSettings(http2HandlerCtx, frame.settings(), promise);
}
private void writeGoAwayFrame(Http2GoAwayFrame frame, ChannelPromise promise) {
if (frame.lastStreamId() > -1) {
throw new IllegalArgumentException("Last stream id must not be set on GOAWAY frame");
@ -254,56 +439,77 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
}
private void writeStreamFrame(Http2StreamFrame frame, ChannelPromise promise) {
if (!(frame.stream() instanceof Http2Stream2Impl)) {
throw new IllegalArgumentException("A stream object created by the frame codec needs to be set. " + frame);
}
if (frame instanceof Http2DataFrame) {
Http2DataFrame dataFrame = (Http2DataFrame) frame;
http2Handler.encoder().writeData(http2HandlerCtx, frame.streamId(), dataFrame.content().retain(),
dataFrame.padding(), dataFrame.isEndStream(), promise);
http2Handler.encoder().writeData(http2HandlerCtx, frame.stream().id(), dataFrame.content().retain(),
dataFrame.padding(), dataFrame.endStream(), promise);
} else if (frame instanceof Http2HeadersFrame) {
writeHeadersFrame((Http2HeadersFrame) frame, promise);
} else if (frame instanceof Http2ResetFrame) {
Http2ResetFrame rstFrame = (Http2ResetFrame) frame;
http2Handler.resetStream(http2HandlerCtx, frame.streamId(), rstFrame.errorCode(), promise);
http2Handler.encoder().writeRstStream(http2HandlerCtx, frame.stream().id(), rstFrame.errorCode(), promise);
} else {
throw new UnsupportedMessageTypeException(frame);
}
}
private void writeHeadersFrame(Http2HeadersFrame headersFrame, ChannelPromise promise) {
int streamId = headersFrame.streamId();
if (!isStreamIdValid(streamId)) {
final Endpoint<Http2LocalFlowController> localEndpoint = http2Handler.connection().local();
streamId = localEndpoint.incrementAndGetNextStreamId();
try {
// Try to create a stream in OPEN state before writing headers, to catch errors on stream creation
// early on i.e. max concurrent streams limit reached, stream id exhaustion, etc.
localEndpoint.createStream(streamId, false);
} catch (Http2Exception e) {
promise.setFailure(e);
final int streamId;
if (isStreamIdValid(headersFrame.stream().id())) {
streamId = headersFrame.stream().id();
} else {
final Http2Stream2Impl stream = (Http2Stream2Impl) headersFrame.stream();
final Http2Connection connection = http2Handler.connection();
streamId = connection.local().incrementAndGetNextStreamId();
if (streamId < 0) {
promise.setFailure(new Http2NoMoreStreamIdsException());
return;
}
ctx.fireUserEventTriggered(new Http2StreamActiveEvent(streamId, headersFrame));
numBufferedStreams++;
// Set the stream id before completing the promise, as any listener added by a user will be executed
// before the below listener, and so the stream identifier is accessible in a user's listener.
stream.id(streamId);
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
numBufferedStreams--;
Http2Stream connectionStream = connection.stream(streamId);
if (future.isSuccess() && connectionStream != null) {
connectionStream.setProperty(streamKey, stream);
} else {
stream.setClosed();
}
removePendingStream(stream);
}
});
}
http2Handler.encoder().writeHeaders(http2HandlerCtx, streamId, headersFrame.headers(),
headersFrame.padding(), headersFrame.isEndStream(), promise);
http2Handler.encoder().writeHeaders(http2HandlerCtx, streamId, headersFrame.headers(), headersFrame.padding(),
headersFrame.endStream(), promise);
}
private final class ConnectionListener extends Http2ConnectionAdapter {
@Override
public void onStreamActive(Http2Stream stream) {
if (ctx == null) {
// UPGRADE stream is active before handlerAdded().
return;
}
if (isOutboundStream(server, stream.id())) {
// Creation of outbound streams is notified in writeHeadersFrame().
return;
}
ctx.fireUserEventTriggered(new Http2StreamActiveEvent(stream.id()));
stream.setProperty(streamKey, new Http2Stream2Impl(ctx.channel()).id(stream.id()));
}
@Override
public void onStreamClosed(Http2Stream stream) {
ctx.fireUserEventTriggered(new Http2StreamClosedEvent(stream.id()));
Http2Stream2Impl stream2 = stream.getProperty(streamKey);
if (stream2 != null) {
stream2.setClosed();
}
}
@Override
@ -312,33 +518,89 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
}
}
private static final class InternalHttp2ConnectionHandler extends Http2ConnectionHandler {
private final class InternalHttp2ConnectionHandler extends Http2ConnectionHandler {
InternalHttp2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
super(decoder, encoder, initialSettings);
}
@Override
protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, Http2Exception http2Ex) {
ctx.fireExceptionCaught(cause);
}
/**
* Exceptions for streams unknown streams, that is streams that have no {@link Http2Stream2} object attached
* are simply logged and replied to by sending a RST_STREAM frame. There is not much value in propagating such
* exceptions through the pipeline, as a user will not have any additional information / state about this
* stream and thus can't do any meaningful error handling.
*/
@Override
protected void onStreamError(ChannelHandlerContext ctx, Throwable cause,
Http2Exception.StreamException http2Ex) {
try {
Http2Stream stream = connection().stream(http2Ex.streamId());
if (stream == null) {
Http2Exception.StreamException streamException) {
int streamId = streamException.streamId();
Http2Stream connectionStream = connection().stream(streamId);
if (connectionStream == null) {
Http2Stream2 stream2 = findPendingStream(streamId);
if (stream2 == null) {
LOG.warn("Stream exception thrown for unkown stream.", cause);
// Write a RST_STREAM
super.onStreamError(ctx, cause, streamException);
return;
}
ctx.fireExceptionCaught(http2Ex);
} finally {
super.onStreamError(ctx, cause, http2Ex);
fireHttp2Stream2Exception(stream2, streamException.error(), cause);
} else {
Http2Stream2 stream2 = connectionStream.getProperty(streamKey);
if (stream2 == null) {
LOG.warn("Stream exception thrown without stream object attached.", cause);
// Write a RST_STREAM
super.onStreamError(ctx, cause, streamException);
return;
}
fireHttp2Stream2Exception(stream2, streamException.error(), cause);
}
}
@Override
protected boolean isGracefulShutdownComplete() {
return super.isGracefulShutdownComplete() && numBufferedStreams == 0;
}
private void fireHttp2Stream2Exception(Http2Stream2 stream, Http2Error error, Throwable cause) {
ctx.fireExceptionCaught(new Http2Stream2Exception(stream, error, cause));
}
}
private static final class FrameListener extends Http2FrameAdapter {
private final class FrameListener extends Http2FrameAdapter {
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
ctx.fireChannelRead(new DefaultHttp2SettingsFrame(settings));
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) {
ctx.fireChannelRead(new DefaultHttp2PingFrame(data.retain(), false));
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) {
ctx.fireChannelRead(new DefaultHttp2PingFrame(data.retain(), true));
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
Http2ResetFrame rstFrame = new DefaultHttp2ResetFrame(errorCode);
rstFrame.streamId(streamId);
ctx.fireChannelRead(rstFrame);
ctx.fireChannelRead(new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
if (streamId == 0) {
// Ignore connection window updates.
return;
}
ctx.fireChannelRead(new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));
}
@Override
@ -351,20 +613,170 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endOfStream) {
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(headers, endOfStream, padding);
headersFrame.streamId(streamId);
ctx.fireChannelRead(headersFrame);
ctx.fireChannelRead(new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
.stream(requireStream(streamId)));
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) {
Http2DataFrame dataFrame = new DefaultHttp2DataFrame(data.retain(), endOfStream, padding);
dataFrame.streamId(streamId);
ctx.fireChannelRead(dataFrame);
ctx.fireChannelRead(new DefaultHttp2DataFrame(data.retain(), endOfStream, padding)
.stream(requireStream(streamId)));
// We return the bytes in consumeBytes() once the stream channel consumed the bytes.
return 0;
}
private <V> Http2Stream2 requireStream(int streamId) {
Http2Stream2 stream = connection().stream(streamId).getProperty(streamKey);
if (stream == null) {
throw new IllegalStateException("Stream object required for identifier: " + streamId);
}
return stream;
}
}
/**
* {@link Http2Stream2} implementation.
*/
static final class Http2Stream2Impl extends DefaultChannelPromise implements Http2Stream2 {
private Http2Stream2Impl prev;
private Http2Stream2Impl next;
private volatile int id = -1;
private volatile Object managedState;
Http2Stream2Impl(Channel channel) {
super(channel);
setUncancellable();
}
@Override
public Http2Stream2Impl id(int id) {
if (!isStreamIdValid(id)) {
throw new IllegalArgumentException("Stream identifier invalid. Was: " + id);
}
this.id = id;
return this;
}
@Override
public int id() {
return id;
}
@Override
public Http2Stream2Impl managedState(Object state) {
managedState = state;
return this;
}
@Override
public Object managedState() {
return managedState;
}
@Override
public ChannelFuture closeFuture() {
return this;
}
@Override
public ChannelPromise setSuccess() {
throw new UnsupportedOperationException();
}
@Override
public ChannelPromise setSuccess(Void result) {
throw new UnsupportedOperationException();
}
@Override
public boolean trySuccess() {
throw new UnsupportedOperationException();
}
@Override
public ChannelPromise setFailure(Throwable cause) {
throw new UnsupportedOperationException();
}
@Override
public boolean tryFailure(Throwable cause) {
throw new UnsupportedOperationException();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
}
void setClosed() {
super.trySuccess();
}
@Override
public String toString() {
return String.valueOf(id);
}
}
private void addPendingStream(Http2Stream2Impl stream) {
synchronized (lock) {
if (pendingOutboundStreamsTail == null) {
pendingOutboundStreamsTail = stream;
return;
}
pendingOutboundStreamsTail.next = stream;
stream.prev = pendingOutboundStreamsTail;
}
}
private void removePendingStream(Http2Stream2Impl stream) {
try {
synchronized (lock) {
if (pendingOutboundStreamsTail == null) {
return;
}
if (pendingOutboundStreamsTail == stream) {
pendingOutboundStreamsTail = null;
}
stream.prev = stream.next;
if (stream.next != null) {
stream.next.prev = stream.prev;
}
}
} finally {
// Avoid GC nepotism
stream.next = null;
stream.prev = null;
}
}
private Http2Stream2 findPendingStream(int streamId) {
if (isOutboundStream(server, streamId)) {
synchronized (lock) {
Http2Stream2Impl idleStream = pendingOutboundStreamsTail;
while (idleStream != null) {
if (idleStream.id() == streamId) {
return idleStream;
}
idleStream = idleStream.prev;
}
}
}
return null;
}
private void cleanupPendingStreams() {
synchronized (lock) {
while (pendingOutboundStreamsTail != null) {
pendingOutboundStreamsTail.setClosed();
pendingOutboundStreamsTail = pendingOutboundStreamsTail.prev;
}
}
}
}

View File

@ -0,0 +1,174 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
import io.netty.util.internal.UnstableApi;
import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
/**
* Builder for the {@link Http2FrameCodec}.
*/
@UnstableApi
public final class Http2FrameCodecBuilder {
private final boolean server;
private Http2FrameWriter frameWriter;
private Http2FrameReader frameReader;
private Http2Settings initialSettings;
private long gracefulShutdownTimeoutMillis;
private float windowUpdateRatio;
private Http2FrameLogger frameLogger;
private boolean bufferOutboundStreams;
private Http2FrameCodecBuilder(boolean server) {
this.server = server;
frameWriter = new DefaultHttp2FrameWriter();
frameReader = new DefaultHttp2FrameReader();
initialSettings = new Http2Settings();
gracefulShutdownTimeoutMillis = Http2CodecUtil.DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS;
windowUpdateRatio = DEFAULT_WINDOW_UPDATE_RATIO;
}
/**
* Creates a builder for a HTTP/2 client.
*/
public static Http2FrameCodecBuilder forClient() {
return new Http2FrameCodecBuilder(false);
}
/**
* Creates a builder for a HTTP/2 server.
*/
public static Http2FrameCodecBuilder forServer() {
return new Http2FrameCodecBuilder(true);
}
/**
* Specify the {@link Http2FrameWriter} to use.
*
* <p>If not set, the {@link DefaultHttp2FrameWriter} is used.
*/
public Http2FrameCodecBuilder frameWriter(Http2FrameWriter frameWriter) {
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
return this;
}
/**
* Specify the {@link Http2FrameWriter} to use.
*
* <p>If not set, the {@link DefaultHttp2FrameReader} is used.
*/
public Http2FrameCodecBuilder frameReader(Http2FrameReader frameReader) {
this.frameReader = checkNotNull(frameReader, "frameReader");
return this;
}
/**
* Specify the initial {@link Http2Settings} to send to the remote endpoint.
*
* <p>If not set, the default values of {@link Http2Settings} are used.
*/
public Http2FrameCodecBuilder initialSettings(Http2Settings initialSettings) {
this.initialSettings = checkNotNull(initialSettings, "initialSettings");
return this;
}
/**
* The amount of time to wait for all active streams to be closed, before the connection is closed.
*
* <p>The default value is {@link Http2CodecUtil#DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS}.
*/
public Http2FrameCodecBuilder gracefulShutdownTimeout(long timeout, TimeUnit unit) {
gracefulShutdownTimeoutMillis =
checkNotNull(unit, "unit").toMillis(checkPositiveOrZero(timeout, "timeout"));
return this;
}
/**
* Specify the HTTP/2 flow control window update ratio for both the connection and stream window.
*/
public Http2FrameCodecBuilder windowUpdateRatio(float windowUpdateRatio) {
if (Float.compare(windowUpdateRatio, 0) < 1 || Float.compare(windowUpdateRatio, 1) > -1) {
throw new IllegalArgumentException("windowUpdateRatio must be (0,1). Was: " + windowUpdateRatio);
}
this.windowUpdateRatio = windowUpdateRatio;
return this;
}
/**
* Specify the {@link Http2FrameLogger} to use.
*
* <p>By default no frame logger is used.
*/
public Http2FrameCodecBuilder frameLogger(Http2FrameLogger frameLogger) {
this.frameLogger = checkNotNull(frameLogger, "frameLogger");
return this;
}
/**
* Whether to buffer new outbound HTTP/2 streams when the {@code MAX_CONCURRENT_STREAMS} limit is reached.
*
* <p>When this limit is hit, instead of rejecting any new streams, newly created streams and their corresponding
* frames are buffered. Once an active stream gets closed or the maximum number of concurrent streams is increased,
* the codec will automatically try to empty its buffer and create as many new streams as possible.
*
* <p>If a {@code GOAWAY} frame is received from the remote endpoint, all buffered writes for streams with an ID
* less than the specified {@code lastStreamId} will immediately fail with a {@link Http2GoAwayException}.
*
* <p>If the channel gets closed, all new and buffered writes will immediately fail with a
* {@link Http2ChannelClosedException}.
*
* <p>This implementation makes the buffering mostly transparent and does not enforce an upper bound as to how many
* streams/frames can be buffered.
*/
public Http2FrameCodecBuilder bufferOutboundStreams(boolean bufferOutboundStreams) {
this.bufferOutboundStreams = bufferOutboundStreams;
return this;
}
/**
* Build a {@link Http2FrameCodec} object.
*/
public Http2FrameCodec build() {
Http2Connection connection = new DefaultHttp2Connection(server);
if (frameLogger != null) {
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
}
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
if (bufferOutboundStreams) {
encoder = new StreamBufferingEncoder(encoder);
}
connection.local().flowController(new DefaultHttp2LocalFlowController(connection, windowUpdateRatio,
true /* auto refill conn window */));
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader);
return new Http2FrameCodec(encoder, decoder, initialSettings, gracefulShutdownTimeoutMillis);
}
}

View File

@ -28,13 +28,13 @@ public interface Http2HeadersFrame extends Http2StreamFrame {
*/
Http2Headers headers();
/**
* {@code true} if this frame is the last one in this direction of the stream.
*/
boolean isEndStream();
/**
* Frame padding to use. Must be non-negative and less than 256.
*/
int padding();
/**
* Returns {@code true} if the END_STREAM flag ist set.
*/
boolean endStream();
}

View File

@ -18,22 +18,17 @@ package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.IntObjectMap.PrimitiveEntry;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
@ -44,11 +39,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CLOSE_MESSAGE;
import static io.netty.handler.codec.http2.Http2CodecUtil.isOutboundStream;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static java.lang.String.format;
/**
* An HTTP/2 handler that creates child channels for each stream.
@ -69,20 +61,30 @@ import static java.lang.String.format;
* <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
*
* <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
*
* <h3>Reference Counting</h3>
*
* Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
* reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
* before propagating a reference counted object through the pipeline, and thus an application handler needs to release
* such an object after having consumed it. For more information on reference counting take a look at
* http://netty.io/wiki/reference-counted-objects.html
*/
@UnstableApi
public final class Http2MultiplexCodec extends ChannelDuplexHandler {
public class Http2MultiplexCodec extends Http2ChannelDuplexHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2MultiplexCodec.class);
private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2MultiplexCodec.class);
private final Http2StreamChannelBootstrap bootstrap;
// Visible for testing
final Http2StreamChannelBootstrap bootstrap;
private final List<Http2StreamChannel> channelsToFireChildReadComplete = new ArrayList<Http2StreamChannel>();
private final boolean server;
private ChannelHandlerContext ctx;
// Visible for testing
ChannelHandlerContext ctx;
private volatile Runnable flushTask;
private final IntObjectMap<Http2StreamChannel> childChannels = new IntObjectHashMap<Http2StreamChannel>();
private int initialOutboundStreamWindow = Http2CodecUtil.DEFAULT_WINDOW_SIZE;
/**
* Construct a new handler whose child channels run in a different event loop.
@ -98,31 +100,19 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
this.bootstrap = new Http2StreamChannelBootstrap(bootstrap);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
bootstrap.parentChannel(ctx.channel());
private static Http2StreamChannel requireChildChannel(Http2Stream2 stream2) {
Object state = stream2.managedState();
if (!(state instanceof Http2StreamChannel)) {
throw new IllegalStateException("Stream must have child channel attached");
}
return (Http2StreamChannel) state;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (!(cause instanceof StreamException)) {
ctx.fireExceptionCaught(cause);
return;
}
StreamException streamEx = (StreamException) cause;
try {
Http2StreamChannel childChannel = childChannels.get(streamEx.streamId());
if (childChannel != null) {
childChannel.pipeline().fireExceptionCaught(streamEx);
} else {
logger.warn(format("Exception caught for unknown HTTP/2 stream '%d'", streamEx.streamId()),
streamEx);
}
} finally {
onStreamClosed(streamEx.streamId());
}
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
bootstrap.parentChannel(ctx.channel());
super.handlerAdded(ctx);
}
// Override this to signal it will never throw an exception.
@ -137,31 +127,72 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
ctx.fireChannelRead(msg);
return;
}
if (msg instanceof Http2StreamFrame) {
Http2StreamFrame frame = (Http2StreamFrame) msg;
int streamId = frame.streamId();
Http2StreamChannel childChannel = childChannels.get(streamId);
if (childChannel == null) {
// TODO: Combine with DefaultHttp2ConnectionDecoder.shouldIgnoreHeadersOrDataFrame logic.
ReferenceCountUtil.release(msg);
throw new StreamException(streamId, STREAM_CLOSED, format("Received %s frame for an unknown stream %d",
frame.name(), streamId));
}
fireChildReadAndRegister(childChannel, frame);
channelReadStreamFrame((Http2StreamFrame) msg);
} else if (msg instanceof Http2GoAwayFrame) {
Http2GoAwayFrame goAwayFrame = (Http2GoAwayFrame) msg;
for (PrimitiveEntry<Http2StreamChannel> entry : childChannels.entries()) {
Http2StreamChannel childChannel = entry.value();
int streamId = entry.key();
if (streamId > goAwayFrame.lastStreamId() && isOutboundStream(server, streamId)) {
childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
final Http2GoAwayFrame goAwayFrame = (Http2GoAwayFrame) msg;
forEachActiveStream(new Http2Stream2Visitor() {
@Override
public boolean visit(Http2Stream2 stream) {
final int streamId = stream.id();
final Http2StreamChannel childChannel = requireChildChannel(stream);
if (streamId > goAwayFrame.lastStreamId() && isOutboundStream(server, streamId)) {
childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
}
return true;
}
}
});
goAwayFrame.release();
} else if (msg instanceof Http2SettingsFrame) {
Http2Settings settings = ((Http2SettingsFrame) msg).settings();
if (settings.initialWindowSize() != null) {
initialOutboundStreamWindow = settings.initialWindowSize();
}
}
}
private void channelReadStreamFrame(Http2StreamFrame frame) {
Http2Stream2 stream = frame.stream();
if (stream.managedState() == null) {
onStreamActive(stream);
}
Http2StreamChannel childChannel = requireChildChannel(stream);
fireChildReadAndRegister(childChannel, frame);
}
private void onStreamActive(Http2Stream2 stream) {
final Http2StreamChannel childChannel;
if (stream.managedState() == null) {
ChannelFuture future = bootstrap.connect(stream);
childChannel = (Http2StreamChannel) future.channel();
stream.managedState(childChannel);
} else {
// It's safe to release, as UnsupportedMessageTypeException just calls msg.getClass()
ReferenceCountUtil.release(msg);
throw new UnsupportedMessageTypeException(msg);
childChannel = requireChildChannel(stream);
}
assert !childChannel.isWritable();
childChannel.incrementOutboundFlowControlWindow(initialOutboundStreamWindow);
childChannel.pipeline().fireChannelWritabilityChanged();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof Http2Stream2Exception) {
Http2Stream2Exception streamException = (Http2Stream2Exception) cause;
Http2Stream2 stream = streamException.stream();
Http2StreamChannel childChannel = requireChildChannel(stream);
try {
childChannel.pipeline().fireExceptionCaught(streamException.getCause());
} finally {
childChannel.close();
}
} else {
ctx.fireExceptionCaught(cause);
}
}
@ -175,59 +206,6 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof Http2StreamActiveEvent) {
Http2StreamActiveEvent activeEvent = (Http2StreamActiveEvent) evt;
onStreamActive(activeEvent.streamId(), activeEvent.headers());
} else if (evt instanceof Http2StreamClosedEvent) {
onStreamClosed(((Http2StreamClosedEvent) evt).streamId());
} else {
ctx.fireUserEventTriggered(evt);
}
}
private void onStreamActive(int streamId, Http2HeadersFrame headersFrame) {
final Http2StreamChannel childChannel;
if (isOutboundStream(server, streamId)) {
if (!(headersFrame instanceof ChannelCarryingHeadersFrame)) {
throw new IllegalArgumentException("needs to be wrapped");
}
childChannel = ((ChannelCarryingHeadersFrame) headersFrame).channel();
childChannel.streamId(streamId);
} else {
ChannelFuture future = bootstrap.connect(streamId);
childChannel = (Http2StreamChannel) future.channel();
}
Http2StreamChannel existing = childChannels.put(streamId, childChannel);
assert existing == null;
}
private void onStreamClosed(int streamId) {
final Http2StreamChannel childChannel = childChannels.remove(streamId);
if (childChannel != null) {
final EventLoop eventLoop = childChannel.eventLoop();
if (eventLoop.inEventLoop()) {
onStreamClosed0(childChannel);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
onStreamClosed0(childChannel);
}
});
}
}
}
private void onStreamClosed0(Http2StreamChannel childChannel) {
assert childChannel.eventLoop().inEventLoop();
childChannel.onStreamClosedFired = true;
childChannel.fireChildRead(CLOSE_MESSAGE);
}
void flushFromStreamChannel() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
@ -246,20 +224,16 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
}
}
void writeFromStreamChannel(Object msg, boolean flush) {
writeFromStreamChannel(msg, ctx.newPromise(), flush);
}
void writeFromStreamChannel(final Object msg, final ChannelPromise promise, final boolean flush) {
void writeFromStreamChannel(final Http2Frame frame, final ChannelPromise promise, final boolean flush) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
writeFromStreamChannel0(msg, flush, promise);
writeFromStreamChannel0(frame, flush, promise);
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
writeFromStreamChannel0(msg, flush, promise);
writeFromStreamChannel0(frame, flush, promise);
}
});
} catch (Throwable cause) {
@ -268,9 +242,9 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
}
}
private void writeFromStreamChannel0(Object msg, boolean flush, ChannelPromise promise) {
private void writeFromStreamChannel0(Http2Frame frame, boolean flush, ChannelPromise promise) {
try {
write(ctx, msg, promise);
ctx.write(frame, promise);
} catch (Throwable cause) {
promise.tryFailure(cause);
}
@ -296,27 +270,22 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
ChannelFuture createStreamChannel(Channel parentChannel, EventLoopGroup group, ChannelHandler handler,
Map<ChannelOption<?>, Object> options,
Map<AttributeKey<?>, Object> attrs,
int streamId) {
final Http2StreamChannel channel = new Http2StreamChannel(parentChannel);
if (isStreamIdValid(streamId)) {
assert !isOutboundStream(server, streamId);
assert ctx.channel().eventLoop().inEventLoop();
channel.streamId(streamId);
}
channel.pipeline().addLast(handler);
Http2Stream2 stream) {
final Http2StreamChannel childChannel = new Http2StreamChannel(parentChannel, stream);
childChannel.pipeline().addLast(handler);
initOpts(channel, options);
initAttrs(channel, attrs);
initOpts(childChannel, options);
initAttrs(childChannel, attrs);
ChannelFuture future = group.register(channel);
ChannelFuture future = group.register(childChannel);
// Handle any errors that occurred on the local thread while registering. Even though
// failures can happen after this point, they will be handled by the channel by closing the
// channel.
// childChannel.
if (future.cause() != null) {
if (channel.isRegistered()) {
channel.close();
if (childChannel.isRegistered()) {
childChannel.close();
} else {
channel.unsafe().closeForcibly();
childChannel.unsafe().closeForcibly();
}
}
return future;
@ -328,10 +297,10 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
for (Entry<ChannelOption<?>, Object> e: opts.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
LOG.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
LOG.warn("Failed to set a channel option: " + channel, t);
}
}
}
@ -347,54 +316,85 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
}
final class Http2StreamChannel extends AbstractHttp2StreamChannel implements ChannelFutureListener {
boolean onStreamClosedFired;
/**
* {@code true} if stream is in {@link Http2MultiplexCodec#channelsToFireChildReadComplete}.
*/
/** {@code true} after the first HEADERS frame has been written **/
boolean firstFrameWritten;
/** {@code true} if a close without an error was initiated **/
boolean streamClosedWithoutError;
/** {@code true} if stream is in {@link Http2MultiplexCodec#channelsToFireChildReadComplete}. **/
boolean inStreamsToFireChildReadComplete;
Http2StreamChannel(Channel parentChannel) {
super(parentChannel);
Http2StreamChannel(Channel parentChannel, Http2Stream2 stream) {
super(parentChannel, stream);
stream.managedState(this);
stream.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
streamClosedWithoutError = true;
fireChildRead(CLOSE_MESSAGE);
}
});
}
@Override
protected void doClose() throws Exception {
if (!onStreamClosedFired && isStreamIdValid(streamId())) {
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).streamId(streamId());
writeFromStreamChannel(resetFrame, true);
if (!streamClosedWithoutError && isStreamIdValid(stream().id())) {
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
writeFromStreamChannel(resetFrame, ctx.newPromise(), true);
}
super.doClose();
}
@Override
protected void doWrite(Object msg) {
protected void doWrite(Object msg, ChannelPromise childPromise) {
if (msg instanceof Http2StreamFrame) {
Http2StreamFrame frame = (Http2StreamFrame) msg;
ChannelPromise promise = ctx.newPromise();
if (isStreamIdValid(frame.streamId())) {
// Http2StreamFrame frame = (Http2StreamFrame) msg;
Http2StreamFrame frame = validateStreamFrame(msg);
if (!firstFrameWritten && !isStreamIdValid(stream().id())) {
if (!(frame instanceof Http2HeadersFrame)) {
throw new IllegalArgumentException("The first frame must be a headers frame. Was: "
+ frame.name());
}
childPromise.addListener(this);
firstFrameWritten = true;
}
frame.stream(stream());
/**
* Wrap the ChannelPromise of the child channel in a ChannelPromise of the parent channel
* in order to be able to use it on the parent channel. We don't need to worry about the
* channel being cancelled, as the outbound buffer of the child channel marks it uncancelable.
*/
assert !childPromise.isCancellable();
ChannelFutureListener childPromiseNotifier = new ChannelPromiseNotifier(childPromise);
ChannelPromise parentPromise = ctx.newPromise().addListener(childPromiseNotifier);
/*
if (isStreamValid(frame.stream())) {
ReferenceCountUtil.release(frame);
throw new IllegalArgumentException("Stream id must not be set on the frame. Was: "
+ frame.streamId());
+ frame.stream().id());
}
if (!isStreamIdValid(streamId())) {
if (!isStreamValid(frame.stream())) {
if (!(frame instanceof Http2HeadersFrame)) {
ReferenceCountUtil.release(frame);
throw new IllegalArgumentException("The first frame must be a headers frame. Was: "
+ frame.name());
}
frame = new ChannelCarryingHeadersFrame((Http2HeadersFrame) frame, this);
//frame = new ChannelCarryingHeadersFrame((Http2HeadersFrame) frame, this);
// Handle errors on stream creation
promise.addListener(this);
parentPromise.addListener(this);
} else {
frame.streamId(streamId());
frame.stream(stream());
}
writeFromStreamChannel(frame, promise, false);
*/
writeFromStreamChannel(frame, parentPromise, false);
} else if (msg instanceof Http2GoAwayFrame) {
ChannelPromise promise = ctx.newPromise();
promise.addListener(this);
writeFromStreamChannel(msg, promise, false);
writeFromStreamChannel((Http2GoAwayFrame) msg, promise, false);
} else {
ReferenceCountUtil.release(msg);
throw new IllegalArgumentException("Message must be an Http2GoAwayFrame or Http2StreamFrame: " + msg);
@ -406,72 +406,32 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
flushFromStreamChannel();
}
@Override
protected EventExecutor preferredEventExecutor() {
return ctx.executor();
}
@Override
protected void bytesConsumed(final int bytes) {
ctx.write(new DefaultHttp2WindowUpdateFrame(bytes).streamId(streamId()));
ctx.write(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream()));
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
pipeline().fireExceptionCaught(cause);
if (future.isSuccess()) {
onStreamActive(stream());
} else {
pipeline().fireExceptionCaught(future.cause());
close();
}
}
}
/**
* Wraps the first {@link Http2HeadersFrame} of local/outbound stream. This allows us to get to the child channel
* when receiving the {@link Http2StreamActiveEvent} from the frame codec. See {@link #onStreamActive}.
*/
private static final class ChannelCarryingHeadersFrame implements Http2HeadersFrame {
private final Http2HeadersFrame frame;
private final Http2StreamChannel childChannel;
ChannelCarryingHeadersFrame(Http2HeadersFrame frame, Http2StreamChannel childChannel) {
this.frame = frame;
this.childChannel = childChannel;
}
@Override
public Http2Headers headers() {
return frame.headers();
}
@Override
public boolean isEndStream() {
return frame.isEndStream();
}
@Override
public int padding() {
return frame.padding();
}
@Override
public Http2StreamFrame streamId(int streamId) {
return frame.streamId(streamId);
}
@Override
public int streamId() {
return frame.streamId();
}
@Override
public String name() {
return frame.name();
}
Http2StreamChannel channel() {
return childChannel;
private Http2StreamFrame validateStreamFrame(Object msg) {
if (!(msg instanceof Http2StreamFrame)) {
ReferenceCountUtil.release(msg);
throw new IllegalArgumentException("Message must be a Http2StreamFrame: " + msg);
}
Http2StreamFrame frame = (Http2StreamFrame) msg;
if (frame.stream() != null) {
ReferenceCountUtil.release(frame);
throw new IllegalArgumentException("Stream must not be set on the frame.");
}
return frame;
}
}
}

View File

@ -13,19 +13,27 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.internal.UnstableApi;
/**
* An event describing a state change of a particular HTTP/2 stream. Such events
* are typically emitted by channel handlers to exchange stream state information.
* HTTP/2 PING Frame.
*/
@UnstableApi
public interface Http2StreamStateEvent {
public interface Http2PingFrame extends Http2Frame, ByteBufHolder {
/**
* Returns the HTTP/2 stream identifier for this event.
* When {@code true}, indicates that this ping is a ping response.
*/
int streamId();
boolean ack();
/**
* Returns the eight byte opaque data.
*/
@Override
ByteBuf content();
}

View File

@ -70,7 +70,7 @@ public class Http2ServerDowngrader extends MessageToMessageCodec<Http2StreamFram
Http2HeadersFrame headersFrame = (Http2HeadersFrame) frame;
Http2Headers headers = headersFrame.headers();
if (headersFrame.isEndStream()) {
if (headersFrame.endStream()) {
if (headers.method() == null) {
LastHttpContent last = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders);
HttpConversionUtil.addHttp2ToHttpHeaders(id, headers, last.trailingHeaders(),
@ -90,7 +90,7 @@ public class Http2ServerDowngrader extends MessageToMessageCodec<Http2StreamFram
}
} else if (frame instanceof Http2DataFrame) {
Http2DataFrame dataFrame = (Http2DataFrame) frame;
if (dataFrame.isEndStream()) {
if (dataFrame.endStream()) {
out.add(new DefaultLastHttpContent(dataFrame.content(), validateHeaders));
} else {
out.add(new DefaultHttpContent(dataFrame.content()));

View File

@ -13,13 +13,16 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
/**
* HTTP/2 SETTINGS frame.
*/
public interface Http2SettingsFrame extends Http2Frame {
@UnstableApi
public class Http2StreamClosedEvent extends AbstractHttp2StreamStateEvent {
public Http2StreamClosedEvent(int streamId) {
super(streamId);
}
Http2Settings settings();
@Override
String name();
}

View File

@ -0,0 +1,94 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.ChannelFuture;
import io.netty.util.internal.UnstableApi;
/**
* A single stream within a HTTP/2 connection. To be used with the {@link Http2FrameCodec}.
*/
@UnstableApi
public interface Http2Stream2 {
/**
* The stream with identifier 0, representing the HTTP/2 connection.
*/
Http2Stream2 CONNECTION_STREAM = new Http2Stream2() {
@Override
public Http2Stream2 id(int id) {
throw new UnsupportedOperationException();
}
@Override
public int id() {
return 0;
}
@Override
public Http2Stream2 managedState(Object state) {
throw new UnsupportedOperationException();
}
@Override
public Object managedState() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture closeFuture() {
throw new UnsupportedOperationException();
}
};
/**
* Set the stream identifier to a value greater than zero.
*
* <p>This method must never be called by user code, except it might be useful in tests. This method may be called
* at most once.
*/
Http2Stream2 id(int id);
/**
* Returns the stream identifier.
*
* <p>Use {@link Http2CodecUtil#isStreamIdValid(int)} to check if the stream has already been assigned an
* identifier.
*/
int id();
/**
* Attach application specific state to this HTTP/2 stream.
*
* <p>The state is maintained until the stream or the channel are closed (whatever happens first).
*/
Http2Stream2 managedState(Object state);
/**
* Returns the application specific state object or {@code null} if no state has been attached yet.
*/
Object managedState();
/**
* A {@link ChannelFuture} that will complete when a stream or the channel are closed (whatever happens first).
*
* <p>The {@link ChannelFuture} is guaranteed to be completed eventually, even if the stream never became active,
* and will always succeed.
*/
ChannelFuture closeFuture();
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
/**
* A HTTP/2 exception for a specific {@link Http2Stream2}.
*/
public class Http2Stream2Exception extends Exception {
private static final long serialVersionUID = -4407186173493887044L;
private final Http2Error error;
private final Http2Stream2 stream;
public <T> Http2Stream2Exception(Http2Stream2 stream, Http2Error error, Throwable cause) {
super(cause.getMessage(), cause);
this.stream = stream;
this.error = error;
}
public Http2Error error() {
return error;
}
public Http2Stream2 stream() {
return stream;
}
}

View File

@ -19,27 +19,20 @@ package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
/**
* This event is emitted by the {@link Http2FrameCodec} when a stream becomes active.
* A visitor that allows to iterate over a collection of {@link Http2Stream2}s.
*/
@UnstableApi
public class Http2StreamActiveEvent extends AbstractHttp2StreamStateEvent {
private final Http2HeadersFrame headers;
public Http2StreamActiveEvent(int streamId) {
this(streamId, null);
}
public Http2StreamActiveEvent(int streamId, Http2HeadersFrame headers) {
super(streamId);
this.headers = headers;
}
public interface Http2Stream2Visitor {
/**
* For outbound streams, this method returns the <em>same</em> {@link Http2HeadersFrame} object as the one that
* made the stream active. For inbound streams, this method returns {@code null}.
* This method is called once for each stream of the collection.
*
* <p>If an {@link Exception} is thrown, the loop is stopped.
*
* @return <ul>
* <li>{@code true} if the visitor wants to continue the loop and handle the stream.</li>
* <li>{@code false} if the visitor wants to stop handling the stream and abort the loop.</li>
* </ul>
*/
public Http2HeadersFrame headers() {
return headers;
}
boolean visit(Http2Stream2 stream);
}

View File

@ -25,6 +25,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http2.Http2MultiplexCodec.Http2StreamChannel;
import io.netty.util.AttributeKey;
import io.netty.util.internal.UnstableApi;
@ -41,11 +42,23 @@ import static java.util.Collections.unmodifiableMap;
* <p>The bootstrap requires a registered parent {@link Channel} with a {@link ChannelPipeline} that contains the
* {@link Http2MultiplexCodec}.
*
* <p>A child channel becomes active as soon as it is registered to an eventloop. Therefore, an active channel does not
* map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been sent or received, does
* the channel map to an active HTTP/2 stream. In case it was not possible to open a new HTTP/2 stream (i.e. due to
* the maximum number of active streams being exceeded), the child channel receives an exception indicating the reason
* and is closed immediately thereafter.
* <h3>Channel Events</h3>
*
* A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
* does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
* or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
* (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
* indicating the cause and is closed immediately thereafter.
*
* <h3>Writability and Flow Control</h3>
*
* A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
* when it maps to an active HTTP/2 stream and the stream's flow control window is greater than zero. A child channel
* does not know about the connection-level flow control window. {@link ChannelHandler}s are free to ignore the
* channel's writability, in which case the excessive writes will be buffered by the parent channel. It's important to
* note that only {@link Http2DataFrame}s are subject to HTTP/2 flow control. So it's perfectly legal (and expected)
* by a handler that aims to respect the channel's writability to e.g. write a {@link Http2DataFrame} even if the
* channel is marked unwritable.
*
* <p>This class is thread-safe.
*/
@ -79,13 +92,14 @@ public class Http2StreamChannelBootstrap {
* Creates a new channel that will eventually map to a local/outbound HTTP/2 stream.
*/
public ChannelFuture connect() {
return connect(-1);
Http2Stream2 newStream = channelAndCodec.multiplexCodec.newStream();
return connect(newStream);
}
/**
* Used by the {@link Http2MultiplexCodec} to instantiate incoming/remotely-created streams.
*/
ChannelFuture connect(int streamId) {
ChannelFuture connect(Http2Stream2 stream) {
validateState();
ParentChannelAndMultiplexCodec channelAndCodec0 = channelAndCodec;
@ -95,7 +109,7 @@ public class Http2StreamChannelBootstrap {
EventLoopGroup group0 = group;
group0 = group0 == null ? parentChannel.eventLoop() : group0;
return multiplexCodec.createStreamChannel(parentChannel, group0, handler, options, attributes, streamId);
return multiplexCodec.createStreamChannel(parentChannel, group0, handler, options, attributes, stream);
}
/**

View File

@ -18,31 +18,22 @@ package io.netty.handler.codec.http2;
import io.netty.util.internal.UnstableApi;
/**
* A frame whose meaning <em>may</em> apply to a particular stream, instead of the entire
* connection. It is still possible for this frame type to apply to the entire connection. In such
* cases, the {@link #streamId()} must return {@code 0}. If the frame applies to a stream, the
* {@link #streamId()} must be greater than zero.
* A frame whose meaning <em>may</em> apply to a particular stream, instead of the entire connection. It is still
* possible for this frame type to apply to the entire connection. In such cases, the {@link #stream()} must return
* {@link Http2Stream2#CONNECTION_STREAM}. If the frame applies to a stream, the {@link Http2Stream2#id()} must be
* greater than zero.
*/
//TODO(buchgr): Do we REALLY need the flexibility of supporting stream id 0? It seems confusing.
@UnstableApi
public interface Http2StreamFrame extends Http2Frame {
/**
* Sets the identifier of the stream this frame applies to. This method may be called at most once.
*
* <p><em>NOTE:</em> This method is supposed to be called by the HTTP/2 transport only. It must not be called by
* users.
*
* @return {@code this}
* Set the {@link Http2Stream2} object for this frame.
*/
Http2StreamFrame streamId(int streamId);
Http2StreamFrame stream(Http2Stream2 stream);
/**
* The identifier of the stream this frame applies to.
*
* @return {@code 0} if the frame applies to the entire connection, a value greater than {@code 0} if the frame
* applies to a particular stream, or a value less than {@code 0} if the frame has yet to be associated with
* the connection or a stream.
* Returns the {@link Http2Stream2} object for this frame, or {@code null} if the frame has yet to be associated
* with a stream.
*/
int streamId();
Http2Stream2 stream();
}

View File

@ -39,9 +39,9 @@ import java.util.concurrent.CountDownLatch;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static java.util.concurrent.TimeUnit.SECONDS;
import static junit.framework.TestCase.assertFalse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
@ -115,10 +115,10 @@ public class Http2CodecTest {
Channel childChannel1 = b.connect().syncUninterruptibly().channel();
assertTrue(childChannel1.isActive());
assertFalse(isStreamIdValid(((AbstractHttp2StreamChannel) childChannel1).streamId()));
assertFalse(isStreamIdValid(((AbstractHttp2StreamChannel) childChannel1).stream().id()));
Channel childChannel2 = b.connect().channel();
assertTrue(childChannel2.isActive());
assertFalse(isStreamIdValid(((AbstractHttp2StreamChannel) childChannel2).streamId()));
assertFalse(isStreamIdValid(((AbstractHttp2StreamChannel) childChannel2).stream().id()));
Http2Headers headers1 = new DefaultHttp2Headers();
Http2Headers headers2 = new DefaultHttp2Headers();
@ -129,14 +129,14 @@ public class Http2CodecTest {
Http2HeadersFrame headersFrame2 = serverLastInboundHandler.blockingReadInbound();
assertNotNull(headersFrame2);
assertEquals(3, headersFrame2.streamId());
assertEquals(3, headersFrame2.stream().id());
Http2HeadersFrame headersFrame1 = serverLastInboundHandler.blockingReadInbound();
assertNotNull(headersFrame1);
assertEquals(5, headersFrame1.streamId());
assertEquals(5, headersFrame1.stream().id());
assertEquals(3, ((AbstractHttp2StreamChannel) childChannel2).streamId());
assertEquals(5, ((AbstractHttp2StreamChannel) childChannel1).streamId());
assertEquals(3, ((AbstractHttp2StreamChannel) childChannel2).stream().id());
assertEquals(5, ((AbstractHttp2StreamChannel) childChannel1).stream().id());
childChannel1.close();
childChannel2.close();
@ -151,27 +151,27 @@ public class Http2CodecTest {
assertTrue(childChannel.isActive());
Http2Headers headers = new DefaultHttp2Headers();
childChannel.write(new DefaultHttp2HeadersFrame(headers));
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers));
ByteBuf data = Unpooled.buffer(100).writeZero(100);
childChannel.writeAndFlush(new DefaultHttp2DataFrame(data, true));
Http2HeadersFrame headersFrame = serverLastInboundHandler.blockingReadInbound();
assertNotNull(headersFrame);
assertEquals(3, headersFrame.streamId());
assertEquals(3, headersFrame.stream().id());
assertEquals(headers, headersFrame.headers());
Http2DataFrame dataFrame = serverLastInboundHandler.blockingReadInbound();
assertNotNull(dataFrame);
assertEquals(3, dataFrame.streamId());
assertEquals(3, dataFrame.stream().id());
assertEquals(data.resetReaderIndex(), dataFrame.content());
assertTrue(dataFrame.isEndStream());
assertTrue(dataFrame.endStream());
dataFrame.release();
childChannel.close();
Http2ResetFrame rstFrame = serverLastInboundHandler.blockingReadInbound();
assertNotNull(rstFrame);
assertEquals(3, rstFrame.streamId());
assertEquals(3, rstFrame.stream().id());
}
@Sharable

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
@ -33,15 +34,23 @@ import io.netty.handler.logging.LogLevel;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.handler.codec.http2.Http2Stream2.CONNECTION_STREAM;
import static io.netty.util.ReferenceCountUtil.releaseLater;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@ -53,7 +62,7 @@ public class Http2FrameCodecTest {
// For verifying outbound frames
private Http2FrameWriter frameWriter;
private Http2FrameCodec framingCodec;
private Http2FrameCodec frameCodec;
private EmbeddedChannel channel;
// For injecting inbound frames
private Http2FrameListener frameListener;
@ -68,33 +77,42 @@ public class Http2FrameCodecTest {
@Before
public void setUp() throws Exception {
setUp(Http2FrameCodecBuilder.forServer(), new Http2Settings());
}
@After
public void tearDown() throws Exception {
inboundHandler.finishAndReleaseAll();
channel.close();
}
private void setUp(Http2FrameCodecBuilder frameCodecBuilder, Http2Settings initialRemoteSettings) throws Exception {
frameWriter = spy(new VerifiableHttp2FrameWriter());
framingCodec = new Http2FrameCodec(true, frameWriter, new Http2FrameLogger(LogLevel.TRACE),
new Http2Settings());
frameListener = ((DefaultHttp2ConnectionDecoder) framingCodec.connectionHandler().decoder())
frameCodec = frameCodecBuilder.frameWriter(frameWriter).frameLogger(new Http2FrameLogger(LogLevel.TRACE))
.initialSettings(initialRemoteSettings).build();
frameListener = ((DefaultHttp2ConnectionDecoder) frameCodec.connectionHandler().decoder())
.internalFrameListener();
inboundHandler = new LastInboundHandler();
channel = new EmbeddedChannel();
channel.connect(new InetSocketAddress(0));
channel.pipeline().addLast(framingCodec);
channel.pipeline().addLast(frameCodec);
channel.pipeline().addLast(inboundHandler);
http2HandlerCtx = channel.pipeline().context(framingCodec.connectionHandler());
channel.pipeline().fireChannelActive();
http2HandlerCtx = channel.pipeline().context(frameCodec.connectionHandler());
// Handshake
verify(frameWriter).writeSettings(eq(http2HandlerCtx),
anyHttp2Settings(), anyChannelPromise());
verifyNoMoreInteractions(frameWriter);
channel.writeInbound(Http2CodecUtil.connectionPrefaceBuf());
frameListener.onSettingsRead(http2HandlerCtx, new Http2Settings());
frameListener.onSettingsRead(http2HandlerCtx, initialRemoteSettings);
verify(frameWriter).writeSettingsAck(eq(http2HandlerCtx), anyChannelPromise());
frameListener.onSettingsAckRead(http2HandlerCtx);
}
@After
public void tearDown() throws Exception {
inboundHandler.finishAndReleaseAll();
channel.finishAndReleaseAll();
Http2SettingsFrame settingsFrame = inboundHandler.readInbound();
assertNotNull(settingsFrame);
}
@Test
@ -114,15 +132,18 @@ public class Http2FrameCodecTest {
public void headerRequestHeaderResponse() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 1, request, 31, true);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(1);
Http2Stream stream = frameCodec.connectionHandler().connection().stream(1);
assertNotNull(stream);
assertEquals(State.HALF_CLOSED_REMOTE, stream.state());
assertEquals(new DefaultHttp2HeadersFrame(request, true, 31).streamId(stream.id()),
inboundHandler.readInbound());
Http2StreamFrame inboundFrame = inboundHandler.readInbound();
Http2Stream2 stream2 = inboundFrame.stream();
assertNotNull(stream2);
assertEquals(1, stream2.id());
assertEquals(inboundFrame, new DefaultHttp2HeadersFrame(request, true, 31).stream(stream2));
assertNull(inboundHandler.readInbound());
inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).streamId(stream.id()));
inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27).stream(stream2));
verify(frameWriter).writeHeaders(
eq(http2HandlerCtx), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(),
eq(27), eq(true), anyChannelPromise());
@ -137,12 +158,15 @@ public class Http2FrameCodecTest {
public void entityRequestEntityResponse() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 1, request, 0, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(1);
Http2Stream stream = frameCodec.connectionHandler().connection().stream(1);
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
assertEquals(new DefaultHttp2HeadersFrame(request, false).streamId(stream.id()),
inboundHandler.readInbound());
Http2HeadersFrame inboundHeaders = inboundHandler.readInbound();
Http2Stream2 stream2 = inboundHeaders.stream();
assertNotNull(stream2);
assertEquals(1, stream2.id());
assertEquals(new DefaultHttp2HeadersFrame(request, false).stream(stream2), inboundHeaders);
assertNull(inboundHandler.readInbound());
ByteBuf hello = bb("hello");
@ -150,19 +174,19 @@ public class Http2FrameCodecTest {
// Release hello to emulate ByteToMessageDecoder
hello.release();
Http2DataFrame inboundData = inboundHandler.readInbound();
Http2DataFrame expected = new DefaultHttp2DataFrame(bb("hello"), true, 31).streamId(stream.id());
Http2DataFrame expected = new DefaultHttp2DataFrame(bb("hello"), true, 31).stream(stream2);
assertEquals(expected, inboundData);
assertEquals(1, inboundData.refCnt());
expected.release();
inboundData.release();
assertNull(inboundHandler.readInbound());
inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, false).streamId(stream.id()));
inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, false).stream(stream2));
verify(frameWriter).writeHeaders(eq(http2HandlerCtx), eq(1), eq(response), anyInt(),
anyShort(), anyBoolean(), eq(0), eq(false), anyChannelPromise());
inboundHandler.writeOutbound(new DefaultHttp2DataFrame(bb("world"), true, 27)
.streamId(stream.id()));
inboundHandler.writeOutbound(new DefaultHttp2DataFrame(bb("world"), true, 27).stream(stream2));
ArgumentCaptor<ByteBuf> outboundData = ArgumentCaptor.forClass(ByteBuf.class);
verify(frameWriter).writeData(eq(http2HandlerCtx), eq(1), outboundData.capture(), eq(27),
eq(true), anyChannelPromise());
@ -180,11 +204,19 @@ public class Http2FrameCodecTest {
public void sendRstStream() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, true);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
Http2Stream stream = frameCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
assertEquals(State.HALF_CLOSED_REMOTE, stream.state());
inboundHandler.writeOutbound(new DefaultHttp2ResetFrame(314 /* non-standard error */).streamId(stream.id()));
Http2HeadersFrame inboundHeaders = inboundHandler.readInbound();
assertNotNull(inboundHeaders);
assertTrue(inboundHeaders.endStream());
Http2Stream2 stream2 = inboundHeaders.stream();
assertNotNull(stream2);
assertEquals(3, stream2.id());
inboundHandler.writeOutbound(new DefaultHttp2ResetFrame(314 /* non-standard error */).stream(stream2));
verify(frameWriter).writeRstStream(
eq(http2HandlerCtx), eq(3), eq(314L), anyChannelPromise());
assertEquals(State.CLOSED, stream.state());
@ -195,36 +227,28 @@ public class Http2FrameCodecTest {
public void receiveRstStream() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
Http2Stream stream = frameCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessageOrUserEvent();
assertNotNull(activeEvent);
assertEquals(stream.id(), activeEvent.streamId());
Http2HeadersFrame expectedHeaders = new DefaultHttp2HeadersFrame(request, false, 31).streamId(stream.id());
Http2HeadersFrame actualHeaders = inboundHandler.readInboundMessageOrUserEvent();
assertEquals(expectedHeaders, actualHeaders);
Http2HeadersFrame expectedHeaders = new DefaultHttp2HeadersFrame(request, false, 31);
Http2HeadersFrame actualHeaders = inboundHandler.readInbound();
assertEquals(expectedHeaders.stream(actualHeaders.stream()), actualHeaders);
frameListener.onRstStreamRead(http2HandlerCtx, 3, Http2Error.NO_ERROR.code());
Http2ResetFrame expectedRst = new DefaultHttp2ResetFrame(Http2Error.NO_ERROR).streamId(stream.id());
Http2ResetFrame actualRst = inboundHandler.readInboundMessageOrUserEvent();
Http2ResetFrame expectedRst = new DefaultHttp2ResetFrame(Http2Error.NO_ERROR).stream(actualHeaders.stream());
Http2ResetFrame actualRst = inboundHandler.readInbound();
assertEquals(expectedRst, actualRst);
Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessageOrUserEvent();
assertNotNull(closedEvent);
assertEquals(stream.id(), closedEvent.streamId());
assertNull(inboundHandler.readInboundMessageOrUserEvent());
assertNull(inboundHandler.readInbound());
}
@Test
public void sendGoAway() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
Http2Stream stream = frameCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
@ -288,28 +312,11 @@ public class Http2FrameCodecTest {
assertEquals(0, frame.refCnt());
}
@Test
public void incomingStreamActiveShouldFireUserEvent() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
Http2HeadersFrame frame = inboundHandler.readInbound();
assertNotNull(frame);
Http2StreamActiveEvent streamActiveEvent = inboundHandler.readUserEvent();
assertEquals(stream.id(), streamActiveEvent.streamId());
assertNull(inboundHandler.readInbound());
assertNull(inboundHandler.readUserEvent());
}
@Test
public void goAwayLastStreamIdOverflowed() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 5, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(5);
Http2Stream stream = frameCodec.connectionHandler().connection().stream(5);
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
@ -327,56 +334,14 @@ public class Http2FrameCodecTest {
}
@Test
public void outboundStreamShouldNotFireStreamActiveEvent() throws Exception {
Http2ConnectionEncoder encoder = framingCodec.connectionHandler().encoder();
encoder.writeHeaders(http2HandlerCtx, 2, request, 31, false, channel.newPromise());
Http2Stream stream = framingCodec.connectionHandler().connection().stream(2);
assertNotNull(stream);
assertEquals(State.OPEN, stream.state());
assertNull(inboundHandler.readInbound());
assertNull(inboundHandler.readUserEvent());
}
@Test
public void streamClosedShouldFireUserEvent() throws Exception {
public void streamErrorShouldFireException() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
Http2Stream stream = frameCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
frameListener.onRstStreamRead(http2HandlerCtx, 3, Http2Error.INTERNAL_ERROR.code());
assertThat(inboundHandler.readInbound(), instanceOf(Http2HeadersFrame.class));
assertThat(inboundHandler.readInbound(), instanceOf(Http2ResetFrame.class));
assertEquals(State.CLOSED, stream.state());
Http2StreamActiveEvent activeEvent = inboundHandler.readUserEvent();
assertEquals(stream.id(), activeEvent.streamId());
Http2StreamClosedEvent closedEvent = inboundHandler.readUserEvent();
assertEquals(stream.id(), closedEvent.streamId());
assertNull(inboundHandler.readInbound());
assertNull(inboundHandler.readUserEvent());
}
@Test
public void streamErrorShouldFireUserEvent() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = framingCodec.connectionHandler().connection().stream(3);
assertNotNull(stream);
Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessageOrUserEvent();
assertNotNull(activeEvent);
assertEquals(stream.id(), activeEvent.streamId());
StreamException streamEx = new StreamException(3, Http2Error.INTERNAL_ERROR, "foo");
framingCodec.connectionHandler().onError(http2HandlerCtx, streamEx);
frameCodec.connectionHandler().onError(http2HandlerCtx, streamEx);
Http2HeadersFrame headersFrame = inboundHandler.readInboundMessageOrUserEvent();
assertNotNull(headersFrame);
@ -384,14 +349,10 @@ public class Http2FrameCodecTest {
try {
inboundHandler.checkException();
fail("stream exception expected");
} catch (StreamException e) {
assertEquals(streamEx, e);
} catch (Http2Stream2Exception e) {
assertEquals(streamEx, e.getCause());
}
Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessageOrUserEvent();
assertNotNull(closedEvent);
assertEquals(stream.id(), closedEvent.streamId());
assertNull(inboundHandler.readInboundMessageOrUserEvent());
}
@ -399,15 +360,21 @@ public class Http2FrameCodecTest {
public void windowUpdateFrameDecrementsConsumedBytes() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Connection connection = framingCodec.connectionHandler().connection();
Http2Connection connection = frameCodec.connectionHandler().connection();
Http2Stream stream = connection.stream(3);
assertNotNull(stream);
ByteBuf data = Unpooled.buffer(100).writeZero(100);
frameListener.onDataRead(http2HandlerCtx, 3, data, 0, true);
Http2HeadersFrame inboundHeaders = inboundHandler.readInbound();
assertNotNull(inboundHeaders);
assertNotNull(inboundHeaders.stream());
Http2Stream2 stream2 = inboundHeaders.stream();
int before = connection.local().flowController().unconsumedBytes(stream);
ChannelFuture f = channel.write(new DefaultHttp2WindowUpdateFrame(100).streamId(stream.id()));
ChannelFuture f = channel.write(new DefaultHttp2WindowUpdateFrame(100).stream(stream2));
int after = connection.local().flowController().unconsumedBytes(stream);
assertEquals(100, before - after);
assertTrue(f.isSuccess());
@ -417,17 +384,279 @@ public class Http2FrameCodecTest {
@Test
public void windowUpdateMayFail() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Connection connection = framingCodec.connectionHandler().connection();
Http2Connection connection = frameCodec.connectionHandler().connection();
Http2Stream stream = connection.stream(3);
assertNotNull(stream);
Http2HeadersFrame inboundHeaders = inboundHandler.readInbound();
assertNotNull(inboundHeaders);
Http2Stream2 stream2 = inboundHeaders.stream();
// Fails, cause trying to return too many bytes to the flow controller
ChannelFuture f = channel.write(new DefaultHttp2WindowUpdateFrame(100).streamId(stream.id()));
ChannelFuture f = channel.write(new DefaultHttp2WindowUpdateFrame(100).stream(stream2));
assertTrue(f.isDone());
assertFalse(f.isSuccess());
assertThat(f.cause(), instanceOf(Http2Exception.class));
}
@Test
public void inboundWindowUpdateShouldBeForwarded() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
frameListener.onWindowUpdateRead(http2HandlerCtx, 3, 100);
// Connection-level window update
frameListener.onWindowUpdateRead(http2HandlerCtx, 0, 100);
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
assertNotNull(headersFrame);
Http2WindowUpdateFrame windowUpdateFrame = inboundHandler.readInbound();
assertNotNull(windowUpdateFrame);
assertEquals(3, windowUpdateFrame.stream().id());
assertEquals(100, windowUpdateFrame.windowSizeIncrement());
// Window update for the connection should not be forwarded.
assertNull(inboundHandler.readInbound());
}
@Test
public void streamZeroWindowUpdateIncrementsConnectionWindow() throws Exception {
Http2Connection connection = frameCodec.connectionHandler().connection();
Http2LocalFlowController localFlow = connection.local().flowController();
int initialWindowSizeBefore = localFlow.initialWindowSize();
int windowUpdate = 1024;
channel.write(new DefaultHttp2WindowUpdateFrame(windowUpdate).stream(CONNECTION_STREAM));
assertEquals(initialWindowSizeBefore + windowUpdate, localFlow.initialWindowSize());
}
@Test
public void sendSettingsFrame() {
Http2Settings settings = new Http2Settings();
channel.write(new DefaultHttp2SettingsFrame(settings));
verify(frameWriter).writeSettings(eq(http2HandlerCtx), same(settings), any(ChannelPromise.class));
}
@Test(timeout = 1000)
public void createAndCloseIdleStreamObject() {
Http2Stream2 stream = frameCodec.newStream();
assertNotNull(stream);
assertFalse(isStreamIdValid(stream.id()));
assertFalse(stream.closeFuture().isDone());
assertFalse(stream.closeFuture().isCancellable());
channel.close().syncUninterruptibly();
assertTrue(stream.closeFuture().isDone());
}
@Test(timeout = 1000)
public void newOutboundStream() {
final Http2Stream2 stream = frameCodec.newStream();
assertNotNull(stream);
assertFalse(isStreamIdValid(stream.id()));
final Promise<Void> listenerExecuted = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), false).stream(stream))
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
assertTrue(future.isSuccess());
assertTrue(isStreamIdValid(stream.id()));
assertFalse(stream.closeFuture().isDone());
listenerExecuted.setSuccess(null);
}
}
);
ByteBuf data = Unpooled.buffer().writeZero(100);
ChannelFuture f = channel.writeAndFlush(new DefaultHttp2DataFrame(data).stream(stream));
assertTrue(f.isSuccess());
listenerExecuted.syncUninterruptibly();
assertTrue(listenerExecuted.isSuccess());
}
@Test
public void newOutboundStreamsShouldBeBuffered() throws Exception {
setUp(Http2FrameCodecBuilder.forServer().bufferOutboundStreams(true),
new Http2Settings().maxConcurrentStreams(1));
Http2Stream2 stream1 = frameCodec.newStream();
Http2Stream2 stream2 = frameCodec.newStream();
ChannelPromise promise1 = channel.newPromise();
ChannelPromise promise2 = channel.newPromise();
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream1), promise1);
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream2), promise2);
assertTrue(isStreamIdValid(stream1.id()));
assertTrue(isStreamIdValid(stream2.id()));
assertTrue(promise1.syncUninterruptibly().isSuccess());
assertFalse(promise2.isDone());
// Increase concurrent streams limit to 2
frameListener.onSettingsRead(http2HandlerCtx, new Http2Settings().maxConcurrentStreams(2));
channel.flush();
assertTrue(promise2.syncUninterruptibly().isSuccess());
}
@Test
public void closeFutureShouldCompleteIfStreamFailsToBecomeActive() throws Exception {
setUp(Http2FrameCodecBuilder.forServer().bufferOutboundStreams(true),
new Http2Settings().maxConcurrentStreams(0));
Http2Stream2 stream = frameCodec.newStream();
ChannelPromise promise = channel.newPromise();
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream), promise);
assertTrue(isStreamIdValid(stream.id()));
assertFalse(promise.isDone());
assertFalse(stream.closeFuture().isDone());
promise.setFailure(new Exception());
assertTrue(stream.closeFuture().isDone());
}
@Test
public void streamIdentifiersExhausted() throws Http2Exception {
int maxServerStreamId = Integer.MAX_VALUE - 1;
assertNotNull(frameCodec.connectionHandler().connection().local().createStream(maxServerStreamId, false));
Http2Stream2 stream = frameCodec.newStream();
assertNotNull(stream);
ChannelPromise writePromise = channel.newPromise();
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream), writePromise);
assertThat(writePromise.cause(), instanceOf(Http2NoMoreStreamIdsException.class));
}
@Test
public void receivePing() throws Http2Exception {
ByteBuf data = Unpooled.buffer(8).writeLong(12345);
frameListener.onPingRead(http2HandlerCtx, releaseLater(data));
Http2PingFrame pingFrame = inboundHandler.readInbound();
assertNotNull(pingFrame);
assertEquals(data, pingFrame.content());
assertFalse(pingFrame.ack());
pingFrame.release();
}
@Test
public void sendPing() {
ByteBuf data = Unpooled.buffer(8).writeLong(12345);
channel.writeAndFlush(new DefaultHttp2PingFrame(data));
verify(frameWriter).writePing(eq(http2HandlerCtx), eq(false), eq(data), anyChannelPromise());
}
@Test
public void receiveSettings() throws Http2Exception {
Http2Settings settings = new Http2Settings().maxConcurrentStreams(1);
frameListener.onSettingsRead(http2HandlerCtx, settings);
Http2SettingsFrame settingsFrame = inboundHandler.readInbound();
assertNotNull(settingsFrame);
assertEquals(settings, settingsFrame.settings());
}
@Test
public void sendSettings() {
Http2Settings settings = new Http2Settings().maxConcurrentStreams(1);
channel.writeAndFlush(new DefaultHttp2SettingsFrame(settings));
verify(frameWriter).writeSettings(eq(http2HandlerCtx), eq(settings), anyChannelPromise());
}
@Test
public void managedStateShouldPersist() throws Http2Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 0, false);
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
assertNotNull(headersFrame);
Http2Stream2 inboundStream = headersFrame.stream();
assertNotNull(inboundStream);
assertNull(inboundStream.managedState());
Object inboundState = new Object();
inboundStream.managedState(inboundState);
Http2Stream2 outboundStream = frameCodec.newStream();
Object outboundState = new Object();
outboundStream.managedState(outboundState);
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(outboundStream));
ByteBuf inboundData = Unpooled.buffer(100).writeZero(100);
frameListener.onDataRead(http2HandlerCtx, inboundStream.id(), inboundData, 0, false);
Http2DataFrame dataFrame = inboundHandler.readInbound();
assertNotNull(dataFrame);
assertSame(inboundStream, dataFrame.stream());
assertSame(inboundState, dataFrame.stream().managedState());
dataFrame.release();
frameListener.onHeadersRead(http2HandlerCtx, outboundStream.id(), new DefaultHttp2Headers(), 0, false);
headersFrame = inboundHandler.readInbound();
assertNotNull(headersFrame);
assertSame(outboundStream, headersFrame.stream());
assertSame(outboundState, headersFrame.stream().managedState());
}
@Test
public void iterateActiveStreams() throws Exception {
setUp(Http2FrameCodecBuilder.forServer().bufferOutboundStreams(true),
new Http2Settings().maxConcurrentStreams(1));
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 0, false);
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
assertNotNull(headersFrame);
Http2Stream2 activeInbond = headersFrame.stream();
Http2Stream2 activeOutbound = frameCodec.newStream();
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(activeOutbound));
Http2Stream2 bufferedOutbound = frameCodec.newStream();
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(bufferedOutbound));
@SuppressWarnings("unused")
Http2Stream2 idleStream = frameCodec.newStream();
final Set<Http2Stream2> activeStreams = new HashSet<Http2Stream2>();
frameCodec.forEachActiveStream(new Http2Stream2Visitor() {
@Override
public boolean visit(Http2Stream2 stream) {
activeStreams.add(stream);
return true;
}
});
assertEquals(2, activeStreams.size());
Set<Http2Stream2> expectedStreams = new HashSet<Http2Stream2>();
expectedStreams.add(activeInbond);
expectedStreams.add(activeOutbound);
assertEquals(expectedStreams, activeStreams);
}
private static ChannelPromise anyChannelPromise() {
return any(ChannelPromise.class);
}

View File

@ -16,14 +16,16 @@ package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpScheme;
@ -32,7 +34,6 @@ import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
@ -60,7 +61,11 @@ public class Http2MultiplexCodecTest {
.method(HttpMethod.GET.asciiName()).scheme(HttpScheme.HTTPS.name())
.authority(new AsciiString("example.org")).path(new AsciiString("/foo"));
private static final int streamId = 3;
private Http2Stream2 inboundStream;
private Http2Stream2 outboundStream;
private static final int initialRemoteStreamWindow = 1024;
@Before
public void setUp() {
@ -68,7 +73,14 @@ public class Http2MultiplexCodecTest {
Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap().handler(childChannelInitializer);
parentChannel = new EmbeddedChannel();
parentChannel.connect(new InetSocketAddress(0));
parentChannel.pipeline().addLast(new Http2MultiplexCodec(true, bootstrap));
parentChannel.pipeline().addLast(new TestableHttp2MultiplexCodec(true, bootstrap));
parentChannel.runPendingTasks();
Http2Settings settings = new Http2Settings().initialWindowSize(initialRemoteStreamWindow);
parentChannel.pipeline().fireChannelRead(new DefaultHttp2SettingsFrame(settings));
inboundStream = new Http2Stream2Impl(parentChannel).id(3);
outboundStream = new Http2Stream2Impl(parentChannel).id(2);
}
@After
@ -77,6 +89,9 @@ public class Http2MultiplexCodecTest {
((LastInboundHandler) childChannelInitializer.handler).finishAndReleaseAll();
}
parentChannel.finishAndReleaseAll();
((ChannelPromise) inboundStream.closeFuture()).trySuccess();
((ChannelPromise) outboundStream.closeFuture()).trySuccess();
}
// TODO(buchgr): Thread model of child channel
@ -90,17 +105,13 @@ public class Http2MultiplexCodecTest {
LastInboundHandler inboundHandler = new LastInboundHandler();
childChannelInitializer.handler = inboundHandler;
Http2StreamActiveEvent streamActive = new Http2StreamActiveEvent(streamId);
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(request).streamId(streamId);
Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("hello")).streamId(streamId);
Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("world")).streamId(streamId);
Http2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(request).stream(inboundStream);
Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("hello")).stream(inboundStream);
Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("world")).stream(inboundStream);
assertFalse(inboundHandler.isChannelActive());
parentChannel.pipeline().fireUserEventTriggered(streamActive);
assertTrue(inboundHandler.isChannelActive());
// Make sure the stream active event is not delivered as a user event on the child channel.
assertNull(inboundHandler.readUserEvent());
parentChannel.pipeline().fireChannelRead(headersFrame);
assertTrue(inboundHandler.isChannelActive());
parentChannel.pipeline().fireChannelRead(dataFrame1);
parentChannel.pipeline().fireChannelRead(dataFrame2);
@ -115,42 +126,55 @@ public class Http2MultiplexCodecTest {
@Test
public void framesShouldBeMultiplexed() {
LastInboundHandler inboundHandler3 = streamActiveAndWriteHeaders(3);
LastInboundHandler inboundHandler11 = streamActiveAndWriteHeaders(11);
LastInboundHandler inboundHandler5 = streamActiveAndWriteHeaders(5);
verifyFramesMultiplexedToCorrectChannel(3, inboundHandler3, 1);
verifyFramesMultiplexedToCorrectChannel(5, inboundHandler5, 1);
verifyFramesMultiplexedToCorrectChannel(11, inboundHandler11, 1);
Http2Stream2 stream3 = new Http2Stream2Impl(parentChannel).id(3);
Http2Stream2 stream5 = new Http2Stream2Impl(parentChannel).id(5);
Http2Stream2 stream11 = new Http2Stream2Impl(parentChannel).id(11);
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("hello"), false).streamId(5));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("foo"), true).streamId(3));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("world"), true).streamId(5));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("bar"), true).streamId(11));
verifyFramesMultiplexedToCorrectChannel(5, inboundHandler5, 2);
verifyFramesMultiplexedToCorrectChannel(3, inboundHandler3, 1);
verifyFramesMultiplexedToCorrectChannel(11, inboundHandler11, 1);
LastInboundHandler inboundHandler3 = streamActiveAndWriteHeaders(stream3);
LastInboundHandler inboundHandler5 = streamActiveAndWriteHeaders(stream5);
LastInboundHandler inboundHandler11 = streamActiveAndWriteHeaders(stream11);
verifyFramesMultiplexedToCorrectChannel(stream3, inboundHandler3, 1);
verifyFramesMultiplexedToCorrectChannel(stream5, inboundHandler5, 1);
verifyFramesMultiplexedToCorrectChannel(stream11, inboundHandler11, 1);
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("hello"), false).stream(stream5));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("foo"), true).stream(stream3));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("world"), true).stream(stream5));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("bar"), true).stream(stream11));
verifyFramesMultiplexedToCorrectChannel(stream5, inboundHandler5, 2);
verifyFramesMultiplexedToCorrectChannel(stream3, inboundHandler3, 1);
verifyFramesMultiplexedToCorrectChannel(stream11, inboundHandler11, 1);
((ChannelPromise) stream3.closeFuture()).setSuccess();
((ChannelPromise) stream5.closeFuture()).setSuccess();
((ChannelPromise) stream11.closeFuture()).setSuccess();
}
@Test
public void inboundDataFrameShouldEmitWindowUpdateFrame() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
ByteBuf tenBytes = bb("0123456789");
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(tenBytes, true).streamId(streamId));
parentChannel.pipeline().flush();
parentChannel.pipeline().fireChannelRead(
new DefaultHttp2DataFrame(tenBytes, true).stream(inboundStream));
parentChannel.pipeline().fireChannelReadComplete();
// Flush is only necessary cause of EmbeddedChannel
parentChannel.flush();
Http2WindowUpdateFrame windowUpdate = parentChannel.readOutbound();
assertNotNull(windowUpdate);
assertEquals(streamId, windowUpdate.streamId());
assertEquals(inboundStream, windowUpdate.stream());
assertEquals(10, windowUpdate.windowSizeIncrement());
// headers and data frame
verifyFramesMultiplexedToCorrectChannel(streamId, inboundHandler, 2);
verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 2);
}
@Test
public void channelReadShouldRespectAutoRead() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
Channel childChannel = inboundHandler.channel();
assertTrue(childChannel.config().isAutoRead());
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
@ -158,21 +182,21 @@ public class Http2MultiplexCodecTest {
childChannel.config().setAutoRead(false);
parentChannel.pipeline().fireChannelRead(
new DefaultHttp2DataFrame(bb("hello world"), false).streamId(streamId));
new DefaultHttp2DataFrame(bb("hello world"), false).stream(inboundStream));
parentChannel.pipeline().fireChannelReadComplete();
Http2DataFrame dataFrame0 = inboundHandler.readInbound();
assertNotNull(dataFrame0);
release(dataFrame0);
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("foo"), false).streamId(streamId));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("bar"), true).streamId(streamId));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("foo"), false).stream(inboundStream));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2DataFrame(bb("bar"), true).stream(inboundStream));
parentChannel.pipeline().fireChannelReadComplete();
dataFrame0 = inboundHandler.readInbound();
assertNull(dataFrame0);
childChannel.config().setAutoRead(true);
verifyFramesMultiplexedToCorrectChannel(streamId, inboundHandler, 2);
verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 2);
}
/**
@ -211,41 +235,55 @@ public class Http2MultiplexCodecTest {
Channel childChannel = b.connect().channel();
assertTrue(childChannel.isActive());
Http2HeadersFrame headersFrame = parentChannel.readOutbound();
assertNotNull(headersFrame);
assertFalse(Http2CodecUtil.isStreamIdValid(headersFrame.streamId()));
parentChannel.flush();
parentChannel.pipeline().fireUserEventTriggered(new Http2StreamActiveEvent(2, headersFrame));
Http2Stream2 stream2 = readOutboundHeadersAndAssignId();
childChannel.close();
parentChannel.runPendingTasks();
Http2ResetFrame reset = parentChannel.readOutbound();
assertEquals(2, reset.streamId());
assertEquals(stream2, reset.stream());
assertEquals(Http2Error.CANCEL.code(), reset.errorCode());
}
@Test
public void inboundStreamClosedShouldFireChannelInactive() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
public void inboundRstStreamFireChannelInactive() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
assertTrue(inboundHandler.isChannelActive());
parentChannel.pipeline().fireChannelRead(new DefaultHttp2ResetFrame(Http2Error.INTERNAL_ERROR)
.stream(inboundStream));
parentChannel.pipeline().fireChannelReadComplete();
// This will be called by the frame codec.
((ChannelPromise) inboundStream.closeFuture()).setSuccess();
parentChannel.pipeline().fireUserEventTriggered(new Http2StreamClosedEvent(streamId));
parentChannel.runPendingTasks();
parentChannel.flush();
assertFalse(inboundHandler.isChannelActive());
// A RST_STREAM frame should NOT be emitted, as we received the close.
// A RST_STREAM frame should NOT be emitted, as we received a RST_STREAM.
assertNull(parentChannel.readOutbound());
}
@Test(expected = StreamException.class)
public void streamExceptionTriggersChildChannelExceptionAndClose() throws Exception {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
StreamException cause = new StreamException(inboundStream.id(), Http2Error.PROTOCOL_ERROR, "baaam!");
Exception http2Ex = new Http2Stream2Exception(inboundStream, Http2Error.PROTOCOL_ERROR, cause);
parentChannel.pipeline().fireExceptionCaught(http2Ex);
inboundHandler.checkException();
}
@Test(expected = StreamException.class)
public void streamExceptionClosesChildChannel() throws Exception {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(streamId);
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
assertTrue(inboundHandler.isChannelActive());
StreamException e = new StreamException(streamId, Http2Error.PROTOCOL_ERROR, "baaam!");
parentChannel.pipeline().fireExceptionCaught(e);
StreamException cause = new StreamException(inboundStream.id(), Http2Error.PROTOCOL_ERROR, "baaam!");
Exception http2Ex = new Http2Stream2Exception(inboundStream, Http2Error.PROTOCOL_ERROR, cause);
parentChannel.pipeline().fireExceptionCaught(http2Ex);
parentChannel.runPendingTasks();
assertFalse(inboundHandler.isChannelActive());
@ -268,20 +306,15 @@ public class Http2MultiplexCodecTest {
Http2Headers headers = new DefaultHttp2Headers().scheme("https").method("GET").path("/foo.txt");
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers));
Http2HeadersFrame headersFrame = parentChannel.readOutbound();
assertNotNull(headersFrame);
assertSame(headers, headersFrame.headers());
assertFalse(Http2CodecUtil.isStreamIdValid(headersFrame.streamId()));
parentChannel.pipeline().fireUserEventTriggered(new Http2StreamActiveEvent(2, headersFrame));
readOutboundHeadersAndAssignId();
// Read from the child channel
headers = new DefaultHttp2Headers().scheme("https").status("200");
parentChannel.pipeline().fireChannelRead(new DefaultHttp2HeadersFrame(headers).streamId(
childChannel.streamId()));
parentChannel.pipeline().fireChannelRead(
new DefaultHttp2HeadersFrame(headers).stream(childChannel.stream()));
parentChannel.pipeline().fireChannelReadComplete();
headersFrame = inboundHandler.readInbound();
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
assertNotNull(headersFrame);
assertSame(headers, headersFrame.headers());
@ -292,7 +325,7 @@ public class Http2MultiplexCodecTest {
// An active outbound stream should emit a RST_STREAM frame.
Http2ResetFrame rstFrame = parentChannel.readOutbound();
assertNotNull(rstFrame);
assertEquals(childChannel.streamId(), rstFrame.streamId());
assertEquals(childChannel.stream(), rstFrame.stream());
assertFalse(childChannel.isOpen());
assertFalse(childChannel.isActive());
assertFalse(inboundHandler.isChannelActive());
@ -330,16 +363,15 @@ public class Http2MultiplexCodecTest {
@Test
public void settingChannelOptsAndAttrsOnBootstrap() {
AttributeKey<String> key = AttributeKey.newInstance("foo");
WriteBufferWaterMark mark = new WriteBufferWaterMark(1024, 4096);
Http2StreamChannelBootstrap b = new Http2StreamChannelBootstrap();
b.parentChannel(parentChannel).handler(childChannelInitializer)
.option(ChannelOption.AUTO_READ, false).option(ChannelOption.WRITE_BUFFER_WATER_MARK, mark)
.option(ChannelOption.AUTO_READ, false).option(ChannelOption.WRITE_SPIN_COUNT, 1000)
.attr(key, "bar");
Channel channel = b.connect().channel();
assertFalse(channel.config().isAutoRead());
assertSame(mark, channel.config().getWriteBufferWaterMark());
assertEquals(1000, channel.config().getWriteSpinCount());
assertEquals("bar", channel.attr(key).get());
}
@ -369,24 +401,156 @@ public class Http2MultiplexCodecTest {
assertNull(reset);
}
private LastInboundHandler streamActiveAndWriteHeaders(int streamId) {
@Test
public void outboundFlowControlWindowShouldBeSetAndUpdated() {
Http2StreamChannelBootstrap b = new Http2StreamChannelBootstrap();
b.parentChannel(parentChannel).handler(childChannelInitializer);
AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel) b.connect().channel();
assertTrue(childChannel.isActive());
assertEquals(0, childChannel.getOutboundFlowControlWindow());
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
parentChannel.flush();
Http2Stream2 stream2 = readOutboundHeadersAndAssignId();
// Test for initial window size
assertEquals(initialRemoteStreamWindow, childChannel.getOutboundFlowControlWindow());
// Test for increment via WINDOW_UPDATE
parentChannel.pipeline().fireChannelRead(new DefaultHttp2WindowUpdateFrame(1).stream(stream2));
parentChannel.pipeline().fireChannelReadComplete();
assertEquals(initialRemoteStreamWindow + 1, childChannel.getOutboundFlowControlWindow());
}
@Test
public void onlyDataFramesShouldBeFlowControlled() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel) inboundHandler.channel();
assertTrue(childChannel.isWritable());
assertEquals(initialRemoteStreamWindow, childChannel.getOutboundFlowControlWindow());
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
assertTrue(childChannel.isWritable());
assertEquals(initialRemoteStreamWindow, childChannel.getOutboundFlowControlWindow());
ByteBuf data = Unpooled.buffer(100).writeZero(100);
childChannel.writeAndFlush(new DefaultHttp2DataFrame(data));
assertTrue(childChannel.isWritable());
assertEquals(initialRemoteStreamWindow - 100, childChannel.getOutboundFlowControlWindow());
}
@Test
public void writabilityAndFlowControl() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel) inboundHandler.channel();
verifyFlowControlWindowAndWritability(childChannel, initialRemoteStreamWindow);
assertEquals("true", inboundHandler.writabilityStates());
// HEADERS frames are not flow controlled, so they should not affect the flow control window.
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
verifyFlowControlWindowAndWritability(childChannel, initialRemoteStreamWindow);
assertEquals("true", inboundHandler.writabilityStates());
ByteBuf data = Unpooled.buffer(initialRemoteStreamWindow - 1).writeZero(initialRemoteStreamWindow - 1);
childChannel.writeAndFlush(new DefaultHttp2DataFrame(data));
verifyFlowControlWindowAndWritability(childChannel, 1);
assertEquals("true,false,true", inboundHandler.writabilityStates());
ByteBuf data1 = Unpooled.buffer(100).writeZero(100);
childChannel.writeAndFlush(new DefaultHttp2DataFrame(data1));
verifyFlowControlWindowAndWritability(childChannel, -99);
assertEquals("true,false,true,false", inboundHandler.writabilityStates());
parentChannel.pipeline().fireChannelRead(new DefaultHttp2WindowUpdateFrame(99).stream(inboundStream));
parentChannel.pipeline().fireChannelReadComplete();
// the flow control window should be updated, but the channel should still not be writable.
verifyFlowControlWindowAndWritability(childChannel, 0);
assertEquals("true,false,true,false", inboundHandler.writabilityStates());
parentChannel.pipeline().fireChannelRead(new DefaultHttp2WindowUpdateFrame(1).stream(inboundStream));
parentChannel.pipeline().fireChannelReadComplete();
verifyFlowControlWindowAndWritability(childChannel, 1);
assertEquals("true,false,true,false,true", inboundHandler.writabilityStates());
}
@Test
public void failedWriteShouldReturnFlowControlWindow() {
ByteBuf data = Unpooled.buffer().writeZero(initialRemoteStreamWindow);
final Http2DataFrame frameToCancel = new DefaultHttp2DataFrame(data);
parentChannel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg == frameToCancel) {
promise.tryFailure(new Throwable());
} else {
super.write(ctx, msg, promise);
}
}
});
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
Channel childChannel = inboundHandler.channel();
childChannel.write(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
data = Unpooled.buffer().writeZero(initialRemoteStreamWindow / 2);
childChannel.write(new DefaultHttp2DataFrame(data));
assertEquals("true", inboundHandler.writabilityStates());
childChannel.write(frameToCancel);
assertEquals("true,false", inboundHandler.writabilityStates());
assertFalse(childChannel.isWritable());
childChannel.flush();
assertTrue(childChannel.isWritable());
assertEquals("true,false,true", inboundHandler.writabilityStates());
}
@Test
public void cancellingWritesBeforeFlush() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
Channel childChannel = inboundHandler.channel();
Http2HeadersFrame headers1 = new DefaultHttp2HeadersFrame(new DefaultHttp2Headers());
Http2HeadersFrame headers2 = new DefaultHttp2HeadersFrame(new DefaultHttp2Headers());
ChannelPromise writePromise = childChannel.newPromise();
childChannel.write(headers1, writePromise);
childChannel.write(headers2);
assertTrue(writePromise.cancel(false));
childChannel.flush();
Http2HeadersFrame headers = parentChannel.readOutbound();
assertSame(headers, headers2);
}
private static void verifyFlowControlWindowAndWritability(AbstractHttp2StreamChannel channel,
int expectedWindowSize) {
assertEquals(expectedWindowSize, channel.getOutboundFlowControlWindow());
assertEquals(Math.max(0, expectedWindowSize), channel.config().getWriteBufferHighWaterMark());
assertEquals(channel.config().getWriteBufferHighWaterMark(), channel.config().getWriteBufferLowWaterMark());
assertEquals(expectedWindowSize > 0, channel.isWritable());
}
private LastInboundHandler streamActiveAndWriteHeaders(Http2Stream2 stream) {
LastInboundHandler inboundHandler = new LastInboundHandler();
childChannelInitializer.handler = inboundHandler;
assertFalse(inboundHandler.isChannelActive());
parentChannel.pipeline().fireUserEventTriggered(new Http2StreamActiveEvent(streamId));
assertTrue(inboundHandler.isChannelActive());
parentChannel.pipeline().fireChannelRead(new DefaultHttp2HeadersFrame(request).streamId(streamId));
parentChannel.pipeline().fireChannelRead(new DefaultHttp2HeadersFrame(request).stream(stream));
parentChannel.pipeline().fireChannelReadComplete();
assertTrue(inboundHandler.isChannelActive());
return inboundHandler;
}
private static void verifyFramesMultiplexedToCorrectChannel(int streamId, LastInboundHandler inboundHandler,
private static void verifyFramesMultiplexedToCorrectChannel(Http2Stream2 stream,
LastInboundHandler inboundHandler,
int numFrames) {
for (int i = 0; i < numFrames; i++) {
Http2StreamFrame frame = inboundHandler.readInbound();
assertNotNull(frame);
assertEquals(streamId, frame.streamId());
assertEquals(stream, frame.stream());
release(frame);
}
assertNull(inboundHandler.readInbound());
@ -395,4 +559,86 @@ public class Http2MultiplexCodecTest {
private static ByteBuf bb(String s) {
return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s);
}
/**
* Simulates the frame codec, in first assigning an identifier and the completing the write promise.
*/
Http2Stream2 readOutboundHeadersAndAssignId() {
// Only peek at the frame, so to not complete the promise of the write. We need to first
// assign a stream identifier, as the frame codec would do.
Http2HeadersFrame headersFrame = (Http2HeadersFrame) parentChannel.outboundMessages().peek();
assertNotNull(headersFrame);
assertNotNull(headersFrame.stream());
assertFalse(Http2CodecUtil.isStreamIdValid(headersFrame.stream().id()));
headersFrame.stream().id(outboundStream.id());
// Now read it and complete the write promise.
assertSame(headersFrame, parentChannel.readOutbound());
return headersFrame.stream();
}
/**
* This class removes the bits that would require the frame codec, so that the class becomes testable.
*/
static final class TestableHttp2MultiplexCodec extends Http2MultiplexCodec {
TestableHttp2MultiplexCodec(boolean server, Http2StreamChannelBootstrap bootstrap) {
super(server, bootstrap);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
bootstrap.parentChannel(ctx.channel());
}
@Override
void forEachActiveStream0(Http2Stream2Visitor streamVisitor) {
throw new UnsupportedOperationException();
}
@Override
Http2Stream2 newStream0() {
return new Http2Stream2Impl(ctx.channel());
}
}
static final class Http2Stream2Impl implements Http2Stream2 {
private int id = -1;
private Object managedState;
private final ChannelPromise closeFuture;
Http2Stream2Impl(Channel ch) {
closeFuture = new DefaultChannelPromise(ch);
}
@Override
public Http2Stream2 id(int id) {
this.id = id;
return this;
}
@Override
public int id() {
return id;
}
@Override
public Http2Stream2 managedState(Object state) {
managedState = state;
return this;
}
@Override
public Object managedState() {
return managedState;
}
@Override
public ChannelFuture closeFuture() {
return closeFuture;
}
}
}

View File

@ -54,7 +54,7 @@ public class Http2ServerDowngraderTest {
Http2HeadersFrame headersFrame = ch.readOutbound();
assertThat(headersFrame.headers().status().toString(), is("200"));
assertTrue(headersFrame.isEndStream());
assertTrue(headersFrame.endStream());
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
@ -68,12 +68,12 @@ public class Http2ServerDowngraderTest {
Http2HeadersFrame headersFrame = ch.readOutbound();
assertThat(headersFrame.headers().status().toString(), is("200"));
assertFalse(headersFrame.isEndStream());
assertFalse(headersFrame.endStream());
Http2DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertTrue(dataFrame.isEndStream());
assertTrue(dataFrame.endStream());
} finally {
dataFrame.release();
}
@ -92,11 +92,11 @@ public class Http2ServerDowngraderTest {
Http2HeadersFrame headersFrame = ch.readOutbound();
assertThat(headersFrame.headers().status().toString(), is("200"));
assertFalse(headersFrame.isEndStream());
assertFalse(headersFrame.endStream());
Http2HeadersFrame trailersFrame = ch.readOutbound();
assertThat(trailersFrame.headers().get("key").toString(), is("value"));
assertTrue(trailersFrame.isEndStream());
assertTrue(trailersFrame.endStream());
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
@ -113,19 +113,19 @@ public class Http2ServerDowngraderTest {
Http2HeadersFrame headersFrame = ch.readOutbound();
assertThat(headersFrame.headers().status().toString(), is("200"));
assertFalse(headersFrame.isEndStream());
assertFalse(headersFrame.endStream());
Http2DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertFalse(dataFrame.isEndStream());
assertFalse(dataFrame.endStream());
} finally {
dataFrame.release();
}
Http2HeadersFrame trailersFrame = ch.readOutbound();
assertThat(trailersFrame.headers().get("key").toString(), is("value"));
assertTrue(trailersFrame.isEndStream());
assertTrue(trailersFrame.endStream());
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
@ -139,7 +139,7 @@ public class Http2ServerDowngraderTest {
Http2HeadersFrame headersFrame = ch.readOutbound();
assertThat(headersFrame.headers().status().toString(), is("200"));
assertFalse(headersFrame.isEndStream());
assertFalse(headersFrame.endStream());
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
@ -155,7 +155,7 @@ public class Http2ServerDowngraderTest {
Http2DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertFalse(dataFrame.isEndStream());
assertFalse(dataFrame.endStream());
} finally {
dataFrame.release();
}
@ -173,7 +173,7 @@ public class Http2ServerDowngraderTest {
Http2DataFrame emptyFrame = ch.readOutbound();
try {
assertThat(emptyFrame.content().readableBytes(), is(0));
assertTrue(emptyFrame.isEndStream());
assertTrue(emptyFrame.endStream());
} finally {
emptyFrame.release();
}
@ -192,7 +192,7 @@ public class Http2ServerDowngraderTest {
Http2DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertTrue(dataFrame.isEndStream());
assertTrue(dataFrame.endStream());
} finally {
dataFrame.release();
}
@ -211,7 +211,7 @@ public class Http2ServerDowngraderTest {
Http2HeadersFrame headerFrame = ch.readOutbound();
assertThat(headerFrame.headers().get("key").toString(), is("value"));
assertTrue(headerFrame.isEndStream());
assertTrue(headerFrame.endStream());
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
@ -229,14 +229,14 @@ public class Http2ServerDowngraderTest {
Http2DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertFalse(dataFrame.isEndStream());
assertFalse(dataFrame.endStream());
} finally {
dataFrame.release();
}
Http2HeadersFrame headerFrame = ch.readOutbound();
assertThat(headerFrame.headers().get("key").toString(), is("value"));
assertTrue(headerFrame.isEndStream());
assertTrue(headerFrame.endStream());
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());

View File

@ -37,9 +37,11 @@ public class LastInboundHandler extends ChannelDuplexHandler {
private Throwable lastException;
private ChannelHandlerContext ctx;
private boolean channelActive;
private String writabilityStates = "";
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
this.ctx = ctx;
}
@ -56,6 +58,10 @@ public class LastInboundHandler extends ChannelDuplexHandler {
return channelActive;
}
public String writabilityStates() {
return writabilityStates;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (!channelActive) {
@ -65,6 +71,16 @@ public class LastInboundHandler extends ChannelDuplexHandler {
super.channelInactive(ctx);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (writabilityStates == "") {
writabilityStates = String.valueOf(ctx.channel().isWritable());
} else {
writabilityStates += "," + ctx.channel().isWritable();
}
super.channelWritabilityChanged(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
queue.add(msg);

View File

@ -69,7 +69,7 @@ public class HelloWorldHttp2Handler extends ChannelDuplexHandler {
* If receive a frame with end-of-stream set, send a pre-canned response.
*/
private static void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception {
if (data.isEndStream()) {
if (data.endStream()) {
sendResponse(ctx, data.content());
} else {
// We do not send back the response to the remote-peer, so we need to release it.
@ -82,7 +82,7 @@ public class HelloWorldHttp2Handler extends ChannelDuplexHandler {
*/
private static void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame headers)
throws Exception {
if (headers.isEndStream()) {
if (headers.endStream()) {
ByteBuf content = ctx.alloc().buffer();
content.writeBytes(RESPONSE_BYTES.duplicate());
ByteBufUtil.writeAscii(content, " - via HTTP/2");

View File

@ -264,6 +264,33 @@ public final class ChannelOutboundBuffer {
return true;
}
/**
* Removes the current flushed message and returns its {@link ChannelPromise}. Unlike {@link #remove()} this method
* does not release the message or complete the promise. If no flushed message exist, this method returns
* {@code null}.
*/
public ChannelPromise steal() {
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return null;
}
ChannelPromise promise = e.promise;
final int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled && size > 0) {
decrementPendingOutboundBytes(size, false, true);
}
// recycle the entry
e.recycle();
return promise;
}
/**
* Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
* and return {@code true}. If no flushed message exists at the time this method is called it will return