Add HttpServerKeepAliveHandler

Motivation:

As discussed in #5738, developers need to concern themselves with setting
connection: keep-alive on the response as well as whether to close a
connection or not after writing a response.  This leads to special keep-alive
handling logic in many different places.  The purpose of the HttpServerKeepAliveHandler
is to allow developers to add this handler to their pipeline and therefore
free themselves of having to worry about the details of how Keep-Alive works.

Modifications:

Added HttpServerKeepAliveHandler to the io.netty.handler.codec.http package.

Result:

Developers can start using HttpServerKeepAliveHandler in their pipeline instead
of worrying about when to close a connection for keep-alive.
This commit is contained in:
Christopher O'Toole 2016-09-01 11:07:58 -05:00 committed by Norman Maurer
parent da8734a6f9
commit c57d4bed91
2 changed files with 315 additions and 0 deletions

View File

@ -0,0 +1,127 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import static io.netty.handler.codec.http.HttpUtil.*;
/**
* HttpServerKeepAliveHandler helps close persistent connections when appropriate.
* <p>
* The server channel is expected to set the proper 'Connection' header if it can handle persistent connections. {@link
* HttpServerKeepAliveHandler} will automatically close the channel for any LastHttpContent that corresponds to a client
* request for closing the connection, or if the HttpResponse associated with that LastHttpContent requested closing the
* connection or didn't have a self defined message length.
* <p>
* Since {@link HttpServerKeepAliveHandler} expects {@link HttpObject}s it should be added after {@link HttpServerCodec}
* but before any other handlers that might send a {@link HttpResponse}. <blockquote>
* <pre>
* {@link ChannelPipeline} p = ...;
* ...
* p.addLast("serverCodec", new {@link HttpServerCodec}());
* p.addLast("httpKeepAlive", <b>new {@link HttpServerKeepAliveHandler}()</b>);
* p.addLast("aggregator", new {@link HttpObjectAggregator}(1048576));
* ...
* p.addLast("handler", new HttpRequestHandler());
* </pre>
* </blockquote>
*/
public class HttpServerKeepAliveHandler extends ChannelDuplexHandler {
private static final String MULTIPART_PREFIX = "multipart";
private boolean persistentConnection = true;
// Track pending responses to support client pipelining: https://tools.ietf.org/html/rfc7230#section-6.3.2
private int pendingResponses;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// read message and track if it was keepAlive
if (msg instanceof HttpRequest) {
final HttpRequest request = (HttpRequest) msg;
if (persistentConnection) {
pendingResponses += 1;
persistentConnection = isKeepAlive(request);
}
}
super.channelRead(ctx, msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// modify message on way out to add headers if needed
if (msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;
trackResponse(response);
// Assume the response writer knows if they can persist or not and sets isKeepAlive on the response
if (!isKeepAlive(response) || !isSelfDefinedMessageLength(response)) {
// No longer keep alive as the client can't tell when the message is done unless we close connection
pendingResponses = 0;
persistentConnection = false;
}
// Server might think it can keep connection alive, but we should fix response header if we know better
if (!shouldKeepAlive()) {
setKeepAlive(response, false);
}
}
if (msg instanceof LastHttpContent && !shouldKeepAlive()) {
promise.addListener(ChannelFutureListener.CLOSE);
}
super.write(ctx, msg, promise);
}
private void trackResponse(HttpResponse response) {
if (!isInformational(response)) {
pendingResponses -= 1;
}
}
private boolean shouldKeepAlive() {
return pendingResponses != 0 || persistentConnection;
}
/**
* Keep-alive only works if the client can detect when the message has ended without relying on the connection being
* closed.
* <p>
* <ul>
* <li>See <a href="https://tools.ietf.org/html/rfc7230#section-6.3"/></li>
* <li>See <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3"/></li>
* </ul>
*
* @param response The HttpResponse to check
*
* @return true if the response has a self defined message length.
*/
private static boolean isSelfDefinedMessageLength(HttpResponse response) {
return isContentLengthSet(response) || isTransferEncodingChunked(response) || isMultipart(response) ||
isInformational(response);
}
private static boolean isInformational(HttpResponse response) {
return response.status().codeClass() == HttpStatusClass.INFORMATIONAL;
}
private static boolean isMultipart(HttpResponse response) {
String contentType = response.headers().get(HttpHeaderNames.CONTENT_TYPE);
return contentType != null &&
contentType.regionMatches(true, 0, MULTIPART_PREFIX, 0, MULTIPART_PREFIX.length());
}
}

View File

@ -0,0 +1,188 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.StringUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.Arrays;
import java.util.Collection;
import static io.netty.handler.codec.http.HttpHeaderValues.*;
import static io.netty.handler.codec.http.HttpUtil.*;
import static org.junit.Assert.*;
@RunWith(Parameterized.class)
public class HttpServerKeepAliveHandlerTest {
private static final String REQUEST_KEEP_ALIVE = "REQUEST_KEEP_ALIVE";
private static final int NOT_SELF_DEFINED_MSG_LENGTH = 0;
private static final int SET_RESPONSE_LENGTH = 1;
private static final int SET_MULTIPART = 2;
private static final int SET_CHUNKED = 4;
private final boolean isKeepAliveResponseExpected;
private final HttpVersion httpVersion;
private final String sendKeepAlive;
private final int setSelfDefinedMessageLength;
private final String setResponseConnection;
private EmbeddedChannel channel;
@Parameters
public static Collection<Object[]> keepAliveProvider() {
return Arrays.asList(new Object[][] {
{ true, HttpVersion.HTTP_1_0, REQUEST_KEEP_ALIVE, SET_RESPONSE_LENGTH, KEEP_ALIVE }, // 0
{ true, HttpVersion.HTTP_1_0, REQUEST_KEEP_ALIVE, SET_MULTIPART, KEEP_ALIVE }, // 1
{ false, HttpVersion.HTTP_1_0, null, SET_RESPONSE_LENGTH, null }, // 2
{ true, HttpVersion.HTTP_1_1, REQUEST_KEEP_ALIVE, SET_RESPONSE_LENGTH, null }, // 3
{ false, HttpVersion.HTTP_1_1, REQUEST_KEEP_ALIVE, SET_RESPONSE_LENGTH, CLOSE }, // 4
{ true, HttpVersion.HTTP_1_1, REQUEST_KEEP_ALIVE, SET_MULTIPART, null }, // 5
{ true, HttpVersion.HTTP_1_1, REQUEST_KEEP_ALIVE, SET_CHUNKED, null }, // 6
{ false, HttpVersion.HTTP_1_1, null, SET_RESPONSE_LENGTH, null }, // 7
{ false, HttpVersion.HTTP_1_0, REQUEST_KEEP_ALIVE, NOT_SELF_DEFINED_MSG_LENGTH, null }, // 8
{ false, HttpVersion.HTTP_1_0, null, NOT_SELF_DEFINED_MSG_LENGTH, null }, // 9
{ false, HttpVersion.HTTP_1_1, REQUEST_KEEP_ALIVE, NOT_SELF_DEFINED_MSG_LENGTH, null }, // 10
{ false, HttpVersion.HTTP_1_1, null, NOT_SELF_DEFINED_MSG_LENGTH, null }, // 11
{ false, HttpVersion.HTTP_1_0, REQUEST_KEEP_ALIVE, SET_RESPONSE_LENGTH, null }, // 12
});
}
public HttpServerKeepAliveHandlerTest(boolean isKeepAliveResponseExpected, HttpVersion httpVersion,
String sendKeepAlive,
int setSelfDefinedMessageLength, CharSequence setResponseConnection) {
this.isKeepAliveResponseExpected = isKeepAliveResponseExpected;
this.httpVersion = httpVersion;
this.sendKeepAlive = sendKeepAlive;
this.setSelfDefinedMessageLength = setSelfDefinedMessageLength;
this.setResponseConnection = setResponseConnection == null? null : setResponseConnection.toString();
}
@Before
public void setUp() {
channel = new EmbeddedChannel(new HttpServerKeepAliveHandler());
}
@Test
public void test_KeepAlive() throws Exception {
FullHttpRequest request = new DefaultFullHttpRequest(httpVersion, HttpMethod.GET, "/v1/foo/bar");
setKeepAlive(request, REQUEST_KEEP_ALIVE.equals(sendKeepAlive));
HttpResponse response = new DefaultFullHttpResponse(httpVersion, HttpResponseStatus.OK);
if (!StringUtil.isNullOrEmpty(setResponseConnection)) {
response.headers().set(HttpHeaderNames.CONNECTION, setResponseConnection);
}
setupMessageLength(response);
assertTrue(channel.writeInbound(request));
Object requestForwarded = channel.readInbound();
assertEquals(request, requestForwarded);
ReferenceCountUtil.release(requestForwarded);
channel.writeAndFlush(response);
HttpResponse writtenResponse = channel.readOutbound();
assertEquals("channel.isOpen", isKeepAliveResponseExpected, channel.isOpen());
assertEquals("response keep-alive", isKeepAliveResponseExpected, isKeepAlive(writtenResponse));
ReferenceCountUtil.release(writtenResponse);
assertFalse(channel.finishAndReleaseAll());
}
@Test
public void test_PipelineKeepAlive() {
FullHttpRequest firstRequest = new DefaultFullHttpRequest(httpVersion, HttpMethod.GET, "/v1/foo/bar");
setKeepAlive(firstRequest, true);
FullHttpRequest secondRequest = new DefaultFullHttpRequest(httpVersion, HttpMethod.GET, "/v1/foo/bar");
setKeepAlive(secondRequest, REQUEST_KEEP_ALIVE.equals(sendKeepAlive));
FullHttpRequest finalRequest = new DefaultFullHttpRequest(httpVersion, HttpMethod.GET, "/v1/foo/bar");
setKeepAlive(finalRequest, false);
FullHttpResponse response = new DefaultFullHttpResponse(httpVersion, HttpResponseStatus.OK);
FullHttpResponse informationalResp = new DefaultFullHttpResponse(httpVersion, HttpResponseStatus.PROCESSING);
setKeepAlive(response, true);
setContentLength(response, 0);
setKeepAlive(informationalResp, true);
assertTrue(channel.writeInbound(firstRequest, secondRequest, finalRequest));
Object requestForwarded = channel.readInbound();
assertEquals(firstRequest, requestForwarded);
ReferenceCountUtil.release(requestForwarded);
channel.writeAndFlush(response.retainedDuplicate());
HttpResponse firstResponse = channel.readOutbound();
assertTrue("channel.isOpen", channel.isOpen());
assertTrue("response keep-alive", isKeepAlive(firstResponse));
ReferenceCountUtil.release(firstResponse);
requestForwarded = channel.readInbound();
assertEquals(secondRequest, requestForwarded);
ReferenceCountUtil.release(requestForwarded);
channel.writeAndFlush(informationalResp);
HttpResponse writtenInfoResp = channel.readOutbound();
assertTrue("channel.isOpen", channel.isOpen());
assertTrue("response keep-alive", isKeepAlive(writtenInfoResp));
ReferenceCountUtil.release(writtenInfoResp);
if (!StringUtil.isNullOrEmpty(setResponseConnection)) {
response.headers().set(HttpHeaderNames.CONNECTION, setResponseConnection);
} else {
response.headers().remove(HttpHeaderNames.CONNECTION);
}
setupMessageLength(response);
channel.writeAndFlush(response.retainedDuplicate());
HttpResponse secondResponse = channel.readOutbound();
assertEquals("channel.isOpen", isKeepAliveResponseExpected, channel.isOpen());
assertEquals("response keep-alive", isKeepAliveResponseExpected, isKeepAlive(secondResponse));
ReferenceCountUtil.release(secondResponse);
requestForwarded = channel.readInbound();
assertEquals(finalRequest, requestForwarded);
ReferenceCountUtil.release(requestForwarded);
if (isKeepAliveResponseExpected) {
channel.writeAndFlush(response);
HttpResponse finalResponse = channel.readOutbound();
assertFalse("channel.isOpen", channel.isOpen());
assertFalse("response keep-alive", isKeepAlive(finalResponse));
}
ReferenceCountUtil.release(response);
assertFalse(channel.finishAndReleaseAll());
}
private void setupMessageLength(HttpResponse response) {
switch (setSelfDefinedMessageLength) {
case NOT_SELF_DEFINED_MSG_LENGTH:
if (isContentLengthSet(response)) {
response.headers().remove(HttpHeaderNames.CONTENT_LENGTH);
}
break;
case SET_RESPONSE_LENGTH:
setContentLength(response, 0);
break;
case SET_CHUNKED:
setTransferEncodingChunked(response, true);
break;
case SET_MULTIPART:
response.headers().set(HttpHeaderNames.CONTENT_TYPE, MULTIPART_MIXED.toUpperCase());
break;
default:
throw new IllegalArgumentException("selfDefinedMessageLength: " + setSelfDefinedMessageLength);
}
}
}