FastLzFrameDecoder should use allocator to allocate output buffer (#11499)

Motivation:

FastLzFrameDecoder currently does not use the allocator to allocate the output buffer. This means that even if you configure the PooledByteBufAllocator, you still can't make use of pooling. Besides this, the decoder also does an unnecessary memory copy when no compression is used.

Modifications:

- Allocate the output buffer via the allocator
- Don't allocate and copy if we handle an uncompressed chunk
- Make use of ByteBufChecksum for a few optimizations when running on a recent JDK
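Regarding the ByteBufChecksum point above, a minimal sketch of the underlying idea (a hypothetical helper, not Netty's actual internal class in io.netty.handler.codec.compression): on JDK 9+, java.util.zip.Checksum can consume a ByteBuffer directly, so a ByteBuf can be checksummed without first copying its contents into a byte[].

    import io.netty.buffer.ByteBuf;
    import java.util.zip.Checksum;

    // Hypothetical helper for illustration only.
    final class ByteBufChecksumSketch {
        static void update(Checksum checksum, ByteBuf buf, int index, int length) {
            if (buf.hasArray()) {
                // Heap buffer: feed the backing array region directly, no copy needed.
                checksum.update(buf.array(), buf.arrayOffset() + index, length);
            } else {
                // Other buffers: JDK 9+ Checksum.update(ByteBuffer) reads the region
                // without an intermediate byte[] copy.
                checksum.update(buf.nioBuffer(index, length));
            }
        }
    }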

Result:

Fewer allocations when using FastLzFrameDecoder.
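As a usage sketch (assumed setup, not part of this commit): because the decoder now obtains its output buffers from ctx.alloc(), configuring the channel with a pooled allocator is what actually makes the decompressed chunks come out of the pool.

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.compression.FastLzFrameDecoder;

    import java.util.zip.Adler32;

    // Hypothetical server setup for illustration only.
    final class FastLzServerSetup {
        static ServerBootstrap configure(ServerBootstrap bootstrap) {
            return bootstrap
                    // The decoder allocates via ctx.alloc(), so a pooled allocator on the
                    // child channels is what enables buffer reuse.
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // Checksum validation is optional; passing null skips it.
                            ch.pipeline().addLast(new FastLzFrameDecoder(new Adler32()));
                        }
                    });
        }
    }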
Author: Norman Maurer
Date:   2021-07-21 22:09:12 +02:00
Commit: ae41a5c28b (parent a1f13e17db)


@@ -16,10 +16,8 @@
 package io.netty.handler.codec.compression;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
-import io.netty.util.internal.EmptyArrays;
 
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
@@ -47,7 +45,7 @@ public class FastLzFrameDecoder extends ByteToMessageDecoder {
     /**
      * Underlying checksum calculator in use.
      */
-    private final Checksum checksum;
+    private final ByteBufChecksum checksum;
 
     /**
      * Length of current received chunk of data.
@@ -104,7 +102,7 @@ public class FastLzFrameDecoder extends ByteToMessageDecoder {
      * You may set {@code null} if you do not want to validate checksum of each block.
      */
     public FastLzFrameDecoder(Checksum checksum) {
-        this.checksum = checksum;
+        this.checksum = checksum == null ? null : ByteBufChecksum.wrapChecksum(checksum);
     }
 
     @Override
@@ -146,50 +144,59 @@ public class FastLzFrameDecoder extends ByteToMessageDecoder {
                 final int idx = in.readerIndex();
                 final int originalLength = this.originalLength;
-                final byte[] output = originalLength == 0? EmptyArrays.EMPTY_BYTES : new byte[originalLength];
-                final int outputPtr = 0;
+                ByteBuf output = null;
 
-                if (isCompressed) {
-                    final byte[] input;
-                    final int inputPtr;
-                    if (in.hasArray()) {
-                        input = in.array();
-                        inputPtr = in.arrayOffset() + idx;
-                    } else {
-                        input = new byte[chunkLength];
-                        in.getBytes(idx, input);
-                        inputPtr = 0;
-                    }
-
-                    final int decompressedBytes = decompress(input, inputPtr, chunkLength,
-                            output, outputPtr, originalLength);
-                    if (originalLength != decompressedBytes) {
-                        throw new DecompressionException(String.format(
-                                "stream corrupted: originalLength(%d) and actual length(%d) mismatch",
-                                originalLength, decompressedBytes));
-                    }
-                } else {
-                    in.getBytes(idx, output, outputPtr, chunkLength);
-                }
-
-                final Checksum checksum = this.checksum;
-                if (hasChecksum && checksum != null) {
-                    checksum.reset();
-                    checksum.update(output, outputPtr, originalLength);
-                    final int checksumResult = (int) checksum.getValue();
-                    if (checksumResult != currentChecksum) {
-                        throw new DecompressionException(String.format(
-                                "stream corrupted: mismatching checksum: %d (expected: %d)",
-                                checksumResult, currentChecksum));
-                    }
-                }
-
-                if (output.length > 0) {
-                    ctx.fireChannelRead(Unpooled.wrappedBuffer(output).writerIndex(originalLength));
-                }
-                in.skipBytes(chunkLength);
+                try {
+                    if (isCompressed) {
+                        final byte[] input;
+                        final int inputOffset;
+                        if (in.hasArray()) {
+                            input = in.array();
+                            inputOffset = in.arrayOffset() + idx;
+                        } else {
+                            input = new byte[chunkLength];
+                            in.getBytes(idx, input);
+                            inputOffset = 0;
+                        }
+
+                        output = ctx.alloc().heapBuffer(originalLength);
+                        int outputOffset = output.arrayOffset() + output.writerIndex();
+                        final int decompressedBytes = decompress(input, inputOffset, chunkLength,
+                                output.array(), outputOffset, originalLength);
+                        if (originalLength != decompressedBytes) {
+                            throw new DecompressionException(String.format(
+                                    "stream corrupted: originalLength(%d) and actual length(%d) mismatch",
+                                    originalLength, decompressedBytes));
+                        }
+                        output.writerIndex(output.writerIndex() + decompressedBytes);
+                    } else {
+                        output = in.retainedSlice(idx, chunkLength);
+                    }
+
+                    final ByteBufChecksum checksum = this.checksum;
+                    if (hasChecksum && checksum != null) {
+                        checksum.reset();
+                        checksum.update(output, output.readerIndex(), output.readableBytes());
+                        final int checksumResult = (int) checksum.getValue();
+                        if (checksumResult != currentChecksum) {
+                            throw new DecompressionException(String.format(
+                                    "stream corrupted: mismatching checksum: %d (expected: %d)",
+                                    checksumResult, currentChecksum));
+                        }
+                    }
+
+                    if (output.readableBytes() > 0) {
+                        ctx.fireChannelRead(output);
+                    } else {
+                        output.release();
+                    }
+                    output = null;
+                    in.skipBytes(chunkLength);
+                } finally {
+                    if (output != null) {
+                        output.release();
+                    }
+                }
                 currentState = State.INIT_BLOCK;
                 break;
             case CORRUPTED:
                 in.skipBytes(in.readableBytes());