HTTP/2 Data Decompression

Motivation:
The HTTP/2 codec does not provide a way to decompress data. This functionality is supported by the HTTP codec and is expected to be a commonly used feature.

Modifications:
-The Http2FrameReader will be modified to allow hooks for decompression
-New classes which detect the decompression from HTTP/2 header frames and uses that decompression when HTTP/2 data frames come in
-New unit tests

Result:
The HTTP/2 codec will provide a means to support data decompression
This commit is contained in:
Scott Mitchell 2014-09-07 17:04:13 -04:00 committed by Scott Mitchell
parent 9b811a24e8
commit 96a044fabe
22 changed files with 1654 additions and 789 deletions

View File

@ -47,16 +47,13 @@ public class HttpContentDecompressor extends HttpContentDecoder {
@Override
protected EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception {
if ("gzip".equalsIgnoreCase(contentEncoding) || "x-gzip".equalsIgnoreCase(contentEncoding)) {
if (HttpHeaders.Values.GZIP.equalsIgnoreCase(contentEncoding) ||
HttpHeaders.Values.XGZIP.equalsIgnoreCase(contentEncoding)) {
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
}
if ("deflate".equalsIgnoreCase(contentEncoding) || "x-deflate".equalsIgnoreCase(contentEncoding)) {
ZlibWrapper wrapper;
if (strict) {
wrapper = ZlibWrapper.ZLIB;
} else {
wrapper = ZlibWrapper.ZLIB_OR_NONE;
}
if (HttpHeaders.Values.DEFLATE.equalsIgnoreCase(contentEncoding) ||
HttpHeaders.Values.XDEFLATE.equalsIgnoreCase(contentEncoding)) {
final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
// To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(wrapper));
}

View File

@ -399,10 +399,18 @@ public interface HttpHeaders extends TextHeaders {
* {@code "deflate"}
*/
public static final AsciiString DEFLATE = new AsciiString("deflate");
/**
* {@code "x-deflate"}
*/
public static final AsciiString XDEFLATE = new AsciiString("deflate");
/**
* {@code "gzip"}
*/
public static final AsciiString GZIP = new AsciiString("gzip");
/**
* {@code "x-gzip"}
*/
public static final AsciiString XGZIP = new AsciiString("x-gzip");
/**
* {@code "identity"}
*/

View File

@ -50,6 +50,11 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jzlib</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,243 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.http.HttpHeaders;
/**
* A HTTP2 frame reader that will decompress data frames according
* to the {@code content-encoding} header for each stream.
*/
public class DecompressorHttp2FrameReader extends DefaultHttp2FrameReader {
private static final AsciiString CONTENT_ENCODING_LOWER_CASE = HttpHeaders.Names.CONTENT_ENCODING.toLowerCase();
private static final AsciiString CONTENT_LENGTH_LOWER_CASE = HttpHeaders.Names.CONTENT_LENGTH.toLowerCase();
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
@Override
public void streamRemoved(Http2Stream stream) {
final EmbeddedChannel decoder = stream.decompressor();
if (decoder != null) {
cleanup(stream, decoder);
}
}
};
private final Http2Connection connection;
private final boolean strict;
/**
* Create a new instance with non-strict deflate decoding.
* {@link #DecompressorHttp2FrameReader(Http2Connection, boolean)}
*/
public DecompressorHttp2FrameReader(Http2Connection connection) {
this(connection, false);
}
/**
* Create a new instance.
* @param strict
* <ul>
* <li>{@code true} to use use strict handling of deflate if used</li>
* <li>{@code false} be more lenient with decompression</li>
* </ul>
*/
public DecompressorHttp2FrameReader(Http2Connection connection, boolean strict) {
this.connection = connection;
this.strict = strict;
connection.addListener(CLEAN_UP_LISTENER);
}
/**
* Returns a new {@link EmbeddedChannel} that decodes the HTTP2 message
* content encoded in the specified {@code contentEncoding}.
*
* @param contentEncoding the value of the {@code content-encoding} header
* @return a new {@link ByteToMessageDecoder} if the specified encoding is supported.
* {@code null} otherwise (alternatively, you can throw a {@link Http2Exception}
* to block unknown encoding).
* @throws Http2Exception If the specified encoding is not not supported and warrants an exception
*/
protected EmbeddedChannel newContentDecoder(CharSequence contentEncoding) throws Http2Exception {
if (HttpHeaders.Values.GZIP.equalsIgnoreCase(contentEncoding) ||
HttpHeaders.Values.XGZIP.equalsIgnoreCase(contentEncoding)) {
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
}
if (HttpHeaders.Values.DEFLATE.equalsIgnoreCase(contentEncoding) ||
HttpHeaders.Values.XDEFLATE.equalsIgnoreCase(contentEncoding)) {
final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
// To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(wrapper));
}
// 'identity' or unsupported
return null;
}
/**
* Returns the expected content encoding of the decoded content.
* This getMethod returns {@code "identity"} by default, which is the case for
* most decoders.
*
* @param contentEncoding the value of the {@code content-encoding} header
* @return the expected content encoding of the new content.
* @throws Http2Exception if the {@code contentEncoding} is not supported and warrants an exception
*/
protected CharSequence getTargetContentEncoding(
@SuppressWarnings("UnusedParameters") CharSequence contentEncoding) throws Http2Exception {
return HttpHeaders.Values.IDENTITY;
}
/**
* Checks if a new decoder object is needed for the stream identified by {@code streamId}.
* This method will modify the {@code content-encoding} header contained in {@code builder}.
* @param streamId The identifier for the headers inside {@code builder}
* @param builder Object representing headers which have been read
* @param endOfStream Indicates if the stream has ended
* @throws Http2Exception If the {@code content-encoding} is not supported
*/
private void initDecoder(int streamId, Http2Headers.Builder builder, boolean endOfStream)
throws Http2Exception {
final Http2Stream stream = connection.stream(streamId);
if (stream != null) {
EmbeddedChannel decoder = stream.decompressor();
if (decoder == null) {
if (!endOfStream) {
// Determine the content encoding.
CharSequence contentEncoding = builder.get(CONTENT_ENCODING_LOWER_CASE);
if (contentEncoding == null) {
contentEncoding = HttpHeaders.Values.IDENTITY;
}
decoder = newContentDecoder(contentEncoding);
if (decoder != null) {
stream.decompressor(decoder);
// Decode the content and remove or replace the existing headers
// so that the message looks like a decoded message.
CharSequence targetContentEncoding = getTargetContentEncoding(contentEncoding);
if (HttpHeaders.Values.IDENTITY.equalsIgnoreCase(targetContentEncoding)) {
builder.remove(CONTENT_ENCODING_LOWER_CASE);
} else {
builder.set(CONTENT_ENCODING_LOWER_CASE, targetContentEncoding);
}
}
}
} else if (endOfStream) {
cleanup(stream, decoder);
}
if (decoder != null) {
// The content length will be for the compressed data. Since we will decompress the data
// this content-length will not be correct. Instead of queuing messages or delaying sending
// header frames...just remove the content-length header
builder.remove(CONTENT_LENGTH_LOWER_CASE);
}
}
}
/**
* Release remaining content from the {@link EmbeddedChannel} and remove the decoder from the {@link Http2Stream}.
* @param stream The stream for which {@code decoder} is the decompressor for
* @param decoder The decompressor for {@code stream}
*/
private static void cleanup(Http2Stream stream, EmbeddedChannel decoder) {
if (decoder.finish()) {
for (;;) {
final ByteBuf buf = decoder.readInbound();
if (buf == null) {
break;
}
buf.release();
}
}
stream.decompressor(null);
}
/**
* Read the next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist.
* @param decoder The channel to read from
* @return The next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist
*/
private static ByteBuf nextReadableBuf(EmbeddedChannel decoder) {
for (;;) {
final ByteBuf buf = decoder.readInbound();
if (buf == null) {
return null;
}
if (!buf.isReadable()) {
buf.release();
continue;
}
return buf;
}
}
@Override
protected void notifyListenerOnDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream, Http2FrameListener listener) throws Http2Exception {
final Http2Stream stream = connection.stream(streamId);
final EmbeddedChannel decoder = stream == null ? null : stream.decompressor();
if (decoder == null) {
super.notifyListenerOnDataRead(ctx, streamId, data, padding, endOfStream, listener);
} else {
// call retain here as it will call release after its written to the channel
decoder.writeInbound(data.retain());
ByteBuf buf = nextReadableBuf(decoder);
if (buf == null) {
if (endOfStream) {
super.notifyListenerOnDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true, listener);
}
// END_STREAM is not set and the data could not be decoded yet.
// The assumption has to be there will be more data frames to complete the decode.
// We don't have enough information here to know if this is an error.
} else {
for (;;) {
final ByteBuf nextBuf = nextReadableBuf(decoder);
if (nextBuf == null) {
super.notifyListenerOnDataRead(ctx, streamId, buf, padding, endOfStream, listener);
break;
} else {
super.notifyListenerOnDataRead(ctx, streamId, buf, padding, false, listener);
}
buf = nextBuf;
}
}
if (endOfStream) {
cleanup(stream, decoder);
}
}
}
@Override
protected void notifyListenerOnHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers.Builder builder,
int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream,
Http2FrameListener listener) throws Http2Exception {
initDecoder(streamId, builder, endOfStream);
super.notifyListenerOnHeadersRead(ctx, streamId, builder, streamDependency, weight,
exclusive, padding, endOfStream, listener);
}
@Override
protected void notifyListenerOnHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers.Builder builder,
int padding, boolean endOfStream, Http2FrameListener listener) throws Http2Exception {
initDecoder(streamId, builder, endOfStream);
super.notifyListenerOnHeadersRead(ctx, streamId, builder, padding, endOfStream, listener);
}
}

View File

@ -29,6 +29,7 @@ import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
@ -189,6 +190,7 @@ public class DefaultHttp2Connection implements Http2Connection {
private boolean terminateReceived;
private FlowState inboundFlow;
private FlowState outboundFlow;
private EmbeddedChannel decompressor;
private Object data;
DefaultStream(int id) {
@ -241,6 +243,19 @@ public class DefaultHttp2Connection implements Http2Connection {
return (T) data;
}
@Override
public void decompressor(EmbeddedChannel decompressor) {
if (this.decompressor != null && decompressor != null) {
throw new IllegalStateException("decompressor can not be reassigned");
}
this.decompressor = decompressor;
}
@Override
public EmbeddedChannel decompressor() {
return decompressor;
}
@Override
public FlowState inboundFlow() {
return inboundFlow;

View File

@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
@ -368,6 +367,23 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
}
}
protected void notifyListenerOnDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endOfStream, Http2FrameListener listener) throws Http2Exception {
listener.onDataRead(ctx, streamId, data, padding, endOfStream);
}
protected void notifyListenerOnHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers.Builder builder,
int streamDependency, short weight, boolean exclusive, int padding,
boolean endOfStream, Http2FrameListener listener) throws Http2Exception {
listener.onHeadersRead(ctx, streamId, builder.build(), streamDependency,
weight, exclusive, padding, endOfStream);
}
protected void notifyListenerOnHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers.Builder builder,
int padding, boolean endOfStream, Http2FrameListener listener) throws Http2Exception {
listener.onHeadersRead(ctx, streamId, builder.build(), padding, endOfStream);
}
private void readDataFrame(ChannelHandlerContext ctx, ByteBuf payload,
Http2FrameListener listener) throws Http2Exception {
short padding = readPadding(payload);
@ -380,7 +396,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
}
ByteBuf data = payload.readSlice(dataLength);
listener.onDataRead(ctx, streamId, data, padding, flags.endOfStream());
notifyListenerOnDataRead(ctx, streamId, data, padding, flags.endOfStream(), listener);
payload.skipBytes(payload.readableBytes());
}
@ -409,11 +425,11 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment,
Http2FrameListener listener) throws Http2Exception {
builder().addFragment(fragment, ctx.alloc(), endOfHeaders);
final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
hdrBlockBuilder.addFragment(fragment, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
Http2Headers headers = builder().buildHeaders();
listener.onHeadersRead(ctx, headersStreamId, headers, streamDependency,
weight, exclusive, padding, headersFlags.endOfStream());
notifyListenerOnHeadersRead(ctx, headersStreamId, hdrBlockBuilder.builder(),
streamDependency, weight, exclusive, padding, headersFlags.endOfStream(), listener);
close();
}
}
@ -435,11 +451,11 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment,
Http2FrameListener listener) throws Http2Exception {
builder().addFragment(fragment, ctx.alloc(), endOfHeaders);
final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
hdrBlockBuilder.addFragment(fragment, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
Http2Headers headers = builder().buildHeaders();
listener.onHeadersRead(ctx, headersStreamId, headers, padding,
headersFlags.endOfStream());
notifyListenerOnHeadersRead(ctx, headersStreamId, hdrBlockBuilder.builder(), padding,
headersFlags.endOfStream(), listener);
close();
}
}
@ -507,9 +523,9 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment,
Http2FrameListener listener) throws Http2Exception {
builder().addFragment(fragment, ctx.alloc(), endOfHeaders);
headersBlockBuilder().addFragment(fragment, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
Http2Headers headers = builder().buildHeaders();
Http2Headers headers = headersBlockBuilder().builder().build();
listener.onPushPromiseRead(ctx, pushPromiseStreamId, promisedStreamId, headers,
padding);
close();
@ -586,7 +602,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
* {@link Http2FrameListener} once the end of headers is reached.
*/
private abstract class HeadersContinuation {
private final HeadersBuilder builder = new HeadersBuilder();
private final HeadersBlockBuilder builder = new HeadersBlockBuilder();
/**
* Returns the stream for which headers are currently being processed.
@ -603,7 +619,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
abstract void processFragment(boolean endOfHeaders, ByteBuf fragment,
Http2FrameListener listener) throws Http2Exception;
final HeadersBuilder builder() {
final HeadersBlockBuilder headersBlockBuilder() {
return builder;
}
@ -619,7 +635,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
* Utility class to help with construction of the headers block that may potentially span
* multiple frames.
*/
private class HeadersBuilder {
protected class HeadersBlockBuilder {
private ByteBuf headerBlock;
/**
@ -660,7 +676,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
* Builds the headers from the completed headers block. After this is called, this builder
* should not be called again.
*/
Http2Headers buildHeaders() throws Http2Exception {
Http2Headers.Builder builder() throws Http2Exception {
try {
return headersDecoder.decodeHeaders(headerBlock);
} finally {

View File

@ -46,7 +46,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
@Override
public String get(String name) {
public String get(CharSequence name) {
if (name == null) {
throw new NullPointerException("name");
}
@ -65,7 +65,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
@Override
public List<String> getAll(String name) {
public List<String> getAll(CharSequence name) {
if (name == null) {
throw new NullPointerException("name");
}
@ -97,7 +97,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
@Override
public boolean contains(String name) {
public boolean contains(CharSequence name) {
return get(name) != null;
}
@ -156,7 +156,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
/**
* Builds instances of {@link DefaultHttp2Headers}.
*/
public static class Builder {
public static class Builder implements Http2Headers.Builder {
private HeaderEntry[] entries;
private HeaderEntry head;
private Http2Headers buildResults;
@ -166,10 +166,45 @@ public final class DefaultHttp2Headers extends Http2Headers {
clear();
}
/**
* Clears all existing headers from this collection and replaces them with the given header
* set.
*/
@Override
public String get(CharSequence name) {
if (name == null) {
throw new NullPointerException("name");
}
int h = hash(name);
int i = index(h);
HeaderEntry e = entries[i];
while (e != null) {
if (e.hash == h && eq(name, e.key)) {
return e.value;
}
e = e.next;
}
return null;
}
@Override
public List<String> getAll(CharSequence name) {
if (name == null) {
throw new NullPointerException("name");
}
LinkedList<String> values = new LinkedList<String>();
int h = hash(name);
int i = index(h);
HeaderEntry e = entries[i];
while (e != null) {
if (e.hash == h && eq(name, e.key)) {
values.addFirst(e.value);
}
e = e.next;
}
return values;
}
@Override
public void set(Http2Headers headers) {
// No need to lazy copy the previous results, since we're starting from scratch.
clear();
@ -178,21 +213,13 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
}
/**
* Adds the given header to the collection.
*
* @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
*/
public Builder add(final CharSequence name, final Object value) {
@Override
public Builder add(CharSequence name, Object value) {
return add(name.toString(), value);
}
/**
* Adds the given header to the collection.
*
* @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
*/
public Builder add(final String name, final Object value) {
@Override
public Builder add(String name, Object value) {
// If this is the first call on the builder since the last build, copy the previous
// results.
lazyCopy();
@ -207,17 +234,8 @@ public final class DefaultHttp2Headers extends Http2Headers {
return this;
}
/**
* Removes the header with the given name from this collection.
*/
public Builder remove(final CharSequence name) {
return remove(name.toString());
}
/**
* Removes the header with the given name from this collection.
*/
public Builder remove(final String name) {
@Override
public Builder remove(CharSequence name) {
if (name == null) {
throw new NullPointerException("name");
}
@ -226,28 +244,31 @@ public final class DefaultHttp2Headers extends Http2Headers {
// results.
lazyCopy();
String lowerCaseName = name.toLowerCase();
int nameHash = hash(lowerCaseName);
int hashTableIndex = index(nameHash);
remove0(nameHash, hashTableIndex, lowerCaseName);
remove0(name);
return this;
}
/**
* Sets the given header in the collection, replacing any previous values.
*
* @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
*/
public Builder set(final CharSequence name, final Object value) {
@Override
public Builder remove(String name) {
if (name == null) {
throw new NullPointerException("name");
}
// If this is the first call on the builder since the last build, copy the previous
// results.
lazyCopy();
remove0(name.toLowerCase());
return this;
}
@Override
public Builder set(CharSequence name, Object value) {
return set(name.toString(), value);
}
/**
* Sets the given header in the collection, replacing any previous values.
*
* @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
*/
public Builder set(final String name, final Object value) {
@Override
public Builder set(String name, Object value) {
// If this is the first call on the builder since the last build, copy the previous
// results.
lazyCopy();
@ -263,12 +284,8 @@ public final class DefaultHttp2Headers extends Http2Headers {
return this;
}
/**
* Sets the given header in the collection, replacing any previous values.
*
* @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
*/
public Builder set(final String name, final Iterable<?> values) {
@Override
public Builder set(String name, Iterable<?> values) {
if (values == null) {
throw new NullPointerException("values");
}
@ -295,9 +312,12 @@ public final class DefaultHttp2Headers extends Http2Headers {
return this;
}
/**
* Clears all values from this collection.
*/
@Override
public int size() {
return size;
}
@Override
public Builder clear() {
// No lazy copy required, since we're just creating a new array.
entries = new HeaderEntry[BUCKET_SIZE];
@ -308,44 +328,32 @@ public final class DefaultHttp2Headers extends Http2Headers {
return this;
}
/**
* Sets the {@link PseudoHeaderName#METHOD} header.
*/
@Override
public Builder method(String method) {
return set(METHOD.value(), method);
}
/**
* Sets the {@link PseudoHeaderName#SCHEME} header.
*/
@Override
public Builder scheme(String scheme) {
return set(SCHEME.value(), scheme);
}
/**
* Sets the {@link PseudoHeaderName#AUTHORITY} header.
*/
@Override
public Builder authority(String authority) {
return set(AUTHORITY.value(), authority);
}
/**
* Sets the {@link PseudoHeaderName#PATH} header.
*/
@Override
public Builder path(String path) {
return set(PseudoHeaderName.PATH.value(), path);
}
/**
* Sets the {@link PseudoHeaderName#STATUS} header.
*/
@Override
public Builder status(String status) {
return set(PseudoHeaderName.STATUS.value(), status);
}
/**
* Builds a new instance of {@link DefaultHttp2Headers}.
*/
@Override
public DefaultHttp2Headers build() {
// If this is the first call on the builder since the last build, copy the previous
// results.
@ -383,7 +391,13 @@ public final class DefaultHttp2Headers extends Http2Headers {
size++;
}
private void remove0(int hash, int hashTableIndex, String name) {
private void remove0(final CharSequence name) {
final int nameHash = hash(name);
final int hashTableIndex = index(nameHash);
remove0(nameHash, hashTableIndex, name);
}
private void remove0(int hash, int hashTableIndex, CharSequence name) {
HeaderEntry e = entries[hashTableIndex];
if (e == null) {
return;
@ -477,7 +491,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
}
private static int hash(String name) {
private static int hash(CharSequence name) {
int h = 0;
for (int i = name.length() - 1; i >= 0; i--) {
char c = name.charAt(i);
@ -496,7 +510,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
}
private static boolean eq(String name1, String name2) {
private static boolean eq(CharSequence name1, CharSequence name2) {
int nameLen = name1.length();
if (nameLen != name2.length()) {
return false;

View File

@ -65,7 +65,7 @@ public class DefaultHttp2HeadersDecoder implements Http2HeadersDecoder {
}
@Override
public Http2Headers decodeHeaders(ByteBuf headerBlock) throws Http2Exception {
public Http2Headers.Builder decodeHeaders(ByteBuf headerBlock) throws Http2Exception {
try {
final DefaultHttp2Headers.Builder headersBuilder = new DefaultHttp2Headers.Builder();
HeaderListener listener = new HeaderListener() {
@ -83,13 +83,12 @@ public class DefaultHttp2HeadersDecoder implements Http2HeadersDecoder {
// TODO: what's the right thing to do here?
}
Http2Headers headers = headersBuilder.build();
if (headers.size() > maxHeaderListSize) {
if (headersBuilder.size() > maxHeaderListSize) {
throw protocolError("Number of headers (%d) exceeds maxHeaderListSize (%d)",
headers.size(), maxHeaderListSize);
headersBuilder.size(), maxHeaderListSize);
}
return headers;
return headersBuilder;
} catch (IOException e) {
throw new Http2Exception(COMPRESSION_ERROR, e.getMessage());
} catch (Throwable e) {

View File

@ -32,12 +32,12 @@ public abstract class Http2Headers implements Iterable<Entry<String, String>> {
public static final Http2Headers EMPTY_HEADERS = new Http2Headers() {
@Override
public String get(String name) {
public String get(CharSequence name) {
return null;
}
@Override
public List<String> getAll(String name) {
public List<String> getAll(CharSequence name) {
return Collections.emptyList();
}
@ -47,7 +47,7 @@ public abstract class Http2Headers implements Iterable<Entry<String, String>> {
}
@Override
public boolean contains(String name) {
public boolean contains(CharSequence name) {
return false;
}
@ -153,14 +153,14 @@ public abstract class Http2Headers implements Iterable<Entry<String, String>> {
*
* @return the header value or {@code null} if there is no such header
*/
public abstract String get(String name);
public abstract String get(CharSequence name);
/**
* Returns the header values with the specified header name.
*
* @return the {@link List} of header values. An empty list if there is no such header.
*/
public abstract List<String> getAll(String name);
public abstract List<String> getAll(CharSequence name);
/**
* Returns all header names and values that this frame contains.
@ -173,7 +173,7 @@ public abstract class Http2Headers implements Iterable<Entry<String, String>> {
/**
* Returns {@code true} if and only if there is a header with the specified header name.
*/
public abstract boolean contains(String name);
public abstract boolean contains(CharSequence name);
/**
* Checks if no header exists.
@ -206,6 +206,117 @@ public abstract class Http2Headers implements Iterable<Entry<String, String>> {
*/
public abstract String forEach(HeaderVisitor visitor);
/**
* Interface for the Builder pattern for {@link Http2Headers}.
*/
public interface Builder {
/**
* Build all the collected headers into a {@link Http2Headers}.
* @return The {@link Http2Headers} object which this builder has been used for
*/
Http2Headers build();
/**
* Gets the number of headers contained in this object.
*/
int size();
/**
* Clears all values from this collection.
*/
Builder clear();
/**
* Returns the header value with the specified header name. If there is more than one header
* value for the specified header name, the first value is returned.
* <p>
* Note that all HTTP2 headers names are lower case and this method will not force {@code name} to lower case.
* @return the header value or {@code null} if there is no such header
*/
String get(CharSequence name);
/**
* Returns the header values with the specified header name.
* <p>
* Note that all HTTP2 headers names are lower case and this method will not force {@code name} to lower case.
* @return the {@link List} of header values. An empty list if there is no such header.
*/
List<String> getAll(CharSequence name);
/**
* Clears all existing headers from this collection and replaces them with the given header
* set.
*/
void set(Http2Headers headers);
/**
* Adds the given header to the collection.
* @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
*/
Builder add(CharSequence name, Object value);
/**
* Adds the given header to the collection.
* @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
*/
Builder add(String name, Object value);
/**
* Removes the header with the given name from this collection.
* This method will <b>not</b> force the {@code name} to lower case before looking for a match.
*/
Builder remove(CharSequence name);
/**
* Removes the header with the given name from this collection.
* This method will force the {@code name} to lower case before looking for a match.
*/
Builder remove(String name);
/**
* Sets the given header in the collection, replacing any previous values.
* @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
*/
Builder set(CharSequence name, Object value);
/**
* Sets the given header in the collection, replacing any previous values.
* @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
*/
Builder set(String name, Object value);
/**
* Sets the given header in the collection, replacing any previous values.
* @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
*/
Builder set(String name, Iterable<?> values);
/**
* Sets the {@link PseudoHeaderName#METHOD} header.
*/
Builder method(String method);
/**
* Sets the {@link PseudoHeaderName#SCHEME} header.
*/
Builder scheme(String scheme);
/**
* Sets the {@link PseudoHeaderName#AUTHORITY} header.
*/
Builder authority(String authority);
/**
* Sets the {@link PseudoHeaderName#PATH} header.
*/
Builder path(String path);
/**
* Sets the {@link PseudoHeaderName#STATUS} header.
*/
Builder status(String status);
}
/**
* Gets the {@link PseudoHeaderName#METHOD} header or {@code null} if there is no such header
*/

View File

@ -25,7 +25,7 @@ public interface Http2HeadersDecoder {
/**
* Decodes the given headers block and returns the headers.
*/
Http2Headers decodeHeaders(ByteBuf headerBlock) throws Http2Exception;
Http2Headers.Builder decodeHeaders(ByteBuf headerBlock) throws Http2Exception;
/**
* Sets the new max header table size for this decoder.

View File

@ -15,6 +15,8 @@
package io.netty.handler.codec.http2;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.Collection;
/**
@ -122,6 +124,16 @@ public interface Http2Stream {
*/
<T> T data();
/**
* Associate an object responsible for decompressing data frames for this stream
*/
void decompressor(EmbeddedChannel decompressor);
/**
* Get the object capable of decompressing data frames for this stream
*/
EmbeddedChannel decompressor();
/**
* Gets the in-bound flow control state for this stream.
*/

View File

@ -56,52 +56,52 @@ public final class HttpUtil {
* HTTP extension header which will identify the stream id
* from the HTTP/2 event(s) responsible for generating a {@code HttpObject}
* <p>
* {@code "X-HTTP2-Stream-ID"}
* {@code "x-http2-stream-id"}
*/
public static final AsciiString STREAM_ID = new AsciiString("X-HTTP2-Stream-ID");
public static final AsciiString STREAM_ID = new AsciiString("x-http2-stream-id");
/**
* HTTP extension header which will identify the authority pseudo header
* from the HTTP/2 event(s) responsible for generating a {@code HttpObject}
* <p>
* {@code "X-HTTP2-Authority"}
* {@code "x-http2-authority"}
*/
public static final AsciiString AUTHORITY = new AsciiString("X-HTTP2-Authority");
public static final AsciiString AUTHORITY = new AsciiString("x-http2-authority");
/**
* HTTP extension header which will identify the scheme pseudo header
* from the HTTP/2 event(s) responsible for generating a {@code HttpObject}
* <p>
* {@code "X-HTTP2-Scheme"}
* {@code "x-http2-scheme"}
*/
public static final AsciiString SCHEME = new AsciiString("X-HTTP2-Scheme");
public static final AsciiString SCHEME = new AsciiString("x-http2-scheme");
/**
* HTTP extension header which will identify the path pseudo header
* from the HTTP/2 event(s) responsible for generating a {@code HttpObject}
* <p>
* {@code "X-HTTP2-Path"}
* {@code "x-http2-path"}
*/
public static final AsciiString PATH = new AsciiString("X-HTTP2-Path");
public static final AsciiString PATH = new AsciiString("x-http2-path");
/**
* HTTP extension header which will identify the stream id used to create this stream
* in a HTTP/2 push promise frame
* <p>
* {@code "X-HTTP2-Stream-Promise-ID"}
* {@code "x-http2-stream-promise-id"}
*/
public static final AsciiString STREAM_PROMISE_ID = new AsciiString("X-HTTP2-Stream-Promise-ID");
public static final AsciiString STREAM_PROMISE_ID = new AsciiString("x-http2-stream-promise-id");
/**
* HTTP extension header which will identify the stream id which this stream is dependent on.
* This stream will be a child node of the stream id associated with this header value.
* <p>
* {@code "X-HTTP2-Stream-Dependency-ID"}
* {@code "x-http2-stream-dependency-id"}
*/
public static final AsciiString STREAM_DEPENDENCY_ID = new AsciiString("X-HTTP2-Stream-Dependency-ID");
public static final AsciiString STREAM_DEPENDENCY_ID = new AsciiString("x-http2-stream-dependency-id");
/**
* HTTP extension header which will identify the weight
* (if non-default and the priority is not on the default stream) of the associated HTTP/2 stream
* responsible responsible for generating a {@code HttpObject}
* <p>
* {@code "X-HTTP2-Stream-Weight"}
* {@code "x-http2-stream-weight"}
*/
public static final AsciiString STREAM_WEIGHT = new AsciiString("X-HTTP2-Stream-Weight");
public static final AsciiString STREAM_WEIGHT = new AsciiString("x-http2-stream-weight");
}
}

View File

@ -273,9 +273,6 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
"Content length exceeded max of %d for stream id %d", maxContentLength, streamId);
}
// TODO: provide hooks to a HttpContentDecoder type interface
// Preferably provide these hooks in the HTTP2 codec so even non-translation layer use-cases benefit
// (and then data will already be decoded here)
content.writeBytes(data, data.readerIndex(), data.readableBytes());
if (endOfStream) {

View File

@ -0,0 +1,379 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.NetUtil;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
* Test for data decompression in the HTTP/2 codec.
*/
public class DataCompressionHttp2Test {
private List<ByteBuf> dataCapture;
@Mock
private Http2FrameListener serverListener;
@Mock
private Http2FrameListener clientListener;
private ByteBufAllocator alloc;
private Http2FrameWriter frameWriter;
private ServerBootstrap sb;
private Bootstrap cb;
private Channel serverChannel;
private Channel serverConnectedChannel;
private Channel clientChannel;
private CountDownLatch serverLatch;
private CountDownLatch clientLatch;
private Http2TestUtil.FrameAdapter serverAdapter;
private Http2TestUtil.FrameAdapter clientAdapter;
private Http2Connection serverConnection;
@Before
public void setup() throws InterruptedException {
MockitoAnnotations.initMocks(this);
alloc = UnpooledByteBufAllocator.DEFAULT;
sb = new ServerBootstrap();
cb = new Bootstrap();
serverLatch(new CountDownLatch(1));
clientLatch(new CountDownLatch(1));
frameWriter = new DefaultHttp2FrameWriter();
serverConnection = new DefaultHttp2Connection(true);
sb.group(new NioEventLoopGroup(), new NioEventLoopGroup());
sb.channel(NioServerSocketChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
serverAdapter = new Http2TestUtil.FrameAdapter(serverConnection, new DecompressorHttp2FrameReader(
serverConnection), serverListener, serverLatch, false);
p.addLast("reader", serverAdapter);
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
serverConnectedChannel = ch;
}
});
cb.group(new NioEventLoopGroup());
cb.channel(NioSocketChannel.class);
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
clientAdapter = new Http2TestUtil.FrameAdapter(clientListener, clientLatch, false);
p.addLast("reader", clientAdapter);
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
}
});
serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port));
assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel();
}
@After
public void teardown() throws InterruptedException {
if (dataCapture != null) {
for (int i = 0; i < dataCapture.size(); ++i) {
dataCapture.get(i).release();
}
dataCapture = null;
}
serverChannel.close().sync();
sb.group().shutdownGracefully();
sb.childGroup().shutdownGracefully();
cb.group().shutdownGracefully();
serverAdapter = null;
clientAdapter = null;
serverConnection = null;
}
@Test
public void justHeadersNoData() throws Exception {
final Http2Headers headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path")
.set(HttpHeaders.Names.CONTENT_ENCODING, HttpHeaders.Values.GZIP).build();
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
Http2TestUtil.FrameAdapter.getOrCreateStream(serverConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, headers, 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitServer();
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(headers), eq(0), eq(true));
}
@Test
public void gzipEncodingSingleEmptyMessage() throws Exception {
serverLatch(new CountDownLatch(2));
final String text = "";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
try {
final ByteBuf encodedData = encodeData(data, encoder);
final Http2Headers headers = new DefaultHttp2Headers.Builder().method("POST").path("/some/path")
.set(HttpHeaders.Names.CONTENT_ENCODING, HttpHeaders.Values.GZIP).build();
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
Http2TestUtil.FrameAdapter.getOrCreateStream(serverConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitServer();
data.readerIndex(0);
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
dataCapture = dataCaptor.getAllValues();
assertEquals(data, dataCapture.get(0));
} finally {
data.release();
cleanupEncoder(encoder);
}
}
@Test
public void gzipEncodingSingleMessage() throws Exception {
serverLatch(new CountDownLatch(2));
final String text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
try {
final ByteBuf encodedData = encodeData(data, encoder);
final Http2Headers headers = new DefaultHttp2Headers.Builder().method("POST").path("/some/path")
.set(HttpHeaders.Names.CONTENT_ENCODING, HttpHeaders.Values.GZIP).build();
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
Http2TestUtil.FrameAdapter.getOrCreateStream(serverConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitServer();
data.readerIndex(0);
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
dataCapture = dataCaptor.getAllValues();
assertEquals(data, dataCapture.get(0));
} finally {
data.release();
cleanupEncoder(encoder);
}
}
@Test
public void gzipEncodingMultipleMessages() throws Exception {
serverLatch(new CountDownLatch(3));
final String text1 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc";
final String text2 = "dddddddddddddddddddeeeeeeeeeeeeeeeeeeeffffffffffffffffffff";
final ByteBuf data1 = Unpooled.copiedBuffer(text1.getBytes());
final ByteBuf data2 = Unpooled.copiedBuffer(text2.getBytes());
final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
try {
final ByteBuf encodedData1 = encodeData(data1, encoder);
final ByteBuf encodedData2 = encodeData(data2, encoder);
final Http2Headers headers = new DefaultHttp2Headers.Builder().method("POST").path("/some/path")
.set(HttpHeaders.Names.CONTENT_ENCODING, HttpHeaders.Values.GZIP).build();
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
Http2TestUtil.FrameAdapter.getOrCreateStream(serverConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, encodedData1, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, encodedData2, 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitServer();
data1.readerIndex(0);
data2.readerIndex(0);
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
ArgumentCaptor<Boolean> endStreamCaptor = ArgumentCaptor.forClass(Boolean.class);
verify(serverListener, times(2)).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(),
eq(0), endStreamCaptor.capture());
dataCapture = dataCaptor.getAllValues();
assertEquals(data1, dataCapture.get(0));
assertEquals(data2, dataCapture.get(1));
List<Boolean> endStreamCapture = endStreamCaptor.getAllValues();
assertEquals(false, endStreamCapture.get(0));
assertEquals(true, endStreamCapture.get(1));
} finally {
data1.release();
data2.release();
cleanupEncoder(encoder);
}
}
@Test
public void deflateEncodingSingleLargeMessage() throws Exception {
serverLatch(new CountDownLatch(2));
final ByteBuf data = Unpooled.buffer(1 << 16);
final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.ZLIB));
try {
for (int i = 0; i < data.capacity(); ++i) {
data.writeByte((byte) 'a');
}
final ByteBuf encodedData = encodeData(data, encoder);
final Http2Headers headers = new DefaultHttp2Headers.Builder().method("POST").path("/some/path")
.set(HttpHeaders.Names.CONTENT_ENCODING, HttpHeaders.Values.DEFLATE).build();
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
Http2TestUtil.FrameAdapter.getOrCreateStream(serverConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitServer();
data.readerIndex(0);
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
dataCapture = dataCaptor.getAllValues();
assertEquals(data, dataCapture.get(0));
} finally {
data.release();
cleanupEncoder(encoder);
}
}
private ByteBuf encodeData(ByteBuf data, EmbeddedChannel encoder) {
ByteBuf encoded = alloc.buffer(data.readableBytes());
encoder.writeOutbound(data.retain());
for (;;) {
final ByteBuf buf = encoder.readOutbound();
if (buf == null) {
break;
}
if (!buf.isReadable()) {
buf.release();
continue;
}
encoded.writeBytes(buf);
buf.release();
}
return encoded;
}
private static void cleanupEncoder(EmbeddedChannel encoder) {
if (encoder.finish()) {
for (;;) {
final ByteBuf buf = encoder.readOutbound();
if (buf == null) {
break;
}
buf.release();
}
}
}
private void serverLatch(CountDownLatch latch) {
serverLatch = latch;
if (serverAdapter != null) {
serverAdapter.latch(serverLatch);
}
}
private void clientLatch(CountDownLatch latch) {
clientLatch = latch;
if (clientAdapter != null) {
clientAdapter.latch(clientLatch);
}
}
private void awaitServer() throws Exception {
serverLatch.await(5, SECONDS);
}
private void awaitClient() throws Exception {
clientLatch.await(5, SECONDS);
}
private ChannelHandlerContext ctxClient() {
return clientChannel.pipeline().firstContext();
}
private ChannelPromise newPromiseClient() {
return ctxClient().newPromise();
}
private ChannelHandlerContext ctxServer() {
return serverConnectedChannel.pipeline().firstContext();
}
private ChannelPromise newPromiseServer() {
return ctxServer().newPromise();
}
}

View File

@ -43,7 +43,7 @@ public class DefaultHttp2HeadersDecoderTest {
@Test
public void decodeShouldSucceed() throws Exception {
ByteBuf buf = encode(":method", "GET", "akey", "avalue");
Http2Headers headers = decoder.decodeHeaders(buf);
Http2Headers headers = decoder.decodeHeaders(buf).build();
assertEquals(2, headers.size());
assertEquals("GET", headers.method());
assertEquals("avalue", headers.get("akey"));

View File

@ -60,7 +60,6 @@ import org.mockito.MockitoAnnotations;
* Testing the {@link DelegatingHttp2HttpConnectionHandler} for {@link FullHttpRequest} objects into HTTP/2 frames
*/
public class DelegatingHttp2HttpConnectionHandlerTest {
@Mock
private Http2FrameListener clientListener;

View File

@ -15,6 +15,16 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel;
import static io.netty.util.CharsetUtil.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
@ -28,9 +38,13 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.NetUtil;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -38,23 +52,13 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static io.netty.handler.codec.http2.Http2TestUtil.*;
import static io.netty.util.CharsetUtil.*;
import static java.util.concurrent.TimeUnit.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Tests encoding/decoding each HTTP2 frame type.
*/
public class Http2FrameRoundtripTest {
@Mock
private Http2FrameListener serverObserver;
private Http2FrameListener serverListener;
private ArgumentCaptor<ByteBuf> dataCaptor;
private Http2FrameWriter frameWriter;
@ -63,12 +67,13 @@ public class Http2FrameRoundtripTest {
private Channel serverChannel;
private Channel clientChannel;
private CountDownLatch requestLatch;
private Http2TestUtil.FrameAdapter serverAdapter;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
requestLatch = new CountDownLatch(1);
serverLatch(new CountDownLatch(1));
frameWriter = new DefaultHttp2FrameWriter();
dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
@ -81,7 +86,8 @@ public class Http2FrameRoundtripTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("reader", new FrameAdapter(serverObserver));
serverAdapter = new Http2TestUtil.FrameAdapter(serverListener, requestLatch, true);
p.addLast("reader", serverAdapter);
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
}
});
@ -92,7 +98,7 @@ public class Http2FrameRoundtripTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("reader", new FrameAdapter(null));
p.addLast("reader", new Http2TestUtil.FrameAdapter(null, null, true));
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
}
});
@ -107,26 +113,37 @@ public class Http2FrameRoundtripTest {
@After
public void teardown() throws Exception {
List<ByteBuf> capturedData = dataCaptor.getAllValues();
for (int i = 0; i < capturedData.size(); ++i) {
capturedData.get(i).release();
}
serverChannel.close().sync();
sb.group().shutdownGracefully();
sb.childGroup().shutdownGracefully();
cb.group().shutdownGracefully();
serverAdapter = null;
}
@Test
public void dataFrameShouldMatch() throws Exception {
final String text = "hello world";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
try {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeData(ctx(), 0x7FFFFFFF,
Unpooled.copiedBuffer(text.getBytes()), 100, true, newPromise());
frameWriter.writeData(ctx(), 0x7FFFFFFF, data.retain(), 100, true, newPromise());
ctx().flush();
}
});
awaitRequests();
verify(serverObserver).onDataRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
dataCaptor.capture(), eq(100), eq(true));
List<ByteBuf> capturedData = dataCaptor.getAllValues();
assertEquals(data, capturedData.get(0));
} finally {
data.release();
}
}
@Test
@ -142,7 +159,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
verify(serverObserver).onHeadersRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(headers), eq(0), eq(true));
}
@ -160,39 +177,50 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
verify(serverObserver).onHeadersRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(headers), eq(4), eq((short) 255), eq(true), eq(0), eq(true));
}
@Test
public void goAwayFrameShouldMatch() throws Exception {
final String text = "test";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
try {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeGoAway(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL,
Unpooled.copiedBuffer(text.getBytes()), newPromise());
frameWriter.writeGoAway(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL, data.retain(), newPromise());
ctx().flush();
}
});
awaitRequests();
verify(serverObserver).onGoAwayRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
verify(serverListener).onGoAwayRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(0xFFFFFFFFL), dataCaptor.capture());
List<ByteBuf> capturedData = dataCaptor.getAllValues();
assertEquals(data, capturedData.get(0));
} finally {
data.release();
}
}
@Test
public void pingFrameShouldMatch() throws Exception {
final ByteBuf buf = Unpooled.copiedBuffer("01234567", UTF_8);
final ByteBuf data = Unpooled.copiedBuffer("01234567", UTF_8);
try {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writePing(ctx(), true, buf, newPromise());
frameWriter.writePing(ctx(), true, data.retain(), newPromise());
ctx().flush();
}
});
awaitRequests();
verify(serverObserver)
.onPingAckRead(any(ChannelHandlerContext.class), dataCaptor.capture());
verify(serverListener).onPingAckRead(any(ChannelHandlerContext.class), dataCaptor.capture());
List<ByteBuf> capturedData = dataCaptor.getAllValues();
assertEquals(data, capturedData.get(0));
} finally {
data.release();
}
}
@Test
@ -205,7 +233,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
verify(serverObserver).onPriorityRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
verify(serverListener).onPriorityRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(1), eq((short) 1), eq(true));
}
@ -222,7 +250,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
verify(serverObserver).onPushPromiseRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
verify(serverListener).onPushPromiseRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(1), eq(headers), eq(5));
}
@ -236,7 +264,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
verify(serverObserver).onRstStreamRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
verify(serverListener).onRstStreamRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(0xFFFFFFFFL));
}
@ -254,7 +282,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
verify(serverObserver).onSettingsRead(any(ChannelHandlerContext.class), eq(settings));
verify(serverListener).onSettingsRead(any(ChannelHandlerContext.class), eq(settings));
}
@Test
@ -267,7 +295,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
verify(serverObserver).onWindowUpdateRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
verify(serverListener).onWindowUpdateRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(0x7FFFFFFF));
}
@ -277,26 +305,46 @@ public class Http2FrameRoundtripTest {
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
.authority("example.org").path("/some/path/resource2").build();
final String text = "hello world";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
try {
final int numStreams = 10000;
int expectedFrames = numStreams * 2;
requestLatch = new CountDownLatch(expectedFrames);
final int expectedFrames = numStreams * 2;
serverLatch(new CountDownLatch(expectedFrames));
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
for (int i = 1; i < numStreams + 1; ++i) {
frameWriter.writeHeaders(ctx(), i, headers, 0, (short) 16, false,
0, false, newPromise());
frameWriter.writeData(ctx(), i,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
frameWriter.writeHeaders(ctx(), i, headers, 0, (short) 16, false, 0, false, newPromise());
frameWriter.writeData(ctx(), i, data.retain(), 0, true, newPromise());
ctx().flush();
}
}
});
awaitRequests();
awaitRequests(30);
verify(serverListener, times(numStreams)).onDataRead(any(ChannelHandlerContext.class), anyInt(),
dataCaptor.capture(), eq(0), eq(true));
List<ByteBuf> capturedData = dataCaptor.getAllValues();
for (int i = 0; i < capturedData.size(); ++i) {
assertEquals(data, capturedData.get(i));
}
} finally {
data.release();
}
}
private void awaitRequests(long seconds) throws InterruptedException {
requestLatch.await(seconds, SECONDS);
}
private void awaitRequests() throws InterruptedException {
requestLatch.await(5, SECONDS);
awaitRequests(5);
}
private void serverLatch(CountDownLatch latch) {
requestLatch = latch;
if (serverAdapter != null) {
serverAdapter.latch(latch);
}
}
private ChannelHandlerContext ctx() {
@ -306,123 +354,4 @@ public class Http2FrameRoundtripTest {
private ChannelPromise newPromise() {
return ctx().newPromise();
}
private final class FrameAdapter extends ByteToMessageDecoder {
private final Http2FrameListener observer;
private final DefaultHttp2FrameReader reader;
FrameAdapter(Http2FrameListener observer) {
this.observer = observer;
reader = new DefaultHttp2FrameReader();
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
reader.readFrame(ctx, in, new Http2FrameListener() {
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endOfStream)
throws Http2Exception {
observer.onDataRead(ctx, streamId, copy(data), padding, endOfStream);
requestLatch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endStream)
throws Http2Exception {
observer.onHeadersRead(ctx, streamId, headers, padding, endStream);
requestLatch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endStream)
throws Http2Exception {
observer.onHeadersRead(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endStream);
requestLatch.countDown();
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId,
int streamDependency, short weight, boolean exclusive)
throws Http2Exception {
observer.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
requestLatch.countDown();
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
observer.onRstStreamRead(ctx, streamId, errorCode);
requestLatch.countDown();
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
observer.onSettingsAckRead(ctx);
requestLatch.countDown();
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
observer.onSettingsRead(ctx, settings);
requestLatch.countDown();
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data)
throws Http2Exception {
observer.onPingRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data)
throws Http2Exception {
observer.onPingAckRead(ctx, copy(data));
requestLatch.countDown();
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding)
throws Http2Exception {
observer.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
requestLatch.countDown();
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId,
long errorCode, ByteBuf debugData) throws Http2Exception {
observer.onGoAwayRead(ctx, lastStreamId, errorCode, copy(debugData));
requestLatch.countDown();
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId,
int windowSizeIncrement) throws Http2Exception {
observer.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
requestLatch.countDown();
}
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload) {
observer.onUnknownFrame(ctx, frameType, streamId, flags, payload);
requestLatch.countDown();
}
});
}
ByteBuf copy(ByteBuf buffer) {
return Unpooled.copiedBuffer(buffer);
}
}
}

View File

@ -94,7 +94,7 @@ public class Http2HeaderBlockIOTest {
private void assertRoundtripSuccessful(Http2Headers in) throws Http2Exception {
encoder.encodeHeaders(in, buffer);
Http2Headers out = decoder.decodeHeaders(buffer);
Http2Headers out = decoder.decodeHeaders(buffer).build();
assertEquals(in, out);
}
}

View File

@ -14,7 +14,14 @@
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* Utilities for the integration tests.
@ -45,4 +52,184 @@ final class Http2TestUtil {
private Http2TestUtil() {
}
static class FrameAdapter extends ByteToMessageDecoder {
private final boolean copyBufs;
private final Http2Connection connection;
private final Http2FrameListener listener;
private final DefaultHttp2FrameReader reader;
private CountDownLatch latch;
FrameAdapter(Http2FrameListener listener, CountDownLatch latch, boolean copyBufs) {
this(null, listener, latch, copyBufs);
}
FrameAdapter(Http2Connection connection, Http2FrameListener listener, CountDownLatch latch, boolean copyBufs) {
this(connection, new DefaultHttp2FrameReader(), listener, latch, copyBufs);
}
FrameAdapter(Http2Connection connection, DefaultHttp2FrameReader reader, Http2FrameListener listener,
CountDownLatch latch, boolean copyBufs) {
this.connection = connection;
this.listener = listener;
this.reader = reader;
this.copyBufs = copyBufs;
latch(latch);
}
public void latch(CountDownLatch latch) {
this.latch = latch;
}
public Http2Stream getOrCreateStream(int streamId, boolean halfClosed) throws Http2Exception {
return getOrCreateStream(connection, streamId, halfClosed);
}
public static Http2Stream getOrCreateStream(Http2Connection connection, int streamId, boolean halfClosed)
throws Http2Exception {
if (connection != null) {
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
if ((connection.isServer() && streamId % 2 == 0) || (!connection.isServer() && streamId % 2 != 0)) {
stream = connection.local().createStream(streamId, halfClosed);
} else {
stream = connection.remote().createStream(streamId, halfClosed);
}
}
return stream;
}
return null;
}
private void closeStream(Http2Stream stream) {
closeStream(stream, false);
}
protected void closeStream(Http2Stream stream, boolean dataRead) {
if (stream != null) {
stream.close();
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
reader.readFrame(ctx, in, new Http2FrameListener() {
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
Http2Stream stream = getOrCreateStream(streamId, endOfStream);
listener.onDataRead(ctx, streamId, copyBufs ? copy(data) : data, padding, endOfStream);
if (endOfStream) {
closeStream(stream, true);
}
latch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream) throws Http2Exception {
Http2Stream stream = getOrCreateStream(streamId, endStream);
listener.onHeadersRead(ctx, streamId, headers, padding, endStream);
if (endStream) {
closeStream(stream);
}
latch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream)
throws Http2Exception {
Http2Stream stream = getOrCreateStream(streamId, endStream);
if (stream != null) {
stream.setPriority(streamDependency, weight, exclusive);
}
listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding,
endStream);
if (endStream) {
closeStream(stream);
}
latch.countDown();
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
boolean exclusive) throws Http2Exception {
Http2Stream stream = getOrCreateStream(streamId, false);
if (stream != null) {
stream.setPriority(streamDependency, weight, exclusive);
}
listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
latch.countDown();
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
Http2Stream stream = getOrCreateStream(streamId, false);
listener.onRstStreamRead(ctx, streamId, errorCode);
closeStream(stream);
latch.countDown();
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
listener.onSettingsAckRead(ctx);
latch.countDown();
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
listener.onSettingsRead(ctx, settings);
latch.countDown();
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
listener.onPingRead(ctx, copyBufs ? copy(data) : data);
latch.countDown();
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
listener.onPingAckRead(ctx, copyBufs ? copy(data) : data);
latch.countDown();
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
getOrCreateStream(promisedStreamId, false);
listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
latch.countDown();
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
listener.onGoAwayRead(ctx, lastStreamId, errorCode, copyBufs ? copy(debugData) : debugData);
latch.countDown();
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception {
getOrCreateStream(streamId, false);
listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
latch.countDown();
}
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
ByteBuf payload) {
listener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
latch.countDown();
}
});
}
ByteBuf copy(ByteBuf buffer) {
return Unpooled.copiedBuffer(buffer);
}
}
}

View File

@ -18,10 +18,9 @@ import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.reset;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
@ -36,14 +35,11 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponseStatus;
@ -63,10 +59,12 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
* Testing the {@link InboundHttp2ToHttpPriorityAdapter} and base class {@link InboundHttp2ToHttpAdapter}
* for HTTP/2 frames into {@link HttpObject}s
* Testing the {@link InboundHttp2ToHttpPriorityAdapter} and base class {@link InboundHttp2ToHttpAdapter} for HTTP/2
* frames into {@link HttpObject}s
*/
public class InboundHttp2ToHttpAdapterTest {
private List<FullHttpMessage> capturedRequests;
private List<FullHttpMessage> capturedResponses;
@Mock
private HttpResponseListener serverListener;
@ -108,9 +106,10 @@ public class InboundHttp2ToHttpAdapterTest {
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
Http2Connection connection = new DefaultHttp2Connection(true);
p.addLast("reader", new FrameAdapter(connection,
InboundHttp2ToHttpPriorityAdapter.newInstance(connection, maxContentLength),
new CountDownLatch(10)));
p.addLast(
"reader",
new HttpAdapterFrameAdapter(connection, InboundHttp2ToHttpPriorityAdapter.newInstance(
connection, maxContentLength), new CountDownLatch(10)));
serverDelegator = new HttpResponseDelegator(serverListener, serverLatch);
p.addLast(serverDelegator);
serverConnectedChannel = ch;
@ -124,9 +123,10 @@ public class InboundHttp2ToHttpAdapterTest {
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
Http2Connection connection = new DefaultHttp2Connection(false);
p.addLast("reader", new FrameAdapter(connection,
InboundHttp2ToHttpPriorityAdapter.newInstance(connection, maxContentLength),
new CountDownLatch(10)));
p.addLast(
"reader",
new HttpAdapterFrameAdapter(connection, InboundHttp2ToHttpPriorityAdapter.newInstance(
connection, maxContentLength), new CountDownLatch(10)));
clientDelegator = new HttpResponseDelegator(clientListener, clientLatch);
p.addLast(clientDelegator);
}
@ -142,6 +142,8 @@ public class InboundHttp2ToHttpAdapterTest {
@After
public void teardown() throws Exception {
cleanupCapturedRequests();
cleanupCapturedResponses();
serverChannel.close().sync();
sb.group().shutdownGracefully();
sb.childGroup().shutdownGracefully();
@ -155,8 +157,9 @@ public class InboundHttp2ToHttpAdapterTest {
@Test
public void clientRequestSingleHeaderNoDataFrames() throws Exception {
final HttpMessage request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", true);
try {
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.SCHEME, "https");
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.AUTHORITY, "example.org");
@ -172,7 +175,13 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
awaitRequests();
verify(serverListener).messageReceived(eq(request));
ArgumentCaptor<FullHttpMessage> requestCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener).messageReceived(requestCaptor.capture());
capturedRequests = requestCaptor.getAllValues();
assertEquals(request, capturedRequests.get(0));
} finally {
request.release();
}
}
@Test
@ -181,24 +190,29 @@ public class InboundHttp2ToHttpAdapterTest {
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", content, true);
try {
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path/resource2")
.build();
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET")
.path("/some/path/resource2").build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, true,
newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
verify(serverListener).messageReceived(eq(request));
ArgumentCaptor<FullHttpMessage> requestCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener).messageReceived(requestCaptor.capture());
capturedRequests = requestCaptor.getAllValues();
assertEquals(request, capturedRequests.get(0));
} finally {
request.release();
}
}
@Test
public void clientRequestMultipleDataFrames() throws Exception {
@ -206,27 +220,33 @@ public class InboundHttp2ToHttpAdapterTest {
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", content, true);
try {
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path/resource2")
.build();
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET")
.path("/some/path/resource2").build();
final int midPoint = text.length() / 2;
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.slice(0, midPoint).retain(), 0,
false, newPromiseClient());
frameWriter
.writeData(ctxClient(), 3, content.slice(0, midPoint).retain(), 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.slice(midPoint, text.length() - midPoint).retain(), 0,
true, newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
verify(serverListener).messageReceived(eq(request));
ArgumentCaptor<FullHttpMessage> requestCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener).messageReceived(requestCaptor.capture());
capturedRequests = requestCaptor.getAllValues();
assertEquals(request, capturedRequests.get(0));
} finally {
request.release();
}
}
@Test
public void clientRequestMultipleEmptyDataFrames() throws Exception {
@ -234,11 +254,12 @@ public class InboundHttp2ToHttpAdapterTest {
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", content, true);
try {
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path/resource2")
.build();
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET")
.path("/some/path/resource2").build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
@ -250,9 +271,14 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
awaitRequests();
verify(serverListener).messageReceived(eq(request));
ArgumentCaptor<FullHttpMessage> requestCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener).messageReceived(requestCaptor.capture());
capturedRequests = requestCaptor.getAllValues();
assertEquals(request, capturedRequests.get(0));
} finally {
request.release();
}
}
@Test
public void clientRequestMultipleHeaders() throws Exception {
@ -262,6 +288,7 @@ public class InboundHttp2ToHttpAdapterTest {
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", content, true);
try {
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
@ -269,8 +296,8 @@ public class InboundHttp2ToHttpAdapterTest {
trailingHeaders.set("FoO", "goo");
trailingHeaders.set("foO2", "goo2");
trailingHeaders.add("fOo2", "goo3");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path/resource2")
.build();
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET")
.path("/some/path/resource2").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().set("foo", "goo").set("foo2", "goo2")
.add("foo2", "goo3").build();
runInChannel(clientChannel, new Http2Runnable() {
@ -278,15 +305,19 @@ public class InboundHttp2ToHttpAdapterTest {
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 3, http2Headers2, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, true,
newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
verify(serverListener).messageReceived(eq(request));
ArgumentCaptor<FullHttpMessage> requestCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener).messageReceived(requestCaptor.capture());
capturedRequests = requestCaptor.getAllValues();
assertEquals(request, capturedRequests.get(0));
} finally {
request.release();
}
}
@Test
public void clientRequestTrailingHeaders() throws Exception {
@ -294,6 +325,7 @@ public class InboundHttp2ToHttpAdapterTest {
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", content, true);
try {
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
@ -301,8 +333,8 @@ public class InboundHttp2ToHttpAdapterTest {
trailingHeaders.set("Foo", "goo");
trailingHeaders.set("fOo2", "goo2");
trailingHeaders.add("foO2", "goo3");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path/resource2")
.build();
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET")
.path("/some/path/resource2").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().set("foo", "goo").set("foo2", "goo2")
.add("foo2", "goo3").build();
runInChannel(clientChannel, new Http2Runnable() {
@ -315,9 +347,14 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
awaitRequests();
verify(serverListener).messageReceived(eq(request));
ArgumentCaptor<FullHttpMessage> requestCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener).messageReceived(requestCaptor.capture());
capturedRequests = requestCaptor.getAllValues();
assertEquals(request, capturedRequests.get(0));
} finally {
request.release();
}
}
@Test
public void clientRequestStreamDependencyInHttpMessageFlow() throws Exception {
@ -328,21 +365,21 @@ public class InboundHttp2ToHttpAdapterTest {
final ByteBuf content2 = Unpooled.copiedBuffer(text2.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT,
"/some/path/resource", content, true);
final FullHttpMessage request2 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT,
"/some/path/resource2", content2, true);
try {
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
final FullHttpMessage request2 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT,
"/some/path/resource2", content2, true);
HttpHeaders httpHeaders2 = request2.headers();
httpHeaders2.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 5);
httpHeaders2.set(HttpUtil.ExtensionHeaders.Names.STREAM_DEPENDENCY_ID, 3);
httpHeaders2.set(HttpUtil.ExtensionHeaders.Names.STREAM_WEIGHT, 123);
httpHeaders2.set(HttpHeaders.Names.CONTENT_LENGTH, text2.length());
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("PUT").path("/some/path/resource")
.build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().method("PUT").path("/some/path/resource2")
.build();
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("PUT")
.path("/some/path/resource").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().method("PUT")
.path("/some/path/resource2").build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
@ -355,14 +392,16 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
ArgumentCaptor<FullHttpMessage> httpObjectCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener, times(2)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(request2, capturedHttpObjects.get(1));
capturedRequests = httpObjectCaptor.getAllValues();
assertEquals(request, capturedRequests.get(0));
assertEquals(request2, capturedRequests.get(1));
} finally {
request.release();
request2.release();
}
}
@Test
public void clientRequestStreamDependencyOutsideHttpMessageFlow() throws Exception {
@ -373,22 +412,21 @@ public class InboundHttp2ToHttpAdapterTest {
final ByteBuf content2 = Unpooled.copiedBuffer(text2.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT,
"/some/path/resource", content, true);
final FullHttpMessage request2 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT,
"/some/path/resource2", content2, true);
final FullHttpMessage request3 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpUtil.OUT_OF_MESSAGE_SEQUENCE_METHOD, HttpUtil.OUT_OF_MESSAGE_SEQUENCE_PATH, true);
try {
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
final FullHttpMessage request2 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT,
"/some/path/resource2", content2, true);
HttpHeaders httpHeaders2 = request2.headers();
httpHeaders2.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 5);
httpHeaders2.set(HttpHeaders.Names.CONTENT_LENGTH, text2.length());
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("PUT").path("/some/path/resource")
.build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().method("PUT").path("/some/path/resource2")
.build();
final FullHttpMessage request3 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpUtil.OUT_OF_MESSAGE_SEQUENCE_METHOD,
HttpUtil.OUT_OF_MESSAGE_SEQUENCE_PATH, true);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("PUT")
.path("/some/path/resource").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().method("PUT")
.path("/some/path/resource2").build();
HttpHeaders httpHeaders3 = request3.headers();
httpHeaders3.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 5);
httpHeaders3.set(HttpUtil.ExtensionHeaders.Names.STREAM_DEPENDENCY_ID, 3);
@ -406,16 +444,18 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
ArgumentCaptor<FullHttpMessage> httpObjectCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener, times(3)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(request2, capturedHttpObjects.get(1));
assertEquals(request3, capturedHttpObjects.get(2));
capturedRequests = httpObjectCaptor.getAllValues();
assertEquals(request, capturedRequests.get(0));
assertEquals(request2, capturedRequests.get(1));
assertEquals(request3, capturedRequests.get(2));
} finally {
request.release();
request2.release();
request3.release();
}
}
@Test
public void serverRequestPushPromise() throws Exception {
@ -426,11 +466,14 @@ public class InboundHttp2ToHttpAdapterTest {
final ByteBuf content2 = Unpooled.copiedBuffer(text2.getBytes());
final FullHttpMessage response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
content, true);
final FullHttpMessage response2 = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CREATED,
content2, true);
final FullHttpMessage request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/push/test", true);
try {
HttpHeaders httpHeaders = response.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
final FullHttpMessage response2 = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CREATED,
content2, true);
HttpHeaders httpHeaders2 = response2.headers();
httpHeaders2.set(HttpUtil.ExtensionHeaders.Names.SCHEME, "https");
httpHeaders2.set(HttpUtil.ExtensionHeaders.Names.AUTHORITY, "example.org");
@ -438,11 +481,11 @@ public class InboundHttp2ToHttpAdapterTest {
httpHeaders2.set(HttpUtil.ExtensionHeaders.Names.STREAM_PROMISE_ID, 3);
httpHeaders2.set(HttpHeaders.Names.CONTENT_LENGTH, text2.length());
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/push/test", true);
httpHeaders = request.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, 0);
final Http2Headers http2Headers3 = new DefaultHttp2Headers.Builder().method("GET").path("/push/test").build();
final Http2Headers http2Headers3 = new DefaultHttp2Headers.Builder().method("GET")
.path("/push/test").build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
@ -451,7 +494,10 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
awaitRequests();
verify(serverListener).messageReceived(eq(request));
ArgumentCaptor<FullHttpMessage> requestCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener).messageReceived(requestCaptor.capture());
capturedRequests = requestCaptor.getAllValues();
assertEquals(request, capturedRequests.get(0));
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().status("200").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().status("201").scheme("https")
@ -467,14 +513,17 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
awaitResponses();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(clientListener, times(2)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(response, capturedHttpObjects.get(0));
assertEquals(response2, capturedHttpObjects.get(1));
ArgumentCaptor<FullHttpMessage> responseCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(clientListener, times(2)).messageReceived(responseCaptor.capture());
capturedResponses = responseCaptor.getAllValues();
assertEquals(response, capturedResponses.get(0));
assertEquals(response2, capturedResponses.get(1));
} finally {
request.release();
response.release();
response2.release();
}
}
@Test
public void serverResponseHeaderInformational() throws Exception {
@ -486,6 +535,13 @@ public class InboundHttp2ToHttpAdapterTest {
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, 0);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("PUT").path("/info/test")
.set(HttpHeaders.Names.EXPECT.toString(), HttpHeaders.Values.CONTINUE).build();
final FullHttpMessage response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
final String text = "a big payload";
final ByteBuf payload = Unpooled.copiedBuffer(text.getBytes());
final FullHttpMessage request2 = request.copy(payload);
final FullHttpMessage response2 = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
try {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
@ -494,10 +550,13 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
awaitRequests();
verify(serverListener).messageReceived(eq(request));
ArgumentCaptor<FullHttpMessage> requestCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener).messageReceived(requestCaptor.capture());
capturedRequests = requestCaptor.getAllValues();
assertEquals(request, capturedRequests.get(0));
cleanupCapturedRequests();
reset(serverListener);
final FullHttpMessage response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
httpHeaders = response.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, 0);
@ -510,17 +569,17 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
awaitResponses();
verify(clientListener).messageReceived(eq(response));
ArgumentCaptor<FullHttpMessage> responseCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(clientListener).messageReceived(responseCaptor.capture());
capturedResponses = responseCaptor.getAllValues();
assertEquals(response, capturedResponses.get(0));
cleanupCapturedResponses();
reset(clientListener);
setServerLatch(1);
final String text = "a big payload";
final ByteBuf payload = Unpooled.copiedBuffer(text.getBytes());
final FullHttpMessage request2 = request.copy(payload);
httpHeaders = request2.headers();
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
httpHeaders.remove(HttpHeaders.Names.EXPECT);
request.release();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
@ -529,11 +588,12 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
awaitRequests();
verify(serverListener).messageReceived(eq(request2));
request2.release();
requestCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(serverListener).messageReceived(requestCaptor.capture());
capturedRequests = requestCaptor.getAllValues();
assertEquals(request2, capturedRequests.get(0));
setClientLatch(1);
final FullHttpMessage response2 = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
httpHeaders = response2.headers();
httpHeaders.set(HttpUtil.ExtensionHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, 0);
@ -546,7 +606,34 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
awaitResponses();
verify(clientListener).messageReceived(eq(response2));
responseCaptor = ArgumentCaptor.forClass(FullHttpMessage.class);
verify(clientListener).messageReceived(responseCaptor.capture());
capturedResponses = responseCaptor.getAllValues();
assertEquals(response2, capturedResponses.get(0));
} finally {
request.release();
request2.release();
response.release();
response2.release();
}
}
private void cleanupCapturedRequests() {
if (capturedRequests != null) {
for (int i = 0; i < capturedRequests.size(); ++i) {
capturedRequests.get(i).release();
}
capturedRequests = null;
}
}
private void cleanupCapturedResponses() {
if (capturedResponses != null) {
for (int i = 0; i < capturedResponses.size(); ++i) {
capturedResponses.get(i).release();
}
capturedResponses = null;
}
}
private void setServerLatch(int count) {
@ -612,150 +699,16 @@ public class InboundHttp2ToHttpAdapterTest {
}
}
private final class FrameAdapter extends ByteToMessageDecoder {
private final Http2Connection connection;
private final Http2FrameListener listener;
private final DefaultHttp2FrameReader reader;
private final CountDownLatch latch;
FrameAdapter(Http2Connection connection, Http2FrameListener listener, CountDownLatch latch) {
this.connection = connection;
this.listener = listener;
reader = new DefaultHttp2FrameReader();
this.latch = latch;
private final class HttpAdapterFrameAdapter extends Http2TestUtil.FrameAdapter {
HttpAdapterFrameAdapter(Http2Connection connection, Http2FrameListener listener, CountDownLatch latch) {
super(connection, listener, latch, false);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
reader.readFrame(ctx, in, new Http2FrameListener() {
public Http2Stream getOrCreateStream(int streamId, boolean halfClosed) throws Http2Exception {
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
if ((connection.isServer() && streamId % 2 == 0) ||
(!connection.isServer() && streamId % 2 != 0)) {
stream = connection.local().createStream(streamId, halfClosed);
} else {
stream = connection.remote().createStream(streamId, halfClosed);
protected void closeStream(Http2Stream stream, boolean dataRead) {
if (!dataRead) { // NOTE: Do not close the stream to allow the out of order messages to be processed
super.closeStream(stream, dataRead);
}
}
return stream;
}
public void closeStream(Http2Stream stream) {
if (stream != null) {
stream.close();
}
}
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
listener.onDataRead(ctx, streamId, copy(data), padding, endOfStream);
// NOTE: Do not close the stream to allow the out of order messages to be processed
latch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream) throws Http2Exception {
Http2Stream stream = getOrCreateStream(streamId, endStream);
listener.onHeadersRead(ctx, streamId, headers, padding, endStream);
if (endStream) {
closeStream(stream);
}
latch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream)
throws Http2Exception {
Http2Stream stream = getOrCreateStream(streamId, endStream);
stream.setPriority(streamDependency, weight, exclusive);
listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding,
endStream);
if (endStream) {
closeStream(stream);
}
latch.countDown();
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
boolean exclusive) throws Http2Exception {
Http2Stream stream = getOrCreateStream(streamId, false);
stream.setPriority(streamDependency, weight, exclusive);
listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
latch.countDown();
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
Http2Stream stream = getOrCreateStream(streamId, false);
listener.onRstStreamRead(ctx, streamId, errorCode);
closeStream(stream);
latch.countDown();
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
listener.onSettingsAckRead(ctx);
latch.countDown();
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
listener.onSettingsRead(ctx, settings);
latch.countDown();
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
listener.onPingRead(ctx, copy(data));
latch.countDown();
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
listener.onPingAckRead(ctx, copy(data));
latch.countDown();
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
getOrCreateStream(promisedStreamId, false);
listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
latch.countDown();
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
listener.onGoAwayRead(ctx, lastStreamId, errorCode, copy(debugData));
latch.countDown();
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception {
getOrCreateStream(streamId, false);
listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
latch.countDown();
}
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
ByteBuf payload) {
listener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
latch.countDown();
}
});
}
ByteBuf copy(ByteBuf buffer) {
return Unpooled.copiedBuffer(buffer);
}
}
}

View File

@ -98,8 +98,8 @@ public final class Http2Client {
// Create a simple GET request.
FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, URL);
request.headers().add(HttpHeaders.Names.HOST, hostName);
// TODO: fix me when HTTP/2 supports decompression
// request.headers().add(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
request.headers().add(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
request.headers().add(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.DEFLATE);
channel.writeAndFlush(request);
responseHandler.put(streamId, channel.newPromise());
streamId += 2;
@ -109,8 +109,8 @@ public final class Http2Client {
FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, POST, URL2,
Unpooled.copiedBuffer(URL2DATA.getBytes(CharsetUtil.UTF_8)));
request.headers().add(HttpHeaders.Names.HOST, hostName);
// TODO: fix me when HTTP/2 supports decompression
// request.headers().add(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
request.headers().add(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
request.headers().add(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.DEFLATE);
channel.writeAndFlush(request);
responseHandler.put(streamId, channel.newPromise());
streamId += 2;

View File

@ -25,8 +25,8 @@ import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.DecompressorHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
@ -63,10 +63,11 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
Http2Connection connection = new DefaultHttp2Connection(false);
Http2FrameWriter frameWriter = frameWriter();
final Http2Connection connection = new DefaultHttp2Connection(false);
final Http2FrameWriter frameWriter = frameWriter();
connectionHandler = new DelegatingHttp2HttpConnectionHandler(connection,
frameReader(), frameWriter, new DefaultHttp2InboundFlowController(connection, frameWriter),
frameReader(connection), frameWriter,
new DefaultHttp2InboundFlowController(connection, frameWriter),
new DefaultHttp2OutboundFlowController(connection, frameWriter),
InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength));
responseHandler = new HttpResponseHandler();
@ -145,8 +146,8 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
}
}
private static Http2FrameReader frameReader() {
return new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger);
private static Http2FrameReader frameReader(Http2Connection connection) {
return new Http2InboundFrameLogger(new DecompressorHttp2FrameReader(connection), logger);
}
private static Http2FrameWriter frameWriter() {