diff --git a/codec/src/main/java/io/netty/handler/codec/compression/ByteBufChecksum.java b/codec/src/main/java/io/netty/handler/codec/compression/ByteBufChecksum.java
new file mode 100644
index 0000000000..c1b4bc7079
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/compression/ByteBufChecksum.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2016 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.compression;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.ObjectUtil;
+import io.netty.util.internal.PlatformDependent;
+
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+/**
+ * {@link Checksum} implementation which can directly act on a {@link ByteBuf}.
+ *
+ * Implementations may optimize access patterns depending on if the {@link ByteBuf} is backed by a
+ * byte array ({@link ByteBuf#hasArray()} is {@code true}) or not.
+ */
+abstract class ByteBufChecksum implements Checksum {
+ private static final Method ADLER32_UPDATE_METHOD;
+ private static final Method CRC32_UPDATE_METHOD;
+
+ static {
+ // See if we can use fast-path when using ByteBuf that is not heap based as Adler32 and CRC32 added support
+ // for update(ByteBuffer) in JDK8.
+ ADLER32_UPDATE_METHOD = updateByteBuffer(new Adler32());
+ CRC32_UPDATE_METHOD = updateByteBuffer(new CRC32());
+ }
+
+ private static Method updateByteBuffer(Checksum checksum) {
+ if (PlatformDependent.javaVersion() >= 8) {
+ try {
+ Method method = checksum.getClass().getDeclaredMethod("update", ByteBuffer.class);
+ method.invoke(method, ByteBuffer.allocate(1));
+ return method;
+ } catch (Throwable ignore) {
+ return null;
+ }
+ }
+ return null;
+ }
+
+ protected Checksum checksum;
+
+ private ByteBufChecksum(Checksum checksum) {
+ this.checksum = checksum;
+ }
+
+ @Override
+ public void update(int b) {
+ checksum.update(b);
+ }
+
+ /**
+ * @see {@link #update(byte[], int, int)}.
+ */
+ abstract void update(ByteBuf b, int off, int len);
+
+ /**
+ * Returns {@code true} if {@link ByteBuffer} is supported without memory copy.
+ */
+ abstract boolean isSupportingByteBuffer();
+
+ @Override
+ public void update(byte[] b, int off, int len) {
+ checksum.update(b, off, len);
+ }
+
+ @Override
+ public long getValue() {
+ return checksum.getValue();
+ }
+
+ @Override
+ public void reset() {
+ checksum.reset();
+ }
+
+ static ByteBufChecksum wrapChecksum(Checksum checksum) {
+ ObjectUtil.checkNotNull(checksum, "checksum");
+ if (checksum instanceof Adler32 && ADLER32_UPDATE_METHOD != null) {
+ return new ReflectiveByteBufChecksum(checksum, ADLER32_UPDATE_METHOD);
+ }
+ if (checksum instanceof CRC32 && CRC32_UPDATE_METHOD != null) {
+ return new ReflectiveByteBufChecksum(checksum, CRC32_UPDATE_METHOD);
+ }
+ return new SlowByteBufChecksum(checksum);
+ }
+
+ private static final class ReflectiveByteBufChecksum extends ByteBufChecksum {
+ private final Method method;
+
+ ReflectiveByteBufChecksum(Checksum checksum, Method method) {
+ super(checksum);
+ this.method = method;
+ }
+
+ @Override
+ void update(ByteBuf b, int off, int len) {
+ if (b.hasArray()) {
+ update(b.array(), b.arrayOffset() + off, len);
+ } else {
+ try {
+ method.invoke(checksum, CompressionUtil.safeNioBuffer(b));
+ } catch (Throwable cause) {
+ throw new Error();
+ }
+ }
+ }
+
+ @Override
+ boolean isSupportingByteBuffer() {
+ return true;
+ }
+ }
+
+ private static final class SlowByteBufChecksum extends ByteBufChecksum {
+
+ SlowByteBufChecksum(Checksum checksum) {
+ super(checksum);
+ }
+
+ @Override
+ void update(ByteBuf b, int off, int len) {
+ if (b.hasArray()) {
+ update(b.array(), b.arrayOffset() + off, len);
+ } else {
+ ByteBuf heapBuffer = b.alloc().heapBuffer(len);
+ try {
+ heapBuffer.writeBytes(b, off, len);
+ update(heapBuffer.array(), heapBuffer.arrayOffset(), len);
+ } finally {
+ heapBuffer.release();
+ }
+ }
+ }
+
+ @Override
+ boolean isSupportingByteBuffer() {
+ return false;
+ }
+ }
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/compression/CompressionUtil.java b/codec/src/main/java/io/netty/handler/codec/compression/CompressionUtil.java
new file mode 100644
index 0000000000..8b43e7ff33
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/compression/CompressionUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2016 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.compression;
+
+import io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+
+final class CompressionUtil {
+
+ private CompressionUtil() { }
+
+ static void checkChecksum(ByteBufChecksum checksum, ByteBuf uncompressed, int currentChecksum) {
+ checksum.reset();
+ checksum.update(uncompressed,
+ uncompressed.readerIndex(), uncompressed.readableBytes());
+
+ final int checksumResult = (int) checksum.getValue();
+ if (checksumResult != currentChecksum) {
+ throw new DecompressionException(String.format(
+ "stream corrupted: mismatching checksum: %d (expected: %d)",
+ checksumResult, currentChecksum));
+ }
+ }
+
+ static ByteBuffer safeNioBuffer(ByteBuf buffer) {
+ return buffer.nioBufferCount() == 1 ? buffer.internalNioBuffer(buffer.readerIndex(), buffer.readableBytes())
+ : buffer.nioBuffer();
+ }
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameDecoder.java
index e15944c9d4..d616e42f41 100644
--- a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameDecoder.java
+++ b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameDecoder.java
@@ -65,7 +65,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
/**
* Underlying checksum calculator in use.
*/
- private Checksum checksum;
+ private ByteBufChecksum checksum;
/**
* Type of current block.
@@ -114,7 +114,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
/**
* Creates a new LZ4 decoder with customizable implementation.
*
- * @param factory user customizable {@link net.jpountz.lz4.LZ4Factory} instance
+ * @param factory user customizable {@link LZ4Factory} instance
* which may be JNI bindings to the original C implementation, a pure Java implementation
* or a Java implementation that uses the {@link sun.misc.Unsafe}
* @param validateChecksums if {@code true}, the checksum field will be validated against the actual
@@ -132,7 +132,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
/**
* Creates a new customizable LZ4 decoder.
*
- * @param factory user customizable {@link net.jpountz.lz4.LZ4Factory} instance
+ * @param factory user customizable {@link LZ4Factory} instance
* which may be JNI bindings to the original C implementation, a pure Java implementation
* or a Java implementation that uses the {@link sun.misc.Unsafe}
* @param checksum the {@link Checksum} instance to use to check data for integrity.
@@ -143,7 +143,7 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
throw new NullPointerException("factory");
}
decompressor = factory.fastDecompressor();
- this.checksum = checksum;
+ this.checksum = checksum == null ? null : ByteBufChecksum.wrapChecksum(checksum);
}
@Override
@@ -212,69 +212,47 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
break;
}
- final int idx = in.readerIndex();
+ final ByteBufChecksum checksum = this.checksum;
+ ByteBuf uncompressed = null;
- ByteBuf uncompressed = ctx.alloc().heapBuffer(decompressedLength, decompressedLength);
- final byte[] dest = uncompressed.array();
- final int destOff = uncompressed.arrayOffset() + uncompressed.writerIndex();
-
- boolean success = false;
try {
switch (blockType) {
- case BLOCK_TYPE_NON_COMPRESSED: {
- in.getBytes(idx, dest, destOff, decompressedLength);
- break;
- }
- case BLOCK_TYPE_COMPRESSED: {
- final byte[] src;
- final int srcOff;
- if (in.hasArray()) {
- src = in.array();
- srcOff = in.arrayOffset() + idx;
- } else {
- src = new byte[compressedLength];
- in.getBytes(idx, src);
- srcOff = 0;
- }
+ case BLOCK_TYPE_NON_COMPRESSED:
+ // Just pass through, we not update the readerIndex yet as we do this outside of the
+ // switch statement.
+ uncompressed = in.retainedSlice(in.readerIndex(), decompressedLength);
+ break;
+ case BLOCK_TYPE_COMPRESSED:
+ uncompressed = checksum == null || checksum.isSupportingByteBuffer()
+ // We can allocate whatever buffer if we either not need to do checksum processing
+ // or if our ByteBufChecksum implementation supports ByteBuffer.
+ // This is needed as Checksum implementations itself only support byte[].
+ ? ctx.alloc().buffer(decompressedLength, decompressedLength)
+ : ctx.alloc().heapBuffer(decompressedLength, decompressedLength);
- try {
- final int readBytes = decompressor.decompress(src, srcOff,
- dest, destOff, decompressedLength);
- if (compressedLength != readBytes) {
- throw new DecompressionException(String.format(
- "stream corrupted: compressedLength(%d) and actual length(%d) mismatch",
- compressedLength, readBytes));
- }
- } catch (LZ4Exception e) {
- throw new DecompressionException(e);
- }
- break;
- }
- default:
- throw new DecompressionException(String.format(
- "unexpected blockType: %d (expected: %d or %d)",
- blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
- }
-
- final Checksum checksum = this.checksum;
- if (checksum != null) {
- checksum.reset();
- checksum.update(dest, destOff, decompressedLength);
- final int checksumResult = (int) checksum.getValue();
- if (checksumResult != currentChecksum) {
+ decompressor.decompress(CompressionUtil.safeNioBuffer(in),
+ uncompressed.internalNioBuffer(uncompressed.writerIndex(), decompressedLength));
+ // Update the writerIndex now to reflect what we decompressed.
+ uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength);
+ break;
+ default:
throw new DecompressionException(String.format(
- "stream corrupted: mismatching checksum: %d (expected: %d)",
- checksumResult, currentChecksum));
- }
+ "unexpected blockType: %d (expected: %d or %d)",
+ blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
}
- uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength);
- out.add(uncompressed);
+ // Skip inbound bytes after we processed them.
in.skipBytes(compressedLength);
+ if (checksum != null) {
+ CompressionUtil.checkChecksum(checksum, uncompressed, currentChecksum);
+ }
+ out.add(uncompressed);
+ uncompressed = null;
currentState = State.INIT_BLOCK;
- success = true;
+ } catch (LZ4Exception e) {
+ throw new DecompressionException(e);
} finally {
- if (!success) {
+ if (uncompressed != null) {
uncompressed.release();
}
}
diff --git a/pom.xml b/pom.xml
index ba6be85df4..8ab1f43f51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -730,6 +730,10 @@
java.security.cert.CertificateRevokedException
java.util.concurrent.ConcurrentLinkedDeque
+
+
+ java.util.zip.CRC32
+ java.util.zip.Adler32