Refactor tests for compression codecs

Motivation:

Too many duplicated code of tests for different compression codecs.

Modifications:

- Added abstract classes AbstractCompressionTest, AbstractDecoderTest and AbstractEncoderTest which contains common variables and tests for any compression codec.
- Removed common tests which are implemented in AbstractDecoderTest and AbstractEncoderTest from current tests for compression codecs.
- Implemented abstract methods of AbstractDecoderTest and AbstractEncoderTest in current tests for compression codecs.
- Added additional checks for current tests.
- Renamed abstract class IntegrationTest to AbstractIntegrationTest.
- Used Theories to run tests with head and direct buffers.
- Removed code duplicates.

Result:

Removed duplicated code of tests for compression codecs and simplified an addition of tests for new compression codecs.
This commit is contained in:
Idel Pivnitskiy 2015-03-18 15:41:08 +03:00 committed by Norman Maurer
parent 6496d2dbc7
commit c9adb41636
20 changed files with 528 additions and 762 deletions

View File

@ -77,7 +77,7 @@
<scope>test</scope>
</dependency>
<!-- Test dependency for Bzip2Decoder -->
<!-- Test dependency for Bzip2 compression codec -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>

View File

@ -0,0 +1,38 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.util.internal.ThreadLocalRandom;
public abstract class AbstractCompressionTest {
protected static final ThreadLocalRandom rand;
protected static final byte[] BYTES_SMALL = new byte[256];
protected static final byte[] BYTES_LARGE = new byte[256 * 1024];
static {
rand = ThreadLocalRandom.current();
fillArrayWithCompressibleData(BYTES_SMALL);
fillArrayWithCompressibleData(BYTES_LARGE);
}
private static void fillArrayWithCompressibleData(byte[] array) {
for (int i = 0; i < array.length; i++) {
array[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
}
}

View File

@ -0,0 +1,148 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Before;
import org.junit.Rule;
import org.junit.experimental.theories.DataPoints;
import org.junit.experimental.theories.FromDataPoints;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import static org.junit.Assert.*;
@RunWith(Theories.class)
public abstract class AbstractDecoderTest extends AbstractCompressionTest {
protected static final ByteBuf WRAPPED_BYTES_SMALL;
protected static final ByteBuf WRAPPED_BYTES_LARGE;
static {
WRAPPED_BYTES_SMALL = Unpooled.wrappedBuffer(BYTES_SMALL);
WRAPPED_BYTES_LARGE = Unpooled.wrappedBuffer(BYTES_LARGE);
}
@Rule
public final ExpectedException expected = ExpectedException.none();
protected EmbeddedChannel channel;
protected static byte[] compressedBytesSmall;
protected static byte[] compressedBytesLarge;
protected AbstractDecoderTest() throws Exception {
compressedBytesSmall = compress(BYTES_SMALL);
compressedBytesLarge = compress(BYTES_LARGE);
}
/**
* Compresses data with some external library.
*/
protected abstract byte[] compress(byte[] data) throws Exception;
@Before
public abstract void initChannel();
@DataPoints("smallData")
public static ByteBuf[] smallData() {
ByteBuf heap = Unpooled.wrappedBuffer(compressedBytesSmall);
ByteBuf direct = Unpooled.directBuffer(compressedBytesSmall.length);
direct.writeBytes(compressedBytesSmall);
return new ByteBuf[] {heap, direct};
}
@DataPoints("largeData")
public static ByteBuf[] largeData() {
ByteBuf heap = Unpooled.wrappedBuffer(compressedBytesLarge);
ByteBuf direct = Unpooled.directBuffer(compressedBytesLarge.length);
direct.writeBytes(compressedBytesLarge);
return new ByteBuf[] {heap, direct};
}
@Theory
public void testDecompressionOfSmallChunkOfData(@FromDataPoints("smallData") ByteBuf data) throws Exception {
testDecompression(WRAPPED_BYTES_SMALL, data);
}
@Theory
public void testDecompressionOfLargeChunkOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception {
testDecompression(WRAPPED_BYTES_LARGE, data);
}
@Theory
public void testDecompressionOfBatchedFlowOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception {
testDecompressionOfBatchedFlow(WRAPPED_BYTES_LARGE, data);
}
protected void testDecompression(final ByteBuf expected, final ByteBuf data) throws Exception {
assertTrue(channel.writeInbound(data));
ByteBuf decompressed = readDecompressed(channel);
assertEquals(expected, decompressed);
decompressed.release();
}
protected void testDecompressionOfBatchedFlow(final ByteBuf expected, final ByteBuf data) throws Exception {
final int compressedLength = data.readableBytes();
int written = 0, length = rand.nextInt(100);
while (written + length < compressedLength) {
ByteBuf compressedBuf = data.slice(written, length);
channel.writeInbound(compressedBuf.retain());
written += length;
length = rand.nextInt(100);
}
ByteBuf compressedBuf = data.slice(written, compressedLength - written);
assertTrue(channel.writeInbound(compressedBuf.retain()));
ByteBuf decompressedBuf = readDecompressed(channel);
assertEquals(expected, decompressedBuf);
decompressedBuf.release();
data.release();
}
protected static ByteBuf readDecompressed(final EmbeddedChannel channel) {
CompositeByteBuf decompressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readInbound()) != null) {
decompressed.addComponent(msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
}
return decompressed;
}
protected static void tryDecodeAndCatchBufLeaks(final EmbeddedChannel channel, final ByteBuf data) {
try {
channel.writeInbound(data);
} finally {
for (;;) {
ByteBuf inflated = channel.readInbound();
if (inflated == null) {
break;
}
inflated.release();
}
channel.finish();
}
}
}

View File

@ -0,0 +1,118 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Before;
import org.junit.experimental.theories.DataPoints;
import org.junit.experimental.theories.FromDataPoints;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
import org.junit.runner.RunWith;
import static org.junit.Assert.*;
@RunWith(Theories.class)
public abstract class AbstractEncoderTest extends AbstractCompressionTest {
protected EmbeddedChannel channel;
/**
* Decompresses data with some external library.
*/
protected abstract ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception;
@Before
public abstract void initChannel();
@DataPoints("smallData")
public static ByteBuf[] smallData() {
ByteBuf heap = Unpooled.wrappedBuffer(BYTES_SMALL);
ByteBuf direct = Unpooled.directBuffer(BYTES_SMALL.length);
direct.writeBytes(BYTES_SMALL);
return new ByteBuf[] {heap, direct};
}
@DataPoints("largeData")
public static ByteBuf[] largeData() {
ByteBuf heap = Unpooled.wrappedBuffer(BYTES_LARGE);
ByteBuf direct = Unpooled.directBuffer(BYTES_LARGE.length);
direct.writeBytes(BYTES_LARGE);
return new ByteBuf[] {heap, direct};
}
@Theory
public void testCompressionOfSmallChunkOfData(@FromDataPoints("smallData") ByteBuf data) throws Exception {
testCompression(data);
}
@Theory
public void testCompressionOfLargeChunkOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception {
testCompression(data);
}
@Theory
public void testCompressionOfBatchedFlowOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception {
testCompressionOfBatchedFlow(data);
}
protected void testCompression(final ByteBuf data) throws Exception {
final int dataLength = data.readableBytes();
assertTrue(channel.writeOutbound(data.retain()));
assertTrue(channel.finish());
ByteBuf decompressed = readDecompressed(dataLength);
assertEquals(data.resetReaderIndex(), decompressed);
decompressed.release();
data.release();
}
protected void testCompressionOfBatchedFlow(final ByteBuf data) throws Exception {
final int dataLength = data.readableBytes();
int written = 0, length = rand.nextInt(100);
while (written + length < dataLength) {
ByteBuf in = data.slice(written, length);
assertTrue(channel.writeOutbound(in.retain()));
written += length;
length = rand.nextInt(100);
}
ByteBuf in = data.slice(written, dataLength - written);
assertTrue(channel.writeOutbound(in.retain()));
assertTrue(channel.finish());
ByteBuf decompressed = readDecompressed(dataLength);
assertEquals(data, decompressed);
decompressed.release();
data.release();
}
protected ByteBuf readDecompressed(final int dataLength) throws Exception {
CompositeByteBuf compressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
compressed.addComponent(msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
}
ByteBuf decompressed = decompress(compressed, dataLength);
compressed.release();
return decompressed;
}
}

View File

@ -23,6 +23,8 @@ import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
@ -30,12 +32,42 @@ import java.util.Arrays;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
public abstract class IntegrationTest {
public abstract class AbstractIntegrationTest {
protected static final ThreadLocalRandom rand = ThreadLocalRandom.current();
protected abstract EmbeddedChannel createEncoderEmbeddedChannel();
protected abstract EmbeddedChannel createDecoderEmbeddedChannel();
protected EmbeddedChannel encoder;
protected EmbeddedChannel decoder;
protected abstract EmbeddedChannel createEncoder();
protected abstract EmbeddedChannel createDecoder();
@Before
public void initChannels() throws Exception {
encoder = createEncoder();
decoder = createDecoder();
}
@After
public void closeChannels() throws Exception {
encoder.close();
for (;;) {
Object msg = encoder.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
decoder.close();
for (;;) {
Object msg = decoder.readInbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
}
@Test
public void testEmpty() throws Exception {
@ -112,51 +144,28 @@ public abstract class IntegrationTest {
protected void testIdentity(final byte[] data) {
final ByteBuf in = Unpooled.wrappedBuffer(data);
final EmbeddedChannel encoder = createEncoderEmbeddedChannel();
final EmbeddedChannel decoder = createDecoderEmbeddedChannel();
try {
ByteBuf msg;
assertTrue(encoder.writeOutbound(in.retain()));
assertTrue(encoder.finish());
encoder.writeOutbound(in.copy());
encoder.finish();
final CompositeByteBuf compressed = Unpooled.compositeBuffer();
while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
}
assertThat(compressed, is(notNullValue()));
decoder.writeInbound(compressed.retain());
assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
}
assertEquals(in, decompressed);
compressed.release();
decompressed.release();
in.release();
} finally {
encoder.close();
decoder.close();
for (;;) {
Object msg = encoder.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
for (;;) {
Object msg = decoder.readInbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
final CompositeByteBuf compressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
}
assertThat(compressed, is(notNullValue()));
decoder.writeInbound(compressed.retain());
assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
}
assertEquals(in.resetReaderIndex(), decompressed);
compressed.release();
decompressed.release();
in.release();
}
}

View File

@ -16,23 +16,17 @@
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import static io.netty.handler.codec.compression.Bzip2Constants.*;
import static org.junit.Assert.*;
public class Bzip2DecoderTest {
public class Bzip2DecoderTest extends AbstractDecoderTest {
private static final byte[] DATA = { 0x42, 0x5A, 0x68, 0x37, 0x31, 0x41, 0x59, 0x26, 0x53,
0x59, 0x77, 0x7B, (byte) 0xCA, (byte) 0xC0, 0x00, 0x00,
@ -41,23 +35,10 @@ public class Bzip2DecoderTest {
(byte) 0x89, (byte) 0x99, (byte) 0xC5, (byte) 0xDC, (byte) 0x91,
0x4E, 0x14, 0x24, 0x1D, (byte) 0xDE, (byte) 0xF2, (byte) 0xB0, 0x00 };
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[MAX_BLOCK_SIZE * BASE_BLOCK_SIZE + 256];
static {
rand = ThreadLocalRandom.current();
rand.nextBytes(BYTES_SMALL);
rand.nextBytes(BYTES_LARGE);
public Bzip2DecoderTest() throws Exception {
}
@Rule
public ExpectedException expected = ExpectedException.none();
private EmbeddedChannel channel;
@Before
@Override
public void initChannel() {
channel = new EmbeddedChannel(new Bzip2Decoder());
}
@ -123,19 +104,7 @@ public class Bzip2DecoderTest {
final byte[] data = Arrays.copyOf(DATA, DATA.length);
data[41] = (byte) 0xDD;
ByteBuf in = Unpooled.wrappedBuffer(data);
try {
channel.writeInbound(in);
} finally {
for (;;) {
ByteBuf inflated = channel.readInbound();
if (inflated == null) {
break;
}
inflated.release();
}
channel.finish();
}
tryDecodeAndCatchBufLeaks(channel, Unpooled.wrappedBuffer(data));
}
@Test
@ -186,75 +155,13 @@ public class Bzip2DecoderTest {
channel.writeInbound(in);
}
private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception {
@Override
protected byte[] compress(byte[] data) throws Exception {
ByteArrayOutputStream os = new ByteArrayOutputStream();
BZip2CompressorOutputStream bZip2Os = new BZip2CompressorOutputStream(os, randomBlockSize());
BZip2CompressorOutputStream bZip2Os = new BZip2CompressorOutputStream(os, MIN_BLOCK_SIZE);
bZip2Os.write(data);
bZip2Os.close();
ByteBuf compressed = Unpooled.wrappedBuffer(os.toByteArray());
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
@Test
public void testDecompressionOfSmallChunkOfData() throws Exception {
testDecompression(channel, BYTES_SMALL);
}
@Test
public void testDecompressionOfLargeChunkOfData() throws Exception {
testDecompression(channel, BYTES_LARGE);
}
@Test
public void testDecompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
ByteArrayOutputStream os = new ByteArrayOutputStream();
BZip2CompressorOutputStream bZip2Os = new BZip2CompressorOutputStream(os, randomBlockSize());
bZip2Os.write(data);
bZip2Os.close();
final byte[] compressedArray = os.toByteArray();
int written = 0, length = rand.nextInt(100);
while (written + length < compressedArray.length) {
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length);
channel.writeInbound(compressed);
written += length;
length = rand.nextInt(100);
}
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception {
CompositeByteBuf uncompressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readInbound()) != null) {
uncompressed.addComponent(msg);
uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes());
}
return uncompressed;
}
private static int randomBlockSize() {
return rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1);
return os.toByteArray();
}
}

View File

@ -16,110 +16,41 @@
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import static io.netty.handler.codec.compression.Bzip2Constants.*;
import static org.junit.Assert.*;
public class Bzip2EncoderTest {
public class Bzip2EncoderTest extends AbstractEncoderTest {
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[MAX_BLOCK_SIZE * BASE_BLOCK_SIZE + 256];
static {
rand = ThreadLocalRandom.current();
rand.nextBytes(BYTES_SMALL);
rand.nextBytes(BYTES_LARGE);
}
private EmbeddedChannel channel;
@Before
@Override
public void initChannel() {
channel = new EmbeddedChannel(new Bzip2Encoder(randomBlockSize()));
channel = new EmbeddedChannel(new Bzip2Encoder(MIN_BLOCK_SIZE));
}
private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception {
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeOutbound(in);
channel.finish();
@Override
protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception {
InputStream is = new ByteBufInputStream(compressed);
BZip2CompressorInputStream bzip2Is = new BZip2CompressorInputStream(is);
byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
}
@Test
public void testCompressionOfSmallChunkOfData() throws Exception {
testCompression(channel, BYTES_SMALL);
}
@Test
public void testCompressionOfLargeChunkOfData() throws Exception {
testCompression(channel, BYTES_LARGE);
}
@Test
public void testCompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
int written = 0, length = rand.nextInt(100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
channel.writeOutbound(in);
written += length;
length = rand.nextInt(100);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
channel.writeOutbound(in);
channel.finish();
byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
}
private static byte[] uncompress(EmbeddedChannel channel, int length) throws Exception {
CompositeByteBuf out = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
out.addComponent(msg);
out.writerIndex(out.writerIndex() + msg.readableBytes());
}
byte[] compressed = new byte[out.readableBytes()];
out.readBytes(compressed);
out.release();
ByteArrayInputStream is = new ByteArrayInputStream(compressed);
BZip2CompressorInputStream bZip2Is = new BZip2CompressorInputStream(is);
byte[] uncompressed = new byte[length];
int remaining = length;
byte[] decompressed = new byte[originalLength];
int remaining = originalLength;
while (remaining > 0) {
int read = bZip2Is.read(uncompressed, length - remaining, remaining);
int read = bzip2Is.read(decompressed, originalLength - remaining, remaining);
if (read > 0) {
remaining -= read;
} else {
break;
}
}
assertEquals(-1, bzip2Is.read());
bzip2Is.close();
assertEquals(-1, bZip2Is.read());
return uncompressed;
}
private static int randomBlockSize() {
return rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1);
return Unpooled.wrappedBuffer(decompressed);
}
}

View File

@ -18,15 +18,15 @@ package io.netty.handler.codec.compression;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
public class Bzip2IntegrationTest extends IntegrationTest {
public class Bzip2IntegrationTest extends AbstractIntegrationTest {
@Override
protected EmbeddedChannel createEncoderEmbeddedChannel() {
protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new Bzip2Encoder());
}
@Override
protected EmbeddedChannel createDecoderEmbeddedChannel() {
protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new Bzip2Decoder());
}

View File

@ -19,118 +19,94 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.ReferenceCountUtil;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
public class FastLzIntegrationTest extends IntegrationTest {
public class FastLzIntegrationTest extends AbstractIntegrationTest {
public static class TestWithChecksum extends IntegrationTest {
public static class TestWithChecksum extends AbstractIntegrationTest {
@Override
protected EmbeddedChannel createEncoderEmbeddedChannel() {
protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new FastLzFrameEncoder(true));
}
@Override
protected EmbeddedChannel createDecoderEmbeddedChannel() {
protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new FastLzFrameDecoder(true));
}
}
public static class TestRandomChecksum extends IntegrationTest {
public static class TestRandomChecksum extends AbstractIntegrationTest {
@Override
protected EmbeddedChannel createEncoderEmbeddedChannel() {
protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new FastLzFrameEncoder(rand.nextBoolean()));
}
@Override
protected EmbeddedChannel createDecoderEmbeddedChannel() {
protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new FastLzFrameDecoder(rand.nextBoolean()));
}
}
@Override
protected EmbeddedChannel createEncoderEmbeddedChannel() {
protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new FastLzFrameEncoder(rand.nextBoolean()));
}
@Override
protected EmbeddedChannel createDecoderEmbeddedChannel() {
protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new FastLzFrameDecoder(rand.nextBoolean()));
}
@Override // test batched flow of data
protected void testIdentity(final byte[] data) {
final ByteBuf original = Unpooled.wrappedBuffer(data);
final EmbeddedChannel encoder = createEncoderEmbeddedChannel();
final EmbeddedChannel decoder = createDecoderEmbeddedChannel();
try {
int written = 0, length = rand.nextInt(100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
encoder.writeOutbound(in);
written += length;
length = rand.nextInt(100);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
int written = 0, length = rand.nextInt(100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
encoder.writeOutbound(in);
encoder.finish();
ByteBuf msg;
final CompositeByteBuf compressed = Unpooled.compositeBuffer();
while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
}
assertThat(compressed, is(notNullValue()));
final byte[] compressedArray = new byte[compressed.readableBytes()];
compressed.readBytes(compressedArray);
written = 0;
written += length;
length = rand.nextInt(100);
while (written + length < compressedArray.length) {
in = Unpooled.wrappedBuffer(compressedArray, written, length);
decoder.writeInbound(in);
written += length;
length = rand.nextInt(100);
}
in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
decoder.writeInbound(in);
assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
}
assertEquals(original, decompressed);
compressed.release();
decompressed.release();
original.release();
} finally {
encoder.close();
decoder.close();
for (;;) {
Object msg = encoder.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
for (;;) {
Object msg = decoder.readInbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
encoder.writeOutbound(in);
encoder.finish();
ByteBuf msg;
final CompositeByteBuf compressed = Unpooled.compositeBuffer();
while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
}
assertThat(compressed, is(notNullValue()));
final byte[] compressedArray = new byte[compressed.readableBytes()];
compressed.readBytes(compressedArray);
written = 0;
length = rand.nextInt(100);
while (written + length < compressedArray.length) {
in = Unpooled.wrappedBuffer(compressedArray, written, length);
decoder.writeInbound(in);
written += length;
length = rand.nextInt(100);
}
in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
decoder.writeInbound(in);
assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
}
assertEquals(original, decompressed);
compressed.release();
decompressed.release();
original.release();
}
}

View File

@ -16,23 +16,17 @@
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import net.jpountz.lz4.LZ4BlockOutputStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import static io.netty.handler.codec.compression.Lz4Constants.*;
import static org.junit.Assert.*;
public class Lz4FrameDecoderTest {
public class Lz4FrameDecoderTest extends AbstractDecoderTest {
private static final byte[] DATA = { 0x4C, 0x5A, 0x34, 0x42, 0x6C, 0x6F, 0x63, 0x6B, // magic bytes
0x16, // token
@ -44,28 +38,10 @@ public class Lz4FrameDecoderTest {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // last empty block
0x00, 0x00, 0x00, 0x00 };
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
public Lz4FrameDecoderTest() throws Exception {
}
@Rule
public ExpectedException expected = ExpectedException.none();
private EmbeddedChannel channel;
@Before
@Override
public void initChannel() {
channel = new EmbeddedChannel(new Lz4FrameDecoder(true));
}
@ -150,90 +126,17 @@ public class Lz4FrameDecoderTest {
final byte[] data = Arrays.copyOf(DATA, DATA.length);
data[44] = 0x01;
ByteBuf in = Unpooled.wrappedBuffer(data);
try {
channel.writeInbound(in);
} finally {
for (;;) {
ByteBuf inflated = channel.readInbound();
if (inflated == null) {
break;
}
inflated.release();
}
channel.finish();
}
tryDecodeAndCatchBufLeaks(channel, Unpooled.wrappedBuffer(data));
}
private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception {
@Override
protected byte[] compress(byte[] data) throws Exception {
ByteArrayOutputStream os = new ByteArrayOutputStream();
LZ4BlockOutputStream lz4Os = new LZ4BlockOutputStream(os, randomBlockSize());
LZ4BlockOutputStream lz4Os = new LZ4BlockOutputStream(os,
rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1));
lz4Os.write(data);
lz4Os.close();
ByteBuf compressed = Unpooled.wrappedBuffer(os.toByteArray());
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
@Test
public void testDecompressionOfSmallChunkOfData() throws Exception {
testDecompression(channel, BYTES_SMALL);
}
@Test
public void testDecompressionOfLargeChunkOfData() throws Exception {
testDecompression(channel, BYTES_LARGE);
}
@Test
public void testDecompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
ByteArrayOutputStream os = new ByteArrayOutputStream();
LZ4BlockOutputStream lz4Os = new LZ4BlockOutputStream(os, randomBlockSize());
lz4Os.write(data);
lz4Os.close();
final byte[] compressedArray = os.toByteArray();
int written = 0, length = rand.nextInt(100);
while (written + length < compressedArray.length) {
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length);
channel.writeInbound(compressed);
written += length;
length = rand.nextInt(100);
}
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception {
CompositeByteBuf uncompressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readInbound()) != null) {
uncompressed.addComponent(msg);
uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes());
}
return uncompressed;
}
private static int randomBlockSize() {
return rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1);
return os.toByteArray();
}
}

View File

@ -16,108 +16,40 @@
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import net.jpountz.lz4.LZ4BlockInputStream;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import static org.junit.Assert.*;
public class Lz4FrameEncoderTest {
public class Lz4FrameEncoderTest extends AbstractEncoderTest {
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
}
private EmbeddedChannel channel;
@Before
@Override
public void initChannel() {
channel = new EmbeddedChannel(new Lz4FrameEncoder());
}
private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception {
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeOutbound(in);
channel.finish();
final byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
}
@Test
public void testCompressionOfSmallChunkOfData() throws Exception {
testCompression(channel, BYTES_SMALL);
}
@Test
public void testCompressionOfLargeChunkOfData() throws Exception {
testCompression(channel, BYTES_LARGE);
}
@Test
public void testCompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
int written = 0, length = rand.nextInt(1, 100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
channel.writeOutbound(in);
written += length;
length = rand.nextInt(1, 100);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
channel.writeOutbound(in);
channel.finish();
final byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
}
private static byte[] uncompress(EmbeddedChannel channel, int originalLength) throws Exception {
CompositeByteBuf out = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
out.addComponent(msg);
out.writerIndex(out.writerIndex() + msg.readableBytes());
}
byte[] compressed = new byte[out.readableBytes()];
out.readBytes(compressed);
out.release();
ByteArrayInputStream is = new ByteArrayInputStream(compressed);
@Override
protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception {
InputStream is = new ByteBufInputStream(compressed);
LZ4BlockInputStream lz4Is = new LZ4BlockInputStream(is);
byte[] uncompressed = new byte[originalLength];
byte[] decompressed = new byte[originalLength];
int remaining = originalLength;
while (remaining > 0) {
int read = lz4Is.read(uncompressed, originalLength - remaining, remaining);
int read = lz4Is.read(decompressed, originalLength - remaining, remaining);
if (read > 0) {
remaining -= read;
} else {
break;
}
}
assertEquals(-1, lz4Is.read());
lz4Is.close();
return uncompressed;
return Unpooled.wrappedBuffer(decompressed);
}
}

View File

@ -17,15 +17,15 @@ package io.netty.handler.codec.compression;
import io.netty.channel.embedded.EmbeddedChannel;
public class Lz4FrameIntegrationTest extends IntegrationTest {
public class Lz4FrameIntegrationTest extends AbstractIntegrationTest {
@Override
protected EmbeddedChannel createEncoderEmbeddedChannel() {
protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new Lz4FrameEncoder());
}
@Override
protected EmbeddedChannel createDecoderEmbeddedChannel() {
protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new Lz4FrameDecoder());
}
}

View File

@ -17,44 +17,18 @@ package io.netty.handler.codec.compression;
import com.ning.compress.lzf.LZFEncoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static com.ning.compress.lzf.LZFChunk.BYTE_Z;
import static com.ning.compress.lzf.LZFChunk.BYTE_V;
import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_NON_COMPRESSED;
import static org.junit.Assert.*;
import static com.ning.compress.lzf.LZFChunk.*;
public class LzfDecoderTest {
public class LzfDecoderTest extends AbstractDecoderTest {
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
public LzfDecoderTest() throws Exception {
}
@Rule
public ExpectedException expected = ExpectedException.none();
private EmbeddedChannel channel;
@Before
@Override
public void initChannel() {
channel = new EmbeddedChannel(new LzfDecoder());
}
@ -86,63 +60,8 @@ public class LzfDecoderTest {
channel.writeInbound(in);
}
private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception {
byte[] compressedArray = LZFEncoder.encode(data);
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray);
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
@Test
public void testDecompressionOfSmallChunkOfData() throws Exception {
testDecompression(channel, BYTES_SMALL);
}
@Test
public void testDecompressionOfLargeChunkOfData() throws Exception {
testDecompression(channel, BYTES_LARGE);
}
@Test
public void testDecompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
byte[] compressedArray = LZFEncoder.encode(data);
int written = 0, length = rand.nextInt(100);
while (written + length < compressedArray.length) {
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length);
channel.writeInbound(compressed);
written += length;
length = rand.nextInt(100);
}
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception {
CompositeByteBuf uncompressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readInbound()) != null) {
uncompressed.addComponent(msg);
uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes());
}
return uncompressed;
@Override
protected byte[] compress(byte[] data) throws Exception {
return LZFEncoder.encode(data);
}
}

View File

@ -17,92 +17,22 @@ package io.netty.handler.codec.compression;
import com.ning.compress.lzf.LZFDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
public class LzfEncoderTest extends AbstractEncoderTest {
public class LzfEncoderTest {
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
}
private EmbeddedChannel channel;
@Before
@Override
public void initChannel() {
channel = new EmbeddedChannel(new LzfEncoder());
}
private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception {
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeOutbound(in);
channel.finish();
@Override
protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception {
byte[] compressedArray = new byte[compressed.readableBytes()];
compressed.readBytes(compressedArray);
final byte[] uncompressed = uncompress(channel);
assertArrayEquals(data, uncompressed);
}
@Test
public void testCompressionOfSmallChunkOfData() throws Exception {
testCompression(channel, BYTES_SMALL);
}
@Test
public void testCompressionOfLargeChunkOfData() throws Exception {
testCompression(channel, BYTES_LARGE);
}
@Test
public void testCompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
int written = 0, length = rand.nextInt(100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
channel.writeOutbound(in);
written += length;
length = rand.nextInt(100);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
channel.writeOutbound(in);
channel.finish();
final byte[] uncompressed = uncompress(channel);
assertArrayEquals(data, uncompressed);
}
private static byte[] uncompress(EmbeddedChannel channel) throws Exception {
CompositeByteBuf out = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
out.addComponent(msg);
out.writerIndex(out.writerIndex() + msg.readableBytes());
}
byte[] compressed = new byte[out.readableBytes()];
out.readBytes(compressed);
out.release();
return LZFDecoder.decode(compressed);
byte[] decompressed = LZFDecoder.decode(compressedArray);
return Unpooled.wrappedBuffer(decompressed);
}
}

View File

@ -17,15 +17,15 @@ package io.netty.handler.codec.compression;
import io.netty.channel.embedded.EmbeddedChannel;
public class LzfIntegrationTest extends IntegrationTest {
public class LzfIntegrationTest extends AbstractIntegrationTest {
@Override
protected EmbeddedChannel createEncoderEmbeddedChannel() {
protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new LzfEncoder());
}
@Override
protected EmbeddedChannel createDecoderEmbeddedChannel() {
protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new LzfDecoder());
}
}

View File

@ -20,126 +20,82 @@ import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import lzma.sdk.lzma.Decoder;
import lzma.streams.LzmaInputStream;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.theories.FromDataPoints;
import org.junit.experimental.theories.Theory;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.*;
public class LzmaFrameEncoderTest {
public class LzmaFrameEncoderTest extends AbstractEncoderTest {
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
rand.nextBytes(BYTES_SMALL);
rand.nextBytes(BYTES_LARGE);
}
private EmbeddedChannel channel;
@Before
@Override
public void initChannel() {
channel = new EmbeddedChannel(new LzmaFrameEncoder());
}
private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception {
ByteBuf in = Unpooled.wrappedBuffer(data);
assertTrue(channel.writeOutbound(in));
assertTrue(channel.finish());
byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
@Theory
@Override
public void testCompressionOfBatchedFlowOfData(@FromDataPoints("smallData") ByteBuf data) throws Exception {
testCompressionOfBatchedFlow(data);
}
@Test
public void testCompressionOfSmallChunkOfData() throws Exception {
testCompression(channel, BYTES_SMALL);
}
@Test
public void testCompressionOfLargeChunkOfData() throws Exception {
testCompression(channel, BYTES_LARGE);
}
@Test
public void testCompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_SMALL;
@Override
protected void testCompressionOfBatchedFlow(final ByteBuf data) throws Exception {
List<Integer> originalLengths = new ArrayList<Integer>();
final int dataLength = data.readableBytes();
int written = 0, length = rand.nextInt(50);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
assertTrue(channel.writeOutbound(in));
while (written + length < dataLength) {
ByteBuf in = data.slice(written, length);
assertTrue(channel.writeOutbound(in.retain()));
written += length;
originalLengths.add(length);
length = rand.nextInt(50);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
assertTrue(channel.writeOutbound(in));
length = dataLength - written;
ByteBuf in = data.slice(written, dataLength - written);
originalLengths.add(length);
assertTrue(channel.writeOutbound(in.retain()));
assertTrue(channel.finish());
byte[] uncompressed = new byte[data.length];
int outOffset = 0;
CompositeByteBuf decompressed = Unpooled.compositeBuffer();
ByteBuf msg;
int i = 0;
while ((msg = channel.readOutbound()) != null) {
InputStream is = new ByteBufInputStream(msg);
LzmaInputStream lzmaIs = new LzmaInputStream(is, new Decoder());
for (;;) {
int read = lzmaIs.read(uncompressed, outOffset, data.length - outOffset);
if (read > 0) {
outOffset += read;
} else {
break;
}
}
assertEquals(0, is.available());
assertEquals(-1, is.read());
is.close();
lzmaIs.close();
ByteBuf decompressedMsg = decompress(msg, originalLengths.get(i++));
decompressed.addComponent(decompressedMsg);
decompressed.writerIndex(decompressed.writerIndex() + decompressedMsg.readableBytes());
msg.release();
}
assertEquals(originalLengths.size(), i);
assertEquals(data, decompressed);
assertArrayEquals(data, uncompressed);
decompressed.release();
data.release();
}
private static byte[] uncompress(EmbeddedChannel channel, int length) throws Exception {
CompositeByteBuf out = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
out.addComponent(msg);
out.writerIndex(out.writerIndex() + msg.readableBytes());
}
InputStream is = new ByteBufInputStream(out);
@Override
protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception {
InputStream is = new ByteBufInputStream(compressed);
LzmaInputStream lzmaIs = new LzmaInputStream(is, new Decoder());
byte[] uncompressed = new byte[length];
int remaining = length;
byte[] decompressed = new byte[originalLength];
int remaining = originalLength;
while (remaining > 0) {
int read = lzmaIs.read(uncompressed, length - remaining, remaining);
int read = lzmaIs.read(decompressed, originalLength - remaining, remaining);
if (read > 0) {
remaining -= read;
} else {
break;
}
}
assertEquals(0, is.available());
assertEquals(-1, is.read());
assertEquals(-1, lzmaIs.read());
is.close();
lzmaIs.close();
out.release();
return uncompressed;
return Unpooled.wrappedBuffer(decompressed);
}
}

View File

@ -77,9 +77,9 @@ public class SnappyFrameEncoderTest {
'n', 'e', 't', 't', 'y'
});
channel.writeOutbound(in.copy());
in.readerIndex(0); // rewind the buffer to write the same data
channel.writeOutbound(in.copy());
channel.writeOutbound(in.retain());
in.resetReaderIndex(); // rewind the buffer to write the same data
channel.writeOutbound(in);
assertTrue(channel.finish());
ByteBuf expected = Unpooled.wrappedBuffer(new byte[] {
@ -98,7 +98,6 @@ public class SnappyFrameEncoderTest {
actual.writerIndex(actual.writerIndex() + m.readableBytes());
}
assertEquals(releaseLater(expected), releaseLater(actual));
in.release();
}
/**

View File

@ -20,7 +20,7 @@ import org.junit.Test;
import java.util.Random;
public class SnappyIntegrationTest extends IntegrationTest {
public class SnappyIntegrationTest extends AbstractIntegrationTest {
/**
* The number of random regression tests run by testRandom() runs. Whenever testRandom() finds the case that
@ -32,12 +32,12 @@ public class SnappyIntegrationTest extends IntegrationTest {
private static final int RANDOM_RUNS = 1;
@Override
protected EmbeddedChannel createEncoderEmbeddedChannel() {
protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new SnappyFrameEncoder());
}
@Override
protected EmbeddedChannel createDecoderEmbeddedChannel() {
protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new SnappyFrameDecoder());
}

View File

@ -98,13 +98,12 @@ public abstract class ZlibTest {
EmbeddedChannel chDecoderGZip = new EmbeddedChannel(createDecoder(ZlibWrapper.GZIP));
try {
chDecoderGZip.writeInbound(deflatedData.copy());
chDecoderGZip.writeInbound(deflatedData);
assertTrue(chDecoderGZip.finish());
ByteBuf buf = chDecoderGZip.readInbound();
assertEquals(buf, data);
assertNull(chDecoderGZip.readInbound());
data.release();
deflatedData.release();
buf.release();
} finally {
dispose(chDecoderGZip);
@ -116,8 +115,9 @@ public abstract class ZlibTest {
EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper));
try {
chEncoder.writeOutbound(data.copy());
chEncoder.writeOutbound(data.retain());
chEncoder.flush();
data.resetReaderIndex();
for (;;) {
ByteBuf deflatedData = chEncoder.readOutbound();

View File

@ -798,7 +798,7 @@
<scope>test</scope>
</dependency>
<!-- Test dependency for Bzip2Decoder -->
<!-- Test dependency for Bzip2 compression codec -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>