Refactor tests for compression codecs

Motivation:

Too many duplicated code of tests for different compression codecs.

Modifications:

- Added abstract classes AbstractCompressionTest, AbstractDecoderTest and AbstractEncoderTest which contains common variables and tests for any compression codec.
- Removed common tests which are implemented in AbstractDecoderTest and AbstractEncoderTest from current tests for compression codecs.
- Implemented abstract methods of AbstractDecoderTest and AbstractEncoderTest in current tests for compression codecs.
- Added additional checks for current tests.
- Renamed abstract class IntegrationTest to AbstractIntegrationTest.
- Used Theories to run tests with head and direct buffers.
- Removed code duplicates.

Result:

Removed duplicated code of tests for compression codecs and simplified an addition of tests for new compression codecs.
This commit is contained in:
Idel Pivnitskiy 2015-03-18 15:41:08 +03:00 committed by Norman Maurer
parent 30b711cf3d
commit 6928a2d79f
20 changed files with 528 additions and 762 deletions

View File

@ -77,7 +77,7 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- Test dependency for Bzip2Decoder --> <!-- Test dependency for Bzip2 compression codec -->
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId> <artifactId>commons-compress</artifactId>

View File

@ -0,0 +1,38 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.util.internal.ThreadLocalRandom;
public abstract class AbstractCompressionTest {
protected static final ThreadLocalRandom rand;
protected static final byte[] BYTES_SMALL = new byte[256];
protected static final byte[] BYTES_LARGE = new byte[256 * 1024];
static {
rand = ThreadLocalRandom.current();
fillArrayWithCompressibleData(BYTES_SMALL);
fillArrayWithCompressibleData(BYTES_LARGE);
}
private static void fillArrayWithCompressibleData(byte[] array) {
for (int i = 0; i < array.length; i++) {
array[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
}
}

View File

@ -0,0 +1,148 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Before;
import org.junit.Rule;
import org.junit.experimental.theories.DataPoints;
import org.junit.experimental.theories.FromDataPoints;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import static org.junit.Assert.*;
@RunWith(Theories.class)
public abstract class AbstractDecoderTest extends AbstractCompressionTest {
protected static final ByteBuf WRAPPED_BYTES_SMALL;
protected static final ByteBuf WRAPPED_BYTES_LARGE;
static {
WRAPPED_BYTES_SMALL = Unpooled.wrappedBuffer(BYTES_SMALL);
WRAPPED_BYTES_LARGE = Unpooled.wrappedBuffer(BYTES_LARGE);
}
@Rule
public final ExpectedException expected = ExpectedException.none();
protected EmbeddedChannel channel;
protected static byte[] compressedBytesSmall;
protected static byte[] compressedBytesLarge;
protected AbstractDecoderTest() throws Exception {
compressedBytesSmall = compress(BYTES_SMALL);
compressedBytesLarge = compress(BYTES_LARGE);
}
/**
* Compresses data with some external library.
*/
protected abstract byte[] compress(byte[] data) throws Exception;
@Before
public abstract void initChannel();
@DataPoints("smallData")
public static ByteBuf[] smallData() {
ByteBuf heap = Unpooled.wrappedBuffer(compressedBytesSmall);
ByteBuf direct = Unpooled.directBuffer(compressedBytesSmall.length);
direct.writeBytes(compressedBytesSmall);
return new ByteBuf[] {heap, direct};
}
@DataPoints("largeData")
public static ByteBuf[] largeData() {
ByteBuf heap = Unpooled.wrappedBuffer(compressedBytesLarge);
ByteBuf direct = Unpooled.directBuffer(compressedBytesLarge.length);
direct.writeBytes(compressedBytesLarge);
return new ByteBuf[] {heap, direct};
}
@Theory
public void testDecompressionOfSmallChunkOfData(@FromDataPoints("smallData") ByteBuf data) throws Exception {
testDecompression(WRAPPED_BYTES_SMALL, data);
}
@Theory
public void testDecompressionOfLargeChunkOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception {
testDecompression(WRAPPED_BYTES_LARGE, data);
}
@Theory
public void testDecompressionOfBatchedFlowOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception {
testDecompressionOfBatchedFlow(WRAPPED_BYTES_LARGE, data);
}
protected void testDecompression(final ByteBuf expected, final ByteBuf data) throws Exception {
assertTrue(channel.writeInbound(data));
ByteBuf decompressed = readDecompressed(channel);
assertEquals(expected, decompressed);
decompressed.release();
}
protected void testDecompressionOfBatchedFlow(final ByteBuf expected, final ByteBuf data) throws Exception {
final int compressedLength = data.readableBytes();
int written = 0, length = rand.nextInt(100);
while (written + length < compressedLength) {
ByteBuf compressedBuf = data.slice(written, length);
channel.writeInbound(compressedBuf.retain());
written += length;
length = rand.nextInt(100);
}
ByteBuf compressedBuf = data.slice(written, compressedLength - written);
assertTrue(channel.writeInbound(compressedBuf.retain()));
ByteBuf decompressedBuf = readDecompressed(channel);
assertEquals(expected, decompressedBuf);
decompressedBuf.release();
data.release();
}
protected static ByteBuf readDecompressed(final EmbeddedChannel channel) {
CompositeByteBuf decompressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readInbound()) != null) {
decompressed.addComponent(msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
}
return decompressed;
}
protected static void tryDecodeAndCatchBufLeaks(final EmbeddedChannel channel, final ByteBuf data) {
try {
channel.writeInbound(data);
} finally {
for (;;) {
ByteBuf inflated = channel.readInbound();
if (inflated == null) {
break;
}
inflated.release();
}
channel.finish();
}
}
}

View File

@ -0,0 +1,118 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Before;
import org.junit.experimental.theories.DataPoints;
import org.junit.experimental.theories.FromDataPoints;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
import org.junit.runner.RunWith;
import static org.junit.Assert.*;
@RunWith(Theories.class)
public abstract class AbstractEncoderTest extends AbstractCompressionTest {
protected EmbeddedChannel channel;
/**
* Decompresses data with some external library.
*/
protected abstract ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception;
@Before
public abstract void initChannel();
@DataPoints("smallData")
public static ByteBuf[] smallData() {
ByteBuf heap = Unpooled.wrappedBuffer(BYTES_SMALL);
ByteBuf direct = Unpooled.directBuffer(BYTES_SMALL.length);
direct.writeBytes(BYTES_SMALL);
return new ByteBuf[] {heap, direct};
}
@DataPoints("largeData")
public static ByteBuf[] largeData() {
ByteBuf heap = Unpooled.wrappedBuffer(BYTES_LARGE);
ByteBuf direct = Unpooled.directBuffer(BYTES_LARGE.length);
direct.writeBytes(BYTES_LARGE);
return new ByteBuf[] {heap, direct};
}
@Theory
public void testCompressionOfSmallChunkOfData(@FromDataPoints("smallData") ByteBuf data) throws Exception {
testCompression(data);
}
@Theory
public void testCompressionOfLargeChunkOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception {
testCompression(data);
}
@Theory
public void testCompressionOfBatchedFlowOfData(@FromDataPoints("largeData") ByteBuf data) throws Exception {
testCompressionOfBatchedFlow(data);
}
protected void testCompression(final ByteBuf data) throws Exception {
final int dataLength = data.readableBytes();
assertTrue(channel.writeOutbound(data.retain()));
assertTrue(channel.finish());
ByteBuf decompressed = readDecompressed(dataLength);
assertEquals(data.resetReaderIndex(), decompressed);
decompressed.release();
data.release();
}
protected void testCompressionOfBatchedFlow(final ByteBuf data) throws Exception {
final int dataLength = data.readableBytes();
int written = 0, length = rand.nextInt(100);
while (written + length < dataLength) {
ByteBuf in = data.slice(written, length);
assertTrue(channel.writeOutbound(in.retain()));
written += length;
length = rand.nextInt(100);
}
ByteBuf in = data.slice(written, dataLength - written);
assertTrue(channel.writeOutbound(in.retain()));
assertTrue(channel.finish());
ByteBuf decompressed = readDecompressed(dataLength);
assertEquals(data, decompressed);
decompressed.release();
data.release();
}
protected ByteBuf readDecompressed(final int dataLength) throws Exception {
CompositeByteBuf compressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
compressed.addComponent(msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
}
ByteBuf decompressed = decompress(compressed, dataLength);
compressed.release();
return decompressed;
}
}

View File

@ -23,6 +23,8 @@ import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.ThreadLocalRandom; import io.netty.util.internal.ThreadLocalRandom;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
@ -30,12 +32,42 @@ import java.util.Arrays;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public abstract class IntegrationTest { public abstract class AbstractIntegrationTest {
protected static final ThreadLocalRandom rand = ThreadLocalRandom.current(); protected static final ThreadLocalRandom rand = ThreadLocalRandom.current();
protected abstract EmbeddedChannel createEncoderEmbeddedChannel(); protected EmbeddedChannel encoder;
protected abstract EmbeddedChannel createDecoderEmbeddedChannel(); protected EmbeddedChannel decoder;
protected abstract EmbeddedChannel createEncoder();
protected abstract EmbeddedChannel createDecoder();
@Before
public void initChannels() throws Exception {
encoder = createEncoder();
decoder = createDecoder();
}
@After
public void closeChannels() throws Exception {
encoder.close();
for (;;) {
Object msg = encoder.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
decoder.close();
for (;;) {
Object msg = decoder.readInbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
}
@Test @Test
public void testEmpty() throws Exception { public void testEmpty() throws Exception {
@ -112,51 +144,28 @@ public abstract class IntegrationTest {
protected void testIdentity(final byte[] data) { protected void testIdentity(final byte[] data) {
final ByteBuf in = Unpooled.wrappedBuffer(data); final ByteBuf in = Unpooled.wrappedBuffer(data);
final EmbeddedChannel encoder = createEncoderEmbeddedChannel(); assertTrue(encoder.writeOutbound(in.retain()));
final EmbeddedChannel decoder = createDecoderEmbeddedChannel(); assertTrue(encoder.finish());
try {
ByteBuf msg;
encoder.writeOutbound(in.copy()); final CompositeByteBuf compressed = Unpooled.compositeBuffer();
encoder.finish(); ByteBuf msg;
final CompositeByteBuf compressed = Unpooled.compositeBuffer(); while ((msg = encoder.readOutbound()) != null) {
while ((msg = encoder.readOutbound()) != null) { compressed.addComponent(msg);
compressed.addComponent(msg); compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
}
assertThat(compressed, is(notNullValue()));
decoder.writeInbound(compressed.retain());
assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
}
assertEquals(in, decompressed);
compressed.release();
decompressed.release();
in.release();
} finally {
encoder.close();
decoder.close();
for (;;) {
Object msg = encoder.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
for (;;) {
Object msg = decoder.readInbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
} }
assertThat(compressed, is(notNullValue()));
decoder.writeInbound(compressed.retain());
assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
}
assertEquals(in.resetReaderIndex(), decompressed);
compressed.release();
decompressed.release();
in.release();
} }
} }

View File

@ -16,23 +16,17 @@
package io.netty.handler.codec.compression; package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.util.Arrays; import java.util.Arrays;
import static io.netty.handler.codec.compression.Bzip2Constants.*; import static io.netty.handler.codec.compression.Bzip2Constants.*;
import static org.junit.Assert.*;
public class Bzip2DecoderTest { public class Bzip2DecoderTest extends AbstractDecoderTest {
private static final byte[] DATA = { 0x42, 0x5A, 0x68, 0x37, 0x31, 0x41, 0x59, 0x26, 0x53, private static final byte[] DATA = { 0x42, 0x5A, 0x68, 0x37, 0x31, 0x41, 0x59, 0x26, 0x53,
0x59, 0x77, 0x7B, (byte) 0xCA, (byte) 0xC0, 0x00, 0x00, 0x59, 0x77, 0x7B, (byte) 0xCA, (byte) 0xC0, 0x00, 0x00,
@ -41,23 +35,10 @@ public class Bzip2DecoderTest {
(byte) 0x89, (byte) 0x99, (byte) 0xC5, (byte) 0xDC, (byte) 0x91, (byte) 0x89, (byte) 0x99, (byte) 0xC5, (byte) 0xDC, (byte) 0x91,
0x4E, 0x14, 0x24, 0x1D, (byte) 0xDE, (byte) 0xF2, (byte) 0xB0, 0x00 }; 0x4E, 0x14, 0x24, 0x1D, (byte) 0xDE, (byte) 0xF2, (byte) 0xB0, 0x00 };
private static final ThreadLocalRandom rand; public Bzip2DecoderTest() throws Exception {
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[MAX_BLOCK_SIZE * BASE_BLOCK_SIZE + 256];
static {
rand = ThreadLocalRandom.current();
rand.nextBytes(BYTES_SMALL);
rand.nextBytes(BYTES_LARGE);
} }
@Rule @Override
public ExpectedException expected = ExpectedException.none();
private EmbeddedChannel channel;
@Before
public void initChannel() { public void initChannel() {
channel = new EmbeddedChannel(new Bzip2Decoder()); channel = new EmbeddedChannel(new Bzip2Decoder());
} }
@ -123,19 +104,7 @@ public class Bzip2DecoderTest {
final byte[] data = Arrays.copyOf(DATA, DATA.length); final byte[] data = Arrays.copyOf(DATA, DATA.length);
data[41] = (byte) 0xDD; data[41] = (byte) 0xDD;
ByteBuf in = Unpooled.wrappedBuffer(data); tryDecodeAndCatchBufLeaks(channel, Unpooled.wrappedBuffer(data));
try {
channel.writeInbound(in);
} finally {
for (;;) {
ByteBuf inflated = channel.readInbound();
if (inflated == null) {
break;
}
inflated.release();
}
channel.finish();
}
} }
@Test @Test
@ -186,75 +155,13 @@ public class Bzip2DecoderTest {
channel.writeInbound(in); channel.writeInbound(in);
} }
private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception { @Override
protected byte[] compress(byte[] data) throws Exception {
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
BZip2CompressorOutputStream bZip2Os = new BZip2CompressorOutputStream(os, randomBlockSize()); BZip2CompressorOutputStream bZip2Os = new BZip2CompressorOutputStream(os, MIN_BLOCK_SIZE);
bZip2Os.write(data); bZip2Os.write(data);
bZip2Os.close(); bZip2Os.close();
ByteBuf compressed = Unpooled.wrappedBuffer(os.toByteArray()); return os.toByteArray();
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
@Test
public void testDecompressionOfSmallChunkOfData() throws Exception {
testDecompression(channel, BYTES_SMALL);
}
@Test
public void testDecompressionOfLargeChunkOfData() throws Exception {
testDecompression(channel, BYTES_LARGE);
}
@Test
public void testDecompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
ByteArrayOutputStream os = new ByteArrayOutputStream();
BZip2CompressorOutputStream bZip2Os = new BZip2CompressorOutputStream(os, randomBlockSize());
bZip2Os.write(data);
bZip2Os.close();
final byte[] compressedArray = os.toByteArray();
int written = 0, length = rand.nextInt(100);
while (written + length < compressedArray.length) {
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length);
channel.writeInbound(compressed);
written += length;
length = rand.nextInt(100);
}
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception {
CompositeByteBuf uncompressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readInbound()) != null) {
uncompressed.addComponent(msg);
uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes());
}
return uncompressed;
}
private static int randomBlockSize() {
return rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1);
} }
} }

View File

@ -16,110 +16,41 @@
package io.netty.handler.codec.compression; package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream; import java.io.InputStream;
import static io.netty.handler.codec.compression.Bzip2Constants.*; import static io.netty.handler.codec.compression.Bzip2Constants.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class Bzip2EncoderTest { public class Bzip2EncoderTest extends AbstractEncoderTest {
private static final ThreadLocalRandom rand; @Override
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[MAX_BLOCK_SIZE * BASE_BLOCK_SIZE + 256];
static {
rand = ThreadLocalRandom.current();
rand.nextBytes(BYTES_SMALL);
rand.nextBytes(BYTES_LARGE);
}
private EmbeddedChannel channel;
@Before
public void initChannel() { public void initChannel() {
channel = new EmbeddedChannel(new Bzip2Encoder(randomBlockSize())); channel = new EmbeddedChannel(new Bzip2Encoder(MIN_BLOCK_SIZE));
} }
private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception { @Override
ByteBuf in = Unpooled.wrappedBuffer(data); protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception {
channel.writeOutbound(in); InputStream is = new ByteBufInputStream(compressed);
channel.finish(); BZip2CompressorInputStream bzip2Is = new BZip2CompressorInputStream(is);
byte[] uncompressed = uncompress(channel, data.length); byte[] decompressed = new byte[originalLength];
int remaining = originalLength;
assertArrayEquals(data, uncompressed);
}
@Test
public void testCompressionOfSmallChunkOfData() throws Exception {
testCompression(channel, BYTES_SMALL);
}
@Test
public void testCompressionOfLargeChunkOfData() throws Exception {
testCompression(channel, BYTES_LARGE);
}
@Test
public void testCompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
int written = 0, length = rand.nextInt(100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
channel.writeOutbound(in);
written += length;
length = rand.nextInt(100);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
channel.writeOutbound(in);
channel.finish();
byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
}
private static byte[] uncompress(EmbeddedChannel channel, int length) throws Exception {
CompositeByteBuf out = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
out.addComponent(msg);
out.writerIndex(out.writerIndex() + msg.readableBytes());
}
byte[] compressed = new byte[out.readableBytes()];
out.readBytes(compressed);
out.release();
ByteArrayInputStream is = new ByteArrayInputStream(compressed);
BZip2CompressorInputStream bZip2Is = new BZip2CompressorInputStream(is);
byte[] uncompressed = new byte[length];
int remaining = length;
while (remaining > 0) { while (remaining > 0) {
int read = bZip2Is.read(uncompressed, length - remaining, remaining); int read = bzip2Is.read(decompressed, originalLength - remaining, remaining);
if (read > 0) { if (read > 0) {
remaining -= read; remaining -= read;
} else { } else {
break; break;
} }
} }
assertEquals(-1, bzip2Is.read());
bzip2Is.close();
assertEquals(-1, bZip2Is.read()); return Unpooled.wrappedBuffer(decompressed);
return uncompressed;
}
private static int randomBlockSize() {
return rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1);
} }
} }

View File

@ -18,15 +18,15 @@ package io.netty.handler.codec.compression;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test; import org.junit.Test;
public class Bzip2IntegrationTest extends IntegrationTest { public class Bzip2IntegrationTest extends AbstractIntegrationTest {
@Override @Override
protected EmbeddedChannel createEncoderEmbeddedChannel() { protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new Bzip2Encoder()); return new EmbeddedChannel(new Bzip2Encoder());
} }
@Override @Override
protected EmbeddedChannel createDecoderEmbeddedChannel() { protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new Bzip2Decoder()); return new EmbeddedChannel(new Bzip2Decoder());
} }

View File

@ -19,118 +19,94 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.ReferenceCountUtil;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class FastLzIntegrationTest extends IntegrationTest { public class FastLzIntegrationTest extends AbstractIntegrationTest {
public static class TestWithChecksum extends IntegrationTest { public static class TestWithChecksum extends AbstractIntegrationTest {
@Override @Override
protected EmbeddedChannel createEncoderEmbeddedChannel() { protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new FastLzFrameEncoder(true)); return new EmbeddedChannel(new FastLzFrameEncoder(true));
} }
@Override @Override
protected EmbeddedChannel createDecoderEmbeddedChannel() { protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new FastLzFrameDecoder(true)); return new EmbeddedChannel(new FastLzFrameDecoder(true));
} }
} }
public static class TestRandomChecksum extends IntegrationTest { public static class TestRandomChecksum extends AbstractIntegrationTest {
@Override @Override
protected EmbeddedChannel createEncoderEmbeddedChannel() { protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new FastLzFrameEncoder(rand.nextBoolean())); return new EmbeddedChannel(new FastLzFrameEncoder(rand.nextBoolean()));
} }
@Override @Override
protected EmbeddedChannel createDecoderEmbeddedChannel() { protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new FastLzFrameDecoder(rand.nextBoolean())); return new EmbeddedChannel(new FastLzFrameDecoder(rand.nextBoolean()));
} }
} }
@Override @Override
protected EmbeddedChannel createEncoderEmbeddedChannel() { protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new FastLzFrameEncoder(rand.nextBoolean())); return new EmbeddedChannel(new FastLzFrameEncoder(rand.nextBoolean()));
} }
@Override @Override
protected EmbeddedChannel createDecoderEmbeddedChannel() { protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new FastLzFrameDecoder(rand.nextBoolean())); return new EmbeddedChannel(new FastLzFrameDecoder(rand.nextBoolean()));
} }
@Override // test batched flow of data @Override // test batched flow of data
protected void testIdentity(final byte[] data) { protected void testIdentity(final byte[] data) {
final ByteBuf original = Unpooled.wrappedBuffer(data); final ByteBuf original = Unpooled.wrappedBuffer(data);
final EmbeddedChannel encoder = createEncoderEmbeddedChannel();
final EmbeddedChannel decoder = createDecoderEmbeddedChannel();
try { int written = 0, length = rand.nextInt(100);
int written = 0, length = rand.nextInt(100); while (written + length < data.length) {
while (written + length < data.length) { ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
encoder.writeOutbound(in);
written += length;
length = rand.nextInt(100);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
encoder.writeOutbound(in); encoder.writeOutbound(in);
encoder.finish(); written += length;
ByteBuf msg;
final CompositeByteBuf compressed = Unpooled.compositeBuffer();
while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
}
assertThat(compressed, is(notNullValue()));
final byte[] compressedArray = new byte[compressed.readableBytes()];
compressed.readBytes(compressedArray);
written = 0;
length = rand.nextInt(100); length = rand.nextInt(100);
while (written + length < compressedArray.length) {
in = Unpooled.wrappedBuffer(compressedArray, written, length);
decoder.writeInbound(in);
written += length;
length = rand.nextInt(100);
}
in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
decoder.writeInbound(in);
assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
}
assertEquals(original, decompressed);
compressed.release();
decompressed.release();
original.release();
} finally {
encoder.close();
decoder.close();
for (;;) {
Object msg = encoder.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
for (;;) {
Object msg = decoder.readInbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
} }
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
encoder.writeOutbound(in);
encoder.finish();
ByteBuf msg;
final CompositeByteBuf compressed = Unpooled.compositeBuffer();
while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
}
assertThat(compressed, is(notNullValue()));
final byte[] compressedArray = new byte[compressed.readableBytes()];
compressed.readBytes(compressedArray);
written = 0;
length = rand.nextInt(100);
while (written + length < compressedArray.length) {
in = Unpooled.wrappedBuffer(compressedArray, written, length);
decoder.writeInbound(in);
written += length;
length = rand.nextInt(100);
}
in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
decoder.writeInbound(in);
assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
}
assertEquals(original, decompressed);
compressed.release();
decompressed.release();
original.release();
} }
} }

View File

@ -16,23 +16,17 @@
package io.netty.handler.codec.compression; package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import net.jpountz.lz4.LZ4BlockOutputStream; import net.jpountz.lz4.LZ4BlockOutputStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.util.Arrays; import java.util.Arrays;
import static io.netty.handler.codec.compression.Lz4Constants.*; import static io.netty.handler.codec.compression.Lz4Constants.*;
import static org.junit.Assert.*;
public class Lz4FrameDecoderTest { public class Lz4FrameDecoderTest extends AbstractDecoderTest {
private static final byte[] DATA = { 0x4C, 0x5A, 0x34, 0x42, 0x6C, 0x6F, 0x63, 0x6B, // magic bytes private static final byte[] DATA = { 0x4C, 0x5A, 0x34, 0x42, 0x6C, 0x6F, 0x63, 0x6B, // magic bytes
0x16, // token 0x16, // token
@ -44,28 +38,10 @@ public class Lz4FrameDecoderTest {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // last empty block 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // last empty block
0x00, 0x00, 0x00, 0x00 }; 0x00, 0x00, 0x00, 0x00 };
private static final ThreadLocalRandom rand; public Lz4FrameDecoderTest() throws Exception {
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
} }
@Rule @Override
public ExpectedException expected = ExpectedException.none();
private EmbeddedChannel channel;
@Before
public void initChannel() { public void initChannel() {
channel = new EmbeddedChannel(new Lz4FrameDecoder(true)); channel = new EmbeddedChannel(new Lz4FrameDecoder(true));
} }
@ -150,90 +126,17 @@ public class Lz4FrameDecoderTest {
final byte[] data = Arrays.copyOf(DATA, DATA.length); final byte[] data = Arrays.copyOf(DATA, DATA.length);
data[44] = 0x01; data[44] = 0x01;
ByteBuf in = Unpooled.wrappedBuffer(data); tryDecodeAndCatchBufLeaks(channel, Unpooled.wrappedBuffer(data));
try {
channel.writeInbound(in);
} finally {
for (;;) {
ByteBuf inflated = channel.readInbound();
if (inflated == null) {
break;
}
inflated.release();
}
channel.finish();
}
} }
private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception { @Override
protected byte[] compress(byte[] data) throws Exception {
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
LZ4BlockOutputStream lz4Os = new LZ4BlockOutputStream(os, randomBlockSize()); LZ4BlockOutputStream lz4Os = new LZ4BlockOutputStream(os,
rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1));
lz4Os.write(data); lz4Os.write(data);
lz4Os.close(); lz4Os.close();
ByteBuf compressed = Unpooled.wrappedBuffer(os.toByteArray()); return os.toByteArray();
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
@Test
public void testDecompressionOfSmallChunkOfData() throws Exception {
testDecompression(channel, BYTES_SMALL);
}
@Test
public void testDecompressionOfLargeChunkOfData() throws Exception {
testDecompression(channel, BYTES_LARGE);
}
@Test
public void testDecompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
ByteArrayOutputStream os = new ByteArrayOutputStream();
LZ4BlockOutputStream lz4Os = new LZ4BlockOutputStream(os, randomBlockSize());
lz4Os.write(data);
lz4Os.close();
final byte[] compressedArray = os.toByteArray();
int written = 0, length = rand.nextInt(100);
while (written + length < compressedArray.length) {
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length);
channel.writeInbound(compressed);
written += length;
length = rand.nextInt(100);
}
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception {
CompositeByteBuf uncompressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readInbound()) != null) {
uncompressed.addComponent(msg);
uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes());
}
return uncompressed;
}
private static int randomBlockSize() {
return rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1);
} }
} }

View File

@ -16,108 +16,40 @@
package io.netty.handler.codec.compression; package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import net.jpountz.lz4.LZ4BlockInputStream; import net.jpountz.lz4.LZ4BlockInputStream;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream; import java.io.InputStream;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class Lz4FrameEncoderTest { public class Lz4FrameEncoderTest extends AbstractEncoderTest {
private static final ThreadLocalRandom rand; @Override
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
}
private EmbeddedChannel channel;
@Before
public void initChannel() { public void initChannel() {
channel = new EmbeddedChannel(new Lz4FrameEncoder()); channel = new EmbeddedChannel(new Lz4FrameEncoder());
} }
private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception { @Override
ByteBuf in = Unpooled.wrappedBuffer(data); protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception {
channel.writeOutbound(in); InputStream is = new ByteBufInputStream(compressed);
channel.finish();
final byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
}
@Test
public void testCompressionOfSmallChunkOfData() throws Exception {
testCompression(channel, BYTES_SMALL);
}
@Test
public void testCompressionOfLargeChunkOfData() throws Exception {
testCompression(channel, BYTES_LARGE);
}
@Test
public void testCompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
int written = 0, length = rand.nextInt(1, 100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
channel.writeOutbound(in);
written += length;
length = rand.nextInt(1, 100);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
channel.writeOutbound(in);
channel.finish();
final byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
}
private static byte[] uncompress(EmbeddedChannel channel, int originalLength) throws Exception {
CompositeByteBuf out = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
out.addComponent(msg);
out.writerIndex(out.writerIndex() + msg.readableBytes());
}
byte[] compressed = new byte[out.readableBytes()];
out.readBytes(compressed);
out.release();
ByteArrayInputStream is = new ByteArrayInputStream(compressed);
LZ4BlockInputStream lz4Is = new LZ4BlockInputStream(is); LZ4BlockInputStream lz4Is = new LZ4BlockInputStream(is);
byte[] uncompressed = new byte[originalLength];
byte[] decompressed = new byte[originalLength];
int remaining = originalLength; int remaining = originalLength;
while (remaining > 0) { while (remaining > 0) {
int read = lz4Is.read(uncompressed, originalLength - remaining, remaining); int read = lz4Is.read(decompressed, originalLength - remaining, remaining);
if (read > 0) { if (read > 0) {
remaining -= read; remaining -= read;
} else { } else {
break; break;
} }
} }
assertEquals(-1, lz4Is.read());
lz4Is.close();
return uncompressed; return Unpooled.wrappedBuffer(decompressed);
} }
} }

View File

@ -17,15 +17,15 @@ package io.netty.handler.codec.compression;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
public class Lz4FrameIntegrationTest extends IntegrationTest { public class Lz4FrameIntegrationTest extends AbstractIntegrationTest {
@Override @Override
protected EmbeddedChannel createEncoderEmbeddedChannel() { protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new Lz4FrameEncoder()); return new EmbeddedChannel(new Lz4FrameEncoder());
} }
@Override @Override
protected EmbeddedChannel createDecoderEmbeddedChannel() { protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new Lz4FrameDecoder()); return new EmbeddedChannel(new Lz4FrameDecoder());
} }
} }

View File

@ -17,44 +17,18 @@ package io.netty.handler.codec.compression;
import com.ning.compress.lzf.LZFEncoder; import com.ning.compress.lzf.LZFEncoder;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import static com.ning.compress.lzf.LZFChunk.BYTE_Z; import static com.ning.compress.lzf.LZFChunk.*;
import static com.ning.compress.lzf.LZFChunk.BYTE_V;
import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_NON_COMPRESSED;
import static org.junit.Assert.*;
public class LzfDecoderTest { public class LzfDecoderTest extends AbstractDecoderTest {
private static final ThreadLocalRandom rand; public LzfDecoderTest() throws Exception {
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
} }
@Rule @Override
public ExpectedException expected = ExpectedException.none();
private EmbeddedChannel channel;
@Before
public void initChannel() { public void initChannel() {
channel = new EmbeddedChannel(new LzfDecoder()); channel = new EmbeddedChannel(new LzfDecoder());
} }
@ -86,63 +60,8 @@ public class LzfDecoderTest {
channel.writeInbound(in); channel.writeInbound(in);
} }
private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception { @Override
byte[] compressedArray = LZFEncoder.encode(data); protected byte[] compress(byte[] data) throws Exception {
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray); return LZFEncoder.encode(data);
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
@Test
public void testDecompressionOfSmallChunkOfData() throws Exception {
testDecompression(channel, BYTES_SMALL);
}
@Test
public void testDecompressionOfLargeChunkOfData() throws Exception {
testDecompression(channel, BYTES_LARGE);
}
@Test
public void testDecompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
byte[] compressedArray = LZFEncoder.encode(data);
int written = 0, length = rand.nextInt(100);
while (written + length < compressedArray.length) {
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length);
channel.writeInbound(compressed);
written += length;
length = rand.nextInt(100);
}
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception {
CompositeByteBuf uncompressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readInbound()) != null) {
uncompressed.addComponent(msg);
uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes());
}
return uncompressed;
} }
} }

View File

@ -17,92 +17,22 @@ package io.netty.handler.codec.compression;
import com.ning.compress.lzf.LZFDecoder; import com.ning.compress.lzf.LZFDecoder;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*; public class LzfEncoderTest extends AbstractEncoderTest {
public class LzfEncoderTest { @Override
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
}
private EmbeddedChannel channel;
@Before
public void initChannel() { public void initChannel() {
channel = new EmbeddedChannel(new LzfEncoder()); channel = new EmbeddedChannel(new LzfEncoder());
} }
private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception { @Override
ByteBuf in = Unpooled.wrappedBuffer(data); protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception {
channel.writeOutbound(in); byte[] compressedArray = new byte[compressed.readableBytes()];
channel.finish(); compressed.readBytes(compressedArray);
final byte[] uncompressed = uncompress(channel); byte[] decompressed = LZFDecoder.decode(compressedArray);
return Unpooled.wrappedBuffer(decompressed);
assertArrayEquals(data, uncompressed);
}
@Test
public void testCompressionOfSmallChunkOfData() throws Exception {
testCompression(channel, BYTES_SMALL);
}
@Test
public void testCompressionOfLargeChunkOfData() throws Exception {
testCompression(channel, BYTES_LARGE);
}
@Test
public void testCompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
int written = 0, length = rand.nextInt(100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
channel.writeOutbound(in);
written += length;
length = rand.nextInt(100);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
channel.writeOutbound(in);
channel.finish();
final byte[] uncompressed = uncompress(channel);
assertArrayEquals(data, uncompressed);
}
private static byte[] uncompress(EmbeddedChannel channel) throws Exception {
CompositeByteBuf out = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
out.addComponent(msg);
out.writerIndex(out.writerIndex() + msg.readableBytes());
}
byte[] compressed = new byte[out.readableBytes()];
out.readBytes(compressed);
out.release();
return LZFDecoder.decode(compressed);
} }
} }

View File

@ -17,15 +17,15 @@ package io.netty.handler.codec.compression;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
public class LzfIntegrationTest extends IntegrationTest { public class LzfIntegrationTest extends AbstractIntegrationTest {
@Override @Override
protected EmbeddedChannel createEncoderEmbeddedChannel() { protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new LzfEncoder()); return new EmbeddedChannel(new LzfEncoder());
} }
@Override @Override
protected EmbeddedChannel createDecoderEmbeddedChannel() { protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new LzfDecoder()); return new EmbeddedChannel(new LzfDecoder());
} }
} }

View File

@ -20,126 +20,82 @@ import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import lzma.sdk.lzma.Decoder; import lzma.sdk.lzma.Decoder;
import lzma.streams.LzmaInputStream; import lzma.streams.LzmaInputStream;
import org.junit.Before; import org.junit.experimental.theories.FromDataPoints;
import org.junit.Test; import org.junit.experimental.theories.Theory;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class LzmaFrameEncoderTest { public class LzmaFrameEncoderTest extends AbstractEncoderTest {
private static final ThreadLocalRandom rand; @Override
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
rand.nextBytes(BYTES_SMALL);
rand.nextBytes(BYTES_LARGE);
}
private EmbeddedChannel channel;
@Before
public void initChannel() { public void initChannel() {
channel = new EmbeddedChannel(new LzmaFrameEncoder()); channel = new EmbeddedChannel(new LzmaFrameEncoder());
} }
private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception { @Theory
ByteBuf in = Unpooled.wrappedBuffer(data); @Override
assertTrue(channel.writeOutbound(in)); public void testCompressionOfBatchedFlowOfData(@FromDataPoints("smallData") ByteBuf data) throws Exception {
assertTrue(channel.finish()); testCompressionOfBatchedFlow(data);
byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
} }
@Test @Override
public void testCompressionOfSmallChunkOfData() throws Exception { protected void testCompressionOfBatchedFlow(final ByteBuf data) throws Exception {
testCompression(channel, BYTES_SMALL); List<Integer> originalLengths = new ArrayList<Integer>();
} final int dataLength = data.readableBytes();
@Test
public void testCompressionOfLargeChunkOfData() throws Exception {
testCompression(channel, BYTES_LARGE);
}
@Test
public void testCompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_SMALL;
int written = 0, length = rand.nextInt(50); int written = 0, length = rand.nextInt(50);
while (written + length < data.length) { while (written + length < dataLength) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length); ByteBuf in = data.slice(written, length);
assertTrue(channel.writeOutbound(in)); assertTrue(channel.writeOutbound(in.retain()));
written += length; written += length;
originalLengths.add(length);
length = rand.nextInt(50); length = rand.nextInt(50);
} }
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); length = dataLength - written;
assertTrue(channel.writeOutbound(in)); ByteBuf in = data.slice(written, dataLength - written);
originalLengths.add(length);
assertTrue(channel.writeOutbound(in.retain()));
assertTrue(channel.finish()); assertTrue(channel.finish());
byte[] uncompressed = new byte[data.length]; CompositeByteBuf decompressed = Unpooled.compositeBuffer();
int outOffset = 0;
ByteBuf msg; ByteBuf msg;
int i = 0;
while ((msg = channel.readOutbound()) != null) { while ((msg = channel.readOutbound()) != null) {
InputStream is = new ByteBufInputStream(msg); ByteBuf decompressedMsg = decompress(msg, originalLengths.get(i++));
LzmaInputStream lzmaIs = new LzmaInputStream(is, new Decoder()); decompressed.addComponent(decompressedMsg);
for (;;) { decompressed.writerIndex(decompressed.writerIndex() + decompressedMsg.readableBytes());
int read = lzmaIs.read(uncompressed, outOffset, data.length - outOffset);
if (read > 0) {
outOffset += read;
} else {
break;
}
}
assertEquals(0, is.available());
assertEquals(-1, is.read());
is.close();
lzmaIs.close();
msg.release(); msg.release();
} }
assertEquals(originalLengths.size(), i);
assertEquals(data, decompressed);
assertArrayEquals(data, uncompressed); decompressed.release();
data.release();
} }
private static byte[] uncompress(EmbeddedChannel channel, int length) throws Exception { @Override
CompositeByteBuf out = Unpooled.compositeBuffer(); protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception {
ByteBuf msg; InputStream is = new ByteBufInputStream(compressed);
while ((msg = channel.readOutbound()) != null) {
out.addComponent(msg);
out.writerIndex(out.writerIndex() + msg.readableBytes());
}
InputStream is = new ByteBufInputStream(out);
LzmaInputStream lzmaIs = new LzmaInputStream(is, new Decoder()); LzmaInputStream lzmaIs = new LzmaInputStream(is, new Decoder());
byte[] uncompressed = new byte[length];
int remaining = length; byte[] decompressed = new byte[originalLength];
int remaining = originalLength;
while (remaining > 0) { while (remaining > 0) {
int read = lzmaIs.read(uncompressed, length - remaining, remaining); int read = lzmaIs.read(decompressed, originalLength - remaining, remaining);
if (read > 0) { if (read > 0) {
remaining -= read; remaining -= read;
} else { } else {
break; break;
} }
} }
assertEquals(0, is.available());
assertEquals(-1, is.read());
assertEquals(-1, lzmaIs.read()); assertEquals(-1, lzmaIs.read());
is.close();
lzmaIs.close(); lzmaIs.close();
out.release();
return uncompressed; return Unpooled.wrappedBuffer(decompressed);
} }
} }

View File

@ -77,9 +77,9 @@ public class SnappyFrameEncoderTest {
'n', 'e', 't', 't', 'y' 'n', 'e', 't', 't', 'y'
}); });
channel.writeOutbound(in.copy()); channel.writeOutbound(in.retain());
in.readerIndex(0); // rewind the buffer to write the same data in.resetReaderIndex(); // rewind the buffer to write the same data
channel.writeOutbound(in.copy()); channel.writeOutbound(in);
assertTrue(channel.finish()); assertTrue(channel.finish());
ByteBuf expected = Unpooled.wrappedBuffer(new byte[] { ByteBuf expected = Unpooled.wrappedBuffer(new byte[] {
@ -98,7 +98,6 @@ public class SnappyFrameEncoderTest {
actual.writerIndex(actual.writerIndex() + m.readableBytes()); actual.writerIndex(actual.writerIndex() + m.readableBytes());
} }
assertEquals(releaseLater(expected), releaseLater(actual)); assertEquals(releaseLater(expected), releaseLater(actual));
in.release();
} }
/** /**

View File

@ -20,7 +20,7 @@ import org.junit.Test;
import java.util.Random; import java.util.Random;
public class SnappyIntegrationTest extends IntegrationTest { public class SnappyIntegrationTest extends AbstractIntegrationTest {
/** /**
* The number of random regression tests run by testRandom() runs. Whenever testRandom() finds the case that * The number of random regression tests run by testRandom() runs. Whenever testRandom() finds the case that
@ -32,12 +32,12 @@ public class SnappyIntegrationTest extends IntegrationTest {
private static final int RANDOM_RUNS = 1; private static final int RANDOM_RUNS = 1;
@Override @Override
protected EmbeddedChannel createEncoderEmbeddedChannel() { protected EmbeddedChannel createEncoder() {
return new EmbeddedChannel(new SnappyFrameEncoder()); return new EmbeddedChannel(new SnappyFrameEncoder());
} }
@Override @Override
protected EmbeddedChannel createDecoderEmbeddedChannel() { protected EmbeddedChannel createDecoder() {
return new EmbeddedChannel(new SnappyFrameDecoder()); return new EmbeddedChannel(new SnappyFrameDecoder());
} }

View File

@ -98,13 +98,12 @@ public abstract class ZlibTest {
EmbeddedChannel chDecoderGZip = new EmbeddedChannel(createDecoder(ZlibWrapper.GZIP)); EmbeddedChannel chDecoderGZip = new EmbeddedChannel(createDecoder(ZlibWrapper.GZIP));
try { try {
chDecoderGZip.writeInbound(deflatedData.copy()); chDecoderGZip.writeInbound(deflatedData);
assertTrue(chDecoderGZip.finish()); assertTrue(chDecoderGZip.finish());
ByteBuf buf = chDecoderGZip.readInbound(); ByteBuf buf = chDecoderGZip.readInbound();
assertEquals(buf, data); assertEquals(buf, data);
assertNull(chDecoderGZip.readInbound()); assertNull(chDecoderGZip.readInbound());
data.release(); data.release();
deflatedData.release();
buf.release(); buf.release();
} finally { } finally {
dispose(chDecoderGZip); dispose(chDecoderGZip);
@ -116,8 +115,9 @@ public abstract class ZlibTest {
EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper)); EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper));
try { try {
chEncoder.writeOutbound(data.copy()); chEncoder.writeOutbound(data.retain());
chEncoder.flush(); chEncoder.flush();
data.resetReaderIndex();
for (;;) { for (;;) {
ByteBuf deflatedData = chEncoder.readOutbound(); ByteBuf deflatedData = chEncoder.readOutbound();

View File

@ -793,7 +793,7 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- Test dependency for Bzip2Decoder --> <!-- Test dependency for Bzip2 compression codec -->
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId> <artifactId>commons-compress</artifactId>