Remove memory copy when checksum non heap backed ByteBuf implementations using Snappy
Motivation: We should try to minimize memory copies whenever possible. Modifications: - Refactor ByteBufChecksum to work with heap and direct ByteBuf always - Remove memory copy in Snappy by let Crc32c extend ByteBufChecksum Result: Less memory copies when using Snappy
This commit is contained in:
parent
9151739577
commit
87551fc751
@ -16,6 +16,7 @@
|
||||
package io.netty.handler.codec.compression;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.util.ByteProcessor;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
@ -42,6 +43,14 @@ abstract class ByteBufChecksum implements Checksum {
|
||||
CRC32_UPDATE_METHOD = updateByteBuffer(new CRC32());
|
||||
}
|
||||
|
||||
private final ByteProcessor updateProcessor = new ByteProcessor() {
|
||||
@Override
|
||||
public boolean process(byte value) throws Exception {
|
||||
update(value);
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
private static Method updateByteBuffer(Checksum checksum) {
|
||||
if (PlatformDependent.javaVersion() >= 8) {
|
||||
try {
|
||||
@ -55,9 +64,55 @@ abstract class ByteBufChecksum implements Checksum {
|
||||
return null;
|
||||
}
|
||||
|
||||
protected Checksum checksum;
|
||||
static ByteBufChecksum wrapChecksum(Checksum checksum) {
|
||||
ObjectUtil.checkNotNull(checksum, "checksum");
|
||||
if (checksum instanceof Adler32 && ADLER32_UPDATE_METHOD != null) {
|
||||
return new ReflectiveByteBufChecksum(checksum, ADLER32_UPDATE_METHOD);
|
||||
}
|
||||
if (checksum instanceof CRC32 && CRC32_UPDATE_METHOD != null) {
|
||||
return new ReflectiveByteBufChecksum(checksum, CRC32_UPDATE_METHOD);
|
||||
}
|
||||
return new SlowByteBufChecksum(checksum);
|
||||
}
|
||||
|
||||
private ByteBufChecksum(Checksum checksum) {
|
||||
/**
|
||||
* @see {@link #update(byte[], int, int)}.
|
||||
*/
|
||||
public void update(ByteBuf b, int off, int len) {
|
||||
if (b.hasArray()) {
|
||||
update(b.array(), b.arrayOffset() + off, len);
|
||||
} else {
|
||||
b.forEachByte(off, len, updateProcessor);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class ReflectiveByteBufChecksum extends SlowByteBufChecksum {
|
||||
private final Method method;
|
||||
|
||||
ReflectiveByteBufChecksum(Checksum checksum, Method method) {
|
||||
super(checksum);
|
||||
this.method = method;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(ByteBuf b, int off, int len) {
|
||||
if (b.hasArray()) {
|
||||
update(b.array(), b.arrayOffset() + off, len);
|
||||
} else {
|
||||
try {
|
||||
method.invoke(checksum, CompressionUtil.safeNioBuffer(b));
|
||||
} catch (Throwable cause) {
|
||||
throw new Error();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class SlowByteBufChecksum extends ByteBufChecksum {
|
||||
|
||||
protected final Checksum checksum;
|
||||
|
||||
SlowByteBufChecksum(Checksum checksum) {
|
||||
this.checksum = checksum;
|
||||
}
|
||||
|
||||
@ -66,16 +121,6 @@ abstract class ByteBufChecksum implements Checksum {
|
||||
checksum.update(b);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see {@link #update(byte[], int, int)}.
|
||||
*/
|
||||
abstract void update(ByteBuf b, int off, int len);
|
||||
|
||||
/**
|
||||
* Returns {@code true} if {@link ByteBuffer} is supported without memory copy.
|
||||
*/
|
||||
abstract boolean isSupportingByteBuffer();
|
||||
|
||||
@Override
|
||||
public void update(byte[] b, int off, int len) {
|
||||
checksum.update(b, off, len);
|
||||
@ -90,69 +135,5 @@ abstract class ByteBufChecksum implements Checksum {
|
||||
public void reset() {
|
||||
checksum.reset();
|
||||
}
|
||||
|
||||
static ByteBufChecksum wrapChecksum(Checksum checksum) {
|
||||
ObjectUtil.checkNotNull(checksum, "checksum");
|
||||
if (checksum instanceof Adler32 && ADLER32_UPDATE_METHOD != null) {
|
||||
return new ReflectiveByteBufChecksum(checksum, ADLER32_UPDATE_METHOD);
|
||||
}
|
||||
if (checksum instanceof CRC32 && CRC32_UPDATE_METHOD != null) {
|
||||
return new ReflectiveByteBufChecksum(checksum, CRC32_UPDATE_METHOD);
|
||||
}
|
||||
return new SlowByteBufChecksum(checksum);
|
||||
}
|
||||
|
||||
private static final class ReflectiveByteBufChecksum extends ByteBufChecksum {
|
||||
private final Method method;
|
||||
|
||||
ReflectiveByteBufChecksum(Checksum checksum, Method method) {
|
||||
super(checksum);
|
||||
this.method = method;
|
||||
}
|
||||
|
||||
@Override
|
||||
void update(ByteBuf b, int off, int len) {
|
||||
if (b.hasArray()) {
|
||||
update(b.array(), b.arrayOffset() + off, len);
|
||||
} else {
|
||||
try {
|
||||
method.invoke(checksum, CompressionUtil.safeNioBuffer(b));
|
||||
} catch (Throwable cause) {
|
||||
throw new Error();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isSupportingByteBuffer() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class SlowByteBufChecksum extends ByteBufChecksum {
|
||||
|
||||
SlowByteBufChecksum(Checksum checksum) {
|
||||
super(checksum);
|
||||
}
|
||||
|
||||
@Override
|
||||
void update(ByteBuf b, int off, int len) {
|
||||
if (b.hasArray()) {
|
||||
update(b.array(), b.arrayOffset() + off, len);
|
||||
} else {
|
||||
ByteBuf heapBuffer = b.alloc().heapBuffer(len);
|
||||
try {
|
||||
heapBuffer.writeBytes(b, off, len);
|
||||
update(heapBuffer.array(), heapBuffer.arrayOffset(), len);
|
||||
} finally {
|
||||
heapBuffer.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isSupportingByteBuffer() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,8 +15,6 @@
|
||||
*/
|
||||
package io.netty.handler.codec.compression;
|
||||
|
||||
import java.util.zip.Checksum;
|
||||
|
||||
/**
|
||||
* Implements CRC32-C as defined in:
|
||||
* "Optimization of Cyclic Redundancy-CHeck Codes with 24 and 32 Parity Bits",
|
||||
@ -25,7 +23,7 @@ import java.util.zip.Checksum;
|
||||
* The implementation of this class has been sourced from the Appendix of RFC 3309,
|
||||
* but with masking due to Java not being able to support unsigned types.
|
||||
*/
|
||||
class Crc32c implements Checksum {
|
||||
class Crc32c extends ByteBufChecksum {
|
||||
private static final int[] CRC_TABLE = {
|
||||
0x00000000, 0xF26B8303, 0xE13B70F7, 0x1350F3F4,
|
||||
0xC79A971F, 0x35F1141C, 0x26A1E7E8, 0xD4CA64EB,
|
||||
@ -105,8 +103,9 @@ class Crc32c implements Checksum {
|
||||
|
||||
@Override
|
||||
public void update(byte[] buffer, int offset, int length) {
|
||||
for (int i = offset; i < offset + length; i++) {
|
||||
crc = crc32c(crc, buffer[i]);
|
||||
int end = offset + length;
|
||||
for (int i = offset; i < end; i++) {
|
||||
update(buffer[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -223,12 +223,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
|
||||
uncompressed = in.retainedSlice(in.readerIndex(), decompressedLength);
|
||||
break;
|
||||
case BLOCK_TYPE_COMPRESSED:
|
||||
uncompressed = checksum == null || checksum.isSupportingByteBuffer()
|
||||
// We can allocate whatever buffer if we either not need to do checksum processing
|
||||
// or if our ByteBufChecksum implementation supports ByteBuffer.
|
||||
// This is needed as Checksum implementations itself only support byte[].
|
||||
? ctx.alloc().buffer(decompressedLength, decompressedLength)
|
||||
: ctx.alloc().heapBuffer(decompressedLength, decompressedLength);
|
||||
uncompressed = ctx.alloc().buffer(decompressedLength, decompressedLength);
|
||||
|
||||
decompressor.decompress(CompressionUtil.safeNioBuffer(in),
|
||||
uncompressed.internalNioBuffer(uncompressed.writerIndex(), decompressedLength));
|
||||
|
@ -611,14 +611,7 @@ public final class Snappy {
|
||||
static int calculateChecksum(ByteBuf data, int offset, int length) {
|
||||
Crc32c crc32 = new Crc32c();
|
||||
try {
|
||||
if (data.hasArray()) {
|
||||
crc32.update(data.array(), data.arrayOffset() + offset, length);
|
||||
} else {
|
||||
byte[] array = new byte[length];
|
||||
data.getBytes(offset, array);
|
||||
crc32.update(array, 0, length);
|
||||
}
|
||||
|
||||
crc32.update(data, offset, length);
|
||||
return maskChecksum((int) crc32.getValue());
|
||||
} finally {
|
||||
crc32.reset();
|
||||
|
Loading…
Reference in New Issue
Block a user