diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpMessage.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpMessage.java index 56cd37c402..7e0427d380 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpMessage.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpMessage.java @@ -23,7 +23,7 @@ import java.util.Map; * The default {@link HttpMessage} implementation. */ public abstract class DefaultHttpMessage extends DefaultHttpObject implements HttpMessage { - + private static final int HASH_CODE_PRIME = 31; private HttpVersion version; private final HttpHeaders headers; @@ -55,6 +55,28 @@ public abstract class DefaultHttpMessage extends DefaultHttpObject implements Ht return version; } + @Override + public int hashCode() { + int result = 1; + result = HASH_CODE_PRIME * result + headers.hashCode(); + result = HASH_CODE_PRIME * result + version.hashCode(); + result = HASH_CODE_PRIME * result + super.hashCode(); + return result; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof DefaultHttpMessage)) { + return false; + } + + DefaultHttpMessage other = (DefaultHttpMessage) o; + + return headers().equals(other.headers()) && + protocolVersion().equals(other.protocolVersion()) && + super.equals(o); + } + @Override public String toString() { StringBuilder buf = new StringBuilder(); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpObject.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpObject.java index 24eade9be7..42c7f79857 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpObject.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpObject.java @@ -18,7 +18,7 @@ package io.netty.handler.codec.http; import io.netty.handler.codec.DecoderResult; public class DefaultHttpObject implements HttpObject { - + private static final int HASH_CODE_PRIME = 31; private DecoderResult decoderResult = DecoderResult.SUCCESS; protected DefaultHttpObject() { @@ -37,4 +37,22 @@ public class DefaultHttpObject implements HttpObject { } this.decoderResult = decoderResult; } + + @Override + public int hashCode() { + int result = 1; + result = HASH_CODE_PRIME * result + decoderResult.hashCode(); + return result; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof DefaultHttpObject)) { + return false; + } + + DefaultHttpObject other = (DefaultHttpObject) o; + + return decoderResult().equals(other.decoderResult()); + } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java index eddc01e027..1ff6fe9793 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java @@ -69,6 +69,26 @@ public class DefaultHttpResponse extends DefaultHttpMessage implements HttpRespo return this; } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + status.hashCode(); + result = prime * result + super.hashCode(); + return result; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof DefaultHttpResponse)) { + return false; + } + + DefaultHttpResponse other = (DefaultHttpResponse) o; + + return status().equals(other.status()) && super.equals(o); + } + @Override public String toString() { StringBuilder buf = new StringBuilder(); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java index adb837c5e9..1c042c227f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java @@ -333,6 +333,18 @@ public interface HttpHeaders extends TextHeaders { * {@code "WWW-Authenticate"} */ public static final AsciiString WWW_AUTHENTICATE = new AsciiString("WWW-Authenticate"); + /** + * {@code "Keep-Alive"} + * @deprecated use {@link #CONNECTION} + */ + @Deprecated + public static final AsciiString KEEP_ALIVE = new AsciiString("Keep-Alive"); + /** + * {@code "Proxy-Connection"} + * @deprecated use {@link #CONNECTION} + */ + @Deprecated + public static final AsciiString PROXY_CONNECTION = new AsciiString("Proxy-Connection"); private Names() { } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java index c397fac7d9..556c21eb96 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java @@ -205,6 +205,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { } } + @SuppressWarnings("deprecation") private SpdySynStreamFrame createSynStreamFrame(HttpMessage httpMessage) throws Exception { // Get the Stream-ID, Associated-To-Stream-ID, Priority, URL, and scheme from the headers final HttpHeaders httpHeaders = httpMessage.headers(); @@ -222,8 +223,8 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { // The Connection, Keep-Alive, Proxy-Connection, and Transfer-Encoding // headers are not valid and MUST not be sent. httpHeaders.remove(HttpHeaders.Names.CONNECTION); - httpHeaders.remove("Keep-Alive"); - httpHeaders.remove("Proxy-Connection"); + httpHeaders.remove(HttpHeaders.Names.KEEP_ALIVE); + httpHeaders.remove(HttpHeaders.Names.PROXY_CONNECTION); httpHeaders.remove(HttpHeaders.Names.TRANSFER_ENCODING); SpdySynStreamFrame spdySynStreamFrame = @@ -268,6 +269,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { return spdySynStreamFrame; } + @SuppressWarnings("deprecation") private SpdySynReplyFrame createSynReplyFrame(HttpResponse httpResponse) throws Exception { // Get the Stream-ID from the headers final HttpHeaders httpHeaders = httpResponse.headers(); @@ -277,8 +279,8 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { // The Connection, Keep-Alive, Proxy-Connection, and Transfer-Encoding // headers are not valid and MUST not be sent. httpHeaders.remove(HttpHeaders.Names.CONNECTION); - httpHeaders.remove("Keep-Alive"); - httpHeaders.remove("Proxy-Connection"); + httpHeaders.remove(HttpHeaders.Names.KEEP_ALIVE); + httpHeaders.remove(HttpHeaders.Names.PROXY_CONNECTION); httpHeaders.remove(HttpHeaders.Names.TRANSFER_ENCODING); SpdySynReplyFrame spdySynReplyFrame = new DefaultSpdySynReplyFrame(streamID); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java index fba729c9f5..f53ca9d1ac 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java @@ -332,7 +332,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode /** * Gets the next stream ID that can be created by the local endpoint. */ - protected final int nextStreamId() { + public final int nextStreamId() { return connection.local().nextStreamId(); } @@ -374,7 +374,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode Http2Stream stream = connection.stream(streamId); if (stream == null) { // Create a new locally-initiated stream. - createLocalStream(streamId, endStream); + stream = createLocalStream(streamId, endStream); } else { // An existing stream... if (stream.state() == RESERVED_LOCAL) { @@ -390,15 +390,17 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode stream.setPriority(streamDependency, weight, exclusive); } } - - // If the headers are the end of the stream, close it now. - if (endStream) { - closeLocalSide(stream, promise); - } } - return frameWriter.writeHeaders(ctx, promise, streamId, headers, streamDependency, + ChannelFuture future = frameWriter.writeHeaders(ctx, promise, streamId, headers, streamDependency, weight, exclusive, padding, endStream, endSegment); + + // If the headers are the end of the stream, close it now. + if (endStream) { + closeLocalSide(stream, promise); + } + + return future; } catch (Http2Exception e) { return promise.setFailure(e); } @@ -430,10 +432,12 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode return promise; } + ChannelFuture future = frameWriter.writeRstStream(ctx, promise, streamId, errorCode); + stream.terminateSent(); close(stream, promise); - return frameWriter.writeRstStream(ctx, promise, streamId, errorCode); + return future; } protected ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise, @@ -781,12 +785,12 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode return; } + AbstractHttp2ConnectionHandler.this.onDataRead(ctx, streamId, data, padding, endOfStream, + endOfSegment); + if (endOfStream) { closeRemoteSide(stream, ctx.newSucceededFuture()); } - - AbstractHttp2ConnectionHandler.this.onDataRead(ctx, streamId, data, padding, endOfStream, - endOfSegment); } /** @@ -840,7 +844,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode } if (stream == null) { - createRemoteStream(streamId, endStream); + stream = createRemoteStream(streamId, endStream); } else { if (stream.state() == RESERVED_REMOTE) { // Received headers for a reserved push stream ... open it for push to the @@ -858,15 +862,15 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode stream.setPriority(streamDependency, weight, exclusive); } } - - // If the headers completes this stream, close it. - if (endStream) { - closeRemoteSide(stream, ctx.newSucceededFuture()); - } } AbstractHttp2ConnectionHandler.this.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream, endSegment); + + // If the headers completes this stream, close it. + if (endStream) { + closeRemoteSide(stream, ctx.newSucceededFuture()); + } } @Override @@ -901,9 +905,10 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode } stream.terminateReceived(); - close(stream, ctx.newSucceededFuture()); AbstractHttp2ConnectionHandler.this.onRstStreamRead(ctx, streamId, errorCode); + + close(stream, ctx.newSucceededFuture()); } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java index 27bfb77c88..c03d3707b3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java @@ -451,6 +451,8 @@ public class DefaultHttp2FrameReader implements Http2FrameReader { settings.put(id, value); } observer.onSettingsRead(ctx, settings); + // Provide an interface for non-observers to capture settings + ctx.fireChannelRead(settings); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandler.java new file mode 100644 index 0000000000..baa26b8681 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandler.java @@ -0,0 +1,189 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.http2; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelPromiseAggregator; +import io.netty.handler.codec.http.FullHttpMessage; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; + +import java.net.URI; +import java.util.Map; + +/** + * Light weight wrapper around {@link DelegatingHttp2ConnectionHandler} to provide HTTP/1.x object to HTTP/2 encoding + */ +public class DelegatingHttp2HttpConnectionHandler extends DelegatingHttp2ConnectionHandler { + + public DelegatingHttp2HttpConnectionHandler(boolean server, Http2FrameObserver observer) { + super(server, observer); + } + + public DelegatingHttp2HttpConnectionHandler(Http2Connection connection, Http2FrameReader frameReader, + Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow, + Http2OutboundFlowController outboundFlow, Http2FrameObserver observer) { + super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, observer); + } + + public DelegatingHttp2HttpConnectionHandler(Http2Connection connection, Http2FrameObserver observer) { + super(connection, observer); + } + + /** + * Add HTTP/2 headers based upon HTTP/1.x headers from a {@link HttpHeaders} + * + * @param httpHeaders The HTTP/1.x request object to pull headers from + * @param http2Headers The HTTP/2 headers object to add headers to + */ + private void addHeaders(HttpHeaders httpHeaders, DefaultHttp2Headers.Builder http2Headers) { + String value = httpHeaders.get(HttpHeaders.Names.HOST); + if (value != null) { + URI hostUri = URI.create(value); + // The authority MUST NOT include the deprecated "userinfo" subcomponent + value = hostUri.getAuthority(); + if (value != null) { + http2Headers.authority(value.replaceFirst("^.*@", "")); + } + value = hostUri.getScheme(); + if (value != null) { + http2Headers.scheme(value); + } + httpHeaders.remove(HttpHeaders.Names.HOST); + } + + // Consume the Authority extension header if present + value = httpHeaders.get(Http2HttpHeaders.Names.AUTHORITY); + if (value != null) { + http2Headers.authority(value); + httpHeaders.remove(Http2HttpHeaders.Names.AUTHORITY); + } + + // Consume the Scheme extension header if present + value = httpHeaders.get(Http2HttpHeaders.Names.SCHEME); + if (value != null) { + http2Headers.scheme(value); + httpHeaders.remove(Http2HttpHeaders.Names.SCHEME); + } + } + + /** + * Add HTTP/2 headers based upon HTTP/1.x headers from a {@link HttpRequest} + * + * @param httpRequest The HTTP/1.x request object to pull headers from + * @param http2Headers The HTTP/2 headers object to add headers to + */ + private void addRequestHeaders(HttpRequest httpRequest, DefaultHttp2Headers.Builder http2Headers) { + http2Headers.path(httpRequest.uri()); + http2Headers.method(httpRequest.method().toString()); + addHeaders(httpRequest.headers(), http2Headers); + } + + /** + * Add HTTP/2 headers based upon HTTP/1.x headers from a {@link HttpRequest} + * + * @param httpResponse The HTTP/1.x response object to pull headers from + * @param http2Headers The HTTP/2 headers object to add headers to + */ + private void addResponseHeaders(HttpResponse httpResponse, DefaultHttp2Headers.Builder http2Headers) { + http2Headers.status(Integer.toString(httpResponse.status().code())); + addHeaders(httpResponse.headers(), http2Headers); + } + + /** + * Get the next stream id either from the {@link HttpHeaders} object or HTTP/2 codec + * + * @param httpHeaders The HTTP/1.x headers object to look for the stream id + * @return The stream id to use with this {@link HttpHeaders} object + * @throws Http2Exception If the {@code httpHeaders} object specifies an invalid stream id + */ + private int getStreamId(HttpHeaders httpHeaders) throws Http2Exception { + int streamId = 0; + String value = httpHeaders.get(Http2HttpHeaders.Names.STREAM_ID); + if (value == null) { + streamId = nextStreamId(); + } else { + try { + streamId = Integer.parseInt(value); + } catch (NumberFormatException e) { + throw Http2Exception.format(Http2Error.INTERNAL_ERROR, + "Invalid user-specified stream id value '%s'", value); + } + httpHeaders.remove(Http2HttpHeaders.Names.STREAM_ID); + } + + return streamId; + } + + /** + * Handles conversion of a {@link FullHttpMessage} to HTTP/2 frames. + */ + @Override + @SuppressWarnings("deprecation") + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg instanceof FullHttpMessage) { + FullHttpMessage httpMsg = (FullHttpMessage) msg; + boolean hasData = httpMsg.content().isReadable(); + + // Convert and write the headers. + String value = null; + HttpHeaders httpHeaders = httpMsg.headers(); + DefaultHttp2Headers.Builder http2Headers = DefaultHttp2Headers.newBuilder(); + if (msg instanceof HttpRequest) { + addRequestHeaders((HttpRequest) msg, http2Headers); + } else if (msg instanceof HttpResponse) { + addResponseHeaders((HttpResponse) msg, http2Headers); + } + + // Provide the user the opportunity to specify the streamId + int streamId = 0; + try { + streamId = getStreamId(httpHeaders); + } catch (Http2Exception e) { + httpMsg.release(); + promise.setFailure(e); + return; + } + + // The Connection, Keep-Alive, Proxy-Connection, Transfer-Encoding, + // and Upgrade headers are not valid and MUST not be sent. + httpHeaders.remove(HttpHeaders.Names.CONNECTION); + httpHeaders.remove(HttpHeaders.Names.KEEP_ALIVE); + httpHeaders.remove(HttpHeaders.Names.PROXY_CONNECTION); + httpHeaders.remove(HttpHeaders.Names.TRANSFER_ENCODING); + + // Add the HTTP headers which have not been consumed above + for (Map.Entry entry : httpHeaders.entries()) { + http2Headers.add(entry.getKey(), entry.getValue()); + } + + if (hasData) { + ChannelPromiseAggregator promiseAggregator = new ChannelPromiseAggregator(promise); + ChannelPromise headerPromise = ctx.newPromise(); + ChannelPromise dataPromise = ctx.newPromise(); + promiseAggregator.add(headerPromise, dataPromise); + writeHeaders(ctx, headerPromise, streamId, http2Headers.build(), 0, false, false); + writeData(ctx, dataPromise, streamId, httpMsg.content(), 0, true, true); + } else { + writeHeaders(ctx, promise, streamId, http2Headers.build(), 0, true, true); + } + } else { + ctx.write(msg, promise); + } + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2HttpHeaders.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2HttpHeaders.java new file mode 100644 index 0000000000..f4fd43f32b --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2HttpHeaders.java @@ -0,0 +1,62 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.handler.codec.AsciiString; + +/** + * Provides the constants for the header names used by + * {@link InboundHttp2ToHttpAdapter} and {@link DelegatingHttp2HttpConnectionHandler} + */ +public final class Http2HttpHeaders { + + public static final class Names { + /** + * {@code "X-HTTP2-Stream-ID"} + */ + public static final AsciiString STREAM_ID = new AsciiString("X-HTTP2-Stream-ID"); + /** + * {@code "X-HTTP2-Authority"} + */ + public static final AsciiString AUTHORITY = new AsciiString("X-HTTP2-Authority"); + /** + * {@code "X-HTTP2-Scheme"} + */ + public static final AsciiString SCHEME = new AsciiString("X-HTTP2-Scheme"); + /** + * {@code "X-HTTP2-Path"} + */ + public static final AsciiString PATH = new AsciiString("X-HTTP2-Path"); + /** + * {@code "X-HTTP2-Stream-Promise-ID"} + */ + public static final AsciiString STREAM_PROMISE_ID = new AsciiString("X-HTTP2-Stream-Promise-ID"); + /** + * {@code "X-HTTP2-Stream-Dependency-ID"} + */ + public static final AsciiString STREAM_DEPENDENCY_ID = new AsciiString("X-HTTP2-Stream-Dependency-ID"); + /** + * {@code "X-HTTP2-Stream-Weight"} + */ + public static final AsciiString STREAM_WEIGHT = new AsciiString("X-HTTP2-Stream-Weight"); + /** + * {@code "X-HTTP2-Stream-Exclusive"} + */ + public static final AsciiString STREAM_EXCLUSIVE = new AsciiString("X-HTTP2-Stream-Exclusive"); + + private Names() { + } + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java new file mode 100644 index 0000000000..b7689f9984 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java @@ -0,0 +1,647 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderUtil; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.collection.IntObjectMap; +import io.netty.util.collection.IntObjectHashMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +/** + * Decodes HTTP/2 data and associated streams into {@link HttpMessage}s and {@link HttpContent}s. + */ +public class InboundHttp2ToHttpAdapter extends Http2ConnectionAdapter implements Http2FrameObserver { + + private final long maxContentLength; + private final boolean validateHttpHeaders; + private final Http2Connection connection; + private final IntObjectMap messageMap; + + private static final Set HEADERS_TO_EXCLUDE; + private static final Map HEADER_NAME_TRANSLATIONS_REQUEST; + private static final Map HEADER_NAME_TRANSLATIONS_RESPONSE; + + static { + HEADERS_TO_EXCLUDE = new HashSet(); + HEADER_NAME_TRANSLATIONS_REQUEST = new HashMap(); + HEADER_NAME_TRANSLATIONS_RESPONSE = new HashMap(); + for (Http2Headers.HttpName http2HeaderName : Http2Headers.HttpName.values()) { + HEADERS_TO_EXCLUDE.add(http2HeaderName.value()); + } + + HEADER_NAME_TRANSLATIONS_RESPONSE.put(Http2Headers.HttpName.AUTHORITY.value(), + Http2HttpHeaders.Names.AUTHORITY.toString()); + HEADER_NAME_TRANSLATIONS_RESPONSE.put(Http2Headers.HttpName.SCHEME.value(), + Http2HttpHeaders.Names.SCHEME.toString()); + HEADER_NAME_TRANSLATIONS_REQUEST.putAll(HEADER_NAME_TRANSLATIONS_RESPONSE); + HEADER_NAME_TRANSLATIONS_RESPONSE.put(Http2Headers.HttpName.PATH.value(), + Http2HttpHeaders.Names.PATH.toString()); + } + + /** + * Creates a new instance + * + * @param connection The object which will provide connection notification events for the current connection + * @param maxContentLength the maximum length of the message content. If the length of the message content exceeds + * this value, a {@link TooLongFrameException} will be raised. + * @throws NullPointerException If {@code connection} is null + * @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0} + */ + public static InboundHttp2ToHttpAdapter newInstance(Http2Connection connection, long maxContentLength) + throws NullPointerException, IllegalArgumentException { + InboundHttp2ToHttpAdapter adapter = new InboundHttp2ToHttpAdapter(connection, maxContentLength); + connection.addListener(adapter); + return adapter; + } + + /** + * Creates a new instance + * + * @param connection The object which will provide connection notification events for the current connection + * @param maxContentLength the maximum length of the message content. If the length of the message content exceeds + * this value, a {@link TooLongFrameException} will be raised. + * @param validateHeaders {@code true} if http headers should be validated + * @throws NullPointerException If {@code connection} is null + * @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0} + */ + public static InboundHttp2ToHttpAdapter newInstance(Http2Connection connection, long maxContentLength, + boolean validateHttpHeaders) + throws NullPointerException, IllegalArgumentException { + InboundHttp2ToHttpAdapter adapter = new InboundHttp2ToHttpAdapter(connection, + maxContentLength, + validateHttpHeaders); + connection.addListener(adapter); + return adapter; + } + + /** + * Creates a new instance + * + * @param connection The object which will provide connection notification events for the current connection + * @param maxContentLength the maximum length of the message content. If the length of the message content exceeds + * this value, a {@link TooLongFrameException} will be raised. + * @throws NullPointerException If {@code connection} is null + * @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0} + */ + protected InboundHttp2ToHttpAdapter(Http2Connection connection, long maxContentLength) throws NullPointerException, + IllegalArgumentException { + this(connection, maxContentLength, true); + } + + /** + * Creates a new instance + * + * @param connection The object which will provide connection notification events for the current connection + * @param maxContentLength the maximum length of the message content. If the length of the message content exceeds + * this value, a {@link TooLongFrameException} will be raised. + * @param validateHeaders {@code true} if http headers should be validated + * @throws NullPointerException If {@code connection} is null + * @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0} + */ + protected InboundHttp2ToHttpAdapter(Http2Connection connection, long maxContentLength, boolean validateHttpHeaders) + throws NullPointerException, IllegalArgumentException { + if (connection == null) { + throw new NullPointerException("connection"); + } + if (maxContentLength <= 0) { + throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength); + } + this.maxContentLength = maxContentLength; + this.validateHttpHeaders = validateHttpHeaders; + this.connection = connection; + this.messageMap = new IntObjectHashMap(); + } + + @Override + public void streamRemoved(Http2Stream stream) { + removeMessage(stream.id()); + } + + @Override + public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream, + boolean endOfSegment) throws Http2Exception { + // Padding is already stripped out of data by super class + Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId); + if (msgAccumulator == null) { + throw Http2Exception.protocolError("Data Frame recieved for unknown stream id %d", streamId); + } + + try { + msgAccumulator.add(endOfStream ? new DefaultLastHttpContent(data, validateHttpHeaders) + : new DefaultHttpContent(data), ctx); + } catch (TooLongFrameException e) { + removeMessage(streamId); + throw Http2Exception.format(Http2Error.INTERNAL_ERROR, + "Content length exceeded max of %d for stream id %d", maxContentLength, streamId); + } + + if (endOfStream) { + msgAccumulator.endOfStream(ctx); + removeMessage(streamId); + } + } + + /** + * Extracts the common initial header processing and internal tracking + * + * @param streamId The stream id the {@code headers} apply to + * @param headers The headers to process + * @param allowAppend {@code true} if headers will be appended if the stream already exists. if {@code false} and + * the stream already exists this method returns {@code null}. + * @return The object used to track the stream corresponding to {@code streamId}. {@code null} if + * {@code allowAppend} is {@code false} and the stream already exists. + * @throws Http2Exception If the stream id is not in the correct state to process the headers request + */ + private Http2HttpMessageAccumulator processHeadersBegin(int streamId, Http2Headers headers, boolean allowAppend) + throws Http2Exception { + Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId); + try { + if (msgAccumulator == null) { + msgAccumulator = connection.isServer() ? newHttpRequestAccumulator(headers) + : newHttpResponseAccumulator(headers); + } else if (allowAppend) { + if (msgAccumulator.headerConsumed()) { + if (msgAccumulator.trailerConsumed()) { + throw new IllegalStateException("Header received and trailer already consumed"); + } else { + msgAccumulator.trailer(new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHttpHeaders)); + } + } + msgAccumulator.add(headers); + } else { + return null; + } + msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_ID, streamId); + } catch (IllegalStateException e) { + removeMessage(streamId); + throw Http2Exception.protocolError("Headers Frame recieved for stream id %d which is in an invalid state", + streamId); + } catch (Http2Exception e) { + removeMessage(streamId); + throw e; + } + + return msgAccumulator; + } + + /** + * Extracts the common final header processing and internal tracking + * + * @param ctx The context for which this message has been received + * @param streamId The stream id the {@code msgAccumulator} corresponds to + * @param msgAccumulator The object which represents all data for corresponding to {@code streamId} + * @param endOfStream {@code true} if this is the last event for the stream + */ + private void processHeadersEnd(ChannelHandlerContext ctx, int streamId, + Http2HttpMessageAccumulator msgAccumulator, boolean endOfStream) { + if (endOfStream) { + msgAccumulator.endOfStream(ctx); + removeMessage(streamId); + } else { + putMessage(streamId, msgAccumulator); + } + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, + boolean endOfStream, boolean endSegment) throws Http2Exception { + Http2HttpMessageAccumulator msgAccumulator = processHeadersBegin(streamId, headers, true); + processHeadersEnd(ctx, streamId, msgAccumulator, endOfStream); + } + + /** + * Set/clear all HTTP/1.x headers related to stream dependencies + * + * @param msgAccumulator The object which caches the HTTP/1.x headers + * @param streamDependency The stream id for which the {@code msgAccumulator} is dependent on + * @param weight The dependency weight + * @param exclusive The exlusive HTTP/2 flag + * @throws IllegalStateException If the {@code msgAccumulator} is not in a valid state to change headers + */ + private void setDependencyHeaders(Http2HttpMessageAccumulator msgAccumulator, int streamDependency, short weight, + boolean exclusive) throws IllegalStateException { + if (streamDependency != 0) { + msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_DEPENDENCY_ID, streamDependency); + msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_EXCLUSIVE, exclusive); + msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_WEIGHT, weight); + } else { + msgAccumulator.removeHeader(Http2HttpHeaders.Names.STREAM_DEPENDENCY_ID); + msgAccumulator.removeHeader(Http2HttpHeaders.Names.STREAM_EXCLUSIVE); + msgAccumulator.removeHeader(Http2HttpHeaders.Names.STREAM_WEIGHT); + } + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, + short weight, boolean exclusive, int padding, boolean endOfStream, boolean endSegment) + throws Http2Exception { + Http2HttpMessageAccumulator msgAccumulator = processHeadersBegin(streamId, headers, true); + try { + setDependencyHeaders(msgAccumulator, streamDependency, weight, exclusive); + } catch (IllegalStateException e) { + removeMessage(streamId); + throw Http2Exception.protocolError("Headers Frame recieved for stream id %d which is in an invalid state", + streamId); + } + processHeadersEnd(ctx, streamId, msgAccumulator, endOfStream); + } + + @Override + public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, + boolean exclusive) throws Http2Exception { + Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId); + if (msgAccumulator == null) { + throw Http2Exception.protocolError("Priority Frame recieved for unknown stream id %d", streamId); + } + + try { + setDependencyHeaders(msgAccumulator, streamDependency, weight, exclusive); + } catch (IllegalStateException e) { + removeMessage(streamId); + throw Http2Exception.protocolError("Priority Frame recieved for stream id %d which is in an invalid state", + streamId); + } + } + + @Override + public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception { + Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId); + if (msgAccumulator != null) { + removeMessage(streamId); + } + } + + @Override + public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception { + // NOOP + } + + @Override + public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { + // NOOP + } + + @Override + public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { + // NOOP + } + + @Override + public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { + // NOOP + } + + @Override + public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, + int padding) throws Http2Exception { + // Do not allow adding of headers to existing Http2HttpMessageAccumulator + // according to spec (http://tools.ietf.org/html/draft-ietf-httpbis-http2-13#section-6.6) there must + // be a CONTINUATION frame for more headers + Http2HttpMessageAccumulator msgAccumulator = processHeadersBegin(promisedStreamId, headers, false); + if (msgAccumulator == null) { + throw Http2Exception.protocolError("Push Promise Frame recieved for pre-existing stream id %d", + promisedStreamId); + } + + try { + msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_PROMISE_ID, streamId); + } catch (IllegalStateException e) { + removeMessage(streamId); + throw Http2Exception.protocolError( + "Push Promise Frame recieved for stream id %d which is in an invalid state", + promisedStreamId); + } + + processHeadersEnd(ctx, promisedStreamId, msgAccumulator, false); + } + + @Override + public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) + throws Http2Exception { + // NOOP + } + + @Override + public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) + throws Http2Exception { + // NOOP + } + + @Override + public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, + ByteBuf payload) { + // NOOP + } + + private Http2HttpMessageAccumulator putMessage(int streamId, Http2HttpMessageAccumulator message) { + return messageMap.put(streamId, message); + } + + private Http2HttpMessageAccumulator getMessage(int streamId) { + return messageMap.get(streamId); + } + + private Http2HttpMessageAccumulator removeMessage(int streamId) { + return messageMap.remove(streamId); + } + + /** + * Translate HTTP/2 headers into an object which can build the corresponding HTTP/1.x response objects + * + * @param http2Headers The HTTP/2 headers corresponding to a new stream + * @return Collector for HTTP/1.x objects + * @throws Http2Exception If any of the HTTP/2 headers to not translate to valid HTTP/1.x headers + * @throws IllegalStateException If this object is not in the correct state to accept additional headers + */ + private Http2HttpMessageAccumulator newHttpResponseAccumulator(Http2Headers http2Headers) throws Http2Exception, + IllegalStateException { + HttpResponseStatus status = null; + try { + status = HttpResponseStatus.parseLine(http2Headers.status()); + } catch (Exception e) { + throw Http2Exception.protocolError( + "Unrecognized HTTP status code '%s' encountered in translation to HTTP/1.x", + http2Headers.status()); + } + // HTTP/2 does not define a way to carry the version or reason phrase that is included in an HTTP/1.1 + // status line. + Http2HttpMessageAccumulator messageAccumulator = new Http2HttpMessageAccumulator(new DefaultHttpResponse( + HttpVersion.HTTP_1_1, status, validateHttpHeaders)); + messageAccumulator.add(http2Headers); + return messageAccumulator; + } + + /** + * Translate HTTP/2 headers into an object which can build the corresponding HTTP/1.x request objects + * + * @param http2Headers The HTTP/2 headers corresponding to a new stream + * @return Collector for HTTP/1.x objects + * @throws Http2Exception If any of the HTTP/2 headers to not translate to valid HTTP/1.x headers + * @throws IllegalStateException If this object is not in the correct state to accept additional headers + */ + private Http2HttpMessageAccumulator newHttpRequestAccumulator(Http2Headers http2Headers) throws Http2Exception, + IllegalStateException { + // HTTP/2 does not define a way to carry the version identifier that is + // included in the HTTP/1.1 request line. + Http2HttpMessageAccumulator messageAccumulator = new Http2HttpMessageAccumulator(new DefaultHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.valueOf(http2Headers.method()), http2Headers.path(), + validateHttpHeaders)); + messageAccumulator.add(http2Headers); + return messageAccumulator; + } + + /** + * Provides a container to collect HTTP/1.x objects until the end of the stream has been reached + */ + private final class Http2HttpMessageAccumulator { + private HttpMessage message; + private LastHttpContent trailer; + private long contentLength; + + /** + * Creates a new instance + * + * @param message The HTTP/1.x object which represents the headers + */ + public Http2HttpMessageAccumulator(HttpMessage message) { + if (message == null) { + throw new NullPointerException("message"); + } + this.message = message; + this.trailer = null; + this.contentLength = 0; + } + + /** + * Set a HTTP/1.x header + * + * @param name The name of the header + * @param value The value of the header + * @return The headers object after the set operation + * @throws IllegalStateException If this object is not in the correct state to accept additional headers + */ + public HttpHeaders setHeader(CharSequence name, Object value) throws IllegalStateException { + HttpHeaders headers = currentHeaders(); + if (headers == null) { + throw new IllegalStateException("Headers object is null"); + } + return headers.set(name, value); + } + + /** + * Removes the header with the specified name. + * + * @param name The name of the header to remove + * @return {@code true} if and only if at least one entry has been removed + * @throws IllegalStateException If this object is not in the correct state to accept additional headers + */ + public boolean removeHeader(CharSequence name) throws IllegalStateException { + HttpHeaders headers = currentHeaders(); + if (headers == null) { + throw new IllegalStateException("Headers object is null"); + } + return headers.remove(name); + } + + /** + * Send the headers that have been accumulated so far, if they have not already been sent + * + * @param ctx The channel context for which to propagate events + * @param setContentLength {@code true} to set the Content-Length header + * @return {@code true} If a non-trailer header was fired to {@code ctx}, {@code false} otherwise + */ + private boolean sendHeaders(ChannelHandlerContext ctx, boolean setContentLength) { + HttpHeaders headers = currentHeaders(); + if (headers == null) { + return false; + } + + // Transfer-Encoding header is not valid + headers.remove(HttpHeaders.Names.TRANSFER_ENCODING); + headers.remove(HttpHeaders.Names.TRAILER); + + // Initial header and trailer header will never need to be sent at the same time + // the behavior is that all header events will be appended to the initial header + // until some data is received, and only then will the trailer be used + if (!headerConsumed()) { + // The Connection and Keep-Alive headers are no longer valid + HttpHeaderUtil.setKeepAlive(message, true); + if (setContentLength) { + HttpHeaderUtil.setContentLength(message, contentLength); + } + + ctx.fireChannelRead(message); + message = null; + return true; + } else if (trailerExists()) { + ctx.fireChannelRead(trailer); + trailer = LastHttpContent.EMPTY_LAST_CONTENT; + } + return false; + } + + /** + * Called when the end of the stream is encountered + * + * @param ctx The channel context for which to propagate events + */ + public void endOfStream(ChannelHandlerContext ctx) { + if (sendHeaders(ctx, true)) { + ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT); + } + } + + /** + * Add a HTTP/1.x object which represents part of the message body + * + * @param httpContent The content to add + * @param ctx The channel context for which to propagate events + * @throws TooLongFrameException If the {@code contentLength} is exceeded with the addition of the + * {@code httpContent} + */ + public void add(HttpContent httpContent, ChannelHandlerContext ctx) throws TooLongFrameException { + ByteBuf content = httpContent.content(); + if (contentLength > maxContentLength - content.readableBytes()) { + throw new TooLongFrameException("HTTP/2 content length exceeded " + maxContentLength + " bytes."); + } + + contentLength += content.readableBytes(); + sendHeaders(ctx, false); + ctx.fireChannelRead(httpContent.retain()); + } + + /** + * Extend the current set of HTTP/1.x headers + * + * @param http2Headers The HTTP/2 headers to be added + * @throws Http2Exception If any HTTP/2 headers do not map to HTTP/1.x headers + * @throws IllegalStateException If this object is not in the correct state to accept additional headers + */ + public void add(Http2Headers http2Headers) throws Http2Exception, IllegalStateException { + add(http2Headers, InboundHttp2ToHttpAdapter.this.connection.isServer() ? HEADER_NAME_TRANSLATIONS_REQUEST + : HEADER_NAME_TRANSLATIONS_RESPONSE); + } + + /** + * Extend the current set of HTTP/1.x headers + * + * @param http2Headers The HTTP/2 headers to be added + * @param headerTranslations Translation map from HTTP/2 headers to HTTP/1.x headers + * @throws Http2Exception If any HTTP/2 headers do not map to HTTP/1.x headers + * @throws IllegalStateException If this object is not in the correct state to accept additional headers + */ + private void add(Http2Headers http2Headers, Map headerTranslations) throws Http2Exception, + IllegalStateException { + HttpHeaders headers = currentHeaders(); + if (headers == null) { + throw new IllegalStateException("Headers object is null"); + } + + // http://tools.ietf.org/html/draft-ietf-httpbis-http2-13#section-8.1.2.1 + // All headers that start with ':' are only valid in HTTP/2 context + Iterator> itr = http2Headers.iterator(); + while (itr.hasNext()) { + Entry entry = itr.next(); + String translatedName = headerTranslations.get(entry.getKey()); + if (translatedName != null || !HEADERS_TO_EXCLUDE.contains(entry.getKey())) { + if (translatedName == null) { + translatedName = entry.getKey(); + } + + if (translatedName.isEmpty() || translatedName.charAt(0) == ':') { + throw Http2Exception.protocolError( + "Unknown HTTP/2 header '%s' encountered in translation to HTTP/1.x", + translatedName); + } else { + headers.add(translatedName, entry.getValue()); + } + } + } + } + + /** + * Set the current trailer header + * + * @param trailer The object which represents the trailing headers + */ + public void trailer(LastHttpContent trailer) { + this.trailer = trailer; + } + + /** + * Determine if the initial header and continuations have been processed and sent up the pipeline + * + * @return {@code true} if the initial header and continuations have been processed and sent up the pipeline + */ + public boolean headerConsumed() { + return message == null; + } + + /** + * Determine if the trailing header and continuations have been processed and sent up the pipeline + * + * @return {@code true} if the trailing header and continuations have been processed and sent up the pipeline + */ + public boolean trailerConsumed() { + return trailer != null && trailer.equals(LastHttpContent.EMPTY_LAST_CONTENT); + } + + /** + * Determine if there is a trailing header that has not yet been sent up the pipeline + * + * @return {@code true} if there is a trailing header that has not yet been sent up the pipeline + */ + public boolean trailerExists() { + return trailer != null && !trailerConsumed(); + } + + /** + * Obtain the current object for accumulating headers + * + * @return The primary (non-trailer) header if it exists, otherwise the trailer header + */ + private HttpHeaders currentHeaders() { + if (headerConsumed()) { + return trailerExists() ? trailer.trailingHeaders() : null; + } + return message.headers(); + } + } +} diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java index 920b404043..ea8c7d353e 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java @@ -279,6 +279,7 @@ public class DelegatingHttp2ConnectionHandlerTest { @Test public void headersReadForUnknownStreamShouldCreateStream() throws Exception { + when(remote.createStream(eq(5), eq(false))).thenReturn(stream); decode().onHeadersRead(ctx, 5, EMPTY_HEADERS, 0, false, false); verify(remote).createStream(eq(5), eq(false)); verify(observer).onHeadersRead(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0), @@ -287,6 +288,7 @@ public class DelegatingHttp2ConnectionHandlerTest { @Test public void headersReadForUnknownStreamShouldCreateHalfClosedStream() throws Exception { + when(remote.createStream(eq(5), eq(true))).thenReturn(stream); decode().onHeadersRead(ctx, 5, EMPTY_HEADERS, 0, true, false); verify(remote).createStream(eq(5), eq(true)); verify(observer).onHeadersRead(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0), @@ -466,6 +468,7 @@ public class DelegatingHttp2ConnectionHandlerTest { @Test public void headersWriteForUnknownStreamShouldCreateStream() throws Exception { + when(local.createStream(eq(5), eq(false))).thenReturn(stream); handler.writeHeaders(ctx, promise, 5, EMPTY_HEADERS, 0, false, false); verify(local).createStream(eq(5), eq(false)); verify(writer).writeHeaders(eq(ctx), eq(promise), eq(5), eq(EMPTY_HEADERS), eq(0), @@ -474,6 +477,7 @@ public class DelegatingHttp2ConnectionHandlerTest { @Test public void headersWriteShouldCreateHalfClosedStream() throws Exception { + when(local.createStream(eq(5), eq(true))).thenReturn(stream); handler.writeHeaders(ctx, promise, 5, EMPTY_HEADERS, 0, true, false); verify(local).createStream(eq(5), eq(true)); verify(writer).writeHeaders(eq(ctx), eq(promise), eq(5), eq(EMPTY_HEADERS), eq(0), diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandlerTest.java new file mode 100644 index 0000000000..75202824fb --- /dev/null +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandlerTest.java @@ -0,0 +1,306 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderUtil; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.POST; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable; +import io.netty.util.NetUtil; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import static io.netty.handler.codec.http2.Http2TestUtil.*; +import static io.netty.util.CharsetUtil.*; +import static java.util.concurrent.TimeUnit.*; +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +/** + * Testing the {@link DelegatingHttp2HttpConnectionHandler} for {@link FullHttpRequest} objects into HTTP/2 frames + */ +public class DelegatingHttp2HttpConnectionHandlerTest { + + @Mock + private Http2FrameObserver clientObserver; + + @Mock + private Http2FrameObserver serverObserver; + + private Http2FrameWriter frameWriter; + private ServerBootstrap sb; + private Bootstrap cb; + private Channel serverChannel; + private Channel clientChannel; + private CountDownLatch requestLatch; + private long maxContentLength; + private static final int CONNECTION_SETUP_READ_COUNT = 2; + + @Before + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + + maxContentLength = 1 << 16; + requestLatch = new CountDownLatch(CONNECTION_SETUP_READ_COUNT + 1); + frameWriter = new DefaultHttp2FrameWriter(); + + sb = new ServerBootstrap(); + cb = new Bootstrap(); + + sb.group(new NioEventLoopGroup(), new NioEventLoopGroup()); + sb.channel(NioServerSocketChannel.class); + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new DelegatingHttp2ConnectionHandler(true, new FrameCountDown())); + } + }); + + cb.group(new NioEventLoopGroup()); + cb.channel(NioSocketChannel.class); + cb.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new DelegatingHttp2HttpConnectionHandler(false, clientObserver)); + } + }); + + serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel(); + int port = ((InetSocketAddress) serverChannel.localAddress()).getPort(); + + ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port)); + assertTrue(ccf.awaitUninterruptibly().isSuccess()); + clientChannel = ccf.channel(); + } + + @After + public void teardown() throws Exception { + serverChannel.close().sync(); + sb.group().shutdownGracefully(); + cb.group().shutdownGracefully(); + } + + @Test + public void testJustHeadersRequest() throws Exception { + final HttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, "/example"); + final HttpHeaders httpHeaders = request.headers(); + httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 5); + httpHeaders.set(HttpHeaders.Names.HOST, "http://my-user_name@www.example.org:5555/example"); + httpHeaders.set(Http2HttpHeaders.Names.AUTHORITY, "www.example.org:5555"); + httpHeaders.set(Http2HttpHeaders.Names.SCHEME, "http"); + httpHeaders.add("foo", "goo"); + httpHeaders.add("foo", "goo2"); + httpHeaders.add("foo2", "goo2"); + final Http2Headers http2Headers = new DefaultHttp2Headers.Builder() + .method("GET").path("/example").authority("www.example.org:5555").scheme("http") + .add("foo", "goo").add("foo", "goo2").add("foo2", "goo2").build(); + ChannelPromise writePromise = newPromise(); + ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise); + + writePromise.awaitUninterruptibly(2, SECONDS); + assertTrue(writePromise.isSuccess()); + writeFuture.awaitUninterruptibly(2, SECONDS); + assertTrue(writeFuture.isSuccess()); + awaitRequests(); + verify(serverObserver).onHeadersRead(any(ChannelHandlerContext.class), eq(5), eq(http2Headers), eq(0), + anyShort(), anyBoolean(), eq(0), eq(true), eq(true)); + verify(serverObserver, never()).onDataRead(any(ChannelHandlerContext.class), + anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), anyBoolean()); + } + + @Test + public void testRequestWithBody() throws Exception { + requestLatch = new CountDownLatch(CONNECTION_SETUP_READ_COUNT + 2); + final String text = "foooooogoooo"; + final HttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, POST, "/example", + Unpooled.copiedBuffer(text, UTF_8)); + final HttpHeaders httpHeaders = request.headers(); + httpHeaders.set(HttpHeaders.Names.HOST, "http://your_user-name123@www.example.org:5555/example"); + httpHeaders.add("foo", "goo"); + httpHeaders.add("foo", "goo2"); + httpHeaders.add("foo2", "goo2"); + final Http2Headers http2Headers = new DefaultHttp2Headers.Builder() + .method("POST").path("/example").authority("www.example.org:5555").scheme("http") + .add("foo", "goo").add("foo", "goo2").add("foo2", "goo2").build(); + final HttpContent expectedContent = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()), + true); + ChannelPromise writePromise = newPromise(); + ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise); + + writePromise.awaitUninterruptibly(2, SECONDS); + assertTrue(writePromise.isSuccess()); + writeFuture.awaitUninterruptibly(2, SECONDS); + assertTrue(writeFuture.isSuccess()); + awaitRequests(); + verify(serverObserver).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(http2Headers), eq(0), + anyShort(), anyBoolean(), eq(0), eq(false), eq(false)); + verify(serverObserver).onDataRead(any(ChannelHandlerContext.class), + eq(3), eq(Unpooled.copiedBuffer(text.getBytes())), eq(0), eq(true), eq(true)); + } + + private void awaitRequests() throws Exception { + requestLatch.await(2, SECONDS); + } + + private ChannelHandlerContext ctx() { + return clientChannel.pipeline().firstContext(); + } + + private ChannelPromise newPromise() { + return ctx().newPromise(); + } + + /** + * A decorator around the serverObserver that counts down the latch so that we can await the + * completion of the request. + */ + private final class FrameCountDown implements Http2FrameObserver { + + @Override + public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream, boolean endOfSegment) + throws Http2Exception { + serverObserver.onDataRead(ctx, streamId, copy(data), padding, endOfStream, + endOfSegment); + requestLatch.countDown(); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, + int padding, boolean endStream, boolean endSegment) throws Http2Exception { + serverObserver.onHeadersRead(ctx, streamId, headers, padding, endStream, endSegment); + requestLatch.countDown(); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, + int streamDependency, short weight, boolean exclusive, int padding, + boolean endStream, boolean endSegment) throws Http2Exception { + serverObserver.onHeadersRead(ctx, streamId, headers, streamDependency, weight, + exclusive, padding, endStream, endSegment); + requestLatch.countDown(); + } + + @Override + public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, + short weight, boolean exclusive) throws Http2Exception { + serverObserver.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive); + requestLatch.countDown(); + } + + @Override + public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) + throws Http2Exception { + serverObserver.onRstStreamRead(ctx, streamId, errorCode); + requestLatch.countDown(); + } + + @Override + public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception { + serverObserver.onSettingsAckRead(ctx); + requestLatch.countDown(); + } + + @Override + public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { + serverObserver.onSettingsRead(ctx, settings); + requestLatch.countDown(); + } + + @Override + public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { + serverObserver.onPingRead(ctx, copy(data)); + requestLatch.countDown(); + } + + @Override + public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { + serverObserver.onPingAckRead(ctx, copy(data)); + requestLatch.countDown(); + } + + @Override + public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, + int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception { + serverObserver.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding); + requestLatch.countDown(); + } + + @Override + public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) + throws Http2Exception { + serverObserver.onGoAwayRead(ctx, lastStreamId, errorCode, copy(debugData)); + requestLatch.countDown(); + } + + @Override + public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, + int windowSizeIncrement) throws Http2Exception { + serverObserver.onWindowUpdateRead(ctx, streamId, windowSizeIncrement); + requestLatch.countDown(); + } + + @Override + public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, + Http2Flags flags, ByteBuf payload) { + serverObserver.onUnknownFrame(ctx, frameType, streamId, flags, payload); + requestLatch.countDown(); + } + + ByteBuf copy(ByteBuf buffer) { + return Unpooled.copiedBuffer(buffer); + } + } +} diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java index 098495da8d..4f6ade8d95 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java @@ -85,7 +85,7 @@ public class Http2ConnectionRoundtripTest { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline p = ch.pipeline(); - p.addLast(new DelegatingHttp2ConnectionHandler(false, serverObserver)); + p.addLast(new DelegatingHttp2ConnectionHandler(false, clientObserver)); } }); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapterTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapterTest.java new file mode 100644 index 0000000000..a521b8d9b8 --- /dev/null +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapterTest.java @@ -0,0 +1,579 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderUtil; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable; +import io.netty.util.NetUtil; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import static io.netty.handler.codec.http2.Http2TestUtil.*; +import static io.netty.util.CharsetUtil.*; +import static java.util.concurrent.TimeUnit.*; +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +/** + * Testing the {@link InboundHttp2ToHttpAdapter} for HTTP/2 frames into {@link HttpObject}s + */ +public class InboundHttp2ToHttpAdapterTest { + + @Mock + private HttpResponseListener messageObserver; + + @Mock + private Http2FrameObserver clientObserver; + + private Http2FrameWriter frameWriter; + private ServerBootstrap sb; + private Bootstrap cb; + private Channel serverChannel; + private Channel clientChannel; + private CountDownLatch serverLatch; + private long maxContentLength; + + @Before + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + + maxContentLength = 1 << 16; + serverLatch = new CountDownLatch(1); + frameWriter = new DefaultHttp2FrameWriter(); + + sb = new ServerBootstrap(); + cb = new Bootstrap(); + + sb.group(new NioEventLoopGroup(), new NioEventLoopGroup()); + sb.channel(NioServerSocketChannel.class); + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + Http2Connection connection = new DefaultHttp2Connection(true); + p.addLast("reader", new FrameAdapter(InboundHttp2ToHttpAdapter.newInstance( + connection, maxContentLength), + new CountDownLatch(10))); + p.addLast(new HttpResponseDelegator(messageObserver)); + } + }); + + cb.group(new NioEventLoopGroup()); + cb.channel(NioSocketChannel.class); + cb.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast("reader", new FrameAdapter(clientObserver, new CountDownLatch(10))); + } + }); + + serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel(); + int port = ((InetSocketAddress) serverChannel.localAddress()).getPort(); + + ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port)); + assertTrue(ccf.awaitUninterruptibly().isSuccess()); + clientChannel = ccf.channel(); + } + + @After + public void teardown() throws Exception { + serverChannel.close().sync(); + sb.group().shutdownGracefully(); + cb.group().shutdownGracefully(); + } + + @Test + public void clientRequestSingleHeaderNoDataFrames() throws Exception { + serverLatch = new CountDownLatch(2); + final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.GET, "/some/path/resource2", true); + HttpHeaders httpHeaders = request.headers(); + httpHeaders.set(Http2HttpHeaders.Names.SCHEME, "https"); + httpHeaders.set(Http2HttpHeaders.Names.AUTHORITY, "example.org"); + httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3); + httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, 0); + final Http2Headers http2Headers = new DefaultHttp2Headers.Builder() + .method("GET").scheme("https").authority("example.org") + .path("/some/path/resource2").build(); + runInChannel(clientChannel, new Http2Runnable() { + @Override + public void run() { + frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, true, false); + } + }); + awaitRequests(); + verify(messageObserver).messageReceived(eq(request)); + verify(messageObserver).messageReceived(eq(LastHttpContent.EMPTY_LAST_CONTENT)); + } + + @Test + public void clientRequestOneDataFrame() throws Exception { + serverLatch = new CountDownLatch(2); + final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.GET, "/some/path/resource2", true); + HttpHeaders httpHeaders = request.headers(); + httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3); + final Http2Headers http2Headers = new DefaultHttp2Headers.Builder() + .method("GET").path("/some/path/resource2").build(); + final String text = "hello world"; + final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()), + true); + runInChannel(clientChannel, new Http2Runnable() { + @Override + public void run() { + frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false); + frameWriter.writeData(ctx(), newPromise(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, true, true); + } + }); + awaitRequests(); + ArgumentCaptor httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class); + verify(messageObserver, times(2)).messageReceived(httpObjectCaptor.capture()); + + List capturedHttpObjects = httpObjectCaptor.getAllValues(); + assertEquals(request, capturedHttpObjects.get(0)); + assertEquals(content, capturedHttpObjects.get(1)); + } + + @Test + public void clientRequestMultipleDataFrames() throws Exception { + serverLatch = new CountDownLatch(3); + final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.GET, "/some/path/resource2", true); + HttpHeaders httpHeaders = request.headers(); + httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3); + final Http2Headers http2Headers = new DefaultHttp2Headers.Builder() + .method("GET").path("/some/path/resource2").build(); + final String text = "hello world"; + final String text2 = "hello world2"; + final HttpContent content = new DefaultHttpContent(Unpooled.copiedBuffer(text.getBytes())); + final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text2.getBytes()), + true); + runInChannel(clientChannel, new Http2Runnable() { + @Override + public void run() { + frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false); + frameWriter.writeData(ctx(), newPromise(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, false, false); + frameWriter.writeData(ctx(), newPromise(), 3, + Unpooled.copiedBuffer(text2.getBytes()), 0, true, true); + } + }); + awaitRequests(); + ArgumentCaptor httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class); + verify(messageObserver, times(3)).messageReceived(httpObjectCaptor.capture()); + + List capturedHttpObjects = httpObjectCaptor.getAllValues(); + assertEquals(request, capturedHttpObjects.get(0)); + assertEquals(content, capturedHttpObjects.get(1)); + assertEquals(content2, capturedHttpObjects.get(2)); + } + + @Test + public void clientRequestMultipleEmptyDataFrames() throws Exception { + serverLatch = new CountDownLatch(4); + final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.GET, "/some/path/resource2", true); + HttpHeaders httpHeaders = request.headers(); + httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3); + final Http2Headers http2Headers = new DefaultHttp2Headers.Builder() + .method("GET").path("/some/path/resource2").build(); + final String text = ""; + final HttpContent content = new DefaultHttpContent(Unpooled.copiedBuffer(text.getBytes())); + final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()), + true); + runInChannel(clientChannel, new Http2Runnable() { + @Override + public void run() { + frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false); + frameWriter.writeData(ctx(), newPromise(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, false, false); + frameWriter.writeData(ctx(), newPromise(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, false, false); + frameWriter.writeData(ctx(), newPromise(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, true, true); + } + }); + awaitRequests(); + ArgumentCaptor httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class); + verify(messageObserver, times(4)).messageReceived(httpObjectCaptor.capture()); + + List capturedHttpObjects = httpObjectCaptor.getAllValues(); + assertEquals(request, capturedHttpObjects.get(0)); + assertEquals(content, capturedHttpObjects.get(1)); + assertEquals(content, capturedHttpObjects.get(2)); + assertEquals(content2, capturedHttpObjects.get(3)); + } + + @Test + public void clientRequestHeaderContinuation() throws Exception { + serverLatch = new CountDownLatch(2); + final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.GET, "/some/path/resource2", true); + HttpHeaders httpHeaders = request.headers(); + httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3); + httpHeaders.set("foo", "goo"); + httpHeaders.set("foo2", "goo2"); + httpHeaders.add("foo2", "goo3"); + final Http2Headers http2Headers = new DefaultHttp2Headers.Builder() + .method("GET").path("/some/path/resource2").build(); + final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().set("foo", "goo") + .set("foo2", "goo2").add("foo2", "goo3").build(); + final String text = ""; + final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()), + true); + runInChannel(clientChannel, new Http2Runnable() { + @Override + public void run() { + frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false); + frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers2, 0, false, false); + frameWriter.writeData(ctx(), newPromise(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, true, true); + } + }); + awaitRequests(); + ArgumentCaptor httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class); + verify(messageObserver, times(2)).messageReceived(httpObjectCaptor.capture()); + + List capturedHttpObjects = httpObjectCaptor.getAllValues(); + assertEquals(request, capturedHttpObjects.get(0)); + assertEquals(content, capturedHttpObjects.get(1)); + } + + @Test + public void clientRequestTrailingHeaders() throws Exception { + serverLatch = new CountDownLatch(3); + final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.GET, "/some/path/resource2", true); + HttpHeaders httpHeaders = request.headers(); + httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3); + final LastHttpContent trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, true); + HttpHeaders trailingHeaders = trailer.trailingHeaders(); + trailingHeaders.set("foo", "goo"); + trailingHeaders.set("foo2", "goo2"); + trailingHeaders.add("foo2", "goo3"); + final Http2Headers http2Headers = new DefaultHttp2Headers.Builder() + .method("GET").path("/some/path/resource2").build(); + final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().set("foo", "goo") + .set("foo2", "goo2").add("foo2", "goo3").build(); + final String text = "not empty!"; + final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()), + true); + runInChannel(clientChannel, new Http2Runnable() { + @Override + public void run() { + frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false); + frameWriter.writeData(ctx(), newPromise(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, false, false); + frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers2, 0, true, true); + } + }); + awaitRequests(); + ArgumentCaptor httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class); + verify(messageObserver, times(3)).messageReceived(httpObjectCaptor.capture()); + + List capturedHttpObjects = httpObjectCaptor.getAllValues(); + assertEquals(request, capturedHttpObjects.get(0)); + assertEquals(content, capturedHttpObjects.get(1)); + assertEquals(trailer, capturedHttpObjects.get(2)); + } + + @Test + public void clientRequestPushPromise() throws Exception { + serverLatch = new CountDownLatch(4); + final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.GET, "/some/path/resource", true); + HttpHeaders httpHeaders = request.headers(); + httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3); + final HttpMessage request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.GET, "/some/path/resource2", true); + HttpHeaders httpHeaders2 = request2.headers(); + httpHeaders2.set(Http2HttpHeaders.Names.SCHEME, "https"); + httpHeaders2.set(Http2HttpHeaders.Names.AUTHORITY, "example.org"); + httpHeaders2.set(Http2HttpHeaders.Names.STREAM_ID, 5); + httpHeaders2.set(Http2HttpHeaders.Names.STREAM_PROMISE_ID, 3); + final Http2Headers http2Headers = new DefaultHttp2Headers.Builder() + .method("GET").path("/some/path/resource").build(); + final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder() + .method("GET").path("/some/path/resource2").scheme("https") + .authority("example.org").build(); + final String text = "hello 1!**"; + final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()), + true); + final String text2 = "hello 2!><"; + final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text2.getBytes()), + true); + runInChannel(clientChannel, new Http2Runnable() { + @Override + public void run() { + frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false); + frameWriter.writePushPromise(ctx(), newPromise(), 3, 5, http2Headers2, 0); + frameWriter.writeData(ctx(), newPromise(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, true, true); + frameWriter.writeData(ctx(), newPromise(), 5, + Unpooled.copiedBuffer(text2.getBytes()), 0, true, true); + } + }); + awaitRequests(); + ArgumentCaptor httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class); + verify(messageObserver, times(4)).messageReceived(httpObjectCaptor.capture()); + + List capturedHttpObjects = httpObjectCaptor.getAllValues(); + assertEquals(request, capturedHttpObjects.get(0)); + assertEquals(content, capturedHttpObjects.get(1)); + assertEquals(request2, capturedHttpObjects.get(2)); + assertEquals(content2, capturedHttpObjects.get(3)); + } + + @Test + public void clientRequestStreamDependency() throws Exception { + serverLatch = new CountDownLatch(4); + final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.GET, "/some/path/resource", true); + HttpHeaders httpHeaders = request.headers(); + httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3); + final HttpMessage request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.GET, "/some/path/resource2", true); + HttpHeaders httpHeaders2 = request2.headers(); + httpHeaders2.set(Http2HttpHeaders.Names.STREAM_ID, 5); + httpHeaders2.set(Http2HttpHeaders.Names.STREAM_DEPENDENCY_ID, 3); + httpHeaders2.set(Http2HttpHeaders.Names.STREAM_EXCLUSIVE, true); + httpHeaders2.set(Http2HttpHeaders.Names.STREAM_WEIGHT, 256); + final Http2Headers http2Headers = new DefaultHttp2Headers.Builder() + .method("GET").path("/some/path/resource").build(); + final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder() + .method("GET").path("/some/path/resource2").build(); + final String text = "hello 1!**"; + final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()), + true); + final String text2 = "hello 2!><"; + final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text2.getBytes()), + true); + runInChannel(clientChannel, new Http2Runnable() { + @Override + public void run() { + frameWriter.writeHeaders(ctx(), newPromise(), 3, http2Headers, 0, false, false); + frameWriter.writeHeaders(ctx(), newPromise(), 5, http2Headers2, 0, false, false); + frameWriter.writePriority(ctx(), newPromise(), 5, 3, (short) 256, true); + frameWriter.writeData(ctx(), newPromise(), 3, + Unpooled.copiedBuffer(text.getBytes()), 0, true, true); + frameWriter.writeData(ctx(), newPromise(), 5, + Unpooled.copiedBuffer(text2.getBytes()), 0, true, true); + } + }); + awaitRequests(); + ArgumentCaptor httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class); + verify(messageObserver, times(4)).messageReceived(httpObjectCaptor.capture()); + + List capturedHttpObjects = httpObjectCaptor.getAllValues(); + assertEquals(request, capturedHttpObjects.get(0)); + assertEquals(content, capturedHttpObjects.get(1)); + assertEquals(request2, capturedHttpObjects.get(2)); + assertEquals(content2, capturedHttpObjects.get(3)); + } + + private void awaitRequests() throws Exception { + serverLatch.await(2, SECONDS); + } + + private ChannelHandlerContext ctx() { + return clientChannel.pipeline().firstContext(); + } + + private ChannelPromise newPromise() { + return ctx().newPromise(); + } + + private interface HttpResponseListener { + void messageReceived(HttpObject obj); + } + + private final class HttpResponseDelegator extends SimpleChannelInboundHandler { + private final HttpResponseListener listener; + + public HttpResponseDelegator(HttpResponseListener listener) { + this.listener = listener; + } + + @Override + protected void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception { + this.listener.messageReceived(msg); + serverLatch.countDown(); + } + } + + private final class FrameAdapter extends ByteToMessageDecoder { + + private final Http2FrameObserver observer; + private final DefaultHttp2FrameReader reader; + private final CountDownLatch requestLatch; + + FrameAdapter(Http2FrameObserver observer, CountDownLatch requestLatch) { + this.observer = observer; + reader = new DefaultHttp2FrameReader(); + this.requestLatch = requestLatch; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) + throws Exception { + reader.readFrame(ctx, in, new Http2FrameObserver() { + + @Override + public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, + int padding, boolean endOfStream, boolean endOfSegment) + throws Http2Exception { + observer.onDataRead(ctx, streamId, copy(data), padding, endOfStream, + endOfSegment); + requestLatch.countDown(); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int padding, boolean endStream, boolean endSegment) + throws Http2Exception { + observer.onHeadersRead(ctx, streamId, headers, padding, endStream, endSegment); + requestLatch.countDown(); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, + Http2Headers headers, int streamDependency, short weight, + boolean exclusive, int padding, boolean endStream, boolean endSegment) + throws Http2Exception { + observer.onHeadersRead(ctx, streamId, headers, streamDependency, weight, + exclusive, padding, endStream, endSegment); + requestLatch.countDown(); + } + + @Override + public void onPriorityRead(ChannelHandlerContext ctx, int streamId, + int streamDependency, short weight, boolean exclusive) + throws Http2Exception { + observer.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive); + requestLatch.countDown(); + } + + @Override + public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) + throws Http2Exception { + observer.onRstStreamRead(ctx, streamId, errorCode); + requestLatch.countDown(); + } + + @Override + public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception { + observer.onSettingsAckRead(ctx); + requestLatch.countDown(); + } + + @Override + public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) + throws Http2Exception { + observer.onSettingsRead(ctx, settings); + requestLatch.countDown(); + } + + @Override + public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) + throws Http2Exception { + observer.onPingRead(ctx, copy(data)); + requestLatch.countDown(); + } + + @Override + public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) + throws Http2Exception { + observer.onPingAckRead(ctx, copy(data)); + requestLatch.countDown(); + } + + @Override + public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, + int promisedStreamId, Http2Headers headers, int padding) + throws Http2Exception { + observer.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding); + requestLatch.countDown(); + } + + @Override + public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, + long errorCode, ByteBuf debugData) throws Http2Exception { + observer.onGoAwayRead(ctx, lastStreamId, errorCode, copy(debugData)); + requestLatch.countDown(); + } + + @Override + public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, + int windowSizeIncrement) throws Http2Exception { + observer.onWindowUpdateRead(ctx, streamId, windowSizeIncrement); + requestLatch.countDown(); + } + + @Override + public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, + Http2Flags flags, ByteBuf payload) { + observer.onUnknownFrame(ctx, frameType, streamId, flags, payload); + requestLatch.countDown(); + } + }); + } + + ByteBuf copy(ByteBuf buffer) { + return Unpooled.copiedBuffer(buffer); + } + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java b/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java index 375e4ff3e1..fe27013a78 100644 --- a/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java +++ b/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java @@ -35,6 +35,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.Set; +import java.util.TreeSet; +import java.util.HashSet; import java.util.TimeZone; public class DefaultTextHeaders implements TextHeaders { @@ -876,6 +878,54 @@ public class DefaultTextHeaders implements TextHeaders { return this; } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + for (String name : names()) { + result = prime * result + name.hashCode(); + Set values = new TreeSet(getAll(name)); + for (String value : values) { + result = prime * result + value.hashCode(); + } + } + return result; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof DefaultTextHeaders)) { + return false; + } + + DefaultTextHeaders other = (DefaultTextHeaders) o; + + // First, check that the set of names match. + Set names = names(); + if (!names.equals(other.names())) { + return false; + } + + // Compare the values for each name. + for (String name : names) { + List values = getAll(name); + List otherValues = other.getAll(name); + if (values.size() != otherValues.size()) { + return false; + } + + // Convert the values to a set and remove values from the other object to see if + // they match. + Set valueSet = new HashSet(values); + valueSet.removeAll(otherValues); + if (!valueSet.isEmpty()) { + return false; + } + } + + return true; + } + private static final class HeaderEntry implements Map.Entry { private final DefaultTextHeaders parent; final int hash; diff --git a/codec/src/test/java/io/netty/handler/codec/DefaultTextHeadersTest.java b/codec/src/test/java/io/netty/handler/codec/DefaultTextHeadersTest.java new file mode 100644 index 0000000000..c4c15e3eb4 --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/DefaultTextHeadersTest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class DefaultTextHeadersTest { + + @Test + public void testEqualsMultipleHeaders() { + DefaultTextHeaders h1 = new DefaultTextHeaders(); + h1.set("foo", "goo"); + h1.set("foo2", "goo2"); + + DefaultTextHeaders h2 = new DefaultTextHeaders(); + h2.set("foo", "goo"); + h2.set("foo2", "goo2"); + + assertTrue(h1.equals(h2)); + assertTrue(h2.equals(h1)); + assertTrue(h2.equals(h2)); + assertTrue(h1.equals(h1)); + } + + @Test + public void testEqualsDuplicateMultipleHeaders() { + DefaultTextHeaders h1 = new DefaultTextHeaders(); + h1.set("foo", "goo"); + h1.set("foo2", "goo2"); + h1.add("foo2", "goo3"); + h1.add("foo", "goo4"); + + DefaultTextHeaders h2 = new DefaultTextHeaders(); + h2.set("foo", "goo"); + h2.set("foo2", "goo2"); + h2.add("foo", "goo4"); + h2.add("foo2", "goo3"); + + assertTrue(h1.equals(h2)); + assertTrue(h2.equals(h1)); + assertTrue(h2.equals(h2)); + assertTrue(h1.equals(h1)); + } + + @Test + public void testNotEqualsDuplicateMultipleHeaders() { + DefaultTextHeaders h1 = new DefaultTextHeaders(); + h1.set("foo", "goo"); + h1.set("foo2", "goo2"); + h1.add("foo2", "goo3"); + h1.add("foo", "goo4"); + + DefaultTextHeaders h2 = new DefaultTextHeaders(); + h2.set("foo", "goo"); + h2.set("foo2", "goo2"); + h2.add("foo", "goo4"); + + assertFalse(h1.equals(h2)); + assertFalse(h2.equals(h1)); + assertTrue(h2.equals(h2)); + assertTrue(h1.equals(h1)); + } +} diff --git a/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java b/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java index a8d3b1ae89..3ec588fe23 100644 --- a/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java +++ b/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java @@ -175,7 +175,6 @@ public final class HttpUploadClient { //connection will not close but needed // headers.set("Connection","keep-alive"); - // headers.set("Keep-Alive","300"); headers.set( HttpHeaders.Names.COOKIE, ClientCookieEncoder.encode( diff --git a/example/src/main/java/io/netty/example/http2/client/Http2Client.java b/example/src/main/java/io/netty/example/http2/client/Http2Client.java index 358f411bd4..9b4759ba6a 100644 --- a/example/src/main/java/io/netty/example/http2/client/Http2Client.java +++ b/example/src/main/java/io/netty/example/http2/client/Http2Client.java @@ -30,7 +30,9 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.util.CharsetUtil; +import java.net.URI; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import static io.netty.handler.codec.http.HttpMethod.*; import static io.netty.handler.codec.http.HttpVersion.*; @@ -45,6 +47,9 @@ public final class Http2Client { static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080")); + static final String URL = System.getProperty("url", "/whatever"); + static final String URL2 = System.getProperty("url2"); + static final String URL2DATA = System.getProperty("url2data", "test data!"); public static void main(String[] args) throws Exception { // Configure SSL. @@ -59,7 +64,7 @@ public final class Http2Client { } EventLoopGroup workerGroup = new NioEventLoopGroup(); - Http2ClientInitializer initializer = new Http2ClientInitializer(sslCtx); + Http2ClientInitializer initializer = new Http2ClientInitializer(sslCtx, Integer.MAX_VALUE); try { // Configure the client. @@ -75,23 +80,34 @@ public final class Http2Client { System.out.println("Connected to [" + HOST + ':' + PORT + ']'); // Wait for the HTTP/2 upgrade to occur. - Http2ClientConnectionHandler http2ConnectionHandler = initializer.connectionHandler(); - http2ConnectionHandler.awaitInitialization(); + Http2SettingsHandler http2SettingsHandler = initializer.settingsHandler(); + http2SettingsHandler.awaitSettings(5, TimeUnit.SECONDS); - // Create a simple POST request with just headers. - FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, POST, "/whatever", - Unpooled.copiedBuffer("sample data".getBytes(CharsetUtil.UTF_8))); - request.headers().add(HttpHeaders.Names.HOST, HOST + ':' + PORT); - - // Send the request to the server. - System.err.println("Sending request..."); - ChannelFuture requestFuture = channel.writeAndFlush(request).sync(); - System.err.println("Back from sending headers..."); - requestFuture.sync(); - - // Waits for the complete response - http2ConnectionHandler.awaitResponse(); - System.out.println("Finished HTTP/2 request"); + HttpResponseHandler responseHandler = initializer.responseHandler(); + int streamId = 3; + URI hostName = URI.create((SSL ? "https" : "http") + "://" + HOST + ':' + PORT); + System.err.println("Sending request(s)..."); + if (URL != null) { + // Create a simple GET request. + FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, URL); + request.headers().add(HttpHeaders.Names.HOST, hostName); + request.headers().add(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); + channel.writeAndFlush(request); + responseHandler.put(streamId, channel.newPromise()); + streamId += 2; + } + if (URL2 != null) { + // Create a simple POST request with a body. + FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, POST, URL2, + Unpooled.copiedBuffer(URL2DATA.getBytes(CharsetUtil.UTF_8))); + request.headers().add(HttpHeaders.Names.HOST, hostName); + request.headers().add(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); + channel.writeAndFlush(request); + responseHandler.put(streamId, channel.newPromise()); + streamId += 2; + } + responseHandler.awaitResponses(5, TimeUnit.SECONDS); + System.out.println("Finished HTTP/2 request(s)"); // Wait until the connection is closed. channel.close().syncUninterruptibly(); diff --git a/example/src/main/java/io/netty/example/http2/client/Http2ClientConnectionHandler.java b/example/src/main/java/io/netty/example/http2/client/Http2ClientConnectionHandler.java deleted file mode 100644 index 78766389b9..0000000000 --- a/example/src/main/java/io/netty/example/http2/client/Http2ClientConnectionHandler.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package io.netty.example.http2.client; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.FullHttpMessage; -import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; -import io.netty.handler.codec.http2.DefaultHttp2Connection; -import io.netty.handler.codec.http2.DefaultHttp2FrameReader; -import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; -import io.netty.handler.codec.http2.DefaultHttp2Headers; -import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; -import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; -import io.netty.handler.codec.http2.Http2Connection; -import io.netty.handler.codec.http2.Http2Exception; -import io.netty.handler.codec.http2.Http2FrameLogger; -import io.netty.handler.codec.http2.Http2FrameReader; -import io.netty.handler.codec.http2.Http2FrameWriter; -import io.netty.handler.codec.http2.Http2Headers; -import io.netty.handler.codec.http2.Http2InboundFrameLogger; -import io.netty.handler.codec.http2.Http2OutboundFrameLogger; -import io.netty.handler.codec.http2.Http2Settings; -import io.netty.util.CharsetUtil; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static io.netty.example.http2.Http2ExampleUtil.*; -import static io.netty.util.internal.logging.InternalLogLevel.*; - -/** - * A subclass of the connection handler that interprets response messages as text and prints it out - * to the console. - */ -public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler { - - private static final Http2FrameLogger logger = - new Http2FrameLogger(INFO, InternalLoggerFactory.getInstance(Http2ClientConnectionHandler.class)); - - private final ChannelPromise initPromise; - private final ChannelPromise responsePromise; - private ByteBuf collectedData; - - public Http2ClientConnectionHandler(ChannelPromise initPromise, ChannelPromise responsePromise) { - this(initPromise, responsePromise, new DefaultHttp2Connection(false)); - } - - private Http2ClientConnectionHandler(ChannelPromise initPromise, - ChannelPromise responsePromise, Http2Connection connection) { - super(connection, frameReader(), frameWriter(), new DefaultHttp2InboundFlowController( - connection), new DefaultHttp2OutboundFlowController(connection)); - this.initPromise = initPromise; - this.responsePromise = responsePromise; - } - - /** - * Wait for this handler to be added after the upgrade to HTTP/2, and for initial preface - * handshake to complete. - */ - public void awaitInitialization() throws Exception { - if (!initPromise.awaitUninterruptibly(5, TimeUnit.SECONDS)) { - throw new IllegalStateException("Timed out waiting for initialization"); - } - if (!initPromise.isSuccess()) { - throw new RuntimeException(initPromise.cause()); - } - } - - /** - * Wait for this full response to be received and printed out. - */ - public void awaitResponse() throws Exception { - if (!responsePromise.awaitUninterruptibly(5, TimeUnit.SECONDS)) { - throw new IllegalStateException("Timed out waiting for completion of the response"); - } - if (!responsePromise.isSuccess()) { - throw new RuntimeException(initPromise.cause()); - } - } - - /** - * Handles conversion of a {@link FullHttpMessage} to HTTP/2 frames. - */ - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - if (msg instanceof FullHttpMessage) { - FullHttpMessage httpMsg = (FullHttpMessage) msg; - boolean hasData = httpMsg.content().isReadable(); - - // Convert and write the headers. - DefaultHttp2Headers.Builder headers = DefaultHttp2Headers.newBuilder(); - for (Map.Entry entry : httpMsg.headers().entries()) { - headers.add(entry.getKey(), entry.getValue()); - } - int streamId = nextStreamId(); - writeHeaders(ctx, promise, streamId, headers.build(), 0, !hasData, false); - if (hasData) { - writeData(ctx, ctx.newPromise(), streamId, httpMsg.content(), 0, true, true); - } - } else { - ctx.write(msg, promise); - } - } - - @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endOfStream, boolean endOfSegment) throws Http2Exception { - - // Copy the data into the buffer. - int available = data.readableBytes(); - if (collectedData == null) { - collectedData = ctx().alloc().buffer(available); - collectedData.writeBytes(data, data.readerIndex(), data.readableBytes()); - } else { - // Expand the buffer - ByteBuf newBuffer = ctx().alloc().buffer(collectedData.readableBytes() + available); - newBuffer.writeBytes(collectedData); - newBuffer.writeBytes(data); - collectedData.release(); - collectedData = newBuffer; - } - - // If it's the last frame, print the complete message. - if (endOfStream) { - System.out.println("Received message: " + collectedData.toString(CharsetUtil.UTF_8)); - - // Free the data buffer. - collectedData.release(); - collectedData = null; - - responsePromise.setSuccess(); - } - } - - @Override - public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, - int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, - boolean endSegment) throws Http2Exception { - if (headers.contains(UPGRADE_RESPONSE_HEADER)) { - System.out.println("Received HTTP/2 response to the HTTP->HTTP/2 upgrade request"); - } - } - - @Override - public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { - if (!initPromise.isDone()) { - initPromise.setSuccess(); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (!initPromise.isDone()) { - initPromise.setFailure(cause); - } - if (!responsePromise.isDone()) { - initPromise.setFailure(cause); - } - super.exceptionCaught(ctx, cause); - } - - private static Http2FrameReader frameReader() { - return new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger); - } - - private static Http2FrameWriter frameWriter() { - return new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), logger); - } -} diff --git a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java index e6f0d87191..6db6b31c2f 100644 --- a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java +++ b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java @@ -17,30 +17,62 @@ package io.netty.example.http2.client; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.DefaultHttp2FrameReader; +import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; +import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; +import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; +import io.netty.handler.codec.http2.DelegatingHttp2ConnectionHandler; +import io.netty.handler.codec.http2.DelegatingHttp2HttpConnectionHandler; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2FrameReader; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.Http2InboundFrameLogger; +import io.netty.handler.codec.http2.Http2OutboundFrameLogger; +import io.netty.handler.codec.http.HttpContentDecompressor; +import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpClientUpgradeHandler; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter; import io.netty.handler.codec.http2.Http2ClientUpgradeCodec; import io.netty.handler.ssl.SslContext; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import static io.netty.util.internal.logging.InternalLogLevel.*; /** * Configures the client pipeline to support HTTP/2 frames. */ public class Http2ClientInitializer extends ChannelInitializer { + private static final Http2FrameLogger logger = + new Http2FrameLogger(INFO, InternalLoggerFactory.getInstance(Http2ClientInitializer.class)); private final SslContext sslCtx; - private Http2ClientConnectionHandler connectionHandler; + private final long maxContentLength; + private DelegatingHttp2ConnectionHandler connectionHandler; + private HttpResponseHandler responseHandler; + private Http2SettingsHandler settingsHandler; - public Http2ClientInitializer(SslContext sslCtx) { + public Http2ClientInitializer(SslContext sslCtx, long maxContentLength) { this.sslCtx = sslCtx; + this.maxContentLength = maxContentLength; } @Override public void initChannel(SocketChannel ch) throws Exception { - connectionHandler = new Http2ClientConnectionHandler(ch.newPromise(), ch.newPromise()); + Http2Connection connection = new DefaultHttp2Connection(false); + connectionHandler = new DelegatingHttp2HttpConnectionHandler(connection, + frameReader(), frameWriter(), new DefaultHttp2InboundFlowController(connection), + new DefaultHttp2OutboundFlowController(connection), + InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength)); + responseHandler = new HttpResponseHandler(); + settingsHandler = new Http2SettingsHandler(ch.newPromise()); if (sslCtx != null) { configureSsl(ch); } else { @@ -48,15 +80,29 @@ public class Http2ClientInitializer extends ChannelInitializer { } } - public Http2ClientConnectionHandler connectionHandler() { - return connectionHandler; + public HttpResponseHandler responseHandler() { + return responseHandler; + } + + public Http2SettingsHandler settingsHandler() { + return settingsHandler; + } + + protected void configureEndOfPipeline(ChannelPipeline pipeline) { + pipeline.addLast("Http2SettingsHandler", settingsHandler); + pipeline.addLast("Decompressor", new HttpContentDecompressor()); + pipeline.addLast("Aggregator", new HttpObjectAggregator((int) maxContentLength)); + pipeline.addLast("HttpResponseHandler", responseHandler); } /** * Configure the pipeline for TLS NPN negotiation to HTTP/2. */ private void configureSsl(SocketChannel ch) { - ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()), connectionHandler); + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("SslHandler", sslCtx.newHandler(ch.alloc())); + pipeline.addLast("Http2Handler", connectionHandler); + configureEndOfPipeline(pipeline); } /** @@ -67,16 +113,16 @@ public class Http2ClientInitializer extends ChannelInitializer { Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(connectionHandler); HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 65536); - ch.pipeline().addLast(sourceCodec); - ch.pipeline().addLast(upgradeHandler); - ch.pipeline().addLast(new UpgradeRequestHandler()); - ch.pipeline().addLast(new UserEventLogger()); + ch.pipeline().addLast("Http2SourceCodec", sourceCodec); + ch.pipeline().addLast("Http2UpgradeHandler", upgradeHandler); + ch.pipeline().addLast("Http2UpgradeRequestHandler", new UpgradeRequestHandler()); + ch.pipeline().addLast("Logger", new UserEventLogger()); } /** * A handler that triggers the cleartext upgrade to HTTP/2 by sending an initial HTTP request. */ - private static class UpgradeRequestHandler extends ChannelHandlerAdapter { + private final class UpgradeRequestHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { DefaultFullHttpRequest upgradeRequest = @@ -86,7 +132,9 @@ public class Http2ClientInitializer extends ChannelInitializer { super.channelActive(ctx); // Done with this handler, remove it from the pipeline. - ctx.pipeline().remove(ctx.name()); + ctx.pipeline().remove(this); + + Http2ClientInitializer.this.configureEndOfPipeline(ctx.pipeline()); } } @@ -100,4 +148,12 @@ public class Http2ClientInitializer extends ChannelInitializer { super.userEventTriggered(ctx, evt); } } + + private static Http2FrameReader frameReader() { + return new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger); + } + + private static Http2FrameWriter frameWriter() { + return new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), logger); + } } diff --git a/example/src/main/java/io/netty/example/http2/client/Http2SettingsHandler.java b/example/src/main/java/io/netty/example/http2/client/Http2SettingsHandler.java new file mode 100644 index 0000000000..7e49ae3147 --- /dev/null +++ b/example/src/main/java/io/netty/example/http2/client/Http2SettingsHandler.java @@ -0,0 +1,62 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.example.http2.client; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http2.Http2Settings; +import java.util.concurrent.TimeUnit; + +/** + * Reads the first {@link Http2Settings} object and notifies a {@link ChannelPromise} + */ +public class Http2SettingsHandler extends SimpleChannelInboundHandler { + private ChannelPromise promise; + + /** + * Create new instance + * + * @param promise Promise object used to notify when first settings are received + */ + public Http2SettingsHandler(ChannelPromise promise) { + this.promise = promise; + } + + /** + * Wait for this handler to be added after the upgrade to HTTP/2, and for initial preface + * handshake to complete. + * + * @param timeout Time to wait + * @param Units for {@code timeout} + * @throws Exception if timeout or other failure occurs + */ + public void awaitSettings(long timeout, TimeUnit unit) throws Exception { + if (!promise.awaitUninterruptibly(timeout, unit)) { + throw new IllegalStateException("Timed out waiting for settings"); + } + if (!promise.isSuccess()) { + throw new RuntimeException(promise.cause()); + } + } + + @Override + protected void messageReceived(ChannelHandlerContext ctx, Http2Settings msg) throws Exception { + promise.setSuccess(); + + // Only care about the first settings message + ctx.pipeline().remove(this); + } +} diff --git a/example/src/main/java/io/netty/example/http2/client/HttpResponseHandler.java b/example/src/main/java/io/netty/example/http2/client/HttpResponseHandler.java new file mode 100644 index 0000000000..6b55923791 --- /dev/null +++ b/example/src/main/java/io/netty/example/http2/client/HttpResponseHandler.java @@ -0,0 +1,100 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.example.http2.client; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http2.Http2HttpHeaders; +import io.netty.util.CharsetUtil; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * Process {@link FullHttpResponse} translated from HTTP/2 frames + */ +public class HttpResponseHandler extends SimpleChannelInboundHandler { + + private SortedMap streamidPromiseMap; + + public HttpResponseHandler() { + streamidPromiseMap = new TreeMap(); + } + + /** + * Create an association between an anticipated response stream id and a {@link ChannelPromise} + * + * @param streamId The stream for which a response is expected + * @param promise The promise object that will be used to wait/notify events + * @return The previous object associated with {@code streamId} + * @see {@link io.netty.example.http2.client.HttpResponseHandler#awaitResponses awaitResponses} + */ + public ChannelPromise put(int streamId, ChannelPromise promise) { + return streamidPromiseMap.put(streamId, promise); + } + + /** + * Wait (sequentially) for a time duration for each anticipated response + * + * @param timeout Value of time to wait for each response + * @param unit Units associated with {@code timeout} + * @see {@link io.netty.example.http2.client.HttpResponseHandler#put put} + */ + public void awaitResponses(long timeout, TimeUnit unit) { + Iterator> itr = streamidPromiseMap.entrySet().iterator(); + while (itr.hasNext()) { + Entry entry = itr.next(); + ChannelPromise promise = entry.getValue(); + if (!promise.awaitUninterruptibly(timeout, unit)) { + throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey()); + } + if (!promise.isSuccess()) { + throw new RuntimeException(promise.cause()); + } + System.out.println("---Stream id: " + entry.getKey() + " received---"); + itr.remove(); + } + } + + @Override + protected void messageReceived(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception { + int streamId = Integer.parseInt(msg.headers().get(Http2HttpHeaders.Names.STREAM_ID)); + ChannelPromise promise = streamidPromiseMap.get(streamId); + if (promise == null) { + System.err.println("Message received for unknown stream id " + streamId); + } else { + // Do stuff with the message (for now just print it) + ByteBuf content = msg.content(); + if (content.isReadable()) { + int contentLength = content.readableBytes(); + byte[] arr = new byte[contentLength]; + content.readBytes(arr); + System.out.println(new String(arr, 0, contentLength, CharsetUtil.UTF_8)); + } + + promise.setSuccess(); + } + } +} diff --git a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java index 0f3aa52204..75e1ab8280 100644 --- a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java +++ b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java @@ -21,7 +21,6 @@ import static io.netty.example.http2.Http2ExampleUtil.UPGRADE_RESPONSE_HEADER; import static io.netty.util.internal.logging.InternalLogLevel.INFO; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.example.http2.client.Http2ClientConnectionHandler; import io.netty.handler.codec.http.HttpServerUpgradeHandler; import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; import io.netty.handler.codec.http2.DefaultHttp2Connection; @@ -45,7 +44,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory; public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { private static final Http2FrameLogger logger = new Http2FrameLogger(INFO, - InternalLoggerFactory.getInstance(Http2ClientConnectionHandler.class)); + InternalLoggerFactory.getInstance(HelloWorldHttp2Handler.class)); static final ByteBuf RESPONSE_BYTES = unreleasableBuffer(copiedBuffer("Hello World", CharsetUtil.UTF_8)); public HelloWorldHttp2Handler() { @@ -68,7 +67,8 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { if (evt instanceof HttpServerUpgradeHandler.UpgradeEvent) { // Write an HTTP/2 response to the upgrade request Http2Headers headers = - DefaultHttp2Headers.newBuilder().set(UPGRADE_RESPONSE_HEADER, "true").build(); + DefaultHttp2Headers.newBuilder().status("200") + .set(UPGRADE_RESPONSE_HEADER, "true").build(); writeHeaders(ctx, ctx.newPromise(), 1, headers, 0, true, true); } super.userEventTriggered(ctx, evt);