diff --git a/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameDecoder.java index 64aa134ac8..ee92266b76 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.compression; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.util.internal.EmptyArrays; @@ -145,70 +146,50 @@ public class FastLzFrameDecoder extends ByteToMessageDecoder { final int idx = in.readerIndex(); final int originalLength = this.originalLength; - final ByteBuf uncompressed; - final byte[] output; - final int outputPtr; + final byte[] output = originalLength == 0? EmptyArrays.EMPTY_BYTES : new byte[originalLength]; + final int outputPtr = 0; - if (originalLength != 0) { - uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength); - output = uncompressed.array(); - outputPtr = uncompressed.arrayOffset() + uncompressed.writerIndex(); - } else { - uncompressed = null; - output = EmptyArrays.EMPTY_BYTES; - outputPtr = 0; - } - - boolean success = false; - try { - if (isCompressed) { - final byte[] input; - final int inputPtr; - if (in.hasArray()) { - input = in.array(); - inputPtr = in.arrayOffset() + idx; - } else { - input = new byte[chunkLength]; - in.getBytes(idx, input); - inputPtr = 0; - } - - final int decompressedBytes = decompress(input, inputPtr, chunkLength, - output, outputPtr, originalLength); - if (originalLength != decompressedBytes) { - throw new DecompressionException(String.format( - "stream corrupted: originalLength(%d) and actual length(%d) mismatch", - originalLength, decompressedBytes)); - } + if (isCompressed) { + final byte[] input; + final int inputPtr; + if (in.hasArray()) { + input = in.array(); + inputPtr = in.arrayOffset() + idx; } else { - in.getBytes(idx, output, outputPtr, chunkLength); + input = new byte[chunkLength]; + in.getBytes(idx, input); + inputPtr = 0; } - final Checksum checksum = this.checksum; - if (hasChecksum && checksum != null) { - checksum.reset(); - checksum.update(output, outputPtr, originalLength); - final int checksumResult = (int) checksum.getValue(); - if (checksumResult != currentChecksum) { - throw new DecompressionException(String.format( - "stream corrupted: mismatching checksum: %d (expected: %d)", - checksumResult, currentChecksum)); - } + final int decompressedBytes = decompress(input, inputPtr, chunkLength, + output, outputPtr, originalLength); + if (originalLength != decompressedBytes) { + throw new DecompressionException(String.format( + "stream corrupted: originalLength(%d) and actual length(%d) mismatch", + originalLength, decompressedBytes)); } + } else { + in.getBytes(idx, output, outputPtr, chunkLength); + } - if (uncompressed != null) { - uncompressed.writerIndex(uncompressed.writerIndex() + originalLength); - ctx.fireChannelRead(uncompressed); - } - in.skipBytes(chunkLength); - - currentState = State.INIT_BLOCK; - success = true; - } finally { - if (!success && uncompressed != null) { - uncompressed.release(); + final Checksum checksum = this.checksum; + if (hasChecksum && checksum != null) { + checksum.reset(); + checksum.update(output, outputPtr, originalLength); + final int checksumResult = (int) checksum.getValue(); + if (checksumResult != currentChecksum) { + throw new DecompressionException(String.format( + "stream corrupted: mismatching checksum: %d (expected: %d)", + checksumResult, currentChecksum)); } } + + if (output.length > 0) { + ctx.fireChannelRead(Unpooled.wrappedBuffer(output).writerIndex(originalLength)); + } + in.skipBytes(chunkLength); + + currentState = State.INIT_BLOCK; break; case CORRUPTED: in.skipBytes(in.readableBytes()); diff --git a/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameEncoder.java index d6432a513c..35a52f1897 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameEncoder.java @@ -115,8 +115,15 @@ public class FastLzFrameEncoder extends MessageToByteEncoder { blockType = BLOCK_TYPE_NON_COMPRESSED; out.ensureWritable(outputOffset + 2 + length); - final byte[] output = out.array(); - final int outputPtr = out.arrayOffset() + outputOffset + 2; + final byte[] output; + final int outputPtr; + if (out.hasArray()) { + output = out.array(); + outputPtr = out.arrayOffset() + outputOffset + 2; + } else { + output = new byte[length]; + outputPtr = 0; + } if (checksum != null) { final byte[] input; @@ -138,6 +145,9 @@ public class FastLzFrameEncoder extends MessageToByteEncoder { } else { in.getBytes(idx, output, outputPtr, length); } + if (!out.hasArray()) { + out.setBytes(outputOffset + 2, output); + } chunkLength = length; } else { // try to compress @@ -160,9 +170,19 @@ public class FastLzFrameEncoder extends MessageToByteEncoder { final int maxOutputLength = calculateOutputBufferLength(length); out.ensureWritable(outputOffset + 4 + maxOutputLength); - final byte[] output = out.array(); - final int outputPtr = out.arrayOffset() + outputOffset + 4; + final byte[] output; + final int outputPtr; + if (out.hasArray()) { + output = out.array(); + outputPtr = out.arrayOffset() + outputOffset + 4; + } else { + output = new byte[maxOutputLength]; + outputPtr = 0; + } final int compressedLength = compress(input, inputPtr, length, output, outputPtr, level); + if (!out.hasArray()) { + out.setBytes(outputOffset + 4, output, 0, compressedLength); + } if (compressedLength < length) { blockType = BLOCK_TYPE_COMPRESSED; chunkLength = compressedLength; @@ -171,7 +191,13 @@ public class FastLzFrameEncoder extends MessageToByteEncoder { outputOffset += 2; } else { blockType = BLOCK_TYPE_NON_COMPRESSED; - System.arraycopy(input, inputPtr, output, outputPtr - 2, length); + if (out.hasArray()) { + System.arraycopy(input, inputPtr, output, outputPtr - 2, length); + } else { + for (int i = 0; i < length; i++) { + out.setByte(outputOffset + 2 + i, input[inputPtr + i]); + } + } chunkLength = length; } } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java index fe4f65f204..960df2e58e 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java @@ -169,13 +169,24 @@ public class LzfDecoder extends ByteToMessageDecoder { } ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength); - final byte[] outputArray = uncompressed.array(); - final int outPos = uncompressed.arrayOffset() + uncompressed.writerIndex(); + final byte[] outputArray; + final int outPos; + if (uncompressed.hasArray()) { + outputArray = uncompressed.array(); + outPos = uncompressed.arrayOffset() + uncompressed.writerIndex(); + } else { + outputArray = new byte[originalLength]; + outPos = 0; + } boolean success = false; try { decoder.decodeChunk(inputArray, inPos, outputArray, outPos, outPos + originalLength); - uncompressed.writerIndex(uncompressed.writerIndex() + originalLength); + if (uncompressed.hasArray()) { + uncompressed.writerIndex(uncompressed.writerIndex() + originalLength); + } else { + uncompressed.writeBytes(outputArray); + } ctx.fireChannelRead(uncompressed); in.skipBytes(chunkLength); success = true; diff --git a/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java index b53cbd5aff..bba0fefe7c 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java @@ -159,10 +159,18 @@ public class LzfEncoder extends MessageToByteEncoder { inputPtr = 0; } - final int maxOutputLength = LZFEncoder.estimateMaxWorkspaceSize(length); + // Estimate may apparently under-count by one in some cases. + final int maxOutputLength = LZFEncoder.estimateMaxWorkspaceSize(length) + 1; out.ensureWritable(maxOutputLength); - final byte[] output = out.array(); - final int outputPtr = out.arrayOffset() + out.writerIndex(); + final byte[] output; + final int outputPtr; + if (out.hasArray()) { + output = out.array(); + outputPtr = out.arrayOffset() + out.writerIndex(); + } else { + output = new byte[maxOutputLength]; + outputPtr = 0; + } final int outputLength; if (length >= compressThreshold) { @@ -173,7 +181,12 @@ public class LzfEncoder extends MessageToByteEncoder { outputLength = encodeNonCompress(input, inputPtr, length, output, outputPtr); } - out.writerIndex(out.writerIndex() + outputLength); + if (out.hasArray()) { + out.writerIndex(out.writerIndex() + outputLength); + } else { + out.writeBytes(output, 0, outputLength); + } + in.skipBytes(length); if (!in.hasArray()) { diff --git a/codec/src/test/java/io/netty/handler/codec/compression/AbstractIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/AbstractIntegrationTest.java index c1edb483bd..d1dc6ea7e1 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/AbstractIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/AbstractIntegrationTest.java @@ -45,14 +45,12 @@ public abstract class AbstractIntegrationTest { protected abstract EmbeddedChannel createEncoder(); protected abstract EmbeddedChannel createDecoder(); - @BeforeEach - public void initChannels() throws Exception { + public void initChannels() { encoder = createEncoder(); decoder = createDecoder(); } - @AfterEach - public void closeChannels() throws Exception { + public void closeChannels() { encoder.close(); for (;;) { Object msg = encoder.readOutbound(); @@ -74,19 +72,22 @@ public abstract class AbstractIntegrationTest { @Test public void testEmpty() throws Exception { - testIdentity(EmptyArrays.EMPTY_BYTES); + testIdentity(EmptyArrays.EMPTY_BYTES, true); + testIdentity(EmptyArrays.EMPTY_BYTES, false); } @Test public void testOneByte() throws Exception { final byte[] data = { 'A' }; - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void testTwoBytes() throws Exception { final byte[] data = { 'B', 'A' }; - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test @@ -94,14 +95,16 @@ public abstract class AbstractIntegrationTest { final byte[] data = ("Netty is a NIO client server framework which enables " + "quick and easy development of network applications such as protocol " + "servers and clients.").getBytes(CharsetUtil.UTF_8); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void testLargeRandom() throws Exception { final byte[] data = new byte[1024 * 1024]; rand.nextBytes(data); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test @@ -111,7 +114,8 @@ public abstract class AbstractIntegrationTest { for (int i = 0; i < 1024; i++) { data[i] = 2; } - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test @@ -120,20 +124,23 @@ public abstract class AbstractIntegrationTest { for (int i = 0; i < data.length; i++) { data[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); } - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void testLongBlank() throws Exception { final byte[] data = new byte[102400]; - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void testLongSame() throws Exception { final byte[] data = new byte[102400]; Arrays.fill(data, (byte) 123); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test @@ -142,32 +149,39 @@ public abstract class AbstractIntegrationTest { for (int i = 0; i < data.length; i++) { data[i] = (byte) i; } - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } - protected void testIdentity(final byte[] data) { - final ByteBuf in = Unpooled.wrappedBuffer(data); - assertTrue(encoder.writeOutbound(in.retain())); - assertTrue(encoder.finish()); - + protected void testIdentity(final byte[] data, boolean heapBuffer) { + initChannels(); + final ByteBuf in = heapBuffer? Unpooled.wrappedBuffer(data) : + Unpooled.directBuffer(data.length).setBytes(0, data); final CompositeByteBuf compressed = Unpooled.compositeBuffer(); - ByteBuf msg; - while ((msg = encoder.readOutbound()) != null) { - compressed.addComponent(true, msg); - } - assertThat(compressed, is(notNullValue())); - - decoder.writeInbound(compressed.retain()); - assertFalse(compressed.isReadable()); final CompositeByteBuf decompressed = Unpooled.compositeBuffer(); - while ((msg = decoder.readInbound()) != null) { - decompressed.addComponent(true, msg); - } - in.readerIndex(0); - assertEquals(in, decompressed); - compressed.release(); - decompressed.release(); - in.release(); + try { + assertTrue(encoder.writeOutbound(in.retain())); + assertTrue(encoder.finish()); + + ByteBuf msg; + while ((msg = encoder.readOutbound()) != null) { + compressed.addComponent(true, msg); + } + assertThat(compressed, is(notNullValue())); + + decoder.writeInbound(compressed.retain()); + assertFalse(compressed.isReadable()); + while ((msg = decoder.readInbound()) != null) { + decompressed.addComponent(true, msg); + } + in.readerIndex(0); + assertEquals(in, decompressed); + } finally { + compressed.release(); + decompressed.release(); + in.release(); + closeChannels(); + } } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java index d8f4c0ad0a..2b358eb9e4 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java @@ -34,20 +34,23 @@ public class Bzip2IntegrationTest extends AbstractIntegrationTest { public void test3Tables() throws Exception { byte[] data = new byte[500]; rand.nextBytes(data); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void test4Tables() throws Exception { byte[] data = new byte[1100]; rand.nextBytes(data); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void test5Tables() throws Exception { byte[] data = new byte[2300]; rand.nextBytes(data); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java index 3708685882..5987042953 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java @@ -64,49 +64,54 @@ public class FastLzIntegrationTest extends AbstractIntegrationTest { } @Override // test batched flow of data - protected void testIdentity(final byte[] data) { - final ByteBuf original = Unpooled.wrappedBuffer(data); - - int written = 0, length = rand.nextInt(100); - while (written + length < data.length) { - ByteBuf in = Unpooled.wrappedBuffer(data, written, length); - encoder.writeOutbound(in); - written += length; - length = rand.nextInt(100); - } - ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); - encoder.writeOutbound(in); - encoder.finish(); - - ByteBuf msg; + protected void testIdentity(final byte[] data, boolean heapBuffer) { + initChannels(); + final ByteBuf original = heapBuffer? Unpooled.wrappedBuffer(data) : + Unpooled.directBuffer(data.length).writeBytes(data); final CompositeByteBuf compressed = Unpooled.compositeBuffer(); - while ((msg = encoder.readOutbound()) != null) { - compressed.addComponent(true, msg); - } - assertThat(compressed, is(notNullValue())); - - final byte[] compressedArray = new byte[compressed.readableBytes()]; - compressed.readBytes(compressedArray); - written = 0; - length = rand.nextInt(100); - while (written + length < compressedArray.length) { - in = Unpooled.wrappedBuffer(compressedArray, written, length); - decoder.writeInbound(in); - written += length; - length = rand.nextInt(100); - } - in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); - decoder.writeInbound(in); - - assertFalse(compressed.isReadable()); final CompositeByteBuf decompressed = Unpooled.compositeBuffer(); - while ((msg = decoder.readInbound()) != null) { - decompressed.addComponent(true, msg); - } - assertEquals(original, decompressed); - compressed.release(); - decompressed.release(); - original.release(); + try { + int written = 0, length = rand.nextInt(100); + while (written + length < data.length) { + ByteBuf in = Unpooled.wrappedBuffer(data, written, length); + encoder.writeOutbound(in); + written += length; + length = rand.nextInt(100); + } + ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); + encoder.writeOutbound(in); + encoder.finish(); + + ByteBuf msg; + while ((msg = encoder.readOutbound()) != null) { + compressed.addComponent(true, msg); + } + assertThat(compressed, is(notNullValue())); + + final byte[] compressedArray = new byte[compressed.readableBytes()]; + compressed.readBytes(compressedArray); + written = 0; + length = rand.nextInt(100); + while (written + length < compressedArray.length) { + in = Unpooled.wrappedBuffer(compressedArray, written, length); + decoder.writeInbound(in); + written += length; + length = rand.nextInt(100); + } + in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); + decoder.writeInbound(in); + + assertFalse(compressed.isReadable()); + while ((msg = decoder.readInbound()) != null) { + decompressed.addComponent(true, msg); + } + assertEquals(original, decompressed); + } finally { + compressed.release(); + decompressed.release(); + original.release(); + closeChannels(); + } } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java index 2f4671074d..5e16178699 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java @@ -64,7 +64,7 @@ public class SnappyIntegrationTest extends AbstractIntegrationTest { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 }; - testIdentity(data); + testIdentity(data, true); } // These tests were found using testRandom() with large RANDOM_RUNS. @@ -104,7 +104,7 @@ public class SnappyIntegrationTest extends AbstractIntegrationTest { private void testWithSeed(long seed) { byte[] data = new byte[16 * 1048576]; new Random(seed).nextBytes(data); - testIdentity(data); + testIdentity(data, true); } private static void printSeedAsTest(long l) {