Add support for direct ByteBufs in JdkZlibEncoder and JdkZlibDecoder (#11057)

Motivation:
The JDK deflate implementation added support for operating on ByteBuffers in Java 11 or so.
This means that we don't need to restrict that implementation to ByteBufs that are heap based and can expose arrays.

Modification:
Add clauses to JdkZlibEncoder and JdkZlibDecoder for handling ByteBufs that don't have arrays, but do have one nioByteBuffer.
Expand the test coverage in JdkZlibTest to include all relevant combinations of buffer types and data types.

Result:
The JdkZlibEncoder and JdkZlibDecoder should now work on basically all non-composite ByteBufs, and likely also composite ByteBufs that have exactly one component.
This commit is contained in:
Chris Vest 2021-03-05 13:29:20 +01:00 committed by GitHub
parent bfea65ef52
commit e1830ccf47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 596 additions and 455 deletions

View File

@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import java.util.zip.DataFormatException; import java.util.zip.DataFormatException;
import java.util.zip.Deflater; import java.util.zip.Deflater;
@ -233,15 +234,24 @@ public class JdkZlibDecoder extends ZlibDecoder {
try { try {
boolean readFooter = false; boolean readFooter = false;
while (!inflater.needsInput()) { while (!inflater.needsInput()) {
byte[] outArray = decompressed.array();
int writerIndex = decompressed.writerIndex(); int writerIndex = decompressed.writerIndex();
int outIndex = decompressed.arrayOffset() + writerIndex;
int writable = decompressed.writableBytes(); int writable = decompressed.writableBytes();
int outputLength = inflater.inflate(outArray, outIndex, writable); int outputLength;
if (decompressed.hasArray()) {
byte[] outArray = decompressed.array();
int outIndex = decompressed.arrayOffset() + writerIndex;
outputLength = inflater.inflate(outArray, outIndex, writable);
} else if (decompressed.nioBufferCount() == 1) {
ByteBuffer buffer = decompressed.internalNioBuffer(writerIndex, writable);
outputLength = inflater.inflate(buffer);
} else {
throw new IllegalStateException(
"Decompress buffer must have array or exactly 1 NIO buffer: " + decompressed);
}
if (outputLength > 0) { if (outputLength > 0) {
decompressed.writerIndex(writerIndex + outputLength); decompressed.writerIndex(writerIndex + outputLength);
if (crc != null) { if (crc != null) {
crc.update(outArray, outIndex, outputLength); crc.update(decompressed, writerIndex, outputLength);
} }
} else if (inflater.needsDictionary()) { } else if (inflater.needsDictionary()) {
if (dictionary == null) { if (dictionary == null) {

View File

@ -25,6 +25,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier; import io.netty.channel.ChannelPromiseNotifier;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import java.util.zip.Deflater; import java.util.zip.Deflater;
@ -81,6 +82,10 @@ public class JdkZlibEncoder extends ZlibEncoder {
this(wrapper, 6); this(wrapper, 6);
} }
public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
this(wrapper, compressionLevel, false);
}
/** /**
* Creates a new zlib encoder with the specified {@code compressionLevel} * Creates a new zlib encoder with the specified {@code compressionLevel}
* and the specified wrapper. * and the specified wrapper.
@ -89,10 +94,12 @@ public class JdkZlibEncoder extends ZlibEncoder {
* {@code 1} yields the fastest compression and {@code 9} yields the * {@code 1} yields the fastest compression and {@code 9} yields the
* best compression. {@code 0} means no compression. The default * best compression. {@code 0} means no compression. The default
* compression level is {@code 6}. * compression level is {@code 6}.
* @param preferDirectBuffers {@code true} if a direct {@link ByteBuf} should be tried to be used as target for
* decompression, or {@code false} if heap allocated {@link ByteBuf}s should be used.
* *
* @throws CompressionException if failed to initialize zlib * @throws CompressionException if failed to initialize zlib
*/ */
public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) { public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel, boolean preferDirectBuffers) {
if (compressionLevel < 0 || compressionLevel > 9) { if (compressionLevel < 0 || compressionLevel > 9) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"compressionLevel: " + compressionLevel + " (expected: 0-9)"); "compressionLevel: " + compressionLevel + " (expected: 0-9)");
@ -251,7 +258,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
// no op // no op
} }
} }
return ctx.alloc().heapBuffer(sizeEstimate); return ctx.alloc().buffer(sizeEstimate);
} }
@Override @Override
@ -308,13 +315,28 @@ public class JdkZlibEncoder extends ZlibEncoder {
} }
private void deflate(ByteBuf out) { private void deflate(ByteBuf out) {
int numBytes; if (out.hasArray()) {
do { int numBytes;
int writerIndex = out.writerIndex(); do {
numBytes = deflater.deflate( int writerIndex = out.writerIndex();
out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH); numBytes = deflater.deflate(
out.writerIndex(writerIndex + numBytes); out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH);
} while (numBytes > 0); out.writerIndex(writerIndex + numBytes);
} while (numBytes > 0);
} else if (out.nioBufferCount() == 1) {
// Use internalNioBuffer because nioBuffer is allowed to copy,
// which is fine for reading but not for writing.
int numBytes;
do {
int writerIndex = out.writerIndex();
ByteBuffer buffer = out.internalNioBuffer(writerIndex, out.writableBytes());
numBytes = deflater.deflate(buffer, Deflater.SYNC_FLUSH);
out.writerIndex(writerIndex + numBytes);
} while (numBytes > 0);
} else {
throw new IllegalArgumentException(
"Don't know how to deflate buffer without array or NIO buffer count of 1: " + out);
}
} }
@Override @Override

View File

@ -29,6 +29,7 @@ public abstract class ZlibDecoder extends ByteToMessageDecoder {
* Maximum allowed size of the decompression buffer. * Maximum allowed size of the decompression buffer.
*/ */
protected final int maxAllocation; protected final int maxAllocation;
protected final boolean preferDirect = false;
/** /**
* Same as {@link #ZlibDecoder(int)} with maxAllocation = 0. * Same as {@link #ZlibDecoder(int)} with maxAllocation = 0.
@ -63,10 +64,10 @@ public abstract class ZlibDecoder extends ByteToMessageDecoder {
protected ByteBuf prepareDecompressBuffer(ChannelHandlerContext ctx, ByteBuf buffer, int preferredSize) { protected ByteBuf prepareDecompressBuffer(ChannelHandlerContext ctx, ByteBuf buffer, int preferredSize) {
if (buffer == null) { if (buffer == null) {
if (maxAllocation == 0) { if (maxAllocation == 0) {
return ctx.alloc().heapBuffer(preferredSize); return ctx.alloc().buffer(preferredSize);
} }
return ctx.alloc().heapBuffer(Math.min(preferredSize, maxAllocation), maxAllocation); return ctx.alloc().buffer(Math.min(preferredSize, maxAllocation), maxAllocation);
} }
// this always expands the buffer if possible, even if the expansion is less than preferredSize // this always expands the buffer if possible, even if the expansion is less than preferredSize

View File

@ -29,6 +29,10 @@ public abstract class ZlibEncoder extends MessageToByteEncoder<ByteBuf> {
super(false); super(false);
} }
protected ZlibEncoder(boolean preferDirectBuffers) {
super(preferDirectBuffers);
}
/** /**
* Returns {@code true} if and only if the end of the compressed stream * Returns {@code true} if and only if the end of the compressed stream
* has been reached. * has been reached.

View File

@ -15,37 +15,513 @@
*/ */
package io.netty.handler.codec.compression; package io.netty.handler.codec.compression;
import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.EmptyArrays;
import org.apache.commons.compress.utils.IOUtils; import org.apache.commons.compress.utils.IOUtils;
import org.junit.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.Queue; import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import static org.junit.Assert.*; import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class JdkZlibTest {
private static final byte[] BYTES_SMALL = new byte[128];
private static final byte[] BYTES_LARGE = new byte[1024 * 1024];
private static final byte[] BYTES_LARGE2 = ("<!--?xml version=\"1.0\" encoding=\"ISO-8859-1\"?-->\n" +
"<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Strict//EN\" " +
"\"https://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd\">\n" +
"<html xmlns=\"https://www.w3.org/1999/xhtml\" xml:lang=\"en\" lang=\"en\"><head>\n" +
" <title>Apache Tomcat</title>\n" +
"</head>\n" +
'\n' +
"<body>\n" +
"<h1>It works !</h1>\n" +
'\n' +
"<p>If you're seeing this page via a web browser, it means you've setup Tomcat successfully." +
" Congratulations!</p>\n" +
" \n" +
"<p>This is the default Tomcat home page." +
" It can be found on the local filesystem at: <code>/var/lib/tomcat7/webapps/ROOT/index.html</code></p>\n" +
'\n' +
"<p>Tomcat7 veterans might be pleased to learn that this system instance of Tomcat is installed with" +
" <code>CATALINA_HOME</code> in <code>/usr/share/tomcat7</code> and <code>CATALINA_BASE</code> in" +
" <code>/var/lib/tomcat7</code>, following the rules from" +
" <code>/usr/share/doc/tomcat7-common/RUNNING.txt.gz</code>.</p>\n" +
'\n' +
"<p>You might consider installing the following packages, if you haven't already done so:</p>\n" +
'\n' +
"<p><b>tomcat7-docs</b>: This package installs a web application that allows to browse the Tomcat 7" +
" documentation locally. Once installed, you can access it by clicking <a href=\"docs/\">here</a>.</p>\n" +
'\n' +
"<p><b>tomcat7-examples</b>: This package installs a web application that allows to access the Tomcat" +
" 7 Servlet and JSP examples. Once installed, you can access it by clicking" +
" <a href=\"examples/\">here</a>.</p>\n" +
'\n' +
"<p><b>tomcat7-admin</b>: This package installs two web applications that can help managing this Tomcat" +
" instance. Once installed, you can access the <a href=\"manager/html\">manager webapp</a> and" +
" the <a href=\"host-manager/html\">host-manager webapp</a>.</p><p>\n" +
'\n' +
"</p><p>NOTE: For security reasons, using the manager webapp is restricted" +
" to users with role \"manager\"." +
" The host-manager webapp is restricted to users with role \"admin\". Users are " +
"defined in <code>/etc/tomcat7/tomcat-users.xml</code>.</p>\n" +
'\n' +
'\n' +
'\n' +
"</body></html>").getBytes(CharsetUtil.UTF_8);
public class JdkZlibTest extends ZlibTest { static {
Random rand = ThreadLocalRandom.current();
@Override rand.nextBytes(BYTES_SMALL);
protected ZlibEncoder createEncoder(ZlibWrapper wrapper) { rand.nextBytes(BYTES_LARGE);
return new JdkZlibEncoder(wrapper); }
enum Data {
NONE(null),
SMALL(BYTES_SMALL),
LARGE(BYTES_LARGE);
final byte[] bytes;
Data(byte[] bytes) {
this.bytes = bytes;
}
}
enum BufferType {
HEAP,
DIRECT;
ByteBuf allocate(byte[] bytes) {
switch (this) {
case HEAP: return Unpooled.wrappedBuffer(bytes);
case DIRECT: return Unpooled.directBuffer(bytes.length).writeBytes(bytes);
}
return fail("Fall-through should not be possible: " + this);
}
}
protected ZlibDecoder createDecoder(ZlibWrapper wrapper) {
return createDecoder(wrapper, 0);
}
protected ZlibEncoder createEncoder(ZlibWrapper wrapper, BufferType bufferType) {
return new JdkZlibEncoder(wrapper, 6, bufferType == BufferType.DIRECT);
} }
@Override
protected ZlibDecoder createDecoder(ZlibWrapper wrapper, int maxAllocation) { protected ZlibDecoder createDecoder(ZlibWrapper wrapper, int maxAllocation) {
return new JdkZlibDecoder(wrapper, maxAllocation); return new JdkZlibDecoder(wrapper, maxAllocation);
} }
@Test(expected = DecompressionException.class) static Stream<Arguments> compressionConfigurations() {
@Override Stream.Builder<Arguments> args = Stream.builder();
Data[] dataVals = Data.values();
BufferType[] bufferTypeVals = BufferType.values();
ZlibWrapper[] zlibWrappers = ZlibWrapper.values();
for (Data data : dataVals) {
for (BufferType inBuf : bufferTypeVals) {
for (BufferType outBuf : bufferTypeVals) {
for (ZlibWrapper inputWrapper : zlibWrappers) {
for (ZlibWrapper outputWrapper : zlibWrappers) {
args.add(Arguments.of(data, inBuf, outBuf, inputWrapper, outputWrapper));
}
}
}
}
}
return args.build();
}
static Stream<Arguments> workingConfigurations() {
return compressionConfigurations().filter(JdkZlibTest::isWorkingConfiguration);
}
private static boolean isWorkingConfiguration(Arguments args) {
Object[] objs = args.get();
ZlibWrapper inWrap = (ZlibWrapper) objs[3];
ZlibWrapper outWrap = (ZlibWrapper) objs[4];
if (inWrap == ZlibWrapper.ZLIB_OR_NONE) {
return false;
}
if (inWrap == ZlibWrapper.GZIP || outWrap == ZlibWrapper.GZIP) {
return inWrap == outWrap;
}
if (inWrap == ZlibWrapper.NONE) {
return outWrap == ZlibWrapper.NONE || outWrap == ZlibWrapper.ZLIB_OR_NONE;
}
if (outWrap == ZlibWrapper.NONE) {
return inWrap == ZlibWrapper.NONE;
}
return true;
}
@ParameterizedTest
@MethodSource("workingConfigurations")
void compressionInputOutput(
Data data, BufferType inBuf, BufferType outBuf, ZlibWrapper inWrap, ZlibWrapper outWrap) {
EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(inWrap, inBuf));
EmbeddedChannel chDecoder = new EmbeddedChannel(createDecoder(outWrap));
chEncoder.config().setAllocator(new UnpooledByteBufAllocator(inBuf == BufferType.DIRECT));
chDecoder.config().setAllocator(new UnpooledByteBufAllocator(outBuf == BufferType.DIRECT));
try {
if (data != Data.NONE) {
chEncoder.writeOutbound(inBuf.allocate(data.bytes));
chEncoder.flush();
for (;;) {
ByteBuf deflatedData = chEncoder.readOutbound();
if (deflatedData == null) {
break;
}
chDecoder.writeInbound(deflatedData);
}
byte[] decompressed = new byte[data.bytes.length];
int offset = 0;
for (;;) {
ByteBuf buf = chDecoder.readInbound();
if (buf == null) {
break;
}
int length = buf.readableBytes();
buf.readBytes(decompressed, offset, length);
offset += length;
buf.release();
if (offset == decompressed.length) {
break;
}
}
assertArrayEquals(data.bytes, decompressed);
assertNull(chDecoder.readInbound());
}
// Closing an encoder channel will generate a footer.
assertTrue(chEncoder.finish());
for (;;) {
Object msg = chEncoder.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
// But, the footer will be decoded into nothing. It's only for validation.
assertFalse(chDecoder.finish());
} finally {
dispose(chEncoder);
dispose(chDecoder);
}
}
@Test
public void testGZIP2() throws Exception {
byte[] bytes = "message".getBytes(CharsetUtil.UTF_8);
ByteBuf data = Unpooled.wrappedBuffer(bytes);
ByteBuf deflatedData = Unpooled.wrappedBuffer(gzip(bytes));
EmbeddedChannel chDecoderGZip = new EmbeddedChannel(createDecoder(ZlibWrapper.GZIP));
try {
while (deflatedData.isReadable()) {
chDecoderGZip.writeInbound(deflatedData.readRetainedSlice(1));
}
deflatedData.release();
assertTrue(chDecoderGZip.finish());
ByteBuf buf = Unpooled.buffer();
for (;;) {
ByteBuf b = chDecoderGZip.readInbound();
if (b == null) {
break;
}
buf.writeBytes(b);
b.release();
}
assertEquals(buf, data);
assertNull(chDecoderGZip.readInbound());
data.release();
buf.release();
} finally {
dispose(chDecoderGZip);
}
}
private void testCompress0(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper, ByteBuf data) throws Exception {
EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper, BufferType.HEAP));
EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper));
try {
chEncoder.writeOutbound(data.retain());
chEncoder.flush();
data.readerIndex(0);
for (;;) {
ByteBuf deflatedData = chEncoder.readOutbound();
if (deflatedData == null) {
break;
}
chDecoderZlib.writeInbound(deflatedData);
}
byte[] decompressed = new byte[data.readableBytes()];
int offset = 0;
for (;;) {
ByteBuf buf = chDecoderZlib.readInbound();
if (buf == null) {
break;
}
int length = buf.readableBytes();
buf.readBytes(decompressed, offset, length);
offset += length;
buf.release();
if (offset == decompressed.length) {
break;
}
}
assertEquals(data, Unpooled.wrappedBuffer(decompressed));
assertNull(chDecoderZlib.readInbound());
// Closing an encoder channel will generate a footer.
assertTrue(chEncoder.finish());
for (;;) {
Object msg = chEncoder.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
// But, the footer will be decoded into nothing. It's only for validation.
assertFalse(chDecoderZlib.finish());
data.release();
} finally {
dispose(chEncoder);
dispose(chDecoderZlib);
}
}
private void testCompressNone(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper, BufferType.HEAP));
EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper));
try {
// Closing an encoder channel without writing anything should generate both header and footer.
assertTrue(chEncoder.finish());
for (;;) {
ByteBuf deflatedData = chEncoder.readOutbound();
if (deflatedData == null) {
break;
}
chDecoderZlib.writeInbound(deflatedData);
}
// Decoder should not generate anything at all.
boolean decoded = false;
for (;;) {
ByteBuf buf = chDecoderZlib.readInbound();
if (buf == null) {
break;
}
buf.release();
decoded = true;
}
assertFalse(decoded, "should decode nothing");
assertFalse(chDecoderZlib.finish());
} finally {
dispose(chEncoder);
dispose(chDecoderZlib);
}
}
private static void dispose(EmbeddedChannel ch) {
if (ch.finish()) {
for (;;) {
Object msg = ch.readInbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
for (;;) {
Object msg = ch.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
}
}
// Test for https://github.com/netty/netty/issues/2572
private void testDecompressOnly(ZlibWrapper decoderWrapper, byte[] compressed, byte[] data) throws Exception {
EmbeddedChannel chDecoder = new EmbeddedChannel(createDecoder(decoderWrapper));
chDecoder.writeInbound(Unpooled.copiedBuffer(compressed));
assertTrue(chDecoder.finish());
ByteBuf decoded = Unpooled.buffer(data.length);
for (;;) {
ByteBuf buf = chDecoder.readInbound();
if (buf == null) {
break;
}
decoded.writeBytes(buf);
buf.release();
}
assertEquals(Unpooled.copiedBuffer(data), decoded);
decoded.release();
}
private void testCompressSmall(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
testCompress0(encoderWrapper, decoderWrapper, Unpooled.wrappedBuffer(BYTES_SMALL));
testCompress0(encoderWrapper, decoderWrapper,
Unpooled.directBuffer(BYTES_SMALL.length).writeBytes(BYTES_SMALL));
}
private void testCompressLarge(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
testCompress0(encoderWrapper, decoderWrapper, Unpooled.wrappedBuffer(BYTES_LARGE));
testCompress0(encoderWrapper, decoderWrapper,
Unpooled.directBuffer(BYTES_LARGE.length).writeBytes(BYTES_LARGE));
}
@Test
public void testZLIB() throws Exception {
testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testDecompressOnly(ZlibWrapper.ZLIB, deflate(BYTES_LARGE2), BYTES_LARGE2);
}
@Test
public void testNONE() throws Exception {
testCompressNone(ZlibWrapper.NONE, ZlibWrapper.NONE);
testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.NONE);
testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.NONE);
}
@Test
public void testGZIP() throws Exception {
testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testDecompressOnly(ZlibWrapper.GZIP, gzip(BYTES_LARGE2), BYTES_LARGE2);
}
@Test
public void testGZIPCompressOnly() throws Exception {
testGZIPCompressOnly0(null); // Do not write anything; just finish the stream.
testGZIPCompressOnly0(EmptyArrays.EMPTY_BYTES); // Write an empty array.
testGZIPCompressOnly0(BYTES_SMALL);
testGZIPCompressOnly0(BYTES_LARGE);
}
private void testGZIPCompressOnly0(byte[] data) throws IOException {
EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(ZlibWrapper.GZIP, BufferType.HEAP));
if (data != null) {
chEncoder.writeOutbound(Unpooled.wrappedBuffer(data));
}
assertTrue(chEncoder.finish());
ByteBuf encoded = Unpooled.buffer();
for (;;) {
ByteBuf buf = chEncoder.readOutbound();
if (buf == null) {
break;
}
encoded.writeBytes(buf);
buf.release();
}
ByteBuf decoded = Unpooled.buffer();
GZIPInputStream stream = new GZIPInputStream(new ByteBufInputStream(encoded, true));
try {
byte[] buf = new byte[8192];
for (;;) {
int readBytes = stream.read(buf);
if (readBytes < 0) {
break;
}
decoded.writeBytes(buf, 0, readBytes);
}
} finally {
stream.close();
}
if (data != null) {
assertEquals(Unpooled.wrappedBuffer(data), decoded);
} else {
assertFalse(decoded.isReadable());
}
decoded.release();
}
@Test
public void testZLIB_OR_NONE() throws Exception {
testCompressNone(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
}
@Test
public void testZLIB_OR_NONE2() throws Exception {
testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
}
@Test
public void testZLIB_OR_NONE3() throws Exception { public void testZLIB_OR_NONE3() throws Exception {
super.testZLIB_OR_NONE3(); assertThrows(DecompressionException.class, () -> testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE));
assertThrows(DecompressionException.class, () -> testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE));
assertThrows(DecompressionException.class, () -> testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE));
}
@Test
public void testMaxAllocation() throws Exception {
int maxAllocation = 1024;
ZlibDecoder decoder = createDecoder(ZlibWrapper.ZLIB, maxAllocation);
EmbeddedChannel chDecoder = new EmbeddedChannel(decoder);
TestByteBufAllocator alloc = new TestByteBufAllocator(chDecoder.alloc());
chDecoder.config().setAllocator(alloc);
try {
chDecoder.writeInbound(Unpooled.wrappedBuffer(deflate(BYTES_LARGE)));
fail("decompressed size > maxAllocation, so should have thrown exception");
} catch (DecompressionException e) {
assertTrue(e.getMessage().startsWith("Decompression buffer has reached maximum size"));
assertEquals(maxAllocation, alloc.getMaxAllocation());
assertTrue(decoder.isClosed());
assertFalse(chDecoder.finish());
}
} }
@Test @Test
@ -120,4 +596,50 @@ public class JdkZlibTest extends ZlibTest {
chDecoderGZip.close(); chDecoderGZip.close();
} }
} }
private static byte[] gzip(byte[] bytes) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream stream = new GZIPOutputStream(out);
stream.write(bytes);
stream.close();
return out.toByteArray();
}
private static byte[] deflate(byte[] bytes) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
OutputStream stream = new DeflaterOutputStream(out);
stream.write(bytes);
stream.close();
return out.toByteArray();
}
private static final class TestByteBufAllocator extends AbstractByteBufAllocator {
private final ByteBufAllocator wrapped;
private int maxAllocation;
TestByteBufAllocator(ByteBufAllocator wrapped) {
this.wrapped = wrapped;
}
public int getMaxAllocation() {
return maxAllocation;
}
@Override
public boolean isDirectBufferPooled() {
return wrapped.isDirectBufferPooled();
}
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
maxAllocation = Math.max(maxAllocation, maxCapacity);
return wrapped.heapBuffer(initialCapacity, maxCapacity);
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
maxAllocation = Math.max(maxAllocation, maxCapacity);
return wrapped.directBuffer(initialCapacity, maxCapacity);
}
}
} }

View File

@ -1,429 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.EmptyArrays;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import static org.junit.Assert.*;
public abstract class ZlibTest {
private static final byte[] BYTES_SMALL = new byte[128];
private static final byte[] BYTES_LARGE = new byte[1024 * 1024];
private static final byte[] BYTES_LARGE2 = ("<!--?xml version=\"1.0\" encoding=\"ISO-8859-1\"?-->\n" +
"<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Strict//EN\" " +
"\"https://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd\">\n" +
"<html xmlns=\"https://www.w3.org/1999/xhtml\" xml:lang=\"en\" lang=\"en\"><head>\n" +
" <title>Apache Tomcat</title>\n" +
"</head>\n" +
'\n' +
"<body>\n" +
"<h1>It works !</h1>\n" +
'\n' +
"<p>If you're seeing this page via a web browser, it means you've setup Tomcat successfully." +
" Congratulations!</p>\n" +
" \n" +
"<p>This is the default Tomcat home page." +
" It can be found on the local filesystem at: <code>/var/lib/tomcat7/webapps/ROOT/index.html</code></p>\n" +
'\n' +
"<p>Tomcat7 veterans might be pleased to learn that this system instance of Tomcat is installed with" +
" <code>CATALINA_HOME</code> in <code>/usr/share/tomcat7</code> and <code>CATALINA_BASE</code> in" +
" <code>/var/lib/tomcat7</code>, following the rules from" +
" <code>/usr/share/doc/tomcat7-common/RUNNING.txt.gz</code>.</p>\n" +
'\n' +
"<p>You might consider installing the following packages, if you haven't already done so:</p>\n" +
'\n' +
"<p><b>tomcat7-docs</b>: This package installs a web application that allows to browse the Tomcat 7" +
" documentation locally. Once installed, you can access it by clicking <a href=\"docs/\">here</a>.</p>\n" +
'\n' +
"<p><b>tomcat7-examples</b>: This package installs a web application that allows to access the Tomcat" +
" 7 Servlet and JSP examples. Once installed, you can access it by clicking" +
" <a href=\"examples/\">here</a>.</p>\n" +
'\n' +
"<p><b>tomcat7-admin</b>: This package installs two web applications that can help managing this Tomcat" +
" instance. Once installed, you can access the <a href=\"manager/html\">manager webapp</a> and" +
" the <a href=\"host-manager/html\">host-manager webapp</a>.</p><p>\n" +
'\n' +
"</p><p>NOTE: For security reasons, using the manager webapp is restricted" +
" to users with role \"manager\"." +
" The host-manager webapp is restricted to users with role \"admin\". Users are " +
"defined in <code>/etc/tomcat7/tomcat-users.xml</code>.</p>\n" +
'\n' +
'\n' +
'\n' +
"</body></html>").getBytes(CharsetUtil.UTF_8);
static {
Random rand = ThreadLocalRandom.current();
rand.nextBytes(BYTES_SMALL);
rand.nextBytes(BYTES_LARGE);
}
protected ZlibDecoder createDecoder(ZlibWrapper wrapper) {
return createDecoder(wrapper, 0);
}
protected abstract ZlibEncoder createEncoder(ZlibWrapper wrapper);
protected abstract ZlibDecoder createDecoder(ZlibWrapper wrapper, int maxAllocation);
@Test
public void testGZIP2() throws Exception {
byte[] bytes = "message".getBytes(CharsetUtil.UTF_8);
ByteBuf data = Unpooled.wrappedBuffer(bytes);
ByteBuf deflatedData = Unpooled.wrappedBuffer(gzip(bytes));
EmbeddedChannel chDecoderGZip = new EmbeddedChannel(createDecoder(ZlibWrapper.GZIP));
try {
while (deflatedData.isReadable()) {
chDecoderGZip.writeInbound(deflatedData.readRetainedSlice(1));
}
deflatedData.release();
assertTrue(chDecoderGZip.finish());
ByteBuf buf = Unpooled.buffer();
for (;;) {
ByteBuf b = chDecoderGZip.readInbound();
if (b == null) {
break;
}
buf.writeBytes(b);
b.release();
}
assertEquals(buf, data);
assertNull(chDecoderGZip.readInbound());
data.release();
buf.release();
} finally {
dispose(chDecoderGZip);
}
}
private void testCompress0(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper, ByteBuf data) throws Exception {
EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper));
EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper));
try {
chEncoder.writeOutbound(data.retain());
chEncoder.flush();
data.readerIndex(0);
for (;;) {
ByteBuf deflatedData = chEncoder.readOutbound();
if (deflatedData == null) {
break;
}
chDecoderZlib.writeInbound(deflatedData);
}
byte[] decompressed = new byte[data.readableBytes()];
int offset = 0;
for (;;) {
ByteBuf buf = chDecoderZlib.readInbound();
if (buf == null) {
break;
}
int length = buf.readableBytes();
buf.readBytes(decompressed, offset, length);
offset += length;
buf.release();
if (offset == decompressed.length) {
break;
}
}
assertEquals(data, Unpooled.wrappedBuffer(decompressed));
assertNull(chDecoderZlib.readInbound());
// Closing an encoder channel will generate a footer.
assertTrue(chEncoder.finish());
for (;;) {
Object msg = chEncoder.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
// But, the footer will be decoded into nothing. It's only for validation.
assertFalse(chDecoderZlib.finish());
data.release();
} finally {
dispose(chEncoder);
dispose(chDecoderZlib);
}
}
private void testCompressNone(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper));
EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper));
try {
// Closing an encoder channel without writing anything should generate both header and footer.
assertTrue(chEncoder.finish());
for (;;) {
ByteBuf deflatedData = chEncoder.readOutbound();
if (deflatedData == null) {
break;
}
chDecoderZlib.writeInbound(deflatedData);
}
// Decoder should not generate anything at all.
boolean decoded = false;
for (;;) {
ByteBuf buf = chDecoderZlib.readInbound();
if (buf == null) {
break;
}
buf.release();
decoded = true;
}
assertFalse("should decode nothing", decoded);
assertFalse(chDecoderZlib.finish());
} finally {
dispose(chEncoder);
dispose(chDecoderZlib);
}
}
private static void dispose(EmbeddedChannel ch) {
if (ch.finish()) {
for (;;) {
Object msg = ch.readInbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
for (;;) {
Object msg = ch.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
}
}
// Test for https://github.com/netty/netty/issues/2572
private void testDecompressOnly(ZlibWrapper decoderWrapper, byte[] compressed, byte[] data) throws Exception {
EmbeddedChannel chDecoder = new EmbeddedChannel(createDecoder(decoderWrapper));
chDecoder.writeInbound(Unpooled.copiedBuffer(compressed));
assertTrue(chDecoder.finish());
ByteBuf decoded = Unpooled.buffer(data.length);
for (;;) {
ByteBuf buf = chDecoder.readInbound();
if (buf == null) {
break;
}
decoded.writeBytes(buf);
buf.release();
}
assertEquals(Unpooled.copiedBuffer(data), decoded);
decoded.release();
}
private void testCompressSmall(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
testCompress0(encoderWrapper, decoderWrapper, Unpooled.wrappedBuffer(BYTES_SMALL));
testCompress0(encoderWrapper, decoderWrapper,
Unpooled.directBuffer(BYTES_SMALL.length).writeBytes(BYTES_SMALL));
}
private void testCompressLarge(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
testCompress0(encoderWrapper, decoderWrapper, Unpooled.wrappedBuffer(BYTES_LARGE));
testCompress0(encoderWrapper, decoderWrapper,
Unpooled.directBuffer(BYTES_LARGE.length).writeBytes(BYTES_LARGE));
}
@Test
public void testZLIB() throws Exception {
testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testDecompressOnly(ZlibWrapper.ZLIB, deflate(BYTES_LARGE2), BYTES_LARGE2);
}
@Test
public void testNONE() throws Exception {
testCompressNone(ZlibWrapper.NONE, ZlibWrapper.NONE);
testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.NONE);
testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.NONE);
}
@Test
public void testGZIP() throws Exception {
testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testDecompressOnly(ZlibWrapper.GZIP, gzip(BYTES_LARGE2), BYTES_LARGE2);
}
@Test
public void testGZIPCompressOnly() throws Exception {
testGZIPCompressOnly0(null); // Do not write anything; just finish the stream.
testGZIPCompressOnly0(EmptyArrays.EMPTY_BYTES); // Write an empty array.
testGZIPCompressOnly0(BYTES_SMALL);
testGZIPCompressOnly0(BYTES_LARGE);
}
private void testGZIPCompressOnly0(byte[] data) throws IOException {
EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(ZlibWrapper.GZIP));
if (data != null) {
chEncoder.writeOutbound(Unpooled.wrappedBuffer(data));
}
assertTrue(chEncoder.finish());
ByteBuf encoded = Unpooled.buffer();
for (;;) {
ByteBuf buf = chEncoder.readOutbound();
if (buf == null) {
break;
}
encoded.writeBytes(buf);
buf.release();
}
ByteBuf decoded = Unpooled.buffer();
GZIPInputStream stream = new GZIPInputStream(new ByteBufInputStream(encoded, true));
try {
byte[] buf = new byte[8192];
for (;;) {
int readBytes = stream.read(buf);
if (readBytes < 0) {
break;
}
decoded.writeBytes(buf, 0, readBytes);
}
} finally {
stream.close();
}
if (data != null) {
assertEquals(Unpooled.wrappedBuffer(data), decoded);
} else {
assertFalse(decoded.isReadable());
}
decoded.release();
}
@Test
public void testZLIB_OR_NONE() throws Exception {
testCompressNone(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
}
@Test
public void testZLIB_OR_NONE2() throws Exception {
testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
}
@Test
public void testZLIB_OR_NONE3() throws Exception {
testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE);
}
@Test
public void testMaxAllocation() throws Exception {
int maxAllocation = 1024;
ZlibDecoder decoder = createDecoder(ZlibWrapper.ZLIB, maxAllocation);
EmbeddedChannel chDecoder = new EmbeddedChannel(decoder);
TestByteBufAllocator alloc = new TestByteBufAllocator(chDecoder.alloc());
chDecoder.config().setAllocator(alloc);
try {
chDecoder.writeInbound(Unpooled.wrappedBuffer(deflate(BYTES_LARGE)));
fail("decompressed size > maxAllocation, so should have thrown exception");
} catch (DecompressionException e) {
assertTrue(e.getMessage().startsWith("Decompression buffer has reached maximum size"));
assertEquals(maxAllocation, alloc.getMaxAllocation());
assertTrue(decoder.isClosed());
assertFalse(chDecoder.finish());
}
}
private static byte[] gzip(byte[] bytes) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream stream = new GZIPOutputStream(out);
stream.write(bytes);
stream.close();
return out.toByteArray();
}
private static byte[] deflate(byte[] bytes) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
OutputStream stream = new DeflaterOutputStream(out);
stream.write(bytes);
stream.close();
return out.toByteArray();
}
private static final class TestByteBufAllocator extends AbstractByteBufAllocator {
private ByteBufAllocator wrapped;
private int maxAllocation;
TestByteBufAllocator(ByteBufAllocator wrapped) {
this.wrapped = wrapped;
}
public int getMaxAllocation() {
return maxAllocation;
}
@Override
public boolean isDirectBufferPooled() {
return wrapped.isDirectBufferPooled();
}
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
maxAllocation = Math.max(maxAllocation, maxCapacity);
return wrapped.heapBuffer(initialCapacity, maxCapacity);
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
maxAllocation = Math.max(maxAllocation, maxCapacity);
return wrapped.directBuffer(initialCapacity, maxCapacity);
}
}
}

11
pom.xml
View File

@ -604,6 +604,12 @@
<version>${junit.version}</version> <version>${junit.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.junit.vintage</groupId> <groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId> <artifactId>junit-vintage-engine</artifactId>
@ -729,6 +735,11 @@
<artifactId>junit-jupiter-engine</artifactId> <artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.junit.vintage</groupId> <groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId> <artifactId>junit-vintage-engine</artifactId>