Allow HTTP decoding post CONNECT in HttpClientCode

__Motivation__

`HttpClientCodec` skips HTTP decoding on the connection after a successful HTTP CONNECT response is received.
 This behavior follows the spec for a client but pragmatically, if one creates a client to use a proxy transparently, the codec becomes useless after HTTP CONNECT.
 Ideally, one should be able to configure whether HTTP CONNECT should result in pass-through or not. This will enable client writers to continue using HTTP decoding even after HTTP CONNECT.

 __Modification__

 Added overloaded constructors to accept `parseHttpPostConnect`. If this parameter is `true` then the codec continues decoding even after a successful HTTP CONNECT.

 Also fixed a bug in the codec that was incrementing request count post HTTP CONNECT but not decrementing it on response. Now, the request count is only incremented if the codec is not `done`.

 __Result__

 Easier usage by HTTP client writers who wants to connect to a proxy but still decode HTTP for their users for subsequent requests.
This commit is contained in:
Nitesh Kant 2017-05-08 13:45:13 -07:00 committed by Norman Maurer
parent dd837fe803
commit a093b89bfe
2 changed files with 112 additions and 38 deletions

View File

@ -47,6 +47,7 @@ public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResp
/** A queue that is used for correlating a request and a response. */
private final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>();
private final boolean parseHttpAfterConnectRequest;
/** If true, decoding stops (i.e. pass-through) */
private boolean done;
@ -84,8 +85,18 @@ public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResp
public HttpClientCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
boolean validateHeaders) {
this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, validateHeaders, false);
}
/**
* Creates a new instance with the specified decoder options.
*/
public HttpClientCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
boolean validateHeaders, boolean parseHttpAfterConnectRequest) {
init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders), new Encoder());
this.failOnMissingResponse = failOnMissingResponse;
this.parseHttpAfterConnectRequest = parseHttpAfterConnectRequest;
}
/**
@ -94,8 +105,19 @@ public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResp
public HttpClientCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
boolean validateHeaders, int initialBufferSize) {
this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, validateHeaders,
initialBufferSize, false);
}
/**
* Creates a new instance with the specified decoder options.
*/
public HttpClientCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
boolean validateHeaders, int initialBufferSize, boolean parseHttpAfterConnectRequest) {
init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders, initialBufferSize),
new Encoder());
this.parseHttpAfterConnectRequest = parseHttpAfterConnectRequest;
this.failOnMissingResponse = failOnMissingResponse;
}
@ -144,7 +166,7 @@ public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResp
super.encode(ctx, msg, out);
if (failOnMissingResponse) {
if (failOnMissingResponse && !done) {
// check if the request is chunked if so do not increment
if (msg instanceof LastHttpContent) {
// increment as its the last chunk
@ -238,9 +260,12 @@ public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResp
// Successful CONNECT request results in a response with empty body.
if (statusCode == 200) {
if (HttpMethod.CONNECT.equals(method)) {
// Proxy connection established - Not HTTP anymore.
done = true;
queue.clear();
// Proxy connection established - Parse HTTP only if configured by parseHttpAfterConnectRequest,
// else pass through.
if (!parseHttpAfterConnectRequest) {
done = true;
queue.clear();
}
return true;
}
}

View File

@ -43,6 +43,8 @@ import java.util.concurrent.CountDownLatch;
import static io.netty.util.ReferenceCountUtil.release;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@ -51,6 +53,7 @@ import static org.junit.Assert.fail;
public class HttpClientCodecTest {
private static final String EMPTY_RESPONSE = "HTTP/1.0 200 OK\r\nContent-Length: 0\r\n\r\n";
private static final String RESPONSE = "HTTP/1.0 200 OK\r\n" + "Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n" +
"Content-Type: text/html\r\n" + "Content-Length: 28\r\n" + "\r\n"
+ "<html><body></body></html>\r\n";
@ -60,28 +63,12 @@ public class HttpClientCodecTest {
private static final String CHUNKED_RESPONSE = INCOMPLETE_CHUNKED_RESPONSE + "\r\n";
@Test
public void testFailsNotOnRequestResponse() {
public void testConnectWithResponseContent() {
HttpClientCodec codec = new HttpClientCodec(4096, 8192, 8192, true);
EmbeddedChannel ch = new EmbeddedChannel(codec);
ch.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/"));
ch.writeInbound(Unpooled.copiedBuffer(RESPONSE, CharsetUtil.ISO_8859_1));
sendRequestAndReadResponse(ch, HttpMethod.CONNECT, RESPONSE);
ch.finish();
for (;;) {
Object msg = ch.readOutbound();
if (msg == null) {
break;
}
release(msg);
}
for (;;) {
Object msg = ch.readInbound();
if (msg == null) {
break;
}
release(msg);
}
}
@Test
@ -89,23 +76,8 @@ public class HttpClientCodecTest {
HttpClientCodec codec = new HttpClientCodec(4096, 8192, 8192, true);
EmbeddedChannel ch = new EmbeddedChannel(codec);
ch.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/"));
ch.writeInbound(Unpooled.copiedBuffer(CHUNKED_RESPONSE, CharsetUtil.ISO_8859_1));
sendRequestAndReadResponse(ch, HttpMethod.GET, CHUNKED_RESPONSE);
ch.finish();
for (;;) {
Object msg = ch.readOutbound();
if (msg == null) {
break;
}
release(msg);
}
for (;;) {
Object msg = ch.readInbound();
if (msg == null) {
break;
}
release(msg);
}
}
@Test
@ -233,4 +205,81 @@ public class HttpClientCodecTest {
cb.config().group().shutdownGracefully();
}
}
@Test
public void testContinueParsingAfterConnect() throws Exception {
testAfterConnect(true);
}
@Test
public void testPassThroughAfterConnect() throws Exception {
testAfterConnect(false);
}
private static void testAfterConnect(final boolean parseAfterConnect) throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new HttpClientCodec(4096, 8192, 8192, true, true, parseAfterConnect));
Consumer connectResponseConsumer = new Consumer();
sendRequestAndReadResponse(ch, HttpMethod.CONNECT, EMPTY_RESPONSE, connectResponseConsumer);
assertTrue("No connect response messages received.", connectResponseConsumer.getReceivedCount() > 0);
Consumer responseConsumer = new Consumer() {
@Override
void accept(Object object) {
if (parseAfterConnect) {
assertThat("Unexpected response message type.", object, instanceOf(HttpObject.class));
} else {
assertThat("Unexpected response message type.", object, not(instanceOf(HttpObject.class)));
}
}
};
sendRequestAndReadResponse(ch, HttpMethod.GET, RESPONSE, responseConsumer);
assertTrue("No response messages received.", responseConsumer.getReceivedCount() > 0);
assertFalse("Channel finish failed.", ch.finish());
}
private static void sendRequestAndReadResponse(EmbeddedChannel ch, HttpMethod httpMethod, String response) {
sendRequestAndReadResponse(ch, httpMethod, response, new Consumer());
}
private static void sendRequestAndReadResponse(EmbeddedChannel ch, HttpMethod httpMethod, String response,
Consumer responseConsumer) {
assertTrue("Channel outbound write failed.",
ch.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, "http://localhost/")));
assertTrue("Channel inbound write failed.",
ch.writeInbound(Unpooled.copiedBuffer(response, CharsetUtil.ISO_8859_1)));
for (;;) {
Object msg = ch.readOutbound();
if (msg == null) {
break;
}
release(msg);
}
for (;;) {
Object msg = ch.readInbound();
if (msg == null) {
break;
}
responseConsumer.onResponse(msg);
release(msg);
}
}
private static class Consumer {
private int receivedCount;
final void onResponse(Object object) {
receivedCount++;
accept(object);
}
void accept(Object object) {
// Default noop.
}
int getReceivedCount() {
return receivedCount;
}
}
}