Fix missing trailing data on HTTP client upgrade
Motivation: When HttpClientUpgradeHandler upgrades from HTTP/1 to another protocol, it performs a two-step opertion: 1. Remove the SourceCodec (HttpClientCodec) 2. Add the UpgradeCodec When HttpClientCodec is removed from the pipeline, the decoder being removed triggers channelRead() event with the data left in its cumulation buffer. However, this is not received by the UpgradeCodec becuase it's not added yet. e.g. HTTP/2 SETTINGS frame sent by the server can be missed out. To fix the problem, we need to reverse the steps: 1. Add the UpgradeCodec 2. Remove the SourceCodec However, this does not work as expected either, because UpgradeCodec can send a greeting message such as HTTP/2 Preface. Such a greeting message will be handled by the SourceCodec and will trigger an 'unsupported message type' exception. To fix the problem really, we need to make the upgrade process 3-step: 1. Remove/disable the encoder of SourceCodec 2. Add the UpgradeCodec 3. Remove the SourceCodec Modifications: - Add SourceCodec.prepareUpgradeFrom() so that SourceCodec can remove or disable its encoder - Implement HttpClientCodec.prepareUpgradeFrom() properly - Miscellaneous: - Log the related channel as well When logging the failure to send a GOAWAY Result: Cleartext HTTP/1-to-HTTP/2 upgrade works again.
This commit is contained in:
parent
17df8171b3
commit
4d6ab1d30d
@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandlerContext;
|
|||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.CombinedChannelDuplexHandler;
|
import io.netty.channel.CombinedChannelDuplexHandler;
|
||||||
import io.netty.handler.codec.PrematureChannelClosureException;
|
import io.netty.handler.codec.PrematureChannelClosureException;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -87,6 +88,14 @@ public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResp
|
|||||||
this.failOnMissingResponse = failOnMissingResponse;
|
this.failOnMissingResponse = failOnMissingResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepares to upgrade to another protocol from HTTP. Disables the {@link Encoder}.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void prepareUpgradeFrom(ChannelHandlerContext ctx) {
|
||||||
|
((Encoder) outboundHandler()).upgraded = true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Upgrades to another protocol from HTTP. Removes the {@link Decoder} and {@link Encoder} from
|
* Upgrades to another protocol from HTTP. Removes the {@link Decoder} and {@link Encoder} from
|
||||||
* the pipeline.
|
* the pipeline.
|
||||||
@ -107,9 +116,17 @@ public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResp
|
|||||||
|
|
||||||
private final class Encoder extends HttpRequestEncoder {
|
private final class Encoder extends HttpRequestEncoder {
|
||||||
|
|
||||||
|
boolean upgraded;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void encode(
|
protected void encode(
|
||||||
ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
|
ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
|
||||||
|
|
||||||
|
if (upgraded) {
|
||||||
|
out.add(ReferenceCountUtil.retain(msg));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (msg instanceof HttpRequest && !done) {
|
if (msg instanceof HttpRequest && !done) {
|
||||||
queue.offer(((HttpRequest) msg).method());
|
queue.offer(((HttpRequest) msg).method());
|
||||||
}
|
}
|
||||||
|
@ -62,6 +62,13 @@ public class HttpClientUpgradeHandler extends HttpObjectAggregator implements Ch
|
|||||||
* The source codec that is used in the pipeline initially.
|
* The source codec that is used in the pipeline initially.
|
||||||
*/
|
*/
|
||||||
public interface SourceCodec {
|
public interface SourceCodec {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes or disables the encoder of this codec so that the {@link UpgradeCodec} can send an initial greeting
|
||||||
|
* (if any).
|
||||||
|
*/
|
||||||
|
void prepareUpgradeFrom(ChannelHandlerContext ctx);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes this codec (i.e. all associated handlers) from the pipeline.
|
* Removes this codec (i.e. all associated handlers) from the pipeline.
|
||||||
*/
|
*/
|
||||||
@ -222,8 +229,9 @@ public class HttpClientUpgradeHandler extends HttpObjectAggregator implements Ch
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Upgrade to the new protocol.
|
// Upgrade to the new protocol.
|
||||||
sourceCodec.upgradeFrom(ctx);
|
sourceCodec.prepareUpgradeFrom(ctx);
|
||||||
upgradeCodec.upgradeTo(ctx, response);
|
upgradeCodec.upgradeTo(ctx, response);
|
||||||
|
sourceCodec.upgradeFrom(ctx);
|
||||||
|
|
||||||
// Notify that the upgrade to the new protocol completed successfully.
|
// Notify that the upgrade to the new protocol completed successfully.
|
||||||
ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_SUCCESSFUL);
|
ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_SUCCESSFUL);
|
||||||
|
@ -47,7 +47,6 @@ import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
|
|||||||
import static io.netty.util.CharsetUtil.UTF_8;
|
import static io.netty.util.CharsetUtil.UTF_8;
|
||||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||||
import static java.lang.Math.min;
|
import static java.lang.Math.min;
|
||||||
import static java.lang.String.format;
|
|
||||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -724,20 +723,17 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
if (future.isSuccess()) {
|
if (future.isSuccess()) {
|
||||||
if (errorCode != NO_ERROR.code()) {
|
if (errorCode != NO_ERROR.code()) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug(
|
logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
|
||||||
format("Sent GOAWAY: lastStreamId '%d', errorCode '%d', " +
|
"debugData '{}'. Forcing shutdown of the connection.",
|
||||||
"debugData '%s'. Forcing shutdown of the connection.",
|
ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
|
||||||
lastStreamId, errorCode, debugData.toString(UTF_8)),
|
|
||||||
future.cause());
|
|
||||||
}
|
}
|
||||||
ctx.close();
|
ctx.close();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (logger.isErrorEnabled()) {
|
if (logger.isErrorEnabled()) {
|
||||||
logger.error(
|
logger.error("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
|
||||||
format("Sending GOAWAY failed: lastStreamId '%d', errorCode '%d', " +
|
"debugData '{}'. Forcing shutdown of the connection.",
|
||||||
"debugData '%s'. Forcing shutdown of the connection.",
|
ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
|
||||||
lastStreamId, errorCode, debugData.toString(UTF_8)), future.cause());
|
|
||||||
}
|
}
|
||||||
ctx.close();
|
ctx.close();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user