Split multiplexing from frame decoding to allow easier customization of frame processing and better seperation of responsibilities (#9239)

Motivation:

In the past we had the following class hierarchy:

Http2ConnectionHandler --- Http2FrameCodec -- Http2MultiplexCodec

This hierarchy makes it impossible to plug in any code that would like to act on Http2Frame and Http2StreamFrame which can be quite useful for various situations (like metrics, logging etc). Beside this it also made the implementtion very hacky. To allow easier maintainance and also allow more flexible costumizations we should split Http2MultiplexCodec and Http2FrameCode.

Modifications:

- Introduce Http2MultiplexHandler (which is a replacement for Http2MultiplexCodec when used together with Http2FrameCodec)
- Mark Http2MultiplexCodecBuilder and Http2MultiplexCodec as deprecated. People should use Http2FrameCodecBuilder / Http2FrameCodec together with Http2MultiplexHandlder in the future
- Adjust / Add tests
- Adjust examples

Result:

More flexible usage possible and less hacky / coupled implementation for http2 multiplexing
This commit is contained in:
Norman Maurer 2019-06-24 09:17:15 +02:00
parent a05adceae8
commit bf72d6d2d9
17 changed files with 2754 additions and 2107 deletions

View File

@ -91,4 +91,16 @@ public abstract class Http2ChannelDuplexHandler implements ChannelHandler {
} }
return (Http2FrameCodec) frameCodecCtx.handler(); return (Http2FrameCodec) frameCodecCtx.handler();
} }
boolean isValidLocalStreamId(Http2FrameStream stream) {
return frameCodec.connection().local().isValidStreamId(stream.id());
}
boolean streamMayHaveExisted(Http2FrameStream stream) {
return frameCodec.connection().streamMayHaveExisted(stream.id());
}
boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception {
return frameCodec.consumeBytes(stream.id(), bytes);
}
} }

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -186,14 +187,16 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception { final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception {
assert ctx.executor().inEventLoop(); assert ctx.executor().inEventLoop();
connection().forEachActiveStream(stream -> { if (connection().numActiveStreams() > 0) {
try { connection().forEachActiveStream(stream -> {
return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey)); try {
} catch (Throwable cause) { return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey));
onError(ctx, false, cause); } catch (Throwable cause) {
return false; onError(ctx, false, cause);
} return false;
}); }
});
}
} }
@Override @Override
@ -410,7 +413,8 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
} }
private void onStreamActive0(Http2Stream stream) { private void onStreamActive0(Http2Stream stream) {
if (connection().local().isValidStreamId(stream.id())) { if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID &&
connection().local().isValidStreamId(stream.id())) {
return; return;
} }
@ -618,11 +622,6 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
ctx.fireExceptionCaught(cause); ctx.fireExceptionCaught(cause);
} }
final boolean isWritable(DefaultHttp2FrameStream stream) {
Http2Stream s = stream.stream;
return s != null && connection().remote().flowController().isWritable(s);
}
private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener { private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
@Override @Override
public void writabilityChanged(Http2Stream stream) { public void writabilityChanged(Http2Stream stream) {
@ -644,6 +643,8 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
private volatile int id = -1; private volatile int id = -1;
volatile Http2Stream stream; volatile Http2Stream stream;
Channel attachment;
DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) { DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
assert id == -1 || stream.id() == id; assert id == -1 || stream.id() == id;
this.stream = stream; this.stream = stream;

View File

@ -23,7 +23,10 @@ import static java.util.Objects.requireNonNull;
/** /**
* A builder for {@link Http2MultiplexCodec}. * A builder for {@link Http2MultiplexCodec}.
*
* @deprecated use {@link Http2FrameCodecBuilder} together with {@link Http2MultiplexHandler}.
*/ */
@Deprecated
@UnstableApi @UnstableApi
public class Http2MultiplexCodecBuilder public class Http2MultiplexCodecBuilder
extends AbstractHttp2ConnectionHandlerBuilder<Http2MultiplexCodec, Http2MultiplexCodecBuilder> { extends AbstractHttp2ConnectionHandlerBuilder<Http2MultiplexCodec, Http2MultiplexCodecBuilder> {

View File

@ -0,0 +1,387 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.UnstableApi;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
/**
* An HTTP/2 handler that creates child channels for each stream. This handler must be used in combination
* with {@link Http2FrameCodec}.
*
* <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
* receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
* all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
* the head of the pipeline are processed directly by this handler and cannot be intercepted.
*
* <p>The child channel will be notified of user events that impact the stream, such as {@link
* Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
* Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
* communication, closing of the channel is delayed until any inbound queue is drained with {@link
* Channel#read()}, which follows the default behavior of channels in Netty. Applications are
* free to close the channel in response to such events if they don't have use for any queued
* messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
* will be processed internally and also propagated down the pipeline for other handlers to act on.
*
* <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
*
* <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
*
* <h3>Reference Counting</h3>
*
* Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
* reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
* before propagating a reference counted object through the pipeline, and thus an application handler needs to release
* such an object after having consumed it. For more information on reference counting take a look at
* http://netty.io/wiki/reference-counted-objects.html
*
* <h3>Channel Events</h3>
*
* A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
* does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
* or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
* (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
* indicating the cause and is closed immediately thereafter.
*
* <h3>Writability and Flow Control</h3>
*
* A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
* when it maps to an active HTTP/2 stream and the stream's flow control window is greater than zero. A child channel
* does not know about the connection-level flow control window. {@link ChannelHandler}s are free to ignore the
* channel's writability, in which case the excessive writes will be buffered by the parent channel. It's important to
* note that only {@link Http2DataFrame}s are subject to HTTP/2 flow control.
*/
@UnstableApi
public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = Http2MultiplexHandler::registerDone;
private final ChannelHandler inboundStreamHandler;
private final ChannelHandler upgradeStreamHandler;
private boolean parentReadInProgress;
private int idCount;
// Linked-List for Http2MultiplexHandlerStreamChannel instances that need to be processed by
// channelReadComplete(...)
private AbstractHttp2StreamChannel head;
private AbstractHttp2StreamChannel tail;
// Need to be volatile as accessed from within the Http2MultiplexHandlerStreamChannel in a multi-threaded fashion.
private volatile ChannelHandlerContext ctx;
/**
* Creates a new instance
*
* @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
* the {@link Channel}s created for new inbound streams.
*/
public Http2MultiplexHandler(ChannelHandler inboundStreamHandler) {
this(inboundStreamHandler, null);
}
/**
* Creates a new instance
*
* @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
* the {@link Channel}s created for new inbound streams.
* @param upgradeStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of the
* upgraded {@link Channel}.
*/
public Http2MultiplexHandler(ChannelHandler inboundStreamHandler, ChannelHandler upgradeStreamHandler) {
this.inboundStreamHandler = ObjectUtil.checkNotNull(inboundStreamHandler, "inboundStreamHandler");
this.upgradeStreamHandler = upgradeStreamHandler;
}
static void registerDone(ChannelFuture future) {
// Handle any errors that occurred on the local thread while registering. Even though
// failures can happen after this point, they will be handled by the channel by closing the
// childChannel.
if (!future.isSuccess()) {
Channel childChannel = future.channel();
if (childChannel.isRegistered()) {
childChannel.close();
} else {
childChannel.unsafe().closeForcibly();
}
}
}
@Override
protected void handlerAdded0(ChannelHandlerContext ctx) {
if (ctx.executor() != ctx.channel().eventLoop()) {
throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
}
this.ctx = ctx;
}
@Override
protected void handlerRemoved0(ChannelHandlerContext ctx) {
// Unlink the linked list to guard against GC nepotism.
AbstractHttp2StreamChannel ch = head;
while (ch != null) {
AbstractHttp2StreamChannel curr = ch;
ch = curr.next;
curr.next = curr.previous = null;
}
head = tail = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
parentReadInProgress = true;
if (msg instanceof Http2StreamFrame) {
Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
DefaultHttp2FrameStream s =
(DefaultHttp2FrameStream) streamFrame.stream();
((AbstractHttp2StreamChannel) s.attachment).fireChildRead(streamFrame);
return;
}
if (msg instanceof Http2GoAwayFrame) {
onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
}
// Send everything down the pipeline
ctx.fireChannelRead(msg);
}
@Override
public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isWritable()) {
// While the writability state may change during iterating of the streams we just set all of the streams
// to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
}
ctx.fireChannelWritabilityChanged();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof Http2FrameStreamEvent) {
Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
if (event.type() == Http2FrameStreamEvent.Type.State) {
switch (stream.state()) {
case HALF_CLOSED_LOCAL:
if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
// Ignore everything which was not caused by an upgrade
break;
}
// We must have an upgrade handler or else we can't handle the stream
if (upgradeStreamHandler == null) {
throw connectionError(INTERNAL_ERROR,
"Client is misconfigured for upgrade requests");
}
// fall-trough
case HALF_CLOSED_REMOTE:
// fall-trough
case OPEN:
if (stream.attachment != null) {
// ignore if child channel was already created.
break;
}
final AbstractHttp2StreamChannel ch;
if (stream.state() == Http2Stream.State.HALF_CLOSED_LOCAL) {
ch = new Http2MultiplexHandlerStreamChannel(stream, null);
ch.closeOutbound();
// Add our upgrade handler to the channel and then register the channel.
// The register call fires the channelActive, etc.
ch.pipeline().addLast(upgradeStreamHandler);
} else {
ch = new Http2MultiplexHandlerStreamChannel(stream, inboundStreamHandler);
}
ChannelFuture future = ch.register();
if (future.isDone()) {
registerDone(future);
} else {
future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
}
break;
case CLOSED:
AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
if (channel != null) {
channel.streamClosed();
}
break;
default:
// ignore for now
break;
}
}
return;
}
ctx.fireUserEventTriggered(evt);
}
// TODO: This is most likely not the best way to expose this, need to think more about it.
Http2StreamChannel newOutboundStream() {
return new Http2MultiplexHandlerStreamChannel((DefaultHttp2FrameStream) newStream(), null);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof Http2FrameStreamException) {
Http2FrameStreamException exception = (Http2FrameStreamException) cause;
Http2FrameStream stream = exception.stream();
AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
((DefaultHttp2FrameStream) stream).attachment;
try {
childChannel.pipeline().fireExceptionCaught(cause.getCause());
} finally {
childChannel.unsafe().closeForcibly();
}
return;
}
ctx.fireExceptionCaught(cause);
}
private boolean isChildChannelInReadPendingQueue(AbstractHttp2StreamChannel childChannel) {
return childChannel.previous != null || childChannel.next != null || head == childChannel;
}
private boolean tryAddChildChannelToReadPendingQueue(AbstractHttp2StreamChannel childChannel) {
if (!isChildChannelInReadPendingQueue(childChannel)) {
if (tail == null) {
assert head == null;
tail = head = childChannel;
} else {
childChannel.previous = tail;
tail.next = childChannel;
tail = childChannel;
}
return true;
}
return false;
}
private void tryRemoveChildChannelFromReadPendingQueue(AbstractHttp2StreamChannel childChannel) {
if (isChildChannelInReadPendingQueue(childChannel)) {
AbstractHttp2StreamChannel previous = childChannel.previous;
if (childChannel.next != null) {
childChannel.next.previous = previous;
} else {
tail = tail.previous; // If there is no next, this childChannel is the tail, so move the tail back.
}
if (previous != null) {
previous.next = childChannel.next;
} else {
head = head.next; // If there is no previous, this childChannel is the head, so move the tail forward.
}
childChannel.next = childChannel.previous = null;
}
}
private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
try {
forEachActiveStream(stream -> {
final int streamId = stream.id();
if (streamId > goAwayFrame.lastStreamId() && isValidLocalStreamId(stream)) {
final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
((DefaultHttp2FrameStream) stream).attachment;
childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
}
return true;
});
} catch (Http2Exception e) {
ctx.fireExceptionCaught(e);
ctx.close();
}
}
/**
* Notifies any child streams of the read completion.
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
parentReadInProgress = true;
// If we have many child channel we can optimize for the case when multiple call flush() in
// channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
// write calls on the socket which is expensive.
AbstractHttp2StreamChannel current = head;
if (current != null) {
try {
do {
AbstractHttp2StreamChannel childChannel = current;
// Clear early in case fireChildReadComplete() causes it to need to be re-processed
current = current.next;
childChannel.next = childChannel.previous = null;
childChannel.fireChildReadComplete();
} while (current != null);
} finally {
parentReadInProgress = false;
tail = head = null;
ctx.flush();
}
} else {
parentReadInProgress = false;
}
ctx.fireChannelReadComplete();
}
private final class Http2MultiplexHandlerStreamChannel extends AbstractHttp2StreamChannel {
Http2MultiplexHandlerStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
super(stream, ++idCount, inboundHandler);
}
@Override
protected boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception {
return Http2MultiplexHandler.this.consumeBytes(stream, bytes);
}
@Override
protected boolean isParentReadInProgress() {
return parentReadInProgress;
}
@Override
protected boolean streamMayHaveExisted(Http2FrameStream stream) {
return Http2MultiplexHandler.this.streamMayHaveExisted(stream);
}
@Override
protected void tryRemoveChildChannelFromReadPendingQueue() {
Http2MultiplexHandler.this.tryRemoveChildChannelFromReadPendingQueue(this);
}
@Override
protected boolean tryAddChildChannelToReadPendingQueue() {
return Http2MultiplexHandler.this.tryAddChildChannelToReadPendingQueue(this);
}
@Override
protected ChannelHandlerContext parentContext() {
return ctx;
}
}
}

View File

@ -102,7 +102,10 @@ public final class Http2StreamChannelBootstrap {
} }
public Future<Http2StreamChannel> open(final Promise<Http2StreamChannel> promise) { public Future<Http2StreamChannel> open(final Promise<Http2StreamChannel> promise) {
final ChannelHandlerContext ctx = channel.pipeline().context(Http2MultiplexCodec.class); ChannelHandlerContext ctx = channel.pipeline().context(Http2MultiplexCodec.class);
if (ctx == null) {
ctx = channel.pipeline().context(Http2MultiplexHandler.class);
}
if (ctx == null) { if (ctx == null) {
if (channel.isActive()) { if (channel.isActive()) {
promise.setFailure(new IllegalStateException(StringUtil.simpleClassName(Http2MultiplexCodec.class) + promise.setFailure(new IllegalStateException(StringUtil.simpleClassName(Http2MultiplexCodec.class) +
@ -115,7 +118,8 @@ public final class Http2StreamChannelBootstrap {
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
open0(ctx, promise); open0(ctx, promise);
} else { } else {
executor.execute(() -> open0(ctx, promise)); final ChannelHandlerContext finalCtx = ctx;
executor.execute(() -> open0(finalCtx, promise));
} }
} }
return promise; return promise;
@ -123,7 +127,12 @@ public final class Http2StreamChannelBootstrap {
public void open0(ChannelHandlerContext ctx, final Promise<Http2StreamChannel> promise) { public void open0(ChannelHandlerContext ctx, final Promise<Http2StreamChannel> promise) {
assert ctx.executor().inEventLoop(); assert ctx.executor().inEventLoop();
final Http2StreamChannel streamChannel = ((Http2MultiplexCodec) ctx.handler()).newOutboundStream(); final Http2StreamChannel streamChannel;
if (ctx.handler() instanceof Http2MultiplexCodec) {
streamChannel = ((Http2MultiplexCodec) ctx.handler()).newOutboundStream();
} else {
streamChannel = ((Http2MultiplexHandler) ctx.handler()).newOutboundStream();
}
try { try {
init(streamChannel); init(streamChannel);
} catch (Exception e) { } catch (Exception e) {

View File

@ -0,0 +1,78 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import org.junit.Test;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public abstract class Http2MultiplexClientUpgradeTest<C extends Http2FrameCodec> {
@ChannelHandler.Sharable
final class NoopHandler implements ChannelHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.channel().close();
}
}
private final class UpgradeHandler implements ChannelHandler {
Http2Stream.State stateOnActive;
int streamId;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Http2StreamChannel ch = (Http2StreamChannel) ctx.channel();
stateOnActive = ch.stream().state();
streamId = ch.stream().id();
ctx.fireChannelActive();
}
}
protected abstract C newCodec(ChannelHandler upgradeHandler);
protected abstract ChannelHandler newMultiplexer(ChannelHandler upgradeHandler);
@Test
public void upgradeHandlerGetsActivated() throws Exception {
UpgradeHandler upgradeHandler = new UpgradeHandler();
C codec = newCodec(upgradeHandler);
EmbeddedChannel ch = new EmbeddedChannel(codec, newMultiplexer(upgradeHandler));
codec.onHttpClientUpgrade();
assertFalse(upgradeHandler.stateOnActive.localSideOpen());
assertTrue(upgradeHandler.stateOnActive.remoteSideOpen());
assertEquals(1, upgradeHandler.streamId);
assertTrue(ch.finishAndReleaseAll());
}
@Test(expected = Http2Exception.class)
public void clientUpgradeWithoutUpgradeHandlerThrowsHttp2Exception() throws Http2Exception {
C codec = newCodec(null);
EmbeddedChannel ch = new EmbeddedChannel(codec, newMultiplexer(null));
try {
codec.onHttpClientUpgrade();
} finally {
assertTrue(ch.finishAndReleaseAll());
}
}
}

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2018 The Netty Project * Copyright 2019 The Netty Project
* *
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a * "License"); you may not use this file except in compliance with the License. You may obtain a
@ -14,68 +14,21 @@
*/ */
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import io.netty.channel.ChannelInboundHandler;
import org.junit.Test;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import static org.junit.Assert.assertEquals; public class Http2MultiplexCodecClientUpgradeTest extends Http2MultiplexClientUpgradeTest<Http2MultiplexCodec> {
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class Http2MultiplexCodecClientUpgradeTest { @Override
protected Http2MultiplexCodec newCodec(ChannelHandler upgradeHandler) {
@ChannelHandler.Sharable
private final class NoopHandler implements ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.channel().close();
}
}
private final class UpgradeHandler implements ChannelInboundHandler {
Http2Stream.State stateOnActive;
int streamId;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Http2StreamChannel ch = (Http2StreamChannel) ctx.channel();
stateOnActive = ch.stream().state();
streamId = ch.stream().id();
ctx.fireChannelActive();
}
}
private Http2MultiplexCodec newCodec(ChannelHandler upgradeHandler) {
Http2MultiplexCodecBuilder builder = Http2MultiplexCodecBuilder.forClient(new NoopHandler()); Http2MultiplexCodecBuilder builder = Http2MultiplexCodecBuilder.forClient(new NoopHandler());
builder.withUpgradeStreamHandler(upgradeHandler); if (upgradeHandler != null) {
builder.withUpgradeStreamHandler(upgradeHandler);
}
return builder.build(); return builder.build();
} }
@Test @Override
public void upgradeHandlerGetsActivated() throws Exception { protected ChannelHandler newMultiplexer(ChannelHandler upgradeHandler) {
UpgradeHandler upgradeHandler = new UpgradeHandler(); return null;
Http2MultiplexCodec codec = newCodec(upgradeHandler);
EmbeddedChannel ch = new EmbeddedChannel(codec);
codec.onHttpClientUpgrade();
assertFalse(upgradeHandler.stateOnActive.localSideOpen());
assertTrue(upgradeHandler.stateOnActive.remoteSideOpen());
assertEquals(1, upgradeHandler.streamId);
assertTrue(ch.finishAndReleaseAll());
}
@Test(expected = Http2Exception.class)
public void clientUpgradeWithoutUpgradeHandlerThrowsHttp2Exception() throws Http2Exception {
Http2MultiplexCodec codec = Http2MultiplexCodecBuilder.forClient(new NoopHandler()).build();
EmbeddedChannel ch = new EmbeddedChannel(codec);
try {
codec.onHttpClientUpgrade();
} finally {
assertTrue(ch.finishAndReleaseAll());
}
} }
} }

View File

@ -0,0 +1,30 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.ChannelHandler;
public class Http2MultiplexHandlerClientUpgradeTest extends Http2MultiplexClientUpgradeTest<Http2FrameCodec> {
@Override
protected Http2FrameCodec newCodec(ChannelHandler upgradeHandler) {
return Http2FrameCodecBuilder.forClient().build();
}
@Override
protected ChannelHandler newMultiplexer(ChannelHandler upgradeHandler) {
return new Http2MultiplexHandler(new NoopHandler(), upgradeHandler);
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.ChannelHandler;
import org.junit.Ignore;
/**
* Unit tests for {@link Http2MultiplexHandler}.
*/
public class Http2MultiplexHandlerTest extends Http2MultiplexTest<Http2FrameCodec> {
@Override
protected Http2FrameCodec newCodec(TestChannelInitializer childChannelInitializer, Http2FrameWriter frameWriter) {
return new Http2FrameCodecBuilder(true).frameWriter(frameWriter).build();
}
@Override
protected ChannelHandler newMultiplexer(TestChannelInitializer childChannelInitializer) {
return new Http2MultiplexHandler(childChannelInitializer, null);
}
}

File diff suppressed because it is too large Load Diff

View File

@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
public class Http2MultiplexCodecTransportTest { public class Http2MultiplexTransportTest {
private EventLoopGroup eventLoopGroup; private EventLoopGroup eventLoopGroup;
private Channel clientChannel; private Channel clientChannel;
private Channel serverChannel; private Channel serverChannel;
@ -64,7 +64,18 @@ public class Http2MultiplexCodecTransportTest {
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void asyncSettingsAck() throws InterruptedException { public void asyncSettingsAckWithMultiplexCodec() throws InterruptedException {
asyncSettingsAck0(new Http2MultiplexCodecBuilder(true, new HttpInboundHandler()).build(), null);
}
@Test(timeout = 10000)
public void asyncSettingsAckWithMultiplexHandler() throws InterruptedException {
asyncSettingsAck0(new Http2FrameCodecBuilder(true).build(),
new Http2MultiplexHandler(new HttpInboundHandler()));
}
private void asyncSettingsAck0(final Http2FrameCodec codec, final ChannelHandler multiplexer)
throws InterruptedException {
// The client expects 2 settings frames. One from the connection setup and one from this test. // The client expects 2 settings frames. One from the connection setup and one from this test.
final CountDownLatch serverAckOneLatch = new CountDownLatch(1); final CountDownLatch serverAckOneLatch = new CountDownLatch(1);
final CountDownLatch serverAckAllLatch = new CountDownLatch(2); final CountDownLatch serverAckAllLatch = new CountDownLatch(2);
@ -77,7 +88,10 @@ public class Http2MultiplexCodecTransportTest {
sb.childHandler(new ChannelInitializer<Channel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) { protected void initChannel(Channel ch) {
ch.pipeline().addLast(Http2MultiplexCodecBuilder.forServer(new HttpInboundHandler()).build()); ch.pipeline().addLast(codec);
if (multiplexer != null) {
ch.pipeline().addLast(multiplexer);
}
ch.pipeline().addLast(new ChannelHandler() { ch.pipeline().addLast(new ChannelHandler() {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) { public void channelActive(ChannelHandlerContext ctx) {

View File

@ -36,20 +36,26 @@ public class Http2ServerUpgradeCodecTest {
@Test @Test
public void testUpgradeToHttp2ConnectionHandler() { public void testUpgradeToHttp2ConnectionHandler() {
testUpgrade(new Http2ConnectionHandlerBuilder().frameListener(new Http2FrameAdapter()).build()); testUpgrade(new Http2ConnectionHandlerBuilder().frameListener(new Http2FrameAdapter()).build(), null);
} }
@Test @Test
public void testUpgradeToHttp2FrameCodec() { public void testUpgradeToHttp2FrameCodec() {
testUpgrade(new Http2FrameCodecBuilder(true).build()); testUpgrade(new Http2FrameCodecBuilder(true).build(), null);
} }
@Test @Test
public void testUpgradeToHttp2MultiplexCodec() { public void testUpgradeToHttp2MultiplexCodec() {
testUpgrade(new Http2MultiplexCodecBuilder(true, new HttpInboundHandler()).build()); testUpgrade(new Http2MultiplexCodecBuilder(true, new HttpInboundHandler()).build(), null);
} }
private static void testUpgrade(Http2ConnectionHandler handler) { @Test
public void testUpgradeToHttp2FrameCodecWithMultiplexer() {
testUpgrade(new Http2FrameCodecBuilder(true).build(),
new Http2MultiplexHandler(new HttpInboundHandler()));
}
private static void testUpgrade(Http2ConnectionHandler handler, ChannelHandler multiplexer) {
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "*"); FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "*");
request.headers().set(HttpHeaderNames.HOST, "netty.io"); request.headers().set(HttpHeaderNames.HOST, "netty.io");
request.headers().set(HttpHeaderNames.CONNECTION, "Upgrade, HTTP2-Settings"); request.headers().set(HttpHeaderNames.CONNECTION, "Upgrade, HTTP2-Settings");
@ -58,13 +64,18 @@ public class Http2ServerUpgradeCodecTest {
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() { }); EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() { });
ChannelHandlerContext ctx = channel.pipeline().firstContext(); ChannelHandlerContext ctx = channel.pipeline().firstContext();
Http2ServerUpgradeCodec codec = new Http2ServerUpgradeCodec("connectionHandler", handler); Http2ServerUpgradeCodec codec;
if (multiplexer == null) {
codec = new Http2ServerUpgradeCodec(handler);
} else {
codec = new Http2ServerUpgradeCodec((Http2FrameCodec) handler, multiplexer);
}
assertTrue(codec.prepareUpgradeResponse(ctx, request, new DefaultHttpHeaders())); assertTrue(codec.prepareUpgradeResponse(ctx, request, new DefaultHttpHeaders()));
codec.upgradeTo(ctx, request); codec.upgradeTo(ctx, request);
// Flush the channel to ensure we write out all buffered data // Flush the channel to ensure we write out all buffered data
channel.flush(); channel.flush();
assertSame(handler, channel.pipeline().remove("connectionHandler")); assertSame(handler, channel.pipeline().remove(handler.getClass()));
assertNull(channel.pipeline().get(handler.getClass())); assertNull(channel.pipeline().get(handler.getClass()));
assertTrue(channel.finish()); assertTrue(channel.finish());

View File

@ -18,7 +18,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.example.http2.helloworld.server.HelloWorldHttp1Handler; import io.netty.example.http2.helloworld.server.HelloWorldHttp1Handler;
import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http2.Http2MultiplexCodecBuilder; import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
@ -37,7 +38,8 @@ public class Http2OrHttpHandler extends ApplicationProtocolNegotiationHandler {
@Override @Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception { protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
ctx.pipeline().addLast(Http2MultiplexCodecBuilder.forServer(new HelloWorldHttp2Handler()).build()); ctx.pipeline().addLast(Http2FrameCodecBuilder.forServer().build());
ctx.pipeline().addLast(new Http2MultiplexHandler(new HelloWorldHttp2Handler()));
return; return;
} }

View File

@ -28,8 +28,9 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerUpgradeHandler; import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodecFactory; import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodecFactory;
import io.netty.handler.codec.http2.Http2MultiplexCodecBuilder; import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2ServerUpgradeCodec; import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
@ -44,7 +45,8 @@ public class Http2ServerInitializer extends ChannelInitializer<SocketChannel> {
private static final UpgradeCodecFactory upgradeCodecFactory = protocol -> { private static final UpgradeCodecFactory upgradeCodecFactory = protocol -> {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec( return new Http2ServerUpgradeCodec(
Http2MultiplexCodecBuilder.forServer(new HelloWorldHttp2Handler()).build()); Http2FrameCodecBuilder.forServer().build(),
new Http2MultiplexHandler(new HelloWorldHttp2Handler()));
} else { } else {
return null; return null;
} }