diff --git a/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java index c03021cea4..32ad151b6d 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java @@ -17,27 +17,38 @@ package io.netty.handler.codec.compression; import com.ning.compress.BufferRecycler; import com.ning.compress.lzf.ChunkEncoder; +import com.ning.compress.lzf.LZFChunk; import com.ning.compress.lzf.LZFEncoder; import com.ning.compress.lzf.util.ChunkEncoderFactory; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import static com.ning.compress.lzf.LZFChunk.*; +import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN; /** * Compresses a {@link ByteBuf} using the LZF format. - * + *

* See original LZF package * and LZF format for full description. */ public class LzfEncoder extends MessageToByteEncoder { + /** * Minimum block size ready for compression. Blocks with length * less than {@link #MIN_BLOCK_TO_COMPRESS} will write as uncompressed. */ private static final int MIN_BLOCK_TO_COMPRESS = 16; + /** + * Compress threshold for LZF format. When the amount of input data is less than compressThreshold, + * we will construct an uncompressed output according to the LZF format. + *

+ * When the value is less than {@see ChunkEncoder#MIN_BLOCK_TO_COMPRESS}, since LZF will not compress data + * that is less than {@see ChunkEncoder#MIN_BLOCK_TO_COMPRESS}, compressThreshold will not work. + */ + private final int compressThreshold; + /** * Underlying decoder in use. */ @@ -55,29 +66,44 @@ public class LzfEncoder extends MessageToByteEncoder { * non-standard platforms it may be necessary to use {@link #LzfEncoder(boolean)} with {@code true} param. */ public LzfEncoder() { - this(false, MAX_CHUNK_LEN); + this(false); } /** * Creates a new LZF encoder with specified encoding instance. * - * @param safeInstance - * If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK access methods, - * and should work on all Java platforms and JVMs. - * Otherwise encoder will try to use highly optimized {@link ChunkEncoder} implementation that uses - * Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well). + * @param safeInstance If {@code true} encoder will use {@link ChunkEncoder} that only uses + * standard JDK access methods, and should work on all Java platforms and JVMs. + * Otherwise encoder will try to use highly optimized {@link ChunkEncoder} + * implementation that uses Sun JDK's {@link sun.misc.Unsafe} + * class (which may be included by other JDK's as well). */ public LzfEncoder(boolean safeInstance) { this(safeInstance, MAX_CHUNK_LEN); } + /** + * Creates a new LZF encoder with specified encoding instance and compressThreshold. + * + * @param safeInstance If {@code true} encoder will use {@link ChunkEncoder} that only uses standard + * JDK access methods, and should work on all Java platforms and JVMs. + * Otherwise encoder will try to use highly optimized {@link ChunkEncoder} + * implementation that uses Sun JDK's {@link sun.misc.Unsafe} + * class (which may be included by other JDK's as well). + * @param totalLength Expected total length of content to compress; only matters for outgoing messages + * that is smaller than maximum chunk size (64k), to optimize encoding hash tables. + */ + public LzfEncoder(boolean safeInstance, int totalLength) { + this(safeInstance, totalLength, MIN_BLOCK_TO_COMPRESS); + } + /** * Creates a new LZF encoder with specified total length of encoded chunk. You can configure it to encode * your data flow more efficient if you know the average size of messages that you send. * - * @param totalLength - * Expected total length of content to compress; only matters for outgoing messages that is smaller - * than maximum chunk size (64k), to optimize encoding hash tables. + * @param totalLength Expected total length of content to compress; + * only matters for outgoing messages that is smaller than maximum chunk size (64k), + * to optimize encoding hash tables. */ public LzfEncoder(int totalLength) { this(false, totalLength); @@ -86,27 +112,36 @@ public class LzfEncoder extends MessageToByteEncoder { /** * Creates a new LZF encoder with specified settings. * - * @param safeInstance - * If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK access methods, - * and should work on all Java platforms and JVMs. - * Otherwise encoder will try to use highly optimized {@link ChunkEncoder} implementation that uses - * Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well). - * @param totalLength - * Expected total length of content to compress; only matters for outgoing messages that is smaller - * than maximum chunk size (64k), to optimize encoding hash tables. + * @param safeInstance If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK + * access methods, and should work on all Java platforms and JVMs. + * Otherwise encoder will try to use highly optimized {@link ChunkEncoder} + * implementation that uses Sun JDK's {@link sun.misc.Unsafe} + * class (which may be included by other JDK's as well). + * @param totalLength Expected total length of content to compress; only matters for outgoing messages + * that is smaller than maximum chunk size (64k), to optimize encoding hash tables. + * @param compressThreshold Compress threshold for LZF format. When the amount of input data is less than + * compressThreshold, we will construct an uncompressed output according + * to the LZF format. */ - public LzfEncoder(boolean safeInstance, int totalLength) { + public LzfEncoder(boolean safeInstance, int totalLength, int compressThreshold) { super(false); if (totalLength < MIN_BLOCK_TO_COMPRESS || totalLength > MAX_CHUNK_LEN) { throw new IllegalArgumentException("totalLength: " + totalLength + " (expected: " + MIN_BLOCK_TO_COMPRESS + '-' + MAX_CHUNK_LEN + ')'); } - encoder = safeInstance ? - ChunkEncoderFactory.safeNonAllocatingInstance(totalLength) - : ChunkEncoderFactory.optimalNonAllocatingInstance(totalLength); + if (compressThreshold < MIN_BLOCK_TO_COMPRESS) { + // not a suitable value. + throw new IllegalArgumentException("compressThreshold:" + compressThreshold + + " expected >=" + MIN_BLOCK_TO_COMPRESS); + } + this.compressThreshold = compressThreshold; - recycler = BufferRecycler.instance(); + this.encoder = safeInstance ? + ChunkEncoderFactory.safeNonAllocatingInstance(totalLength) + : ChunkEncoderFactory.optimalNonAllocatingInstance(totalLength); + + this.recycler = BufferRecycler.instance(); } @Override @@ -128,8 +163,16 @@ public class LzfEncoder extends MessageToByteEncoder { out.ensureWritable(maxOutputLength); final byte[] output = out.array(); final int outputPtr = out.arrayOffset() + out.writerIndex(); - final int outputLength = LZFEncoder.appendEncoded(encoder, - input, inputPtr, length, output, outputPtr) - outputPtr; + + final int outputLength; + if (length >= compressThreshold) { + // compress. + outputLength = encodeCompress(input, inputPtr, length, output, outputPtr); + } else { + // not compress. + outputLength = encodeNonCompress(input, inputPtr, length, output, outputPtr); + } + out.writerIndex(out.writerIndex() + outputLength); in.skipBytes(length); @@ -138,6 +181,36 @@ public class LzfEncoder extends MessageToByteEncoder { } } + private int encodeCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) { + return LZFEncoder.appendEncoded(encoder, + input, inputPtr, length, output, outputPtr) - outputPtr; + } + + private static int lzfEncodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) { + int left = length; + int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left); + outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr); + left -= chunkLen; + if (left < 1) { + return outputPtr; + } + inputPtr += chunkLen; + do { + chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN); + outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr); + inputPtr += chunkLen; + left -= chunkLen; + } while (left > 0); + return outputPtr; + } + + /** + * Use lzf uncompressed format to encode a piece of input. + */ + private static int encodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) { + return lzfEncodeNonCompress(input, inputPtr, length, output, outputPtr) - outputPtr; + } + @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { encoder.close(); diff --git a/codec/src/test/java/io/netty/handler/codec/compression/LengthAwareLzfIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/LengthAwareLzfIntegrationTest.java new file mode 100644 index 0000000000..42304474c0 --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/compression/LengthAwareLzfIntegrationTest.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.channel.embedded.EmbeddedChannel; + +import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN; + +public class LengthAwareLzfIntegrationTest extends LzfIntegrationTest { + + @Override + protected EmbeddedChannel createEncoder() { + return new EmbeddedChannel(new LzfEncoder(false, MAX_CHUNK_LEN, 2 * 1024 * 1024)); + } +}