From 3f7b674db8c9c2fc562b95327e089bb3603c9009 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 29 Nov 2013 18:08:34 +0900 Subject: [PATCH] Fix bugs in ZLIB codec where they produce malformed stream or their streams are not flushed on time - Fixes #2014 - Add the tests that mix JDK ZLIB codec and JZlib codecs - Fix a bug where JdkZlibEncoder does not encode the GZIP header when nothing was written to te channel - Fix a bug where the encoders do not consider the overhead of the wrapper format when calculating the estimated compressed output size. - Fix a bug where the decoders do not discard the received data after the compressed stream is finished --- .../codec/compression/JZlibDecoder.java | 10 ++-- .../codec/compression/JZlibEncoder.java | 7 ++- .../codec/compression/JdkZlibDecoder.java | 14 +++-- .../codec/compression/JdkZlibEncoder.java | 53 +++++++++++++----- .../handler/codec/compression/ZlibUtil.java | 19 +++++++ .../codec/compression/ZlibCrossTest1.java | 29 ++++++++++ .../codec/compression/ZlibCrossTest2.java | 49 +++++++++++++++++ .../handler/codec/compression/ZlibTest.java | 54 +++++++++++++++---- 8 files changed, 204 insertions(+), 31 deletions(-) create mode 100644 codec/src/test/java/io/netty/handler/codec/compression/ZlibCrossTest1.java create mode 100644 codec/src/test/java/io/netty/handler/codec/compression/ZlibCrossTest2.java diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java index aaf679cc24..924cbda887 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java @@ -15,14 +15,13 @@ */ package io.netty.handler.codec.compression; +import com.jcraft.jzlib.Inflater; +import com.jcraft.jzlib.JZlib; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import java.util.List; -import com.jcraft.jzlib.Inflater; -import com.jcraft.jzlib.JZlib; - public class JZlibDecoder extends ZlibDecoder { private final Inflater z = new Inflater(); @@ -85,6 +84,11 @@ public class JZlibDecoder extends ZlibDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (finished) { + // Skip data received after finished. + in.skipBytes(in.readableBytes()); + return; + } if (!in.isReadable()) { return; diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java index 8decd9b7d9..dd5a2cb2dc 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; */ public class JZlibEncoder extends ZlibEncoder { + private final int wrapperOverhead; private final Deflater z = new Deflater(); private volatile boolean finished; private volatile ChannelHandlerContext ctx; @@ -145,6 +146,8 @@ public class JZlibEncoder extends ZlibEncoder { if (resultCode != JZlib.Z_OK) { ZlibUtil.fail(z, "initialization failure", resultCode); } + + wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper); } /** @@ -233,6 +236,8 @@ public class JZlibEncoder extends ZlibEncoder { ZlibUtil.fail(z, "failed to set the dictionary", resultCode); } } + + wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB); } @Override @@ -295,7 +300,7 @@ public class JZlibEncoder extends ZlibEncoder { int oldNextInIndex = z.next_in_index; // Configure output. - int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12; + int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12 + wrapperOverhead; out.ensureWritable(maxOutputLength); z.avail_out = maxOutputLength; z.next_out = out.array(); diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibDecoder.java index 7549444679..b346b3f952 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibDecoder.java @@ -114,7 +114,13 @@ public class JdkZlibDecoder extends ZlibDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - if (!in.isReadable() && finished) { + if (finished) { + // Skip data received after finished. + in.skipBytes(in.readableBytes()); + return; + } + + if (!in.isReadable()) { return; } @@ -330,12 +336,12 @@ public class JdkZlibDecoder extends ZlibDecoder { // read ISIZE and verify int dataLength = 0; for (int i = 0; i < 4; ++i) { - dataLength |= buf.readUnsignedByte() << (i * 8); + dataLength |= buf.readUnsignedByte() << i * 8; } int readLength = inflater.getTotalOut(); if (dataLength != readLength) { throw new CompressionException( - "Number of bytes missmatch. Expected: " + dataLength + ", Got: " + readLength); + "Number of bytes mismatch. Expected: " + dataLength + ", Got: " + readLength); } return true; } @@ -343,7 +349,7 @@ public class JdkZlibDecoder extends ZlibDecoder { private void verifyCrc(ByteBuf in) { long crcValue = 0; for (int i = 0; i < 4; ++i) { - crcValue |= (long) in.readUnsignedByte() << (i * 8); + crcValue |= (long) in.readUnsignedByte() << i * 8; } long readCrc = crc.getValue(); if (crcValue != readCrc) { diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java index d0fa3f0b0c..e93354457a 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -33,6 +33,7 @@ import java.util.zip.Deflater; */ public class JdkZlibEncoder extends ZlibEncoder { + private final ZlibWrapper wrapper; private final byte[] encodeBuf = new byte[8192]; private final Deflater deflater; private volatile boolean finished; @@ -41,7 +42,6 @@ public class JdkZlibEncoder extends ZlibEncoder { /* * GZIP support */ - private final boolean gzip; private final CRC32 crc = new CRC32(); private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0}; private boolean writeHeader = true; @@ -106,7 +106,7 @@ public class JdkZlibEncoder extends ZlibEncoder { "allowed for compression."); } - gzip = wrapper == ZlibWrapper.GZIP; + this.wrapper = wrapper; deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB); } @@ -147,7 +147,7 @@ public class JdkZlibEncoder extends ZlibEncoder { throw new NullPointerException("dictionary"); } - gzip = false; + wrapper = ZlibWrapper.ZLIB; deflater = new Deflater(compressionLevel); deflater.setDictionary(dictionary); } @@ -200,20 +200,31 @@ public class JdkZlibEncoder extends ZlibEncoder { uncompressed.readBytes(inAry); int sizeEstimate = (int) Math.ceil(inAry.length * 1.001) + 12; - out.ensureWritable(sizeEstimate); - if (gzip) { - crc.update(inAry); - if (writeHeader) { - out.writeBytes(gzipHeader); - writeHeader = false; + if (writeHeader) { + writeHeader = false; + switch (wrapper) { + case GZIP: + out.ensureWritable(sizeEstimate + gzipHeader.length); + out.writeBytes(gzipHeader); + break; + case ZLIB: + out.ensureWritable(sizeEstimate + 2); // first two magic bytes + break; + default: + out.ensureWritable(sizeEstimate); } + } else { + out.ensureWritable(sizeEstimate); + } + + if (wrapper == ZlibWrapper.GZIP) { + crc.update(inAry); } deflater.setInput(inAry); while (!deflater.needsInput()) { - int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length, Deflater.SYNC_FLUSH); - out.writeBytes(encodeBuf, 0, numBytes); + deflate(out); } } @@ -245,13 +256,19 @@ public class JdkZlibEncoder extends ZlibEncoder { } finished = true; + ByteBuf footer = ctx.alloc().buffer(); + if (writeHeader && wrapper == ZlibWrapper.GZIP) { + // Write the GZIP header first if not written yet. (i.e. user wrote nothing.) + writeHeader = false; + footer.writeBytes(gzipHeader); + } + deflater.finish(); while (!deflater.finished()) { - int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length); - footer.writeBytes(encodeBuf, 0, numBytes); + deflate(footer); } - if (gzip) { + if (wrapper == ZlibWrapper.GZIP) { int crcValue = (int) crc.getValue(); int uncBytes = deflater.getTotalIn(); footer.writeByte(crcValue); @@ -267,6 +284,14 @@ public class JdkZlibEncoder extends ZlibEncoder { return ctx.writeAndFlush(footer, promise); } + private void deflate(ByteBuf out) { + int numBytes; + do { + numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length, Deflater.SYNC_FLUSH); + out.writeBytes(encodeBuf, 0, numBytes); + } while (numBytes > 0); + } + @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; diff --git a/codec/src/main/java/io/netty/handler/codec/compression/ZlibUtil.java b/codec/src/main/java/io/netty/handler/codec/compression/ZlibUtil.java index ff200ffb17..09eb42c8b8 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/ZlibUtil.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/ZlibUtil.java @@ -61,6 +61,25 @@ final class ZlibUtil { return convertedWrapperType; } + static int wrapperOverhead(ZlibWrapper wrapper) { + int overhead; + switch (wrapper) { + case NONE: + overhead = 0; + break; + case ZLIB: + case ZLIB_OR_NONE: + overhead = 2; + break; + case GZIP: + overhead = 10; + break; + default: + throw new Error(); + } + return overhead; + } + private ZlibUtil() { } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/ZlibCrossTest1.java b/codec/src/test/java/io/netty/handler/codec/compression/ZlibCrossTest1.java new file mode 100644 index 0000000000..9e16e1a3dc --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/compression/ZlibCrossTest1.java @@ -0,0 +1,29 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +public class ZlibCrossTest1 extends ZlibTest { + + @Override + protected ZlibEncoder createEncoder(ZlibWrapper wrapper) { + return new JdkZlibEncoder(wrapper); + } + + @Override + protected ZlibDecoder createDecoder(ZlibWrapper wrapper) { + return new JZlibDecoder(wrapper); + } +} diff --git a/codec/src/test/java/io/netty/handler/codec/compression/ZlibCrossTest2.java b/codec/src/test/java/io/netty/handler/codec/compression/ZlibCrossTest2.java new file mode 100644 index 0000000000..4495bef4c3 --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/compression/ZlibCrossTest2.java @@ -0,0 +1,49 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import org.junit.Test; + +public class ZlibCrossTest2 extends ZlibTest { + + @Override + protected ZlibEncoder createEncoder(ZlibWrapper wrapper) { + return new JZlibEncoder(wrapper); + } + + @Override + protected ZlibDecoder createDecoder(ZlibWrapper wrapper) { + return new JdkZlibDecoder(wrapper); + } + + @Override + @Test(expected = IllegalArgumentException.class) + public void testZLIB_OR_NONE() throws Exception { + new JdkZlibDecoder(ZlibWrapper.ZLIB_OR_NONE); + } + + @Override + @Test(expected = IllegalArgumentException.class) + public void testZLIB_OR_NONE2() throws Exception { + new JdkZlibDecoder(ZlibWrapper.ZLIB_OR_NONE); + } + + @Override + @Test(expected = IllegalArgumentException.class) + public void testZLIB_OR_NONE3() throws Exception { + new JdkZlibDecoder(ZlibWrapper.ZLIB_OR_NONE); + } +} diff --git a/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java b/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java index e7cfc4384d..bdd893db3a 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.CharsetUtil; +import io.netty.util.internal.ThreadLocalRandom; import org.junit.Test; import java.io.ByteArrayOutputStream; @@ -32,12 +33,9 @@ public abstract class ZlibTest { private static final byte[] BYTES_SMALL = new byte[128]; private static final byte[] BYTES_LARGE = new byte[1024 * 1024]; static { - for (int i = 0; i < BYTES_SMALL.length; i++) { - BYTES_SMALL[i] = (byte) i; - } - for (int i = 0; i < BYTES_LARGE.length; i++) { - BYTES_LARGE[i] = (byte) i; - } + ThreadLocalRandom rand = ThreadLocalRandom.current(); + rand.nextBytes(BYTES_SMALL); + rand.nextBytes(BYTES_LARGE); } protected abstract ZlibEncoder createEncoder(ZlibWrapper wrapper); @@ -64,7 +62,7 @@ public abstract class ZlibTest { EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper)); chEncoder.writeOutbound(data.copy()); - assertTrue(chEncoder.finish()); + chEncoder.flush(); EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper)); for (;;) { @@ -75,8 +73,6 @@ public abstract class ZlibTest { chDecoderZlib.writeInbound(deflatedData); } - assertTrue(chDecoderZlib.finish()); - byte[] decompressed = new byte[bytes.length]; int offset = 0; for (;;) { @@ -95,9 +91,43 @@ public abstract class ZlibTest { assertArrayEquals(bytes, decompressed); assertNull(chDecoderZlib.readInbound()); + // Closing an encoder channel will generate a footer. + assertTrue(chEncoder.finish()); + // But, the footer will be decoded into nothing. It's only for validation. + assertFalse(chDecoderZlib.finish()); + data.release(); } + private void testCompressNone(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception { + EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper)); + + // Closing an encoder channel without writing anything should generate both header and footer. + assertTrue(chEncoder.finish()); + + EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper)); + for (;;) { + ByteBuf deflatedData = (ByteBuf) chEncoder.readOutbound(); + if (deflatedData == null) { + break; + } + chDecoderZlib.writeInbound(deflatedData); + } + + // Decoder should not generate anything at all. + for (;;) { + ByteBuf buf = (ByteBuf) chDecoderZlib.readInbound(); + if (buf == null) { + break; + } + + buf.release(); + fail("should decode nothing"); + } + + assertFalse(chDecoderZlib.finish()); + } + private void testCompressSmall(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception { testCompress0(encoderWrapper, decoderWrapper, BYTES_SMALL); } @@ -108,36 +138,42 @@ public abstract class ZlibTest { @Test public void testZLIB() throws Exception { + testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB); testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB); testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB); } @Test public void testNONE() throws Exception { + testCompressNone(ZlibWrapper.NONE, ZlibWrapper.NONE); testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.NONE); testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.NONE); } @Test public void testGZIP() throws Exception { + testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.GZIP); testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.GZIP); testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.GZIP); } @Test public void testZLIB_OR_NONE() throws Exception { + testCompressNone(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE); testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE); testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE); } @Test public void testZLIB_OR_NONE2() throws Exception { + testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE); testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE); testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE); } @Test public void testZLIB_OR_NONE3() throws Exception { + testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE); testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE); testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE); }