Introduced HTTP/2 frame to HTTP/1.x translation layer.

Motivation:

The HTTP/2 codec currently provides direct callbacks to access stream events/data. The HTTP/2 codec provides the protocol support for HTTP/2 but it does not pass messages up the context pipeline. It would be nice to have a decoder which could collect the data framed by HTTP/2 and translate this into traditional HTTP type objects. This would allow the traditional Netty context pipeline to be used to separate processing concerns (i.e. HttpContentDecompressor).  It would also be good to have a layer which can translate FullHttp[Request|Response] objects into HTTP/2 frame outbound events.

Modifications:

Introduce a new InboundHttp2ToHttpAdapter and supporting classes which will translate HTTP/2 stream events/data into HttpObject objects. Introduce a new DelegatingHttp2HttpConnectionHandler which will translate FullHttp[Request|Response] objects to HTTP/2 frame events.

Result:

Introduced HTTP/2 frame events to HttpObject layer.
Introduced FullHttp[Request|Response] to HTTP/2 frame events.
Introduced new unit tests to support new code.
Updated HTTP/2 client example to use new code.
Miscelaneous updates and bug fixes made to support new code.
This commit is contained in:
Scott Mitchell 2014-08-11 19:17:05 -04:00 committed by nmittler
parent 9b755d79e7
commit b41b11c53d
23 changed files with 2290 additions and 243 deletions

View File

@ -23,7 +23,7 @@ import java.util.Map;
* The default {@link HttpMessage} implementation.
*/
public abstract class DefaultHttpMessage extends DefaultHttpObject implements HttpMessage {
private static final int HASH_CODE_PRIME = 31;
private HttpVersion version;
private final HttpHeaders headers;
@ -55,6 +55,28 @@ public abstract class DefaultHttpMessage extends DefaultHttpObject implements Ht
return version;
}
@Override
public int hashCode() {
int result = 1;
result = HASH_CODE_PRIME * result + headers.hashCode();
result = HASH_CODE_PRIME * result + version.hashCode();
result = HASH_CODE_PRIME * result + super.hashCode();
return result;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof DefaultHttpMessage)) {
return false;
}
DefaultHttpMessage other = (DefaultHttpMessage) o;
return headers().equals(other.headers()) &&
protocolVersion().equals(other.protocolVersion()) &&
super.equals(o);
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();

View File

@ -18,7 +18,7 @@ package io.netty.handler.codec.http;
import io.netty.handler.codec.DecoderResult;
public class DefaultHttpObject implements HttpObject {
private static final int HASH_CODE_PRIME = 31;
private DecoderResult decoderResult = DecoderResult.SUCCESS;
protected DefaultHttpObject() {
@ -37,4 +37,22 @@ public class DefaultHttpObject implements HttpObject {
}
this.decoderResult = decoderResult;
}
@Override
public int hashCode() {
int result = 1;
result = HASH_CODE_PRIME * result + decoderResult.hashCode();
return result;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof DefaultHttpObject)) {
return false;
}
DefaultHttpObject other = (DefaultHttpObject) o;
return decoderResult().equals(other.decoderResult());
}
}

View File

@ -69,6 +69,26 @@ public class DefaultHttpResponse extends DefaultHttpMessage implements HttpRespo
return this;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + status.hashCode();
result = prime * result + super.hashCode();
return result;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof DefaultHttpResponse)) {
return false;
}
DefaultHttpResponse other = (DefaultHttpResponse) o;
return status().equals(other.status()) && super.equals(o);
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();

View File

@ -333,6 +333,18 @@ public interface HttpHeaders extends TextHeaders {
* {@code "WWW-Authenticate"}
*/
public static final AsciiString WWW_AUTHENTICATE = new AsciiString("WWW-Authenticate");
/**
* {@code "Keep-Alive"}
* @deprecated use {@link #CONNECTION}
*/
@Deprecated
public static final AsciiString KEEP_ALIVE = new AsciiString("Keep-Alive");
/**
* {@code "Proxy-Connection"}
* @deprecated use {@link #CONNECTION}
*/
@Deprecated
public static final AsciiString PROXY_CONNECTION = new AsciiString("Proxy-Connection");
private Names() {
}

View File

@ -205,6 +205,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
}
}
@SuppressWarnings("deprecation")
private SpdySynStreamFrame createSynStreamFrame(HttpMessage httpMessage) throws Exception {
// Get the Stream-ID, Associated-To-Stream-ID, Priority, URL, and scheme from the headers
final HttpHeaders httpHeaders = httpMessage.headers();
@ -222,8 +223,8 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
// The Connection, Keep-Alive, Proxy-Connection, and Transfer-Encoding
// headers are not valid and MUST not be sent.
httpHeaders.remove(HttpHeaders.Names.CONNECTION);
httpHeaders.remove("Keep-Alive");
httpHeaders.remove("Proxy-Connection");
httpHeaders.remove(HttpHeaders.Names.KEEP_ALIVE);
httpHeaders.remove(HttpHeaders.Names.PROXY_CONNECTION);
httpHeaders.remove(HttpHeaders.Names.TRANSFER_ENCODING);
SpdySynStreamFrame spdySynStreamFrame =
@ -268,6 +269,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
return spdySynStreamFrame;
}
@SuppressWarnings("deprecation")
private SpdySynReplyFrame createSynReplyFrame(HttpResponse httpResponse) throws Exception {
// Get the Stream-ID from the headers
final HttpHeaders httpHeaders = httpResponse.headers();
@ -277,8 +279,8 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
// The Connection, Keep-Alive, Proxy-Connection, and Transfer-Encoding
// headers are not valid and MUST not be sent.
httpHeaders.remove(HttpHeaders.Names.CONNECTION);
httpHeaders.remove("Keep-Alive");
httpHeaders.remove("Proxy-Connection");
httpHeaders.remove(HttpHeaders.Names.KEEP_ALIVE);
httpHeaders.remove(HttpHeaders.Names.PROXY_CONNECTION);
httpHeaders.remove(HttpHeaders.Names.TRANSFER_ENCODING);
SpdySynReplyFrame spdySynReplyFrame = new DefaultSpdySynReplyFrame(streamID);

View File

@ -332,7 +332,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
/**
* Gets the next stream ID that can be created by the local endpoint.
*/
protected final int nextStreamId() {
public final int nextStreamId() {
return connection.local().nextStreamId();
}
@ -374,7 +374,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
// Create a new locally-initiated stream.
createLocalStream(streamId, endStream);
stream = createLocalStream(streamId, endStream);
} else {
// An existing stream...
if (stream.state() == RESERVED_LOCAL) {
@ -390,15 +390,17 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
stream.setPriority(streamDependency, weight, exclusive);
}
}
// If the headers are the end of the stream, close it now.
if (endStream) {
closeLocalSide(stream, promise);
}
}
return frameWriter.writeHeaders(ctx, promise, streamId, headers, streamDependency,
ChannelFuture future = frameWriter.writeHeaders(ctx, promise, streamId, headers, streamDependency,
weight, exclusive, padding, endStream, endSegment);
// If the headers are the end of the stream, close it now.
if (endStream) {
closeLocalSide(stream, promise);
}
return future;
} catch (Http2Exception e) {
return promise.setFailure(e);
}
@ -430,10 +432,12 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
return promise;
}
ChannelFuture future = frameWriter.writeRstStream(ctx, promise, streamId, errorCode);
stream.terminateSent();
close(stream, promise);
return frameWriter.writeRstStream(ctx, promise, streamId, errorCode);
return future;
}
protected ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise,
@ -781,12 +785,12 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
return;
}
AbstractHttp2ConnectionHandler.this.onDataRead(ctx, streamId, data, padding, endOfStream,
endOfSegment);
if (endOfStream) {
closeRemoteSide(stream, ctx.newSucceededFuture());
}
AbstractHttp2ConnectionHandler.this.onDataRead(ctx, streamId, data, padding, endOfStream,
endOfSegment);
}
/**
@ -840,7 +844,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
}
if (stream == null) {
createRemoteStream(streamId, endStream);
stream = createRemoteStream(streamId, endStream);
} else {
if (stream.state() == RESERVED_REMOTE) {
// Received headers for a reserved push stream ... open it for push to the
@ -858,15 +862,15 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
stream.setPriority(streamDependency, weight, exclusive);
}
}
// If the headers completes this stream, close it.
if (endStream) {
closeRemoteSide(stream, ctx.newSucceededFuture());
}
}
AbstractHttp2ConnectionHandler.this.onHeadersRead(ctx, streamId, headers, streamDependency,
weight, exclusive, padding, endStream, endSegment);
// If the headers completes this stream, close it.
if (endStream) {
closeRemoteSide(stream, ctx.newSucceededFuture());
}
}
@Override
@ -901,9 +905,10 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
}
stream.terminateReceived();
close(stream, ctx.newSucceededFuture());
AbstractHttp2ConnectionHandler.this.onRstStreamRead(ctx, streamId, errorCode);
close(stream, ctx.newSucceededFuture());
}
@Override

View File

@ -451,6 +451,8 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
settings.put(id, value);
}
observer.onSettingsRead(ctx, settings);
// Provide an interface for non-observers to capture settings
ctx.fireChannelRead(settings);
}
}

View File

@ -0,0 +1,189 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseAggregator;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import java.net.URI;
import java.util.Map;
/**
* Light weight wrapper around {@link DelegatingHttp2ConnectionHandler} to provide HTTP/1.x object to HTTP/2 encoding
*/
public class DelegatingHttp2HttpConnectionHandler extends DelegatingHttp2ConnectionHandler {
public DelegatingHttp2HttpConnectionHandler(boolean server, Http2FrameObserver observer) {
super(server, observer);
}
public DelegatingHttp2HttpConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow, Http2FrameObserver observer) {
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, observer);
}
public DelegatingHttp2HttpConnectionHandler(Http2Connection connection, Http2FrameObserver observer) {
super(connection, observer);
}
/**
* Add HTTP/2 headers based upon HTTP/1.x headers from a {@link HttpHeaders}
*
* @param httpHeaders The HTTP/1.x request object to pull headers from
* @param http2Headers The HTTP/2 headers object to add headers to
*/
private void addHeaders(HttpHeaders httpHeaders, DefaultHttp2Headers.Builder http2Headers) {
String value = httpHeaders.get(HttpHeaders.Names.HOST);
if (value != null) {
URI hostUri = URI.create(value);
// The authority MUST NOT include the deprecated "userinfo" subcomponent
value = hostUri.getAuthority();
if (value != null) {
http2Headers.authority(value.replaceFirst("^.*@", ""));
}
value = hostUri.getScheme();
if (value != null) {
http2Headers.scheme(value);
}
httpHeaders.remove(HttpHeaders.Names.HOST);
}
// Consume the Authority extension header if present
value = httpHeaders.get(Http2HttpHeaders.Names.AUTHORITY);
if (value != null) {
http2Headers.authority(value);
httpHeaders.remove(Http2HttpHeaders.Names.AUTHORITY);
}
// Consume the Scheme extension header if present
value = httpHeaders.get(Http2HttpHeaders.Names.SCHEME);
if (value != null) {
http2Headers.scheme(value);
httpHeaders.remove(Http2HttpHeaders.Names.SCHEME);
}
}
/**
* Add HTTP/2 headers based upon HTTP/1.x headers from a {@link HttpRequest}
*
* @param httpRequest The HTTP/1.x request object to pull headers from
* @param http2Headers The HTTP/2 headers object to add headers to
*/
private void addRequestHeaders(HttpRequest httpRequest, DefaultHttp2Headers.Builder http2Headers) {
http2Headers.path(httpRequest.uri());
http2Headers.method(httpRequest.method().toString());
addHeaders(httpRequest.headers(), http2Headers);
}
/**
* Add HTTP/2 headers based upon HTTP/1.x headers from a {@link HttpRequest}
*
* @param httpResponse The HTTP/1.x response object to pull headers from
* @param http2Headers The HTTP/2 headers object to add headers to
*/
private void addResponseHeaders(HttpResponse httpResponse, DefaultHttp2Headers.Builder http2Headers) {
http2Headers.status(Integer.toString(httpResponse.status().code()));
addHeaders(httpResponse.headers(), http2Headers);
}
/**
* Get the next stream id either from the {@link HttpHeaders} object or HTTP/2 codec
*
* @param httpHeaders The HTTP/1.x headers object to look for the stream id
* @return The stream id to use with this {@link HttpHeaders} object
* @throws Http2Exception If the {@code httpHeaders} object specifies an invalid stream id
*/
private int getStreamId(HttpHeaders httpHeaders) throws Http2Exception {
int streamId = 0;
String value = httpHeaders.get(Http2HttpHeaders.Names.STREAM_ID);
if (value == null) {
streamId = nextStreamId();
} else {
try {
streamId = Integer.parseInt(value);
} catch (NumberFormatException e) {
throw Http2Exception.format(Http2Error.INTERNAL_ERROR,
"Invalid user-specified stream id value '%s'", value);
}
httpHeaders.remove(Http2HttpHeaders.Names.STREAM_ID);
}
return streamId;
}
/**
* Handles conversion of a {@link FullHttpMessage} to HTTP/2 frames.
*/
@Override
@SuppressWarnings("deprecation")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof FullHttpMessage) {
FullHttpMessage httpMsg = (FullHttpMessage) msg;
boolean hasData = httpMsg.content().isReadable();
// Convert and write the headers.
String value = null;
HttpHeaders httpHeaders = httpMsg.headers();
DefaultHttp2Headers.Builder http2Headers = DefaultHttp2Headers.newBuilder();
if (msg instanceof HttpRequest) {
addRequestHeaders((HttpRequest) msg, http2Headers);
} else if (msg instanceof HttpResponse) {
addResponseHeaders((HttpResponse) msg, http2Headers);
}
// Provide the user the opportunity to specify the streamId
int streamId = 0;
try {
streamId = getStreamId(httpHeaders);
} catch (Http2Exception e) {
httpMsg.release();
promise.setFailure(e);
return;
}
// The Connection, Keep-Alive, Proxy-Connection, Transfer-Encoding,
// and Upgrade headers are not valid and MUST not be sent.
httpHeaders.remove(HttpHeaders.Names.CONNECTION);
httpHeaders.remove(HttpHeaders.Names.KEEP_ALIVE);
httpHeaders.remove(HttpHeaders.Names.PROXY_CONNECTION);
httpHeaders.remove(HttpHeaders.Names.TRANSFER_ENCODING);
// Add the HTTP headers which have not been consumed above
for (Map.Entry<String, String> entry : httpHeaders.entries()) {
http2Headers.add(entry.getKey(), entry.getValue());
}
if (hasData) {
ChannelPromiseAggregator promiseAggregator = new ChannelPromiseAggregator(promise);
ChannelPromise headerPromise = ctx.newPromise();
ChannelPromise dataPromise = ctx.newPromise();
promiseAggregator.add(headerPromise, dataPromise);
writeHeaders(ctx, headerPromise, streamId, http2Headers.build(), 0, false, false);
writeData(ctx, dataPromise, streamId, httpMsg.content(), 0, true, true);
} else {
writeHeaders(ctx, promise, streamId, http2Headers.build(), 0, true, true);
}
} else {
ctx.write(msg, promise);
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.handler.codec.AsciiString;
/**
* Provides the constants for the header names used by
* {@link InboundHttp2ToHttpAdapter} and {@link DelegatingHttp2HttpConnectionHandler}
*/
public final class Http2HttpHeaders {
public static final class Names {
/**
* {@code "X-HTTP2-Stream-ID"}
*/
public static final AsciiString STREAM_ID = new AsciiString("X-HTTP2-Stream-ID");
/**
* {@code "X-HTTP2-Authority"}
*/
public static final AsciiString AUTHORITY = new AsciiString("X-HTTP2-Authority");
/**
* {@code "X-HTTP2-Scheme"}
*/
public static final AsciiString SCHEME = new AsciiString("X-HTTP2-Scheme");
/**
* {@code "X-HTTP2-Path"}
*/
public static final AsciiString PATH = new AsciiString("X-HTTP2-Path");
/**
* {@code "X-HTTP2-Stream-Promise-ID"}
*/
public static final AsciiString STREAM_PROMISE_ID = new AsciiString("X-HTTP2-Stream-Promise-ID");
/**
* {@code "X-HTTP2-Stream-Dependency-ID"}
*/
public static final AsciiString STREAM_DEPENDENCY_ID = new AsciiString("X-HTTP2-Stream-Dependency-ID");
/**
* {@code "X-HTTP2-Stream-Weight"}
*/
public static final AsciiString STREAM_WEIGHT = new AsciiString("X-HTTP2-Stream-Weight");
/**
* {@code "X-HTTP2-Stream-Exclusive"}
*/
public static final AsciiString STREAM_EXCLUSIVE = new AsciiString("X-HTTP2-Stream-Exclusive");
private Names() {
}
}
}

View File

@ -0,0 +1,647 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderUtil;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.IntObjectHashMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* Decodes HTTP/2 data and associated streams into {@link HttpMessage}s and {@link HttpContent}s.
*/
public class InboundHttp2ToHttpAdapter extends Http2ConnectionAdapter implements Http2FrameObserver {
private final long maxContentLength;
private final boolean validateHttpHeaders;
private final Http2Connection connection;
private final IntObjectMap<Http2HttpMessageAccumulator> messageMap;
private static final Set<String> HEADERS_TO_EXCLUDE;
private static final Map<String, String> HEADER_NAME_TRANSLATIONS_REQUEST;
private static final Map<String, String> HEADER_NAME_TRANSLATIONS_RESPONSE;
static {
HEADERS_TO_EXCLUDE = new HashSet<String>();
HEADER_NAME_TRANSLATIONS_REQUEST = new HashMap<String, String>();
HEADER_NAME_TRANSLATIONS_RESPONSE = new HashMap<String, String>();
for (Http2Headers.HttpName http2HeaderName : Http2Headers.HttpName.values()) {
HEADERS_TO_EXCLUDE.add(http2HeaderName.value());
}
HEADER_NAME_TRANSLATIONS_RESPONSE.put(Http2Headers.HttpName.AUTHORITY.value(),
Http2HttpHeaders.Names.AUTHORITY.toString());
HEADER_NAME_TRANSLATIONS_RESPONSE.put(Http2Headers.HttpName.SCHEME.value(),
Http2HttpHeaders.Names.SCHEME.toString());
HEADER_NAME_TRANSLATIONS_REQUEST.putAll(HEADER_NAME_TRANSLATIONS_RESPONSE);
HEADER_NAME_TRANSLATIONS_RESPONSE.put(Http2Headers.HttpName.PATH.value(),
Http2HttpHeaders.Names.PATH.toString());
}
/**
* Creates a new instance
*
* @param connection The object which will provide connection notification events for the current connection
* @param maxContentLength the maximum length of the message content. If the length of the message content exceeds
* this value, a {@link TooLongFrameException} will be raised.
* @throws NullPointerException If {@code connection} is null
* @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0}
*/
public static InboundHttp2ToHttpAdapter newInstance(Http2Connection connection, long maxContentLength)
throws NullPointerException, IllegalArgumentException {
InboundHttp2ToHttpAdapter adapter = new InboundHttp2ToHttpAdapter(connection, maxContentLength);
connection.addListener(adapter);
return adapter;
}
/**
* Creates a new instance
*
* @param connection The object which will provide connection notification events for the current connection
* @param maxContentLength the maximum length of the message content. If the length of the message content exceeds
* this value, a {@link TooLongFrameException} will be raised.
* @param validateHeaders {@code true} if http headers should be validated
* @throws NullPointerException If {@code connection} is null
* @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0}
*/
public static InboundHttp2ToHttpAdapter newInstance(Http2Connection connection, long maxContentLength,
boolean validateHttpHeaders)
throws NullPointerException, IllegalArgumentException {
InboundHttp2ToHttpAdapter adapter = new InboundHttp2ToHttpAdapter(connection,
maxContentLength,
validateHttpHeaders);
connection.addListener(adapter);
return adapter;
}
/**
* Creates a new instance
*
* @param connection The object which will provide connection notification events for the current connection
* @param maxContentLength the maximum length of the message content. If the length of the message content exceeds
* this value, a {@link TooLongFrameException} will be raised.
* @throws NullPointerException If {@code connection} is null
* @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0}
*/
protected InboundHttp2ToHttpAdapter(Http2Connection connection, long maxContentLength) throws NullPointerException,
IllegalArgumentException {
this(connection, maxContentLength, true);
}
/**
* Creates a new instance
*
* @param connection The object which will provide connection notification events for the current connection
* @param maxContentLength the maximum length of the message content. If the length of the message content exceeds
* this value, a {@link TooLongFrameException} will be raised.
* @param validateHeaders {@code true} if http headers should be validated
* @throws NullPointerException If {@code connection} is null
* @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0}
*/
protected InboundHttp2ToHttpAdapter(Http2Connection connection, long maxContentLength, boolean validateHttpHeaders)
throws NullPointerException, IllegalArgumentException {
if (connection == null) {
throw new NullPointerException("connection");
}
if (maxContentLength <= 0) {
throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength);
}
this.maxContentLength = maxContentLength;
this.validateHttpHeaders = validateHttpHeaders;
this.connection = connection;
this.messageMap = new IntObjectHashMap<Http2HttpMessageAccumulator>();
}
@Override
public void streamRemoved(Http2Stream stream) {
removeMessage(stream.id());
}
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream,
boolean endOfSegment) throws Http2Exception {
// Padding is already stripped out of data by super class
Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId);
if (msgAccumulator == null) {
throw Http2Exception.protocolError("Data Frame recieved for unknown stream id %d", streamId);
}
try {
msgAccumulator.add(endOfStream ? new DefaultLastHttpContent(data, validateHttpHeaders)
: new DefaultHttpContent(data), ctx);
} catch (TooLongFrameException e) {
removeMessage(streamId);
throw Http2Exception.format(Http2Error.INTERNAL_ERROR,
"Content length exceeded max of %d for stream id %d", maxContentLength, streamId);
}
if (endOfStream) {
msgAccumulator.endOfStream(ctx);
removeMessage(streamId);
}
}
/**
* Extracts the common initial header processing and internal tracking
*
* @param streamId The stream id the {@code headers} apply to
* @param headers The headers to process
* @param allowAppend {@code true} if headers will be appended if the stream already exists. if {@code false} and
* the stream already exists this method returns {@code null}.
* @return The object used to track the stream corresponding to {@code streamId}. {@code null} if
* {@code allowAppend} is {@code false} and the stream already exists.
* @throws Http2Exception If the stream id is not in the correct state to process the headers request
*/
private Http2HttpMessageAccumulator processHeadersBegin(int streamId, Http2Headers headers, boolean allowAppend)
throws Http2Exception {
Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId);
try {
if (msgAccumulator == null) {
msgAccumulator = connection.isServer() ? newHttpRequestAccumulator(headers)
: newHttpResponseAccumulator(headers);
} else if (allowAppend) {
if (msgAccumulator.headerConsumed()) {
if (msgAccumulator.trailerConsumed()) {
throw new IllegalStateException("Header received and trailer already consumed");
} else {
msgAccumulator.trailer(new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHttpHeaders));
}
}
msgAccumulator.add(headers);
} else {
return null;
}
msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_ID, streamId);
} catch (IllegalStateException e) {
removeMessage(streamId);
throw Http2Exception.protocolError("Headers Frame recieved for stream id %d which is in an invalid state",
streamId);
} catch (Http2Exception e) {
removeMessage(streamId);
throw e;
}
return msgAccumulator;
}
/**
* Extracts the common final header processing and internal tracking
*
* @param ctx The context for which this message has been received
* @param streamId The stream id the {@code msgAccumulator} corresponds to
* @param msgAccumulator The object which represents all data for corresponding to {@code streamId}
* @param endOfStream {@code true} if this is the last event for the stream
*/
private void processHeadersEnd(ChannelHandlerContext ctx, int streamId,
Http2HttpMessageAccumulator msgAccumulator, boolean endOfStream) {
if (endOfStream) {
msgAccumulator.endOfStream(ctx);
removeMessage(streamId);
} else {
putMessage(streamId, msgAccumulator);
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endOfStream, boolean endSegment) throws Http2Exception {
Http2HttpMessageAccumulator msgAccumulator = processHeadersBegin(streamId, headers, true);
processHeadersEnd(ctx, streamId, msgAccumulator, endOfStream);
}
/**
* Set/clear all HTTP/1.x headers related to stream dependencies
*
* @param msgAccumulator The object which caches the HTTP/1.x headers
* @param streamDependency The stream id for which the {@code msgAccumulator} is dependent on
* @param weight The dependency weight
* @param exclusive The exlusive HTTP/2 flag
* @throws IllegalStateException If the {@code msgAccumulator} is not in a valid state to change headers
*/
private void setDependencyHeaders(Http2HttpMessageAccumulator msgAccumulator, int streamDependency, short weight,
boolean exclusive) throws IllegalStateException {
if (streamDependency != 0) {
msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_DEPENDENCY_ID, streamDependency);
msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_EXCLUSIVE, exclusive);
msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_WEIGHT, weight);
} else {
msgAccumulator.removeHeader(Http2HttpHeaders.Names.STREAM_DEPENDENCY_ID);
msgAccumulator.removeHeader(Http2HttpHeaders.Names.STREAM_EXCLUSIVE);
msgAccumulator.removeHeader(Http2HttpHeaders.Names.STREAM_WEIGHT);
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endOfStream, boolean endSegment)
throws Http2Exception {
Http2HttpMessageAccumulator msgAccumulator = processHeadersBegin(streamId, headers, true);
try {
setDependencyHeaders(msgAccumulator, streamDependency, weight, exclusive);
} catch (IllegalStateException e) {
removeMessage(streamId);
throw Http2Exception.protocolError("Headers Frame recieved for stream id %d which is in an invalid state",
streamId);
}
processHeadersEnd(ctx, streamId, msgAccumulator, endOfStream);
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
boolean exclusive) throws Http2Exception {
Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId);
if (msgAccumulator == null) {
throw Http2Exception.protocolError("Priority Frame recieved for unknown stream id %d", streamId);
}
try {
setDependencyHeaders(msgAccumulator, streamDependency, weight, exclusive);
} catch (IllegalStateException e) {
removeMessage(streamId);
throw Http2Exception.protocolError("Priority Frame recieved for stream id %d which is in an invalid state",
streamId);
}
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId);
if (msgAccumulator != null) {
removeMessage(streamId);
}
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
// NOOP
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
// NOOP
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
// NOOP
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
// NOOP
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers,
int padding) throws Http2Exception {
// Do not allow adding of headers to existing Http2HttpMessageAccumulator
// according to spec (http://tools.ietf.org/html/draft-ietf-httpbis-http2-13#section-6.6) there must
// be a CONTINUATION frame for more headers
Http2HttpMessageAccumulator msgAccumulator = processHeadersBegin(promisedStreamId, headers, false);
if (msgAccumulator == null) {
throw Http2Exception.protocolError("Push Promise Frame recieved for pre-existing stream id %d",
promisedStreamId);
}
try {
msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_PROMISE_ID, streamId);
} catch (IllegalStateException e) {
removeMessage(streamId);
throw Http2Exception.protocolError(
"Push Promise Frame recieved for stream id %d which is in an invalid state",
promisedStreamId);
}
processHeadersEnd(ctx, promisedStreamId, msgAccumulator, false);
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
// NOOP
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception {
// NOOP
}
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
ByteBuf payload) {
// NOOP
}
private Http2HttpMessageAccumulator putMessage(int streamId, Http2HttpMessageAccumulator message) {
return messageMap.put(streamId, message);
}
private Http2HttpMessageAccumulator getMessage(int streamId) {
return messageMap.get(streamId);
}
private Http2HttpMessageAccumulator removeMessage(int streamId) {
return messageMap.remove(streamId);
}
/**
* Translate HTTP/2 headers into an object which can build the corresponding HTTP/1.x response objects
*
* @param http2Headers The HTTP/2 headers corresponding to a new stream
* @return Collector for HTTP/1.x objects
* @throws Http2Exception If any of the HTTP/2 headers to not translate to valid HTTP/1.x headers
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
*/
private Http2HttpMessageAccumulator newHttpResponseAccumulator(Http2Headers http2Headers) throws Http2Exception,
IllegalStateException {
HttpResponseStatus status = null;
try {
status = HttpResponseStatus.parseLine(http2Headers.status());
} catch (Exception e) {
throw Http2Exception.protocolError(
"Unrecognized HTTP status code '%s' encountered in translation to HTTP/1.x",
http2Headers.status());
}
// HTTP/2 does not define a way to carry the version or reason phrase that is included in an HTTP/1.1
// status line.
Http2HttpMessageAccumulator messageAccumulator = new Http2HttpMessageAccumulator(new DefaultHttpResponse(
HttpVersion.HTTP_1_1, status, validateHttpHeaders));
messageAccumulator.add(http2Headers);
return messageAccumulator;
}
/**
* Translate HTTP/2 headers into an object which can build the corresponding HTTP/1.x request objects
*
* @param http2Headers The HTTP/2 headers corresponding to a new stream
* @return Collector for HTTP/1.x objects
* @throws Http2Exception If any of the HTTP/2 headers to not translate to valid HTTP/1.x headers
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
*/
private Http2HttpMessageAccumulator newHttpRequestAccumulator(Http2Headers http2Headers) throws Http2Exception,
IllegalStateException {
// HTTP/2 does not define a way to carry the version identifier that is
// included in the HTTP/1.1 request line.
Http2HttpMessageAccumulator messageAccumulator = new Http2HttpMessageAccumulator(new DefaultHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.valueOf(http2Headers.method()), http2Headers.path(),
validateHttpHeaders));
messageAccumulator.add(http2Headers);
return messageAccumulator;
}
/**
* Provides a container to collect HTTP/1.x objects until the end of the stream has been reached
*/
private final class Http2HttpMessageAccumulator {
private HttpMessage message;
private LastHttpContent trailer;
private long contentLength;
/**
* Creates a new instance
*
* @param message The HTTP/1.x object which represents the headers
*/
public Http2HttpMessageAccumulator(HttpMessage message) {
if (message == null) {
throw new NullPointerException("message");
}
this.message = message;
this.trailer = null;
this.contentLength = 0;
}
/**
* Set a HTTP/1.x header
*
* @param name The name of the header
* @param value The value of the header
* @return The headers object after the set operation
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
*/
public HttpHeaders setHeader(CharSequence name, Object value) throws IllegalStateException {
HttpHeaders headers = currentHeaders();
if (headers == null) {
throw new IllegalStateException("Headers object is null");
}
return headers.set(name, value);
}
/**
* Removes the header with the specified name.
*
* @param name The name of the header to remove
* @return {@code true} if and only if at least one entry has been removed
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
*/
public boolean removeHeader(CharSequence name) throws IllegalStateException {
HttpHeaders headers = currentHeaders();
if (headers == null) {
throw new IllegalStateException("Headers object is null");
}
return headers.remove(name);
}
/**
* Send the headers that have been accumulated so far, if they have not already been sent
*
* @param ctx The channel context for which to propagate events
* @param setContentLength {@code true} to set the Content-Length header
* @return {@code true} If a non-trailer header was fired to {@code ctx}, {@code false} otherwise
*/
private boolean sendHeaders(ChannelHandlerContext ctx, boolean setContentLength) {
HttpHeaders headers = currentHeaders();
if (headers == null) {
return false;
}
// Transfer-Encoding header is not valid
headers.remove(HttpHeaders.Names.TRANSFER_ENCODING);
headers.remove(HttpHeaders.Names.TRAILER);
// Initial header and trailer header will never need to be sent at the same time
// the behavior is that all header events will be appended to the initial header
// until some data is received, and only then will the trailer be used
if (!headerConsumed()) {
// The Connection and Keep-Alive headers are no longer valid
HttpHeaderUtil.setKeepAlive(message, true);
if (setContentLength) {
HttpHeaderUtil.setContentLength(message, contentLength);
}
ctx.fireChannelRead(message);
message = null;
return true;
} else if (trailerExists()) {
ctx.fireChannelRead(trailer);
trailer = LastHttpContent.EMPTY_LAST_CONTENT;
}
return false;
}
/**
* Called when the end of the stream is encountered
*
* @param ctx The channel context for which to propagate events
*/
public void endOfStream(ChannelHandlerContext ctx) {
if (sendHeaders(ctx, true)) {
ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
}
}
/**
* Add a HTTP/1.x object which represents part of the message body
*
* @param httpContent The content to add
* @param ctx The channel context for which to propagate events
* @throws TooLongFrameException If the {@code contentLength} is exceeded with the addition of the
* {@code httpContent}
*/
public void add(HttpContent httpContent, ChannelHandlerContext ctx) throws TooLongFrameException {
ByteBuf content = httpContent.content();
if (contentLength > maxContentLength - content.readableBytes()) {
throw new TooLongFrameException("HTTP/2 content length exceeded " + maxContentLength + " bytes.");
}
contentLength += content.readableBytes();
sendHeaders(ctx, false);
ctx.fireChannelRead(httpContent.retain());
}
/**
* Extend the current set of HTTP/1.x headers
*
* @param http2Headers The HTTP/2 headers to be added
* @throws Http2Exception If any HTTP/2 headers do not map to HTTP/1.x headers
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
*/
public void add(Http2Headers http2Headers) throws Http2Exception, IllegalStateException {
add(http2Headers, InboundHttp2ToHttpAdapter.this.connection.isServer() ? HEADER_NAME_TRANSLATIONS_REQUEST
: HEADER_NAME_TRANSLATIONS_RESPONSE);
}
/**
* Extend the current set of HTTP/1.x headers
*
* @param http2Headers The HTTP/2 headers to be added
* @param headerTranslations Translation map from HTTP/2 headers to HTTP/1.x headers
* @throws Http2Exception If any HTTP/2 headers do not map to HTTP/1.x headers
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
*/
private void add(Http2Headers http2Headers, Map<String, String> headerTranslations) throws Http2Exception,
IllegalStateException {
HttpHeaders headers = currentHeaders();
if (headers == null) {
throw new IllegalStateException("Headers object is null");
}
// http://tools.ietf.org/html/draft-ietf-httpbis-http2-13#section-8.1.2.1
// All headers that start with ':' are only valid in HTTP/2 context
Iterator<Entry<String, String>> itr = http2Headers.iterator();
while (itr.hasNext()) {
Entry<String, String> entry = itr.next();
String translatedName = headerTranslations.get(entry.getKey());
if (translatedName != null || !HEADERS_TO_EXCLUDE.contains(entry.getKey())) {
if (translatedName == null) {
translatedName = entry.getKey();
}
if (translatedName.isEmpty() || translatedName.charAt(0) == ':') {
throw Http2Exception.protocolError(
"Unknown HTTP/2 header '%s' encountered in translation to HTTP/1.x",
translatedName);
} else {
headers.add(translatedName, entry.getValue());
}
}
}
}
/**
* Set the current trailer header
*
* @param trailer The object which represents the trailing headers
*/
public void trailer(LastHttpContent trailer) {
this.trailer = trailer;
}
/**
* Determine if the initial header and continuations have been processed and sent up the pipeline
*
* @return {@code true} if the initial header and continuations have been processed and sent up the pipeline
*/
public boolean headerConsumed() {
return message == null;
}
/**
* Determine if the trailing header and continuations have been processed and sent up the pipeline
*
* @return {@code true} if the trailing header and continuations have been processed and sent up the pipeline
*/
public boolean trailerConsumed() {
return trailer != null && trailer.equals(LastHttpContent.EMPTY_LAST_CONTENT);
}
/**
* Determine if there is a trailing header that has not yet been sent up the pipeline
*
* @return {@code true} if there is a trailing header that has not yet been sent up the pipeline
*/
public boolean trailerExists() {
return trailer != null && !trailerConsumed();
}
/**
* Obtain the current object for accumulating headers
*
* @return The primary (non-trailer) header if it exists, otherwise the trailer header
*/
private HttpHeaders currentHeaders() {
if (headerConsumed()) {
return trailerExists() ? trailer.trailingHeaders() : null;
}
return message.headers();
}
}
}

View File

@ -279,6 +279,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void headersReadForUnknownStreamShouldCreateStream() throws Exception {
when(remote.createStream(eq(5), eq(false))).thenReturn(stream);
decode().onHeadersRead(ctx, 5, EMPTY_HEADERS, 0, false, false);
verify(remote).createStream(eq(5), eq(false));
verify(observer).onHeadersRead(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0),
@ -287,6 +288,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void headersReadForUnknownStreamShouldCreateHalfClosedStream() throws Exception {
when(remote.createStream(eq(5), eq(true))).thenReturn(stream);
decode().onHeadersRead(ctx, 5, EMPTY_HEADERS, 0, true, false);
verify(remote).createStream(eq(5), eq(true));
verify(observer).onHeadersRead(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0),
@ -466,6 +468,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void headersWriteForUnknownStreamShouldCreateStream() throws Exception {
when(local.createStream(eq(5), eq(false))).thenReturn(stream);
handler.writeHeaders(ctx, promise, 5, EMPTY_HEADERS, 0, false, false);
verify(local).createStream(eq(5), eq(false));
verify(writer).writeHeaders(eq(ctx), eq(promise), eq(5), eq(EMPTY_HEADERS), eq(0),
@ -474,6 +477,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void headersWriteShouldCreateHalfClosedStream() throws Exception {
when(local.createStream(eq(5), eq(true))).thenReturn(stream);
handler.writeHeaders(ctx, promise, 5, EMPTY_HEADERS, 0, true, false);
verify(local).createStream(eq(5), eq(true));
verify(writer).writeHeaders(eq(ctx), eq(promise), eq(5), eq(EMPTY_HEADERS), eq(0),

View File

@ -0,0 +1,306 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderUtil;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpMethod.POST;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.NetUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static io.netty.handler.codec.http2.Http2TestUtil.*;
import static io.netty.util.CharsetUtil.*;
import static java.util.concurrent.TimeUnit.*;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
/**
* Testing the {@link DelegatingHttp2HttpConnectionHandler} for {@link FullHttpRequest} objects into HTTP/2 frames
*/
public class DelegatingHttp2HttpConnectionHandlerTest {
@Mock
private Http2FrameObserver clientObserver;
@Mock
private Http2FrameObserver serverObserver;
private Http2FrameWriter frameWriter;
private ServerBootstrap sb;
private Bootstrap cb;
private Channel serverChannel;
private Channel clientChannel;
private CountDownLatch requestLatch;
private long maxContentLength;
private static final int CONNECTION_SETUP_READ_COUNT = 2;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
maxContentLength = 1 << 16;
requestLatch = new CountDownLatch(CONNECTION_SETUP_READ_COUNT + 1);
frameWriter = new DefaultHttp2FrameWriter();
sb = new ServerBootstrap();
cb = new Bootstrap();
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup());
sb.channel(NioServerSocketChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelegatingHttp2ConnectionHandler(true, new FrameCountDown()));
}
});
cb.group(new NioEventLoopGroup());
cb.channel(NioSocketChannel.class);
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelegatingHttp2HttpConnectionHandler(false, clientObserver));
}
});
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port));
assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel();
}
@After
public void teardown() throws Exception {
serverChannel.close().sync();
sb.group().shutdownGracefully();
cb.group().shutdownGracefully();
}
@Test
public void testJustHeadersRequest() throws Exception {
final HttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, "/example");
final HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 5);
httpHeaders.set(HttpHeaders.Names.HOST, "http://my-user_name@www.example.org:5555/example");
httpHeaders.set(Http2HttpHeaders.Names.AUTHORITY, "www.example.org:5555");
httpHeaders.set(Http2HttpHeaders.Names.SCHEME, "http");
httpHeaders.add("foo", "goo");
httpHeaders.add("foo", "goo2");
httpHeaders.add("foo2", "goo2");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/example").authority("www.example.org:5555").scheme("http")
.add("foo", "goo").add("foo", "goo2").add("foo2", "goo2").build();
ChannelPromise writePromise = newPromise();
ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise);
writePromise.awaitUninterruptibly(2, SECONDS);
assertTrue(writePromise.isSuccess());
writeFuture.awaitUninterruptibly(2, SECONDS);
assertTrue(writeFuture.isSuccess());
awaitRequests();
verify(serverObserver).onHeadersRead(any(ChannelHandlerContext.class), eq(5), eq(http2Headers), eq(0),
anyShort(), anyBoolean(), eq(0), eq(true), eq(true));
verify(serverObserver, never()).onDataRead(any(ChannelHandlerContext.class),
anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), anyBoolean());
}
@Test
public void testRequestWithBody() throws Exception {
requestLatch = new CountDownLatch(CONNECTION_SETUP_READ_COUNT + 2);
final String text = "foooooogoooo";
final HttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, POST, "/example",
Unpooled.copiedBuffer(text, UTF_8));
final HttpHeaders httpHeaders = request.headers();
httpHeaders.set(HttpHeaders.Names.HOST, "http://your_user-name123@www.example.org:5555/example");
httpHeaders.add("foo", "goo");
httpHeaders.add("foo", "goo2");
httpHeaders.add("foo2", "goo2");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("POST").path("/example").authority("www.example.org:5555").scheme("http")
.add("foo", "goo").add("foo", "goo2").add("foo2", "goo2").build();
final HttpContent expectedContent = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
ChannelPromise writePromise = newPromise();
ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise);
writePromise.awaitUninterruptibly(2, SECONDS);
assertTrue(writePromise.isSuccess());
writeFuture.awaitUninterruptibly(2, SECONDS);
assertTrue(writeFuture.isSuccess());
awaitRequests();
verify(serverObserver).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(http2Headers), eq(0),
anyShort(), anyBoolean(), eq(0), eq(false), eq(false));
verify(serverObserver).onDataRead(any(ChannelHandlerContext.class),
eq(3), eq(Unpooled.copiedBuffer(text.getBytes())), eq(0), eq(true), eq(true));
}
private void awaitRequests() throws Exception {
requestLatch.await(2, SECONDS);
}
private ChannelHandlerContext ctx() {
return clientChannel.pipeline().firstContext();
}
private ChannelPromise newPromise() {
return ctx().newPromise();
}
/**
* A decorator around the serverObserver that counts down the latch so that we can await the
* completion of the request.
*/
private final class FrameCountDown implements Http2FrameObserver {
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream, boolean endOfSegment)
throws Http2Exception {
serverObserver.onDataRead(ctx, streamId, copy(data), padding, endOfStream,
endOfSegment);
requestLatch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endStream, boolean endSegment) throws Http2Exception {
serverObserver.onHeadersRead(ctx, streamId, headers, padding, endStream, endSegment);
requestLatch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding,
boolean endStream, boolean endSegment) throws Http2Exception {
serverObserver.onHeadersRead(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endStream, endSegment);
requestLatch.countDown();
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
serverObserver.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
requestLatch.countDown();
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
serverObserver.onRstStreamRead(ctx, streamId, errorCode);
requestLatch.countDown();
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
serverObserver.onSettingsAckRead(ctx);
requestLatch.countDown();
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
serverObserver.onSettingsRead(ctx, settings);
requestLatch.countDown();
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
serverObserver.onPingRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
serverObserver.onPingAckRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception {
serverObserver.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
requestLatch.countDown();
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
serverObserver.onGoAwayRead(ctx, lastStreamId, errorCode, copy(debugData));
requestLatch.countDown();
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId,
int windowSizeIncrement) throws Http2Exception {
serverObserver.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
requestLatch.countDown();
}
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload) {
serverObserver.onUnknownFrame(ctx, frameType, streamId, flags, payload);
requestLatch.countDown();
}
ByteBuf copy(ByteBuf buffer) {
return Unpooled.copiedBuffer(buffer);
}
}
}

View File

@ -85,7 +85,7 @@ public class Http2ConnectionRoundtripTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelegatingHttp2ConnectionHandler(false, serverObserver));
p.addLast(new DelegatingHttp2ConnectionHandler(false, clientObserver));
}
});

View File

@ -0,0 +1,579 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderUtil;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.NetUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static io.netty.handler.codec.http2.Http2TestUtil.*;
import static io.netty.util.CharsetUtil.*;
import static java.util.concurrent.TimeUnit.*;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
/**
* Testing the {@link InboundHttp2ToHttpAdapter} for HTTP/2 frames into {@link HttpObject}s
*/
public class InboundHttp2ToHttpAdapterTest {
@Mock
private HttpResponseListener messageObserver;
@Mock
private Http2FrameObserver clientObserver;
private Http2FrameWriter frameWriter;
private ServerBootstrap sb;
private Bootstrap cb;
private Channel serverChannel;
private Channel clientChannel;
private CountDownLatch serverLatch;
private long maxContentLength;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
maxContentLength = 1 << 16;
serverLatch = new CountDownLatch(1);
frameWriter = new DefaultHttp2FrameWriter();
sb = new ServerBootstrap();
cb = new Bootstrap();
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup());
sb.channel(NioServerSocketChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
Http2Connection connection = new DefaultHttp2Connection(true);
p.addLast("reader", new FrameAdapter(InboundHttp2ToHttpAdapter.newInstance(
connection, maxContentLength),
new CountDownLatch(10)));
p.addLast(new HttpResponseDelegator(messageObserver));
}
});
cb.group(new NioEventLoopGroup());
cb.channel(NioSocketChannel.class);
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("reader", new FrameAdapter(clientObserver, new CountDownLatch(10)));
}
});
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port));
assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel();
}
@After
public void teardown() throws Exception {
serverChannel.close().sync();
sb.group().shutdownGracefully();
cb.group().shutdownGracefully();
}
@Test
public void clientRequestSingleHeaderNoDataFrames() throws Exception {
serverLatch = new CountDownLatch(2);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.SCHEME, "https");
httpHeaders.set(Http2HttpHeaders.Names.AUTHORITY, "example.org");
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, 0);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").scheme("https").authority("example.org")
.path("/some/path/resource2").build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, true, false);
}
});
awaitRequests();
verify(messageObserver).messageReceived(eq(request));
verify(messageObserver).messageReceived(eq(LastHttpContent.EMPTY_LAST_CONTENT));
}
@Test
public void clientRequestOneDataFrame() throws Exception {
serverLatch = new CountDownLatch(2);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final String text = "hello world";
final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, true);
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(2)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
}
@Test
public void clientRequestMultipleDataFrames() throws Exception {
serverLatch = new CountDownLatch(3);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final String text = "hello world";
final String text2 = "hello world2";
final HttpContent content = new DefaultHttpContent(Unpooled.copiedBuffer(text.getBytes()));
final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text2.getBytes()),
true);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text2.getBytes()), 0, true, true);
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(3)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
assertEquals(content2, capturedHttpObjects.get(2));
}
@Test
public void clientRequestMultipleEmptyDataFrames() throws Exception {
serverLatch = new CountDownLatch(4);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final String text = "";
final HttpContent content = new DefaultHttpContent(Unpooled.copiedBuffer(text.getBytes()));
final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, true);
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(4)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
assertEquals(content, capturedHttpObjects.get(2));
assertEquals(content2, capturedHttpObjects.get(3));
}
@Test
public void clientRequestHeaderContinuation() throws Exception {
serverLatch = new CountDownLatch(2);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set("foo", "goo");
httpHeaders.set("foo2", "goo2");
httpHeaders.add("foo2", "goo3");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().set("foo", "goo")
.set("foo2", "goo2").add("foo2", "goo3").build();
final String text = "";
final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false);
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers2, 0, false, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, true);
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(2)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
}
@Test
public void clientRequestTrailingHeaders() throws Exception {
serverLatch = new CountDownLatch(3);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final LastHttpContent trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, true);
HttpHeaders trailingHeaders = trailer.trailingHeaders();
trailingHeaders.set("foo", "goo");
trailingHeaders.set("foo2", "goo2");
trailingHeaders.add("foo2", "goo3");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().set("foo", "goo")
.set("foo2", "goo2").add("foo2", "goo3").build();
final String text = "not empty!";
final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false, false);
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers2, 0, true, true);
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(3)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
assertEquals(trailer, capturedHttpObjects.get(2));
}
@Test
public void clientRequestPushPromise() throws Exception {
serverLatch = new CountDownLatch(4);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final HttpMessage request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders2 = request2.headers();
httpHeaders2.set(Http2HttpHeaders.Names.SCHEME, "https");
httpHeaders2.set(Http2HttpHeaders.Names.AUTHORITY, "example.org");
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_ID, 5);
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_PROMISE_ID, 3);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").scheme("https")
.authority("example.org").build();
final String text = "hello 1!**";
final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
final String text2 = "hello 2!><";
final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text2.getBytes()),
true);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false);
frameWriter.writePushPromise(ctx(), newPromise(), 3, 5, http2Headers2, 0);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, true);
frameWriter.writeData(ctx(), newPromise(), 5,
Unpooled.copiedBuffer(text2.getBytes()), 0, true, true);
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(4)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
assertEquals(request2, capturedHttpObjects.get(2));
assertEquals(content2, capturedHttpObjects.get(3));
}
@Test
public void clientRequestStreamDependency() throws Exception {
serverLatch = new CountDownLatch(4);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final HttpMessage request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders2 = request2.headers();
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_ID, 5);
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_DEPENDENCY_ID, 3);
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_EXCLUSIVE, true);
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_WEIGHT, 256);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final String text = "hello 1!**";
final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
final String text2 = "hello 2!><";
final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text2.getBytes()),
true);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false);
frameWriter.writeHeaders(ctx(), newPromise(), 5, http2Headers2, 0, false, false);
frameWriter.writePriority(ctx(), newPromise(), 5, 3, (short) 256, true);
frameWriter.writeData(ctx(), newPromise(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, true);
frameWriter.writeData(ctx(), newPromise(), 5,
Unpooled.copiedBuffer(text2.getBytes()), 0, true, true);
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(4)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
assertEquals(request2, capturedHttpObjects.get(2));
assertEquals(content2, capturedHttpObjects.get(3));
}
private void awaitRequests() throws Exception {
serverLatch.await(2, SECONDS);
}
private ChannelHandlerContext ctx() {
return clientChannel.pipeline().firstContext();
}
private ChannelPromise newPromise() {
return ctx().newPromise();
}
private interface HttpResponseListener {
void messageReceived(HttpObject obj);
}
private final class HttpResponseDelegator extends SimpleChannelInboundHandler<HttpObject> {
private final HttpResponseListener listener;
public HttpResponseDelegator(HttpResponseListener listener) {
this.listener = listener;
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
this.listener.messageReceived(msg);
serverLatch.countDown();
}
}
private final class FrameAdapter extends ByteToMessageDecoder {
private final Http2FrameObserver observer;
private final DefaultHttp2FrameReader reader;
private final CountDownLatch requestLatch;
FrameAdapter(Http2FrameObserver observer, CountDownLatch requestLatch) {
this.observer = observer;
reader = new DefaultHttp2FrameReader();
this.requestLatch = requestLatch;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
reader.readFrame(ctx, in, new Http2FrameObserver() {
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endOfStream, boolean endOfSegment)
throws Http2Exception {
observer.onDataRead(ctx, streamId, copy(data), padding, endOfStream,
endOfSegment);
requestLatch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endStream, boolean endSegment)
throws Http2Exception {
observer.onHeadersRead(ctx, streamId, headers, padding, endStream, endSegment);
requestLatch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endStream, boolean endSegment)
throws Http2Exception {
observer.onHeadersRead(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endStream, endSegment);
requestLatch.countDown();
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId,
int streamDependency, short weight, boolean exclusive)
throws Http2Exception {
observer.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
requestLatch.countDown();
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
observer.onRstStreamRead(ctx, streamId, errorCode);
requestLatch.countDown();
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
observer.onSettingsAckRead(ctx);
requestLatch.countDown();
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
observer.onSettingsRead(ctx, settings);
requestLatch.countDown();
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data)
throws Http2Exception {
observer.onPingRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data)
throws Http2Exception {
observer.onPingAckRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding)
throws Http2Exception {
observer.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
requestLatch.countDown();
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId,
long errorCode, ByteBuf debugData) throws Http2Exception {
observer.onGoAwayRead(ctx, lastStreamId, errorCode, copy(debugData));
requestLatch.countDown();
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId,
int windowSizeIncrement) throws Http2Exception {
observer.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
requestLatch.countDown();
}
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload) {
observer.onUnknownFrame(ctx, frameType, streamId, flags, payload);
requestLatch.countDown();
}
});
}
ByteBuf copy(ByteBuf buffer) {
return Unpooled.copiedBuffer(buffer);
}
}
}

View File

@ -35,6 +35,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;
import java.util.HashSet;
import java.util.TimeZone;
public class DefaultTextHeaders implements TextHeaders {
@ -876,6 +878,54 @@ public class DefaultTextHeaders implements TextHeaders {
return this;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
for (String name : names()) {
result = prime * result + name.hashCode();
Set<String> values = new TreeSet<String>(getAll(name));
for (String value : values) {
result = prime * result + value.hashCode();
}
}
return result;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof DefaultTextHeaders)) {
return false;
}
DefaultTextHeaders other = (DefaultTextHeaders) o;
// First, check that the set of names match.
Set<String> names = names();
if (!names.equals(other.names())) {
return false;
}
// Compare the values for each name.
for (String name : names) {
List<String> values = getAll(name);
List<String> otherValues = other.getAll(name);
if (values.size() != otherValues.size()) {
return false;
}
// Convert the values to a set and remove values from the other object to see if
// they match.
Set<String> valueSet = new HashSet<String>(values);
valueSet.removeAll(otherValues);
if (!valueSet.isEmpty()) {
return false;
}
}
return true;
}
private static final class HeaderEntry implements Map.Entry<CharSequence, CharSequence> {
private final DefaultTextHeaders parent;
final int hash;

View File

@ -0,0 +1,80 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import static org.junit.Assert.*;
public class DefaultTextHeadersTest {
@Test
public void testEqualsMultipleHeaders() {
DefaultTextHeaders h1 = new DefaultTextHeaders();
h1.set("foo", "goo");
h1.set("foo2", "goo2");
DefaultTextHeaders h2 = new DefaultTextHeaders();
h2.set("foo", "goo");
h2.set("foo2", "goo2");
assertTrue(h1.equals(h2));
assertTrue(h2.equals(h1));
assertTrue(h2.equals(h2));
assertTrue(h1.equals(h1));
}
@Test
public void testEqualsDuplicateMultipleHeaders() {
DefaultTextHeaders h1 = new DefaultTextHeaders();
h1.set("foo", "goo");
h1.set("foo2", "goo2");
h1.add("foo2", "goo3");
h1.add("foo", "goo4");
DefaultTextHeaders h2 = new DefaultTextHeaders();
h2.set("foo", "goo");
h2.set("foo2", "goo2");
h2.add("foo", "goo4");
h2.add("foo2", "goo3");
assertTrue(h1.equals(h2));
assertTrue(h2.equals(h1));
assertTrue(h2.equals(h2));
assertTrue(h1.equals(h1));
}
@Test
public void testNotEqualsDuplicateMultipleHeaders() {
DefaultTextHeaders h1 = new DefaultTextHeaders();
h1.set("foo", "goo");
h1.set("foo2", "goo2");
h1.add("foo2", "goo3");
h1.add("foo", "goo4");
DefaultTextHeaders h2 = new DefaultTextHeaders();
h2.set("foo", "goo");
h2.set("foo2", "goo2");
h2.add("foo", "goo4");
assertFalse(h1.equals(h2));
assertFalse(h2.equals(h1));
assertTrue(h2.equals(h2));
assertTrue(h1.equals(h1));
}
}

View File

@ -175,7 +175,6 @@ public final class HttpUploadClient {
//connection will not close but needed
// headers.set("Connection","keep-alive");
// headers.set("Keep-Alive","300");
headers.set(
HttpHeaders.Names.COOKIE, ClientCookieEncoder.encode(

View File

@ -30,7 +30,9 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.CharsetUtil;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.http.HttpMethod.*;
import static io.netty.handler.codec.http.HttpVersion.*;
@ -45,6 +47,9 @@ public final class Http2Client {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
static final String URL = System.getProperty("url", "/whatever");
static final String URL2 = System.getProperty("url2");
static final String URL2DATA = System.getProperty("url2data", "test data!");
public static void main(String[] args) throws Exception {
// Configure SSL.
@ -59,7 +64,7 @@ public final class Http2Client {
}
EventLoopGroup workerGroup = new NioEventLoopGroup();
Http2ClientInitializer initializer = new Http2ClientInitializer(sslCtx);
Http2ClientInitializer initializer = new Http2ClientInitializer(sslCtx, Integer.MAX_VALUE);
try {
// Configure the client.
@ -75,23 +80,34 @@ public final class Http2Client {
System.out.println("Connected to [" + HOST + ':' + PORT + ']');
// Wait for the HTTP/2 upgrade to occur.
Http2ClientConnectionHandler http2ConnectionHandler = initializer.connectionHandler();
http2ConnectionHandler.awaitInitialization();
Http2SettingsHandler http2SettingsHandler = initializer.settingsHandler();
http2SettingsHandler.awaitSettings(5, TimeUnit.SECONDS);
// Create a simple POST request with just headers.
FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, POST, "/whatever",
Unpooled.copiedBuffer("sample data".getBytes(CharsetUtil.UTF_8)));
request.headers().add(HttpHeaders.Names.HOST, HOST + ':' + PORT);
// Send the request to the server.
System.err.println("Sending request...");
ChannelFuture requestFuture = channel.writeAndFlush(request).sync();
System.err.println("Back from sending headers...");
requestFuture.sync();
// Waits for the complete response
http2ConnectionHandler.awaitResponse();
System.out.println("Finished HTTP/2 request");
HttpResponseHandler responseHandler = initializer.responseHandler();
int streamId = 3;
URI hostName = URI.create((SSL ? "https" : "http") + "://" + HOST + ':' + PORT);
System.err.println("Sending request(s)...");
if (URL != null) {
// Create a simple GET request.
FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, URL);
request.headers().add(HttpHeaders.Names.HOST, hostName);
request.headers().add(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
channel.writeAndFlush(request);
responseHandler.put(streamId, channel.newPromise());
streamId += 2;
}
if (URL2 != null) {
// Create a simple POST request with a body.
FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, POST, URL2,
Unpooled.copiedBuffer(URL2DATA.getBytes(CharsetUtil.UTF_8)));
request.headers().add(HttpHeaders.Names.HOST, hostName);
request.headers().add(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
channel.writeAndFlush(request);
responseHandler.put(streamId, channel.newPromise());
streamId += 2;
}
responseHandler.awaitResponses(5, TimeUnit.SECONDS);
System.out.println("Finished HTTP/2 request(s)");
// Wait until the connection is closed.
channel.close().syncUninterruptibly();

View File

@ -1,184 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.example.http2.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static io.netty.example.http2.Http2ExampleUtil.*;
import static io.netty.util.internal.logging.InternalLogLevel.*;
/**
* A subclass of the connection handler that interprets response messages as text and prints it out
* to the console.
*/
public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler {
private static final Http2FrameLogger logger =
new Http2FrameLogger(INFO, InternalLoggerFactory.getInstance(Http2ClientConnectionHandler.class));
private final ChannelPromise initPromise;
private final ChannelPromise responsePromise;
private ByteBuf collectedData;
public Http2ClientConnectionHandler(ChannelPromise initPromise, ChannelPromise responsePromise) {
this(initPromise, responsePromise, new DefaultHttp2Connection(false));
}
private Http2ClientConnectionHandler(ChannelPromise initPromise,
ChannelPromise responsePromise, Http2Connection connection) {
super(connection, frameReader(), frameWriter(), new DefaultHttp2InboundFlowController(
connection), new DefaultHttp2OutboundFlowController(connection));
this.initPromise = initPromise;
this.responsePromise = responsePromise;
}
/**
* Wait for this handler to be added after the upgrade to HTTP/2, and for initial preface
* handshake to complete.
*/
public void awaitInitialization() throws Exception {
if (!initPromise.awaitUninterruptibly(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out waiting for initialization");
}
if (!initPromise.isSuccess()) {
throw new RuntimeException(initPromise.cause());
}
}
/**
* Wait for this full response to be received and printed out.
*/
public void awaitResponse() throws Exception {
if (!responsePromise.awaitUninterruptibly(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out waiting for completion of the response");
}
if (!responsePromise.isSuccess()) {
throw new RuntimeException(initPromise.cause());
}
}
/**
* Handles conversion of a {@link FullHttpMessage} to HTTP/2 frames.
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof FullHttpMessage) {
FullHttpMessage httpMsg = (FullHttpMessage) msg;
boolean hasData = httpMsg.content().isReadable();
// Convert and write the headers.
DefaultHttp2Headers.Builder headers = DefaultHttp2Headers.newBuilder();
for (Map.Entry<String, String> entry : httpMsg.headers().entries()) {
headers.add(entry.getKey(), entry.getValue());
}
int streamId = nextStreamId();
writeHeaders(ctx, promise, streamId, headers.build(), 0, !hasData, false);
if (hasData) {
writeData(ctx, ctx.newPromise(), streamId, httpMsg.content(), 0, true, true);
}
} else {
ctx.write(msg, promise);
}
}
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream, boolean endOfSegment) throws Http2Exception {
// Copy the data into the buffer.
int available = data.readableBytes();
if (collectedData == null) {
collectedData = ctx().alloc().buffer(available);
collectedData.writeBytes(data, data.readerIndex(), data.readableBytes());
} else {
// Expand the buffer
ByteBuf newBuffer = ctx().alloc().buffer(collectedData.readableBytes() + available);
newBuffer.writeBytes(collectedData);
newBuffer.writeBytes(data);
collectedData.release();
collectedData = newBuffer;
}
// If it's the last frame, print the complete message.
if (endOfStream) {
System.out.println("Received message: " + collectedData.toString(CharsetUtil.UTF_8));
// Free the data buffer.
collectedData.release();
collectedData = null;
responsePromise.setSuccess();
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
boolean endSegment) throws Http2Exception {
if (headers.contains(UPGRADE_RESPONSE_HEADER)) {
System.out.println("Received HTTP/2 response to the HTTP->HTTP/2 upgrade request");
}
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
if (!initPromise.isDone()) {
initPromise.setSuccess();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (!initPromise.isDone()) {
initPromise.setFailure(cause);
}
if (!responsePromise.isDone()) {
initPromise.setFailure(cause);
}
super.exceptionCaught(ctx, cause);
}
private static Http2FrameReader frameReader() {
return new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger);
}
private static Http2FrameWriter frameWriter() {
return new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), logger);
}
}

View File

@ -17,30 +17,62 @@ package io.netty.example.http2.client;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DelegatingHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DelegatingHttp2HttpConnectionHandler;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter;
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
import io.netty.handler.ssl.SslContext;
import io.netty.util.internal.logging.InternalLoggerFactory;
import static io.netty.util.internal.logging.InternalLogLevel.*;
/**
* Configures the client pipeline to support HTTP/2 frames.
*/
public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
private static final Http2FrameLogger logger =
new Http2FrameLogger(INFO, InternalLoggerFactory.getInstance(Http2ClientInitializer.class));
private final SslContext sslCtx;
private Http2ClientConnectionHandler connectionHandler;
private final long maxContentLength;
private DelegatingHttp2ConnectionHandler connectionHandler;
private HttpResponseHandler responseHandler;
private Http2SettingsHandler settingsHandler;
public Http2ClientInitializer(SslContext sslCtx) {
public Http2ClientInitializer(SslContext sslCtx, long maxContentLength) {
this.sslCtx = sslCtx;
this.maxContentLength = maxContentLength;
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
connectionHandler = new Http2ClientConnectionHandler(ch.newPromise(), ch.newPromise());
Http2Connection connection = new DefaultHttp2Connection(false);
connectionHandler = new DelegatingHttp2HttpConnectionHandler(connection,
frameReader(), frameWriter(), new DefaultHttp2InboundFlowController(connection),
new DefaultHttp2OutboundFlowController(connection),
InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength));
responseHandler = new HttpResponseHandler();
settingsHandler = new Http2SettingsHandler(ch.newPromise());
if (sslCtx != null) {
configureSsl(ch);
} else {
@ -48,15 +80,29 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
}
}
public Http2ClientConnectionHandler connectionHandler() {
return connectionHandler;
public HttpResponseHandler responseHandler() {
return responseHandler;
}
public Http2SettingsHandler settingsHandler() {
return settingsHandler;
}
protected void configureEndOfPipeline(ChannelPipeline pipeline) {
pipeline.addLast("Http2SettingsHandler", settingsHandler);
pipeline.addLast("Decompressor", new HttpContentDecompressor());
pipeline.addLast("Aggregator", new HttpObjectAggregator((int) maxContentLength));
pipeline.addLast("HttpResponseHandler", responseHandler);
}
/**
* Configure the pipeline for TLS NPN negotiation to HTTP/2.
*/
private void configureSsl(SocketChannel ch) {
ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()), connectionHandler);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("SslHandler", sslCtx.newHandler(ch.alloc()));
pipeline.addLast("Http2Handler", connectionHandler);
configureEndOfPipeline(pipeline);
}
/**
@ -67,16 +113,16 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(connectionHandler);
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 65536);
ch.pipeline().addLast(sourceCodec);
ch.pipeline().addLast(upgradeHandler);
ch.pipeline().addLast(new UpgradeRequestHandler());
ch.pipeline().addLast(new UserEventLogger());
ch.pipeline().addLast("Http2SourceCodec", sourceCodec);
ch.pipeline().addLast("Http2UpgradeHandler", upgradeHandler);
ch.pipeline().addLast("Http2UpgradeRequestHandler", new UpgradeRequestHandler());
ch.pipeline().addLast("Logger", new UserEventLogger());
}
/**
* A handler that triggers the cleartext upgrade to HTTP/2 by sending an initial HTTP request.
*/
private static class UpgradeRequestHandler extends ChannelHandlerAdapter {
private final class UpgradeRequestHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
DefaultFullHttpRequest upgradeRequest =
@ -86,7 +132,9 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
super.channelActive(ctx);
// Done with this handler, remove it from the pipeline.
ctx.pipeline().remove(ctx.name());
ctx.pipeline().remove(this);
Http2ClientInitializer.this.configureEndOfPipeline(ctx.pipeline());
}
}
@ -100,4 +148,12 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
super.userEventTriggered(ctx, evt);
}
}
private static Http2FrameReader frameReader() {
return new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger);
}
private static Http2FrameWriter frameWriter() {
return new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), logger);
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.example.http2.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.Http2Settings;
import java.util.concurrent.TimeUnit;
/**
* Reads the first {@link Http2Settings} object and notifies a {@link ChannelPromise}
*/
public class Http2SettingsHandler extends SimpleChannelInboundHandler<Http2Settings> {
private ChannelPromise promise;
/**
* Create new instance
*
* @param promise Promise object used to notify when first settings are received
*/
public Http2SettingsHandler(ChannelPromise promise) {
this.promise = promise;
}
/**
* Wait for this handler to be added after the upgrade to HTTP/2, and for initial preface
* handshake to complete.
*
* @param timeout Time to wait
* @param Units for {@code timeout}
* @throws Exception if timeout or other failure occurs
*/
public void awaitSettings(long timeout, TimeUnit unit) throws Exception {
if (!promise.awaitUninterruptibly(timeout, unit)) {
throw new IllegalStateException("Timed out waiting for settings");
}
if (!promise.isSuccess()) {
throw new RuntimeException(promise.cause());
}
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, Http2Settings msg) throws Exception {
promise.setSuccess();
// Only care about the first settings message
ctx.pipeline().remove(this);
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.example.http2.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http2.Http2HttpHeaders;
import io.netty.util.CharsetUtil;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* Process {@link FullHttpResponse} translated from HTTP/2 frames
*/
public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
private SortedMap<Integer, ChannelPromise> streamidPromiseMap;
public HttpResponseHandler() {
streamidPromiseMap = new TreeMap<Integer, ChannelPromise>();
}
/**
* Create an association between an anticipated response stream id and a {@link ChannelPromise}
*
* @param streamId The stream for which a response is expected
* @param promise The promise object that will be used to wait/notify events
* @return The previous object associated with {@code streamId}
* @see {@link io.netty.example.http2.client.HttpResponseHandler#awaitResponses awaitResponses}
*/
public ChannelPromise put(int streamId, ChannelPromise promise) {
return streamidPromiseMap.put(streamId, promise);
}
/**
* Wait (sequentially) for a time duration for each anticipated response
*
* @param timeout Value of time to wait for each response
* @param unit Units associated with {@code timeout}
* @see {@link io.netty.example.http2.client.HttpResponseHandler#put put}
*/
public void awaitResponses(long timeout, TimeUnit unit) {
Iterator<Entry<Integer, ChannelPromise>> itr = streamidPromiseMap.entrySet().iterator();
while (itr.hasNext()) {
Entry<Integer, ChannelPromise> entry = itr.next();
ChannelPromise promise = entry.getValue();
if (!promise.awaitUninterruptibly(timeout, unit)) {
throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey());
}
if (!promise.isSuccess()) {
throw new RuntimeException(promise.cause());
}
System.out.println("---Stream id: " + entry.getKey() + " received---");
itr.remove();
}
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
int streamId = Integer.parseInt(msg.headers().get(Http2HttpHeaders.Names.STREAM_ID));
ChannelPromise promise = streamidPromiseMap.get(streamId);
if (promise == null) {
System.err.println("Message received for unknown stream id " + streamId);
} else {
// Do stuff with the message (for now just print it)
ByteBuf content = msg.content();
if (content.isReadable()) {
int contentLength = content.readableBytes();
byte[] arr = new byte[contentLength];
content.readBytes(arr);
System.out.println(new String(arr, 0, contentLength, CharsetUtil.UTF_8));
}
promise.setSuccess();
}
}
}

View File

@ -21,7 +21,6 @@ import static io.netty.example.http2.Http2ExampleUtil.UPGRADE_RESPONSE_HEADER;
import static io.netty.util.internal.logging.InternalLogLevel.INFO;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.example.http2.client.Http2ClientConnectionHandler;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
@ -45,7 +44,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
private static final Http2FrameLogger logger = new Http2FrameLogger(INFO,
InternalLoggerFactory.getInstance(Http2ClientConnectionHandler.class));
InternalLoggerFactory.getInstance(HelloWorldHttp2Handler.class));
static final ByteBuf RESPONSE_BYTES = unreleasableBuffer(copiedBuffer("Hello World", CharsetUtil.UTF_8));
public HelloWorldHttp2Handler() {
@ -68,7 +67,8 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
if (evt instanceof HttpServerUpgradeHandler.UpgradeEvent) {
// Write an HTTP/2 response to the upgrade request
Http2Headers headers =
DefaultHttp2Headers.newBuilder().set(UPGRADE_RESPONSE_HEADER, "true").build();
DefaultHttp2Headers.newBuilder().status("200")
.set(UPGRADE_RESPONSE_HEADER, "true").build();
writeHeaders(ctx, ctx.newPromise(), 1, headers, 0, true, true);
}
super.userEventTriggered(ctx, evt);