HTTP/2 Data Compressor

Motivation:
The HTTP/2 codec currently does not provide an interface to compress data. The HTTP codec already offers an analogous feature, and data compression is expected to be used commonly enough that it is worth supporting in the HTTP/2 codec as well.

Modifications:
- Add a class which extends DefaultHttp2ConnectionEncoder and provides hooks to an EmbeddedChannel
- Add a compressor element to the Http2Stream interface
- Update unit tests to utilize the new feature

Result:
HTTP/2 codec supports data compression.
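
For orientation, a minimal usage sketch of the new encoder, adapted from the test changes in this commit. It is illustrative only: the connection, frame writer, outbound flow controller, and lifecycle manager are assumed to come from the caller's existing HTTP/2 pipeline setup, and the variable lifecycleManager is hypothetical.

Http2Connection connection = new DefaultHttp2Connection(false);
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
Http2ConnectionEncoder encoder = new CompressorHttp2ConnectionEncoder.Builder()
        .compressionLevel(6)   // 0-9
        .windowBits(15)        // 9-15
        .memLevel(8)           // 1-9
        .connection(connection)
        .frameWriter(frameWriter)
        .outboundFlow(new DefaultHttp2OutboundFlowController(connection, frameWriter))
        .lifecycleManager(lifecycleManager) // assumed to exist in the surrounding handler setup
        .build();
// DATA frames written through 'encoder' are compressed according to each
// stream's content-encoding header.
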
Author: Scott Mitchell, 2014-11-05 23:06:31 -05:00
Parent: b33901c5a6
Commit: 685075f1a0
7 changed files with 416 additions and 85 deletions

File: io/netty/handler/codec/http/HttpContentDecompressor.java

@@ -15,8 +15,11 @@
*/
package io.netty.handler.codec.http;
import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
@@ -26,16 +29,6 @@ import io.netty.handler.codec.compression.ZlibWrapper;
* handler modifies the message, please refer to {@link HttpContentDecoder}.
*/
public class HttpContentDecompressor extends HttpContentDecoder {
/**
* {@code "x-deflate"}
*/
private static final AsciiString X_DEFLATE = new AsciiString("x-deflate");
/**
* {@code "x-gzip"}
*/
private static final AsciiString X_GZIP = new AsciiString("x-gzip");
private final boolean strict;
/**
@@ -57,11 +50,11 @@ public class HttpContentDecompressor extends HttpContentDecoder {
@Override
protected EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception {
if (HttpHeaderValues.GZIP.equalsIgnoreCase(contentEncoding) ||
if (GZIP.equalsIgnoreCase(contentEncoding) ||
X_GZIP.equalsIgnoreCase(contentEncoding)) {
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
}
if (HttpHeaderValues.DEFLATE.equalsIgnoreCase(contentEncoding) ||
if (DEFLATE.equalsIgnoreCase(contentEncoding) ||
X_DEFLATE.equalsIgnoreCase(contentEncoding)) {
final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
// To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.

File: io/netty/handler/codec/http/HttpHeaderValues.java

@@ -76,6 +76,10 @@ public final class HttpHeaderValues {
* {@code "deflate"}
*/
public static final AsciiString DEFLATE = new AsciiString("deflate");
/**
* {@code "x-deflate"}
*/
public static final AsciiString X_DEFLATE = new AsciiString("x-deflate");
/**
* {@code "file"}
* See {@link HttpHeaderNames#CONTENT_DISPOSITION}
@@ -95,6 +99,10 @@ public final class HttpHeaderValues {
* {@code "gzip"}
*/
public static final AsciiString GZIP = new AsciiString("gzip");
/**
* {@code "x-gzip"}
*/
public static final AsciiString X_GZIP = new AsciiString("x-gzip");
/**
* {@code "identity"}
*/

File: io/netty/handler/codec/http2/CompressorHttp2ConnectionEncoder.java

@@ -0,0 +1,289 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseAggregator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
/**
* An HTTP/2 encoder that will compress data frames according to the {@code content-encoding} header for each stream.
* The compression provided by this class will be applied to the data for the entire stream.
*/
public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEncoder {
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
@Override
public void streamRemoved(Http2Stream stream) {
final EmbeddedChannel compressor = stream.compressor();
if (compressor != null) {
cleanup(stream, compressor);
}
}
};
private final int compressionLevel;
private final int windowBits;
private final int memLevel;
/**
* Builder for new instances of {@link CompressorHttp2ConnectionEncoder}
*/
public static class Builder extends DefaultHttp2ConnectionEncoder.Builder {
protected int compressionLevel = 6;
protected int windowBits = 15;
protected int memLevel = 8;
public Builder compressionLevel(int compressionLevel) {
this.compressionLevel = compressionLevel;
return this;
}
public Builder windowBits(int windowBits) {
this.windowBits = windowBits;
return this;
}
public Builder memLevel(int memLevel) {
this.memLevel = memLevel;
return this;
}
@Override
public CompressorHttp2ConnectionEncoder build() {
return new CompressorHttp2ConnectionEncoder(this);
}
}
protected CompressorHttp2ConnectionEncoder(Builder builder) {
super(builder);
if (builder.compressionLevel < 0 || builder.compressionLevel > 9) {
throw new IllegalArgumentException("compressionLevel: " + builder.compressionLevel + " (expected: 0-9)");
}
if (builder.windowBits < 9 || builder.windowBits > 15) {
throw new IllegalArgumentException("windowBits: " + builder.windowBits + " (expected: 9-15)");
}
if (builder.memLevel < 1 || builder.memLevel > 9) {
throw new IllegalArgumentException("memLevel: " + builder.memLevel + " (expected: 1-9)");
}
this.compressionLevel = builder.compressionLevel;
this.windowBits = builder.windowBits;
this.memLevel = builder.memLevel;
connection().addListener(CLEAN_UP_LISTENER);
}
@Override
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
final boolean endOfStream, ChannelPromise promise) {
final Http2Stream stream = connection().stream(streamId);
final EmbeddedChannel compressor = stream == null ? null : stream.compressor();
if (compressor == null) {
// The compressor may be null if no compatible encoding type was found in this stream's headers
return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
}
try {
// Call retain here as the embedded channel will release the buffer after it is written.
compressor.writeOutbound(data.retain());
ByteBuf buf = nextReadableBuf(compressor);
if (buf == null) {
if (endOfStream) {
return super.writeData(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, endOfStream, promise);
}
// END_STREAM is not set and the assumption is data is still forthcoming.
promise.setSuccess();
return promise;
}
ChannelPromiseAggregator aggregator = new ChannelPromiseAggregator(promise);
for (;;) {
final ByteBuf nextBuf = nextReadableBuf(compressor);
final boolean endOfStreamForBuf = nextBuf == null ? endOfStream : false;
ChannelPromise newPromise = ctx.newPromise();
aggregator.add(newPromise);
super.writeData(ctx, streamId, buf, padding, endOfStreamForBuf, newPromise);
if (nextBuf == null) {
break;
}
buf = nextBuf;
}
return promise;
} finally {
if (endOfStream) {
cleanup(stream, compressor);
}
}
}
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream, ChannelPromise promise) {
initCompressor(streamId, headers, endStream);
return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
}
@Override
public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
final int streamDependency, final short weight, final boolean exclusive, final int padding,
final boolean endOfStream, final ChannelPromise promise) {
initCompressor(streamId, headers, endOfStream);
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream,
promise);
}
/**
* Returns a new {@link EmbeddedChannel} that encodes the HTTP/2 message content with the specified
* {@code contentEncoding}.
*
* @param contentEncoding the value of the {@code content-encoding} header
* @return a new {@link EmbeddedChannel} if the specified encoding is supported, {@code null} otherwise
* (alternatively, you can throw an {@link Http2Exception} to block unknown encodings).
* @throws Http2Exception If the specified encoding is not supported and warrants an exception
*/
protected EmbeddedChannel newContentCompressor(AsciiString contentEncoding) throws Http2Exception {
if (GZIP.equalsIgnoreCase(contentEncoding) || X_GZIP.equalsIgnoreCase(contentEncoding)) {
return newCompressionChannel(ZlibWrapper.GZIP);
}
if (DEFLATE.equalsIgnoreCase(contentEncoding) || X_DEFLATE.equalsIgnoreCase(contentEncoding)) {
return newCompressionChannel(ZlibWrapper.ZLIB);
}
// 'identity' or unsupported
return null;
}
/**
* Returns the expected content encoding of the encoded content. Returning {@code contentEncoding} is the default
* behavior, which is the case for most compressors.
*
* @param contentEncoding the value of the {@code content-encoding} header
* @return the expected content encoding of the new content.
* @throws Http2Exception if the {@code contentEncoding} is not supported and warrants an exception
*/
protected AsciiString getTargetContentEncoding(AsciiString contentEncoding) throws Http2Exception {
return contentEncoding;
}
/**
* Generate a new instance of an {@link EmbeddedChannel} capable of compressing data
* @param wrapper Defines what type of encoder should be used
*/
private EmbeddedChannel newCompressionChannel(ZlibWrapper wrapper) {
return new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
memLevel));
}
/**
* Checks if a new compressor object is needed for the stream identified by {@code streamId}. This method will
* modify the {@code content-encoding} header contained in {@code headers}.
*
* @param streamId The identifier of the stream that {@code headers} are associated with
* @param headers Object representing headers which are to be written
* @param endOfStream Indicates if the stream has ended
*/
private void initCompressor(int streamId, Http2Headers headers, boolean endOfStream) {
final Http2Stream stream = connection().stream(streamId);
if (stream == null) {
return;
}
EmbeddedChannel compressor = stream.compressor();
if (compressor == null) {
if (!endOfStream) {
AsciiString encoding = headers.get(CONTENT_ENCODING);
if (encoding == null) {
encoding = IDENTITY;
}
try {
compressor = newContentCompressor(encoding);
if (compressor != null) {
AsciiString targetContentEncoding = getTargetContentEncoding(encoding);
if (IDENTITY.equalsIgnoreCase(targetContentEncoding)) {
headers.remove(CONTENT_ENCODING);
} else {
headers.set(CONTENT_ENCODING, targetContentEncoding);
}
}
} catch (Throwable ignored) {
// Ignore
}
}
} else if (endOfStream) {
cleanup(stream, compressor);
}
if (compressor != null) {
// The content length will be for the uncompressed data. Since we will compress the data
// this content-length will not be correct. Instead of queuing messages or delaying sending
// header frames, just remove the content-length header.
headers.remove(CONTENT_LENGTH);
}
}
/**
* Release remaining content from {@link EmbeddedChannel} and remove the compressor from the {@link Http2Stream}.
*
* @param stream The stream to which {@code compressor} belongs
* @param compressor The compressor for {@code stream}
*/
private static void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
if (compressor.finish()) {
for (;;) {
final ByteBuf buf = compressor.readOutbound();
if (buf == null) {
break;
}
buf.release();
}
}
stream.compressor(null);
}
/**
* Read the next compressed {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist.
*
* @param compressor The channel to read from
* @return The next compressed {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist
*/
private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
for (;;) {
final ByteBuf buf = compressor.readOutbound();
if (buf == null) {
return null;
}
if (!buf.isReadable()) {
buf.release();
continue;
}
return buf;
}
}
}
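
The newContentCompressor()/getTargetContentEncoding() pair above is the intended extension point of the new encoder. As an illustration only, a subclass could reject encodings it cannot produce instead of silently falling back to uncompressed data, as the javadoc suggests. The class name below is hypothetical, and the Http2Exception.protocolError(...) factory is assumed to be available in the codec at this revision.

import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http2.CompressorHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Exception;

/** Hypothetical subclass that refuses unknown content encodings outright. */
public class StrictCompressorHttp2ConnectionEncoder extends CompressorHttp2ConnectionEncoder {
    public StrictCompressorHttp2ConnectionEncoder(Builder builder) {
        super(builder);
    }

    @Override
    protected EmbeddedChannel newContentCompressor(AsciiString contentEncoding) throws Http2Exception {
        EmbeddedChannel compressor = super.newContentCompressor(contentEncoding);
        if (compressor == null && !HttpHeaderValues.IDENTITY.equalsIgnoreCase(contentEncoding)) {
            // Assumed factory method; the exact Http2Exception API may differ between revisions.
            throw Http2Exception.protocolError("Unsupported content-encoding: %s", contentEncoding);
        }
        return compressor;
    }
}
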

File: io/netty/handler/codec/http2/DefaultHttp2Connection.java

@@ -221,6 +221,7 @@ public class DefaultHttp2Connection implements Http2Connection {
private FlowState inboundFlow;
private FlowState outboundFlow;
private EmbeddedChannel decompressor;
private EmbeddedChannel compressor;
private Object data;
DefaultStream(int id) {
@@ -310,6 +311,19 @@ public class DefaultHttp2Connection implements Http2Connection {
return decompressor;
}
@Override
public void compressor(EmbeddedChannel compressor) {
if (this.compressor != null && compressor != null) {
throw new IllegalStateException("compressor can not be reassigned");
}
this.compressor = compressor;
}
@Override
public EmbeddedChannel compressor() {
return compressor;
}
@Override
public FlowState inboundFlow() {
return inboundFlow;

File: io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java

@@ -14,6 +14,13 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@@ -22,24 +29,12 @@ import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
/**
* A HTTP2 frame listener that will decompress data frames according to the {@code content-encoding} header for each
* stream.
* stream. The decompression provided by this class will be applied to the data for the entire stream.
*/
public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecorator {
/**
* {@code "x-deflate"}
*/
private static final AsciiString X_DEFLATE = new AsciiString("x-deflate");
/**
* {@code "x-gzip"}
*/
private static final AsciiString X_GZIP = new AsciiString("x-gzip");
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
@Override
public void streamRemoved(Http2Stream stream) {
@@ -72,6 +67,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
final Http2Stream stream = connection.stream(streamId);
final EmbeddedChannel decompressor = stream == null ? null : stream.decompressor();
if (decompressor == null) {
// The decompressor may be null if no compatible encoding type was found in this stream's headers
listener.onDataRead(ctx, streamId, data, padding, endOfStream);
return;
}
@@ -90,12 +86,13 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
} else {
for (;;) {
final ByteBuf nextBuf = nextReadableBuf(decompressor);
final boolean endOfStreamForBuf = nextBuf == null ? endOfStream : false;
listener.onDataRead(ctx, streamId, buf, padding, endOfStreamForBuf);
if (nextBuf == null) {
listener.onDataRead(ctx, streamId, buf, padding, endOfStream);
break;
}
listener.onDataRead(ctx, streamId, buf, padding, false);
buf = nextBuf;
}
}
@@ -130,11 +127,11 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
* @throws Http2Exception If the specified encoding is not supported and warrants an exception
*/
protected EmbeddedChannel newContentDecompressor(AsciiString contentEncoding) throws Http2Exception {
if (HttpHeaderValues.GZIP.equalsIgnoreCase(contentEncoding) ||
if (GZIP.equalsIgnoreCase(contentEncoding) ||
X_GZIP.equalsIgnoreCase(contentEncoding)) {
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
}
if (HttpHeaderValues.DEFLATE.equalsIgnoreCase(contentEncoding) ||
if (DEFLATE.equalsIgnoreCase(contentEncoding) ||
X_DEFLATE.equalsIgnoreCase(contentEncoding)) {
final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
// To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.
@@ -154,7 +151,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
*/
protected AsciiString getTargetContentEncoding(@SuppressWarnings("UnusedParameters") AsciiString contentEncoding)
throws Http2Exception {
return HttpHeaderValues.IDENTITY;
return IDENTITY;
}
/**
@@ -176,9 +173,9 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
if (decompressor == null) {
if (!endOfStream) {
// Determine the content encoding.
AsciiString contentEncoding = headers.get(HttpHeaderNames.CONTENT_ENCODING);
AsciiString contentEncoding = headers.get(CONTENT_ENCODING);
if (contentEncoding == null) {
contentEncoding = HttpHeaderValues.IDENTITY;
contentEncoding = IDENTITY;
}
decompressor = newContentDecompressor(contentEncoding);
if (decompressor != null) {
@@ -186,21 +183,22 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
// Decode the content and remove or replace the existing headers
// so that the message looks like a decoded message.
AsciiString targetContentEncoding = getTargetContentEncoding(contentEncoding);
if (HttpHeaderValues.IDENTITY.equalsIgnoreCase(targetContentEncoding)) {
headers.remove(HttpHeaderNames.CONTENT_ENCODING);
if (IDENTITY.equalsIgnoreCase(targetContentEncoding)) {
headers.remove(CONTENT_ENCODING);
} else {
headers.set(HttpHeaderNames.CONTENT_ENCODING, targetContentEncoding);
headers.set(CONTENT_ENCODING, targetContentEncoding);
}
}
}
} else if (endOfStream) {
cleanup(stream, decompressor);
}
if (decompressor != null) {
// The content length will be for the compressed data. Since we will decompress the data
// this content-length will not be correct. Instead of queuing messages or delaying sending
// header frames...just remove the content-length header
headers.remove(HttpHeaderNames.CONTENT_LENGTH);
headers.remove(CONTENT_LENGTH);
}
}

File: io/netty/handler/codec/http2/Http2Stream.java

@@ -153,6 +153,16 @@ public interface Http2Stream {
*/
EmbeddedChannel decompressor();
/**
* Associate an object responsible for compressing data frames for this stream
*/
void compressor(EmbeddedChannel compressor);
/**
* Get the object capable of compressing data frames for this stream
*/
EmbeddedChannel compressor();
/**
* Gets the in-bound flow control state for this stream.
*/
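
For completeness, a small sketch of the contract behind the new accessor pair: a compressor is attached to a stream at most once and must be cleared before it can be replaced (see the guard added in DefaultHttp2Connection above). The helper class and method names here are hypothetical.

import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.http2.Http2Stream;

final class CompressorAttachmentSketch {
    /** Attach a gzip compressor to the stream if none is present yet. */
    static EmbeddedChannel attachGzip(Http2Stream stream) {
        EmbeddedChannel compressor = stream.compressor();
        if (compressor == null) {
            compressor = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
            // Assigning a second non-null compressor would throw IllegalStateException.
            stream.compressor(compressor);
        }
        return compressor;
    }

    /** Clear the association once the stream is done, mirroring cleanup() in the new encoder. */
    static void detach(Http2Stream stream) {
        stream.compressor(null);
    }
}
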

File: io/netty/handler/codec/http2/DataCompressionHttp2Test.java

@@ -14,6 +14,17 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2TestUtil.as;
import static io.netty.handler.codec.http2.Http2TestUtil.runInChannel;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
@@ -35,8 +46,15 @@ import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http2.Http2TestUtil.FrameAdapter;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -44,15 +62,6 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static io.netty.handler.codec.http2.Http2TestUtil.*;
import static java.util.concurrent.TimeUnit.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Test for data decompression in the HTTP/2 codec.
*/
@@ -66,9 +75,14 @@ public class DataCompressionHttp2Test {
private Http2FrameListener serverListener;
@Mock
private Http2FrameListener clientListener;
@Mock
private Http2LifecycleManager serverLifeCycleManager;
@Mock
private Http2LifecycleManager clientLifeCycleManager;
private ByteBufAllocator alloc;
private Http2FrameWriter frameWriter;
private Http2ConnectionEncoder serverEncoder;
private Http2ConnectionEncoder clientEncoder;
private ServerBootstrap sb;
private Bootstrap cb;
private Channel serverChannel;
@@ -107,22 +121,22 @@ public class DataCompressionHttp2Test {
@Test
public void justHeadersNoData() throws Exception {
bootstrapEnv(2, 1);
final Http2Headers headers =
new DefaultHttp2Headers().method(GET).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
bootstrapEnv(1, 1);
final Http2Headers headers = new DefaultHttp2Headers().method(GET).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
FrameAdapter.getOrCreateStream(serverConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, headers, 0, true, newPromiseClient());
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitServer();
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(headers), eq(0), eq(true));
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(headers), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true));
}
@Test
@@ -133,17 +147,16 @@ public class DataCompressionHttp2Test {
final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
try {
final ByteBuf encodedData = encodeData(data, encoder);
final Http2Headers headers =
new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
FrameAdapter.getOrCreateStream(serverConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
ctxClient().flush();
}
});
@@ -151,7 +164,7 @@ public class DataCompressionHttp2Test {
data.resetReaderIndex();
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
eq(true));
dataCapture = dataCaptor.getAllValues();
assertEquals(data, dataCapture.get(0));
} finally {
@@ -168,17 +181,16 @@ public class DataCompressionHttp2Test {
final EmbeddedChannel encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
try {
final ByteBuf encodedData = encodeData(data, encoder);
final Http2Headers headers =
new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
FrameAdapter.getOrCreateStream(serverConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
ctxClient().flush();
}
});
@@ -186,7 +198,7 @@ public class DataCompressionHttp2Test {
data.resetReaderIndex();
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
eq(true));
dataCapture = dataCaptor.getAllValues();
assertEquals(data, dataCapture.get(0));
} finally {
@@ -206,18 +218,17 @@ public class DataCompressionHttp2Test {
try {
final ByteBuf encodedData1 = encodeData(data1, encoder);
final ByteBuf encodedData2 = encodeData(data2, encoder);
final Http2Headers headers =
new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
FrameAdapter.getOrCreateStream(serverConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, encodedData1, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, encodedData2, 0, true, newPromiseClient());
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, encodedData1, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, encodedData2, 0, true, newPromiseClient());
ctxClient().flush();
}
});
@@ -227,7 +238,7 @@ public class DataCompressionHttp2Test {
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
ArgumentCaptor<Boolean> endStreamCaptor = ArgumentCaptor.forClass(Boolean.class);
verify(serverListener, times(2)).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(),
eq(0), endStreamCaptor.capture());
eq(0), endStreamCaptor.capture());
dataCapture = dataCaptor.getAllValues();
assertEquals(data1, dataCapture.get(0));
assertEquals(data2, dataCapture.get(1));
@@ -252,16 +263,15 @@ public class DataCompressionHttp2Test {
data.writeByte((byte) 'a');
}
final ByteBuf encodedData = encodeData(data, encoder);
final Http2Headers headers =
new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE);
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE);
final Http2Settings settings = new Http2Settings();
// Assume the compression operation will reduce the size by at least 10 bytes
settings.initialWindowSize(BUFFER_SIZE - 10);
runInChannel(serverConnectedChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeSettings(ctxServer(), settings, newPromiseServer());
serverEncoder.writeSettings(ctxServer(), settings, newPromiseServer());
ctxServer().flush();
}
});
@@ -273,9 +283,9 @@ public class DataCompressionHttp2Test {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeSettings(ctxClient(), settings, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
clientEncoder.writeSettings(ctxClient(), settings, newPromiseClient());
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, encodedData, 0, true, newPromiseClient());
ctxClient().flush();
}
});
@@ -283,7 +293,7 @@ public class DataCompressionHttp2Test {
data.resetReaderIndex();
ArgumentCaptor<ByteBuf> dataCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), dataCaptor.capture(), eq(0),
eq(true));
eq(true));
dataCapture = dataCaptor.getAllValues();
assertEquals(data, dataCapture.get(0));
} finally {
@@ -329,7 +339,6 @@ public class DataCompressionHttp2Test {
serverLatch = new CountDownLatch(serverCountDown);
clientLatch = new CountDownLatch(clientCountDown);
frameWriter = new DefaultHttp2FrameWriter();
serverConnection = new DefaultHttp2Connection(true);
final CountDownLatch latch = new CountDownLatch(1);
@@ -339,9 +348,13 @@ public class DataCompressionHttp2Test {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
serverAdapter = new FrameAdapter(serverConnection,
new DelegatingDecompressorFrameListener(serverConnection, serverListener),
serverLatch);
CompressorHttp2ConnectionEncoder.Builder builder = new CompressorHttp2ConnectionEncoder.Builder();
Http2FrameWriter writer = new DefaultHttp2FrameWriter();
serverEncoder = builder.connection(serverConnection).frameWriter(writer)
.outboundFlow(new DefaultHttp2OutboundFlowController(serverConnection, writer))
.lifecycleManager(serverLifeCycleManager).build();
serverAdapter = new FrameAdapter(serverConnection, new DelegatingDecompressorFrameListener(
serverConnection, serverListener), serverLatch);
p.addLast("reader", serverAdapter);
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
serverConnectedChannel = ch;
@@ -355,7 +368,13 @@ public class DataCompressionHttp2Test {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
clientAdapter = new FrameAdapter(clientListener, clientLatch);
Http2Connection connection = new DefaultHttp2Connection(false);
Http2FrameWriter writer = new DefaultHttp2FrameWriter();
CompressorHttp2ConnectionEncoder.Builder builder = new CompressorHttp2ConnectionEncoder.Builder();
clientEncoder = builder.connection(connection).frameWriter(writer)
.outboundFlow(new DefaultHttp2OutboundFlowController(connection, writer))
.lifecycleManager(clientLifeCycleManager).build();
clientAdapter = new FrameAdapter(connection, clientListener, clientLatch);
p.addLast("reader", clientAdapter);
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
}