diff --git a/codec/pom.xml b/codec/pom.xml index 89f0c4b8c1..b71e8a6917 100644 --- a/codec/pom.xml +++ b/codec/pom.xml @@ -77,7 +77,7 @@ test - + org.apache.commons commons-compress diff --git a/codec/src/test/java/io/netty/handler/codec/compression/AbstractCompressionTest.java b/codec/src/test/java/io/netty/handler/codec/compression/AbstractCompressionTest.java new file mode 100644 index 0000000000..a0b2158f5c --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/compression/AbstractCompressionTest.java @@ -0,0 +1,38 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.util.internal.ThreadLocalRandom; + +public abstract class AbstractCompressionTest { + + protected static final ThreadLocalRandom rand; + + protected static final byte[] BYTES_SMALL = new byte[256]; + protected static final byte[] BYTES_LARGE = new byte[256 * 1024]; + + static { + rand = ThreadLocalRandom.current(); + fillArrayWithCompressibleData(BYTES_SMALL); + fillArrayWithCompressibleData(BYTES_LARGE); + } + + private static void fillArrayWithCompressibleData(byte[] array) { + for (int i = 0; i < array.length; i++) { + array[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); + } + } +} diff --git a/codec/src/test/java/io/netty/handler/codec/compression/AbstractDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/AbstractDecoderTest.java new file mode 100644 index 0000000000..29634c627a --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/compression/AbstractDecoderTest.java @@ -0,0 +1,148 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Before; +import org.junit.Rule; +import org.junit.experimental.theories.DataPoints; +import org.junit.experimental.theories.FromDataPoints; +import org.junit.experimental.theories.Theories; +import org.junit.experimental.theories.Theory; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; + +import static org.junit.Assert.*; + +@RunWith(Theories.class) +public abstract class AbstractDecoderTest extends AbstractCompressionTest { + + protected static final ByteBuf WRAPPED_BYTES_SMALL; + protected static final ByteBuf WRAPPED_BYTES_LARGE; + + static { + WRAPPED_BYTES_SMALL = Unpooled.wrappedBuffer(BYTES_SMALL); + WRAPPED_BYTES_LARGE = Unpooled.wrappedBuffer(BYTES_LARGE); + } + + @Rule + public final ExpectedException expected = ExpectedException.none(); + + protected EmbeddedChannel channel; + + protected static byte[] compressedBytesSmall; + protected static byte[] compressedBytesLarge; + + protected AbstractDecoderTest() throws Exception { + compressedBytesSmall = compress(BYTES_SMALL); + compressedBytesLarge = compress(BYTES_LARGE); + } + + /** + * Compresses data with some external library. + */ + protected abstract byte[] compress(byte[] data) throws Exception; + + @Before + public abstract void initChannel(); + + @DataPoints("smallData") + public static ByteBuf[] smallData() { + ByteBuf heap = Unpooled.wrappedBuffer(compressedBytesSmall); + ByteBuf direct = Unpooled.directBuffer(compressedBytesSmall.length); + direct.writeBytes(compressedBytesSmall); + return new ByteBuf[] {heap, direct}; + } + + @DataPoints("largeData") + public static ByteBuf[] largeData() { + ByteBuf heap = Unpooled.wrappedBuffer(compressedBytesLarge); + ByteBuf direct = Unpooled.directBuffer(compressedBytesLarge.length); + direct.writeBytes(compressedBytesLarge); + return new ByteBuf[] {heap, direct}; + } + + @Theory + public void testDecompressionOfSmallChunkOfData(@FromDataPoints("smallData") ByteBuf data) throws Exception { + testDecompression(WRAPPED_BYTES_SMALL, data); + } + + @Theory + public void testDecompressionOfLargeChunkOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception { + testDecompression(WRAPPED_BYTES_LARGE, data); + } + + @Theory + public void testDecompressionOfBatchedFlowOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception { + testDecompressionOfBatchedFlow(WRAPPED_BYTES_LARGE, data); + } + + protected void testDecompression(final ByteBuf expected, final ByteBuf data) throws Exception { + assertTrue(channel.writeInbound(data)); + + ByteBuf decompressed = readDecompressed(channel); + assertEquals(expected, decompressed); + + decompressed.release(); + } + + protected void testDecompressionOfBatchedFlow(final ByteBuf expected, final ByteBuf data) throws Exception { + final int compressedLength = data.readableBytes(); + int written = 0, length = rand.nextInt(100); + while (written + length < compressedLength) { + ByteBuf compressedBuf = data.slice(written, length); + channel.writeInbound(compressedBuf.retain()); + written += length; + length = rand.nextInt(100); + } + ByteBuf compressedBuf = data.slice(written, compressedLength - written); + assertTrue(channel.writeInbound(compressedBuf.retain())); + + ByteBuf decompressedBuf = readDecompressed(channel); + assertEquals(expected, decompressedBuf); + + decompressedBuf.release(); + data.release(); + } + + protected static ByteBuf readDecompressed(final EmbeddedChannel channel) { + CompositeByteBuf decompressed = Unpooled.compositeBuffer(); + ByteBuf msg; + while ((msg = channel.readInbound()) != null) { + decompressed.addComponent(msg); + decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes()); + } + return decompressed; + } + + protected static void tryDecodeAndCatchBufLeaks(final EmbeddedChannel channel, final ByteBuf data) { + try { + channel.writeInbound(data); + } finally { + for (;;) { + ByteBuf inflated = channel.readInbound(); + if (inflated == null) { + break; + } + inflated.release(); + } + channel.finish(); + } + } +} diff --git a/codec/src/test/java/io/netty/handler/codec/compression/AbstractEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/AbstractEncoderTest.java new file mode 100644 index 0000000000..93a130384f --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/compression/AbstractEncoderTest.java @@ -0,0 +1,118 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Before; +import org.junit.experimental.theories.DataPoints; +import org.junit.experimental.theories.FromDataPoints; +import org.junit.experimental.theories.Theories; +import org.junit.experimental.theories.Theory; +import org.junit.runner.RunWith; + +import static org.junit.Assert.*; + +@RunWith(Theories.class) +public abstract class AbstractEncoderTest extends AbstractCompressionTest { + + protected EmbeddedChannel channel; + + /** + * Decompresses data with some external library. + */ + protected abstract ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception; + + @Before + public abstract void initChannel(); + + @DataPoints("smallData") + public static ByteBuf[] smallData() { + ByteBuf heap = Unpooled.wrappedBuffer(BYTES_SMALL); + ByteBuf direct = Unpooled.directBuffer(BYTES_SMALL.length); + direct.writeBytes(BYTES_SMALL); + return new ByteBuf[] {heap, direct}; + } + + @DataPoints("largeData") + public static ByteBuf[] largeData() { + ByteBuf heap = Unpooled.wrappedBuffer(BYTES_LARGE); + ByteBuf direct = Unpooled.directBuffer(BYTES_LARGE.length); + direct.writeBytes(BYTES_LARGE); + return new ByteBuf[] {heap, direct}; + } + + @Theory + public void testCompressionOfSmallChunkOfData(@FromDataPoints("smallData") ByteBuf data) throws Exception { + testCompression(data); + } + + @Theory + public void testCompressionOfLargeChunkOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception { + testCompression(data); + } + + @Theory + public void testCompressionOfBatchedFlowOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception { + testCompressionOfBatchedFlow(data); + } + + protected void testCompression(final ByteBuf data) throws Exception { + final int dataLength = data.readableBytes(); + assertTrue(channel.writeOutbound(data.retain())); + assertTrue(channel.finish()); + + ByteBuf decompressed = readDecompressed(dataLength); + assertEquals(data.resetReaderIndex(), decompressed); + + decompressed.release(); + data.release(); + } + + protected void testCompressionOfBatchedFlow(final ByteBuf data) throws Exception { + final int dataLength = data.readableBytes(); + int written = 0, length = rand.nextInt(100); + while (written + length < dataLength) { + ByteBuf in = data.slice(written, length); + assertTrue(channel.writeOutbound(in.retain())); + written += length; + length = rand.nextInt(100); + } + ByteBuf in = data.slice(written, dataLength - written); + assertTrue(channel.writeOutbound(in.retain())); + assertTrue(channel.finish()); + + ByteBuf decompressed = readDecompressed(dataLength); + assertEquals(data, decompressed); + + decompressed.release(); + data.release(); + } + + protected ByteBuf readDecompressed(final int dataLength) throws Exception { + CompositeByteBuf compressed = Unpooled.compositeBuffer(); + ByteBuf msg; + while ((msg = channel.readOutbound()) != null) { + compressed.addComponent(msg); + compressed.writerIndex(compressed.writerIndex() + msg.readableBytes()); + } + ByteBuf decompressed = decompress(compressed, dataLength); + compressed.release(); + return decompressed; + } +} diff --git a/codec/src/test/java/io/netty/handler/codec/compression/IntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/AbstractIntegrationTest.java similarity index 65% rename from codec/src/test/java/io/netty/handler/codec/compression/IntegrationTest.java rename to codec/src/test/java/io/netty/handler/codec/compression/AbstractIntegrationTest.java index cf17f18971..b0b8139480 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/IntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/AbstractIntegrationTest.java @@ -23,6 +23,8 @@ import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.ThreadLocalRandom; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.util.Arrays; @@ -30,12 +32,42 @@ import java.util.Arrays; import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; -public abstract class IntegrationTest { +public abstract class AbstractIntegrationTest { protected static final ThreadLocalRandom rand = ThreadLocalRandom.current(); - protected abstract EmbeddedChannel createEncoderEmbeddedChannel(); - protected abstract EmbeddedChannel createDecoderEmbeddedChannel(); + protected EmbeddedChannel encoder; + protected EmbeddedChannel decoder; + + protected abstract EmbeddedChannel createEncoder(); + protected abstract EmbeddedChannel createDecoder(); + + @Before + public void initChannels() throws Exception { + encoder = createEncoder(); + decoder = createDecoder(); + } + + @After + public void closeChannels() throws Exception { + encoder.close(); + for (;;) { + Object msg = encoder.readOutbound(); + if (msg == null) { + break; + } + ReferenceCountUtil.release(msg); + } + + decoder.close(); + for (;;) { + Object msg = decoder.readInbound(); + if (msg == null) { + break; + } + ReferenceCountUtil.release(msg); + } + } @Test public void testEmpty() throws Exception { @@ -112,51 +144,28 @@ public abstract class IntegrationTest { protected void testIdentity(final byte[] data) { final ByteBuf in = Unpooled.wrappedBuffer(data); - final EmbeddedChannel encoder = createEncoderEmbeddedChannel(); - final EmbeddedChannel decoder = createDecoderEmbeddedChannel(); - try { - ByteBuf msg; + assertTrue(encoder.writeOutbound(in.retain())); + assertTrue(encoder.finish()); - encoder.writeOutbound(in.copy()); - encoder.finish(); - final CompositeByteBuf compressed = Unpooled.compositeBuffer(); - while ((msg = encoder.readOutbound()) != null) { - compressed.addComponent(msg); - compressed.writerIndex(compressed.writerIndex() + msg.readableBytes()); - } - assertThat(compressed, is(notNullValue())); - - decoder.writeInbound(compressed.retain()); - assertFalse(compressed.isReadable()); - final CompositeByteBuf decompressed = Unpooled.compositeBuffer(); - while ((msg = decoder.readInbound()) != null) { - decompressed.addComponent(msg); - decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes()); - } - assertEquals(in, decompressed); - - compressed.release(); - decompressed.release(); - in.release(); - } finally { - encoder.close(); - decoder.close(); - - for (;;) { - Object msg = encoder.readOutbound(); - if (msg == null) { - break; - } - ReferenceCountUtil.release(msg); - } - - for (;;) { - Object msg = decoder.readInbound(); - if (msg == null) { - break; - } - ReferenceCountUtil.release(msg); - } + final CompositeByteBuf compressed = Unpooled.compositeBuffer(); + ByteBuf msg; + while ((msg = encoder.readOutbound()) != null) { + compressed.addComponent(msg); + compressed.writerIndex(compressed.writerIndex() + msg.readableBytes()); } + assertThat(compressed, is(notNullValue())); + + decoder.writeInbound(compressed.retain()); + assertFalse(compressed.isReadable()); + final CompositeByteBuf decompressed = Unpooled.compositeBuffer(); + while ((msg = decoder.readInbound()) != null) { + decompressed.addComponent(msg); + decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes()); + } + assertEquals(in.resetReaderIndex(), decompressed); + + compressed.release(); + decompressed.release(); + in.release(); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2DecoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2DecoderTest.java index 7e19a47712..11382552ac 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2DecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2DecoderTest.java @@ -16,23 +16,17 @@ package io.netty.handler.codec.compression; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.util.internal.ThreadLocalRandom; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.io.ByteArrayOutputStream; import java.util.Arrays; import static io.netty.handler.codec.compression.Bzip2Constants.*; -import static org.junit.Assert.*; -public class Bzip2DecoderTest { +public class Bzip2DecoderTest extends AbstractDecoderTest { private static final byte[] DATA = { 0x42, 0x5A, 0x68, 0x37, 0x31, 0x41, 0x59, 0x26, 0x53, 0x59, 0x77, 0x7B, (byte) 0xCA, (byte) 0xC0, 0x00, 0x00, @@ -41,23 +35,10 @@ public class Bzip2DecoderTest { (byte) 0x89, (byte) 0x99, (byte) 0xC5, (byte) 0xDC, (byte) 0x91, 0x4E, 0x14, 0x24, 0x1D, (byte) 0xDE, (byte) 0xF2, (byte) 0xB0, 0x00 }; - private static final ThreadLocalRandom rand; - - private static final byte[] BYTES_SMALL = new byte[256]; - private static final byte[] BYTES_LARGE = new byte[MAX_BLOCK_SIZE * BASE_BLOCK_SIZE + 256]; - - static { - rand = ThreadLocalRandom.current(); - rand.nextBytes(BYTES_SMALL); - rand.nextBytes(BYTES_LARGE); + public Bzip2DecoderTest() throws Exception { } - @Rule - public ExpectedException expected = ExpectedException.none(); - - private EmbeddedChannel channel; - - @Before + @Override public void initChannel() { channel = new EmbeddedChannel(new Bzip2Decoder()); } @@ -123,19 +104,7 @@ public class Bzip2DecoderTest { final byte[] data = Arrays.copyOf(DATA, DATA.length); data[41] = (byte) 0xDD; - ByteBuf in = Unpooled.wrappedBuffer(data); - try { - channel.writeInbound(in); - } finally { - for (;;) { - ByteBuf inflated = channel.readInbound(); - if (inflated == null) { - break; - } - inflated.release(); - } - channel.finish(); - } + tryDecodeAndCatchBufLeaks(channel, Unpooled.wrappedBuffer(data)); } @Test @@ -186,75 +155,13 @@ public class Bzip2DecoderTest { channel.writeInbound(in); } - private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception { + @Override + protected byte[] compress(byte[] data) throws Exception { ByteArrayOutputStream os = new ByteArrayOutputStream(); - BZip2CompressorOutputStream bZip2Os = new BZip2CompressorOutputStream(os, randomBlockSize()); + BZip2CompressorOutputStream bZip2Os = new BZip2CompressorOutputStream(os, MIN_BLOCK_SIZE); bZip2Os.write(data); bZip2Os.close(); - ByteBuf compressed = Unpooled.wrappedBuffer(os.toByteArray()); - channel.writeInbound(compressed); - - ByteBuf uncompressed = readUncompressed(channel); - ByteBuf dataBuf = Unpooled.wrappedBuffer(data); - - assertEquals(dataBuf, uncompressed); - - uncompressed.release(); - dataBuf.release(); - } - - @Test - public void testDecompressionOfSmallChunkOfData() throws Exception { - testDecompression(channel, BYTES_SMALL); - } - - @Test - public void testDecompressionOfLargeChunkOfData() throws Exception { - testDecompression(channel, BYTES_LARGE); - } - - @Test - public void testDecompressionOfBatchedFlowOfData() throws Exception { - final byte[] data = BYTES_LARGE; - - ByteArrayOutputStream os = new ByteArrayOutputStream(); - BZip2CompressorOutputStream bZip2Os = new BZip2CompressorOutputStream(os, randomBlockSize()); - bZip2Os.write(data); - bZip2Os.close(); - - final byte[] compressedArray = os.toByteArray(); - int written = 0, length = rand.nextInt(100); - while (written + length < compressedArray.length) { - ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length); - channel.writeInbound(compressed); - written += length; - length = rand.nextInt(100); - } - ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); - channel.writeInbound(compressed); - - ByteBuf uncompressed = readUncompressed(channel); - ByteBuf dataBuf = Unpooled.wrappedBuffer(data); - - assertEquals(dataBuf, uncompressed); - - uncompressed.release(); - dataBuf.release(); - } - - private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception { - CompositeByteBuf uncompressed = Unpooled.compositeBuffer(); - ByteBuf msg; - while ((msg = channel.readInbound()) != null) { - uncompressed.addComponent(msg); - uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes()); - } - - return uncompressed; - } - - private static int randomBlockSize() { - return rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1); + return os.toByteArray(); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2EncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2EncoderTest.java index 4295d6e191..65c87c58dc 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2EncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2EncoderTest.java @@ -16,110 +16,41 @@ package io.netty.handler.codec.compression; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.util.internal.ThreadLocalRandom; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; -import org.junit.Before; -import org.junit.Test; -import java.io.ByteArrayInputStream; +import java.io.InputStream; import static io.netty.handler.codec.compression.Bzip2Constants.*; import static org.junit.Assert.*; -public class Bzip2EncoderTest { +public class Bzip2EncoderTest extends AbstractEncoderTest { - private static final ThreadLocalRandom rand; - - private static final byte[] BYTES_SMALL = new byte[256]; - private static final byte[] BYTES_LARGE = new byte[MAX_BLOCK_SIZE * BASE_BLOCK_SIZE + 256]; - - static { - rand = ThreadLocalRandom.current(); - rand.nextBytes(BYTES_SMALL); - rand.nextBytes(BYTES_LARGE); - } - - private EmbeddedChannel channel; - - @Before + @Override public void initChannel() { - channel = new EmbeddedChannel(new Bzip2Encoder(randomBlockSize())); + channel = new EmbeddedChannel(new Bzip2Encoder(MIN_BLOCK_SIZE)); } - private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception { - ByteBuf in = Unpooled.wrappedBuffer(data); - channel.writeOutbound(in); - channel.finish(); + @Override + protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception { + InputStream is = new ByteBufInputStream(compressed); + BZip2CompressorInputStream bzip2Is = new BZip2CompressorInputStream(is); - byte[] uncompressed = uncompress(channel, data.length); - - assertArrayEquals(data, uncompressed); - } - - @Test - public void testCompressionOfSmallChunkOfData() throws Exception { - testCompression(channel, BYTES_SMALL); - } - - @Test - public void testCompressionOfLargeChunkOfData() throws Exception { - testCompression(channel, BYTES_LARGE); - } - - @Test - public void testCompressionOfBatchedFlowOfData() throws Exception { - final byte[] data = BYTES_LARGE; - - int written = 0, length = rand.nextInt(100); - while (written + length < data.length) { - ByteBuf in = Unpooled.wrappedBuffer(data, written, length); - channel.writeOutbound(in); - written += length; - length = rand.nextInt(100); - } - ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); - channel.writeOutbound(in); - channel.finish(); - - byte[] uncompressed = uncompress(channel, data.length); - - assertArrayEquals(data, uncompressed); - } - - private static byte[] uncompress(EmbeddedChannel channel, int length) throws Exception { - CompositeByteBuf out = Unpooled.compositeBuffer(); - ByteBuf msg; - while ((msg = channel.readOutbound()) != null) { - out.addComponent(msg); - out.writerIndex(out.writerIndex() + msg.readableBytes()); - } - - byte[] compressed = new byte[out.readableBytes()]; - out.readBytes(compressed); - out.release(); - - ByteArrayInputStream is = new ByteArrayInputStream(compressed); - BZip2CompressorInputStream bZip2Is = new BZip2CompressorInputStream(is); - byte[] uncompressed = new byte[length]; - int remaining = length; + byte[] decompressed = new byte[originalLength]; + int remaining = originalLength; while (remaining > 0) { - int read = bZip2Is.read(uncompressed, length - remaining, remaining); + int read = bzip2Is.read(decompressed, originalLength - remaining, remaining); if (read > 0) { remaining -= read; } else { break; } } + assertEquals(-1, bzip2Is.read()); + bzip2Is.close(); - assertEquals(-1, bZip2Is.read()); - - return uncompressed; - } - - private static int randomBlockSize() { - return rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1); + return Unpooled.wrappedBuffer(decompressed); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java index 1485f94df3..5326b13eb5 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java @@ -18,15 +18,15 @@ package io.netty.handler.codec.compression; import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Test; -public class Bzip2IntegrationTest extends IntegrationTest { +public class Bzip2IntegrationTest extends AbstractIntegrationTest { @Override - protected EmbeddedChannel createEncoderEmbeddedChannel() { + protected EmbeddedChannel createEncoder() { return new EmbeddedChannel(new Bzip2Encoder()); } @Override - protected EmbeddedChannel createDecoderEmbeddedChannel() { + protected EmbeddedChannel createDecoder() { return new EmbeddedChannel(new Bzip2Decoder()); } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java index add8398c14..2bb074657e 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java @@ -19,118 +19,94 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.util.ReferenceCountUtil; import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; -public class FastLzIntegrationTest extends IntegrationTest { +public class FastLzIntegrationTest extends AbstractIntegrationTest { - public static class TestWithChecksum extends IntegrationTest { + public static class TestWithChecksum extends AbstractIntegrationTest { @Override - protected EmbeddedChannel createEncoderEmbeddedChannel() { + protected EmbeddedChannel createEncoder() { return new EmbeddedChannel(new FastLzFrameEncoder(true)); } @Override - protected EmbeddedChannel createDecoderEmbeddedChannel() { + protected EmbeddedChannel createDecoder() { return new EmbeddedChannel(new FastLzFrameDecoder(true)); } } - public static class TestRandomChecksum extends IntegrationTest { + public static class TestRandomChecksum extends AbstractIntegrationTest { @Override - protected EmbeddedChannel createEncoderEmbeddedChannel() { + protected EmbeddedChannel createEncoder() { return new EmbeddedChannel(new FastLzFrameEncoder(rand.nextBoolean())); } @Override - protected EmbeddedChannel createDecoderEmbeddedChannel() { + protected EmbeddedChannel createDecoder() { return new EmbeddedChannel(new FastLzFrameDecoder(rand.nextBoolean())); } } @Override - protected EmbeddedChannel createEncoderEmbeddedChannel() { + protected EmbeddedChannel createEncoder() { return new EmbeddedChannel(new FastLzFrameEncoder(rand.nextBoolean())); } @Override - protected EmbeddedChannel createDecoderEmbeddedChannel() { + protected EmbeddedChannel createDecoder() { return new EmbeddedChannel(new FastLzFrameDecoder(rand.nextBoolean())); } @Override // test batched flow of data protected void testIdentity(final byte[] data) { final ByteBuf original = Unpooled.wrappedBuffer(data); - final EmbeddedChannel encoder = createEncoderEmbeddedChannel(); - final EmbeddedChannel decoder = createDecoderEmbeddedChannel(); - try { - int written = 0, length = rand.nextInt(100); - while (written + length < data.length) { - ByteBuf in = Unpooled.wrappedBuffer(data, written, length); - encoder.writeOutbound(in); - written += length; - length = rand.nextInt(100); - } - ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); + int written = 0, length = rand.nextInt(100); + while (written + length < data.length) { + ByteBuf in = Unpooled.wrappedBuffer(data, written, length); encoder.writeOutbound(in); - encoder.finish(); - - ByteBuf msg; - final CompositeByteBuf compressed = Unpooled.compositeBuffer(); - while ((msg = encoder.readOutbound()) != null) { - compressed.addComponent(msg); - compressed.writerIndex(compressed.writerIndex() + msg.readableBytes()); - } - assertThat(compressed, is(notNullValue())); - - final byte[] compressedArray = new byte[compressed.readableBytes()]; - compressed.readBytes(compressedArray); - written = 0; + written += length; length = rand.nextInt(100); - while (written + length < compressedArray.length) { - in = Unpooled.wrappedBuffer(compressedArray, written, length); - decoder.writeInbound(in); - written += length; - length = rand.nextInt(100); - } - in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); - decoder.writeInbound(in); - - assertFalse(compressed.isReadable()); - final CompositeByteBuf decompressed = Unpooled.compositeBuffer(); - while ((msg = decoder.readInbound()) != null) { - decompressed.addComponent(msg); - decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes()); - } - assertEquals(original, decompressed); - - compressed.release(); - decompressed.release(); - original.release(); - } finally { - encoder.close(); - decoder.close(); - - for (;;) { - Object msg = encoder.readOutbound(); - if (msg == null) { - break; - } - ReferenceCountUtil.release(msg); - } - - for (;;) { - Object msg = decoder.readInbound(); - if (msg == null) { - break; - } - ReferenceCountUtil.release(msg); - } } + ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); + encoder.writeOutbound(in); + encoder.finish(); + + ByteBuf msg; + final CompositeByteBuf compressed = Unpooled.compositeBuffer(); + while ((msg = encoder.readOutbound()) != null) { + compressed.addComponent(msg); + compressed.writerIndex(compressed.writerIndex() + msg.readableBytes()); + } + assertThat(compressed, is(notNullValue())); + + final byte[] compressedArray = new byte[compressed.readableBytes()]; + compressed.readBytes(compressedArray); + written = 0; + length = rand.nextInt(100); + while (written + length < compressedArray.length) { + in = Unpooled.wrappedBuffer(compressedArray, written, length); + decoder.writeInbound(in); + written += length; + length = rand.nextInt(100); + } + in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); + decoder.writeInbound(in); + + assertFalse(compressed.isReadable()); + final CompositeByteBuf decompressed = Unpooled.compositeBuffer(); + while ((msg = decoder.readInbound()) != null) { + decompressed.addComponent(msg); + decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes()); + } + assertEquals(original, decompressed); + + compressed.release(); + decompressed.release(); + original.release(); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameDecoderTest.java index bb926dc826..0d5235513c 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameDecoderTest.java @@ -16,23 +16,17 @@ package io.netty.handler.codec.compression; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.util.internal.ThreadLocalRandom; import net.jpountz.lz4.LZ4BlockOutputStream; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.io.ByteArrayOutputStream; import java.util.Arrays; import static io.netty.handler.codec.compression.Lz4Constants.*; -import static org.junit.Assert.*; -public class Lz4FrameDecoderTest { +public class Lz4FrameDecoderTest extends AbstractDecoderTest { private static final byte[] DATA = { 0x4C, 0x5A, 0x34, 0x42, 0x6C, 0x6F, 0x63, 0x6B, // magic bytes 0x16, // token @@ -44,28 +38,10 @@ public class Lz4FrameDecoderTest { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // last empty block 0x00, 0x00, 0x00, 0x00 }; - private static final ThreadLocalRandom rand; - - private static final byte[] BYTES_SMALL = new byte[256]; - private static final byte[] BYTES_LARGE = new byte[256000]; - - static { - rand = ThreadLocalRandom.current(); - //fill arrays with compressible data - for (int i = 0; i < BYTES_SMALL.length; i++) { - BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); - } - for (int i = 0; i < BYTES_LARGE.length; i++) { - BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); - } + public Lz4FrameDecoderTest() throws Exception { } - @Rule - public ExpectedException expected = ExpectedException.none(); - - private EmbeddedChannel channel; - - @Before + @Override public void initChannel() { channel = new EmbeddedChannel(new Lz4FrameDecoder(true)); } @@ -150,90 +126,17 @@ public class Lz4FrameDecoderTest { final byte[] data = Arrays.copyOf(DATA, DATA.length); data[44] = 0x01; - ByteBuf in = Unpooled.wrappedBuffer(data); - try { - channel.writeInbound(in); - } finally { - for (;;) { - ByteBuf inflated = channel.readInbound(); - if (inflated == null) { - break; - } - inflated.release(); - } - channel.finish(); - } + tryDecodeAndCatchBufLeaks(channel, Unpooled.wrappedBuffer(data)); } - private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception { + @Override + protected byte[] compress(byte[] data) throws Exception { ByteArrayOutputStream os = new ByteArrayOutputStream(); - LZ4BlockOutputStream lz4Os = new LZ4BlockOutputStream(os, randomBlockSize()); + LZ4BlockOutputStream lz4Os = new LZ4BlockOutputStream(os, + rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1)); lz4Os.write(data); lz4Os.close(); - ByteBuf compressed = Unpooled.wrappedBuffer(os.toByteArray()); - channel.writeInbound(compressed); - - ByteBuf uncompressed = readUncompressed(channel); - ByteBuf dataBuf = Unpooled.wrappedBuffer(data); - - assertEquals(dataBuf, uncompressed); - - uncompressed.release(); - dataBuf.release(); - } - - @Test - public void testDecompressionOfSmallChunkOfData() throws Exception { - testDecompression(channel, BYTES_SMALL); - } - - @Test - public void testDecompressionOfLargeChunkOfData() throws Exception { - testDecompression(channel, BYTES_LARGE); - } - - @Test - public void testDecompressionOfBatchedFlowOfData() throws Exception { - final byte[] data = BYTES_LARGE; - - ByteArrayOutputStream os = new ByteArrayOutputStream(); - LZ4BlockOutputStream lz4Os = new LZ4BlockOutputStream(os, randomBlockSize()); - lz4Os.write(data); - lz4Os.close(); - - final byte[] compressedArray = os.toByteArray(); - int written = 0, length = rand.nextInt(100); - while (written + length < compressedArray.length) { - ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length); - channel.writeInbound(compressed); - written += length; - length = rand.nextInt(100); - } - ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); - channel.writeInbound(compressed); - - ByteBuf uncompressed = readUncompressed(channel); - ByteBuf dataBuf = Unpooled.wrappedBuffer(data); - - assertEquals(dataBuf, uncompressed); - - uncompressed.release(); - dataBuf.release(); - } - - private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception { - CompositeByteBuf uncompressed = Unpooled.compositeBuffer(); - ByteBuf msg; - while ((msg = channel.readInbound()) != null) { - uncompressed.addComponent(msg); - uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes()); - } - - return uncompressed; - } - - private static int randomBlockSize() { - return rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1); + return os.toByteArray(); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java index b1c53e67dd..9be2d8eb40 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java @@ -16,108 +16,40 @@ package io.netty.handler.codec.compression; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.util.internal.ThreadLocalRandom; import net.jpountz.lz4.LZ4BlockInputStream; -import org.junit.Before; -import org.junit.Test; -import java.io.ByteArrayInputStream; +import java.io.InputStream; import static org.junit.Assert.*; -public class Lz4FrameEncoderTest { +public class Lz4FrameEncoderTest extends AbstractEncoderTest { - private static final ThreadLocalRandom rand; - - private static final byte[] BYTES_SMALL = new byte[256]; - private static final byte[] BYTES_LARGE = new byte[256000]; - - static { - rand = ThreadLocalRandom.current(); - //fill arrays with compressible data - for (int i = 0; i < BYTES_SMALL.length; i++) { - BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); - } - for (int i = 0; i < BYTES_LARGE.length; i++) { - BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); - } - } - - private EmbeddedChannel channel; - - @Before + @Override public void initChannel() { channel = new EmbeddedChannel(new Lz4FrameEncoder()); } - private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception { - ByteBuf in = Unpooled.wrappedBuffer(data); - channel.writeOutbound(in); - channel.finish(); - - final byte[] uncompressed = uncompress(channel, data.length); - - assertArrayEquals(data, uncompressed); - } - - @Test - public void testCompressionOfSmallChunkOfData() throws Exception { - testCompression(channel, BYTES_SMALL); - } - - @Test - public void testCompressionOfLargeChunkOfData() throws Exception { - testCompression(channel, BYTES_LARGE); - } - - @Test - public void testCompressionOfBatchedFlowOfData() throws Exception { - final byte[] data = BYTES_LARGE; - - int written = 0, length = rand.nextInt(1, 100); - while (written + length < data.length) { - ByteBuf in = Unpooled.wrappedBuffer(data, written, length); - channel.writeOutbound(in); - written += length; - length = rand.nextInt(1, 100); - } - ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); - channel.writeOutbound(in); - channel.finish(); - - final byte[] uncompressed = uncompress(channel, data.length); - - assertArrayEquals(data, uncompressed); - } - - private static byte[] uncompress(EmbeddedChannel channel, int originalLength) throws Exception { - CompositeByteBuf out = Unpooled.compositeBuffer(); - ByteBuf msg; - while ((msg = channel.readOutbound()) != null) { - out.addComponent(msg); - out.writerIndex(out.writerIndex() + msg.readableBytes()); - } - - byte[] compressed = new byte[out.readableBytes()]; - out.readBytes(compressed); - out.release(); - - ByteArrayInputStream is = new ByteArrayInputStream(compressed); + @Override + protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception { + InputStream is = new ByteBufInputStream(compressed); LZ4BlockInputStream lz4Is = new LZ4BlockInputStream(is); - byte[] uncompressed = new byte[originalLength]; + + byte[] decompressed = new byte[originalLength]; int remaining = originalLength; while (remaining > 0) { - int read = lz4Is.read(uncompressed, originalLength - remaining, remaining); + int read = lz4Is.read(decompressed, originalLength - remaining, remaining); if (read > 0) { remaining -= read; } else { break; } } + assertEquals(-1, lz4Is.read()); + lz4Is.close(); - return uncompressed; + return Unpooled.wrappedBuffer(decompressed); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameIntegrationTest.java index b125d6853b..dde01c70ea 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameIntegrationTest.java @@ -17,15 +17,15 @@ package io.netty.handler.codec.compression; import io.netty.channel.embedded.EmbeddedChannel; -public class Lz4FrameIntegrationTest extends IntegrationTest { +public class Lz4FrameIntegrationTest extends AbstractIntegrationTest { @Override - protected EmbeddedChannel createEncoderEmbeddedChannel() { + protected EmbeddedChannel createEncoder() { return new EmbeddedChannel(new Lz4FrameEncoder()); } @Override - protected EmbeddedChannel createDecoderEmbeddedChannel() { + protected EmbeddedChannel createDecoder() { return new EmbeddedChannel(new Lz4FrameDecoder()); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/LzfDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/LzfDecoderTest.java index ee122639c1..327c505976 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/LzfDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/LzfDecoderTest.java @@ -17,44 +17,18 @@ package io.netty.handler.codec.compression; import com.ning.compress.lzf.LZFEncoder; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.util.internal.ThreadLocalRandom; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; -import static com.ning.compress.lzf.LZFChunk.BYTE_Z; -import static com.ning.compress.lzf.LZFChunk.BYTE_V; -import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_NON_COMPRESSED; -import static org.junit.Assert.*; +import static com.ning.compress.lzf.LZFChunk.*; -public class LzfDecoderTest { +public class LzfDecoderTest extends AbstractDecoderTest { - private static final ThreadLocalRandom rand; - - private static final byte[] BYTES_SMALL = new byte[256]; - private static final byte[] BYTES_LARGE = new byte[256000]; - - static { - rand = ThreadLocalRandom.current(); - //fill arrays with compressible data - for (int i = 0; i < BYTES_SMALL.length; i++) { - BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); - } - for (int i = 0; i < BYTES_LARGE.length; i++) { - BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); - } + public LzfDecoderTest() throws Exception { } - @Rule - public ExpectedException expected = ExpectedException.none(); - - private EmbeddedChannel channel; - - @Before + @Override public void initChannel() { channel = new EmbeddedChannel(new LzfDecoder()); } @@ -86,63 +60,8 @@ public class LzfDecoderTest { channel.writeInbound(in); } - private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception { - byte[] compressedArray = LZFEncoder.encode(data); - ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray); - - channel.writeInbound(compressed); - - ByteBuf uncompressed = readUncompressed(channel); - ByteBuf dataBuf = Unpooled.wrappedBuffer(data); - - assertEquals(dataBuf, uncompressed); - - uncompressed.release(); - dataBuf.release(); - } - - @Test - public void testDecompressionOfSmallChunkOfData() throws Exception { - testDecompression(channel, BYTES_SMALL); - } - - @Test - public void testDecompressionOfLargeChunkOfData() throws Exception { - testDecompression(channel, BYTES_LARGE); - } - - @Test - public void testDecompressionOfBatchedFlowOfData() throws Exception { - final byte[] data = BYTES_LARGE; - - byte[] compressedArray = LZFEncoder.encode(data); - int written = 0, length = rand.nextInt(100); - while (written + length < compressedArray.length) { - ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length); - channel.writeInbound(compressed); - written += length; - length = rand.nextInt(100); - } - ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); - channel.writeInbound(compressed); - - ByteBuf uncompressed = readUncompressed(channel); - ByteBuf dataBuf = Unpooled.wrappedBuffer(data); - - assertEquals(dataBuf, uncompressed); - - uncompressed.release(); - dataBuf.release(); - } - - private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception { - CompositeByteBuf uncompressed = Unpooled.compositeBuffer(); - ByteBuf msg; - while ((msg = channel.readInbound()) != null) { - uncompressed.addComponent(msg); - uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes()); - } - - return uncompressed; + @Override + protected byte[] compress(byte[] data) throws Exception { + return LZFEncoder.encode(data); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/LzfEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/LzfEncoderTest.java index 5fc0299941..be14e04e96 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/LzfEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/LzfEncoderTest.java @@ -17,92 +17,22 @@ package io.netty.handler.codec.compression; import com.ning.compress.lzf.LZFDecoder; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.util.internal.ThreadLocalRandom; -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.*; +public class LzfEncoderTest extends AbstractEncoderTest { -public class LzfEncoderTest { - - private static final ThreadLocalRandom rand; - - private static final byte[] BYTES_SMALL = new byte[256]; - private static final byte[] BYTES_LARGE = new byte[256000]; - - static { - rand = ThreadLocalRandom.current(); - //fill arrays with compressible data - for (int i = 0; i < BYTES_SMALL.length; i++) { - BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); - } - for (int i = 0; i < BYTES_LARGE.length; i++) { - BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); - } - } - - private EmbeddedChannel channel; - - @Before + @Override public void initChannel() { channel = new EmbeddedChannel(new LzfEncoder()); } - private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception { - ByteBuf in = Unpooled.wrappedBuffer(data); - channel.writeOutbound(in); - channel.finish(); + @Override + protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception { + byte[] compressedArray = new byte[compressed.readableBytes()]; + compressed.readBytes(compressedArray); - final byte[] uncompressed = uncompress(channel); - - assertArrayEquals(data, uncompressed); - } - - @Test - public void testCompressionOfSmallChunkOfData() throws Exception { - testCompression(channel, BYTES_SMALL); - } - - @Test - public void testCompressionOfLargeChunkOfData() throws Exception { - testCompression(channel, BYTES_LARGE); - } - - @Test - public void testCompressionOfBatchedFlowOfData() throws Exception { - final byte[] data = BYTES_LARGE; - - int written = 0, length = rand.nextInt(100); - while (written + length < data.length) { - ByteBuf in = Unpooled.wrappedBuffer(data, written, length); - channel.writeOutbound(in); - written += length; - length = rand.nextInt(100); - } - ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); - channel.writeOutbound(in); - channel.finish(); - - final byte[] uncompressed = uncompress(channel); - - assertArrayEquals(data, uncompressed); - } - - private static byte[] uncompress(EmbeddedChannel channel) throws Exception { - CompositeByteBuf out = Unpooled.compositeBuffer(); - ByteBuf msg; - while ((msg = channel.readOutbound()) != null) { - out.addComponent(msg); - out.writerIndex(out.writerIndex() + msg.readableBytes()); - } - - byte[] compressed = new byte[out.readableBytes()]; - out.readBytes(compressed); - out.release(); - - return LZFDecoder.decode(compressed); + byte[] decompressed = LZFDecoder.decode(compressedArray); + return Unpooled.wrappedBuffer(decompressed); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/LzfIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/LzfIntegrationTest.java index 4904d46e4b..9e71c152a0 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/LzfIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/LzfIntegrationTest.java @@ -17,15 +17,15 @@ package io.netty.handler.codec.compression; import io.netty.channel.embedded.EmbeddedChannel; -public class LzfIntegrationTest extends IntegrationTest { +public class LzfIntegrationTest extends AbstractIntegrationTest { @Override - protected EmbeddedChannel createEncoderEmbeddedChannel() { + protected EmbeddedChannel createEncoder() { return new EmbeddedChannel(new LzfEncoder()); } @Override - protected EmbeddedChannel createDecoderEmbeddedChannel() { + protected EmbeddedChannel createDecoder() { return new EmbeddedChannel(new LzfDecoder()); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/LzmaFrameEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/LzmaFrameEncoderTest.java index a1bee8e94b..8f4fb7b39c 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/LzmaFrameEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/LzmaFrameEncoderTest.java @@ -20,126 +20,82 @@ import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.util.internal.ThreadLocalRandom; import lzma.sdk.lzma.Decoder; import lzma.streams.LzmaInputStream; -import org.junit.Before; -import org.junit.Test; +import org.junit.experimental.theories.FromDataPoints; +import org.junit.experimental.theories.Theory; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import static org.junit.Assert.*; -public class LzmaFrameEncoderTest { +public class LzmaFrameEncoderTest extends AbstractEncoderTest { - private static final ThreadLocalRandom rand; - - private static final byte[] BYTES_SMALL = new byte[256]; - private static final byte[] BYTES_LARGE = new byte[256000]; - - static { - rand = ThreadLocalRandom.current(); - rand.nextBytes(BYTES_SMALL); - rand.nextBytes(BYTES_LARGE); - } - - private EmbeddedChannel channel; - - @Before + @Override public void initChannel() { channel = new EmbeddedChannel(new LzmaFrameEncoder()); } - private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception { - ByteBuf in = Unpooled.wrappedBuffer(data); - assertTrue(channel.writeOutbound(in)); - assertTrue(channel.finish()); - - byte[] uncompressed = uncompress(channel, data.length); - - assertArrayEquals(data, uncompressed); + @Theory + @Override + public void testCompressionOfBatchedFlowOfData(@FromDataPoints("smallData") ByteBuf data) throws Exception { + testCompressionOfBatchedFlow(data); } - @Test - public void testCompressionOfSmallChunkOfData() throws Exception { - testCompression(channel, BYTES_SMALL); - } - - @Test - public void testCompressionOfLargeChunkOfData() throws Exception { - testCompression(channel, BYTES_LARGE); - } - - @Test - public void testCompressionOfBatchedFlowOfData() throws Exception { - final byte[] data = BYTES_SMALL; - + @Override + protected void testCompressionOfBatchedFlow(final ByteBuf data) throws Exception { + List originalLengths = new ArrayList(); + final int dataLength = data.readableBytes(); int written = 0, length = rand.nextInt(50); - while (written + length < data.length) { - ByteBuf in = Unpooled.wrappedBuffer(data, written, length); - assertTrue(channel.writeOutbound(in)); + while (written + length < dataLength) { + ByteBuf in = data.slice(written, length); + assertTrue(channel.writeOutbound(in.retain())); written += length; + originalLengths.add(length); length = rand.nextInt(50); } - ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); - assertTrue(channel.writeOutbound(in)); + length = dataLength - written; + ByteBuf in = data.slice(written, dataLength - written); + originalLengths.add(length); + assertTrue(channel.writeOutbound(in.retain())); assertTrue(channel.finish()); - byte[] uncompressed = new byte[data.length]; - int outOffset = 0; - + CompositeByteBuf decompressed = Unpooled.compositeBuffer(); ByteBuf msg; + int i = 0; while ((msg = channel.readOutbound()) != null) { - InputStream is = new ByteBufInputStream(msg); - LzmaInputStream lzmaIs = new LzmaInputStream(is, new Decoder()); - for (;;) { - int read = lzmaIs.read(uncompressed, outOffset, data.length - outOffset); - if (read > 0) { - outOffset += read; - } else { - break; - } - } - assertEquals(0, is.available()); - assertEquals(-1, is.read()); - - is.close(); - lzmaIs.close(); + ByteBuf decompressedMsg = decompress(msg, originalLengths.get(i++)); + decompressed.addComponent(decompressedMsg); + decompressed.writerIndex(decompressed.writerIndex() + decompressedMsg.readableBytes()); msg.release(); } + assertEquals(originalLengths.size(), i); + assertEquals(data, decompressed); - assertArrayEquals(data, uncompressed); + decompressed.release(); + data.release(); } - private static byte[] uncompress(EmbeddedChannel channel, int length) throws Exception { - CompositeByteBuf out = Unpooled.compositeBuffer(); - ByteBuf msg; - while ((msg = channel.readOutbound()) != null) { - out.addComponent(msg); - out.writerIndex(out.writerIndex() + msg.readableBytes()); - } - - InputStream is = new ByteBufInputStream(out); + @Override + protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception { + InputStream is = new ByteBufInputStream(compressed); LzmaInputStream lzmaIs = new LzmaInputStream(is, new Decoder()); - byte[] uncompressed = new byte[length]; - int remaining = length; + + byte[] decompressed = new byte[originalLength]; + int remaining = originalLength; while (remaining > 0) { - int read = lzmaIs.read(uncompressed, length - remaining, remaining); + int read = lzmaIs.read(decompressed, originalLength - remaining, remaining); if (read > 0) { remaining -= read; } else { break; } } - - assertEquals(0, is.available()); - assertEquals(-1, is.read()); assertEquals(-1, lzmaIs.read()); - - is.close(); lzmaIs.close(); - out.release(); - return uncompressed; + return Unpooled.wrappedBuffer(decompressed); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/SnappyFrameEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/SnappyFrameEncoderTest.java index 6dbcb1ee74..5947a93b11 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/SnappyFrameEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/SnappyFrameEncoderTest.java @@ -77,9 +77,9 @@ public class SnappyFrameEncoderTest { 'n', 'e', 't', 't', 'y' }); - channel.writeOutbound(in.copy()); - in.readerIndex(0); // rewind the buffer to write the same data - channel.writeOutbound(in.copy()); + channel.writeOutbound(in.retain()); + in.resetReaderIndex(); // rewind the buffer to write the same data + channel.writeOutbound(in); assertTrue(channel.finish()); ByteBuf expected = Unpooled.wrappedBuffer(new byte[] { @@ -98,7 +98,6 @@ public class SnappyFrameEncoderTest { actual.writerIndex(actual.writerIndex() + m.readableBytes()); } assertEquals(releaseLater(expected), releaseLater(actual)); - in.release(); } /** diff --git a/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java index 63a3886f41..65317dd7cf 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java @@ -20,7 +20,7 @@ import org.junit.Test; import java.util.Random; -public class SnappyIntegrationTest extends IntegrationTest { +public class SnappyIntegrationTest extends AbstractIntegrationTest { /** * The number of random regression tests run by testRandom() runs. Whenever testRandom() finds the case that @@ -32,12 +32,12 @@ public class SnappyIntegrationTest extends IntegrationTest { private static final int RANDOM_RUNS = 1; @Override - protected EmbeddedChannel createEncoderEmbeddedChannel() { + protected EmbeddedChannel createEncoder() { return new EmbeddedChannel(new SnappyFrameEncoder()); } @Override - protected EmbeddedChannel createDecoderEmbeddedChannel() { + protected EmbeddedChannel createDecoder() { return new EmbeddedChannel(new SnappyFrameDecoder()); } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java b/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java index fc68f5ff86..9afe53859e 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java @@ -98,13 +98,12 @@ public abstract class ZlibTest { EmbeddedChannel chDecoderGZip = new EmbeddedChannel(createDecoder(ZlibWrapper.GZIP)); try { - chDecoderGZip.writeInbound(deflatedData.copy()); + chDecoderGZip.writeInbound(deflatedData); assertTrue(chDecoderGZip.finish()); ByteBuf buf = chDecoderGZip.readInbound(); assertEquals(buf, data); assertNull(chDecoderGZip.readInbound()); data.release(); - deflatedData.release(); buf.release(); } finally { dispose(chDecoderGZip); @@ -116,8 +115,9 @@ public abstract class ZlibTest { EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper)); try { - chEncoder.writeOutbound(data.copy()); + chEncoder.writeOutbound(data.retain()); chEncoder.flush(); + data.resetReaderIndex(); for (;;) { ByteBuf deflatedData = chEncoder.readOutbound(); diff --git a/pom.xml b/pom.xml index 0ba881894e..71f9f4f400 100644 --- a/pom.xml +++ b/pom.xml @@ -798,7 +798,7 @@ test - + org.apache.commons commons-compress