Use ByteBufAllocator to allocate ByteBuf for FullHttpMessage Motivation: When converting SPDY or HTTP/2 frames to HTTP/1.x, netty always used an unpooled heap ByteBuf.

Modifications:
When constructing the FullHttpMessage pass in the ByteBuf to use via the ByteBufAllocator assigned via the context.

Result:
The ByteBuf assigned to the FullHttpMessage can now be configured as a pooled/unpooled, direct/heap based ByteBuf via the ByteBufAllocator used.
This commit is contained in:
Brendt Lucas 2016-02-13 12:30:48 +00:00 committed by Norman Maurer
parent 839e2ca508
commit 41d0a81691
4 changed files with 89 additions and 49 deletions

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.spdy;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
@ -30,6 +31,7 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.spdy.SpdyHttpHeaders.Names;
import io.netty.util.ReferenceCountUtil;
import java.util.HashMap;
import java.util.List;
@ -111,6 +113,16 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
this.validateHeaders = validateHeaders;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Release any outstanding messages from the map
for (Map.Entry<Integer, FullHttpMessage> entry : messageMap.entrySet()) {
ReferenceCountUtil.safeRelease(entry.getValue());
}
messageMap.clear();
super.channelInactive(ctx);
}
protected FullHttpMessage putMessage(int streamId, FullHttpMessage message) {
return messageMap.put(streamId, message);
}
@ -165,7 +177,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
}
try {
FullHttpRequest httpRequestWithEntity = createHttpRequest(spdyVersion, spdySynStreamFrame);
FullHttpRequest httpRequestWithEntity = createHttpRequest(spdySynStreamFrame, ctx.alloc());
// Set the Stream-ID, Associated-To-Stream-ID, iand Priority as headers
httpRequestWithEntity.headers().setInt(Names.STREAM_ID, streamId);
@ -174,7 +186,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
out.add(httpRequestWithEntity);
} catch (Exception ignored) {
} catch (Throwable ignored) {
SpdyRstStreamFrame spdyRstStreamFrame =
new DefaultSpdyRstStreamFrame(streamId, SpdyStreamStatus.PROTOCOL_ERROR);
ctx.writeAndFlush(spdyRstStreamFrame);
@ -195,7 +207,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
}
try {
FullHttpRequest httpRequestWithEntity = createHttpRequest(spdyVersion, spdySynStreamFrame);
FullHttpRequest httpRequestWithEntity = createHttpRequest(spdySynStreamFrame, ctx.alloc());
// Set the Stream-ID as a header
httpRequestWithEntity.headers().setInt(Names.STREAM_ID, streamId);
@ -206,7 +218,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
// Request body will follow in a series of Data Frames
putMessage(streamId, httpRequestWithEntity);
}
} catch (Exception e) {
} catch (Throwable t) {
// If a client sends a SYN_STREAM without all of the getMethod, url (host and path),
// scheme, and version headers the server must reply with a HTTP 400 BAD REQUEST reply.
// Also sends HTTP 400 BAD REQUEST reply if header name/value pairs are invalid
@ -234,7 +246,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
}
try {
FullHttpResponse httpResponseWithEntity = createHttpResponse(ctx, spdySynReplyFrame, validateHeaders);
FullHttpResponse httpResponseWithEntity =
createHttpResponse(spdySynReplyFrame, ctx.alloc(), validateHeaders);
// Set the Stream-ID as a header
httpResponseWithEntity.headers().setInt(Names.STREAM_ID, streamId);
@ -246,7 +259,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
// Response body will follow in a series of Data Frames
putMessage(streamId, httpResponseWithEntity);
}
} catch (Exception e) {
} catch (Throwable t) {
// If a client receives a SYN_REPLY without valid getStatus and version headers
// the client must reply with a RST_STREAM frame indicating a PROTOCOL_ERROR
SpdyRstStreamFrame spdyRstStreamFrame =
@ -274,7 +287,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
}
try {
fullHttpMessage = createHttpResponse(ctx, spdyHeadersFrame, validateHeaders);
fullHttpMessage = createHttpResponse(spdyHeadersFrame, ctx.alloc(), validateHeaders);
// Set the Stream-ID as a header
fullHttpMessage.headers().setInt(Names.STREAM_ID, streamId);
@ -286,7 +299,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
// Response body will follow in a series of Data Frames
putMessage(streamId, fullHttpMessage);
}
} catch (Exception e) {
} catch (Throwable t) {
// If a client receives a SYN_REPLY without valid getStatus and version headers
// the client must reply with a RST_STREAM frame indicating a PROTOCOL_ERROR
SpdyRstStreamFrame spdyRstStreamFrame =
@ -346,8 +359,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
}
}
private static FullHttpRequest createHttpRequest(int spdyVersion, SpdyHeadersFrame requestFrame)
throws Exception {
private static FullHttpRequest createHttpRequest(SpdyHeadersFrame requestFrame, ByteBufAllocator alloc)
throws Exception {
// Create the first line of the request from the name/value pairs
SpdyHeaders headers = requestFrame.headers();
HttpMethod method = HttpMethod.valueOf(headers.getAsString(METHOD));
@ -357,30 +370,38 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
headers.remove(PATH);
headers.remove(VERSION);
FullHttpRequest req = new DefaultFullHttpRequest(httpVersion, method, url);
boolean release = true;
ByteBuf buffer = alloc.buffer();
try {
FullHttpRequest req = new DefaultFullHttpRequest(httpVersion, method, url, buffer);
// Remove the scheme header
headers.remove(SCHEME);
// Remove the scheme header
headers.remove(SCHEME);
// Replace the SPDY host header with the HTTP host header
CharSequence host = headers.get(HOST);
headers.remove(HOST);
req.headers().set(HttpHeaderNames.HOST, host);
// Replace the SPDY host header with the HTTP host header
CharSequence host = headers.get(HOST);
headers.remove(HOST);
req.headers().set(HttpHeaderNames.HOST, host);
for (Map.Entry<CharSequence, CharSequence> e: requestFrame.headers()) {
req.headers().add(e.getKey(), e.getValue());
for (Map.Entry<CharSequence, CharSequence> e : requestFrame.headers()) {
req.headers().add(e.getKey(), e.getValue());
}
// The Connection and Keep-Alive headers are no longer valid
HttpUtil.setKeepAlive(req, true);
// Transfer-Encoding header is not valid
req.headers().remove(HttpHeaderNames.TRANSFER_ENCODING);
release = false;
return req;
} finally {
if (release) {
buffer.release();
}
}
// The Connection and Keep-Alive headers are no longer valid
HttpUtil.setKeepAlive(req, true);
// Transfer-Encoding header is not valid
req.headers().remove(HttpHeaderNames.TRANSFER_ENCODING);
return req;
}
private static FullHttpResponse createHttpResponse(ChannelHandlerContext ctx, SpdyHeadersFrame responseFrame,
private static FullHttpResponse createHttpResponse(SpdyHeadersFrame responseFrame, ByteBufAllocator alloc,
boolean validateHeaders) throws Exception {
// Create the first line of the response from the name/value pairs
@ -390,18 +411,27 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
headers.remove(STATUS);
headers.remove(VERSION);
FullHttpResponse res = new DefaultFullHttpResponse(version, status, ctx.alloc().buffer(), validateHeaders);
for (Map.Entry<CharSequence, CharSequence> e: responseFrame.headers()) {
res.headers().add(e.getKey(), e.getValue());
boolean release = true;
ByteBuf buffer = alloc.buffer();
try {
FullHttpResponse res = new DefaultFullHttpResponse(version, status, buffer, validateHeaders);
for (Map.Entry<CharSequence, CharSequence> e: responseFrame.headers()) {
res.headers().add(e.getKey(), e.getValue());
}
// The Connection and Keep-Alive headers are no longer valid
HttpUtil.setKeepAlive(res, true);
// Transfer-Encoding header is not valid
res.headers().remove(HttpHeaderNames.TRANSFER_ENCODING);
res.headers().remove(HttpHeaderNames.TRAILER);
release = false;
return res;
} finally {
if (release) {
buffer.release();
}
}
// The Connection and Keep-Alive headers are no longer valid
HttpUtil.setKeepAlive(res, true);
// Transfer-Encoding header is not valid
res.headers().remove(HttpHeaderNames.TRANSFER_ENCODING);
res.headers().remove(HttpHeaderNames.TRAILER);
return res;
}
}

View File

@ -14,6 +14,7 @@
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpMessage;
@ -186,6 +187,7 @@ public final class HttpConversionUtil {
*
* @param streamId The stream associated with the response
* @param http2Headers The initial set of HTTP/2 headers to create the response with
* @param alloc The {@link ByteBufAllocator} to use to generate the content of the message
* @param validateHttpHeaders <ul>
* <li>{@code true} to validate HTTP headers in the http-codec</li>
* <li>{@code false} not to validate HTTP headers in the http-codec</li>
@ -193,12 +195,14 @@ public final class HttpConversionUtil {
* @return A new response object which represents headers/data
* @throws Http2Exception see {@link #addHttp2ToHttpHeaders(int, Http2Headers, FullHttpMessage, boolean)}
*/
public static FullHttpResponse toHttpResponse(int streamId, Http2Headers http2Headers, boolean validateHttpHeaders)
public static FullHttpResponse toHttpResponse(int streamId, Http2Headers http2Headers, ByteBufAllocator alloc,
boolean validateHttpHeaders)
throws Http2Exception {
HttpResponseStatus status = parseStatus(http2Headers.status());
// HTTP/2 does not define a way to carry the version or reason phrase that is included in an
// HTTP/1.1 status line.
FullHttpResponse msg = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, validateHttpHeaders);
FullHttpResponse msg = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, alloc.buffer(),
validateHttpHeaders);
try {
addHttp2ToHttpHeaders(streamId, http2Headers, msg, false);
} catch (Http2Exception e) {
@ -216,6 +220,7 @@ public final class HttpConversionUtil {
*
* @param streamId The stream associated with the request
* @param http2Headers The initial set of HTTP/2 headers to create the request with
* @param alloc The {@link ByteBufAllocator} to use to generate the content of the message
* @param validateHttpHeaders <ul>
* <li>{@code true} to validate HTTP headers in the http-codec</li>
* <li>{@code false} not to validate HTTP headers in the http-codec</li>
@ -223,7 +228,8 @@ public final class HttpConversionUtil {
* @return A new request object which represents headers/data
* @throws Http2Exception see {@link #addHttp2ToHttpHeaders(int, Http2Headers, FullHttpMessage, boolean)}
*/
public static FullHttpRequest toHttpRequest(int streamId, Http2Headers http2Headers, boolean validateHttpHeaders)
public static FullHttpRequest toHttpRequest(int streamId, Http2Headers http2Headers, ByteBufAllocator alloc,
boolean validateHttpHeaders)
throws Http2Exception {
// HTTP/2 does not define a way to carry the version identifier that is included in the HTTP/1.1 request line.
final CharSequence method = checkNotNull(http2Headers.method(),
@ -231,7 +237,7 @@ public final class HttpConversionUtil {
final CharSequence path = checkNotNull(http2Headers.path(),
"path header cannot be null in conversion to HTTP/1.x");
FullHttpRequest msg = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method
.toString()), path.toString(), validateHttpHeaders);
.toString()), path.toString(), alloc.buffer(), validateHttpHeaders);
try {
addHttp2ToHttpHeaders(streamId, http2Headers, msg, false);
} catch (Http2Exception e) {

View File

@ -15,6 +15,7 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.FullHttpRequest;
@ -143,12 +144,15 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
* <li>{@code true} to validate HTTP headers in the http-codec</li>
* <li>{@code false} not to validate HTTP headers in the http-codec</li>
* </ul>
* @param alloc The {@link ByteBufAllocator} to use to generate the content of the message
* @throws Http2Exception
*/
protected FullHttpMessage newMessage(Http2Stream stream, Http2Headers headers, boolean validateHttpHeaders)
protected FullHttpMessage newMessage(Http2Stream stream, Http2Headers headers, boolean validateHttpHeaders,
ByteBufAllocator alloc)
throws Http2Exception {
return connection.isServer() ? HttpConversionUtil.toHttpRequest(stream.id(), headers,
validateHttpHeaders) : HttpConversionUtil.toHttpResponse(stream.id(), headers, validateHttpHeaders);
return connection.isServer() ? HttpConversionUtil.toHttpRequest(stream.id(), headers, alloc,
validateHttpHeaders) : HttpConversionUtil.toHttpResponse(stream.id(), headers, alloc,
validateHttpHeaders);
}
/**
@ -180,7 +184,7 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
FullHttpMessage msg = getMessage(stream);
boolean release = true;
if (msg == null) {
msg = newMessage(stream, headers, validateHttpHeaders);
msg = newMessage(stream, headers, validateHttpHeaders, ctx.alloc());
} else if (allowAppend) {
release = false;
HttpConversionUtil.addHttp2ToHttpHeaders(stream.id(), headers, msg, appendToTrailer);

View File

@ -204,7 +204,7 @@ public final class InboundHttp2ToHttpPriorityAdapter extends InboundHttp2ToHttpA
Http2Headers http2Headers = new DefaultHttp2Headers(validateHttpHeaders, httpHeaders.size());
initializePseudoHeaders(http2Headers);
addHttpHeadersToHttp2Headers(httpHeaders, http2Headers);
msg = newMessage(stream, http2Headers, validateHttpHeaders);
msg = newMessage(stream, http2Headers, validateHttpHeaders, ctx.alloc());
fireChannelRead(ctx, msg, false, stream);
}
}