Make all compression codecs support buffers that don't have arrays (#11383) (#11387)

Motivation:
Various compression codecs are currently hard-coded to only support buffers that are backed by byte-arrays that they are willing to expose.
This is efficient for most of the codecs, but compatibility suffers, as we are not able to freely choose our buffer implementations when compression codecs are involved.

Modification:
Add code to the compression codecs, that allow them to handle buffers that don't have arrays.
For many of the codecs, this unfortunately involves allocating temporary byte-arrays, and copying back-and-forth.
We have to do it that way since some codecs can _only_ work with byte-arrays.
Also add tests to verify that this works.

Result:
It is now possible to use all of our compression codecs with both on-heap and off-heap buffers.
The default buffer choice has not changed, however, so performance should be unaffected.

Co-authored-by: Chris Vest <christianvest_hansen@apple.com>
This commit is contained in:
Norman Maurer 2021-06-15 08:04:44 +02:00 committed by GitHub
parent 50e0e8c5ca
commit 0d5774a82b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 202 additions and 148 deletions

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.internal.EmptyArrays;
@ -146,70 +147,50 @@ public class FastLzFrameDecoder extends ByteToMessageDecoder {
final int idx = in.readerIndex();
final int originalLength = this.originalLength;
final ByteBuf uncompressed;
final byte[] output;
final int outputPtr;
final byte[] output = originalLength == 0? EmptyArrays.EMPTY_BYTES : new byte[originalLength];
final int outputPtr = 0;
if (originalLength != 0) {
uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength);
output = uncompressed.array();
outputPtr = uncompressed.arrayOffset() + uncompressed.writerIndex();
} else {
uncompressed = null;
output = EmptyArrays.EMPTY_BYTES;
outputPtr = 0;
}
boolean success = false;
try {
if (isCompressed) {
final byte[] input;
final int inputPtr;
if (in.hasArray()) {
input = in.array();
inputPtr = in.arrayOffset() + idx;
} else {
input = new byte[chunkLength];
in.getBytes(idx, input);
inputPtr = 0;
}
final int decompressedBytes = decompress(input, inputPtr, chunkLength,
output, outputPtr, originalLength);
if (originalLength != decompressedBytes) {
throw new DecompressionException(String.format(
"stream corrupted: originalLength(%d) and actual length(%d) mismatch",
originalLength, decompressedBytes));
}
if (isCompressed) {
final byte[] input;
final int inputPtr;
if (in.hasArray()) {
input = in.array();
inputPtr = in.arrayOffset() + idx;
} else {
in.getBytes(idx, output, outputPtr, chunkLength);
input = new byte[chunkLength];
in.getBytes(idx, input);
inputPtr = 0;
}
final Checksum checksum = this.checksum;
if (hasChecksum && checksum != null) {
checksum.reset();
checksum.update(output, outputPtr, originalLength);
final int checksumResult = (int) checksum.getValue();
if (checksumResult != currentChecksum) {
throw new DecompressionException(String.format(
"stream corrupted: mismatching checksum: %d (expected: %d)",
checksumResult, currentChecksum));
}
final int decompressedBytes = decompress(input, inputPtr, chunkLength,
output, outputPtr, originalLength);
if (originalLength != decompressedBytes) {
throw new DecompressionException(String.format(
"stream corrupted: originalLength(%d) and actual length(%d) mismatch",
originalLength, decompressedBytes));
}
} else {
in.getBytes(idx, output, outputPtr, chunkLength);
}
if (uncompressed != null) {
uncompressed.writerIndex(uncompressed.writerIndex() + originalLength);
out.add(uncompressed);
}
in.skipBytes(chunkLength);
currentState = State.INIT_BLOCK;
success = true;
} finally {
if (!success && uncompressed != null) {
uncompressed.release();
final Checksum checksum = this.checksum;
if (hasChecksum && checksum != null) {
checksum.reset();
checksum.update(output, outputPtr, originalLength);
final int checksumResult = (int) checksum.getValue();
if (checksumResult != currentChecksum) {
throw new DecompressionException(String.format(
"stream corrupted: mismatching checksum: %d (expected: %d)",
checksumResult, currentChecksum));
}
}
if (output.length > 0) {
out.add(Unpooled.wrappedBuffer(output).writerIndex(originalLength));
}
in.skipBytes(chunkLength);
currentState = State.INIT_BLOCK;
break;
case CORRUPTED:
in.skipBytes(in.readableBytes());

View File

@ -115,8 +115,15 @@ public class FastLzFrameEncoder extends MessageToByteEncoder<ByteBuf> {
blockType = BLOCK_TYPE_NON_COMPRESSED;
out.ensureWritable(outputOffset + 2 + length);
final byte[] output = out.array();
final int outputPtr = out.arrayOffset() + outputOffset + 2;
final byte[] output;
final int outputPtr;
if (out.hasArray()) {
output = out.array();
outputPtr = out.arrayOffset() + outputOffset + 2;
} else {
output = new byte[length];
outputPtr = 0;
}
if (checksum != null) {
final byte[] input;
@ -138,6 +145,9 @@ public class FastLzFrameEncoder extends MessageToByteEncoder<ByteBuf> {
} else {
in.getBytes(idx, output, outputPtr, length);
}
if (!out.hasArray()) {
out.setBytes(outputOffset + 2, output);
}
chunkLength = length;
} else {
// try to compress
@ -160,9 +170,19 @@ public class FastLzFrameEncoder extends MessageToByteEncoder<ByteBuf> {
final int maxOutputLength = calculateOutputBufferLength(length);
out.ensureWritable(outputOffset + 4 + maxOutputLength);
final byte[] output = out.array();
final int outputPtr = out.arrayOffset() + outputOffset + 4;
final byte[] output;
final int outputPtr;
if (out.hasArray()) {
output = out.array();
outputPtr = out.arrayOffset() + outputOffset + 4;
} else {
output = new byte[maxOutputLength];
outputPtr = 0;
}
final int compressedLength = compress(input, inputPtr, length, output, outputPtr, level);
if (!out.hasArray()) {
out.setBytes(outputOffset + 4, output, 0, compressedLength);
}
if (compressedLength < length) {
blockType = BLOCK_TYPE_COMPRESSED;
chunkLength = compressedLength;
@ -171,7 +191,13 @@ public class FastLzFrameEncoder extends MessageToByteEncoder<ByteBuf> {
outputOffset += 2;
} else {
blockType = BLOCK_TYPE_NON_COMPRESSED;
System.arraycopy(input, inputPtr, output, outputPtr - 2, length);
if (out.hasArray()) {
System.arraycopy(input, inputPtr, output, outputPtr - 2, length);
} else {
for (int i = 0; i < length; i++) {
out.setByte(outputOffset + 2 + i, input[inputPtr + i]);
}
}
chunkLength = length;
}
}

View File

@ -171,13 +171,24 @@ public class LzfDecoder extends ByteToMessageDecoder {
}
ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength);
final byte[] outputArray = uncompressed.array();
final int outPos = uncompressed.arrayOffset() + uncompressed.writerIndex();
final byte[] outputArray;
final int outPos;
if (uncompressed.hasArray()) {
outputArray = uncompressed.array();
outPos = uncompressed.arrayOffset() + uncompressed.writerIndex();
} else {
outputArray = new byte[originalLength];
outPos = 0;
}
boolean success = false;
try {
decoder.decodeChunk(inputArray, inPos, outputArray, outPos, outPos + originalLength);
uncompressed.writerIndex(uncompressed.writerIndex() + originalLength);
if (uncompressed.hasArray()) {
uncompressed.writerIndex(uncompressed.writerIndex() + originalLength);
} else {
uncompressed.writeBytes(outputArray);
}
out.add(uncompressed);
in.skipBytes(chunkLength);
success = true;

View File

@ -159,10 +159,18 @@ public class LzfEncoder extends MessageToByteEncoder<ByteBuf> {
inputPtr = 0;
}
final int maxOutputLength = LZFEncoder.estimateMaxWorkspaceSize(length);
// Estimate may apparently under-count by one in some cases.
final int maxOutputLength = LZFEncoder.estimateMaxWorkspaceSize(length) + 1;
out.ensureWritable(maxOutputLength);
final byte[] output = out.array();
final int outputPtr = out.arrayOffset() + out.writerIndex();
final byte[] output;
final int outputPtr;
if (out.hasArray()) {
output = out.array();
outputPtr = out.arrayOffset() + out.writerIndex();
} else {
output = new byte[maxOutputLength];
outputPtr = 0;
}
final int outputLength;
if (length >= compressThreshold) {
@ -173,7 +181,12 @@ public class LzfEncoder extends MessageToByteEncoder<ByteBuf> {
outputLength = encodeNonCompress(input, inputPtr, length, output, outputPtr);
}
out.writerIndex(out.writerIndex() + outputLength);
if (out.hasArray()) {
out.writerIndex(out.writerIndex() + outputLength);
} else {
out.writeBytes(output, 0, outputLength);
}
in.skipBytes(length);
if (!in.hasArray()) {

View File

@ -45,14 +45,12 @@ public abstract class AbstractIntegrationTest {
protected abstract EmbeddedChannel createEncoder();
protected abstract EmbeddedChannel createDecoder();
@BeforeEach
public void initChannels() throws Exception {
public void initChannels() {
encoder = createEncoder();
decoder = createDecoder();
}
@AfterEach
public void closeChannels() throws Exception {
public void closeChannels() {
encoder.close();
for (;;) {
Object msg = encoder.readOutbound();
@ -74,19 +72,22 @@ public abstract class AbstractIntegrationTest {
@Test
public void testEmpty() throws Exception {
testIdentity(EmptyArrays.EMPTY_BYTES);
testIdentity(EmptyArrays.EMPTY_BYTES, true);
testIdentity(EmptyArrays.EMPTY_BYTES, false);
}
@Test
public void testOneByte() throws Exception {
final byte[] data = { 'A' };
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
@Test
public void testTwoBytes() throws Exception {
final byte[] data = { 'B', 'A' };
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
@Test
@ -94,14 +95,16 @@ public abstract class AbstractIntegrationTest {
final byte[] data = ("Netty is a NIO client server framework which enables " +
"quick and easy development of network applications such as protocol " +
"servers and clients.").getBytes(CharsetUtil.UTF_8);
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
@Test
public void testLargeRandom() throws Exception {
final byte[] data = new byte[1024 * 1024];
rand.nextBytes(data);
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
@Test
@ -111,7 +114,8 @@ public abstract class AbstractIntegrationTest {
for (int i = 0; i < 1024; i++) {
data[i] = 2;
}
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
@Test
@ -120,20 +124,23 @@ public abstract class AbstractIntegrationTest {
for (int i = 0; i < data.length; i++) {
data[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
@Test
public void testLongBlank() throws Exception {
final byte[] data = new byte[102400];
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
@Test
public void testLongSame() throws Exception {
final byte[] data = new byte[102400];
Arrays.fill(data, (byte) 123);
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
@Test
@ -142,31 +149,39 @@ public abstract class AbstractIntegrationTest {
for (int i = 0; i < data.length; i++) {
data[i] = (byte) i;
}
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
protected void testIdentity(final byte[] data) {
final ByteBuf in = Unpooled.wrappedBuffer(data);
assertTrue(encoder.writeOutbound(in.retain()));
assertTrue(encoder.finish());
protected void testIdentity(final byte[] data, boolean heapBuffer) {
initChannels();
final ByteBuf in = heapBuffer? Unpooled.wrappedBuffer(data) :
Unpooled.directBuffer(data.length).setBytes(0, data);
final CompositeByteBuf compressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(true, msg);
}
assertThat(compressed, is(notNullValue()));
decoder.writeInbound(compressed.retain());
assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(true, msg);
}
assertEquals(in.resetReaderIndex(), decompressed);
compressed.release();
decompressed.release();
in.release();
try {
assertTrue(encoder.writeOutbound(in.retain()));
assertTrue(encoder.finish());
ByteBuf msg;
while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(true, msg);
}
assertThat(compressed, is(notNullValue()));
decoder.writeInbound(compressed.retain());
assertFalse(compressed.isReadable());
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(true, msg);
}
in.readerIndex(0);
assertEquals(in, decompressed);
} finally {
compressed.release();
decompressed.release();
in.release();
closeChannels();
}
}
}

View File

@ -34,20 +34,23 @@ public class Bzip2IntegrationTest extends AbstractIntegrationTest {
public void test3Tables() throws Exception {
byte[] data = new byte[500];
rand.nextBytes(data);
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
@Test
public void test4Tables() throws Exception {
byte[] data = new byte[1100];
rand.nextBytes(data);
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
@Test
public void test5Tables() throws Exception {
byte[] data = new byte[2300];
rand.nextBytes(data);
testIdentity(data);
testIdentity(data, true);
testIdentity(data, false);
}
}

View File

@ -64,49 +64,54 @@ public class FastLzIntegrationTest extends AbstractIntegrationTest {
}
@Override // test batched flow of data
protected void testIdentity(final byte[] data) {
final ByteBuf original = Unpooled.wrappedBuffer(data);
int written = 0, length = rand.nextInt(100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
encoder.writeOutbound(in);
written += length;
length = rand.nextInt(100);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
encoder.writeOutbound(in);
encoder.finish();
ByteBuf msg;
protected void testIdentity(final byte[] data, boolean heapBuffer) {
initChannels();
final ByteBuf original = heapBuffer? Unpooled.wrappedBuffer(data) :
Unpooled.directBuffer(data.length).writeBytes(data);
final CompositeByteBuf compressed = Unpooled.compositeBuffer();
while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(true, msg);
}
assertThat(compressed, is(notNullValue()));
final byte[] compressedArray = new byte[compressed.readableBytes()];
compressed.readBytes(compressedArray);
written = 0;
length = rand.nextInt(100);
while (written + length < compressedArray.length) {
in = Unpooled.wrappedBuffer(compressedArray, written, length);
decoder.writeInbound(in);
written += length;
length = rand.nextInt(100);
}
in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
decoder.writeInbound(in);
assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(true, msg);
}
assertEquals(original, decompressed);
compressed.release();
decompressed.release();
original.release();
try {
int written = 0, length = rand.nextInt(100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
encoder.writeOutbound(in);
written += length;
length = rand.nextInt(100);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
encoder.writeOutbound(in);
encoder.finish();
ByteBuf msg;
while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(true, msg);
}
assertThat(compressed, is(notNullValue()));
final byte[] compressedArray = new byte[compressed.readableBytes()];
compressed.readBytes(compressedArray);
written = 0;
length = rand.nextInt(100);
while (written + length < compressedArray.length) {
in = Unpooled.wrappedBuffer(compressedArray, written, length);
decoder.writeInbound(in);
written += length;
length = rand.nextInt(100);
}
in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
decoder.writeInbound(in);
assertFalse(compressed.isReadable());
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(true, msg);
}
assertEquals(original, decompressed);
} finally {
compressed.release();
decompressed.release();
original.release();
closeChannels();
}
}
}

View File

@ -64,7 +64,7 @@ public class SnappyIntegrationTest extends AbstractIntegrationTest {
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1
};
testIdentity(data);
testIdentity(data, true);
}
// These tests were found using testRandom() with large RANDOM_RUNS.
@ -104,7 +104,7 @@ public class SnappyIntegrationTest extends AbstractIntegrationTest {
private void testWithSeed(long seed) {
byte[] data = new byte[16 * 1048576];
new Random(seed).nextBytes(data);
testIdentity(data);
testIdentity(data, true);
}
private static void printSeedAsTest(long l) {