Fix bugs in ZLIB codec where they produce malformed stream or their streams are not flushed on time

- Fixes #2014
- Add the tests that mix JDK ZLIB codec and JZlib codecs
- Fix a bug where JdkZlibEncoder does not encode the GZIP header when nothing was written to te channel
- Fix a bug where the encoders do not consider the overhead of the wrapper format when calculating the estimated compressed output size.
- Fix a bug where the decoders do not discard the received data after the compressed stream is finished
This commit is contained in:
Trustin Lee 2013-11-29 18:08:34 +09:00
parent 6cf2748dbb
commit 3f7b674db8
8 changed files with 204 additions and 31 deletions

View File

@ -15,14 +15,13 @@
*/ */
package io.netty.handler.codec.compression; package io.netty.handler.codec.compression;
import com.jcraft.jzlib.Inflater;
import com.jcraft.jzlib.JZlib;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.util.List; import java.util.List;
import com.jcraft.jzlib.Inflater;
import com.jcraft.jzlib.JZlib;
public class JZlibDecoder extends ZlibDecoder { public class JZlibDecoder extends ZlibDecoder {
private final Inflater z = new Inflater(); private final Inflater z = new Inflater();
@ -85,6 +84,11 @@ public class JZlibDecoder extends ZlibDecoder {
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (finished) {
// Skip data received after finished.
in.skipBytes(in.readableBytes());
return;
}
if (!in.isReadable()) { if (!in.isReadable()) {
return; return;

View File

@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
*/ */
public class JZlibEncoder extends ZlibEncoder { public class JZlibEncoder extends ZlibEncoder {
private final int wrapperOverhead;
private final Deflater z = new Deflater(); private final Deflater z = new Deflater();
private volatile boolean finished; private volatile boolean finished;
private volatile ChannelHandlerContext ctx; private volatile ChannelHandlerContext ctx;
@ -145,6 +146,8 @@ public class JZlibEncoder extends ZlibEncoder {
if (resultCode != JZlib.Z_OK) { if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "initialization failure", resultCode); ZlibUtil.fail(z, "initialization failure", resultCode);
} }
wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
} }
/** /**
@ -233,6 +236,8 @@ public class JZlibEncoder extends ZlibEncoder {
ZlibUtil.fail(z, "failed to set the dictionary", resultCode); ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
} }
} }
wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
} }
@Override @Override
@ -295,7 +300,7 @@ public class JZlibEncoder extends ZlibEncoder {
int oldNextInIndex = z.next_in_index; int oldNextInIndex = z.next_in_index;
// Configure output. // Configure output.
int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12; int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12 + wrapperOverhead;
out.ensureWritable(maxOutputLength); out.ensureWritable(maxOutputLength);
z.avail_out = maxOutputLength; z.avail_out = maxOutputLength;
z.next_out = out.array(); z.next_out = out.array();

View File

@ -114,7 +114,13 @@ public class JdkZlibDecoder extends ZlibDecoder {
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (!in.isReadable() && finished) { if (finished) {
// Skip data received after finished.
in.skipBytes(in.readableBytes());
return;
}
if (!in.isReadable()) {
return; return;
} }
@ -330,12 +336,12 @@ public class JdkZlibDecoder extends ZlibDecoder {
// read ISIZE and verify // read ISIZE and verify
int dataLength = 0; int dataLength = 0;
for (int i = 0; i < 4; ++i) { for (int i = 0; i < 4; ++i) {
dataLength |= buf.readUnsignedByte() << (i * 8); dataLength |= buf.readUnsignedByte() << i * 8;
} }
int readLength = inflater.getTotalOut(); int readLength = inflater.getTotalOut();
if (dataLength != readLength) { if (dataLength != readLength) {
throw new CompressionException( throw new CompressionException(
"Number of bytes missmatch. Expected: " + dataLength + ", Got: " + readLength); "Number of bytes mismatch. Expected: " + dataLength + ", Got: " + readLength);
} }
return true; return true;
} }
@ -343,7 +349,7 @@ public class JdkZlibDecoder extends ZlibDecoder {
private void verifyCrc(ByteBuf in) { private void verifyCrc(ByteBuf in) {
long crcValue = 0; long crcValue = 0;
for (int i = 0; i < 4; ++i) { for (int i = 0; i < 4; ++i) {
crcValue |= (long) in.readUnsignedByte() << (i * 8); crcValue |= (long) in.readUnsignedByte() << i * 8;
} }
long readCrc = crc.getValue(); long readCrc = crc.getValue();
if (crcValue != readCrc) { if (crcValue != readCrc) {

View File

@ -33,6 +33,7 @@ import java.util.zip.Deflater;
*/ */
public class JdkZlibEncoder extends ZlibEncoder { public class JdkZlibEncoder extends ZlibEncoder {
private final ZlibWrapper wrapper;
private final byte[] encodeBuf = new byte[8192]; private final byte[] encodeBuf = new byte[8192];
private final Deflater deflater; private final Deflater deflater;
private volatile boolean finished; private volatile boolean finished;
@ -41,7 +42,6 @@ public class JdkZlibEncoder extends ZlibEncoder {
/* /*
* GZIP support * GZIP support
*/ */
private final boolean gzip;
private final CRC32 crc = new CRC32(); private final CRC32 crc = new CRC32();
private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0}; private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
private boolean writeHeader = true; private boolean writeHeader = true;
@ -106,7 +106,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
"allowed for compression."); "allowed for compression.");
} }
gzip = wrapper == ZlibWrapper.GZIP; this.wrapper = wrapper;
deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB); deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
} }
@ -147,7 +147,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
throw new NullPointerException("dictionary"); throw new NullPointerException("dictionary");
} }
gzip = false; wrapper = ZlibWrapper.ZLIB;
deflater = new Deflater(compressionLevel); deflater = new Deflater(compressionLevel);
deflater.setDictionary(dictionary); deflater.setDictionary(dictionary);
} }
@ -200,20 +200,31 @@ public class JdkZlibEncoder extends ZlibEncoder {
uncompressed.readBytes(inAry); uncompressed.readBytes(inAry);
int sizeEstimate = (int) Math.ceil(inAry.length * 1.001) + 12; int sizeEstimate = (int) Math.ceil(inAry.length * 1.001) + 12;
out.ensureWritable(sizeEstimate);
if (gzip) { if (writeHeader) {
crc.update(inAry); writeHeader = false;
if (writeHeader) { switch (wrapper) {
out.writeBytes(gzipHeader); case GZIP:
writeHeader = false; out.ensureWritable(sizeEstimate + gzipHeader.length);
out.writeBytes(gzipHeader);
break;
case ZLIB:
out.ensureWritable(sizeEstimate + 2); // first two magic bytes
break;
default:
out.ensureWritable(sizeEstimate);
} }
} else {
out.ensureWritable(sizeEstimate);
}
if (wrapper == ZlibWrapper.GZIP) {
crc.update(inAry);
} }
deflater.setInput(inAry); deflater.setInput(inAry);
while (!deflater.needsInput()) { while (!deflater.needsInput()) {
int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length, Deflater.SYNC_FLUSH); deflate(out);
out.writeBytes(encodeBuf, 0, numBytes);
} }
} }
@ -245,13 +256,19 @@ public class JdkZlibEncoder extends ZlibEncoder {
} }
finished = true; finished = true;
ByteBuf footer = ctx.alloc().buffer(); ByteBuf footer = ctx.alloc().buffer();
if (writeHeader && wrapper == ZlibWrapper.GZIP) {
// Write the GZIP header first if not written yet. (i.e. user wrote nothing.)
writeHeader = false;
footer.writeBytes(gzipHeader);
}
deflater.finish(); deflater.finish();
while (!deflater.finished()) { while (!deflater.finished()) {
int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length); deflate(footer);
footer.writeBytes(encodeBuf, 0, numBytes);
} }
if (gzip) { if (wrapper == ZlibWrapper.GZIP) {
int crcValue = (int) crc.getValue(); int crcValue = (int) crc.getValue();
int uncBytes = deflater.getTotalIn(); int uncBytes = deflater.getTotalIn();
footer.writeByte(crcValue); footer.writeByte(crcValue);
@ -267,6 +284,14 @@ public class JdkZlibEncoder extends ZlibEncoder {
return ctx.writeAndFlush(footer, promise); return ctx.writeAndFlush(footer, promise);
} }
private void deflate(ByteBuf out) {
int numBytes;
do {
numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length, Deflater.SYNC_FLUSH);
out.writeBytes(encodeBuf, 0, numBytes);
} while (numBytes > 0);
}
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx; this.ctx = ctx;

View File

@ -61,6 +61,25 @@ final class ZlibUtil {
return convertedWrapperType; return convertedWrapperType;
} }
static int wrapperOverhead(ZlibWrapper wrapper) {
int overhead;
switch (wrapper) {
case NONE:
overhead = 0;
break;
case ZLIB:
case ZLIB_OR_NONE:
overhead = 2;
break;
case GZIP:
overhead = 10;
break;
default:
throw new Error();
}
return overhead;
}
private ZlibUtil() { private ZlibUtil() {
} }
} }

View File

@ -0,0 +1,29 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
public class ZlibCrossTest1 extends ZlibTest {
@Override
protected ZlibEncoder createEncoder(ZlibWrapper wrapper) {
return new JdkZlibEncoder(wrapper);
}
@Override
protected ZlibDecoder createDecoder(ZlibWrapper wrapper) {
return new JZlibDecoder(wrapper);
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import org.junit.Test;
public class ZlibCrossTest2 extends ZlibTest {
@Override
protected ZlibEncoder createEncoder(ZlibWrapper wrapper) {
return new JZlibEncoder(wrapper);
}
@Override
protected ZlibDecoder createDecoder(ZlibWrapper wrapper) {
return new JdkZlibDecoder(wrapper);
}
@Override
@Test(expected = IllegalArgumentException.class)
public void testZLIB_OR_NONE() throws Exception {
new JdkZlibDecoder(ZlibWrapper.ZLIB_OR_NONE);
}
@Override
@Test(expected = IllegalArgumentException.class)
public void testZLIB_OR_NONE2() throws Exception {
new JdkZlibDecoder(ZlibWrapper.ZLIB_OR_NONE);
}
@Override
@Test(expected = IllegalArgumentException.class)
public void testZLIB_OR_NONE3() throws Exception {
new JdkZlibDecoder(ZlibWrapper.ZLIB_OR_NONE);
}
}

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.Test; import org.junit.Test;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -32,12 +33,9 @@ public abstract class ZlibTest {
private static final byte[] BYTES_SMALL = new byte[128]; private static final byte[] BYTES_SMALL = new byte[128];
private static final byte[] BYTES_LARGE = new byte[1024 * 1024]; private static final byte[] BYTES_LARGE = new byte[1024 * 1024];
static { static {
for (int i = 0; i < BYTES_SMALL.length; i++) { ThreadLocalRandom rand = ThreadLocalRandom.current();
BYTES_SMALL[i] = (byte) i; rand.nextBytes(BYTES_SMALL);
} rand.nextBytes(BYTES_LARGE);
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = (byte) i;
}
} }
protected abstract ZlibEncoder createEncoder(ZlibWrapper wrapper); protected abstract ZlibEncoder createEncoder(ZlibWrapper wrapper);
@ -64,7 +62,7 @@ public abstract class ZlibTest {
EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper)); EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper));
chEncoder.writeOutbound(data.copy()); chEncoder.writeOutbound(data.copy());
assertTrue(chEncoder.finish()); chEncoder.flush();
EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper)); EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper));
for (;;) { for (;;) {
@ -75,8 +73,6 @@ public abstract class ZlibTest {
chDecoderZlib.writeInbound(deflatedData); chDecoderZlib.writeInbound(deflatedData);
} }
assertTrue(chDecoderZlib.finish());
byte[] decompressed = new byte[bytes.length]; byte[] decompressed = new byte[bytes.length];
int offset = 0; int offset = 0;
for (;;) { for (;;) {
@ -95,9 +91,43 @@ public abstract class ZlibTest {
assertArrayEquals(bytes, decompressed); assertArrayEquals(bytes, decompressed);
assertNull(chDecoderZlib.readInbound()); assertNull(chDecoderZlib.readInbound());
// Closing an encoder channel will generate a footer.
assertTrue(chEncoder.finish());
// But, the footer will be decoded into nothing. It's only for validation.
assertFalse(chDecoderZlib.finish());
data.release(); data.release();
} }
private void testCompressNone(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper));
// Closing an encoder channel without writing anything should generate both header and footer.
assertTrue(chEncoder.finish());
EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper));
for (;;) {
ByteBuf deflatedData = (ByteBuf) chEncoder.readOutbound();
if (deflatedData == null) {
break;
}
chDecoderZlib.writeInbound(deflatedData);
}
// Decoder should not generate anything at all.
for (;;) {
ByteBuf buf = (ByteBuf) chDecoderZlib.readInbound();
if (buf == null) {
break;
}
buf.release();
fail("should decode nothing");
}
assertFalse(chDecoderZlib.finish());
}
private void testCompressSmall(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception { private void testCompressSmall(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
testCompress0(encoderWrapper, decoderWrapper, BYTES_SMALL); testCompress0(encoderWrapper, decoderWrapper, BYTES_SMALL);
} }
@ -108,36 +138,42 @@ public abstract class ZlibTest {
@Test @Test
public void testZLIB() throws Exception { public void testZLIB() throws Exception {
testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB); testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB); testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
} }
@Test @Test
public void testNONE() throws Exception { public void testNONE() throws Exception {
testCompressNone(ZlibWrapper.NONE, ZlibWrapper.NONE);
testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.NONE); testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.NONE);
testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.NONE); testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.NONE);
} }
@Test @Test
public void testGZIP() throws Exception { public void testGZIP() throws Exception {
testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.GZIP); testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.GZIP); testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
} }
@Test @Test
public void testZLIB_OR_NONE() throws Exception { public void testZLIB_OR_NONE() throws Exception {
testCompressNone(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE); testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE); testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
} }
@Test @Test
public void testZLIB_OR_NONE2() throws Exception { public void testZLIB_OR_NONE2() throws Exception {
testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE); testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE); testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE);
} }
@Test @Test
public void testZLIB_OR_NONE3() throws Exception { public void testZLIB_OR_NONE3() throws Exception {
testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE); testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE); testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE);
} }