diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java new file mode 100644 index 0000000000..664681bc1c --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java @@ -0,0 +1,321 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.http2; + +import static io.netty.util.internal.ObjectUtil.checkNotNull; + +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.EventLoop; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.EmptyArrays; +import io.netty.util.internal.OneTimeTask; + +import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayDeque; +import java.util.Queue; + +/** + * Child {@link Channel} of another channel, for use for modeling streams as channels. + */ +abstract class AbstractHttp2StreamChannel extends AbstractChannel { + /** + * Used by subclasses to queue a close channel within the read queue. When read, it will close + * the channel (using Unsafe) instead of notifying handlers of the message with {@code + * channelRead()}. Additional inbound messages must not arrive after this one. + */ + protected static final Object CLOSE_MESSAGE = new Object(); + private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); + private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException(); + /** + * Number of bytes to consider non-payload messages, to determine when to stop reading. 9 is + * arbitrary, but also the minimum size of an HTTP/2 frame. Primarily is non-zero. + */ + private static final int ARBITRARY_MESSAGE_SIZE = 9; + + static { + CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); + } + + private final ChannelConfig config = new DefaultChannelConfig(this); + private final Queue inboundBuffer = new ArrayDeque(4); + private final Runnable fireChildReadCompleteTask = new Runnable() { + @Override + public void run() { + if (readInProgress) { + readInProgress = false; + unsafe().recvBufAllocHandle().readComplete(); + pipeline().fireChannelReadComplete(); + } + } + }; + + private boolean closed; + private boolean readInProgress; + + public AbstractHttp2StreamChannel(Channel parent) { + super(parent); + } + + @Override + public ChannelMetadata metadata() { + return METADATA; + } + + @Override + public ChannelConfig config() { + return config; + } + + @Override + public boolean isOpen() { + return !closed; + } + + @Override + public boolean isActive() { + return !closed; + } + + @Override + protected AbstractUnsafe newUnsafe() { + return new Unsafe(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return true; + } + + @Override + protected SocketAddress localAddress0() { + return parent().localAddress(); + } + + @Override + protected SocketAddress remoteAddress0() { + return parent().remoteAddress(); + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doDisconnect() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doClose() throws Exception { + closed = true; + while (!inboundBuffer.isEmpty()) { + ReferenceCountUtil.release(inboundBuffer.poll()); + } + } + + @Override + protected void doBeginRead() { + if (readInProgress) { + return; + } + + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + allocHandle.reset(config()); + if (inboundBuffer.isEmpty()) { + readInProgress = true; + return; + } + + do { + Object m = inboundBuffer.poll(); + if (m == null) { + break; + } + if (!doRead0(m, allocHandle)) { + // Channel closed, and already cleaned up. + return; + } + } while (allocHandle.continueReading()); + + allocHandle.readComplete(); + pipeline().fireChannelReadComplete(); + } + + @Override + protected final void doWrite(ChannelOutboundBuffer in) throws Exception { + if (closed) { + throw CLOSED_CHANNEL_EXCEPTION; + } + + EventExecutor preferredExecutor = preferredEventExecutor(); + + // TODO: this is pretty broken; futures should only be completed after they are processed on + // the parent channel. However, it isn't currently possible due to ChannelOutboundBuffer's + // behavior which requires completing the current future before getting the next message. It + // should become easier once we have outbound flow control support. + // https://github.com/netty/netty/issues/4941 + if (preferredExecutor.inEventLoop()) { + for (;;) { + Object msg = in.current(); + if (msg == null) { + break; + } + try { + doWrite(ReferenceCountUtil.retain(msg)); + } catch (Throwable t) { + // It would be nice to fail the future, but we can't do that if not on the event + // loop. So we instead opt for a solution that is consistent. + pipeline().fireExceptionCaught(t); + } + in.remove(); + } + doWriteComplete(); + } else { + // Use a copy because the original msgs will be recycled by AbstractChannel. + final Object[] msgsCopy = new Object[in.size()]; + for (int i = 0; i < msgsCopy.length; i ++) { + msgsCopy[i] = ReferenceCountUtil.retain(in.current()); + in.remove(); + } + + preferredExecutor.execute(new OneTimeTask() { + @Override + public void run() { + for (Object msg : msgsCopy) { + try { + doWrite(msg); + } catch (Throwable t) { + pipeline().fireExceptionCaught(t); + } + } + doWriteComplete(); + } + }); + } + } + + /** + * Process a single write. Guaranteed to eventually be followed by a {@link #doWriteComplete()}, + * which denotes the end of the batch of writes. May be called from any thread. + */ + protected abstract void doWrite(Object msg) throws Exception; + + /** + * Process end of batch of {@link #doWrite()}s. May be called from any thread. + */ + protected abstract void doWriteComplete(); + + /** + * The ideal thread for events like {@link #doWrite()} to be processed on. May be used for + * efficient batching, but not required. + */ + protected abstract EventExecutor preferredEventExecutor(); + + /** + * {@code bytes}-count of bytes provided to {@link #fireChildRead} have been read. May be called + * from any thread. Must not throw an exception. + */ + protected abstract void bytesConsumed(int bytes); + + /** + * Receive a read message. This does not notify handlers unless a read is in progress on the + * channel. May be called from any thread. + */ + protected void fireChildRead(final Object msg) { + if (eventLoop().inEventLoop()) { + fireChildRead0(msg); + } else { + eventLoop().execute(new OneTimeTask() { + @Override + public void run() { + fireChildRead0(msg); + } + }); + } + } + + private void fireChildRead0(Object msg) { + if (closed) { + ReferenceCountUtil.release(msg); + return; + } + if (readInProgress) { + assert inboundBuffer.isEmpty(); + // Check for null because inboundBuffer doesn't support null; we want to be consistent + // for what values are supported. + RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + readInProgress = doRead0(checkNotNull(msg, "msg"), allocHandle); + if (!allocHandle.continueReading()) { + fireChildReadCompleteTask.run(); + } + } else { + inboundBuffer.add(msg); + } + } + + protected void fireChildReadComplete() { + if (eventLoop().inEventLoop()) { + fireChildReadCompleteTask.run(); + } else { + eventLoop().execute(fireChildReadCompleteTask); + } + } + + /** + * Returns whether reads should continue. The only reason reads shouldn't continue is that the + * channel was just closed. + */ + private boolean doRead0(Object msg, RecvByteBufAllocator.Handle allocHandle) { + if (msg == CLOSE_MESSAGE) { + allocHandle.readComplete(); + pipeline().fireChannelReadComplete(); + unsafe().close(voidPromise()); + return false; + } + int numBytesToBeConsumed = 0; + if (msg instanceof Http2DataFrame) { + Http2DataFrame data = (Http2DataFrame) msg; + numBytesToBeConsumed = data.content().readableBytes() + data.padding(); + allocHandle.lastBytesRead(numBytesToBeConsumed); + } else { + allocHandle.lastBytesRead(ARBITRARY_MESSAGE_SIZE); + } + allocHandle.incMessagesRead(1); + pipeline().fireChannelRead(msg); + if (numBytesToBeConsumed != 0) { + bytesConsumed(numBytesToBeConsumed); + } + return true; + } + + private final class Unsafe extends AbstractUnsafe { + @Override + public void connect(final SocketAddress remoteAddress, + SocketAddress localAddress, final ChannelPromise promise) { + promise.setFailure(new UnsupportedOperationException()); + } + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamFrame.java new file mode 100644 index 0000000000..237a2e35fa --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamFrame.java @@ -0,0 +1,57 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +/** + * Abstract implementation of {@link Http2StreamFrame}. + */ +public abstract class AbstractHttp2StreamFrame implements Http2StreamFrame { + private Object stream; + + @Override + public AbstractHttp2StreamFrame setStream(Object stream) { + this.stream = stream; + return this; + } + + @Override + public Object stream() { + return stream; + } + + /** + * Returns {@code true} if {@code o} has equal {@code stream} to this object. + */ + @Override + public boolean equals(Object o) { + if (!(o instanceof Http2StreamFrame)) { + return false; + } + Http2StreamFrame other = (Http2StreamFrame) o; + if (stream == null) { + return other.stream() == null; + } + return stream.equals(other.stream()); + } + + @Override + public int hashCode() { + if (stream == null) { + return 61432814; + } + return stream.hashCode(); + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index a138854b7c..298fccd125 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -96,6 +96,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { return listener; } + // Visible for testing + Http2FrameListener internalFrameListener() { + return internalFrameListener; + } + @Override public boolean prefaceReceived() { return FrameReadListener.class == internalFrameListener.getClass(); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2DataFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2DataFrame.java new file mode 100644 index 0000000000..70de2e7e3c --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2DataFrame.java @@ -0,0 +1,173 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import static io.netty.util.internal.ObjectUtil.checkNotNull; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.IllegalReferenceCountException; + +/** + * The default {@link Http2DataFrame} implementation. + */ +public final class DefaultHttp2DataFrame extends AbstractHttp2StreamFrame implements Http2DataFrame { + private final ByteBuf content; + private final boolean endStream; + private final int padding; + + /** + * Equivalent to {@code new DefaultHttp2DataFrame(content, false)}. + * + * @param content non-{@code null} payload + */ + public DefaultHttp2DataFrame(ByteBuf content) { + this(content, false); + } + + /** + * Equivalent to {@code new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER, endStream)}. + * + * @param endStream whether this data should terminate the stream + */ + public DefaultHttp2DataFrame(boolean endStream) { + this(Unpooled.EMPTY_BUFFER, endStream); + } + + /** + * Equivalent to {@code new DefaultHttp2DataFrame(content, endStream, 0)}. + * + * @param content non-{@code null} payload + * @param endStream whether this data should terminate the stream + */ + public DefaultHttp2DataFrame(ByteBuf content, boolean endStream) { + this(content, endStream, 0); + } + + /** + * Construct a new data message. + * + * @param content non-{@code null} payload + * @param endStream whether this data should terminate the stream + * @param padding additional bytes that should be added to obscure the true content size + */ + public DefaultHttp2DataFrame(ByteBuf content, boolean endStream, int padding) { + this.content = checkNotNull(content, "content"); + this.endStream = endStream; + if (padding < 0 || padding > Http2CodecUtil.MAX_UNSIGNED_BYTE) { + throw new IllegalArgumentException("padding must be non-negative and less than 256"); + } + this.padding = padding; + } + + @Override + public DefaultHttp2DataFrame setStream(Object stream) { + super.setStream(stream); + return this; + } + + @Override + public boolean isEndStream() { + return endStream; + } + + @Override + public int padding() { + return padding; + } + + @Override + public ByteBuf content() { + if (content.refCnt() <= 0) { + throw new IllegalReferenceCountException(content.refCnt()); + } + return content; + } + + @Override + public DefaultHttp2DataFrame copy() { + return new DefaultHttp2DataFrame(content().copy(), endStream, padding); + } + + @Override + public DefaultHttp2DataFrame duplicate() { + return new DefaultHttp2DataFrame(content().duplicate(), endStream, padding); + } + + @Override + public int refCnt() { + return content.refCnt(); + } + + @Override + public boolean release() { + return content.release(); + } + + @Override + public boolean release(int decrement) { + return content.release(decrement); + } + + @Override + public DefaultHttp2DataFrame retain() { + content.retain(); + return this; + } + + @Override + public DefaultHttp2DataFrame retain(int increment) { + content.retain(increment); + return this; + } + + @Override + public String toString() { + return "DefaultHttp2DataFrame(stream=" + stream() + ", content=" + content + + ", endStream=" + endStream + ", padding=" + padding + ")"; + } + + @Override + public DefaultHttp2DataFrame touch() { + content.touch(); + return this; + } + + @Override + public DefaultHttp2DataFrame touch(Object hint) { + content.touch(hint); + return this; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof DefaultHttp2DataFrame)) { + return false; + } + DefaultHttp2DataFrame other = (DefaultHttp2DataFrame) o; + return super.equals(other) && content.equals(other.content()) + && endStream == other.endStream && padding == other.padding; + } + + @Override + public int hashCode() { + int hash = super.hashCode(); + hash = hash * 31 + content.hashCode(); + hash = hash * 31 + (endStream ? 0 : 1); + hash = hash * 31 + padding; + return hash; + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2GoAwayFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2GoAwayFrame.java new file mode 100644 index 0000000000..a289b47e95 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2GoAwayFrame.java @@ -0,0 +1,146 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DefaultByteBufHolder; +import io.netty.buffer.Unpooled; +import io.netty.util.IllegalReferenceCountException; + +/** + * The default {@link Http2GoAwayFrame} implementation. + */ +public final class DefaultHttp2GoAwayFrame extends DefaultByteBufHolder implements Http2GoAwayFrame { + private final long errorCode; + private int extraStreamIds; + + /** + * Equivalent to {@code new DefaultHttp2GoAwayFrame(error.code())}. + * + * @param error non-{@code null} reason for the go away + */ + public DefaultHttp2GoAwayFrame(Http2Error error) { + this(error.code()); + } + + /** + * Equivalent to {@code new DefaultHttp2GoAwayFrame(content, Unpooled.EMPTY_BUFFER)}. + * + * @param error reason for the go away + */ + public DefaultHttp2GoAwayFrame(long errorCode) { + this(errorCode, Unpooled.EMPTY_BUFFER); + } + + /** + * Equivalent to {@code new DefaultHttp2GoAwayFrame(error.code(), content)}. + * + * @param error non-{@code null} reason for the go away + * @param content non-{@code null} debug data + */ + public DefaultHttp2GoAwayFrame(Http2Error error, ByteBuf content) { + this(error.code(), content); + } + + /** + * Construct a new GOAWAY message. + * + * @param error reason for the go away + * @param content non-{@code null} debug data + */ + public DefaultHttp2GoAwayFrame(long errorCode, ByteBuf content) { + super(content); + this.errorCode = errorCode; + } + + @Override + public long errorCode() { + return errorCode; + } + + @Override + public int extraStreamIds() { + return extraStreamIds; + } + + @Override + public DefaultHttp2GoAwayFrame setExtraStreamIds(int extraStreamIds) { + if (extraStreamIds < 0) { + throw new IllegalArgumentException("extraStreamIds must be non-negative"); + } + this.extraStreamIds = extraStreamIds; + return this; + } + + @Override + public DefaultHttp2GoAwayFrame copy() { + return new DefaultHttp2GoAwayFrame(errorCode, content().copy()).setExtraStreamIds(extraStreamIds); + } + + @Override + public DefaultHttp2GoAwayFrame duplicate() { + return new DefaultHttp2GoAwayFrame(errorCode, content().duplicate()).setExtraStreamIds(extraStreamIds); + } + + @Override + public DefaultHttp2GoAwayFrame retain() { + super.retain(); + return this; + } + + @Override + public DefaultHttp2GoAwayFrame retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public String toString() { + return "DefaultHttp2GoAwayFrame(errorCode=" + errorCode + ", content=" + content() + + ", extraStreamIds=" + extraStreamIds + ")"; + } + + @Override + public DefaultHttp2GoAwayFrame touch() { + super.touch(); + return this; + } + + @Override + public DefaultHttp2GoAwayFrame touch(Object hint) { + super.touch(hint); + return this; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof DefaultHttp2GoAwayFrame)) { + return false; + } + DefaultHttp2GoAwayFrame other = (DefaultHttp2GoAwayFrame) o; + return super.equals(o) && errorCode == other.errorCode && content().equals(other.content()) + && extraStreamIds == other.extraStreamIds; + } + + @Override + public int hashCode() { + int hash = 237395317; + hash = hash * 31 + (int) (errorCode ^ (errorCode >>> 32)); + hash = hash * 31 + content().hashCode(); + hash = hash * 31 + extraStreamIds; + return hash; + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2HeadersFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2HeadersFrame.java new file mode 100644 index 0000000000..ac9ecd672c --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2HeadersFrame.java @@ -0,0 +1,107 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import static io.netty.util.internal.ObjectUtil.checkNotNull; + +/** + * The default {@link Http2HeadersFrame} implementation. + */ +public final class DefaultHttp2HeadersFrame extends AbstractHttp2StreamFrame implements Http2HeadersFrame { + private final Http2Headers headers; + private final boolean endStream; + private final int padding; + + /** + * Equivalent to {@code new DefaultHttp2HeadersFrame(headers, false)}. + * + * @param headers the non-{@code null} headers to send + */ + public DefaultHttp2HeadersFrame(Http2Headers headers) { + this(headers, false); + } + + /** + * Equivalent to {@code new DefaultHttp2HeadersFrame(headers, endStream, 0)}. + * + * @param headers the non-{@code null} headers to send + */ + public DefaultHttp2HeadersFrame(Http2Headers headers, boolean endStream) { + this(headers, endStream, 0); + } + + /** + * Construct a new headers message. + * + * @param headers the non-{@code null} headers to send + * @param endStream whether these headers should terminate the stream + * @param padding additional bytes that should be added to obscure the true content size + */ + public DefaultHttp2HeadersFrame(Http2Headers headers, boolean endStream, int padding) { + this.headers = checkNotNull(headers, "headers"); + this.endStream = endStream; + if (padding < 0 || padding > Http2CodecUtil.MAX_UNSIGNED_BYTE) { + throw new IllegalArgumentException("padding must be non-negative and less than 256"); + } + this.padding = padding; + } + + @Override + public DefaultHttp2HeadersFrame setStream(Object stream) { + super.setStream(stream); + return this; + } + + @Override + public Http2Headers headers() { + return headers; + } + + @Override + public boolean isEndStream() { + return endStream; + } + + @Override + public int padding() { + return padding; + } + + @Override + public String toString() { + return "DefaultHttp2HeadersFrame(stream=" + stream() + ", headers=" + headers + + ", endStream=" + endStream + ", padding=" + padding + ")"; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof DefaultHttp2HeadersFrame)) { + return false; + } + DefaultHttp2HeadersFrame other = (DefaultHttp2HeadersFrame) o; + return super.equals(other) && headers.equals(other.headers) + && endStream == other.endStream && padding == other.padding; + } + + @Override + public int hashCode() { + int hash = super.hashCode(); + hash = hash * 31 + headers.hashCode(); + hash = hash * 31 + (endStream ? 0 : 1); + hash = hash * 31 + padding; + return hash; + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ResetFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ResetFrame.java new file mode 100644 index 0000000000..2c7e2b0590 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ResetFrame.java @@ -0,0 +1,75 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import static io.netty.util.internal.ObjectUtil.checkNotNull; + +/** + * The default {@link Http2ResetFrame} implementation. + */ +public final class DefaultHttp2ResetFrame extends AbstractHttp2StreamFrame implements Http2ResetFrame { + private final long errorCode; + + /** + * Construct a reset message. + * + * @param error the non-{@code null} reason for reset + */ + public DefaultHttp2ResetFrame(Http2Error error) { + this.errorCode = checkNotNull(error, "error").code(); + } + + /** + * Construct a reset message. + * + * @param errorCode the reason for reset + */ + public DefaultHttp2ResetFrame(long errorCode) { + this.errorCode = errorCode; + } + + @Override + public DefaultHttp2ResetFrame setStream(Object stream) { + super.setStream(stream); + return this; + } + + @Override + public long errorCode() { + return errorCode; + } + + @Override + public String toString() { + return "DefaultHttp2ResetFrame(stream=" + stream() + "errorCode=" + errorCode + ")"; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof DefaultHttp2ResetFrame)) { + return false; + } + DefaultHttp2ResetFrame other = (DefaultHttp2ResetFrame) o; + return super.equals(o) && errorCode == other.errorCode; + } + + @Override + public int hashCode() { + int hash = super.hashCode(); + hash = hash * 31 + (int) (errorCode ^ (errorCode >>> 32)); + return hash; + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataFrame.java new file mode 100644 index 0000000000..9e6cd855c4 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataFrame.java @@ -0,0 +1,61 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; + +/** + * HTTP/2 DATA frame. + */ +public interface Http2DataFrame extends Http2StreamFrame, ByteBufHolder { + @Override + Http2DataFrame setStream(Object stream); + + /** + * {@code true} if this frame is the last one in this direction of the stream. + */ + boolean isEndStream(); + + /** + * Frame padding to use. Will be non-negative and less than 256. + */ + int padding(); + + /** + * Payload of DATA frame. Will not be {@code null}. + */ + @Override + ByteBuf content(); + + @Override + Http2DataFrame copy(); + + @Override + Http2DataFrame duplicate(); + + @Override + Http2DataFrame retain(); + + @Override + Http2DataFrame retain(int increment); + + @Override + Http2DataFrame touch(); + + @Override + Http2DataFrame touch(Object hint); +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Frame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Frame.java new file mode 100644 index 0000000000..d27fe5c0e2 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Frame.java @@ -0,0 +1,19 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +/** An HTTP/2 frame. */ +public interface Http2Frame { } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2GoAwayFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2GoAwayFrame.java new file mode 100644 index 0000000000..01a7001f6d --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2GoAwayFrame.java @@ -0,0 +1,70 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; + +/** + * HTTP/2 GOAWAY frame. Last-Stream-Id is not exposed directly, but instead via the relative {@link + * #extraStreamIds()}. + */ +public interface Http2GoAwayFrame extends Http2Frame, ByteBufHolder { + /** + * The reason for beginning closure of the connection. Represented as an HTTP/2 error code. + */ + long errorCode(); + + /** + * The number of IDs to reserve for the receiver to use while GOAWAY is in transit. This allows + * for new streams currently en route to still be created, up to a point, which allows for very + * graceful shutdown of both sides. + */ + int extraStreamIds(); + + /** + * Sets the number of IDs to reserve for the receiver to use while GOAWAY is in transit. + * + * @see #extraStreamIds + * @return {@code this} + */ + Http2GoAwayFrame setExtraStreamIds(int extraStreamIds); + + /** + * Optional debugging information describing cause the GOAWAY. Will not be {@code null}, but may + * be empty. + */ + @Override + ByteBuf content(); + + @Override + Http2GoAwayFrame copy(); + + @Override + Http2GoAwayFrame duplicate(); + + @Override + Http2GoAwayFrame retain(); + + @Override + Http2GoAwayFrame retain(int increment); + + @Override + Http2GoAwayFrame touch(); + + @Override + Http2GoAwayFrame touch(Object hint); +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2HeadersFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2HeadersFrame.java new file mode 100644 index 0000000000..392780ada4 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2HeadersFrame.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +/** + * HTTP/2 HEADERS frame. + */ +public interface Http2HeadersFrame extends Http2StreamFrame { + @Override + Http2HeadersFrame setStream(Object stream); + + /** + * A complete header list. CONTINUATION frames are automatically handled. + */ + Http2Headers headers(); + + /** + * {@code true} if this frame is the last one in this direction of the stream. + */ + boolean isEndStream(); + + /** + * Frame padding to use. Must be non-negative and less than 256. + */ + int padding(); +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java new file mode 100644 index 0000000000..5ab86439c1 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java @@ -0,0 +1,469 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import static io.netty.handler.logging.LogLevel.INFO; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerInvoker; +import io.netty.channel.ChannelHandlerInvokerUtil; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.handler.codec.UnsupportedMessageTypeException; +import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.OneTimeTask; + +import java.util.ArrayList; +import java.util.List; + +/** + * An HTTP/2 handler that creates child channels for each stream. Creating outgoing streams is not + * yet supported. Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link + * Http2ServerUpgradeCodec}; the necessary HTTP-to-HTTP/2 conversion is performed automatically. + * + *

This API is very immature. The Http2Connection-based API is currently preferred over + * this API. This API is targeted to eventually replace or reduce the need for the + * Http2Connection-based API. + * + *

This handler notifies the pipeline of channel events, such as {@link Http2GoAwayFrame}. It + * is also capable of writing such messages. Directly writing {@link Http2StreamFrame}s for this + * handler is unsupported. + * + *

Child Channels

+ * + *

When a new stream is created, a new {@link Channel} is created for it. Applications send and + * receive {@link Http2StreamFrame}s on the created channel. The {@link Http2StreamFrame#stream} is + * expected to be {@code null}, but the channel can use the field for its own bookkeeping. {@link + * ByteBuf}s cannot be processed by the channel; all writes that reach the head of the pipeline must + * be an instance of {@link Http2StreamFrame}. Writes that reach the head of the pipeline are + * processed directly by this handler and cannot be intercepted. + * + *

The child channel will be notified of user events that impact the stream, such as {@link + * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code + * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further + * communication, closing of the channel is delayed until any inbound queue is drained with {@link + * Channel#read()}, which follows the default behavior of channels in Netty. Applications are + * free to close the channel in response to such events if they don't have use for any queued + * messages. + * + *

{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link + * ChannelConfig#setAutoRead(boolean)} are supported. + */ +public final class Http2MultiplexCodec extends ChannelDuplexHandler { + private static final Http2FrameLogger HTTP2_FRAME_LOGGER = new Http2FrameLogger(INFO, Http2MultiplexCodec.class); + + private final ChannelHandler streamHandler; + private final EventLoopGroup streamGroup; + private final Http2ConnectionHandler http2Handler; + private final Http2Connection.PropertyKey streamInfoKey; + private final List streamsToFireChildReadComplete = new ArrayList(); + private ChannelHandlerContext ctx; + private ChannelHandlerContext http2HandlerCtx; + + /** + * Construct a new handler whose child channels run in the same event loop as this handler. + * + * @param server {@code true} this is a server + * @param streamHandler the handler added to channels for remotely-created streams. It must be + * {@link ChannelHandler.Sharable}. + */ + public Http2MultiplexCodec(boolean server, ChannelHandler streamHandler) { + this(server, streamHandler, null); + } + + /** + * Construct a new handler whose child channels run in a different event loop. + * + * @param server {@code true} this is a server + * @param streamHandler the handler added to channels for remotely-created streams. It must be + * {@link ChannelHandler.Sharable}. + * @param streamGroup event loop for registering child channels + */ + public Http2MultiplexCodec(boolean server, ChannelHandler streamHandler, + EventLoopGroup streamGroup) { + this(server, streamHandler, streamGroup, new DefaultHttp2FrameWriter()); + } + + Http2MultiplexCodec(boolean server, ChannelHandler streamHandler, + EventLoopGroup streamGroup, Http2FrameWriter frameWriter) { + if (!streamHandler.getClass().isAnnotationPresent(Sharable.class)) { + throw new IllegalArgumentException("streamHandler must be Sharable"); + } + this.streamHandler = streamHandler; + this.streamGroup = streamGroup; + Http2Connection connection = new DefaultHttp2Connection(server); + frameWriter = new Http2OutboundFrameLogger(frameWriter, HTTP2_FRAME_LOGGER); + Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter); + Http2FrameReader reader = new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), HTTP2_FRAME_LOGGER); + Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, reader); + decoder.frameListener(new FrameListener()); + http2Handler = new InternalHttp2ConnectionHandler(decoder, encoder, new Http2Settings()); + http2Handler.connection().addListener(new ConnectionListener()); + streamInfoKey = http2Handler.connection().newKey(); + } + + Http2ConnectionHandler connectionHandler() { + return http2Handler; + } + + /** + * Save context and load any dependencies. + */ + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + ctx.pipeline().addBefore(ctx.executor(), ctx.name(), null, http2Handler); + http2HandlerCtx = ctx.pipeline().context(http2Handler); + } + + /** + * Clean up any dependencies. + */ + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + ctx.pipeline().remove(http2Handler); + } + + /** + * Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via + * HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade). + */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (!(evt instanceof UpgradeEvent)) { + super.userEventTriggered(ctx, evt); + return; + } + + UpgradeEvent upgrade = (UpgradeEvent) evt; + ctx.fireUserEventTriggered(upgrade.retain()); + try { + Http2Stream stream = http2Handler.connection().stream(Http2CodecUtil.HTTP_UPGRADE_STREAM_ID); + // TODO: improve handler/stream lifecycle so that stream isn't active before handler added. + // The stream was already made active, but ctx may have been null so it wasn't initialized. + // https://github.com/netty/netty/issues/4942 + new ConnectionListener().onStreamActive(stream); + upgrade.upgradeRequest().headers().setInt( + HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Http2CodecUtil.HTTP_UPGRADE_STREAM_ID); + new InboundHttpToHttp2Adapter(http2Handler.connection(), http2Handler.decoder().frameListener()) + .channelRead(ctx, upgrade.upgradeRequest().retain()); + } finally { + upgrade.release(); + } + } + + /** + * Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child + * streams. + */ + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (!(msg instanceof Http2Frame)) { + super.write(ctx, msg, promise); + return; + } + try { + if (msg instanceof Http2StreamFrame) { + Object streamObject = ((Http2StreamFrame) msg).stream(); + int streamId = ((Http2StreamChannel) streamObject).stream.id(); + if (msg instanceof Http2DataFrame) { + Http2DataFrame frame = (Http2DataFrame) msg; + http2Handler.encoder().writeData(http2HandlerCtx, streamId, frame.content().retain(), + frame.padding(), frame.isEndStream(), promise); + } else if (msg instanceof Http2HeadersFrame) { + Http2HeadersFrame frame = (Http2HeadersFrame) msg; + http2Handler.encoder().writeHeaders( + http2HandlerCtx, streamId, frame.headers(), frame.padding(), frame.isEndStream(), promise); + } else if (msg instanceof Http2ResetFrame) { + Http2ResetFrame frame = (Http2ResetFrame) msg; + http2Handler.resetStream(http2HandlerCtx, streamId, frame.errorCode(), promise); + } else { + throw new UnsupportedMessageTypeException(msg); + } + } else if (msg instanceof Http2GoAwayFrame) { + Http2GoAwayFrame frame = (Http2GoAwayFrame) msg; + int lastStreamId = http2Handler.connection().remote().lastStreamCreated() + + frame.extraStreamIds() * 2; + http2Handler.goAway( + http2HandlerCtx, lastStreamId, frame.errorCode(), frame.content().retain(), promise); + } else { + throw new UnsupportedMessageTypeException(msg); + } + } finally { + ReferenceCountUtil.release(msg); + } + } + + ChannelFuture createStreamChannel(ChannelHandlerContext ctx, Http2Stream stream, + ChannelHandler handler) { + EventLoopGroup group = streamGroup != null ? streamGroup : ctx.channel().eventLoop(); + Http2StreamChannel channel = new Http2StreamChannel(stream); + channel.pipeline().addLast(handler); + ChannelFuture future = group.register(channel); + // Handle any errors that occurred on the local thread while registering. Even though + // failures can happen after this point, they will be handled by the channel by closing the + // channel. + if (future.cause() != null) { + if (channel.isRegistered()) { + channel.close(); + } else { + channel.unsafe().closeForcibly(); + } + } + return future; + } + + /** + * Notifies any child streams of the read completion. + */ + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + for (int i = 0; i < streamsToFireChildReadComplete.size(); i++) { + final StreamInfo streamInfo = streamsToFireChildReadComplete.get(i); + // Clear early in case fireChildReadComplete() causes it to need to be re-processed + streamInfo.inStreamsToFireChildReadComplete = false; + streamInfo.childChannel.fireChildReadComplete(); + } + streamsToFireChildReadComplete.clear(); + } + + void fireChildReadAndRegister(StreamInfo streamInfo, Object msg) { + // Can't use childChannel.fireChannelRead() as it would fire independent of whether + // channel.read() had been called. + streamInfo.childChannel.fireChildRead(msg); + if (!streamInfo.inStreamsToFireChildReadComplete) { + streamsToFireChildReadComplete.add(streamInfo); + streamInfo.inStreamsToFireChildReadComplete = true; + } + } + + final class ConnectionListener extends Http2ConnectionAdapter { + @Override + public void onStreamActive(Http2Stream stream) { + if (ctx == null) { + // UPGRADE stream is active before handlerAdded(). + return; + } + // If it is an outgoing stream, then we already created the channel. + // TODO: support outgoing streams. https://github.com/netty/netty/issues/4913 + if (stream.getProperty(streamInfoKey) != null) { + return; + } + ChannelFuture future = createStreamChannel(ctx, stream, streamHandler); + stream.setProperty(streamInfoKey, new StreamInfo((Http2StreamChannel) future.channel())); + } + + @Override + public void onStreamClosed(Http2Stream stream) { + final StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey); + if (streamInfo != null) { + final EventLoop eventLoop = streamInfo.childChannel.eventLoop(); + if (eventLoop.inEventLoop()) { + onStreamClosed0(streamInfo); + } else { + eventLoop.execute(new OneTimeTask() { + @Override + public void run() { + onStreamClosed0(streamInfo); + } + }); + } + } + } + + private void onStreamClosed0(StreamInfo streamInfo) { + streamInfo.childChannel.onStreamClosedFired = true; + streamInfo.childChannel.fireChildRead(AbstractHttp2StreamChannel.CLOSE_MESSAGE); + } + + @Override + public void onGoAwayReceived(final int lastStreamId, long errorCode, ByteBuf debugData) { + final Http2GoAwayFrame goAway = new DefaultHttp2GoAwayFrame(errorCode, debugData); + try { + http2Handler.connection().forEachActiveStream(new Http2StreamVisitor() { + @Override + public boolean visit(Http2Stream stream) { + if (stream.id() > lastStreamId + && http2Handler.connection().local().isValidStreamId(stream.id())) { + StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey); + // TODO: Can we force a user interaction pattern that doesn't require us to duplicate()? + // https://github.com/netty/netty/issues/4943 + streamInfo.childChannel.pipeline().fireUserEventTriggered(goAway.duplicate().retain()); + } + return true; + } + }); + } catch (Throwable t) { + ctx.invoker().invokeExceptionCaught(ctx, t); + } + ctx.fireUserEventTriggered(goAway.duplicate().retain()); + } + } + + class InternalHttp2ConnectionHandler extends Http2ConnectionHandler { + public InternalHttp2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, + Http2Settings initialSettings) { + super(decoder, encoder, initialSettings); + } + + @Override + protected void onStreamError(ChannelHandlerContext ctx, Throwable cause, + Http2Exception.StreamException http2Ex) { + try { + Http2Stream stream = http2Handler.connection().stream(http2Ex.streamId()); + if (stream == null) { + return; + } + StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey); + if (streamInfo == null) { + return; + } + streamInfo.childChannel.pipeline().fireExceptionCaught(http2Ex); + } finally { + super.onStreamError(ctx, cause, http2Ex); + } + } + } + + class FrameListener extends Http2FrameAdapter { + @Override + public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) + throws Http2Exception { + Http2Stream stream = http2Handler.connection().stream(streamId); + StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey); + // Use a user event in order to circumvent read queue. + streamInfo.childChannel.pipeline().fireUserEventTriggered(new DefaultHttp2ResetFrame(errorCode)); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int streamDependency, short weight, boolean + exclusive, int padding, boolean endStream) throws Http2Exception { + onHeadersRead(ctx, streamId, headers, padding, endStream); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, + int padding, boolean endOfStream) throws Http2Exception { + Http2Stream stream = http2Handler.connection().stream(streamId); + StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey); + fireChildReadAndRegister(streamInfo, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)); + } + + @Override + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception { + Http2Stream stream = http2Handler.connection().stream(streamId); + StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey); + fireChildReadAndRegister(streamInfo, new DefaultHttp2DataFrame(data.retain(), endOfStream, padding)); + // We return the bytes in bytesConsumed() once the stream channel consumed the bytes. + return 0; + } + } + + static final class StreamInfo { + final Http2StreamChannel childChannel; + /** + * {@code true} if stream is in {@link Http2MultiplexCodec#steamsToFireChildReadComplete}. + */ + boolean inStreamsToFireChildReadComplete; + + StreamInfo(Http2StreamChannel childChannel) { + this.childChannel = childChannel; + } + } + + // This class uses ctx.invoker().invoke* instead of ctx.* to send to the ctx's handler instead + // of the 'next' handler. + final class Http2StreamChannel extends AbstractHttp2StreamChannel { + private final Http2Stream stream; + boolean onStreamClosedFired; + + Http2StreamChannel(Http2Stream stream) { + super(ctx.channel()); + this.stream = stream; + } + + @Override + protected void doClose() throws Exception { + if (!onStreamClosedFired) { + Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).setStream(this); + ChannelHandlerInvoker invoker = ctx.invoker(); + invoker.invokeWrite(ctx, resetFrame, ctx.newPromise()); + invoker.invokeFlush(ctx); + } + super.doClose(); + } + + @Override + protected void doWrite(Object msg) { + if (!(msg instanceof Http2StreamFrame)) { + ReferenceCountUtil.release(msg); + throw new IllegalArgumentException("Message must be an Http2StreamFrame: " + msg); + } + Http2StreamFrame frame = (Http2StreamFrame) msg; + if (frame.stream() != null) { + ReferenceCountUtil.release(frame); + throw new IllegalArgumentException("Stream must be null on the frame"); + } + frame.setStream(this); + ctx.invoker().invokeWrite(ctx, frame, ctx.newPromise()); + } + + @Override + protected void doWriteComplete() { + ctx.invoker().invokeFlush(ctx); + } + + @Override + protected EventExecutor preferredEventExecutor() { + return ctx.executor(); + } + + @Override + protected void bytesConsumed(final int bytes) { + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + bytesConsumed0(bytes); + } else { + executor.execute(new OneTimeTask() { + @Override + public void run() { + bytesConsumed0(bytes); + } + }); + } + } + + private void bytesConsumed0(int bytes) { + try { + http2Handler.connection().local().flowController().consumeBytes(stream, bytes); + } catch (Throwable t) { + ChannelHandlerInvokerUtil.invokeExceptionCaughtNow(ctx, t); + } + } + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ResetFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ResetFrame.java new file mode 100644 index 0000000000..3a818f3456 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ResetFrame.java @@ -0,0 +1,27 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +/** HTTP/2 RST_STREAM frame. */ +public interface Http2ResetFrame extends Http2StreamFrame { + @Override + Http2ResetFrame setStream(Object stream); + + /** + * The reason for resetting the stream. Represented as an HTTP/2 error code. + */ + long errorCode(); +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodec.java index 5affc301e5..66f4faee6b 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodec.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.http2; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.base64.Base64; import io.netty.handler.codec.http.FullHttpRequest; @@ -49,6 +50,7 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade private final String handlerName; private final Http2ConnectionHandler connectionHandler; + private final ChannelHandler upgradeToHandler; private final Http2FrameReader frameReader; /** @@ -61,6 +63,16 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade this(null, connectionHandler); } + /** + * Creates the codec using a default name for the connection handler when adding to the + * pipeline. + * + * @param multiplexCodec the HTTP/2 multiplexing handler. + */ + public Http2ServerUpgradeCodec(Http2MultiplexCodec multiplexCodec) { + this(null, multiplexCodec); + } + /** * Creates the codec providing an upgrade to the given handler for HTTP/2. * @@ -69,8 +81,24 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade * @param connectionHandler the HTTP/2 connection handler */ public Http2ServerUpgradeCodec(String handlerName, Http2ConnectionHandler connectionHandler) { + this(handlerName, connectionHandler, connectionHandler); + } + + /** + * Creates the codec providing an upgrade to the given handler for HTTP/2. + * + * @param handlerName the name of the HTTP/2 connection handler to be used in the pipeline. + * @param multiplexCodec the HTTP/2 multiplexing handler. + */ + public Http2ServerUpgradeCodec(String handlerName, Http2MultiplexCodec multiplexCodec) { + this(handlerName, multiplexCodec.connectionHandler(), multiplexCodec); + } + + Http2ServerUpgradeCodec(String handlerName, Http2ConnectionHandler connectionHandler, + ChannelHandler upgradeToHandler) { this.handlerName = handlerName; this.connectionHandler = checkNotNull(connectionHandler, "connectionHandler"); + this.upgradeToHandler = checkNotNull(upgradeToHandler, "upgradeToHandler"); frameReader = new DefaultHttp2FrameReader(); } @@ -103,7 +131,7 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade @Override public void upgradeTo(final ChannelHandlerContext ctx, FullHttpRequest upgradeRequest) { // Add the HTTP/2 connection handler to the pipeline immediately following the current handler. - ctx.pipeline().addAfter(ctx.name(), handlerName, connectionHandler); + ctx.pipeline().addAfter(ctx.name(), handlerName, upgradeToHandler); } /** diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2StreamFrame.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2StreamFrame.java new file mode 100644 index 0000000000..df34d7e3f7 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2StreamFrame.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +/** + * A frame whose meaning may apply to a particular stream, instead of the entire + * connection. It is still possibly for this frame type to apply to the entire connection. In such + * cases, the {@code stream} reference should be {@code null} or an object referring to the + * connection. + * + *

The meaning of {@code stream} is context-dependent and may change as a frame is processed in + * the pipeline. + */ +public interface Http2StreamFrame extends Http2Frame { + /** + * Set the stream identifier for this message. + * + * @return {@code this} + */ + Http2StreamFrame setStream(Object stream); + + /** + * The stream this frame applies to. + */ + Object stream(); +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttpToHttp2Adapter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttpToHttp2Adapter.java new file mode 100644 index 0000000000..73fb3d80f6 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttpToHttp2Adapter.java @@ -0,0 +1,72 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.FullHttpMessage; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpScheme; + +/** + * Translates HTTP/1.x object reads into HTTP/2 frames. + */ +public class InboundHttpToHttp2Adapter extends ChannelInboundHandlerAdapter { + private final Http2Connection connection; + private final Http2FrameListener listener; + + public InboundHttpToHttp2Adapter(Http2Connection connection, Http2FrameListener listener) { + this.connection = connection; + this.listener = listener; + } + + private int getStreamId(HttpHeaders httpHeaders) { + return httpHeaders.getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), + connection.remote().nextStreamId()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof FullHttpMessage) { + FullHttpMessage message = (FullHttpMessage) msg; + try { + int streamId = getStreamId(message.headers()); + Http2Stream stream = connection.stream(streamId); + if (stream == null) { + stream = connection.remote().createStream(streamId, false); + } + message.headers().set(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), HttpScheme.HTTP.name()); + Http2Headers messageHeaders = HttpConversionUtil.toHttp2Headers(message, true); + boolean hasContent = message.content().isReadable(); + boolean hasTrailers = !message.trailingHeaders().isEmpty(); + listener.onHeadersRead( + ctx, streamId, messageHeaders, 0, !(hasContent || hasTrailers)); + if (hasContent) { + listener.onDataRead(ctx, streamId, message.content(), 0, !hasTrailers); + } + if (hasTrailers) { + Http2Headers headers = HttpConversionUtil.toHttp2Headers(message.trailingHeaders(), true); + listener.onHeadersRead(ctx, streamId, headers, 0, true); + } + stream.closeRemoteSide(); + } finally { + message.release(); + } + } else { + super.channelRead(ctx, msg); + } + } +} diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java new file mode 100644 index 0000000000..feabb702f2 --- /dev/null +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java @@ -0,0 +1,286 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.http2; + +import static io.netty.util.ReferenceCountUtil.releaseLater; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPromise; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpScheme; +import io.netty.util.AsciiString; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.PlatformDependent; + +import java.util.ArrayDeque; +import java.util.Queue; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +/** + * Unit tests for {@link Http2MultiplexCodec}. + */ +public class Http2MultiplexCodecTest { + private final TestChannelInitializer streamInit = new TestChannelInitializer(); + // For verifying outbound frames + private final Http2FrameWriter frameWriter = spy(new VerifiableHttp2FrameWriter()); + private final Http2MultiplexCodec serverCodec = new Http2MultiplexCodec(true, streamInit, null, frameWriter); + private final EmbeddedChannel channel = new EmbeddedChannel(); + // For injecting inbound frames + private final Http2FrameListener frameListener + = ((DefaultHttp2ConnectionDecoder) serverCodec.connectionHandler().decoder()) + .internalFrameListener(); + private ChannelHandlerContext http2HandlerCtx; + private Http2Headers request = new DefaultHttp2Headers() + .method(HttpMethod.GET.asciiName()).scheme(HttpScheme.HTTPS.name()) + .authority(new AsciiString("example.org")).path(new AsciiString("/foo")); + private Http2Headers response = new DefaultHttp2Headers() + .status(HttpResponseStatus.OK.codeAsText()); + + @Before + public void setUp() throws Exception { + channel.connect(null); + channel.pipeline().addLast(serverCodec); + http2HandlerCtx = channel.pipeline().context(serverCodec.connectionHandler()); + + // Handshake + verify(frameWriter).writeSettings(eq(http2HandlerCtx), + anyHttp2Settings(), anyChannelPromise()); + verifyNoMoreInteractions(frameWriter); + channel.writeInbound(Http2CodecUtil.connectionPrefaceBuf()); + frameListener.onSettingsRead(http2HandlerCtx, new Http2Settings()); + verify(frameWriter).writeSettingsAck(eq(http2HandlerCtx), anyChannelPromise()); + frameListener.onSettingsAckRead(http2HandlerCtx); + } + + @After + public void tearDown() { + Object o; + while ((o = channel.readOutbound()) != null) { + ReferenceCountUtil.release(o); + } + } + + @Test + public void startStop() throws Exception { + assertTrue(channel.isActive()); + channel.close(); + verify(frameWriter).writeGoAway( + eq(http2HandlerCtx), eq(0), eq(0L), eq(Unpooled.EMPTY_BUFFER), anyChannelPromise()); + assertTrue(!channel.isActive()); + } + + @Test + public void headerRequestHeaderResponse() throws Exception { + LastInboundHandler stream = new LastInboundHandler(); + streamInit.handler = stream; + frameListener.onHeadersRead(http2HandlerCtx, 1, request, 31, true); + assertNull(streamInit.handler); + assertEquals(new DefaultHttp2HeadersFrame(request, true, 31), stream.readInbound()); + assertNull(stream.readInbound()); + assertTrue(stream.channel().isActive()); + + stream.writeOutbound(new DefaultHttp2HeadersFrame(response, true, 27)); + verify(frameWriter).writeHeaders( + eq(http2HandlerCtx), eq(1), eq(response), anyInt(), anyShort(), anyBoolean(), + eq(27), eq(true), anyChannelPromise()); + verify(frameWriter, never()).writeRstStream( + any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise()); + assertFalse(stream.channel().isActive()); + assertTrue(channel.isActive()); + } + + @Test + public void entityRequestEntityResponse() throws Exception { + LastInboundHandler stream = new LastInboundHandler(); + streamInit.handler = stream; + frameListener.onHeadersRead(http2HandlerCtx, 1, request, 0, false); + assertEquals(new DefaultHttp2HeadersFrame(request, false), stream.readInbound()); + assertNull(stream.readInbound()); + assertTrue(stream.channel().isActive()); + + ByteBuf hello = bb("hello"); + frameListener.onDataRead(http2HandlerCtx, 1, hello, 31, true); + // Release hello to emulate ByteToMessageDecoder + hello.release(); + Http2DataFrame inboundData = stream.readInbound(); + assertEquals(releaseLater(new DefaultHttp2DataFrame(bb("hello"), true, 31)), inboundData); + assertEquals(1, inboundData.refCnt()); + assertNull(stream.readInbound()); + assertTrue(stream.channel().isActive()); + + stream.writeOutbound(new DefaultHttp2HeadersFrame(response, false)); + verify(frameWriter).writeHeaders(eq(http2HandlerCtx), eq(1), eq(response), anyInt(), + anyShort(), anyBoolean(), eq(0), eq(false), anyChannelPromise()); + assertTrue(stream.channel().isActive()); + + stream.writeOutbound(new DefaultHttp2DataFrame(bb("world"), true, 27)); + ArgumentCaptor outboundData = ArgumentCaptor.forClass(ByteBuf.class); + verify(frameWriter).writeData(eq(http2HandlerCtx), eq(1), outboundData.capture(), eq(27), + eq(true), anyChannelPromise()); + assertEquals(releaseLater(bb("world")), outboundData.getValue()); + assertEquals(1, outboundData.getValue().refCnt()); + verify(frameWriter, never()).writeRstStream( + any(ChannelHandlerContext.class), anyInt(), anyLong(), anyChannelPromise()); + assertFalse(stream.channel().isActive()); + assertTrue(channel.isActive()); + } + + @Test + public void closeCausesReset() throws Exception { + LastInboundHandler stream = new LastInboundHandler(); + streamInit.handler = stream; + frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, true); + + stream.channel().close(); + channel.runPendingTasks(); + channel.checkException(); + stream.checkException(); + verify(frameWriter).writeRstStream( + eq(http2HandlerCtx), eq(3), eq(8L), anyChannelPromise()); + assertFalse(stream.channel().isActive()); + assertTrue(channel.isActive()); + } + + @Test + public void sendRstStream() throws Exception { + LastInboundHandler stream = new LastInboundHandler(); + streamInit.handler = stream; + frameListener.onHeadersRead(http2HandlerCtx, 5, request, 31, true); + + stream.writeOutbound(new DefaultHttp2ResetFrame(314 /* non-standard error */)); + verify(frameWriter).writeRstStream( + eq(http2HandlerCtx), eq(5), eq(314L), anyChannelPromise()); + assertFalse(stream.channel().isActive()); + assertTrue(channel.isActive()); + } + + private static ChannelPromise anyChannelPromise() { + return any(ChannelPromise.class); + } + + private static Http2Settings anyHttp2Settings() { + return any(Http2Settings.class); + } + + private static ByteBuf bb(String s) { + ByteBuf buf = Unpooled.buffer(s.length() * 4); + ByteBufUtil.writeUtf8(buf, s); + return buf; + } + + static class TestChannelInitializer extends ChannelInitializer { + ChannelHandler handler; + + @Override + public void initChannel(Channel channel) { + if (handler != null) { + channel.pipeline().addLast(handler); + handler = null; + } + } + } + + static class LastInboundHandler extends ChannelDuplexHandler { + private final Queue inboundMessages = new ArrayDeque(); + private final Queue userEvents = new ArrayDeque(); + private Throwable lastException; + private ChannelHandlerContext ctx; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + inboundMessages.add(msg); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + userEvents.add(evt); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (lastException != null) { + cause.printStackTrace(); + } else { + lastException = cause; + } + } + + public void checkException() throws Exception { + if (lastException == null) { + return; + } + Throwable t = lastException; + lastException = null; + PlatformDependent.throwException(t); + } + + @SuppressWarnings("unchecked") + public T readInbound() { + return (T) inboundMessages.poll(); + } + + @SuppressWarnings("unchecked") + public T readUserEvent() { + return (T) userEvents.poll(); + } + + public void writeOutbound(Object... msgs) throws Exception { + for (Object msg : msgs) { + ctx.write(msg); + } + ctx.flush(); + EmbeddedChannel parent = (EmbeddedChannel) ctx.channel().parent(); + parent.runPendingTasks(); + parent.checkException(); + checkException(); + } + + public Channel channel() { + return ctx.channel(); + } + } + + public static class VerifiableHttp2FrameWriter extends DefaultHttp2FrameWriter { + @Override + public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, + int padding, boolean endStream, ChannelPromise promise) { + // duplicate 'data' to prevent readerIndex from being changed, to ease verification + return super.writeData(ctx, streamId, data.duplicate(), padding, endStream, promise); + } + } +} diff --git a/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/HelloWorldHttp2Handler.java b/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/HelloWorldHttp2Handler.java new file mode 100644 index 0000000000..5e2ad9fa6b --- /dev/null +++ b/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/HelloWorldHttp2Handler.java @@ -0,0 +1,94 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.example.http2.helloworld.multiplex.server; + +import static io.netty.buffer.Unpooled.copiedBuffer; +import static io.netty.buffer.Unpooled.unreleasableBuffer; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.util.CharsetUtil; + +/** + * A simple handler that responds with the message "Hello World!". + * + *

This example is making use of the "multiplexing" http2 API, where streams are mapped to child + * Channels. This API is very experimental and incomplete. + */ +@Sharable +public class HelloWorldHttp2Handler extends ChannelDuplexHandler { + + static final ByteBuf RESPONSE_BYTES = unreleasableBuffer(copiedBuffer("Hello World", CharsetUtil.UTF_8)); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + cause.printStackTrace(); + ctx.close(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Http2HeadersFrame) { + onHeadersRead(ctx, (Http2HeadersFrame) msg); + } else if (msg instanceof Http2DataFrame) { + onDataRead(ctx, (Http2DataFrame) msg); + } else { + super.channelRead(ctx, msg); + } + } + + /** + * If receive a frame with end-of-stream set, send a pre-canned response. + */ + public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception { + if (data.isEndStream()) { + sendResponse(ctx, data.content().retain()); + } + } + + /** + * If receive a frame with end-of-stream set, send a pre-canned response. + */ + public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame headers) + throws Exception { + if (headers.isEndStream()) { + ByteBuf content = ctx.alloc().buffer(); + content.writeBytes(RESPONSE_BYTES); + ByteBufUtil.writeAscii(content, " - via HTTP/2"); + sendResponse(ctx, content); + } + } + + /** + * Sends a "Hello World" DATA frame to the client. + */ + private void sendResponse(ChannelHandlerContext ctx, ByteBuf payload) { + // Send a frame for the response status + Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText()); + ctx.write(new DefaultHttp2HeadersFrame(headers)); + ctx.writeAndFlush(new DefaultHttp2DataFrame(payload, true)); + } +} diff --git a/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/Http2OrHttpHandler.java b/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/Http2OrHttpHandler.java new file mode 100644 index 0000000000..c6319908f7 --- /dev/null +++ b/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/Http2OrHttpHandler.java @@ -0,0 +1,53 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.example.http2.helloworld.multiplex.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.example.http2.helloworld.server.HelloWorldHttp1Handler; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http2.Http2MultiplexCodec; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; + +/** + * Negotiates with the browser if HTTP2 or HTTP is going to be used. Once decided, the Netty + * pipeline is setup with the correct handlers for the selected protocol. + */ +public class Http2OrHttpHandler extends ApplicationProtocolNegotiationHandler { + + private static final int MAX_CONTENT_LENGTH = 1024 * 100; + + protected Http2OrHttpHandler() { + super(ApplicationProtocolNames.HTTP_1_1); + } + + @Override + protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception { + if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { + ctx.pipeline().addLast(new Http2MultiplexCodec(true, new HelloWorldHttp2Handler())); + return; + } + + if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { + ctx.pipeline().addLast(new HttpServerCodec(), + new HttpObjectAggregator(MAX_CONTENT_LENGTH), + new HelloWorldHttp1Handler("ALPN Negotiation")); + return; + } + + throw new IllegalStateException("unknown protocol: " + protocol); + } +} diff --git a/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/Http2Server.java b/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/Http2Server.java new file mode 100644 index 0000000000..61b3f08252 --- /dev/null +++ b/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/Http2Server.java @@ -0,0 +1,96 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.example.http2.helloworld.multiplex.server; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http2.Http2SecurityUtil; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolConfig.Protocol; +import io.netty.handler.ssl.ApplicationProtocolConfig.SelectedListenerFailureBehavior; +import io.netty.handler.ssl.ApplicationProtocolConfig.SelectorFailureBehavior; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.SupportedCipherSuiteFilter; +import io.netty.handler.ssl.util.SelfSignedCertificate; + +/** + * A HTTP/2 Server that responds to requests with a Hello World. Once started, you can test the + * server with the example client. + * + *

This example is making use of the "multiplexing" http2 API, where streams are mapped to child + * Channels. This API is very experimental and incomplete. + */ +public final class Http2Server { + + static final boolean SSL = System.getProperty("ssl") != null; + + static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080")); + + public static void main(String[] args) throws Exception { + // Configure SSL. + final SslContext sslCtx; + if (SSL) { + SslProvider provider = OpenSsl.isAlpnSupported() ? SslProvider.OPENSSL : SslProvider.JDK; + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + .sslProvider(provider) + /* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification. + * Please refer to the HTTP/2 specification for cipher requirements. */ + .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) + .applicationProtocolConfig(new ApplicationProtocolConfig( + Protocol.ALPN, + // NO_ADVERTISE is currently the only mode supported by both OpenSsl and JDK providers. + SelectorFailureBehavior.NO_ADVERTISE, + // ACCEPT is currently the only mode supported by both OpenSsl and JDK providers. + SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2, + ApplicationProtocolNames.HTTP_1_1)) + .build(); + } else { + sslCtx = null; + } + // Configure the server. + EventLoopGroup group = new NioEventLoopGroup(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.option(ChannelOption.SO_BACKLOG, 1024); + b.group(group) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new Http2ServerInitializer(sslCtx)); + + Channel ch = b.bind(PORT).sync().channel(); + + System.err.println("Open your HTTP/2-enabled web browser and navigate to " + + (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/'); + + ch.closeFuture().sync(); + } finally { + group.shutdownGracefully(); + } + } +} diff --git a/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/Http2ServerInitializer.java b/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/Http2ServerInitializer.java new file mode 100644 index 0000000000..00770b1f78 --- /dev/null +++ b/example/src/main/java/io/netty/example/http2/helloworld/multiplex/server/Http2ServerInitializer.java @@ -0,0 +1,122 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.example.http2.helloworld.multiplex.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.example.http2.helloworld.server.HelloWorldHttp1Handler; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.HttpServerUpgradeHandler; +import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodec; +import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodecFactory; +import io.netty.handler.codec.http2.Http2CodecUtil; +import io.netty.handler.codec.http2.Http2MultiplexCodec; +import io.netty.handler.codec.http2.Http2ServerUpgradeCodec; +import io.netty.handler.ssl.SslContext; +import io.netty.util.AsciiString; + +/** + * Sets up the Netty pipeline for the example server. Depending on the endpoint config, sets up the + * pipeline for NPN or cleartext HTTP upgrade to HTTP/2. + */ +public class Http2ServerInitializer extends ChannelInitializer { + + private static final UpgradeCodecFactory upgradeCodecFactory = new UpgradeCodecFactory() { + @Override + public UpgradeCodec newUpgradeCodec(CharSequence protocol) { + if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { + return new Http2ServerUpgradeCodec(new Http2MultiplexCodec(true, + new HelloWorldHttp2Handler())); + } else { + return null; + } + } + }; + + private final SslContext sslCtx; + private final int maxHttpContentLength; + + public Http2ServerInitializer(SslContext sslCtx) { + this(sslCtx, 16 * 1024); + } + + public Http2ServerInitializer(SslContext sslCtx, int maxHttpContentLength) { + if (maxHttpContentLength < 0) { + throw new IllegalArgumentException("maxHttpContentLength (expected >= 0): " + maxHttpContentLength); + } + this.sslCtx = sslCtx; + this.maxHttpContentLength = maxHttpContentLength; + } + + @Override + public void initChannel(SocketChannel ch) { + if (sslCtx != null) { + configureSsl(ch); + } else { + configureClearText(ch); + } + } + + /** + * Configure the pipeline for TLS NPN negotiation to HTTP/2. + */ + private void configureSsl(SocketChannel ch) { + ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()), new Http2OrHttpHandler()); + } + + /** + * Configure the pipeline for a cleartext upgrade from HTTP to HTTP/2.0 + */ + private void configureClearText(SocketChannel ch) { + final ChannelPipeline p = ch.pipeline(); + final HttpServerCodec sourceCodec = new HttpServerCodec(); + + p.addLast(sourceCodec); + p.addLast(new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory)); + p.addLast(new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws Exception { + // If this handler is hit then no upgrade has been attempted and the client is just talking HTTP. + System.err.println("Directly talking: " + msg.protocolVersion() + " (no upgrade was attempted)"); + ChannelPipeline pipeline = ctx.pipeline(); + ChannelHandlerContext thisCtx = pipeline.context(this); + pipeline.addAfter(thisCtx.name(), null, new HelloWorldHttp1Handler("Direct. No Upgrade Attempted.")); + pipeline.replace(this, null, new HttpObjectAggregator(maxHttpContentLength)); + ctx.fireChannelRead(msg); + } + }); + + p.addLast(new UserEventLogger()); + } + + /** + * Class that logs any User Events triggered on this channel. + */ + private static class UserEventLogger extends ChannelInboundHandlerAdapter { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + System.out.println("User Event Triggered: " + evt); + ctx.fireUserEventTriggered(evt); + } + } +} diff --git a/run-example.sh b/run-example.sh index d91bec7722..f81439441c 100755 --- a/run-example.sh +++ b/run-example.sh @@ -19,6 +19,7 @@ EXAMPLE_MAP=( 'http2-client:io.netty.example.http2.helloworld.client.Http2Client' 'http2-server:io.netty.example.http2.helloworld.server.Http2Server' 'http2-tiles:io.netty.example.http2.tiles.Launcher' + 'http2-multiplex-server:io.netty.example.http2.helloworld.multiplex.server.Http2Server' 'spdy-client:io.netty.example.spdy.client.SpdyClient' 'spdy-server:io.netty.example.spdy.server.SpdyServer' 'worldclock-client:io.netty.example.worldclock.WorldClockClient'