codec-http: HttpClientUpgradeHandler can handle streamed responses

Motivation:

We want to reject the upgrade as quickly as possible, so that we can
support streamed responses.

Modifications:

Reject the upgrade as soon as we inspect the headers if they're wrong,
instead of waiting for the entire response body.

Result:

If a remote server doesn't know how to use the http upgrade and tries to
responsd with a streaming response that never ends, the client doesn't
buffer forever, but can instead pass it along.  Fixes #5954
This commit is contained in:
Moses Nakamura 2016-10-30 20:51:55 -07:00 committed by Norman Maurer
parent b2379e62f4
commit bff951ca07
2 changed files with 192 additions and 10 deletions

View File

@ -195,6 +195,20 @@ public class HttpClientUpgradeHandler extends HttpObjectAggregator implements Ch
throw new IllegalStateException("Read HTTP response without requesting protocol switch"); throw new IllegalStateException("Read HTTP response without requesting protocol switch");
} }
if (msg instanceof HttpResponse) {
HttpResponse rep = (HttpResponse) msg;
if (!SWITCHING_PROTOCOLS.equals(rep.status())) {
// The server does not support the requested protocol, just remove this handler
// and continue processing HTTP.
// NOTE: not releasing the response since we're letting it propagate to the
// next handler.
ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_REJECTED);
removeThisHandler(ctx);
ctx.fireChannelRead(msg);
return;
}
}
if (msg instanceof FullHttpResponse) { if (msg instanceof FullHttpResponse) {
response = (FullHttpResponse) msg; response = (FullHttpResponse) msg;
// Need to retain since the base class will release after returning from this method. // Need to retain since the base class will release after returning from this method.
@ -212,16 +226,6 @@ public class HttpClientUpgradeHandler extends HttpObjectAggregator implements Ch
response = (FullHttpResponse) out.get(0); response = (FullHttpResponse) out.get(0);
} }
if (!SWITCHING_PROTOCOLS.equals(response.status())) {
// The server does not support the requested protocol, just remove this handler
// and continue processing HTTP.
// NOTE: not releasing the response since we're letting it propagate to the
// next handler.
ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_REJECTED);
removeThisHandler(ctx);
return;
}
CharSequence upgradeHeader = response.headers().get(HttpHeaderNames.UPGRADE); CharSequence upgradeHeader = response.headers().get(HttpHeaderNames.UPGRADE);
if (upgradeHeader != null && !AsciiString.contentEqualsIgnoreCase(upgradeCodec.protocol(), upgradeHeader)) { if (upgradeHeader != null && !AsciiString.contentEqualsIgnoreCase(upgradeCodec.protocol(), upgradeHeader)) {
throw new IllegalStateException( throw new IllegalStateException(

View File

@ -0,0 +1,178 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class HttpClientUpgradeHandlerTest {
private static final class FakeSourceCodec implements HttpClientUpgradeHandler.SourceCodec {
@Override
public void prepareUpgradeFrom(ChannelHandlerContext ctx) {
}
@Override
public void upgradeFrom(ChannelHandlerContext ctx) {
}
}
private static final class FakeUpgradeCodec implements HttpClientUpgradeHandler.UpgradeCodec {
@Override
public CharSequence protocol() {
return "fancyhttp";
}
@Override
public Collection<CharSequence> setUpgradeHeaders(ChannelHandlerContext ctx, HttpRequest upgradeRequest) {
return Collections.emptyList();
}
@Override
public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception {
}
}
private static final class UserEventCatcher extends ChannelInboundHandlerAdapter {
private Object evt;
public Object getUserEvent() {
return evt;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
this.evt = evt;
}
}
@Test
public void testSuccessfulUpgrade() {
HttpClientUpgradeHandler.SourceCodec sourceCodec = new FakeSourceCodec();
HttpClientUpgradeHandler.UpgradeCodec upgradeCodec = new FakeUpgradeCodec();
HttpClientUpgradeHandler handler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 1024);
UserEventCatcher catcher = new UserEventCatcher();
EmbeddedChannel channel = new EmbeddedChannel(catcher);
channel.pipeline().addFirst("upgrade", handler);
assertTrue(
channel.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "netty.io")));
FullHttpRequest request = channel.readOutbound();
assertEquals(request.headers().size(), 2);
assertTrue(request.headers().contains(HttpHeaderNames.UPGRADE, "fancyhttp", false));
assertTrue(request.headers().contains("connection", "upgrade", false));
assertTrue(request.release());
assertEquals(catcher.getUserEvent(), HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_ISSUED);
HttpResponse upgradeResponse =
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS);
upgradeResponse.headers().add(HttpHeaderNames.UPGRADE, "fancyhttp");
assertFalse(channel.writeInbound(upgradeResponse));
assertFalse(channel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT));
assertEquals(catcher.getUserEvent(), HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL);
assertNull(channel.pipeline().get("upgrade"));
assertTrue(channel.writeInbound(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)));
FullHttpResponse response = channel.readInbound();
assertEquals(response.status(), HttpResponseStatus.OK);
assertTrue(response.release());
assertFalse(channel.finish());
}
@Test
public void testUpgradeRejected() {
HttpClientUpgradeHandler.SourceCodec sourceCodec = new FakeSourceCodec();
HttpClientUpgradeHandler.UpgradeCodec upgradeCodec = new FakeUpgradeCodec();
HttpClientUpgradeHandler handler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 1024);
UserEventCatcher catcher = new UserEventCatcher();
EmbeddedChannel channel = new EmbeddedChannel(catcher);
channel.pipeline().addFirst("upgrade", handler);
assertTrue(
channel.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "netty.io")));
FullHttpRequest request = channel.readOutbound();
assertEquals(request.headers().size(), 2);
assertTrue(request.headers().contains(HttpHeaderNames.UPGRADE, "fancyhttp", false));
assertTrue(request.headers().contains("connection", "upgrade", false));
assertTrue(request.release());
assertEquals(catcher.getUserEvent(), HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_ISSUED);
HttpResponse upgradeResponse =
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS);
upgradeResponse.headers().add(HttpHeaderNames.UPGRADE, "fancyhttp");
assertTrue(channel.writeInbound(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)));
assertTrue(channel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT));
assertEquals(catcher.getUserEvent(), HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED);
assertNull(channel.pipeline().get("upgrade"));
HttpResponse response = channel.readInbound();
assertEquals(response.status(), HttpResponseStatus.OK);
LastHttpContent last = channel.readInbound();
assertEquals(last, LastHttpContent.EMPTY_LAST_CONTENT);
assertFalse(last.release());
assertFalse(channel.finish());
}
@Test
public void testEarlyBailout() {
HttpClientUpgradeHandler.SourceCodec sourceCodec = new FakeSourceCodec();
HttpClientUpgradeHandler.UpgradeCodec upgradeCodec = new FakeUpgradeCodec();
HttpClientUpgradeHandler handler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 1024);
UserEventCatcher catcher = new UserEventCatcher();
EmbeddedChannel channel = new EmbeddedChannel(catcher);
channel.pipeline().addFirst("upgrade", handler);
assertTrue(
channel.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "netty.io")));
FullHttpRequest request = channel.readOutbound();
assertEquals(request.headers().size(), 2);
assertTrue(request.headers().contains(HttpHeaderNames.UPGRADE, "fancyhttp", false));
assertTrue(request.headers().contains("connection", "upgrade", false));
assertTrue(request.release());
assertEquals(catcher.getUserEvent(), HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_ISSUED);
HttpResponse upgradeResponse =
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS);
upgradeResponse.headers().add(HttpHeaderNames.UPGRADE, "fancyhttp");
assertTrue(channel.writeInbound(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)));
assertEquals(catcher.getUserEvent(), HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED);
assertNull(channel.pipeline().get("upgrade"));
HttpResponse response = channel.readInbound();
assertEquals(response.status(), HttpResponseStatus.OK);
assertFalse(channel.finish());
}
}