From 0d5774a82b470833c5f8a84fa4708142a4801f52 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 15 Jun 2021 08:04:44 +0200 Subject: [PATCH] Make all compression codecs support buffers that don't have arrays (#11383) (#11387) Motivation: Various compression codecs are currently hard-coded to only support buffers that are backed by byte-arrays that they are willing to expose. This is efficient for most of the codecs, but compatibility suffers, as we are not able to freely choose our buffer implementations when compression codecs are involved. Modification: Add code to the compression codecs, that allow them to handle buffers that don't have arrays. For many of the codecs, this unfortunately involves allocating temporary byte-arrays, and copying back-and-forth. We have to do it that way since some codecs can _only_ work with byte-arrays. Also add tests to verify that this works. Result: It is now possible to use all of our compression codecs with both on-heap and off-heap buffers. The default buffer choice has not changed, however, so performance should be unaffected. Co-authored-by: Chris Vest --- .../codec/compression/FastLzFrameDecoder.java | 93 ++++++++----------- .../codec/compression/FastLzFrameEncoder.java | 36 ++++++- .../handler/codec/compression/LzfDecoder.java | 17 +++- .../handler/codec/compression/LzfEncoder.java | 21 ++++- .../compression/AbstractIntegrationTest.java | 83 ++++++++++------- .../compression/Bzip2IntegrationTest.java | 9 +- .../compression/FastLzIntegrationTest.java | 87 +++++++++-------- .../compression/SnappyIntegrationTest.java | 4 +- 8 files changed, 202 insertions(+), 148 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameDecoder.java index 89cef256fb..c905dbcd3b 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.compression; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.util.internal.EmptyArrays; @@ -146,70 +147,50 @@ public class FastLzFrameDecoder extends ByteToMessageDecoder { final int idx = in.readerIndex(); final int originalLength = this.originalLength; - final ByteBuf uncompressed; - final byte[] output; - final int outputPtr; + final byte[] output = originalLength == 0? EmptyArrays.EMPTY_BYTES : new byte[originalLength]; + final int outputPtr = 0; - if (originalLength != 0) { - uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength); - output = uncompressed.array(); - outputPtr = uncompressed.arrayOffset() + uncompressed.writerIndex(); - } else { - uncompressed = null; - output = EmptyArrays.EMPTY_BYTES; - outputPtr = 0; - } - - boolean success = false; - try { - if (isCompressed) { - final byte[] input; - final int inputPtr; - if (in.hasArray()) { - input = in.array(); - inputPtr = in.arrayOffset() + idx; - } else { - input = new byte[chunkLength]; - in.getBytes(idx, input); - inputPtr = 0; - } - - final int decompressedBytes = decompress(input, inputPtr, chunkLength, - output, outputPtr, originalLength); - if (originalLength != decompressedBytes) { - throw new DecompressionException(String.format( - "stream corrupted: originalLength(%d) and actual length(%d) mismatch", - originalLength, decompressedBytes)); - } + if (isCompressed) { + final byte[] input; + final int inputPtr; + if (in.hasArray()) { + input = in.array(); + inputPtr = in.arrayOffset() + idx; } else { - in.getBytes(idx, output, outputPtr, chunkLength); + input = new byte[chunkLength]; + in.getBytes(idx, input); + inputPtr = 0; } - final Checksum checksum = this.checksum; - if (hasChecksum && checksum != null) { - checksum.reset(); - checksum.update(output, outputPtr, originalLength); - final int checksumResult = (int) checksum.getValue(); - if (checksumResult != currentChecksum) { - throw new DecompressionException(String.format( - "stream corrupted: mismatching checksum: %d (expected: %d)", - checksumResult, currentChecksum)); - } + final int decompressedBytes = decompress(input, inputPtr, chunkLength, + output, outputPtr, originalLength); + if (originalLength != decompressedBytes) { + throw new DecompressionException(String.format( + "stream corrupted: originalLength(%d) and actual length(%d) mismatch", + originalLength, decompressedBytes)); } + } else { + in.getBytes(idx, output, outputPtr, chunkLength); + } - if (uncompressed != null) { - uncompressed.writerIndex(uncompressed.writerIndex() + originalLength); - out.add(uncompressed); - } - in.skipBytes(chunkLength); - - currentState = State.INIT_BLOCK; - success = true; - } finally { - if (!success && uncompressed != null) { - uncompressed.release(); + final Checksum checksum = this.checksum; + if (hasChecksum && checksum != null) { + checksum.reset(); + checksum.update(output, outputPtr, originalLength); + final int checksumResult = (int) checksum.getValue(); + if (checksumResult != currentChecksum) { + throw new DecompressionException(String.format( + "stream corrupted: mismatching checksum: %d (expected: %d)", + checksumResult, currentChecksum)); } } + + if (output.length > 0) { + out.add(Unpooled.wrappedBuffer(output).writerIndex(originalLength)); + } + in.skipBytes(chunkLength); + + currentState = State.INIT_BLOCK; break; case CORRUPTED: in.skipBytes(in.readableBytes()); diff --git a/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameEncoder.java index d6432a513c..35a52f1897 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFrameEncoder.java @@ -115,8 +115,15 @@ public class FastLzFrameEncoder extends MessageToByteEncoder { blockType = BLOCK_TYPE_NON_COMPRESSED; out.ensureWritable(outputOffset + 2 + length); - final byte[] output = out.array(); - final int outputPtr = out.arrayOffset() + outputOffset + 2; + final byte[] output; + final int outputPtr; + if (out.hasArray()) { + output = out.array(); + outputPtr = out.arrayOffset() + outputOffset + 2; + } else { + output = new byte[length]; + outputPtr = 0; + } if (checksum != null) { final byte[] input; @@ -138,6 +145,9 @@ public class FastLzFrameEncoder extends MessageToByteEncoder { } else { in.getBytes(idx, output, outputPtr, length); } + if (!out.hasArray()) { + out.setBytes(outputOffset + 2, output); + } chunkLength = length; } else { // try to compress @@ -160,9 +170,19 @@ public class FastLzFrameEncoder extends MessageToByteEncoder { final int maxOutputLength = calculateOutputBufferLength(length); out.ensureWritable(outputOffset + 4 + maxOutputLength); - final byte[] output = out.array(); - final int outputPtr = out.arrayOffset() + outputOffset + 4; + final byte[] output; + final int outputPtr; + if (out.hasArray()) { + output = out.array(); + outputPtr = out.arrayOffset() + outputOffset + 4; + } else { + output = new byte[maxOutputLength]; + outputPtr = 0; + } final int compressedLength = compress(input, inputPtr, length, output, outputPtr, level); + if (!out.hasArray()) { + out.setBytes(outputOffset + 4, output, 0, compressedLength); + } if (compressedLength < length) { blockType = BLOCK_TYPE_COMPRESSED; chunkLength = compressedLength; @@ -171,7 +191,13 @@ public class FastLzFrameEncoder extends MessageToByteEncoder { outputOffset += 2; } else { blockType = BLOCK_TYPE_NON_COMPRESSED; - System.arraycopy(input, inputPtr, output, outputPtr - 2, length); + if (out.hasArray()) { + System.arraycopy(input, inputPtr, output, outputPtr - 2, length); + } else { + for (int i = 0; i < length; i++) { + out.setByte(outputOffset + 2 + i, input[inputPtr + i]); + } + } chunkLength = length; } } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java index c09a7ddc01..b867b67b96 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java @@ -171,13 +171,24 @@ public class LzfDecoder extends ByteToMessageDecoder { } ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength); - final byte[] outputArray = uncompressed.array(); - final int outPos = uncompressed.arrayOffset() + uncompressed.writerIndex(); + final byte[] outputArray; + final int outPos; + if (uncompressed.hasArray()) { + outputArray = uncompressed.array(); + outPos = uncompressed.arrayOffset() + uncompressed.writerIndex(); + } else { + outputArray = new byte[originalLength]; + outPos = 0; + } boolean success = false; try { decoder.decodeChunk(inputArray, inPos, outputArray, outPos, outPos + originalLength); - uncompressed.writerIndex(uncompressed.writerIndex() + originalLength); + if (uncompressed.hasArray()) { + uncompressed.writerIndex(uncompressed.writerIndex() + originalLength); + } else { + uncompressed.writeBytes(outputArray); + } out.add(uncompressed); in.skipBytes(chunkLength); success = true; diff --git a/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java index b53cbd5aff..bba0fefe7c 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java @@ -159,10 +159,18 @@ public class LzfEncoder extends MessageToByteEncoder { inputPtr = 0; } - final int maxOutputLength = LZFEncoder.estimateMaxWorkspaceSize(length); + // Estimate may apparently under-count by one in some cases. + final int maxOutputLength = LZFEncoder.estimateMaxWorkspaceSize(length) + 1; out.ensureWritable(maxOutputLength); - final byte[] output = out.array(); - final int outputPtr = out.arrayOffset() + out.writerIndex(); + final byte[] output; + final int outputPtr; + if (out.hasArray()) { + output = out.array(); + outputPtr = out.arrayOffset() + out.writerIndex(); + } else { + output = new byte[maxOutputLength]; + outputPtr = 0; + } final int outputLength; if (length >= compressThreshold) { @@ -173,7 +181,12 @@ public class LzfEncoder extends MessageToByteEncoder { outputLength = encodeNonCompress(input, inputPtr, length, output, outputPtr); } - out.writerIndex(out.writerIndex() + outputLength); + if (out.hasArray()) { + out.writerIndex(out.writerIndex() + outputLength); + } else { + out.writeBytes(output, 0, outputLength); + } + in.skipBytes(length); if (!in.hasArray()) { diff --git a/codec/src/test/java/io/netty/handler/codec/compression/AbstractIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/AbstractIntegrationTest.java index 5f35ccba54..d1dc6ea7e1 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/AbstractIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/AbstractIntegrationTest.java @@ -45,14 +45,12 @@ public abstract class AbstractIntegrationTest { protected abstract EmbeddedChannel createEncoder(); protected abstract EmbeddedChannel createDecoder(); - @BeforeEach - public void initChannels() throws Exception { + public void initChannels() { encoder = createEncoder(); decoder = createDecoder(); } - @AfterEach - public void closeChannels() throws Exception { + public void closeChannels() { encoder.close(); for (;;) { Object msg = encoder.readOutbound(); @@ -74,19 +72,22 @@ public abstract class AbstractIntegrationTest { @Test public void testEmpty() throws Exception { - testIdentity(EmptyArrays.EMPTY_BYTES); + testIdentity(EmptyArrays.EMPTY_BYTES, true); + testIdentity(EmptyArrays.EMPTY_BYTES, false); } @Test public void testOneByte() throws Exception { final byte[] data = { 'A' }; - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void testTwoBytes() throws Exception { final byte[] data = { 'B', 'A' }; - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test @@ -94,14 +95,16 @@ public abstract class AbstractIntegrationTest { final byte[] data = ("Netty is a NIO client server framework which enables " + "quick and easy development of network applications such as protocol " + "servers and clients.").getBytes(CharsetUtil.UTF_8); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void testLargeRandom() throws Exception { final byte[] data = new byte[1024 * 1024]; rand.nextBytes(data); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test @@ -111,7 +114,8 @@ public abstract class AbstractIntegrationTest { for (int i = 0; i < 1024; i++) { data[i] = 2; } - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test @@ -120,20 +124,23 @@ public abstract class AbstractIntegrationTest { for (int i = 0; i < data.length; i++) { data[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); } - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void testLongBlank() throws Exception { final byte[] data = new byte[102400]; - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void testLongSame() throws Exception { final byte[] data = new byte[102400]; Arrays.fill(data, (byte) 123); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test @@ -142,31 +149,39 @@ public abstract class AbstractIntegrationTest { for (int i = 0; i < data.length; i++) { data[i] = (byte) i; } - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } - protected void testIdentity(final byte[] data) { - final ByteBuf in = Unpooled.wrappedBuffer(data); - assertTrue(encoder.writeOutbound(in.retain())); - assertTrue(encoder.finish()); - + protected void testIdentity(final byte[] data, boolean heapBuffer) { + initChannels(); + final ByteBuf in = heapBuffer? Unpooled.wrappedBuffer(data) : + Unpooled.directBuffer(data.length).setBytes(0, data); final CompositeByteBuf compressed = Unpooled.compositeBuffer(); - ByteBuf msg; - while ((msg = encoder.readOutbound()) != null) { - compressed.addComponent(true, msg); - } - assertThat(compressed, is(notNullValue())); - - decoder.writeInbound(compressed.retain()); - assertFalse(compressed.isReadable()); final CompositeByteBuf decompressed = Unpooled.compositeBuffer(); - while ((msg = decoder.readInbound()) != null) { - decompressed.addComponent(true, msg); - } - assertEquals(in.resetReaderIndex(), decompressed); - compressed.release(); - decompressed.release(); - in.release(); + try { + assertTrue(encoder.writeOutbound(in.retain())); + assertTrue(encoder.finish()); + + ByteBuf msg; + while ((msg = encoder.readOutbound()) != null) { + compressed.addComponent(true, msg); + } + assertThat(compressed, is(notNullValue())); + + decoder.writeInbound(compressed.retain()); + assertFalse(compressed.isReadable()); + while ((msg = decoder.readInbound()) != null) { + decompressed.addComponent(true, msg); + } + in.readerIndex(0); + assertEquals(in, decompressed); + } finally { + compressed.release(); + decompressed.release(); + in.release(); + closeChannels(); + } } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java index d8f4c0ad0a..2b358eb9e4 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2IntegrationTest.java @@ -34,20 +34,23 @@ public class Bzip2IntegrationTest extends AbstractIntegrationTest { public void test3Tables() throws Exception { byte[] data = new byte[500]; rand.nextBytes(data); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void test4Tables() throws Exception { byte[] data = new byte[1100]; rand.nextBytes(data); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } @Test public void test5Tables() throws Exception { byte[] data = new byte[2300]; rand.nextBytes(data); - testIdentity(data); + testIdentity(data, true); + testIdentity(data, false); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java index 3708685882..5987042953 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java @@ -64,49 +64,54 @@ public class FastLzIntegrationTest extends AbstractIntegrationTest { } @Override // test batched flow of data - protected void testIdentity(final byte[] data) { - final ByteBuf original = Unpooled.wrappedBuffer(data); - - int written = 0, length = rand.nextInt(100); - while (written + length < data.length) { - ByteBuf in = Unpooled.wrappedBuffer(data, written, length); - encoder.writeOutbound(in); - written += length; - length = rand.nextInt(100); - } - ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); - encoder.writeOutbound(in); - encoder.finish(); - - ByteBuf msg; + protected void testIdentity(final byte[] data, boolean heapBuffer) { + initChannels(); + final ByteBuf original = heapBuffer? Unpooled.wrappedBuffer(data) : + Unpooled.directBuffer(data.length).writeBytes(data); final CompositeByteBuf compressed = Unpooled.compositeBuffer(); - while ((msg = encoder.readOutbound()) != null) { - compressed.addComponent(true, msg); - } - assertThat(compressed, is(notNullValue())); - - final byte[] compressedArray = new byte[compressed.readableBytes()]; - compressed.readBytes(compressedArray); - written = 0; - length = rand.nextInt(100); - while (written + length < compressedArray.length) { - in = Unpooled.wrappedBuffer(compressedArray, written, length); - decoder.writeInbound(in); - written += length; - length = rand.nextInt(100); - } - in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); - decoder.writeInbound(in); - - assertFalse(compressed.isReadable()); final CompositeByteBuf decompressed = Unpooled.compositeBuffer(); - while ((msg = decoder.readInbound()) != null) { - decompressed.addComponent(true, msg); - } - assertEquals(original, decompressed); - compressed.release(); - decompressed.release(); - original.release(); + try { + int written = 0, length = rand.nextInt(100); + while (written + length < data.length) { + ByteBuf in = Unpooled.wrappedBuffer(data, written, length); + encoder.writeOutbound(in); + written += length; + length = rand.nextInt(100); + } + ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); + encoder.writeOutbound(in); + encoder.finish(); + + ByteBuf msg; + while ((msg = encoder.readOutbound()) != null) { + compressed.addComponent(true, msg); + } + assertThat(compressed, is(notNullValue())); + + final byte[] compressedArray = new byte[compressed.readableBytes()]; + compressed.readBytes(compressedArray); + written = 0; + length = rand.nextInt(100); + while (written + length < compressedArray.length) { + in = Unpooled.wrappedBuffer(compressedArray, written, length); + decoder.writeInbound(in); + written += length; + length = rand.nextInt(100); + } + in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); + decoder.writeInbound(in); + + assertFalse(compressed.isReadable()); + while ((msg = decoder.readInbound()) != null) { + decompressed.addComponent(true, msg); + } + assertEquals(original, decompressed); + } finally { + compressed.release(); + decompressed.release(); + original.release(); + closeChannels(); + } } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java index 2f4671074d..5e16178699 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java @@ -64,7 +64,7 @@ public class SnappyIntegrationTest extends AbstractIntegrationTest { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 }; - testIdentity(data); + testIdentity(data, true); } // These tests were found using testRandom() with large RANDOM_RUNS. @@ -104,7 +104,7 @@ public class SnappyIntegrationTest extends AbstractIntegrationTest { private void testWithSeed(long seed) { byte[] data = new byte[16 * 1048576]; new Random(seed).nextBytes(data); - testIdentity(data); + testIdentity(data, true); } private static void printSeedAsTest(long l) {