InboundHttp2ToHttpAdapter leak and logic improvements

Motivation:
In HttpConversionUtil's toHttpRequest and toHttpResponse methods can
allocate FullHttpMessage objects, and if an exeception is thrown during
the header conversion then this object will not be released. If a
FullHttpMessage is not fired up the pipeline, and the stream is closed
then we remove from the map, but do not release the object. This leads
to a ByteBuf leak. Some of the logic related to stream lifetime management
and FullHttpMessage also predates the RFC being finalized and is not correct.

Modifications:
- Fix leaks in HttpConversionUtil
- Ensure the objects are released when they are removed from the map.
- Correct logic and unit tests where they are found to be incorrect.

Result:
Fixes https://github.com/netty/netty/issues/4780
Fixes https://github.com/netty/netty/issues/3619
This commit is contained in:
Scott Mitchell 2016-01-28 13:07:56 -08:00
parent 4d6ab1d30d
commit 5fb18e3415
5 changed files with 300 additions and 200 deletions

View File

@ -35,6 +35,16 @@ public enum Http2Error {
HTTP_1_1_REQUIRED(0xD);
private final long code;
private static final Http2Error[] INT_TO_ENUM_MAP;
static {
Http2Error[] errors = Http2Error.values();
Http2Error[] map = new Http2Error[errors.length];
for (int i = 0; i < errors.length; ++i) {
Http2Error error = errors[i];
map[(int) error.code()] = error;
}
INT_TO_ENUM_MAP = map;
}
Http2Error(long code) {
this.code = code;
@ -46,4 +56,8 @@ public enum Http2Error {
public long code() {
return code;
}
public static Http2Error valueOf(long value) {
return value >= INT_TO_ENUM_MAP.length || value < 0 ? null : INT_TO_ENUM_MAP[(int) value];
}
}

View File

@ -199,7 +199,15 @@ public final class HttpConversionUtil {
// HTTP/2 does not define a way to carry the version or reason phrase that is included in an
// HTTP/1.1 status line.
FullHttpResponse msg = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, validateHttpHeaders);
addHttp2ToHttpHeaders(streamId, http2Headers, msg, false);
try {
addHttp2ToHttpHeaders(streamId, http2Headers, msg, false);
} catch (Http2Exception e) {
msg.release();
throw e;
} catch (Throwable t) {
msg.release();
throw streamError(streamId, PROTOCOL_ERROR, t, "HTTP/2 to HTTP/1.x headers conversion error");
}
return msg;
}
@ -224,7 +232,15 @@ public final class HttpConversionUtil {
"path header cannot be null in conversion to HTTP/1.x");
FullHttpRequest msg = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method
.toString()), path.toString(), validateHttpHeaders);
addHttp2ToHttpHeaders(streamId, http2Headers, msg, false);
try {
addHttp2ToHttpHeaders(streamId, http2Headers, msg, false);
} catch (Http2Exception e) {
msg.release();
throw e;
} catch (Throwable t) {
msg.release();
throw streamError(streamId, PROTOCOL_ERROR, t, "HTTP/2 to HTTP/1.x headers conversion error");
}
return msg;
}

View File

@ -22,8 +22,6 @@ import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpStatusClass;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
@ -61,11 +59,11 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
};
private final int maxContentLength;
private final ImmediateSendDetector sendDetector;
private final Http2Connection.PropertyKey messageKey;
private final boolean propagateSettings;
protected final Http2Connection connection;
protected final boolean validateHttpHeaders;
private final ImmediateSendDetector sendDetector;
protected final IntObjectMap<FullHttpMessage> messageMap;
private final boolean propagateSettings;
protected InboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength,
boolean validateHttpHeaders, boolean propagateSettings) {
@ -79,20 +77,45 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
this.validateHttpHeaders = validateHttpHeaders;
this.propagateSettings = propagateSettings;
sendDetector = DEFAULT_SEND_DETECTOR;
messageMap = new IntObjectHashMap<FullHttpMessage>();
messageKey = connection.newKey();
}
/**
* The streamId is out of scope for the HTTP message flow and will no longer be tracked
* @param streamId The stream id to remove associated state with
* The stream is out of scope for the HTTP message flow and will no longer be tracked
* @param stream The stream to remove associated state with
* @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
*/
protected void removeMessage(int streamId) {
messageMap.remove(streamId);
protected final void removeMessage(Http2Stream stream, boolean release) {
FullHttpMessage msg = stream.removeProperty(messageKey);
if (release && msg != null) {
msg.release();
}
}
/**
* Get the {@link FullHttpMessage} associated with {@code stream}.
* @param stream The stream to get the associated state from
* @return The {@link FullHttpMessage} associated with {@code stream}.
*/
protected final FullHttpMessage getMessage(Http2Stream stream) {
return (FullHttpMessage) stream.getProperty(messageKey);
}
/**
* Make {@code message} be the state associated with {@code stream}.
* @param stream The stream which {@code message} is associated with.
* @param message The message which contains the HTTP semantics.
*/
protected final void putMessage(Http2Stream stream, FullHttpMessage message) {
FullHttpMessage previous = stream.setProperty(messageKey, message);
if (previous != message && previous != null) {
previous.release();
}
}
@Override
public void onStreamRemoved(Http2Stream stream) {
removeMessage(stream.id());
removeMessage(stream, true);
}
/**
@ -100,10 +123,12 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
*
* @param ctx The context to fire the event on
* @param msg The message to send
* @param streamId the streamId of the message which is being fired
* @param release {@code true} to release if present in {@link #messageMap}. {@code false} otherwise.
* @param stream the stream of the message which is being fired
*/
protected void fireChannelRead(ChannelHandlerContext ctx, FullHttpMessage msg, int streamId) {
removeMessage(streamId);
protected void fireChannelRead(ChannelHandlerContext ctx, FullHttpMessage msg, boolean release,
Http2Stream stream) {
removeMessage(stream, release);
HttpUtil.setContentLength(msg, msg.content().readableBytes());
ctx.fireChannelRead(msg);
}
@ -111,8 +136,8 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
/**
* Create a new {@link FullHttpMessage} based upon the current connection parameters
*
* @param streamId The stream id to create a message for
* @param headers The headers associated with {@code streamId}
* @param stream The stream to create a message for
* @param headers The headers associated with {@code stream}
* @param validateHttpHeaders
* <ul>
* <li>{@code true} to validate HTTP headers in the http-codec</li>
@ -120,10 +145,10 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
* </ul>
* @throws Http2Exception
*/
protected FullHttpMessage newMessage(int streamId, Http2Headers headers, boolean validateHttpHeaders)
protected FullHttpMessage newMessage(Http2Stream stream, Http2Headers headers, boolean validateHttpHeaders)
throws Http2Exception {
return connection.isServer() ? HttpConversionUtil.toHttpRequest(streamId, headers,
validateHttpHeaders) : HttpConversionUtil.toHttpResponse(streamId, headers, validateHttpHeaders);
return connection.isServer() ? HttpConversionUtil.toHttpRequest(stream.id(), headers,
validateHttpHeaders) : HttpConversionUtil.toHttpResponse(stream.id(), headers, validateHttpHeaders);
}
/**
@ -132,9 +157,9 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
*
* @param ctx The context for which this message has been received.
* Used to send informational header if detected.
* @param streamId The stream id the {@code headers} apply to
* @param stream The stream the {@code headers} apply to
* @param headers The headers to process
* @param endOfStream {@code true} if the {@code streamId} has received the end of stream flag
* @param endOfStream {@code true} if the {@code stream} has received the end of stream flag
* @param allowAppend
* <ul>
* <li>{@code true} if headers will be appended if the stream already exists.</li>
@ -142,27 +167,25 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
* </ul>
* @param appendToTrailer
* <ul>
* <li>{@code true} if a message {@code streamId} already exists then the headers
* <li>{@code true} if a message {@code stream} already exists then the headers
* should be added to the trailing headers.</li>
* <li>{@code false} then appends will be done to the initial headers.</li>
* </ul>
* @return The object used to track the stream corresponding to {@code streamId}. {@code null} if
* @return The object used to track the stream corresponding to {@code stream}. {@code null} if
* {@code allowAppend} is {@code false} and the stream already exists.
* @throws Http2Exception If the stream id is not in the correct state to process the headers request
*/
protected FullHttpMessage processHeadersBegin(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
protected FullHttpMessage processHeadersBegin(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
boolean endOfStream, boolean allowAppend, boolean appendToTrailer) throws Http2Exception {
FullHttpMessage msg = messageMap.get(streamId);
FullHttpMessage msg = getMessage(stream);
boolean release = true;
if (msg == null) {
msg = newMessage(streamId, headers, validateHttpHeaders);
msg = newMessage(stream, headers, validateHttpHeaders);
} else if (allowAppend) {
try {
HttpConversionUtil.addHttp2ToHttpHeaders(streamId, headers, msg, appendToTrailer);
} catch (Http2Exception e) {
removeMessage(streamId);
throw e;
}
release = false;
HttpConversionUtil.addHttp2ToHttpHeaders(stream.id(), headers, msg, appendToTrailer);
} else {
release = false;
msg = null;
}
@ -170,7 +193,7 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
// Copy the message (if necessary) before sending. The content is not expected to be copied (or used) in
// this operation but just in case it is used do the copy before sending and the resource may be released
final FullHttpMessage copy = endOfStream ? null : sendDetector.copyIfNeeded(msg);
fireChannelRead(ctx, msg, streamId);
fireChannelRead(ctx, msg, release, stream);
return copy;
}
@ -182,23 +205,25 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
* sends the result up the pipeline or retains the message for future processing.
*
* @param ctx The context for which this message has been received
* @param streamId The stream id the {@code objAccumulator} corresponds to
* @param msg The object which represents all headers/data for corresponding to {@code streamId}
* @param stream The stream the {@code objAccumulator} corresponds to
* @param msg The object which represents all headers/data for corresponding to {@code stream}
* @param endOfStream {@code true} if this is the last event for the stream
*/
private void processHeadersEnd(ChannelHandlerContext ctx, int streamId,
FullHttpMessage msg, boolean endOfStream) {
private void processHeadersEnd(ChannelHandlerContext ctx, Http2Stream stream, FullHttpMessage msg,
boolean endOfStream) {
if (endOfStream) {
fireChannelRead(ctx, msg, streamId);
// Release if the msg from the map is different from the object being forwarded up the pipeline.
fireChannelRead(ctx, msg, getMessage(stream) != msg, stream);
} else {
messageMap.put(streamId, msg);
putMessage(stream, msg);
}
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
FullHttpMessage msg = messageMap.get(streamId);
Http2Stream stream = connection.stream(streamId);
FullHttpMessage msg = getMessage(stream);
if (msg == null) {
throw connectionError(PROTOCOL_ERROR, "Data Frame received for unknown stream id %d", streamId);
}
@ -213,7 +238,7 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
content.writeBytes(data, data.readerIndex(), dataReadableBytes);
if (endOfStream) {
fireChannelRead(ctx, msg, streamId);
fireChannelRead(ctx, msg, false, stream);
}
// All bytes have been processed.
@ -223,34 +248,40 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endOfStream) throws Http2Exception {
FullHttpMessage msg = processHeadersBegin(ctx, streamId, headers, endOfStream, true, true);
Http2Stream stream = connection.stream(streamId);
FullHttpMessage msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
if (msg != null) {
processHeadersEnd(ctx, streamId, msg, endOfStream);
processHeadersEnd(ctx, stream, msg, endOfStream);
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
FullHttpMessage msg = processHeadersBegin(ctx, streamId, headers, endOfStream, true, true);
Http2Stream stream = connection.stream(streamId);
FullHttpMessage msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
if (msg != null) {
processHeadersEnd(ctx, streamId, msg, endOfStream);
processHeadersEnd(ctx, stream, msg, endOfStream);
}
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
FullHttpMessage msg = messageMap.get(streamId);
Http2Stream stream = connection.stream(streamId);
FullHttpMessage msg = getMessage(stream);
if (msg != null) {
fireChannelRead(ctx, msg, streamId);
onRstStreamRead(stream, msg);
}
ctx.fireExceptionCaught(Http2Exception.streamError(streamId, Http2Error.valueOf(errorCode),
"HTTP/2 to HTTP layer caught stream reset"));
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
// A push promise should not be allowed to add headers to an existing stream
FullHttpMessage msg = processHeadersBegin(ctx, promisedStreamId, headers, false, false, false);
Http2Stream promisedStream = connection.stream(promisedStreamId);
FullHttpMessage msg = processHeadersBegin(ctx, promisedStream, headers, false, false, false);
if (msg == null) {
throw connectionError(PROTOCOL_ERROR, "Push Promise Frame received for pre-existing stream id %d",
promisedStreamId);
@ -258,7 +289,7 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), streamId);
processHeadersEnd(ctx, promisedStreamId, msg, false);
processHeadersEnd(ctx, promisedStream, msg, false);
}
@Override
@ -269,6 +300,13 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
}
}
/**
* Called if a {@code RST_STREAM} is received but we have some data for that stream.
*/
protected void onRstStreamRead(Http2Stream stream, FullHttpMessage msg) {
removeMessage(stream, true);
}
/**
* Allows messages to be sent up the pipeline before the next phase in the
* HTTP message flow is detected.

View File

@ -19,8 +19,6 @@ import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.AsciiString;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import java.util.Iterator;
import java.util.Map.Entry;
@ -41,20 +39,25 @@ public final class InboundHttp2ToHttpPriorityAdapter extends InboundHttp2ToHttpA
HttpConversionUtil.OUT_OF_MESSAGE_SEQUENCE_PATH);
private static final AsciiString OUT_OF_MESSAGE_SEQUENCE_RETURN_CODE = new AsciiString(
HttpConversionUtil.OUT_OF_MESSAGE_SEQUENCE_RETURN_CODE.toString());
private final IntObjectMap<HttpHeaders> outOfMessageFlowHeaders;
private final Http2Connection.PropertyKey outOfMessageFlowHeadersKey;
InboundHttp2ToHttpPriorityAdapter(Http2Connection connection, int maxContentLength,
boolean validateHttpHeaders,
boolean propagateSettings) {
super(connection, maxContentLength, validateHttpHeaders, propagateSettings);
outOfMessageFlowHeaders = new IntObjectHashMap<HttpHeaders>();
outOfMessageFlowHeadersKey = connection.newKey();
}
@Override
protected void removeMessage(int streamId) {
super.removeMessage(streamId);
outOfMessageFlowHeaders.remove(streamId);
private HttpHeaders getOutOfMessageFlowHeaders(Http2Stream stream) {
return stream.getProperty(outOfMessageFlowHeadersKey);
}
private void putOutOfMessageFlowHeaders(Http2Stream stream, HttpHeaders headers) {
stream.setProperty(outOfMessageFlowHeadersKey, headers);
}
private HttpHeaders removeOutOfMessageFlowHeaders(Http2Stream stream) {
return stream.removeProperty(outOfMessageFlowHeadersKey);
}
/**
@ -68,13 +71,13 @@ public final class InboundHttp2ToHttpPriorityAdapter extends InboundHttp2ToHttpA
/**
* This method will add the {@code headers} to the out of order headers map
* @param streamId The stream id associated with {@code headers}
* @param stream The stream associated with {@code headers}
* @param headers Newly encountered out of order headers which must be stored for future use
*/
private void importOutOfMessageFlowHeaders(int streamId, HttpHeaders headers) {
final HttpHeaders outOfMessageFlowHeader = outOfMessageFlowHeaders.get(streamId);
private void importOutOfMessageFlowHeaders(Http2Stream stream, HttpHeaders headers) {
final HttpHeaders outOfMessageFlowHeader = getOutOfMessageFlowHeaders(stream);
if (outOfMessageFlowHeader == null) {
outOfMessageFlowHeaders.put(streamId, headers);
putOutOfMessageFlowHeaders(stream, headers);
} else {
outOfMessageFlowHeader.setAll(headers);
}
@ -82,11 +85,11 @@ public final class InboundHttp2ToHttpPriorityAdapter extends InboundHttp2ToHttpA
/**
* Take any saved out of order headers and export them to {@code headers}
* @param streamId The stream id to search for out of order headers for
* @param headers If any out of order headers exist for {@code streamId} they will be added to this object
* @param stream The stream to search for out of order headers for
* @param headers If any out of order headers exist for {@code stream} they will be added to this object
*/
private void exportOutOfMessageFlowHeaders(int streamId, final HttpHeaders headers) {
final HttpHeaders outOfMessageFlowHeader = outOfMessageFlowHeaders.get(streamId);
private void exportOutOfMessageFlowHeaders(Http2Stream stream, final HttpHeaders headers) {
final HttpHeaders outOfMessageFlowHeader = getOutOfMessageFlowHeaders(stream);
if (outOfMessageFlowHeader != null) {
headers.setAll(outOfMessageFlowHeader);
}
@ -127,18 +130,19 @@ public final class InboundHttp2ToHttpPriorityAdapter extends InboundHttp2ToHttpA
}
@Override
protected void fireChannelRead(ChannelHandlerContext ctx, FullHttpMessage msg, int streamId) {
exportOutOfMessageFlowHeaders(streamId, getActiveHeaders(msg));
super.fireChannelRead(ctx, msg, streamId);
protected void fireChannelRead(ChannelHandlerContext ctx, FullHttpMessage msg, boolean release,
Http2Stream stream) {
exportOutOfMessageFlowHeaders(stream, getActiveHeaders(msg));
super.fireChannelRead(ctx, msg, release, stream);
}
@Override
protected FullHttpMessage processHeadersBegin(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
protected FullHttpMessage processHeadersBegin(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
boolean endOfStream, boolean allowAppend, boolean appendToTrailer) throws Http2Exception {
FullHttpMessage msg = super.processHeadersBegin(ctx, streamId, headers,
FullHttpMessage msg = super.processHeadersBegin(ctx, stream, headers,
endOfStream, allowAppend, appendToTrailer);
if (msg != null) {
exportOutOfMessageFlowHeaders(streamId, getActiveHeaders(msg));
exportOutOfMessageFlowHeaders(stream, getActiveHeaders(msg));
}
return msg;
}
@ -146,15 +150,15 @@ public final class InboundHttp2ToHttpPriorityAdapter extends InboundHttp2ToHttpA
@Override
public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
Http2Stream parent = stream.parent();
FullHttpMessage msg = messageMap.get(stream.id());
FullHttpMessage msg = getMessage(stream);
if (msg == null) {
// msg may be null if a HTTP/2 frame event in received outside the HTTP message flow
// For example a PRIORITY frame can be received in any state
// and the HTTP message flow exists in OPEN.
// msg may be null if a HTTP/2 frame event is received outside the HTTP message flow
// For example a PRIORITY frame can be received in any state but the HTTP message flow
// takes place while the stream is OPEN.
if (parent != null && !parent.equals(connection.connectionStream())) {
HttpHeaders headers = new DefaultHttpHeaders();
headers.setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_DEPENDENCY_ID.text(), parent.id());
importOutOfMessageFlowHeaders(stream.id(), headers);
importOutOfMessageFlowHeaders(stream, headers);
}
} else {
if (parent == null) {
@ -169,14 +173,14 @@ public final class InboundHttp2ToHttpPriorityAdapter extends InboundHttp2ToHttpA
@Override
public void onWeightChanged(Http2Stream stream, short oldWeight) {
FullHttpMessage msg = messageMap.get(stream.id());
FullHttpMessage msg = getMessage(stream);
final HttpHeaders headers;
if (msg == null) {
// msg may be null if a HTTP/2 frame event in received outside the HTTP message flow
// For example a PRIORITY frame can be received in any state
// and the HTTP message flow exists in OPEN.
headers = new DefaultHttpHeaders();
importOutOfMessageFlowHeaders(stream.id(), headers);
importOutOfMessageFlowHeaders(stream, headers);
} else {
headers = getActiveHeaders(msg);
}
@ -186,9 +190,13 @@ public final class InboundHttp2ToHttpPriorityAdapter extends InboundHttp2ToHttpA
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
boolean exclusive) throws Http2Exception {
FullHttpMessage msg = messageMap.get(streamId);
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
return;
}
FullHttpMessage msg = getMessage(stream);
if (msg == null) {
HttpHeaders httpHeaders = outOfMessageFlowHeaders.remove(streamId);
HttpHeaders httpHeaders = removeOutOfMessageFlowHeaders(stream);
if (httpHeaders == null) {
throw connectionError(PROTOCOL_ERROR, "Priority Frame recieved for unknown stream id %d", streamId);
}
@ -196,8 +204,8 @@ public final class InboundHttp2ToHttpPriorityAdapter extends InboundHttp2ToHttpA
Http2Headers http2Headers = new DefaultHttp2Headers(validateHttpHeaders, httpHeaders.size());
initializePseudoHeaders(http2Headers);
addHttpHeadersToHttp2Headers(httpHeaders, http2Headers);
msg = newMessage(streamId, http2Headers, validateHttpHeaders);
fireChannelRead(ctx, msg, streamId);
msg = newMessage(stream, http2Headers, validateHttpHeaders);
fireChannelRead(ctx, msg, false, stream);
}
}
}

View File

@ -41,7 +41,6 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2TestUtil.FrameAdapter;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.AsciiString;
import io.netty.util.CharsetUtil;
@ -84,7 +83,8 @@ public class InboundHttp2ToHttpAdapterTest {
@Mock
private HttpSettingsListener settingsListener;
private Http2FrameWriter frameWriter;
private Http2ConnectionHandler serverHandler;
private Http2ConnectionHandler clientHandler;
private ServerBootstrap sb;
private Bootstrap cb;
private Channel serverChannel;
@ -92,12 +92,14 @@ public class InboundHttp2ToHttpAdapterTest {
private Channel clientChannel;
private CountDownLatch serverLatch;
private CountDownLatch clientLatch;
private CountDownLatch serverLatch2;
private CountDownLatch clientLatch2;
private CountDownLatch settingsLatch;
private int maxContentLength;
private HttpResponseDelegator serverDelegator;
private HttpResponseDelegator clientDelegator;
private HttpSettingsDelegator settingsDelegator;
private Http2Exception serverException;
private Http2Exception clientException;
@Before
public void setup() throws Exception {
@ -144,8 +146,8 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, true, newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
@ -179,8 +181,8 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, true, newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
@ -213,8 +215,8 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, true, newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
@ -240,12 +242,12 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, true, newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
assertTrue(isStreamError(serverException));
awaitResponses();
assertTrue(isStreamError(clientException));
}
@Test
@ -264,9 +266,10 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
clientHandler.encoder().writeData(ctxClient(), 3, content.duplicate().retain(), 0, true,
newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
@ -296,12 +299,12 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.slice(0, midPoint).retain(), 0, false,
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
clientHandler.encoder().writeData(ctxClient(), 3, content.slice(0, midPoint).retain(), 0, false,
newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.slice(midPoint, text.length() - midPoint).retain(),
0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeData(ctxClient(), 3,
content.slice(midPoint, text.length() - midPoint).retain(), 0, true, newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
@ -330,11 +333,11 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
clientHandler.encoder().writeData(ctxClient(), 3, content.retain(), 0, false, newPromiseClient());
clientHandler.encoder().writeData(ctxClient(), 3, content.retain(), 0, false, newPromiseClient());
clientHandler.encoder().writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
@ -373,10 +376,10 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 3, http2Headers2, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers2, 0, false, newPromiseClient());
clientHandler.encoder().writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
@ -413,10 +416,11 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 3, http2Headers2, 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
clientHandler.encoder().writeData(ctxClient(), 3, content.duplicate().retain(), 0, false,
newPromiseClient());
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers2, 0, true, newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
@ -456,12 +460,15 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 5, http2Headers2, 0, false, newPromiseClient());
frameWriter.writePriority(ctxClient(), 5, 3, (short) 123, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 5, content2.duplicate().retain(), 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
clientHandler.encoder().writeHeaders(ctxClient(), 5, http2Headers2, 0, false, newPromiseClient());
clientChannel.flush(); // Headers are queued in the flow controller and so flush them.
clientHandler.encoder().writePriority(ctxClient(), 5, 3, (short) 123, true, newPromiseClient());
clientHandler.encoder().writeData(ctxClient(), 3, content.duplicate().retain(), 0, true,
newPromiseClient());
clientHandler.encoder().writeData(ctxClient(), 5, content2.duplicate().retain(), 0, true,
newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
@ -509,12 +516,15 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 5, http2Headers2, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.duplicate().retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 5, content2.duplicate().retain(), 0, true, newPromiseClient());
frameWriter.writePriority(ctxClient(), 5, 3, (short) 222, false, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
clientHandler.encoder().writeHeaders(ctxClient(), 5, http2Headers2, 0, false, newPromiseClient());
clientHandler.encoder().writeData(ctxClient(), 3, content.duplicate().retain(), 0, true,
newPromiseClient());
clientHandler.encoder().writeData(ctxClient(), 5, content2.duplicate().retain(), 0, true,
newPromiseClient());
clientChannel.flush(); // headers and data are queued in the flow controller, so flush them.
clientHandler.encoder().writePriority(ctxClient(), 5, 3, (short) 222, false, newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
@ -533,7 +543,7 @@ public class InboundHttp2ToHttpAdapterTest {
@Test
public void serverRequestPushPromise() throws Exception {
boostrapEnv(2, 1, 1);
boostrapEnv(1, 1, 1);
final String text = "hello world big time data!";
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final String text2 = "hello world smaller data?";
@ -563,8 +573,8 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers3, 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers3, 0, true, newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
@ -580,19 +590,20 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(serverConnectedChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxServer(), 3, http2Headers, 0, false, newPromiseServer());
frameWriter.writePushPromise(ctxServer(), 3, 5, http2Headers2, 0, newPromiseServer());
frameWriter.writeData(ctxServer(), 3, content.duplicate().retain(), 0, true, newPromiseServer());
frameWriter.writeData(ctxServer(), 5, content2.duplicate().retain(), 0, true, newPromiseServer());
ctxServer().flush();
serverHandler.encoder().writeHeaders(ctxServer(), 3, http2Headers, 0, false, newPromiseServer());
serverHandler.encoder().writePushPromise(ctxServer(), 3, 2, http2Headers2, 0, newPromiseServer());
serverHandler.encoder().writeData(ctxServer(), 3, content.duplicate().retain(), 0, true,
newPromiseServer());
serverHandler.encoder().writeData(ctxServer(), 5, content2.duplicate().retain(), 0, true,
newPromiseServer());
serverConnectedChannel.flush();
}
});
awaitResponses();
ArgumentCaptor<FullHttpMessage> responseCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(clientListener, times(2)).messageReceived(responseCaptor.capture());
verify(clientListener).messageReceived(responseCaptor.capture());
capturedResponses = responseCaptor.getAllValues();
assertEquals(response, capturedResponses.get(0));
assertEquals(response2, capturedResponses.get(1));
} finally {
request.release();
response.release();
@ -602,7 +613,7 @@ public class InboundHttp2ToHttpAdapterTest {
@Test
public void serverResponseHeaderInformational() throws Exception {
boostrapEnv(2, 2, 1);
boostrapEnv(1, 2, 1, 2, 1);
final FullHttpMessage request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, "/info/test",
true);
HttpHeaders httpHeaders = request.headers();
@ -623,11 +634,12 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
clientChannel.flush();
}
});
awaitRequests();
httpHeaders = response.headers();
httpHeaders.setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), 3);
httpHeaders.setInt(HttpHeaderNames.CONTENT_LENGTH, 0);
@ -635,22 +647,26 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(serverConnectedChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxServer(), 3, http2HeadersResponse, 0, false, newPromiseServer());
ctxServer().flush();
serverHandler.encoder().writeHeaders(ctxServer(), 3, http2HeadersResponse, 0, false,
newPromiseServer());
serverConnectedChannel.flush();
}
});
awaitResponses();
httpHeaders = request2.headers();
httpHeaders.setInt(HttpHeaderNames.CONTENT_LENGTH, text.length());
httpHeaders.remove(HttpHeaderNames.EXPECT);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeData(ctxClient(), 3, payload.duplicate().retain(), 0, true, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeData(ctxClient(), 3, payload.duplicate().retain(), 0, true,
newPromiseClient());
clientChannel.flush();
}
});
awaitRequests2();
httpHeaders = response2.headers();
httpHeaders.setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), 3);
httpHeaders.setInt(HttpHeaderNames.CONTENT_LENGTH, 0);
@ -658,12 +674,13 @@ public class InboundHttp2ToHttpAdapterTest {
runInChannel(serverConnectedChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxServer(), 3, http2HeadersResponse2, 0, true, newPromiseServer());
ctxServer().flush();
serverHandler.encoder().writeHeaders(ctxServer(), 3, http2HeadersResponse2, 0, true,
newPromiseServer());
serverConnectedChannel.flush();
}
});
awaitRequests();
awaitResponses2();
ArgumentCaptor<FullHttpMessage> requestCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener, times(2)).messageReceived(requestCaptor.capture());
capturedRequests = requestCaptor.getAllValues();
@ -671,7 +688,6 @@ public class InboundHttp2ToHttpAdapterTest {
assertEquals(request, capturedRequests.get(0));
assertEquals(request2, capturedRequests.get(1));
awaitResponses();
ArgumentCaptor<FullHttpMessage> responseCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(clientListener, times(2)).messageReceived(responseCaptor.capture());
capturedResponses = responseCaptor.getAllValues();
@ -688,23 +704,28 @@ public class InboundHttp2ToHttpAdapterTest {
@Test
public void propagateSettings() throws Exception {
boostrapEnv(1, 1, 1);
boostrapEnv(1, 1, 2);
final Http2Settings settings = new Http2Settings().pushEnabled(true);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeSettings(ctxClient(), settings, newPromiseClient());
ctxClient().flush();
clientHandler.encoder().writeSettings(ctxClient(), settings, newPromiseClient());
clientChannel.flush();
}
});
assertTrue(settingsLatch.await(3, SECONDS));
ArgumentCaptor<Http2Settings> settingsCaptor = ArgumentCaptor.forClass(Http2Settings.class);
verify(settingsListener).messageReceived(settingsCaptor.capture());
verify(settingsListener, times(2)).messageReceived(settingsCaptor.capture());
assertEquals(settings, settingsCaptor.getValue());
}
private void boostrapEnv(int clientLatchCount, int serverLatchCount, int settingsLatchCount)
throws InterruptedException {
throws InterruptedException {
boostrapEnv(clientLatchCount, clientLatchCount, serverLatchCount, serverLatchCount, settingsLatchCount);
}
private void boostrapEnv(int clientLatchCount, int clientLatchCount2, int serverLatchCount, int serverLatchCount2,
int settingsLatchCount) throws InterruptedException {
clientDelegator = null;
serverDelegator = null;
serverConnectedChannel = null;
@ -712,8 +733,9 @@ public class InboundHttp2ToHttpAdapterTest {
final CountDownLatch serverChannelLatch = new CountDownLatch(1);
serverLatch = new CountDownLatch(serverLatchCount);
clientLatch = new CountDownLatch(clientLatchCount);
serverLatch2 = new CountDownLatch(serverLatchCount2);
clientLatch2 = new CountDownLatch(clientLatchCount2);
settingsLatch = new CountDownLatch(settingsLatchCount);
frameWriter = new DefaultHttp2FrameWriter();
sb = new ServerBootstrap();
cb = new Bootstrap();
@ -726,32 +748,22 @@ public class InboundHttp2ToHttpAdapterTest {
ChannelPipeline p = ch.pipeline();
Http2Connection connection = new DefaultHttp2Connection(true);
p.addLast(new HttpAdapterFrameAdapter(
connection,
serverHandler = new Http2ConnectionHandlerBuilder().frameListener(
new InboundHttp2ToHttpPriorityAdapterBuilder(connection)
.maxContentLength(maxContentLength)
.validateHttpHeaders(true)
.propagateSettings(true)
.build(),
new CountDownLatch(10)));
.maxContentLength(maxContentLength)
.validateHttpHeaders(true)
.propagateSettings(true)
.build())
.connection(connection)
.gracefulShutdownTimeoutMillis(0)
.build();
p.addLast(serverHandler);
serverDelegator = new HttpResponseDelegator(serverListener, serverLatch);
serverDelegator = new HttpResponseDelegator(serverListener, serverLatch, serverLatch2);
p.addLast(serverDelegator);
serverConnectedChannel = ch;
settingsDelegator = new HttpSettingsDelegator(settingsListener, settingsLatch);
p.addLast(settingsDelegator);
p.addLast(new ChannelHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Http2Exception e = getEmbeddedHttp2Exception(cause);
if (e != null) {
serverException = e;
serverLatch.countDown();
} else {
super.exceptionCaught(ctx, cause);
}
}
});
serverChannelLatch.countDown();
}
});
@ -764,15 +776,29 @@ public class InboundHttp2ToHttpAdapterTest {
ChannelPipeline p = ch.pipeline();
Http2Connection connection = new DefaultHttp2Connection(false);
p.addLast(new HttpAdapterFrameAdapter(
connection,
clientHandler = new Http2ConnectionHandlerBuilder().frameListener(
new InboundHttp2ToHttpPriorityAdapterBuilder(connection)
.maxContentLength(maxContentLength)
.build(),
new CountDownLatch(10)));
.maxContentLength(maxContentLength)
.build())
.connection(connection)
.gracefulShutdownTimeoutMillis(0)
.build();
p.addLast(clientHandler);
clientDelegator = new HttpResponseDelegator(clientListener, clientLatch);
clientDelegator = new HttpResponseDelegator(clientListener, clientLatch, clientLatch2);
p.addLast(clientDelegator);
p.addLast(new ChannelHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Http2Exception e = getEmbeddedHttp2Exception(cause);
if (e != null) {
clientException = e;
clientLatch.countDown();
} else {
super.exceptionCaught(ctx, cause);
}
}
});
}
});
@ -803,11 +829,19 @@ public class InboundHttp2ToHttpAdapterTest {
}
private void awaitRequests() throws Exception {
assertTrue(serverLatch.await(2, SECONDS));
assertTrue(serverLatch.await(3, SECONDS));
}
private void awaitResponses() throws Exception {
assertTrue(clientLatch.await(2, SECONDS));
assertTrue(clientLatch.await(3, SECONDS));
}
private void awaitRequests2() throws Exception {
assertTrue(serverLatch2.await(3, SECONDS));
}
private void awaitResponses2() throws Exception {
assertTrue(clientLatch2.await(3, SECONDS));
}
private ChannelHandlerContext ctxClient() {
@ -837,17 +871,20 @@ public class InboundHttp2ToHttpAdapterTest {
private static final class HttpResponseDelegator extends SimpleChannelInboundHandler<HttpObject> {
private final HttpResponseListener listener;
private final CountDownLatch latch;
private final CountDownLatch latch2;
HttpResponseDelegator(HttpResponseListener listener, CountDownLatch latch) {
HttpResponseDelegator(HttpResponseListener listener, CountDownLatch latch, CountDownLatch latch2) {
super(false);
this.listener = listener;
this.latch = latch;
this.latch2 = latch2;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
listener.messageReceived(msg);
latch.countDown();
latch2.countDown();
}
}
@ -867,17 +904,4 @@ public class InboundHttp2ToHttpAdapterTest {
latch.countDown();
}
}
private static final class HttpAdapterFrameAdapter extends FrameAdapter {
HttpAdapterFrameAdapter(Http2Connection connection, Http2FrameListener listener, CountDownLatch latch) {
super(connection, listener, latch);
}
@Override
protected void closeStream(Http2Stream stream, boolean dataRead) {
if (!dataRead) { // NOTE: Do not close the stream to allow the out of order messages to be processed
super.closeStream(stream, dataRead);
}
}
}
}