From e1830ccf47afbe9dd1358e5bb3e8b4755dbc6082 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 5 Mar 2021 13:29:20 +0100 Subject: [PATCH] Add support for direct ByteBufs in JdkZlibEncoder and JdkZlibDecoder (#11057) Motivation: The JDK deflate implementation added support for operating on ByteBuffers in Java 11 or so. This means that we don't need to restrict that implementation to ByteBufs that are heap based and can expose arrays. Modification: Add clauses to JdkZlibEncoder and JdkZlibDecoder for handling ByteBufs that don't have arrays, but do have one nioByteBuffer. Expand the test coverage in JdkZlibTest to include all relevant combinations of buffer types and data types. Result: The JdkZlibEncoder and JdkZlibDecoder should now work on basically all non-composite ByteBufs, and likely also composite ByteBufs that have exactly one component. --- .../codec/compression/JdkZlibDecoder.java | 18 +- .../codec/compression/JdkZlibEncoder.java | 40 +- .../codec/compression/ZlibDecoder.java | 5 +- .../codec/compression/ZlibEncoder.java | 4 + .../codec/compression/JdkZlibTest.java | 544 +++++++++++++++++- .../handler/codec/compression/ZlibTest.java | 429 -------------- pom.xml | 11 + 7 files changed, 596 insertions(+), 455 deletions(-) delete mode 100644 codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibDecoder.java index 3c714139ae..229bd284dd 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibDecoder.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; +import java.nio.ByteBuffer; import java.util.zip.CRC32; import java.util.zip.DataFormatException; import java.util.zip.Deflater; @@ -233,15 +234,24 @@ public class JdkZlibDecoder extends ZlibDecoder { try { boolean readFooter = false; while (!inflater.needsInput()) { - byte[] outArray = decompressed.array(); int writerIndex = decompressed.writerIndex(); - int outIndex = decompressed.arrayOffset() + writerIndex; int writable = decompressed.writableBytes(); - int outputLength = inflater.inflate(outArray, outIndex, writable); + int outputLength; + if (decompressed.hasArray()) { + byte[] outArray = decompressed.array(); + int outIndex = decompressed.arrayOffset() + writerIndex; + outputLength = inflater.inflate(outArray, outIndex, writable); + } else if (decompressed.nioBufferCount() == 1) { + ByteBuffer buffer = decompressed.internalNioBuffer(writerIndex, writable); + outputLength = inflater.inflate(buffer); + } else { + throw new IllegalStateException( + "Decompress buffer must have array or exactly 1 NIO buffer: " + decompressed); + } if (outputLength > 0) { decompressed.writerIndex(writerIndex + outputLength); if (crc != null) { - crc.update(outArray, outIndex, outputLength); + crc.update(decompressed, writerIndex, outputLength); } } else if (inflater.needsDictionary()) { if (dictionary == null) { diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java index 4ef50a0a36..c95a4d10e0 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromiseNotifier; import io.netty.util.concurrent.EventExecutor; +import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; import java.util.zip.CRC32; import java.util.zip.Deflater; @@ -81,6 +82,10 @@ public class JdkZlibEncoder extends ZlibEncoder { this(wrapper, 6); } + public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) { + this(wrapper, compressionLevel, false); + } + /** * Creates a new zlib encoder with the specified {@code compressionLevel} * and the specified wrapper. @@ -89,10 +94,12 @@ public class JdkZlibEncoder extends ZlibEncoder { * {@code 1} yields the fastest compression and {@code 9} yields the * best compression. {@code 0} means no compression. The default * compression level is {@code 6}. + * @param preferDirectBuffers {@code true} if a direct {@link ByteBuf} should be tried to be used as target for + * decompression, or {@code false} if heap allocated {@link ByteBuf}s should be used. * * @throws CompressionException if failed to initialize zlib */ - public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) { + public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel, boolean preferDirectBuffers) { if (compressionLevel < 0 || compressionLevel > 9) { throw new IllegalArgumentException( "compressionLevel: " + compressionLevel + " (expected: 0-9)"); @@ -251,7 +258,7 @@ public class JdkZlibEncoder extends ZlibEncoder { // no op } } - return ctx.alloc().heapBuffer(sizeEstimate); + return ctx.alloc().buffer(sizeEstimate); } @Override @@ -308,13 +315,28 @@ public class JdkZlibEncoder extends ZlibEncoder { } private void deflate(ByteBuf out) { - int numBytes; - do { - int writerIndex = out.writerIndex(); - numBytes = deflater.deflate( - out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH); - out.writerIndex(writerIndex + numBytes); - } while (numBytes > 0); + if (out.hasArray()) { + int numBytes; + do { + int writerIndex = out.writerIndex(); + numBytes = deflater.deflate( + out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH); + out.writerIndex(writerIndex + numBytes); + } while (numBytes > 0); + } else if (out.nioBufferCount() == 1) { + // Use internalNioBuffer because nioBuffer is allowed to copy, + // which is fine for reading but not for writing. + int numBytes; + do { + int writerIndex = out.writerIndex(); + ByteBuffer buffer = out.internalNioBuffer(writerIndex, out.writableBytes()); + numBytes = deflater.deflate(buffer, Deflater.SYNC_FLUSH); + out.writerIndex(writerIndex + numBytes); + } while (numBytes > 0); + } else { + throw new IllegalArgumentException( + "Don't know how to deflate buffer without array or NIO buffer count of 1: " + out); + } } @Override diff --git a/codec/src/main/java/io/netty/handler/codec/compression/ZlibDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/ZlibDecoder.java index 7a1edfe2da..0444b2e03a 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/ZlibDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/ZlibDecoder.java @@ -29,6 +29,7 @@ public abstract class ZlibDecoder extends ByteToMessageDecoder { * Maximum allowed size of the decompression buffer. */ protected final int maxAllocation; + protected final boolean preferDirect = false; /** * Same as {@link #ZlibDecoder(int)} with maxAllocation = 0. @@ -63,10 +64,10 @@ public abstract class ZlibDecoder extends ByteToMessageDecoder { protected ByteBuf prepareDecompressBuffer(ChannelHandlerContext ctx, ByteBuf buffer, int preferredSize) { if (buffer == null) { if (maxAllocation == 0) { - return ctx.alloc().heapBuffer(preferredSize); + return ctx.alloc().buffer(preferredSize); } - return ctx.alloc().heapBuffer(Math.min(preferredSize, maxAllocation), maxAllocation); + return ctx.alloc().buffer(Math.min(preferredSize, maxAllocation), maxAllocation); } // this always expands the buffer if possible, even if the expansion is less than preferredSize diff --git a/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java index b558eaa076..600be717cb 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java @@ -29,6 +29,10 @@ public abstract class ZlibEncoder extends MessageToByteEncoder { super(false); } + protected ZlibEncoder(boolean preferDirectBuffers) { + super(preferDirectBuffers); + } + /** * Returns {@code true} if and only if the end of the compressed stream * has been reached. diff --git a/codec/src/test/java/io/netty/handler/codec/compression/JdkZlibTest.java b/codec/src/test/java/io/netty/handler/codec/compression/JdkZlibTest.java index fcd9dbca6c..1c8e40d805 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/JdkZlibTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/JdkZlibTest.java @@ -15,37 +15,513 @@ */ package io.netty.handler.codec.compression; +import io.netty.buffer.AbstractByteBufAllocator; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.EmptyArrays; import org.apache.commons.compress.utils.IOUtils; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Arrays; import java.util.Queue; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; -import static org.junit.Assert.*; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +public class JdkZlibTest { + private static final byte[] BYTES_SMALL = new byte[128]; + private static final byte[] BYTES_LARGE = new byte[1024 * 1024]; + private static final byte[] BYTES_LARGE2 = ("\n" + + "\n" + + "\n" + + " Apache Tomcat\n" + + "\n" + + '\n' + + "\n" + + "

It works !

\n" + + '\n' + + "

If you're seeing this page via a web browser, it means you've setup Tomcat successfully." + + " Congratulations!

\n" + + " \n" + + "

This is the default Tomcat home page." + + " It can be found on the local filesystem at: /var/lib/tomcat7/webapps/ROOT/index.html

\n" + + '\n' + + "

Tomcat7 veterans might be pleased to learn that this system instance of Tomcat is installed with" + + " CATALINA_HOME in /usr/share/tomcat7 and CATALINA_BASE in" + + " /var/lib/tomcat7, following the rules from" + + " /usr/share/doc/tomcat7-common/RUNNING.txt.gz.

\n" + + '\n' + + "

You might consider installing the following packages, if you haven't already done so:

\n" + + '\n' + + "

tomcat7-docs: This package installs a web application that allows to browse the Tomcat 7" + + " documentation locally. Once installed, you can access it by clicking here.

\n" + + '\n' + + "

tomcat7-examples: This package installs a web application that allows to access the Tomcat" + + " 7 Servlet and JSP examples. Once installed, you can access it by clicking" + + " here.

\n" + + '\n' + + "

tomcat7-admin: This package installs two web applications that can help managing this Tomcat" + + " instance. Once installed, you can access the manager webapp and" + + " the host-manager webapp.

\n" + + '\n' + + "

NOTE: For security reasons, using the manager webapp is restricted" + + " to users with role \"manager\"." + + " The host-manager webapp is restricted to users with role \"admin\". Users are " + + "defined in /etc/tomcat7/tomcat-users.xml.

\n" + + '\n' + + '\n' + + '\n' + + "").getBytes(CharsetUtil.UTF_8); -public class JdkZlibTest extends ZlibTest { - - @Override - protected ZlibEncoder createEncoder(ZlibWrapper wrapper) { - return new JdkZlibEncoder(wrapper); + static { + Random rand = ThreadLocalRandom.current(); + rand.nextBytes(BYTES_SMALL); + rand.nextBytes(BYTES_LARGE); + } + + enum Data { + NONE(null), + SMALL(BYTES_SMALL), + LARGE(BYTES_LARGE); + + final byte[] bytes; + + Data(byte[] bytes) { + this.bytes = bytes; + } + } + + enum BufferType { + HEAP, + DIRECT; + + ByteBuf allocate(byte[] bytes) { + switch (this) { + case HEAP: return Unpooled.wrappedBuffer(bytes); + case DIRECT: return Unpooled.directBuffer(bytes.length).writeBytes(bytes); + } + return fail("Fall-through should not be possible: " + this); + } + } + + protected ZlibDecoder createDecoder(ZlibWrapper wrapper) { + return createDecoder(wrapper, 0); + } + + protected ZlibEncoder createEncoder(ZlibWrapper wrapper, BufferType bufferType) { + return new JdkZlibEncoder(wrapper, 6, bufferType == BufferType.DIRECT); } - @Override protected ZlibDecoder createDecoder(ZlibWrapper wrapper, int maxAllocation) { return new JdkZlibDecoder(wrapper, maxAllocation); } - @Test(expected = DecompressionException.class) - @Override + static Stream compressionConfigurations() { + Stream.Builder args = Stream.builder(); + Data[] dataVals = Data.values(); + BufferType[] bufferTypeVals = BufferType.values(); + ZlibWrapper[] zlibWrappers = ZlibWrapper.values(); + for (Data data : dataVals) { + for (BufferType inBuf : bufferTypeVals) { + for (BufferType outBuf : bufferTypeVals) { + for (ZlibWrapper inputWrapper : zlibWrappers) { + for (ZlibWrapper outputWrapper : zlibWrappers) { + args.add(Arguments.of(data, inBuf, outBuf, inputWrapper, outputWrapper)); + } + } + } + } + } + return args.build(); + } + + static Stream workingConfigurations() { + return compressionConfigurations().filter(JdkZlibTest::isWorkingConfiguration); + } + + private static boolean isWorkingConfiguration(Arguments args) { + Object[] objs = args.get(); + ZlibWrapper inWrap = (ZlibWrapper) objs[3]; + ZlibWrapper outWrap = (ZlibWrapper) objs[4]; + if (inWrap == ZlibWrapper.ZLIB_OR_NONE) { + return false; + } + if (inWrap == ZlibWrapper.GZIP || outWrap == ZlibWrapper.GZIP) { + return inWrap == outWrap; + } + if (inWrap == ZlibWrapper.NONE) { + return outWrap == ZlibWrapper.NONE || outWrap == ZlibWrapper.ZLIB_OR_NONE; + } + if (outWrap == ZlibWrapper.NONE) { + return inWrap == ZlibWrapper.NONE; + } + return true; + } + + @ParameterizedTest + @MethodSource("workingConfigurations") + void compressionInputOutput( + Data data, BufferType inBuf, BufferType outBuf, ZlibWrapper inWrap, ZlibWrapper outWrap) { + EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(inWrap, inBuf)); + EmbeddedChannel chDecoder = new EmbeddedChannel(createDecoder(outWrap)); + chEncoder.config().setAllocator(new UnpooledByteBufAllocator(inBuf == BufferType.DIRECT)); + chDecoder.config().setAllocator(new UnpooledByteBufAllocator(outBuf == BufferType.DIRECT)); + + try { + if (data != Data.NONE) { + chEncoder.writeOutbound(inBuf.allocate(data.bytes)); + chEncoder.flush(); + + for (;;) { + ByteBuf deflatedData = chEncoder.readOutbound(); + if (deflatedData == null) { + break; + } + chDecoder.writeInbound(deflatedData); + } + + byte[] decompressed = new byte[data.bytes.length]; + int offset = 0; + for (;;) { + ByteBuf buf = chDecoder.readInbound(); + if (buf == null) { + break; + } + int length = buf.readableBytes(); + buf.readBytes(decompressed, offset, length); + offset += length; + buf.release(); + if (offset == decompressed.length) { + break; + } + } + assertArrayEquals(data.bytes, decompressed); + assertNull(chDecoder.readInbound()); + } + + // Closing an encoder channel will generate a footer. + assertTrue(chEncoder.finish()); + for (;;) { + Object msg = chEncoder.readOutbound(); + if (msg == null) { + break; + } + ReferenceCountUtil.release(msg); + } + // But, the footer will be decoded into nothing. It's only for validation. + assertFalse(chDecoder.finish()); + } finally { + dispose(chEncoder); + dispose(chDecoder); + } + } + + @Test + public void testGZIP2() throws Exception { + byte[] bytes = "message".getBytes(CharsetUtil.UTF_8); + ByteBuf data = Unpooled.wrappedBuffer(bytes); + ByteBuf deflatedData = Unpooled.wrappedBuffer(gzip(bytes)); + + EmbeddedChannel chDecoderGZip = new EmbeddedChannel(createDecoder(ZlibWrapper.GZIP)); + try { + while (deflatedData.isReadable()) { + chDecoderGZip.writeInbound(deflatedData.readRetainedSlice(1)); + } + deflatedData.release(); + assertTrue(chDecoderGZip.finish()); + ByteBuf buf = Unpooled.buffer(); + for (;;) { + ByteBuf b = chDecoderGZip.readInbound(); + if (b == null) { + break; + } + buf.writeBytes(b); + b.release(); + } + assertEquals(buf, data); + assertNull(chDecoderGZip.readInbound()); + data.release(); + buf.release(); + } finally { + dispose(chDecoderGZip); + } + } + + private void testCompress0(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper, ByteBuf data) throws Exception { + EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper, BufferType.HEAP)); + EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper)); + + try { + chEncoder.writeOutbound(data.retain()); + chEncoder.flush(); + data.readerIndex(0); + + for (;;) { + ByteBuf deflatedData = chEncoder.readOutbound(); + if (deflatedData == null) { + break; + } + chDecoderZlib.writeInbound(deflatedData); + } + + byte[] decompressed = new byte[data.readableBytes()]; + int offset = 0; + for (;;) { + ByteBuf buf = chDecoderZlib.readInbound(); + if (buf == null) { + break; + } + int length = buf.readableBytes(); + buf.readBytes(decompressed, offset, length); + offset += length; + buf.release(); + if (offset == decompressed.length) { + break; + } + } + assertEquals(data, Unpooled.wrappedBuffer(decompressed)); + assertNull(chDecoderZlib.readInbound()); + + // Closing an encoder channel will generate a footer. + assertTrue(chEncoder.finish()); + for (;;) { + Object msg = chEncoder.readOutbound(); + if (msg == null) { + break; + } + ReferenceCountUtil.release(msg); + } + // But, the footer will be decoded into nothing. It's only for validation. + assertFalse(chDecoderZlib.finish()); + + data.release(); + } finally { + dispose(chEncoder); + dispose(chDecoderZlib); + } + } + + private void testCompressNone(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception { + EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper, BufferType.HEAP)); + EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper)); + + try { + // Closing an encoder channel without writing anything should generate both header and footer. + assertTrue(chEncoder.finish()); + + for (;;) { + ByteBuf deflatedData = chEncoder.readOutbound(); + if (deflatedData == null) { + break; + } + chDecoderZlib.writeInbound(deflatedData); + } + + // Decoder should not generate anything at all. + boolean decoded = false; + for (;;) { + ByteBuf buf = chDecoderZlib.readInbound(); + if (buf == null) { + break; + } + + buf.release(); + decoded = true; + } + assertFalse(decoded, "should decode nothing"); + + assertFalse(chDecoderZlib.finish()); + } finally { + dispose(chEncoder); + dispose(chDecoderZlib); + } + } + + private static void dispose(EmbeddedChannel ch) { + if (ch.finish()) { + for (;;) { + Object msg = ch.readInbound(); + if (msg == null) { + break; + } + ReferenceCountUtil.release(msg); + } + for (;;) { + Object msg = ch.readOutbound(); + if (msg == null) { + break; + } + ReferenceCountUtil.release(msg); + } + } + } + + // Test for https://github.com/netty/netty/issues/2572 + private void testDecompressOnly(ZlibWrapper decoderWrapper, byte[] compressed, byte[] data) throws Exception { + EmbeddedChannel chDecoder = new EmbeddedChannel(createDecoder(decoderWrapper)); + chDecoder.writeInbound(Unpooled.copiedBuffer(compressed)); + assertTrue(chDecoder.finish()); + + ByteBuf decoded = Unpooled.buffer(data.length); + + for (;;) { + ByteBuf buf = chDecoder.readInbound(); + if (buf == null) { + break; + } + decoded.writeBytes(buf); + buf.release(); + } + assertEquals(Unpooled.copiedBuffer(data), decoded); + decoded.release(); + } + + private void testCompressSmall(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception { + testCompress0(encoderWrapper, decoderWrapper, Unpooled.wrappedBuffer(BYTES_SMALL)); + testCompress0(encoderWrapper, decoderWrapper, + Unpooled.directBuffer(BYTES_SMALL.length).writeBytes(BYTES_SMALL)); + } + + private void testCompressLarge(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception { + testCompress0(encoderWrapper, decoderWrapper, Unpooled.wrappedBuffer(BYTES_LARGE)); + testCompress0(encoderWrapper, decoderWrapper, + Unpooled.directBuffer(BYTES_LARGE.length).writeBytes(BYTES_LARGE)); + } + + @Test + public void testZLIB() throws Exception { + testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB); + testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB); + testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB); + testDecompressOnly(ZlibWrapper.ZLIB, deflate(BYTES_LARGE2), BYTES_LARGE2); + } + + @Test + public void testNONE() throws Exception { + testCompressNone(ZlibWrapper.NONE, ZlibWrapper.NONE); + testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.NONE); + testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.NONE); + } + + @Test + public void testGZIP() throws Exception { + testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.GZIP); + testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.GZIP); + testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.GZIP); + testDecompressOnly(ZlibWrapper.GZIP, gzip(BYTES_LARGE2), BYTES_LARGE2); + } + + @Test + public void testGZIPCompressOnly() throws Exception { + testGZIPCompressOnly0(null); // Do not write anything; just finish the stream. + testGZIPCompressOnly0(EmptyArrays.EMPTY_BYTES); // Write an empty array. + testGZIPCompressOnly0(BYTES_SMALL); + testGZIPCompressOnly0(BYTES_LARGE); + } + + private void testGZIPCompressOnly0(byte[] data) throws IOException { + EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(ZlibWrapper.GZIP, BufferType.HEAP)); + if (data != null) { + chEncoder.writeOutbound(Unpooled.wrappedBuffer(data)); + } + assertTrue(chEncoder.finish()); + + ByteBuf encoded = Unpooled.buffer(); + for (;;) { + ByteBuf buf = chEncoder.readOutbound(); + if (buf == null) { + break; + } + encoded.writeBytes(buf); + buf.release(); + } + + ByteBuf decoded = Unpooled.buffer(); + GZIPInputStream stream = new GZIPInputStream(new ByteBufInputStream(encoded, true)); + try { + byte[] buf = new byte[8192]; + for (;;) { + int readBytes = stream.read(buf); + if (readBytes < 0) { + break; + } + decoded.writeBytes(buf, 0, readBytes); + } + } finally { + stream.close(); + } + + if (data != null) { + assertEquals(Unpooled.wrappedBuffer(data), decoded); + } else { + assertFalse(decoded.isReadable()); + } + + decoded.release(); + } + + @Test + public void testZLIB_OR_NONE() throws Exception { + testCompressNone(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE); + testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE); + testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE); + } + + @Test + public void testZLIB_OR_NONE2() throws Exception { + testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE); + testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE); + testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE); + } + + @Test public void testZLIB_OR_NONE3() throws Exception { - super.testZLIB_OR_NONE3(); + assertThrows(DecompressionException.class, () -> testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE)); + assertThrows(DecompressionException.class, () -> testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE)); + assertThrows(DecompressionException.class, () -> testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE)); + } + + @Test + public void testMaxAllocation() throws Exception { + int maxAllocation = 1024; + ZlibDecoder decoder = createDecoder(ZlibWrapper.ZLIB, maxAllocation); + EmbeddedChannel chDecoder = new EmbeddedChannel(decoder); + TestByteBufAllocator alloc = new TestByteBufAllocator(chDecoder.alloc()); + chDecoder.config().setAllocator(alloc); + + try { + chDecoder.writeInbound(Unpooled.wrappedBuffer(deflate(BYTES_LARGE))); + fail("decompressed size > maxAllocation, so should have thrown exception"); + } catch (DecompressionException e) { + assertTrue(e.getMessage().startsWith("Decompression buffer has reached maximum size")); + assertEquals(maxAllocation, alloc.getMaxAllocation()); + assertTrue(decoder.isClosed()); + assertFalse(chDecoder.finish()); + } } @Test @@ -120,4 +596,50 @@ public class JdkZlibTest extends ZlibTest { chDecoderGZip.close(); } } + + private static byte[] gzip(byte[] bytes) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + GZIPOutputStream stream = new GZIPOutputStream(out); + stream.write(bytes); + stream.close(); + return out.toByteArray(); + } + + private static byte[] deflate(byte[] bytes) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + OutputStream stream = new DeflaterOutputStream(out); + stream.write(bytes); + stream.close(); + return out.toByteArray(); + } + + private static final class TestByteBufAllocator extends AbstractByteBufAllocator { + private final ByteBufAllocator wrapped; + private int maxAllocation; + + TestByteBufAllocator(ByteBufAllocator wrapped) { + this.wrapped = wrapped; + } + + public int getMaxAllocation() { + return maxAllocation; + } + + @Override + public boolean isDirectBufferPooled() { + return wrapped.isDirectBufferPooled(); + } + + @Override + protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { + maxAllocation = Math.max(maxAllocation, maxCapacity); + return wrapped.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { + maxAllocation = Math.max(maxAllocation, maxCapacity); + return wrapped.directBuffer(initialCapacity, maxCapacity); + } + } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java b/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java deleted file mode 100644 index dbd091488a..0000000000 --- a/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java +++ /dev/null @@ -1,429 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.codec.compression; - -import io.netty.buffer.AbstractByteBufAllocator; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufInputStream; -import io.netty.buffer.Unpooled; -import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.util.CharsetUtil; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.internal.EmptyArrays; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; -import java.util.zip.DeflaterOutputStream; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -import static org.junit.Assert.*; - -public abstract class ZlibTest { - - private static final byte[] BYTES_SMALL = new byte[128]; - private static final byte[] BYTES_LARGE = new byte[1024 * 1024]; - private static final byte[] BYTES_LARGE2 = ("\n" + - "\n" + - "\n" + - " Apache Tomcat\n" + - "\n" + - '\n' + - "\n" + - "

It works !

\n" + - '\n' + - "

If you're seeing this page via a web browser, it means you've setup Tomcat successfully." + - " Congratulations!

\n" + - " \n" + - "

This is the default Tomcat home page." + - " It can be found on the local filesystem at: /var/lib/tomcat7/webapps/ROOT/index.html

\n" + - '\n' + - "

Tomcat7 veterans might be pleased to learn that this system instance of Tomcat is installed with" + - " CATALINA_HOME in /usr/share/tomcat7 and CATALINA_BASE in" + - " /var/lib/tomcat7, following the rules from" + - " /usr/share/doc/tomcat7-common/RUNNING.txt.gz.

\n" + - '\n' + - "

You might consider installing the following packages, if you haven't already done so:

\n" + - '\n' + - "

tomcat7-docs: This package installs a web application that allows to browse the Tomcat 7" + - " documentation locally. Once installed, you can access it by clicking here.

\n" + - '\n' + - "

tomcat7-examples: This package installs a web application that allows to access the Tomcat" + - " 7 Servlet and JSP examples. Once installed, you can access it by clicking" + - " here.

\n" + - '\n' + - "

tomcat7-admin: This package installs two web applications that can help managing this Tomcat" + - " instance. Once installed, you can access the manager webapp and" + - " the host-manager webapp.

\n" + - '\n' + - "

NOTE: For security reasons, using the manager webapp is restricted" + - " to users with role \"manager\"." + - " The host-manager webapp is restricted to users with role \"admin\". Users are " + - "defined in /etc/tomcat7/tomcat-users.xml.

\n" + - '\n' + - '\n' + - '\n' + - "").getBytes(CharsetUtil.UTF_8); - - static { - Random rand = ThreadLocalRandom.current(); - rand.nextBytes(BYTES_SMALL); - rand.nextBytes(BYTES_LARGE); - } - - protected ZlibDecoder createDecoder(ZlibWrapper wrapper) { - return createDecoder(wrapper, 0); - } - - protected abstract ZlibEncoder createEncoder(ZlibWrapper wrapper); - protected abstract ZlibDecoder createDecoder(ZlibWrapper wrapper, int maxAllocation); - - @Test - public void testGZIP2() throws Exception { - byte[] bytes = "message".getBytes(CharsetUtil.UTF_8); - ByteBuf data = Unpooled.wrappedBuffer(bytes); - ByteBuf deflatedData = Unpooled.wrappedBuffer(gzip(bytes)); - - EmbeddedChannel chDecoderGZip = new EmbeddedChannel(createDecoder(ZlibWrapper.GZIP)); - try { - while (deflatedData.isReadable()) { - chDecoderGZip.writeInbound(deflatedData.readRetainedSlice(1)); - } - deflatedData.release(); - assertTrue(chDecoderGZip.finish()); - ByteBuf buf = Unpooled.buffer(); - for (;;) { - ByteBuf b = chDecoderGZip.readInbound(); - if (b == null) { - break; - } - buf.writeBytes(b); - b.release(); - } - assertEquals(buf, data); - assertNull(chDecoderGZip.readInbound()); - data.release(); - buf.release(); - } finally { - dispose(chDecoderGZip); - } - } - - private void testCompress0(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper, ByteBuf data) throws Exception { - EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper)); - EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper)); - - try { - chEncoder.writeOutbound(data.retain()); - chEncoder.flush(); - data.readerIndex(0); - - for (;;) { - ByteBuf deflatedData = chEncoder.readOutbound(); - if (deflatedData == null) { - break; - } - chDecoderZlib.writeInbound(deflatedData); - } - - byte[] decompressed = new byte[data.readableBytes()]; - int offset = 0; - for (;;) { - ByteBuf buf = chDecoderZlib.readInbound(); - if (buf == null) { - break; - } - int length = buf.readableBytes(); - buf.readBytes(decompressed, offset, length); - offset += length; - buf.release(); - if (offset == decompressed.length) { - break; - } - } - assertEquals(data, Unpooled.wrappedBuffer(decompressed)); - assertNull(chDecoderZlib.readInbound()); - - // Closing an encoder channel will generate a footer. - assertTrue(chEncoder.finish()); - for (;;) { - Object msg = chEncoder.readOutbound(); - if (msg == null) { - break; - } - ReferenceCountUtil.release(msg); - } - // But, the footer will be decoded into nothing. It's only for validation. - assertFalse(chDecoderZlib.finish()); - - data.release(); - } finally { - dispose(chEncoder); - dispose(chDecoderZlib); - } - } - - private void testCompressNone(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception { - EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper)); - EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper)); - - try { - // Closing an encoder channel without writing anything should generate both header and footer. - assertTrue(chEncoder.finish()); - - for (;;) { - ByteBuf deflatedData = chEncoder.readOutbound(); - if (deflatedData == null) { - break; - } - chDecoderZlib.writeInbound(deflatedData); - } - - // Decoder should not generate anything at all. - boolean decoded = false; - for (;;) { - ByteBuf buf = chDecoderZlib.readInbound(); - if (buf == null) { - break; - } - - buf.release(); - decoded = true; - } - assertFalse("should decode nothing", decoded); - - assertFalse(chDecoderZlib.finish()); - } finally { - dispose(chEncoder); - dispose(chDecoderZlib); - } - } - - private static void dispose(EmbeddedChannel ch) { - if (ch.finish()) { - for (;;) { - Object msg = ch.readInbound(); - if (msg == null) { - break; - } - ReferenceCountUtil.release(msg); - } - for (;;) { - Object msg = ch.readOutbound(); - if (msg == null) { - break; - } - ReferenceCountUtil.release(msg); - } - } - } - - // Test for https://github.com/netty/netty/issues/2572 - private void testDecompressOnly(ZlibWrapper decoderWrapper, byte[] compressed, byte[] data) throws Exception { - EmbeddedChannel chDecoder = new EmbeddedChannel(createDecoder(decoderWrapper)); - chDecoder.writeInbound(Unpooled.copiedBuffer(compressed)); - assertTrue(chDecoder.finish()); - - ByteBuf decoded = Unpooled.buffer(data.length); - - for (;;) { - ByteBuf buf = chDecoder.readInbound(); - if (buf == null) { - break; - } - decoded.writeBytes(buf); - buf.release(); - } - assertEquals(Unpooled.copiedBuffer(data), decoded); - decoded.release(); - } - - private void testCompressSmall(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception { - testCompress0(encoderWrapper, decoderWrapper, Unpooled.wrappedBuffer(BYTES_SMALL)); - testCompress0(encoderWrapper, decoderWrapper, - Unpooled.directBuffer(BYTES_SMALL.length).writeBytes(BYTES_SMALL)); - } - - private void testCompressLarge(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception { - testCompress0(encoderWrapper, decoderWrapper, Unpooled.wrappedBuffer(BYTES_LARGE)); - testCompress0(encoderWrapper, decoderWrapper, - Unpooled.directBuffer(BYTES_LARGE.length).writeBytes(BYTES_LARGE)); - } - - @Test - public void testZLIB() throws Exception { - testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB); - testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB); - testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB); - testDecompressOnly(ZlibWrapper.ZLIB, deflate(BYTES_LARGE2), BYTES_LARGE2); - } - - @Test - public void testNONE() throws Exception { - testCompressNone(ZlibWrapper.NONE, ZlibWrapper.NONE); - testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.NONE); - testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.NONE); - } - - @Test - public void testGZIP() throws Exception { - testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.GZIP); - testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.GZIP); - testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.GZIP); - testDecompressOnly(ZlibWrapper.GZIP, gzip(BYTES_LARGE2), BYTES_LARGE2); - } - - @Test - public void testGZIPCompressOnly() throws Exception { - testGZIPCompressOnly0(null); // Do not write anything; just finish the stream. - testGZIPCompressOnly0(EmptyArrays.EMPTY_BYTES); // Write an empty array. - testGZIPCompressOnly0(BYTES_SMALL); - testGZIPCompressOnly0(BYTES_LARGE); - } - - private void testGZIPCompressOnly0(byte[] data) throws IOException { - EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(ZlibWrapper.GZIP)); - if (data != null) { - chEncoder.writeOutbound(Unpooled.wrappedBuffer(data)); - } - assertTrue(chEncoder.finish()); - - ByteBuf encoded = Unpooled.buffer(); - for (;;) { - ByteBuf buf = chEncoder.readOutbound(); - if (buf == null) { - break; - } - encoded.writeBytes(buf); - buf.release(); - } - - ByteBuf decoded = Unpooled.buffer(); - GZIPInputStream stream = new GZIPInputStream(new ByteBufInputStream(encoded, true)); - try { - byte[] buf = new byte[8192]; - for (;;) { - int readBytes = stream.read(buf); - if (readBytes < 0) { - break; - } - decoded.writeBytes(buf, 0, readBytes); - } - } finally { - stream.close(); - } - - if (data != null) { - assertEquals(Unpooled.wrappedBuffer(data), decoded); - } else { - assertFalse(decoded.isReadable()); - } - - decoded.release(); - } - - @Test - public void testZLIB_OR_NONE() throws Exception { - testCompressNone(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE); - testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE); - testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE); - } - - @Test - public void testZLIB_OR_NONE2() throws Exception { - testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE); - testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE); - testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE); - } - - @Test - public void testZLIB_OR_NONE3() throws Exception { - testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE); - testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE); - testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE); - } - - @Test - public void testMaxAllocation() throws Exception { - int maxAllocation = 1024; - ZlibDecoder decoder = createDecoder(ZlibWrapper.ZLIB, maxAllocation); - EmbeddedChannel chDecoder = new EmbeddedChannel(decoder); - TestByteBufAllocator alloc = new TestByteBufAllocator(chDecoder.alloc()); - chDecoder.config().setAllocator(alloc); - - try { - chDecoder.writeInbound(Unpooled.wrappedBuffer(deflate(BYTES_LARGE))); - fail("decompressed size > maxAllocation, so should have thrown exception"); - } catch (DecompressionException e) { - assertTrue(e.getMessage().startsWith("Decompression buffer has reached maximum size")); - assertEquals(maxAllocation, alloc.getMaxAllocation()); - assertTrue(decoder.isClosed()); - assertFalse(chDecoder.finish()); - } - } - - private static byte[] gzip(byte[] bytes) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - GZIPOutputStream stream = new GZIPOutputStream(out); - stream.write(bytes); - stream.close(); - return out.toByteArray(); - } - - private static byte[] deflate(byte[] bytes) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - OutputStream stream = new DeflaterOutputStream(out); - stream.write(bytes); - stream.close(); - return out.toByteArray(); - } - - private static final class TestByteBufAllocator extends AbstractByteBufAllocator { - private ByteBufAllocator wrapped; - private int maxAllocation; - - TestByteBufAllocator(ByteBufAllocator wrapped) { - this.wrapped = wrapped; - } - - public int getMaxAllocation() { - return maxAllocation; - } - - @Override - public boolean isDirectBufferPooled() { - return wrapped.isDirectBufferPooled(); - } - - @Override - protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { - maxAllocation = Math.max(maxAllocation, maxCapacity); - return wrapped.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { - maxAllocation = Math.max(maxAllocation, maxCapacity); - return wrapped.directBuffer(initialCapacity, maxCapacity); - } - } -} diff --git a/pom.xml b/pom.xml index ce859d7838..4133e2b10b 100644 --- a/pom.xml +++ b/pom.xml @@ -604,6 +604,12 @@ ${junit.version} test + + org.junit.jupiter + junit-jupiter-params + ${junit.version} + test + org.junit.vintage junit-vintage-engine @@ -729,6 +735,11 @@ junit-jupiter-engine test + + org.junit.jupiter + junit-jupiter-params + test + org.junit.vintage junit-vintage-engine