Fix bugs in ZLIB codec where they produce malformed stream or their streams are not flushed on time

- Fixes #2014
- Add the tests that mix JDK ZLIB codec and JZlib codecs
- Fix a bug where JdkZlibEncoder does not encode the GZIP header when nothing was written to te channel
- Fix a bug where the encoders do not consider the overhead of the wrapper format when calculating the estimated compressed output size.
- Fix a bug where the decoders do not discard the received data after the compressed stream is finished
This commit is contained in:
Trustin Lee 2013-11-29 18:08:34 +09:00
parent 4ea39aa9df
commit 8b495bb6e1
8 changed files with 204 additions and 31 deletions

View File

@ -15,14 +15,13 @@
*/
package io.netty.handler.codec.compression;
import com.jcraft.jzlib.Inflater;
import com.jcraft.jzlib.JZlib;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import com.jcraft.jzlib.Inflater;
import com.jcraft.jzlib.JZlib;
public class JZlibDecoder extends ZlibDecoder {
private final Inflater z = new Inflater();
@ -85,6 +84,11 @@ public class JZlibDecoder extends ZlibDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (finished) {
// Skip data received after finished.
in.skipBytes(in.readableBytes());
return;
}
if (!in.isReadable()) {
return;

View File

@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
*/
public class JZlibEncoder extends ZlibEncoder {
private final int wrapperOverhead;
private final Deflater z = new Deflater();
private volatile boolean finished;
private volatile ChannelHandlerContext ctx;
@ -145,6 +146,8 @@ public class JZlibEncoder extends ZlibEncoder {
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "initialization failure", resultCode);
}
wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
}
/**
@ -233,6 +236,8 @@ public class JZlibEncoder extends ZlibEncoder {
ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
}
}
wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
}
@Override
@ -295,7 +300,7 @@ public class JZlibEncoder extends ZlibEncoder {
int oldNextInIndex = z.next_in_index;
// Configure output.
int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12;
int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12 + wrapperOverhead;
out.ensureWritable(maxOutputLength);
z.avail_out = maxOutputLength;
z.next_out = out.array();

View File

@ -113,7 +113,13 @@ public class JdkZlibDecoder extends ZlibDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (!in.isReadable() && finished) {
if (finished) {
// Skip data received after finished.
in.skipBytes(in.readableBytes());
return;
}
if (!in.isReadable()) {
return;
}
@ -329,12 +335,12 @@ public class JdkZlibDecoder extends ZlibDecoder {
// read ISIZE and verify
int dataLength = 0;
for (int i = 0; i < 4; ++i) {
dataLength |= buf.readUnsignedByte() << (i * 8);
dataLength |= buf.readUnsignedByte() << i * 8;
}
int readLength = inflater.getTotalOut();
if (dataLength != readLength) {
throw new CompressionException(
"Number of bytes missmatch. Expected: " + dataLength + ", Got: " + readLength);
"Number of bytes mismatch. Expected: " + dataLength + ", Got: " + readLength);
}
return true;
}
@ -342,7 +348,7 @@ public class JdkZlibDecoder extends ZlibDecoder {
private void verifyCrc(ByteBuf in) {
long crcValue = 0;
for (int i = 0; i < 4; ++i) {
crcValue |= (long) in.readUnsignedByte() << (i * 8);
crcValue |= (long) in.readUnsignedByte() << i * 8;
}
long readCrc = crc.getValue();
if (crcValue != readCrc) {

View File

@ -33,6 +33,7 @@ import java.util.zip.Deflater;
*/
public class JdkZlibEncoder extends ZlibEncoder {
private final ZlibWrapper wrapper;
private final byte[] encodeBuf = new byte[8192];
private final Deflater deflater;
private volatile boolean finished;
@ -41,7 +42,6 @@ public class JdkZlibEncoder extends ZlibEncoder {
/*
* GZIP support
*/
private final boolean gzip;
private final CRC32 crc = new CRC32();
private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
private boolean writeHeader = true;
@ -106,7 +106,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
"allowed for compression.");
}
gzip = wrapper == ZlibWrapper.GZIP;
this.wrapper = wrapper;
deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
}
@ -147,7 +147,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
throw new NullPointerException("dictionary");
}
gzip = false;
wrapper = ZlibWrapper.ZLIB;
deflater = new Deflater(compressionLevel);
deflater.setDictionary(dictionary);
}
@ -200,20 +200,31 @@ public class JdkZlibEncoder extends ZlibEncoder {
uncompressed.readBytes(inAry);
int sizeEstimate = (int) Math.ceil(inAry.length * 1.001) + 12;
out.ensureWritable(sizeEstimate);
if (gzip) {
crc.update(inAry);
if (writeHeader) {
out.writeBytes(gzipHeader);
writeHeader = false;
if (writeHeader) {
writeHeader = false;
switch (wrapper) {
case GZIP:
out.ensureWritable(sizeEstimate + gzipHeader.length);
out.writeBytes(gzipHeader);
break;
case ZLIB:
out.ensureWritable(sizeEstimate + 2); // first two magic bytes
break;
default:
out.ensureWritable(sizeEstimate);
}
} else {
out.ensureWritable(sizeEstimate);
}
if (wrapper == ZlibWrapper.GZIP) {
crc.update(inAry);
}
deflater.setInput(inAry);
while (!deflater.needsInput()) {
int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length, Deflater.SYNC_FLUSH);
out.writeBytes(encodeBuf, 0, numBytes);
deflate(out);
}
}
@ -245,13 +256,19 @@ public class JdkZlibEncoder extends ZlibEncoder {
}
finished = true;
ByteBuf footer = ctx.alloc().buffer();
if (writeHeader && wrapper == ZlibWrapper.GZIP) {
// Write the GZIP header first if not written yet. (i.e. user wrote nothing.)
writeHeader = false;
footer.writeBytes(gzipHeader);
}
deflater.finish();
while (!deflater.finished()) {
int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length);
footer.writeBytes(encodeBuf, 0, numBytes);
deflate(footer);
}
if (gzip) {
if (wrapper == ZlibWrapper.GZIP) {
int crcValue = (int) crc.getValue();
int uncBytes = deflater.getTotalIn();
footer.writeByte(crcValue);
@ -267,6 +284,14 @@ public class JdkZlibEncoder extends ZlibEncoder {
return ctx.writeAndFlush(footer, promise);
}
private void deflate(ByteBuf out) {
int numBytes;
do {
numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length, Deflater.SYNC_FLUSH);
out.writeBytes(encodeBuf, 0, numBytes);
} while (numBytes > 0);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;

View File

@ -61,6 +61,25 @@ final class ZlibUtil {
return convertedWrapperType;
}
static int wrapperOverhead(ZlibWrapper wrapper) {
int overhead;
switch (wrapper) {
case NONE:
overhead = 0;
break;
case ZLIB:
case ZLIB_OR_NONE:
overhead = 2;
break;
case GZIP:
overhead = 10;
break;
default:
throw new Error();
}
return overhead;
}
private ZlibUtil() {
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
public class ZlibCrossTest1 extends ZlibTest {
@Override
protected ZlibEncoder createEncoder(ZlibWrapper wrapper) {
return new JdkZlibEncoder(wrapper);
}
@Override
protected ZlibDecoder createDecoder(ZlibWrapper wrapper) {
return new JZlibDecoder(wrapper);
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import org.junit.Test;
public class ZlibCrossTest2 extends ZlibTest {
@Override
protected ZlibEncoder createEncoder(ZlibWrapper wrapper) {
return new JZlibEncoder(wrapper);
}
@Override
protected ZlibDecoder createDecoder(ZlibWrapper wrapper) {
return new JdkZlibDecoder(wrapper);
}
@Override
@Test(expected = IllegalArgumentException.class)
public void testZLIB_OR_NONE() throws Exception {
new JdkZlibDecoder(ZlibWrapper.ZLIB_OR_NONE);
}
@Override
@Test(expected = IllegalArgumentException.class)
public void testZLIB_OR_NONE2() throws Exception {
new JdkZlibDecoder(ZlibWrapper.ZLIB_OR_NONE);
}
@Override
@Test(expected = IllegalArgumentException.class)
public void testZLIB_OR_NONE3() throws Exception {
new JdkZlibDecoder(ZlibWrapper.ZLIB_OR_NONE);
}
}

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
@ -32,12 +33,9 @@ public abstract class ZlibTest {
private static final byte[] BYTES_SMALL = new byte[128];
private static final byte[] BYTES_LARGE = new byte[1024 * 1024];
static {
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = (byte) i;
}
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = (byte) i;
}
ThreadLocalRandom rand = ThreadLocalRandom.current();
rand.nextBytes(BYTES_SMALL);
rand.nextBytes(BYTES_LARGE);
}
protected abstract ZlibEncoder createEncoder(ZlibWrapper wrapper);
@ -64,7 +62,7 @@ public abstract class ZlibTest {
EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper));
chEncoder.writeOutbound(data.copy());
assertTrue(chEncoder.finish());
chEncoder.flush();
EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper));
for (;;) {
@ -75,8 +73,6 @@ public abstract class ZlibTest {
chDecoderZlib.writeInbound(deflatedData);
}
assertTrue(chDecoderZlib.finish());
byte[] decompressed = new byte[bytes.length];
int offset = 0;
for (;;) {
@ -95,9 +91,43 @@ public abstract class ZlibTest {
assertArrayEquals(bytes, decompressed);
assertNull(chDecoderZlib.readInbound());
// Closing an encoder channel will generate a footer.
assertTrue(chEncoder.finish());
// But, the footer will be decoded into nothing. It's only for validation.
assertFalse(chDecoderZlib.finish());
data.release();
}
private void testCompressNone(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper));
// Closing an encoder channel without writing anything should generate both header and footer.
assertTrue(chEncoder.finish());
EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper));
for (;;) {
ByteBuf deflatedData = (ByteBuf) chEncoder.readOutbound();
if (deflatedData == null) {
break;
}
chDecoderZlib.writeInbound(deflatedData);
}
// Decoder should not generate anything at all.
for (;;) {
ByteBuf buf = (ByteBuf) chDecoderZlib.readInbound();
if (buf == null) {
break;
}
buf.release();
fail("should decode nothing");
}
assertFalse(chDecoderZlib.finish());
}
private void testCompressSmall(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
testCompress0(encoderWrapper, decoderWrapper, BYTES_SMALL);
}
@ -108,36 +138,42 @@ public abstract class ZlibTest {
@Test
public void testZLIB() throws Exception {
testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
}
@Test
public void testNONE() throws Exception {
testCompressNone(ZlibWrapper.NONE, ZlibWrapper.NONE);
testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.NONE);
testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.NONE);
}
@Test
public void testGZIP() throws Exception {
testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
}
@Test
public void testZLIB_OR_NONE() throws Exception {
testCompressNone(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
}
@Test
public void testZLIB_OR_NONE2() throws Exception {
testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE);
}
@Test
public void testZLIB_OR_NONE3() throws Exception {
testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.ZLIB_OR_NONE);
}