HTTP/2: Ensure newStream() is called only once per connection upgrade and the correct handler is used (#9396)
Motivation:
306299323c
introduced some code change to move the responsibility of creating the stream for the upgrade to Http2FrameCodec. Unfortunaly this lead to the situation of having newStream().setStreamAndProperty(...) be called twice. Because of this we only ever saw the channelActive(...) on Http2StreamChannel but no other events as the mapping was replaced on the second newStream().setStreamAndProperty(...) call.
Beside this we also did not use the correct handler for the upgrade stream in some cases
Modifications:
- Just remove the Http2FrameCodec.onHttpClientUpgrade() method and so let the base class handle all of it. The stream is created correctly as part of the ConnectionListener implementation of Http2FrameCodec already.
- Consolidate logic of creating stream channels
- Adjust unit test to capture the bug
Result:
Fixes https://github.com/netty/netty/issues/9395
This commit is contained in:
parent
90b788bef7
commit
d6c05e9b31
@ -232,13 +232,6 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
|
||||
// sub-class can override this for extra steps that needs to be done when the handler is added.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHttpClientUpgrade() throws Http2Exception {
|
||||
super.onHttpClientUpgrade();
|
||||
// Now make a new Http2FrameStream, set it's underlying Http2Stream, and initialize it.
|
||||
newStream().setStreamAndProperty(streamKey, connection().stream(HTTP_UPGRADE_STREAM_ID));
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
|
||||
* HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
|
||||
|
@ -149,39 +149,35 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
ctx.fireChannelRead(frame);
|
||||
}
|
||||
|
||||
private void onHttp2UpgradeStreamInitialized(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
|
||||
assert stream.state() == Http2Stream.State.HALF_CLOSED_LOCAL;
|
||||
AbstractHttp2StreamChannel ch = new Http2MultiplexCodecStreamChannel(stream, null);
|
||||
ch.closeOutbound();
|
||||
|
||||
// Add our upgrade handler to the channel and then register the channel.
|
||||
// The register call fires the channelActive, etc.
|
||||
ch.pipeline().addLast(upgradeStreamHandler);
|
||||
ChannelFuture future = ch.register();
|
||||
if (future.isDone()) {
|
||||
Http2MultiplexHandler.registerDone(future);
|
||||
} else {
|
||||
future.addListener(Http2MultiplexHandler.CHILD_CHANNEL_REGISTRATION_LISTENER);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
|
||||
|
||||
switch (stream.state()) {
|
||||
case HALF_CLOSED_LOCAL:
|
||||
if (stream.id() == HTTP_UPGRADE_STREAM_ID) {
|
||||
onHttp2UpgradeStreamInitialized(ctx, stream);
|
||||
if (stream.id() != HTTP_UPGRADE_STREAM_ID) {
|
||||
// Ignore everything which was not caused by an upgrade
|
||||
break;
|
||||
}
|
||||
break;
|
||||
// fall-through
|
||||
case HALF_CLOSED_REMOTE:
|
||||
// fall-through
|
||||
case OPEN:
|
||||
if (stream.attachment != null) {
|
||||
// ignore if child channel was already created.
|
||||
break;
|
||||
}
|
||||
// fall-trough
|
||||
ChannelFuture future = new Http2MultiplexCodecStreamChannel(stream, inboundStreamHandler).register();
|
||||
final Http2MultiplexCodecStreamChannel streamChannel;
|
||||
// We need to handle upgrades special when on the client side.
|
||||
if (stream.id() == HTTP_UPGRADE_STREAM_ID && !connection().isServer()) {
|
||||
// Add our upgrade handler to the channel and then register the channel.
|
||||
// The register call fires the channelActive, etc.
|
||||
assert upgradeStreamHandler != null;
|
||||
streamChannel = new Http2MultiplexCodecStreamChannel(stream, upgradeStreamHandler);
|
||||
streamChannel.closeOutbound();
|
||||
} else {
|
||||
streamChannel = new Http2MultiplexCodecStreamChannel(stream, inboundStreamHandler);
|
||||
}
|
||||
|
||||
ChannelFuture future = streamChannel.register();
|
||||
if (future.isDone()) {
|
||||
Http2MultiplexHandler.registerDone(future);
|
||||
} else {
|
||||
|
@ -209,26 +209,24 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
|
||||
// Ignore everything which was not caused by an upgrade
|
||||
break;
|
||||
}
|
||||
// We must have an upgrade handler or else we can't handle the stream
|
||||
if (upgradeStreamHandler == null) {
|
||||
throw connectionError(INTERNAL_ERROR,
|
||||
"Client is misconfigured for upgrade requests");
|
||||
}
|
||||
// fall-trough
|
||||
// fall-through
|
||||
case HALF_CLOSED_REMOTE:
|
||||
// fall-trough
|
||||
// fall-through
|
||||
case OPEN:
|
||||
if (stream.attachment != null) {
|
||||
// ignore if child channel was already created.
|
||||
break;
|
||||
}
|
||||
final AbstractHttp2StreamChannel ch;
|
||||
if (stream.state() == Http2Stream.State.HALF_CLOSED_LOCAL) {
|
||||
ch = new Http2MultiplexHandlerStreamChannel(stream, null);
|
||||
// We need to handle upgrades special when on the client side.
|
||||
if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
|
||||
// We must have an upgrade handler or else we can't handle the stream
|
||||
if (upgradeStreamHandler == null) {
|
||||
throw connectionError(INTERNAL_ERROR,
|
||||
"Client is misconfigured for upgrade requests");
|
||||
}
|
||||
ch = new Http2MultiplexHandlerStreamChannel(stream, upgradeStreamHandler);
|
||||
ch.closeOutbound();
|
||||
// Add our upgrade handler to the channel and then register the channel.
|
||||
// The register call fires the channelActive, etc.
|
||||
ch.pipeline().addLast(upgradeStreamHandler);
|
||||
} else {
|
||||
ch = new Http2MultiplexHandlerStreamChannel(stream, inboundStreamHandler);
|
||||
}
|
||||
@ -277,9 +275,13 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
|
||||
ctx.fireExceptionCaught(cause);
|
||||
}
|
||||
|
||||
private static boolean isServer(ChannelHandlerContext ctx) {
|
||||
return ctx.channel().parent() instanceof ServerChannel;
|
||||
}
|
||||
|
||||
private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
|
||||
try {
|
||||
final boolean server = ctx.channel().parent() instanceof ServerChannel;
|
||||
final boolean server = isServer(ctx);
|
||||
forEachActiveStream(stream -> {
|
||||
final int streamId = stream.id();
|
||||
if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
|
||||
|
@ -22,21 +22,23 @@ import io.netty.channel.embedded.EmbeddedChannel;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public abstract class Http2MultiplexClientUpgradeTest<C extends Http2FrameCodec> {
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
final class NoopHandler implements ChannelHandler {
|
||||
static final class NoopHandler implements ChannelHandler {
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
ctx.channel().close();
|
||||
}
|
||||
}
|
||||
|
||||
private final class UpgradeHandler implements ChannelHandler {
|
||||
private static final class UpgradeHandler implements ChannelHandler {
|
||||
Http2Stream.State stateOnActive;
|
||||
int streamId;
|
||||
boolean channelInactiveCalled;
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
@ -45,6 +47,12 @@ public abstract class Http2MultiplexClientUpgradeTest<C extends Http2FrameCodec>
|
||||
streamId = ch.stream().id();
|
||||
ctx.fireChannelActive();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
channelInactiveCalled = true;
|
||||
ctx.fireChannelInactive();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract C newCodec(ChannelHandler upgradeHandler);
|
||||
@ -61,8 +69,10 @@ public abstract class Http2MultiplexClientUpgradeTest<C extends Http2FrameCodec>
|
||||
|
||||
assertFalse(upgradeHandler.stateOnActive.localSideOpen());
|
||||
assertTrue(upgradeHandler.stateOnActive.remoteSideOpen());
|
||||
assertEquals(1, upgradeHandler.streamId);
|
||||
assertNotNull(codec.connection().stream(Http2CodecUtil.HTTP_UPGRADE_STREAM_ID).getProperty(codec.streamKey));
|
||||
assertEquals(Http2CodecUtil.HTTP_UPGRADE_STREAM_ID, upgradeHandler.streamId);
|
||||
assertTrue(ch.finishAndReleaseAll());
|
||||
assertTrue(upgradeHandler.channelInactiveCalled);
|
||||
}
|
||||
|
||||
@Test(expected = Http2Exception.class)
|
||||
|
@ -17,6 +17,8 @@ package io.netty.handler.codec.http2;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.DefaultChannelId;
|
||||
import io.netty.channel.ServerChannel;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||
import io.netty.handler.codec.http.DefaultHttpHeaders;
|
||||
@ -31,6 +33,7 @@ import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class Http2ServerUpgradeCodecTest {
|
||||
|
||||
@ -62,7 +65,9 @@ public class Http2ServerUpgradeCodecTest {
|
||||
request.headers().set(HttpHeaderNames.UPGRADE, "h2c");
|
||||
request.headers().set("HTTP2-Settings", "AAMAAABkAAQAAP__");
|
||||
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() { });
|
||||
ServerChannel parent = Mockito.mock(ServerChannel.class);
|
||||
EmbeddedChannel channel = new EmbeddedChannel(parent, DefaultChannelId.newInstance(), true, false,
|
||||
new ChannelHandler() { });
|
||||
ChannelHandlerContext ctx = channel.pipeline().firstContext();
|
||||
Http2ServerUpgradeCodec codec;
|
||||
if (multiplexer == null) {
|
||||
|
Loading…
Reference in New Issue
Block a user