Backport ZlibTest and fix the bugs revealed

Related: #3107

Motivation:

ZlibEn/Decoder and JdkZlibEncoder in 3.9 do not have any unit tests.
Before applying any patches, we should backport the tests in 4.x so that
we can make sure we do not break anything.

Modification:

- Backport ZlibTest and its subtypes
  - Remove the test for automatic GZIP header detection because the
    ZlibDecoders in 3.9 do not have that feature
- Initialize JdkZlibEncoder.out and crc only when necessary for reduced
  memory footprint
- Fix the bugs in the ZlibEncoders where it fails to compress correctly
  when there are not enough room in the output buffer

Result:

We are more confident when we make changes in ZlibEncoder/Decoder
Bugs have been squashed
This commit is contained in:
Trustin Lee 2014-11-20 15:12:08 +09:00
parent e30c4fc70f
commit 3451bce17f
6 changed files with 464 additions and 18 deletions

View File

@ -39,16 +39,13 @@ import java.util.zip.Deflater;
*/
public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
private final byte[] out = new byte[8192];
private final ZlibWrapper wrapper;
private final Deflater deflater;
private final AtomicBoolean finished = new AtomicBoolean();
private volatile ChannelHandlerContext ctx;
private byte[] out;
/*
* GZIP support
*/
private final boolean gzip;
private final CRC32 crc = new CRC32();
private final CRC32 crc;
private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
private boolean writeHeader = true;
@ -112,8 +109,13 @@ public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAw
"allowed for compression.");
}
gzip = wrapper == ZlibWrapper.GZIP;
this.wrapper = wrapper;
deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
if (wrapper == ZlibWrapper.GZIP) {
crc = new CRC32();
} else {
crc = null;
}
}
/**
@ -153,7 +155,8 @@ public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAw
throw new NullPointerException("dictionary");
}
gzip = false;
wrapper = ZlibWrapper.ZLIB;
crc = null;
deflater = new Deflater(compressionLevel);
deflater.setDictionary(dictionary);
}
@ -166,6 +169,10 @@ public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAw
return finishEncode(ctx, null);
}
private boolean isGzip() {
return wrapper == ZlibWrapper.GZIP;
}
public boolean isClosed() {
return finished.get();
}
@ -180,11 +187,11 @@ public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAw
byte[] in = new byte[uncompressed.readableBytes()];
uncompressed.readBytes(in);
int sizeEstimate = (int) Math.ceil(in.length * 1.001) + 12;
int sizeEstimate = estimateCompressedSize(in.length);
ChannelBuffer compressed = ChannelBuffers.dynamicBuffer(sizeEstimate, channel.getConfig().getBufferFactory());
synchronized (deflater) {
if (gzip) {
if (isGzip()) {
crc.update(in);
if (writeHeader) {
compressed.writeBytes(gzipHeader);
@ -194,14 +201,28 @@ public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAw
deflater.setInput(in);
while (!deflater.needsInput()) {
int numBytes = deflater.deflate(out, 0, out.length, Deflater.SYNC_FLUSH);
compressed.writeBytes(out, 0, numBytes);
deflate(compressed);
}
}
return compressed;
}
private int estimateCompressedSize(int originalSize) {
int sizeEstimate = (int) Math.ceil(originalSize * 1.001) + 12;
if (writeHeader) {
switch (wrapper) {
case GZIP:
sizeEstimate += gzipHeader.length;
break;
case ZLIB:
sizeEstimate += 2; // first two magic bytes
break;
}
}
return sizeEstimate;
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
throws Exception {
@ -231,7 +252,8 @@ public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAw
return future;
}
ChannelBuffer footer = ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
final ChannelBuffer footer = ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
final boolean gzip = isGzip();
synchronized (deflater) {
if (gzip && writeHeader) {
// Write the GZIP header first if not written yet. (i.e. user wrote nothing.)
@ -241,8 +263,7 @@ public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAw
deflater.finish();
while (!deflater.finished()) {
int numBytes = deflater.deflate(out, 0, out.length);
footer.writeBytes(out, 0, numBytes);
deflate(footer);
}
if (gzip) {
int crcValue = (int) crc.getValue();
@ -275,6 +296,29 @@ public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAw
return future;
}
private void deflate(ChannelBuffer out) {
int numBytes;
if (out.hasArray()) {
do {
int writerIndex = out.writerIndex();
numBytes = deflater.deflate(
out.array(), out.arrayOffset() + writerIndex, out.writableBytes(),
Deflater.SYNC_FLUSH);
out.writerIndex(writerIndex + numBytes);
} while (numBytes > 0);
} else {
byte[] tmpOut = this.out;
if (tmpOut == null) {
tmpOut = this.out = new byte[8192];
}
do {
numBytes = deflater.deflate(tmpOut, 0, tmpOut.length, Deflater.SYNC_FLUSH);
out.writeBytes(tmpOut, 0, numBytes);
} while (numBytes > 0);
}
}
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}

View File

@ -15,8 +15,6 @@
*/
package org.jboss.netty.handler.codec.compression;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@ -31,6 +29,8 @@ import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
import org.jboss.netty.util.internal.jzlib.JZlib;
import org.jboss.netty.util.internal.jzlib.ZStream;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Compresses a {@link ChannelBuffer} using the deflate algorithm.
@ -41,6 +41,7 @@ public class ZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAware
private static final byte[] EMPTY_ARRAY = new byte[0];
private final int wrapperOverhead;
private final ZStream z = new ZStream();
private final AtomicBoolean finished = new AtomicBoolean();
private volatile ChannelHandlerContext ctx;
@ -143,6 +144,8 @@ public class ZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAware
"allowed for compression.");
}
wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
synchronized (z) {
int resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
ZlibUtil.convertWrapperType(wrapper));
@ -228,6 +231,8 @@ public class ZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAware
throw new NullPointerException("dictionary");
}
wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
synchronized (z) {
int resultCode;
resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
@ -273,7 +278,7 @@ public class ZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAware
z.avail_in = in.length;
// Configure output.
byte[] out = new byte[(int) Math.ceil(in.length * 1.001) + 12];
byte[] out = new byte[(int) Math.ceil(in.length * 1.001) + 12 + wrapperOverhead];
z.next_out = out;
z.next_out_index = 0;
z.avail_out = out.length;

View File

@ -53,6 +53,25 @@ final class ZlibUtil {
return convertedWrapperType;
}
static int wrapperOverhead(ZlibWrapper wrapper) {
int overhead;
switch (wrapper) {
case NONE:
overhead = 0;
break;
case ZLIB:
case ZLIB_OR_NONE:
overhead = 2;
break;
case GZIP:
overhead = 10;
break;
default:
throw new Error();
}
return overhead;
}
private ZlibUtil() {
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.compression;
import org.jboss.netty.channel.ChannelDownstreamHandler;
public class JZlibTest extends ZlibTest {
@Override
protected ChannelDownstreamHandler createEncoder(ZlibWrapper wrapper) {
return new ZlibEncoder(wrapper);
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.compression;
import org.jboss.netty.channel.ChannelDownstreamHandler;
public class JdkZlibTest extends ZlibTest {
@Override
protected ChannelDownstreamHandler createEncoder(ZlibWrapper wrapper) {
return new JdkZlibEncoder(wrapper);
}
}

View File

@ -0,0 +1,328 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.compression;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
import org.jboss.netty.handler.codec.embedder.EncoderEmbedder;
import org.jboss.netty.util.CharsetUtil;
import org.jboss.netty.util.internal.EmptyArrays;
import org.jboss.netty.util.internal.ThreadLocalRandom;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import static org.junit.Assert.*;
public abstract class ZlibTest {
private static final byte[] BYTES_SMALL = new byte[128];
private static final byte[] BYTES_LARGE = new byte[1024 * 1024];
private static final byte[] BYTES_LARGE2 = (
"<!--?xml version=\"1.0\" encoding=\"ISO-8859-1\"?-->\n" +
"<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Strict//EN\" " +
"\"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd\">\n" +
"<html xmlns=\"http://www.w3.org/1999/xhtml\" xml:lang=\"en\" lang=\"en\"><head>\n" +
" <title>Apache Tomcat</title>\n" +
"</head>\n" +
'\n' +
"<body>\n" +
"<h1>It works !</h1>\n" +
'\n' +
"<p>If you're seeing this page via a web browser, it means you've setup Tomcat successfully." +
" Congratulations!</p>\n" +
" \n" +
"<p>This is the default Tomcat home page." +
" It can be found on the local filesystem at: <code>/var/lib/tomcat7/webapps/ROOT/index.html</code></p>\n" +
'\n' +
"<p>Tomcat7 veterans might be pleased to learn that this system instance of Tomcat is installed with" +
" <code>CATALINA_HOME</code> in <code>/usr/share/tomcat7</code> and <code>CATALINA_BASE</code> in" +
" <code>/var/lib/tomcat7</code>, following the rules from" +
" <code>/usr/share/doc/tomcat7-common/RUNNING.txt.gz</code>.</p>\n" +
'\n' +
"<p>You might consider installing the following packages, if you haven't already done so:</p>\n" +
'\n' +
"<p><b>tomcat7-docs</b>: This package installs a web application that allows to browse the Tomcat 7" +
" documentation locally. Once installed, you can access it by clicking <a href=\"docs/\">here</a>.</p>\n" +
'\n' +
"<p><b>tomcat7-examples</b>: This package installs a web application that allows to access the Tomcat" +
" 7 Servlet and JSP examples. Once installed, you can access it by clicking" +
" <a href=\"examples/\">here</a>.</p>\n" +
'\n' +
"<p><b>tomcat7-admin</b>: This package installs two web applications that can help managing this Tomcat" +
" instance. Once installed, you can access the <a href=\"manager/html\">manager webapp</a> and" +
" the <a href=\"host-manager/html\">host-manager webapp</a>.</p><p>\n" +
'\n' +
"</p><p>NOTE: For security reasons, using the manager webapp is restricted" +
" to users with role \"manager\"." +
" The host-manager webapp is restricted to users with role \"admin\". Users are " +
"defined in <code>/etc/tomcat7/tomcat-users.xml</code>.</p>\n" +
'\n' +
'\n' +
'\n' +
"</body></html>").getBytes(CharsetUtil.UTF_8);
static {
ThreadLocalRandom rand = ThreadLocalRandom.current();
rand.nextBytes(BYTES_SMALL);
rand.nextBytes(BYTES_LARGE);
}
protected abstract ChannelDownstreamHandler createEncoder(ZlibWrapper wrapper);
private static ChannelUpstreamHandler createDecoder(ZlibWrapper wrapper) {
// We don't have ZLIB decoder that uses JDK Inflater in 3.x.
return new ZlibDecoder(wrapper);
}
@Test
public void testGZIP2() throws Exception {
byte[] bytes = "message".getBytes(CharsetUtil.UTF_8);
ChannelBuffer data = ChannelBuffers.wrappedBuffer(bytes);
ChannelBuffer deflatedData = ChannelBuffers.wrappedBuffer(gzip(bytes));
DecoderEmbedder<ChannelBuffer> chDecoderGZip =
new DecoderEmbedder<ChannelBuffer>(createDecoder(ZlibWrapper.GZIP));
chDecoderGZip.offer(deflatedData.copy());
assertTrue(chDecoderGZip.finish());
ChannelBuffer buf = chDecoderGZip.poll();
assertEquals(buf, data);
assertNull(chDecoderGZip.poll());
}
private void testCompress0(
ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper, ChannelBuffer data) throws Exception {
EncoderEmbedder<ChannelBuffer> chEncoder =
new EncoderEmbedder<ChannelBuffer>(createEncoder(encoderWrapper));
DecoderEmbedder<ChannelBuffer> chDecoderZlib =
new DecoderEmbedder<ChannelBuffer>(createDecoder(decoderWrapper));
chEncoder.offer(data.copy());
for (;;) {
ChannelBuffer deflatedData = chEncoder.poll();
if (deflatedData == null) {
break;
}
chDecoderZlib.offer(deflatedData);
}
byte[] decompressed = new byte[data.readableBytes()];
int offset = 0;
for (;;) {
ChannelBuffer buf = chDecoderZlib.poll();
if (buf == null) {
break;
}
int length = buf.readableBytes();
buf.readBytes(decompressed, offset, length);
offset += length;
if (offset == decompressed.length) {
break;
}
}
assertEquals(data, ChannelBuffers.wrappedBuffer(decompressed));
assertNull(chDecoderZlib.poll());
// Closing an encoder channel will generate a footer.
assertTrue(chEncoder.finish());
for (;;) {
Object msg = chEncoder.poll();
if (msg == null) {
break;
}
}
// But, the footer will be decoded into nothing. It's only for validation.
assertFalse(chDecoderZlib.finish());
}
private void testCompressNone(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
EncoderEmbedder<ChannelBuffer> chEncoder =
new EncoderEmbedder<ChannelBuffer>(createEncoder(encoderWrapper));
DecoderEmbedder<ChannelBuffer> chDecoderZlib =
new DecoderEmbedder<ChannelBuffer>(createDecoder(decoderWrapper));
// Closing an encoder channel without writing anything should generate both header and footer.
assertTrue(chEncoder.finish());
for (;;) {
ChannelBuffer deflatedData = chEncoder.poll();
if (deflatedData == null) {
break;
}
chDecoderZlib.offer(deflatedData);
}
// Decoder should not generate anything at all.
boolean decoded = false;
for (;;) {
ChannelBuffer buf = chDecoderZlib.poll();
if (buf == null) {
break;
}
decoded = true;
}
assertFalse("should decode nothing", decoded);
assertFalse(chDecoderZlib.finish());
}
// Test for https://github.com/netty/netty/issues/2572
private static void testDecompressOnly(
ZlibWrapper decoderWrapper, byte[] compressed, byte[] data) throws Exception {
DecoderEmbedder<ChannelBuffer> chDecoder =
new DecoderEmbedder<ChannelBuffer>(createDecoder(decoderWrapper));
chDecoder.offer(ChannelBuffers.wrappedBuffer(compressed));
assertTrue(chDecoder.finish());
ChannelBuffer decoded = ChannelBuffers.dynamicBuffer(data.length);
for (;;) {
ChannelBuffer buf = chDecoder.poll();
if (buf == null) {
break;
}
decoded.writeBytes(buf);
}
assertEquals(ChannelBuffers.wrappedBuffer(data), decoded);
}
private void testCompressSmall(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
testCompress0(encoderWrapper, decoderWrapper, ChannelBuffers.wrappedBuffer(BYTES_SMALL));
final ChannelBuffer directSmallBuf = ChannelBuffers.directBuffer(BYTES_SMALL.length);
directSmallBuf.writeBytes(BYTES_SMALL);
testCompress0(encoderWrapper, decoderWrapper, directSmallBuf);
}
private void testCompressLarge(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
testCompress0(encoderWrapper, decoderWrapper, ChannelBuffers.wrappedBuffer(BYTES_LARGE));
final ChannelBuffer directLargeBuf = ChannelBuffers.directBuffer(BYTES_LARGE.length);
directLargeBuf.writeBytes(BYTES_LARGE);
testCompress0(encoderWrapper, decoderWrapper, directLargeBuf);
}
@Test
public void testZLIB() throws Exception {
testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB);
testDecompressOnly(ZlibWrapper.ZLIB, deflate(BYTES_LARGE2), BYTES_LARGE2);
}
@Test
public void testNONE() throws Exception {
testCompressNone(ZlibWrapper.NONE, ZlibWrapper.NONE);
testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.NONE);
testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.NONE);
}
@Test
public void testGZIP() throws Exception {
testCompressNone(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testCompressSmall(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testCompressLarge(ZlibWrapper.GZIP, ZlibWrapper.GZIP);
testDecompressOnly(ZlibWrapper.GZIP, gzip(BYTES_LARGE2), BYTES_LARGE2);
}
@Test
public void testGZIPCompressOnly() throws Exception {
testGZIPCompressOnly0(null); // Do not write anything; just finish the stream.
testGZIPCompressOnly0(EmptyArrays.EMPTY_BYTES); // Write an empty array.
testGZIPCompressOnly0(BYTES_SMALL);
testGZIPCompressOnly0(BYTES_LARGE);
}
private void testGZIPCompressOnly0(byte[] data) throws IOException {
EncoderEmbedder<ChannelBuffer> chEncoder = new EncoderEmbedder<ChannelBuffer>(createEncoder(ZlibWrapper.GZIP));
if (data != null) {
chEncoder.offer(ChannelBuffers.wrappedBuffer(data));
}
assertTrue(chEncoder.finish());
ChannelBuffer encoded = ChannelBuffers.dynamicBuffer();
for (;;) {
ChannelBuffer buf = chEncoder.poll();
if (buf == null) {
break;
}
encoded.writeBytes(buf);
}
ChannelBuffer decoded = ChannelBuffers.dynamicBuffer();
GZIPInputStream stream = new GZIPInputStream(new ChannelBufferInputStream(encoded));
byte[] buf = new byte[8192];
for (;;) {
int readBytes = stream.read(buf);
if (readBytes < 0) {
break;
}
decoded.writeBytes(buf, 0, readBytes);
}
stream.close();
if (data != null) {
assertEquals(ChannelBuffers.wrappedBuffer(data), decoded);
} else {
assertFalse(decoded.readable());
}
}
@Test
public void testZLIB_OR_NONE() throws Exception {
testCompressNone(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.NONE, ZlibWrapper.ZLIB_OR_NONE);
}
@Test
public void testZLIB_OR_NONE2() throws Exception {
testCompressNone(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
testCompressSmall(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
testCompressLarge(ZlibWrapper.ZLIB, ZlibWrapper.ZLIB_OR_NONE);
}
private static byte[] gzip(byte[] bytes) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream stream = new GZIPOutputStream(out);
stream.write(bytes);
stream.close();
return out.toByteArray();
}
private static byte[] deflate(byte[] bytes) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
OutputStream stream = new DeflaterOutputStream(out);
stream.write(bytes);
stream.close();
return out.toByteArray();
}
}