From bb3c4a43d8c904236b681414b30a9089575de65b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 12 Jul 2016 13:25:13 +0200 Subject: [PATCH] Lz4FrameDecoder should reduce memory copies whenever possible Motivation: When the user constructs Lz4FrameDecoder with a Checksum implementation like CRC32 or Adler32 and uses Java8 we can directly use a ByteBuffer to do the checksum work. This way we can eliminate memory copies. Modifications: Detect if ByteBuffer can be used for checksum work and if so reduce memory copies. Result: Less memory copies when using JDK8 --- .../codec/compression/ByteBufChecksum.java | 158 ++++++++++++++++++ .../codec/compression/CompressionUtil.java | 43 +++++ .../codec/compression/Lz4FrameDecoder.java | 92 ++++------ pom.xml | 4 + 4 files changed, 240 insertions(+), 57 deletions(-) create mode 100644 codec/src/main/java/io/netty/handler/codec/compression/ByteBufChecksum.java create mode 100644 codec/src/main/java/io/netty/handler/codec/compression/CompressionUtil.java diff --git a/codec/src/main/java/io/netty/handler/codec/compression/ByteBufChecksum.java b/codec/src/main/java/io/netty/handler/codec/compression/ByteBufChecksum.java new file mode 100644 index 0000000000..c1b4bc7079 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/compression/ByteBufChecksum.java @@ -0,0 +1,158 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.buffer.ByteBuf; +import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.PlatformDependent; + +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.zip.Adler32; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +/** + * {@link Checksum} implementation which can directly act on a {@link ByteBuf}. + * + * Implementations may optimize access patterns depending on if the {@link ByteBuf} is backed by a + * byte array ({@link ByteBuf#hasArray()} is {@code true}) or not. + */ +abstract class ByteBufChecksum implements Checksum { + private static final Method ADLER32_UPDATE_METHOD; + private static final Method CRC32_UPDATE_METHOD; + + static { + // See if we can use fast-path when using ByteBuf that is not heap based as Adler32 and CRC32 added support + // for update(ByteBuffer) in JDK8. + ADLER32_UPDATE_METHOD = updateByteBuffer(new Adler32()); + CRC32_UPDATE_METHOD = updateByteBuffer(new CRC32()); + } + + private static Method updateByteBuffer(Checksum checksum) { + if (PlatformDependent.javaVersion() >= 8) { + try { + Method method = checksum.getClass().getDeclaredMethod("update", ByteBuffer.class); + method.invoke(method, ByteBuffer.allocate(1)); + return method; + } catch (Throwable ignore) { + return null; + } + } + return null; + } + + protected Checksum checksum; + + private ByteBufChecksum(Checksum checksum) { + this.checksum = checksum; + } + + @Override + public void update(int b) { + checksum.update(b); + } + + /** + * @see {@link #update(byte[], int, int)}. + */ + abstract void update(ByteBuf b, int off, int len); + + /** + * Returns {@code true} if {@link ByteBuffer} is supported without memory copy. + */ + abstract boolean isSupportingByteBuffer(); + + @Override + public void update(byte[] b, int off, int len) { + checksum.update(b, off, len); + } + + @Override + public long getValue() { + return checksum.getValue(); + } + + @Override + public void reset() { + checksum.reset(); + } + + static ByteBufChecksum wrapChecksum(Checksum checksum) { + ObjectUtil.checkNotNull(checksum, "checksum"); + if (checksum instanceof Adler32 && ADLER32_UPDATE_METHOD != null) { + return new ReflectiveByteBufChecksum(checksum, ADLER32_UPDATE_METHOD); + } + if (checksum instanceof CRC32 && CRC32_UPDATE_METHOD != null) { + return new ReflectiveByteBufChecksum(checksum, CRC32_UPDATE_METHOD); + } + return new SlowByteBufChecksum(checksum); + } + + private static final class ReflectiveByteBufChecksum extends ByteBufChecksum { + private final Method method; + + ReflectiveByteBufChecksum(Checksum checksum, Method method) { + super(checksum); + this.method = method; + } + + @Override + void update(ByteBuf b, int off, int len) { + if (b.hasArray()) { + update(b.array(), b.arrayOffset() + off, len); + } else { + try { + method.invoke(checksum, CompressionUtil.safeNioBuffer(b)); + } catch (Throwable cause) { + throw new Error(); + } + } + } + + @Override + boolean isSupportingByteBuffer() { + return true; + } + } + + private static final class SlowByteBufChecksum extends ByteBufChecksum { + + SlowByteBufChecksum(Checksum checksum) { + super(checksum); + } + + @Override + void update(ByteBuf b, int off, int len) { + if (b.hasArray()) { + update(b.array(), b.arrayOffset() + off, len); + } else { + ByteBuf heapBuffer = b.alloc().heapBuffer(len); + try { + heapBuffer.writeBytes(b, off, len); + update(heapBuffer.array(), heapBuffer.arrayOffset(), len); + } finally { + heapBuffer.release(); + } + } + } + + @Override + boolean isSupportingByteBuffer() { + return false; + } + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/compression/CompressionUtil.java b/codec/src/main/java/io/netty/handler/codec/compression/CompressionUtil.java new file mode 100644 index 0000000000..8b43e7ff33 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/compression/CompressionUtil.java @@ -0,0 +1,43 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.buffer.ByteBuf; + +import java.nio.ByteBuffer; + +final class CompressionUtil { + + private CompressionUtil() { } + + static void checkChecksum(ByteBufChecksum checksum, ByteBuf uncompressed, int currentChecksum) { + checksum.reset(); + checksum.update(uncompressed, + uncompressed.readerIndex(), uncompressed.readableBytes()); + + final int checksumResult = (int) checksum.getValue(); + if (checksumResult != currentChecksum) { + throw new DecompressionException(String.format( + "stream corrupted: mismatching checksum: %d (expected: %d)", + checksumResult, currentChecksum)); + } + } + + static ByteBuffer safeNioBuffer(ByteBuf buffer) { + return buffer.nioBufferCount() == 1 ? buffer.internalNioBuffer(buffer.readerIndex(), buffer.readableBytes()) + : buffer.nioBuffer(); + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameDecoder.java index e15944c9d4..d616e42f41 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameDecoder.java @@ -65,7 +65,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder { /** * Underlying checksum calculator in use. */ - private Checksum checksum; + private ByteBufChecksum checksum; /** * Type of current block. @@ -114,7 +114,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder { /** * Creates a new LZ4 decoder with customizable implementation. * - * @param factory user customizable {@link net.jpountz.lz4.LZ4Factory} instance + * @param factory user customizable {@link LZ4Factory} instance * which may be JNI bindings to the original C implementation, a pure Java implementation * or a Java implementation that uses the {@link sun.misc.Unsafe} * @param validateChecksums if {@code true}, the checksum field will be validated against the actual @@ -132,7 +132,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder { /** * Creates a new customizable LZ4 decoder. * - * @param factory user customizable {@link net.jpountz.lz4.LZ4Factory} instance + * @param factory user customizable {@link LZ4Factory} instance * which may be JNI bindings to the original C implementation, a pure Java implementation * or a Java implementation that uses the {@link sun.misc.Unsafe} * @param checksum the {@link Checksum} instance to use to check data for integrity. @@ -143,7 +143,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder { throw new NullPointerException("factory"); } decompressor = factory.fastDecompressor(); - this.checksum = checksum; + this.checksum = checksum == null ? null : ByteBufChecksum.wrapChecksum(checksum); } @Override @@ -212,69 +212,47 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder { break; } - final int idx = in.readerIndex(); + final ByteBufChecksum checksum = this.checksum; + ByteBuf uncompressed = null; - ByteBuf uncompressed = ctx.alloc().heapBuffer(decompressedLength, decompressedLength); - final byte[] dest = uncompressed.array(); - final int destOff = uncompressed.arrayOffset() + uncompressed.writerIndex(); - - boolean success = false; try { switch (blockType) { - case BLOCK_TYPE_NON_COMPRESSED: { - in.getBytes(idx, dest, destOff, decompressedLength); - break; - } - case BLOCK_TYPE_COMPRESSED: { - final byte[] src; - final int srcOff; - if (in.hasArray()) { - src = in.array(); - srcOff = in.arrayOffset() + idx; - } else { - src = new byte[compressedLength]; - in.getBytes(idx, src); - srcOff = 0; - } + case BLOCK_TYPE_NON_COMPRESSED: + // Just pass through, we not update the readerIndex yet as we do this outside of the + // switch statement. + uncompressed = in.retainedSlice(in.readerIndex(), decompressedLength); + break; + case BLOCK_TYPE_COMPRESSED: + uncompressed = checksum == null || checksum.isSupportingByteBuffer() + // We can allocate whatever buffer if we either not need to do checksum processing + // or if our ByteBufChecksum implementation supports ByteBuffer. + // This is needed as Checksum implementations itself only support byte[]. + ? ctx.alloc().buffer(decompressedLength, decompressedLength) + : ctx.alloc().heapBuffer(decompressedLength, decompressedLength); - try { - final int readBytes = decompressor.decompress(src, srcOff, - dest, destOff, decompressedLength); - if (compressedLength != readBytes) { - throw new DecompressionException(String.format( - "stream corrupted: compressedLength(%d) and actual length(%d) mismatch", - compressedLength, readBytes)); - } - } catch (LZ4Exception e) { - throw new DecompressionException(e); - } - break; - } - default: - throw new DecompressionException(String.format( - "unexpected blockType: %d (expected: %d or %d)", - blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED)); - } - - final Checksum checksum = this.checksum; - if (checksum != null) { - checksum.reset(); - checksum.update(dest, destOff, decompressedLength); - final int checksumResult = (int) checksum.getValue(); - if (checksumResult != currentChecksum) { + decompressor.decompress(CompressionUtil.safeNioBuffer(in), + uncompressed.internalNioBuffer(uncompressed.writerIndex(), decompressedLength)); + // Update the writerIndex now to reflect what we decompressed. + uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength); + break; + default: throw new DecompressionException(String.format( - "stream corrupted: mismatching checksum: %d (expected: %d)", - checksumResult, currentChecksum)); - } + "unexpected blockType: %d (expected: %d or %d)", + blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED)); } - uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength); - out.add(uncompressed); + // Skip inbound bytes after we processed them. in.skipBytes(compressedLength); + if (checksum != null) { + CompressionUtil.checkChecksum(checksum, uncompressed, currentChecksum); + } + out.add(uncompressed); + uncompressed = null; currentState = State.INIT_BLOCK; - success = true; + } catch (LZ4Exception e) { + throw new DecompressionException(e); } finally { - if (!success) { + if (uncompressed != null) { uncompressed.release(); } } diff --git a/pom.xml b/pom.xml index ba6be85df4..8ab1f43f51 100644 --- a/pom.xml +++ b/pom.xml @@ -730,6 +730,10 @@ java.security.cert.CertificateRevokedException java.util.concurrent.ConcurrentLinkedDeque + + + java.util.zip.CRC32 + java.util.zip.Adler32