diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecompressor.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecompressor.java
index baddaa7538..806615a6ac 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecompressor.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecompressor.java
@@ -47,16 +47,13 @@ public class HttpContentDecompressor extends HttpContentDecoder {
@Override
protected EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception {
- if ("gzip".equalsIgnoreCase(contentEncoding) || "x-gzip".equalsIgnoreCase(contentEncoding)) {
+ if (HttpHeaders.Values.GZIP.equalsIgnoreCase(contentEncoding) ||
+ HttpHeaders.Values.XGZIP.equalsIgnoreCase(contentEncoding)) {
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
}
- if ("deflate".equalsIgnoreCase(contentEncoding) || "x-deflate".equalsIgnoreCase(contentEncoding)) {
- ZlibWrapper wrapper;
- if (strict) {
- wrapper = ZlibWrapper.ZLIB;
- } else {
- wrapper = ZlibWrapper.ZLIB_OR_NONE;
- }
+ if (HttpHeaders.Values.DEFLATE.equalsIgnoreCase(contentEncoding) ||
+ HttpHeaders.Values.XDEFLATE.equalsIgnoreCase(contentEncoding)) {
+ final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
// To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(wrapper));
}
diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java
index 1c042c227f..b56eeb88f3 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java
@@ -399,10 +399,18 @@ public interface HttpHeaders extends TextHeaders {
* {@code "deflate"}
*/
public static final AsciiString DEFLATE = new AsciiString("deflate");
+ /**
+ * {@code "x-deflate"}
+ */
+ public static final AsciiString XDEFLATE = new AsciiString("deflate");
/**
* {@code "gzip"}
*/
public static final AsciiString GZIP = new AsciiString("gzip");
+ /**
+ * {@code "x-gzip"}
+ */
+ public static final AsciiString XGZIP = new AsciiString("x-gzip");
/**
* {@code "identity"}
*/
diff --git a/codec-http2/pom.xml b/codec-http2/pom.xml
index 02914ebeb0..5f79e09c96 100644
--- a/codec-http2/pom.xml
+++ b/codec-http2/pom.xml
@@ -50,6 +50,11 @@
mockito-alltest
+
+ com.jcraft
+ jzlib
+ true
+
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DecompressorHttp2FrameReader.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DecompressorHttp2FrameReader.java
new file mode 100644
index 0000000000..bd711d00a8
--- /dev/null
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DecompressorHttp2FrameReader.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.netty.handler.codec.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.AsciiString;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.compression.ZlibCodecFactory;
+import io.netty.handler.codec.compression.ZlibWrapper;
+import io.netty.handler.codec.http.HttpHeaders;
+
+/**
+ * A HTTP2 frame reader that will decompress data frames according
+ * to the {@code content-encoding} header for each stream.
+ */
+public class DecompressorHttp2FrameReader extends DefaultHttp2FrameReader {
+ private static final AsciiString CONTENT_ENCODING_LOWER_CASE = HttpHeaders.Names.CONTENT_ENCODING.toLowerCase();
+ private static final AsciiString CONTENT_LENGTH_LOWER_CASE = HttpHeaders.Names.CONTENT_LENGTH.toLowerCase();
+ private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
+ @Override
+ public void streamRemoved(Http2Stream stream) {
+ final EmbeddedChannel decoder = stream.decompressor();
+ if (decoder != null) {
+ cleanup(stream, decoder);
+ }
+ }
+ };
+
+ private final Http2Connection connection;
+ private final boolean strict;
+
+ /**
+ * Create a new instance with non-strict deflate decoding.
+ * {@link #DecompressorHttp2FrameReader(Http2Connection, boolean)}
+ */
+ public DecompressorHttp2FrameReader(Http2Connection connection) {
+ this(connection, false);
+ }
+
+ /**
+ * Create a new instance.
+ * @param strict
+ *
+ *
{@code true} to use use strict handling of deflate if used
+ *
{@code false} be more lenient with decompression
+ *
+ */
+ public DecompressorHttp2FrameReader(Http2Connection connection, boolean strict) {
+ this.connection = connection;
+ this.strict = strict;
+
+ connection.addListener(CLEAN_UP_LISTENER);
+ }
+
+ /**
+ * Returns a new {@link EmbeddedChannel} that decodes the HTTP2 message
+ * content encoded in the specified {@code contentEncoding}.
+ *
+ * @param contentEncoding the value of the {@code content-encoding} header
+ * @return a new {@link ByteToMessageDecoder} if the specified encoding is supported.
+ * {@code null} otherwise (alternatively, you can throw a {@link Http2Exception}
+ * to block unknown encoding).
+ * @throws Http2Exception If the specified encoding is not not supported and warrants an exception
+ */
+ protected EmbeddedChannel newContentDecoder(CharSequence contentEncoding) throws Http2Exception {
+ if (HttpHeaders.Values.GZIP.equalsIgnoreCase(contentEncoding) ||
+ HttpHeaders.Values.XGZIP.equalsIgnoreCase(contentEncoding)) {
+ return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
+ }
+ if (HttpHeaders.Values.DEFLATE.equalsIgnoreCase(contentEncoding) ||
+ HttpHeaders.Values.XDEFLATE.equalsIgnoreCase(contentEncoding)) {
+ final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
+ // To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.
+ return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(wrapper));
+ }
+ // 'identity' or unsupported
+ return null;
+ }
+
+ /**
+ * Returns the expected content encoding of the decoded content.
+ * This getMethod returns {@code "identity"} by default, which is the case for
+ * most decoders.
+ *
+ * @param contentEncoding the value of the {@code content-encoding} header
+ * @return the expected content encoding of the new content.
+ * @throws Http2Exception if the {@code contentEncoding} is not supported and warrants an exception
+ */
+ protected CharSequence getTargetContentEncoding(
+ @SuppressWarnings("UnusedParameters") CharSequence contentEncoding) throws Http2Exception {
+ return HttpHeaders.Values.IDENTITY;
+ }
+
+ /**
+ * Checks if a new decoder object is needed for the stream identified by {@code streamId}.
+ * This method will modify the {@code content-encoding} header contained in {@code builder}.
+ * @param streamId The identifier for the headers inside {@code builder}
+ * @param builder Object representing headers which have been read
+ * @param endOfStream Indicates if the stream has ended
+ * @throws Http2Exception If the {@code content-encoding} is not supported
+ */
+ private void initDecoder(int streamId, Http2Headers.Builder builder, boolean endOfStream)
+ throws Http2Exception {
+ final Http2Stream stream = connection.stream(streamId);
+ if (stream != null) {
+ EmbeddedChannel decoder = stream.decompressor();
+ if (decoder == null) {
+ if (!endOfStream) {
+ // Determine the content encoding.
+ CharSequence contentEncoding = builder.get(CONTENT_ENCODING_LOWER_CASE);
+ if (contentEncoding == null) {
+ contentEncoding = HttpHeaders.Values.IDENTITY;
+ }
+ decoder = newContentDecoder(contentEncoding);
+ if (decoder != null) {
+ stream.decompressor(decoder);
+ // Decode the content and remove or replace the existing headers
+ // so that the message looks like a decoded message.
+ CharSequence targetContentEncoding = getTargetContentEncoding(contentEncoding);
+ if (HttpHeaders.Values.IDENTITY.equalsIgnoreCase(targetContentEncoding)) {
+ builder.remove(CONTENT_ENCODING_LOWER_CASE);
+ } else {
+ builder.set(CONTENT_ENCODING_LOWER_CASE, targetContentEncoding);
+ }
+ }
+ }
+ } else if (endOfStream) {
+ cleanup(stream, decoder);
+ }
+ if (decoder != null) {
+ // The content length will be for the compressed data. Since we will decompress the data
+ // this content-length will not be correct. Instead of queuing messages or delaying sending
+ // header frames...just remove the content-length header
+ builder.remove(CONTENT_LENGTH_LOWER_CASE);
+ }
+ }
+ }
+
+ /**
+ * Release remaining content from the {@link EmbeddedChannel} and remove the decoder from the {@link Http2Stream}.
+ * @param stream The stream for which {@code decoder} is the decompressor for
+ * @param decoder The decompressor for {@code stream}
+ */
+ private static void cleanup(Http2Stream stream, EmbeddedChannel decoder) {
+ if (decoder.finish()) {
+ for (;;) {
+ final ByteBuf buf = decoder.readInbound();
+ if (buf == null) {
+ break;
+ }
+ buf.release();
+ }
+ }
+ stream.decompressor(null);
+ }
+
+ /**
+ * Read the next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist.
+ * @param decoder The channel to read from
+ * @return The next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist
+ */
+ private static ByteBuf nextReadableBuf(EmbeddedChannel decoder) {
+ for (;;) {
+ final ByteBuf buf = decoder.readInbound();
+ if (buf == null) {
+ return null;
+ }
+ if (!buf.isReadable()) {
+ buf.release();
+ continue;
+ }
+ return buf;
+ }
+ }
+
+ @Override
+ protected void notifyListenerOnDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
+ boolean endOfStream, Http2FrameListener listener) throws Http2Exception {
+ final Http2Stream stream = connection.stream(streamId);
+ final EmbeddedChannel decoder = stream == null ? null : stream.decompressor();
+ if (decoder == null) {
+ super.notifyListenerOnDataRead(ctx, streamId, data, padding, endOfStream, listener);
+ } else {
+ // call retain here as it will call release after its written to the channel
+ decoder.writeInbound(data.retain());
+ ByteBuf buf = nextReadableBuf(decoder);
+ if (buf == null) {
+ if (endOfStream) {
+ super.notifyListenerOnDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true, listener);
+ }
+ // END_STREAM is not set and the data could not be decoded yet.
+ // The assumption has to be there will be more data frames to complete the decode.
+ // We don't have enough information here to know if this is an error.
+ } else {
+ for (;;) {
+ final ByteBuf nextBuf = nextReadableBuf(decoder);
+ if (nextBuf == null) {
+ super.notifyListenerOnDataRead(ctx, streamId, buf, padding, endOfStream, listener);
+ break;
+ } else {
+ super.notifyListenerOnDataRead(ctx, streamId, buf, padding, false, listener);
+ }
+ buf = nextBuf;
+ }
+ }
+
+ if (endOfStream) {
+ cleanup(stream, decoder);
+ }
+ }
+ }
+
+ @Override
+ protected void notifyListenerOnHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers.Builder builder,
+ int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream,
+ Http2FrameListener listener) throws Http2Exception {
+ initDecoder(streamId, builder, endOfStream);
+ super.notifyListenerOnHeadersRead(ctx, streamId, builder, streamDependency, weight,
+ exclusive, padding, endOfStream, listener);
+ }
+
+ @Override
+ protected void notifyListenerOnHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers.Builder builder,
+ int padding, boolean endOfStream, Http2FrameListener listener) throws Http2Exception {
+ initDecoder(streamId, builder, endOfStream);
+ super.notifyListenerOnHeadersRead(ctx, streamId, builder, padding, endOfStream, listener);
+ }
+}
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java
index 24210e1216..15a0a00639 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java
@@ -29,6 +29,7 @@ import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
+import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
@@ -189,6 +190,7 @@ public class DefaultHttp2Connection implements Http2Connection {
private boolean terminateReceived;
private FlowState inboundFlow;
private FlowState outboundFlow;
+ private EmbeddedChannel decompressor;
private Object data;
DefaultStream(int id) {
@@ -241,6 +243,19 @@ public class DefaultHttp2Connection implements Http2Connection {
return (T) data;
}
+ @Override
+ public void decompressor(EmbeddedChannel decompressor) {
+ if (this.decompressor != null && decompressor != null) {
+ throw new IllegalStateException("decompressor can not be reassigned");
+ }
+ this.decompressor = decompressor;
+ }
+
+ @Override
+ public EmbeddedChannel decompressor() {
+ return decompressor;
+ }
+
@Override
public FlowState inboundFlow() {
return inboundFlow;
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java
index 47c2b89548..c3881ea62b 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
@@ -368,6 +367,23 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
}
}
+ protected void notifyListenerOnDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
+ int padding, boolean endOfStream, Http2FrameListener listener) throws Http2Exception {
+ listener.onDataRead(ctx, streamId, data, padding, endOfStream);
+ }
+
+ protected void notifyListenerOnHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers.Builder builder,
+ int streamDependency, short weight, boolean exclusive, int padding,
+ boolean endOfStream, Http2FrameListener listener) throws Http2Exception {
+ listener.onHeadersRead(ctx, streamId, builder.build(), streamDependency,
+ weight, exclusive, padding, endOfStream);
+ }
+
+ protected void notifyListenerOnHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers.Builder builder,
+ int padding, boolean endOfStream, Http2FrameListener listener) throws Http2Exception {
+ listener.onHeadersRead(ctx, streamId, builder.build(), padding, endOfStream);
+ }
+
private void readDataFrame(ChannelHandlerContext ctx, ByteBuf payload,
Http2FrameListener listener) throws Http2Exception {
short padding = readPadding(payload);
@@ -380,7 +396,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
}
ByteBuf data = payload.readSlice(dataLength);
- listener.onDataRead(ctx, streamId, data, padding, flags.endOfStream());
+ notifyListenerOnDataRead(ctx, streamId, data, padding, flags.endOfStream(), listener);
payload.skipBytes(payload.readableBytes());
}
@@ -409,11 +425,11 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment,
Http2FrameListener listener) throws Http2Exception {
- builder().addFragment(fragment, ctx.alloc(), endOfHeaders);
+ final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
+ hdrBlockBuilder.addFragment(fragment, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
- Http2Headers headers = builder().buildHeaders();
- listener.onHeadersRead(ctx, headersStreamId, headers, streamDependency,
- weight, exclusive, padding, headersFlags.endOfStream());
+ notifyListenerOnHeadersRead(ctx, headersStreamId, hdrBlockBuilder.builder(),
+ streamDependency, weight, exclusive, padding, headersFlags.endOfStream(), listener);
close();
}
}
@@ -435,11 +451,11 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment,
Http2FrameListener listener) throws Http2Exception {
- builder().addFragment(fragment, ctx.alloc(), endOfHeaders);
+ final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
+ hdrBlockBuilder.addFragment(fragment, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
- Http2Headers headers = builder().buildHeaders();
- listener.onHeadersRead(ctx, headersStreamId, headers, padding,
- headersFlags.endOfStream());
+ notifyListenerOnHeadersRead(ctx, headersStreamId, hdrBlockBuilder.builder(), padding,
+ headersFlags.endOfStream(), listener);
close();
}
}
@@ -507,9 +523,9 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment,
Http2FrameListener listener) throws Http2Exception {
- builder().addFragment(fragment, ctx.alloc(), endOfHeaders);
+ headersBlockBuilder().addFragment(fragment, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
- Http2Headers headers = builder().buildHeaders();
+ Http2Headers headers = headersBlockBuilder().builder().build();
listener.onPushPromiseRead(ctx, pushPromiseStreamId, promisedStreamId, headers,
padding);
close();
@@ -586,7 +602,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
* {@link Http2FrameListener} once the end of headers is reached.
*/
private abstract class HeadersContinuation {
- private final HeadersBuilder builder = new HeadersBuilder();
+ private final HeadersBlockBuilder builder = new HeadersBlockBuilder();
/**
* Returns the stream for which headers are currently being processed.
@@ -603,7 +619,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
abstract void processFragment(boolean endOfHeaders, ByteBuf fragment,
Http2FrameListener listener) throws Http2Exception;
- final HeadersBuilder builder() {
+ final HeadersBlockBuilder headersBlockBuilder() {
return builder;
}
@@ -619,7 +635,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
* Utility class to help with construction of the headers block that may potentially span
* multiple frames.
*/
- private class HeadersBuilder {
+ protected class HeadersBlockBuilder {
private ByteBuf headerBlock;
/**
@@ -660,7 +676,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
* Builds the headers from the completed headers block. After this is called, this builder
* should not be called again.
*/
- Http2Headers buildHeaders() throws Http2Exception {
+ Http2Headers.Builder builder() throws Http2Exception {
try {
return headersDecoder.decodeHeaders(headerBlock);
} finally {
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Headers.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Headers.java
index e8017feb37..33618fffdd 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Headers.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Headers.java
@@ -46,7 +46,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
@Override
- public String get(String name) {
+ public String get(CharSequence name) {
if (name == null) {
throw new NullPointerException("name");
}
@@ -65,7 +65,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
@Override
- public List getAll(String name) {
+ public List getAll(CharSequence name) {
if (name == null) {
throw new NullPointerException("name");
}
@@ -97,7 +97,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
@Override
- public boolean contains(String name) {
+ public boolean contains(CharSequence name) {
return get(name) != null;
}
@@ -156,7 +156,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
/**
* Builds instances of {@link DefaultHttp2Headers}.
*/
- public static class Builder {
+ public static class Builder implements Http2Headers.Builder {
private HeaderEntry[] entries;
private HeaderEntry head;
private Http2Headers buildResults;
@@ -166,10 +166,45 @@ public final class DefaultHttp2Headers extends Http2Headers {
clear();
}
- /**
- * Clears all existing headers from this collection and replaces them with the given header
- * set.
- */
+ @Override
+ public String get(CharSequence name) {
+ if (name == null) {
+ throw new NullPointerException("name");
+ }
+
+ int h = hash(name);
+ int i = index(h);
+ HeaderEntry e = entries[i];
+ while (e != null) {
+ if (e.hash == h && eq(name, e.key)) {
+ return e.value;
+ }
+ e = e.next;
+ }
+ return null;
+ }
+
+ @Override
+ public List getAll(CharSequence name) {
+ if (name == null) {
+ throw new NullPointerException("name");
+ }
+
+ LinkedList values = new LinkedList();
+
+ int h = hash(name);
+ int i = index(h);
+ HeaderEntry e = entries[i];
+ while (e != null) {
+ if (e.hash == h && eq(name, e.key)) {
+ values.addFirst(e.value);
+ }
+ e = e.next;
+ }
+ return values;
+ }
+
+ @Override
public void set(Http2Headers headers) {
// No need to lazy copy the previous results, since we're starting from scratch.
clear();
@@ -178,21 +213,13 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
}
- /**
- * Adds the given header to the collection.
- *
- * @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
- */
- public Builder add(final CharSequence name, final Object value) {
+ @Override
+ public Builder add(CharSequence name, Object value) {
return add(name.toString(), value);
}
- /**
- * Adds the given header to the collection.
- *
- * @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
- */
- public Builder add(final String name, final Object value) {
+ @Override
+ public Builder add(String name, Object value) {
// If this is the first call on the builder since the last build, copy the previous
// results.
lazyCopy();
@@ -207,17 +234,8 @@ public final class DefaultHttp2Headers extends Http2Headers {
return this;
}
- /**
- * Removes the header with the given name from this collection.
- */
- public Builder remove(final CharSequence name) {
- return remove(name.toString());
- }
-
- /**
- * Removes the header with the given name from this collection.
- */
- public Builder remove(final String name) {
+ @Override
+ public Builder remove(CharSequence name) {
if (name == null) {
throw new NullPointerException("name");
}
@@ -226,28 +244,31 @@ public final class DefaultHttp2Headers extends Http2Headers {
// results.
lazyCopy();
- String lowerCaseName = name.toLowerCase();
- int nameHash = hash(lowerCaseName);
- int hashTableIndex = index(nameHash);
- remove0(nameHash, hashTableIndex, lowerCaseName);
+ remove0(name);
return this;
}
- /**
- * Sets the given header in the collection, replacing any previous values.
- *
- * @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
- */
- public Builder set(final CharSequence name, final Object value) {
+ @Override
+ public Builder remove(String name) {
+ if (name == null) {
+ throw new NullPointerException("name");
+ }
+
+ // If this is the first call on the builder since the last build, copy the previous
+ // results.
+ lazyCopy();
+
+ remove0(name.toLowerCase());
+ return this;
+ }
+
+ @Override
+ public Builder set(CharSequence name, Object value) {
return set(name.toString(), value);
}
- /**
- * Sets the given header in the collection, replacing any previous values.
- *
- * @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
- */
- public Builder set(final String name, final Object value) {
+ @Override
+ public Builder set(String name, Object value) {
// If this is the first call on the builder since the last build, copy the previous
// results.
lazyCopy();
@@ -263,12 +284,8 @@ public final class DefaultHttp2Headers extends Http2Headers {
return this;
}
- /**
- * Sets the given header in the collection, replacing any previous values.
- *
- * @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
- */
- public Builder set(final String name, final Iterable> values) {
+ @Override
+ public Builder set(String name, Iterable> values) {
if (values == null) {
throw new NullPointerException("values");
}
@@ -295,9 +312,12 @@ public final class DefaultHttp2Headers extends Http2Headers {
return this;
}
- /**
- * Clears all values from this collection.
- */
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
public Builder clear() {
// No lazy copy required, since we're just creating a new array.
entries = new HeaderEntry[BUCKET_SIZE];
@@ -308,44 +328,32 @@ public final class DefaultHttp2Headers extends Http2Headers {
return this;
}
- /**
- * Sets the {@link PseudoHeaderName#METHOD} header.
- */
+ @Override
public Builder method(String method) {
return set(METHOD.value(), method);
}
- /**
- * Sets the {@link PseudoHeaderName#SCHEME} header.
- */
+ @Override
public Builder scheme(String scheme) {
return set(SCHEME.value(), scheme);
}
- /**
- * Sets the {@link PseudoHeaderName#AUTHORITY} header.
- */
+ @Override
public Builder authority(String authority) {
return set(AUTHORITY.value(), authority);
}
- /**
- * Sets the {@link PseudoHeaderName#PATH} header.
- */
+ @Override
public Builder path(String path) {
return set(PseudoHeaderName.PATH.value(), path);
}
- /**
- * Sets the {@link PseudoHeaderName#STATUS} header.
- */
+ @Override
public Builder status(String status) {
return set(PseudoHeaderName.STATUS.value(), status);
}
- /**
- * Builds a new instance of {@link DefaultHttp2Headers}.
- */
+ @Override
public DefaultHttp2Headers build() {
// If this is the first call on the builder since the last build, copy the previous
// results.
@@ -383,7 +391,13 @@ public final class DefaultHttp2Headers extends Http2Headers {
size++;
}
- private void remove0(int hash, int hashTableIndex, String name) {
+ private void remove0(final CharSequence name) {
+ final int nameHash = hash(name);
+ final int hashTableIndex = index(nameHash);
+ remove0(nameHash, hashTableIndex, name);
+ }
+
+ private void remove0(int hash, int hashTableIndex, CharSequence name) {
HeaderEntry e = entries[hashTableIndex];
if (e == null) {
return;
@@ -477,7 +491,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
}
- private static int hash(String name) {
+ private static int hash(CharSequence name) {
int h = 0;
for (int i = name.length() - 1; i >= 0; i--) {
char c = name.charAt(i);
@@ -496,7 +510,7 @@ public final class DefaultHttp2Headers extends Http2Headers {
}
}
- private static boolean eq(String name1, String name2) {
+ private static boolean eq(CharSequence name1, CharSequence name2) {
int nameLen = name1.length();
if (nameLen != name2.length()) {
return false;
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2HeadersDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2HeadersDecoder.java
index 6726c7c22f..bb2b32a43e 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2HeadersDecoder.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2HeadersDecoder.java
@@ -65,7 +65,7 @@ public class DefaultHttp2HeadersDecoder implements Http2HeadersDecoder {
}
@Override
- public Http2Headers decodeHeaders(ByteBuf headerBlock) throws Http2Exception {
+ public Http2Headers.Builder decodeHeaders(ByteBuf headerBlock) throws Http2Exception {
try {
final DefaultHttp2Headers.Builder headersBuilder = new DefaultHttp2Headers.Builder();
HeaderListener listener = new HeaderListener() {
@@ -83,13 +83,12 @@ public class DefaultHttp2HeadersDecoder implements Http2HeadersDecoder {
// TODO: what's the right thing to do here?
}
- Http2Headers headers = headersBuilder.build();
- if (headers.size() > maxHeaderListSize) {
+ if (headersBuilder.size() > maxHeaderListSize) {
throw protocolError("Number of headers (%d) exceeds maxHeaderListSize (%d)",
- headers.size(), maxHeaderListSize);
+ headersBuilder.size(), maxHeaderListSize);
}
- return headers;
+ return headersBuilder;
} catch (IOException e) {
throw new Http2Exception(COMPRESSION_ERROR, e.getMessage());
} catch (Throwable e) {
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Headers.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Headers.java
index bfe4231d49..4331fed357 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Headers.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Headers.java
@@ -32,12 +32,12 @@ public abstract class Http2Headers implements Iterable> {
public static final Http2Headers EMPTY_HEADERS = new Http2Headers() {
@Override
- public String get(String name) {
+ public String get(CharSequence name) {
return null;
}
@Override
- public List getAll(String name) {
+ public List getAll(CharSequence name) {
return Collections.emptyList();
}
@@ -47,7 +47,7 @@ public abstract class Http2Headers implements Iterable> {
}
@Override
- public boolean contains(String name) {
+ public boolean contains(CharSequence name) {
return false;
}
@@ -153,14 +153,14 @@ public abstract class Http2Headers implements Iterable> {
*
* @return the header value or {@code null} if there is no such header
*/
- public abstract String get(String name);
+ public abstract String get(CharSequence name);
/**
* Returns the header values with the specified header name.
*
* @return the {@link List} of header values. An empty list if there is no such header.
*/
- public abstract List getAll(String name);
+ public abstract List getAll(CharSequence name);
/**
* Returns all header names and values that this frame contains.
@@ -173,7 +173,7 @@ public abstract class Http2Headers implements Iterable> {
/**
* Returns {@code true} if and only if there is a header with the specified header name.
*/
- public abstract boolean contains(String name);
+ public abstract boolean contains(CharSequence name);
/**
* Checks if no header exists.
@@ -206,6 +206,117 @@ public abstract class Http2Headers implements Iterable> {
*/
public abstract String forEach(HeaderVisitor visitor);
+ /**
+ * Interface for the Builder pattern for {@link Http2Headers}.
+ */
+ public interface Builder {
+ /**
+ * Build all the collected headers into a {@link Http2Headers}.
+ * @return The {@link Http2Headers} object which this builder has been used for
+ */
+ Http2Headers build();
+
+ /**
+ * Gets the number of headers contained in this object.
+ */
+ int size();
+
+ /**
+ * Clears all values from this collection.
+ */
+ Builder clear();
+
+ /**
+ * Returns the header value with the specified header name. If there is more than one header
+ * value for the specified header name, the first value is returned.
+ *
+ * Note that all HTTP2 headers names are lower case and this method will not force {@code name} to lower case.
+ * @return the header value or {@code null} if there is no such header
+ */
+ String get(CharSequence name);
+
+ /**
+ * Returns the header values with the specified header name.
+ *
+ * Note that all HTTP2 headers names are lower case and this method will not force {@code name} to lower case.
+ * @return the {@link List} of header values. An empty list if there is no such header.
+ */
+ List getAll(CharSequence name);
+
+ /**
+ * Clears all existing headers from this collection and replaces them with the given header
+ * set.
+ */
+ void set(Http2Headers headers);
+
+ /**
+ * Adds the given header to the collection.
+ * @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
+ */
+ Builder add(CharSequence name, Object value);
+
+ /**
+ * Adds the given header to the collection.
+ * @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
+ */
+ Builder add(String name, Object value);
+
+ /**
+ * Removes the header with the given name from this collection.
+ * This method will not force the {@code name} to lower case before looking for a match.
+ */
+ Builder remove(CharSequence name);
+
+ /**
+ * Removes the header with the given name from this collection.
+ * This method will force the {@code name} to lower case before looking for a match.
+ */
+ Builder remove(String name);
+
+ /**
+ * Sets the given header in the collection, replacing any previous values.
+ * @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
+ */
+ Builder set(CharSequence name, Object value);
+
+ /**
+ * Sets the given header in the collection, replacing any previous values.
+ * @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
+ */
+ Builder set(String name, Object value);
+
+ /**
+ * Sets the given header in the collection, replacing any previous values.
+ * @throws IllegalArgumentException if the name or value of this header is invalid for any reason.
+ */
+ Builder set(String name, Iterable> values);
+
+ /**
+ * Sets the {@link PseudoHeaderName#METHOD} header.
+ */
+ Builder method(String method);
+
+ /**
+ * Sets the {@link PseudoHeaderName#SCHEME} header.
+ */
+ Builder scheme(String scheme);
+
+ /**
+ * Sets the {@link PseudoHeaderName#AUTHORITY} header.
+ */
+ Builder authority(String authority);
+
+ /**
+ * Sets the {@link PseudoHeaderName#PATH} header.
+ */
+ Builder path(String path);
+
+ /**
+ * Sets the {@link PseudoHeaderName#STATUS} header.
+ */
+ Builder status(String status);
+ }
+
/**
* Gets the {@link PseudoHeaderName#METHOD} header or {@code null} if there is no such header
*/
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2HeadersDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2HeadersDecoder.java
index 3ad4b1f687..2c1207bfda 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2HeadersDecoder.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2HeadersDecoder.java
@@ -25,7 +25,7 @@ public interface Http2HeadersDecoder {
/**
* Decodes the given headers block and returns the headers.
*/
- Http2Headers decodeHeaders(ByteBuf headerBlock) throws Http2Exception;
+ Http2Headers.Builder decodeHeaders(ByteBuf headerBlock) throws Http2Exception;
/**
* Sets the new max header table size for this decoder.
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java
index 4add5e0511..bf684e435c 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java
@@ -15,6 +15,8 @@
package io.netty.handler.codec.http2;
+import io.netty.channel.embedded.EmbeddedChannel;
+
import java.util.Collection;
/**
@@ -122,6 +124,16 @@ public interface Http2Stream {
*/
T data();
+ /**
+ * Associate an object responsible for decompressing data frames for this stream
+ */
+ void decompressor(EmbeddedChannel decompressor);
+
+ /**
+ * Get the object capable of decompressing data frames for this stream
+ */
+ EmbeddedChannel decompressor();
+
/**
* Gets the in-bound flow control state for this stream.
*/
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpUtil.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpUtil.java
index 3353c36d65..12fb3c1b23 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpUtil.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpUtil.java
@@ -56,52 +56,52 @@ public final class HttpUtil {
* HTTP extension header which will identify the stream id
* from the HTTP/2 event(s) responsible for generating a {@code HttpObject}
*
- * {@code "X-HTTP2-Stream-ID"}
+ * {@code "x-http2-stream-id"}
*/
- public static final AsciiString STREAM_ID = new AsciiString("X-HTTP2-Stream-ID");
+ public static final AsciiString STREAM_ID = new AsciiString("x-http2-stream-id");
/**
* HTTP extension header which will identify the authority pseudo header
* from the HTTP/2 event(s) responsible for generating a {@code HttpObject}
*
- * {@code "X-HTTP2-Authority"}
+ * {@code "x-http2-authority"}
*/
- public static final AsciiString AUTHORITY = new AsciiString("X-HTTP2-Authority");
+ public static final AsciiString AUTHORITY = new AsciiString("x-http2-authority");
/**
* HTTP extension header which will identify the scheme pseudo header
* from the HTTP/2 event(s) responsible for generating a {@code HttpObject}
*
- * {@code "X-HTTP2-Scheme"}
+ * {@code "x-http2-scheme"}
*/
- public static final AsciiString SCHEME = new AsciiString("X-HTTP2-Scheme");
+ public static final AsciiString SCHEME = new AsciiString("x-http2-scheme");
/**
* HTTP extension header which will identify the path pseudo header
* from the HTTP/2 event(s) responsible for generating a {@code HttpObject}
*
- * {@code "X-HTTP2-Path"}
+ * {@code "x-http2-path"}
*/
- public static final AsciiString PATH = new AsciiString("X-HTTP2-Path");
+ public static final AsciiString PATH = new AsciiString("x-http2-path");
/**
* HTTP extension header which will identify the stream id used to create this stream
* in a HTTP/2 push promise frame
*
- * {@code "X-HTTP2-Stream-Promise-ID"}
+ * {@code "x-http2-stream-promise-id"}
*/
- public static final AsciiString STREAM_PROMISE_ID = new AsciiString("X-HTTP2-Stream-Promise-ID");
+ public static final AsciiString STREAM_PROMISE_ID = new AsciiString("x-http2-stream-promise-id");
/**
* HTTP extension header which will identify the stream id which this stream is dependent on.
* This stream will be a child node of the stream id associated with this header value.
*
- * {@code "X-HTTP2-Stream-Dependency-ID"}
+ * {@code "x-http2-stream-dependency-id"}
*/
- public static final AsciiString STREAM_DEPENDENCY_ID = new AsciiString("X-HTTP2-Stream-Dependency-ID");
+ public static final AsciiString STREAM_DEPENDENCY_ID = new AsciiString("x-http2-stream-dependency-id");
/**
* HTTP extension header which will identify the weight
* (if non-default and the priority is not on the default stream) of the associated HTTP/2 stream
* responsible responsible for generating a {@code HttpObject}
*
- * {@code "X-HTTP2-Stream-Weight"}
+ * {@code "x-http2-stream-weight"}
*/
- public static final AsciiString STREAM_WEIGHT = new AsciiString("X-HTTP2-Stream-Weight");
+ public static final AsciiString STREAM_WEIGHT = new AsciiString("x-http2-stream-weight");
}
}
diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java
index ccea9b4a2e..264e18b3ad 100644
--- a/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java
+++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java
@@ -273,9 +273,6 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
"Content length exceeded max of %d for stream id %d", maxContentLength, streamId);
}
- // TODO: provide hooks to a HttpContentDecoder type interface
- // Preferably provide these hooks in the HTTP2 codec so even non-translation layer use-cases benefit
- // (and then data will already be decoded here)
content.writeBytes(data, data.readerIndex(), data.readableBytes());
if (endOfStream) {
diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java
new file mode 100644
index 0000000000..fe15ea6ce8
--- /dev/null
+++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java
@@ -0,0 +1,379 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.netty.handler.codec.http2;
+
+import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.compression.ZlibCodecFactory;
+import io.netty.handler.codec.compression.ZlibWrapper;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
+import io.netty.util.NetUtil;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Test for data decompression in the HTTP/2 codec.
+ */
+public class DataCompressionHttp2Test {
+ private List dataCapture;
+
+ @Mock
+ private Http2FrameListener serverListener;
+ @Mock
+ private Http2FrameListener clientListener;
+
+ private ByteBufAllocator alloc;
+ private Http2FrameWriter frameWriter;
+ private ServerBootstrap sb;
+ private Bootstrap cb;
+ private Channel serverChannel;
+ private Channel serverConnectedChannel;
+ private Channel clientChannel;
+ private CountDownLatch serverLatch;
+ private CountDownLatch clientLatch;
+ private Http2TestUtil.FrameAdapter serverAdapter;
+ private Http2TestUtil.FrameAdapter clientAdapter;
+ private Http2Connection serverConnection;
+
+ @Before
+ public void setup() throws InterruptedException {
+ MockitoAnnotations.initMocks(this);
+ alloc = UnpooledByteBufAllocator.DEFAULT;
+ sb = new ServerBootstrap();
+ cb = new Bootstrap();
+
+ serverLatch(new CountDownLatch(1));
+ clientLatch(new CountDownLatch(1));
+ frameWriter = new DefaultHttp2FrameWriter();
+ serverConnection = new DefaultHttp2Connection(true);
+
+ sb.group(new NioEventLoopGroup(), new NioEventLoopGroup());
+ sb.channel(NioServerSocketChannel.class);
+ sb.childHandler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ serverAdapter = new Http2TestUtil.FrameAdapter(serverConnection, new DecompressorHttp2FrameReader(
+ serverConnection), serverListener, serverLatch, false);
+ p.addLast("reader", serverAdapter);
+ p.addLast(Http2CodecUtil.ignoreSettingsHandler());
+ serverConnectedChannel = ch;
+ }
+ });
+
+ cb.group(new NioEventLoopGroup());
+ cb.channel(NioSocketChannel.class);
+ cb.handler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ clientAdapter = new Http2TestUtil.FrameAdapter(clientListener, clientLatch, false);
+ p.addLast("reader", clientAdapter);
+ p.addLast(Http2CodecUtil.ignoreSettingsHandler());
+ }
+ });
+
+ serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel();
+ int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
+
+ ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port));
+ assertTrue(ccf.awaitUninterruptibly().isSuccess());
+ clientChannel = ccf.channel();
+ }
+
+ @After
+ public void teardown() throws InterruptedException {
+ if (dataCapture != null) {
+ for (int i = 0; i < dataCapture.size(); ++i) {
+ dataCapture.get(i).release();
+ }
+ dataCapture = null;
+ }
+ serverChannel.close().sync();
+ sb.group().shutdownGracefully();
+ sb.childGroup().shutdownGracefully();
+ cb.group().shutdownGracefully();
+ serverAdapter = null;
+ clientAdapter = null;
+ serverConnection = null;
+ }
+
+ @Test
+ public void justHeadersNoData() throws Exception {
+ final Http2Headers headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path")
+ .set(HttpHeaders.Names.CONTENT_ENCODING, HttpHeaders.Values.GZIP).build();
+ // Required because the decompressor intercepts the onXXXRead events before
+ // our {@link Http2TestUtil$FrameAdapter} does.
+ Http2TestUtil.FrameAdapter.getOrCreateStream(serverConnection, 3, false);
+ runInChannel(clientChannel, new Http2Runnable() {
+ @Override
+ public void run() {
+ frameWriter.writeHeaders(ctxClient(), 3, headers, 0, true, newPromiseClient());
+ ctxClient().flush();
+ }
+ });
+ awaitServer();
+ verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(headers), eq(0), eq(true));
+ }
+
+ @Test
+ public void gzipEncodingSingleEmptyMessage() throws Exception {
+ serverLatch(new CountDownLatch(2));
+ final String text = "";
+ final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
+ final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
+ try {
+ final ByteBuf encodedData = encodeData(data, encoder);
+ final Http2Headers headers = new DefaultHttp2Headers.Builder().method("POST").path("/some/path")
+ .set(HttpHeaders.Names.CONTENT_ENCODING, HttpHeaders.Values.GZIP).build();
+ // Required because the decompressor intercepts the onXXXRead events before
+ // our {@link Http2TestUtil$FrameAdapter} does.
+ Http2TestUtil.FrameAdapter.getOrCreateStream(serverConnection, 3, false);
+ runInChannel(clientChannel, new Http2Runnable() {
+ @Override
+ public void run() {
+ frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
+ frameWriter.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
+ ctxClient().flush();
+ }
+ });
+ awaitServer();
+ data.readerIndex(0);
+ ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
+ verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
+ eq(true));
+ dataCapture = dataCaptor.getAllValues();
+ assertEquals(data, dataCapture.get(0));
+ } finally {
+ data.release();
+ cleanupEncoder(encoder);
+ }
+ }
+
+ @Test
+ public void gzipEncodingSingleMessage() throws Exception {
+ serverLatch(new CountDownLatch(2));
+ final String text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc";
+ final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
+ final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
+ try {
+ final ByteBuf encodedData = encodeData(data, encoder);
+ final Http2Headers headers = new DefaultHttp2Headers.Builder().method("POST").path("/some/path")
+ .set(HttpHeaders.Names.CONTENT_ENCODING, HttpHeaders.Values.GZIP).build();
+ // Required because the decompressor intercepts the onXXXRead events before
+ // our {@link Http2TestUtil$FrameAdapter} does.
+ Http2TestUtil.FrameAdapter.getOrCreateStream(serverConnection, 3, false);
+ runInChannel(clientChannel, new Http2Runnable() {
+ @Override
+ public void run() {
+ frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
+ frameWriter.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
+ ctxClient().flush();
+ }
+ });
+ awaitServer();
+ data.readerIndex(0);
+ ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
+ verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
+ eq(true));
+ dataCapture = dataCaptor.getAllValues();
+ assertEquals(data, dataCapture.get(0));
+ } finally {
+ data.release();
+ cleanupEncoder(encoder);
+ }
+ }
+
+ @Test
+ public void gzipEncodingMultipleMessages() throws Exception {
+ serverLatch(new CountDownLatch(3));
+ final String text1 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc";
+ final String text2 = "dddddddddddddddddddeeeeeeeeeeeeeeeeeeeffffffffffffffffffff";
+ final ByteBuf data1 = Unpooled.copiedBuffer(text1.getBytes());
+ final ByteBuf data2 = Unpooled.copiedBuffer(text2.getBytes());
+ final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
+ try {
+ final ByteBuf encodedData1 = encodeData(data1, encoder);
+ final ByteBuf encodedData2 = encodeData(data2, encoder);
+ final Http2Headers headers = new DefaultHttp2Headers.Builder().method("POST").path("/some/path")
+ .set(HttpHeaders.Names.CONTENT_ENCODING, HttpHeaders.Values.GZIP).build();
+ // Required because the decompressor intercepts the onXXXRead events before
+ // our {@link Http2TestUtil$FrameAdapter} does.
+ Http2TestUtil.FrameAdapter.getOrCreateStream(serverConnection, 3, false);
+ runInChannel(clientChannel, new Http2Runnable() {
+ @Override
+ public void run() {
+ frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
+ frameWriter.writeData(ctxClient(), 3, encodedData1, 0, false, newPromiseClient());
+ frameWriter.writeData(ctxClient(), 3, encodedData2, 0, true, newPromiseClient());
+ ctxClient().flush();
+ }
+ });
+ awaitServer();
+ data1.readerIndex(0);
+ data2.readerIndex(0);
+ ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
+ ArgumentCaptor endStreamCaptor = ArgumentCaptor.forClass(Boolean.class);
+ verify(serverListener, times(2)).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(),
+ eq(0), endStreamCaptor.capture());
+ dataCapture = dataCaptor.getAllValues();
+ assertEquals(data1, dataCapture.get(0));
+ assertEquals(data2, dataCapture.get(1));
+ List endStreamCapture = endStreamCaptor.getAllValues();
+ assertEquals(false, endStreamCapture.get(0));
+ assertEquals(true, endStreamCapture.get(1));
+ } finally {
+ data1.release();
+ data2.release();
+ cleanupEncoder(encoder);
+ }
+ }
+
+ @Test
+ public void deflateEncodingSingleLargeMessage() throws Exception {
+ serverLatch(new CountDownLatch(2));
+ final ByteBuf data = Unpooled.buffer(1 << 16);
+ final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.ZLIB));
+ try {
+ for (int i = 0; i < data.capacity(); ++i) {
+ data.writeByte((byte) 'a');
+ }
+ final ByteBuf encodedData = encodeData(data, encoder);
+ final Http2Headers headers = new DefaultHttp2Headers.Builder().method("POST").path("/some/path")
+ .set(HttpHeaders.Names.CONTENT_ENCODING, HttpHeaders.Values.DEFLATE).build();
+ // Required because the decompressor intercepts the onXXXRead events before
+ // our {@link Http2TestUtil$FrameAdapter} does.
+ Http2TestUtil.FrameAdapter.getOrCreateStream(serverConnection, 3, false);
+ runInChannel(clientChannel, new Http2Runnable() {
+ @Override
+ public void run() {
+ frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
+ frameWriter.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
+ ctxClient().flush();
+ }
+ });
+ awaitServer();
+ data.readerIndex(0);
+ ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
+ verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
+ eq(true));
+ dataCapture = dataCaptor.getAllValues();
+ assertEquals(data, dataCapture.get(0));
+ } finally {
+ data.release();
+ cleanupEncoder(encoder);
+ }
+ }
+
+ private ByteBuf encodeData(ByteBuf data, EmbeddedChannel encoder) {
+ ByteBuf encoded = alloc.buffer(data.readableBytes());
+ encoder.writeOutbound(data.retain());
+ for (;;) {
+ final ByteBuf buf = encoder.readOutbound();
+ if (buf == null) {
+ break;
+ }
+ if (!buf.isReadable()) {
+ buf.release();
+ continue;
+ }
+ encoded.writeBytes(buf);
+ buf.release();
+ }
+ return encoded;
+ }
+
+ private static void cleanupEncoder(EmbeddedChannel encoder) {
+ if (encoder.finish()) {
+ for (;;) {
+ final ByteBuf buf = encoder.readOutbound();
+ if (buf == null) {
+ break;
+ }
+ buf.release();
+ }
+ }
+ }
+
+ private void serverLatch(CountDownLatch latch) {
+ serverLatch = latch;
+ if (serverAdapter != null) {
+ serverAdapter.latch(serverLatch);
+ }
+ }
+
+ private void clientLatch(CountDownLatch latch) {
+ clientLatch = latch;
+ if (clientAdapter != null) {
+ clientAdapter.latch(clientLatch);
+ }
+ }
+
+ private void awaitServer() throws Exception {
+ serverLatch.await(5, SECONDS);
+ }
+
+ private void awaitClient() throws Exception {
+ clientLatch.await(5, SECONDS);
+ }
+
+ private ChannelHandlerContext ctxClient() {
+ return clientChannel.pipeline().firstContext();
+ }
+
+ private ChannelPromise newPromiseClient() {
+ return ctxClient().newPromise();
+ }
+
+ private ChannelHandlerContext ctxServer() {
+ return serverConnectedChannel.pipeline().firstContext();
+ }
+
+ private ChannelPromise newPromiseServer() {
+ return ctxServer().newPromise();
+ }
+}
diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2HeadersDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2HeadersDecoderTest.java
index 61dc8f76c8..6e8f132220 100644
--- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2HeadersDecoderTest.java
+++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2HeadersDecoderTest.java
@@ -43,7 +43,7 @@ public class DefaultHttp2HeadersDecoderTest {
@Test
public void decodeShouldSucceed() throws Exception {
ByteBuf buf = encode(":method", "GET", "akey", "avalue");
- Http2Headers headers = decoder.decodeHeaders(buf);
+ Http2Headers headers = decoder.decodeHeaders(buf).build();
assertEquals(2, headers.size());
assertEquals("GET", headers.method());
assertEquals("avalue", headers.get("akey"));
diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandlerTest.java
index 9ecafe3fd6..539c82a700 100644
--- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandlerTest.java
+++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2HttpConnectionHandlerTest.java
@@ -60,7 +60,6 @@ import org.mockito.MockitoAnnotations;
* Testing the {@link DelegatingHttp2HttpConnectionHandler} for {@link FullHttpRequest} objects into HTTP/2 frames
*/
public class DelegatingHttp2HttpConnectionHandlerTest {
-
@Mock
private Http2FrameListener clientListener;
diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameRoundtripTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameRoundtripTest.java
index 3e333c1921..d5e48cdc73 100644
--- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameRoundtripTest.java
+++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameRoundtripTest.java
@@ -15,6 +15,16 @@
package io.netty.handler.codec.http2;
+import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel;
+import static io.netty.util.CharsetUtil.UTF_8;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
@@ -28,9 +38,13 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.NetUtil;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -38,23 +52,13 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import static io.netty.handler.codec.http2.Http2TestUtil.*;
-import static io.netty.util.CharsetUtil.*;
-import static java.util.concurrent.TimeUnit.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
/**
* Tests encoding/decoding each HTTP2 frame type.
*/
public class Http2FrameRoundtripTest {
@Mock
- private Http2FrameListener serverObserver;
+ private Http2FrameListener serverListener;
private ArgumentCaptor dataCaptor;
private Http2FrameWriter frameWriter;
@@ -63,12 +67,13 @@ public class Http2FrameRoundtripTest {
private Channel serverChannel;
private Channel clientChannel;
private CountDownLatch requestLatch;
+ private Http2TestUtil.FrameAdapter serverAdapter;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
- requestLatch = new CountDownLatch(1);
+ serverLatch(new CountDownLatch(1));
frameWriter = new DefaultHttp2FrameWriter();
dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
@@ -81,7 +86,8 @@ public class Http2FrameRoundtripTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
- p.addLast("reader", new FrameAdapter(serverObserver));
+ serverAdapter = new Http2TestUtil.FrameAdapter(serverListener, requestLatch, true);
+ p.addLast("reader", serverAdapter);
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
}
});
@@ -92,7 +98,7 @@ public class Http2FrameRoundtripTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
- p.addLast("reader", new FrameAdapter(null));
+ p.addLast("reader", new Http2TestUtil.FrameAdapter(null, null, true));
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
}
});
@@ -107,26 +113,37 @@ public class Http2FrameRoundtripTest {
@After
public void teardown() throws Exception {
+ List capturedData = dataCaptor.getAllValues();
+ for (int i = 0; i < capturedData.size(); ++i) {
+ capturedData.get(i).release();
+ }
serverChannel.close().sync();
sb.group().shutdownGracefully();
sb.childGroup().shutdownGracefully();
cb.group().shutdownGracefully();
+ serverAdapter = null;
}
@Test
public void dataFrameShouldMatch() throws Exception {
final String text = "hello world";
- runInChannel(clientChannel, new Http2Runnable() {
- @Override
- public void run() {
- frameWriter.writeData(ctx(), 0x7FFFFFFF,
- Unpooled.copiedBuffer(text.getBytes()), 100, true, newPromise());
- ctx().flush();
- }
- });
- awaitRequests();
- verify(serverObserver).onDataRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
- dataCaptor.capture(), eq(100), eq(true));
+ final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
+ try {
+ runInChannel(clientChannel, new Http2Runnable() {
+ @Override
+ public void run() {
+ frameWriter.writeData(ctx(), 0x7FFFFFFF, data.retain(), 100, true, newPromise());
+ ctx().flush();
+ }
+ });
+ awaitRequests();
+ verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
+ dataCaptor.capture(), eq(100), eq(true));
+ List capturedData = dataCaptor.getAllValues();
+ assertEquals(data, capturedData.get(0));
+ } finally {
+ data.release();
+ }
}
@Test
@@ -142,7 +159,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
- verify(serverObserver).onHeadersRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
+ verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(headers), eq(0), eq(true));
}
@@ -160,39 +177,50 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
- verify(serverObserver).onHeadersRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
+ verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(headers), eq(4), eq((short) 255), eq(true), eq(0), eq(true));
}
@Test
public void goAwayFrameShouldMatch() throws Exception {
final String text = "test";
- runInChannel(clientChannel, new Http2Runnable() {
- @Override
- public void run() {
- frameWriter.writeGoAway(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL,
- Unpooled.copiedBuffer(text.getBytes()), newPromise());
- ctx().flush();
- }
- });
- awaitRequests();
- verify(serverObserver).onGoAwayRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
- eq(0xFFFFFFFFL), dataCaptor.capture());
+ final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
+ try {
+ runInChannel(clientChannel, new Http2Runnable() {
+ @Override
+ public void run() {
+ frameWriter.writeGoAway(ctx(), 0x7FFFFFFF, 0xFFFFFFFFL, data.retain(), newPromise());
+ ctx().flush();
+ }
+ });
+ awaitRequests();
+ verify(serverListener).onGoAwayRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
+ eq(0xFFFFFFFFL), dataCaptor.capture());
+ List capturedData = dataCaptor.getAllValues();
+ assertEquals(data, capturedData.get(0));
+ } finally {
+ data.release();
+ }
}
@Test
public void pingFrameShouldMatch() throws Exception {
- final ByteBuf buf = Unpooled.copiedBuffer("01234567", UTF_8);
- runInChannel(clientChannel, new Http2Runnable() {
- @Override
- public void run() {
- frameWriter.writePing(ctx(), true, buf, newPromise());
- ctx().flush();
- }
- });
- awaitRequests();
- verify(serverObserver)
- .onPingAckRead(any(ChannelHandlerContext.class), dataCaptor.capture());
+ final ByteBuf data = Unpooled.copiedBuffer("01234567", UTF_8);
+ try {
+ runInChannel(clientChannel, new Http2Runnable() {
+ @Override
+ public void run() {
+ frameWriter.writePing(ctx(), true, data.retain(), newPromise());
+ ctx().flush();
+ }
+ });
+ awaitRequests();
+ verify(serverListener).onPingAckRead(any(ChannelHandlerContext.class), dataCaptor.capture());
+ List capturedData = dataCaptor.getAllValues();
+ assertEquals(data, capturedData.get(0));
+ } finally {
+ data.release();
+ }
}
@Test
@@ -205,7 +233,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
- verify(serverObserver).onPriorityRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
+ verify(serverListener).onPriorityRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(1), eq((short) 1), eq(true));
}
@@ -222,7 +250,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
- verify(serverObserver).onPushPromiseRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
+ verify(serverListener).onPushPromiseRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(1), eq(headers), eq(5));
}
@@ -236,7 +264,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
- verify(serverObserver).onRstStreamRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
+ verify(serverListener).onRstStreamRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(0xFFFFFFFFL));
}
@@ -254,7 +282,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
- verify(serverObserver).onSettingsRead(any(ChannelHandlerContext.class), eq(settings));
+ verify(serverListener).onSettingsRead(any(ChannelHandlerContext.class), eq(settings));
}
@Test
@@ -267,7 +295,7 @@ public class Http2FrameRoundtripTest {
}
});
awaitRequests();
- verify(serverObserver).onWindowUpdateRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
+ verify(serverListener).onWindowUpdateRead(any(ChannelHandlerContext.class), eq(0x7FFFFFFF),
eq(0x7FFFFFFF));
}
@@ -277,26 +305,46 @@ public class Http2FrameRoundtripTest {
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
.authority("example.org").path("/some/path/resource2").build();
final String text = "hello world";
- final int numStreams = 10000;
- int expectedFrames = numStreams * 2;
- requestLatch = new CountDownLatch(expectedFrames);
- runInChannel(clientChannel, new Http2Runnable() {
- @Override
- public void run() {
- for (int i = 1; i < numStreams + 1; ++i) {
- frameWriter.writeHeaders(ctx(), i, headers, 0, (short) 16, false,
- 0, false, newPromise());
- frameWriter.writeData(ctx(), i,
- Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
- ctx().flush();
+ final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
+ try {
+ final int numStreams = 10000;
+ final int expectedFrames = numStreams * 2;
+ serverLatch(new CountDownLatch(expectedFrames));
+ runInChannel(clientChannel, new Http2Runnable() {
+ @Override
+ public void run() {
+ for (int i = 1; i < numStreams + 1; ++i) {
+ frameWriter.writeHeaders(ctx(), i, headers, 0, (short) 16, false, 0, false, newPromise());
+ frameWriter.writeData(ctx(), i, data.retain(), 0, true, newPromise());
+ ctx().flush();
+ }
}
+ });
+ awaitRequests(30);
+ verify(serverListener, times(numStreams)).onDataRead(any(ChannelHandlerContext.class), anyInt(),
+ dataCaptor.capture(), eq(0), eq(true));
+ List capturedData = dataCaptor.getAllValues();
+ for (int i = 0; i < capturedData.size(); ++i) {
+ assertEquals(data, capturedData.get(i));
}
- });
- awaitRequests();
+ } finally {
+ data.release();
+ }
+ }
+
+ private void awaitRequests(long seconds) throws InterruptedException {
+ requestLatch.await(seconds, SECONDS);
}
private void awaitRequests() throws InterruptedException {
- requestLatch.await(5, SECONDS);
+ awaitRequests(5);
+ }
+
+ private void serverLatch(CountDownLatch latch) {
+ requestLatch = latch;
+ if (serverAdapter != null) {
+ serverAdapter.latch(latch);
+ }
}
private ChannelHandlerContext ctx() {
@@ -306,123 +354,4 @@ public class Http2FrameRoundtripTest {
private ChannelPromise newPromise() {
return ctx().newPromise();
}
-
- private final class FrameAdapter extends ByteToMessageDecoder {
-
- private final Http2FrameListener observer;
- private final DefaultHttp2FrameReader reader;
-
- FrameAdapter(Http2FrameListener observer) {
- this.observer = observer;
- reader = new DefaultHttp2FrameReader();
- }
-
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List