Lz4FrameDecoder should reduce memory copies whenever possible

Motivation:

When the user constructs Lz4FrameDecoder with a Checksum implementation like CRC32 or Adler32 and uses Java8 we can directly use a ByteBuffer to do the checksum work. This way we can eliminate memory copies.

Modifications:

Detect if ByteBuffer can be used for checksum work and if so reduce memory copies.

Result:

Less memory copies when using JDK8
This commit is contained in:
Norman Maurer 2016-07-12 13:25:13 +02:00
parent b8400f9628
commit bb3c4a43d8
4 changed files with 240 additions and 57 deletions

View File

@ -0,0 +1,158 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
/**
* {@link Checksum} implementation which can directly act on a {@link ByteBuf}.
*
* Implementations may optimize access patterns depending on if the {@link ByteBuf} is backed by a
* byte array ({@link ByteBuf#hasArray()} is {@code true}) or not.
*/
abstract class ByteBufChecksum implements Checksum {
private static final Method ADLER32_UPDATE_METHOD;
private static final Method CRC32_UPDATE_METHOD;
static {
// See if we can use fast-path when using ByteBuf that is not heap based as Adler32 and CRC32 added support
// for update(ByteBuffer) in JDK8.
ADLER32_UPDATE_METHOD = updateByteBuffer(new Adler32());
CRC32_UPDATE_METHOD = updateByteBuffer(new CRC32());
}
private static Method updateByteBuffer(Checksum checksum) {
if (PlatformDependent.javaVersion() >= 8) {
try {
Method method = checksum.getClass().getDeclaredMethod("update", ByteBuffer.class);
method.invoke(method, ByteBuffer.allocate(1));
return method;
} catch (Throwable ignore) {
return null;
}
}
return null;
}
protected Checksum checksum;
private ByteBufChecksum(Checksum checksum) {
this.checksum = checksum;
}
@Override
public void update(int b) {
checksum.update(b);
}
/**
* @see {@link #update(byte[], int, int)}.
*/
abstract void update(ByteBuf b, int off, int len);
/**
* Returns {@code true} if {@link ByteBuffer} is supported without memory copy.
*/
abstract boolean isSupportingByteBuffer();
@Override
public void update(byte[] b, int off, int len) {
checksum.update(b, off, len);
}
@Override
public long getValue() {
return checksum.getValue();
}
@Override
public void reset() {
checksum.reset();
}
static ByteBufChecksum wrapChecksum(Checksum checksum) {
ObjectUtil.checkNotNull(checksum, "checksum");
if (checksum instanceof Adler32 && ADLER32_UPDATE_METHOD != null) {
return new ReflectiveByteBufChecksum(checksum, ADLER32_UPDATE_METHOD);
}
if (checksum instanceof CRC32 && CRC32_UPDATE_METHOD != null) {
return new ReflectiveByteBufChecksum(checksum, CRC32_UPDATE_METHOD);
}
return new SlowByteBufChecksum(checksum);
}
private static final class ReflectiveByteBufChecksum extends ByteBufChecksum {
private final Method method;
ReflectiveByteBufChecksum(Checksum checksum, Method method) {
super(checksum);
this.method = method;
}
@Override
void update(ByteBuf b, int off, int len) {
if (b.hasArray()) {
update(b.array(), b.arrayOffset() + off, len);
} else {
try {
method.invoke(checksum, CompressionUtil.safeNioBuffer(b));
} catch (Throwable cause) {
throw new Error();
}
}
}
@Override
boolean isSupportingByteBuffer() {
return true;
}
}
private static final class SlowByteBufChecksum extends ByteBufChecksum {
SlowByteBufChecksum(Checksum checksum) {
super(checksum);
}
@Override
void update(ByteBuf b, int off, int len) {
if (b.hasArray()) {
update(b.array(), b.arrayOffset() + off, len);
} else {
ByteBuf heapBuffer = b.alloc().heapBuffer(len);
try {
heapBuffer.writeBytes(b, off, len);
update(heapBuffer.array(), heapBuffer.arrayOffset(), len);
} finally {
heapBuffer.release();
}
}
}
@Override
boolean isSupportingByteBuffer() {
return false;
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
final class CompressionUtil {
private CompressionUtil() { }
static void checkChecksum(ByteBufChecksum checksum, ByteBuf uncompressed, int currentChecksum) {
checksum.reset();
checksum.update(uncompressed,
uncompressed.readerIndex(), uncompressed.readableBytes());
final int checksumResult = (int) checksum.getValue();
if (checksumResult != currentChecksum) {
throw new DecompressionException(String.format(
"stream corrupted: mismatching checksum: %d (expected: %d)",
checksumResult, currentChecksum));
}
}
static ByteBuffer safeNioBuffer(ByteBuf buffer) {
return buffer.nioBufferCount() == 1 ? buffer.internalNioBuffer(buffer.readerIndex(), buffer.readableBytes())
: buffer.nioBuffer();
}
}

View File

@ -65,7 +65,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
/**
* Underlying checksum calculator in use.
*/
private Checksum checksum;
private ByteBufChecksum checksum;
/**
* Type of current block.
@ -114,7 +114,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
/**
* Creates a new LZ4 decoder with customizable implementation.
*
* @param factory user customizable {@link net.jpountz.lz4.LZ4Factory} instance
* @param factory user customizable {@link LZ4Factory} instance
* which may be JNI bindings to the original C implementation, a pure Java implementation
* or a Java implementation that uses the {@link sun.misc.Unsafe}
* @param validateChecksums if {@code true}, the checksum field will be validated against the actual
@ -132,7 +132,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
/**
* Creates a new customizable LZ4 decoder.
*
* @param factory user customizable {@link net.jpountz.lz4.LZ4Factory} instance
* @param factory user customizable {@link LZ4Factory} instance
* which may be JNI bindings to the original C implementation, a pure Java implementation
* or a Java implementation that uses the {@link sun.misc.Unsafe}
* @param checksum the {@link Checksum} instance to use to check data for integrity.
@ -143,7 +143,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
throw new NullPointerException("factory");
}
decompressor = factory.fastDecompressor();
this.checksum = checksum;
this.checksum = checksum == null ? null : ByteBufChecksum.wrapChecksum(checksum);
}
@Override
@ -212,69 +212,47 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
break;
}
final int idx = in.readerIndex();
final ByteBufChecksum checksum = this.checksum;
ByteBuf uncompressed = null;
ByteBuf uncompressed = ctx.alloc().heapBuffer(decompressedLength, decompressedLength);
final byte[] dest = uncompressed.array();
final int destOff = uncompressed.arrayOffset() + uncompressed.writerIndex();
boolean success = false;
try {
switch (blockType) {
case BLOCK_TYPE_NON_COMPRESSED: {
in.getBytes(idx, dest, destOff, decompressedLength);
break;
}
case BLOCK_TYPE_COMPRESSED: {
final byte[] src;
final int srcOff;
if (in.hasArray()) {
src = in.array();
srcOff = in.arrayOffset() + idx;
} else {
src = new byte[compressedLength];
in.getBytes(idx, src);
srcOff = 0;
}
case BLOCK_TYPE_NON_COMPRESSED:
// Just pass through, we not update the readerIndex yet as we do this outside of the
// switch statement.
uncompressed = in.retainedSlice(in.readerIndex(), decompressedLength);
break;
case BLOCK_TYPE_COMPRESSED:
uncompressed = checksum == null || checksum.isSupportingByteBuffer()
// We can allocate whatever buffer if we either not need to do checksum processing
// or if our ByteBufChecksum implementation supports ByteBuffer.
// This is needed as Checksum implementations itself only support byte[].
? ctx.alloc().buffer(decompressedLength, decompressedLength)
: ctx.alloc().heapBuffer(decompressedLength, decompressedLength);
try {
final int readBytes = decompressor.decompress(src, srcOff,
dest, destOff, decompressedLength);
if (compressedLength != readBytes) {
throw new DecompressionException(String.format(
"stream corrupted: compressedLength(%d) and actual length(%d) mismatch",
compressedLength, readBytes));
}
} catch (LZ4Exception e) {
throw new DecompressionException(e);
}
break;
}
default:
throw new DecompressionException(String.format(
"unexpected blockType: %d (expected: %d or %d)",
blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
}
final Checksum checksum = this.checksum;
if (checksum != null) {
checksum.reset();
checksum.update(dest, destOff, decompressedLength);
final int checksumResult = (int) checksum.getValue();
if (checksumResult != currentChecksum) {
decompressor.decompress(CompressionUtil.safeNioBuffer(in),
uncompressed.internalNioBuffer(uncompressed.writerIndex(), decompressedLength));
// Update the writerIndex now to reflect what we decompressed.
uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength);
break;
default:
throw new DecompressionException(String.format(
"stream corrupted: mismatching checksum: %d (expected: %d)",
checksumResult, currentChecksum));
}
"unexpected blockType: %d (expected: %d or %d)",
blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
}
uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength);
out.add(uncompressed);
// Skip inbound bytes after we processed them.
in.skipBytes(compressedLength);
if (checksum != null) {
CompressionUtil.checkChecksum(checksum, uncompressed, currentChecksum);
}
out.add(uncompressed);
uncompressed = null;
currentState = State.INIT_BLOCK;
success = true;
} catch (LZ4Exception e) {
throw new DecompressionException(e);
} finally {
if (!success) {
if (uncompressed != null) {
uncompressed.release();
}
}

View File

@ -730,6 +730,10 @@
<ignore>java.security.cert.CertificateRevokedException</ignore>
<ignore>java.util.concurrent.ConcurrentLinkedDeque</ignore>
<!-- Compression -->
<ignore>java.util.zip.CRC32</ignore>
<ignore>java.util.zip.Adler32</ignore>
</ignores>
</configuration>
<executions>