Let LzfEncoder support length-aware compression (#10082)

Motivation:
LZF defines both a compressed and a non-compressed chunk format, so LzfEncoder can be made length-aware and give the user control over when compression is applied.

Modification:
When the input length reaches compressThreshold, LzfEncoder emits a compressed LZF chunk; otherwise it emits a non-compressed chunk. Whichever format the encoder chooses, LzfDecoder can decompress the data correctly.

Result:
Gives users control over compression.
This commit is contained in:
parent
2576a2dd74
commit
60cbe8b7b2
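
A minimal usage sketch of the new behaviour (illustrative only, not part of this commit; the 8 KiB threshold and payload sizes are arbitrary): messages shorter than the threshold are framed as non-compressed LZF chunks, longer messages are compressed, and LzfDecoder handles both transparently.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.compression.LzfDecoder;
import io.netty.handler.codec.compression.LzfEncoder;

import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN;

public final class LzfThresholdExample {
    public static void main(String[] args) {
        // Messages below 8 KiB are framed as non-compressed LZF chunks;
        // messages of at least 8 KiB are compressed.
        EmbeddedChannel encoder = new EmbeddedChannel(
                new LzfEncoder(false, MAX_CHUNK_LEN, 8 * 1024));
        EmbeddedChannel decoder = new EmbeddedChannel(new LzfDecoder());

        byte[] payload = new byte[16 * 1024]; // >= threshold, so compressed
        encoder.writeOutbound(Unpooled.wrappedBuffer(payload));

        // LzfDecoder decompresses either chunk format.
        ByteBuf encoded = encoder.readOutbound();
        decoder.writeInbound(encoded);
        ByteBuf decoded = decoder.readInbound();
        System.out.println(decoded.readableBytes()); // 16384
        decoded.release();
    }
}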
LzfEncoder.java:

@@ -17,27 +17,38 @@ package io.netty.handler.codec.compression;
 
 import com.ning.compress.BufferRecycler;
 import com.ning.compress.lzf.ChunkEncoder;
+import com.ning.compress.lzf.LZFChunk;
 import com.ning.compress.lzf.LZFEncoder;
 import com.ning.compress.lzf.util.ChunkEncoderFactory;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 
-import static com.ning.compress.lzf.LZFChunk.*;
+import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN;
 
 /**
  * Compresses a {@link ByteBuf} using the LZF format.
- *
+ * <p>
  * See original <a href="http://oldhome.schmorp.de/marc/liblzf.html">LZF package</a>
  * and <a href="https://github.com/ning/compress/wiki/LZFFormat">LZF format</a> for full description.
  */
 public class LzfEncoder extends MessageToByteEncoder<ByteBuf> {
 
     /**
      * Minimum block size ready for compression. Blocks with length
      * less than {@link #MIN_BLOCK_TO_COMPRESS} will write as uncompressed.
      */
     private static final int MIN_BLOCK_TO_COMPRESS = 16;
 
+    /**
+     * Compress threshold for LZF format. When the amount of input data is less than compressThreshold,
+     * we will construct an uncompressed output according to the LZF format.
+     * <p>
+     * When the value is less than {@see ChunkEncoder#MIN_BLOCK_TO_COMPRESS}, since LZF will not compress data
+     * that is less than {@see ChunkEncoder#MIN_BLOCK_TO_COMPRESS}, compressThreshold will not work.
+     */
+    private final int compressThreshold;
+
     /**
      * Underlying decoder in use.
      */
@@ -55,29 +66,44 @@ public class LzfEncoder extends MessageToByteEncoder<ByteBuf> {
      * non-standard platforms it may be necessary to use {@link #LzfEncoder(boolean)} with {@code true} param.
      */
     public LzfEncoder() {
-        this(false, MAX_CHUNK_LEN);
+        this(false);
     }
 
     /**
      * Creates a new LZF encoder with specified encoding instance.
      *
-     * @param safeInstance
-     *        If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK access methods,
-     *        and should work on all Java platforms and JVMs.
-     *        Otherwise encoder will try to use highly optimized {@link ChunkEncoder} implementation that uses
-     *        Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well).
+     * @param safeInstance If {@code true} encoder will use {@link ChunkEncoder} that only uses
+     *                     standard JDK access methods, and should work on all Java platforms and JVMs.
+     *                     Otherwise encoder will try to use highly optimized {@link ChunkEncoder}
+     *                     implementation that uses Sun JDK's {@link sun.misc.Unsafe}
+     *                     class (which may be included by other JDK's as well).
      */
     public LzfEncoder(boolean safeInstance) {
         this(safeInstance, MAX_CHUNK_LEN);
     }
 
+    /**
+     * Creates a new LZF encoder with specified encoding instance and compressThreshold.
+     *
+     * @param safeInstance If {@code true} encoder will use {@link ChunkEncoder} that only uses standard
+     *                     JDK access methods, and should work on all Java platforms and JVMs.
+     *                     Otherwise encoder will try to use highly optimized {@link ChunkEncoder}
+     *                     implementation that uses Sun JDK's {@link sun.misc.Unsafe}
+     *                     class (which may be included by other JDK's as well).
+     * @param totalLength  Expected total length of content to compress; only matters for outgoing messages
+     *                     that is smaller than maximum chunk size (64k), to optimize encoding hash tables.
+     */
+    public LzfEncoder(boolean safeInstance, int totalLength) {
+        this(safeInstance, totalLength, MIN_BLOCK_TO_COMPRESS);
+    }
+
     /**
      * Creates a new LZF encoder with specified total length of encoded chunk. You can configure it to encode
      * your data flow more efficient if you know the average size of messages that you send.
      *
-     * @param totalLength
-     *        Expected total length of content to compress; only matters for outgoing messages that is smaller
-     *        than maximum chunk size (64k), to optimize encoding hash tables.
+     * @param totalLength Expected total length of content to compress;
+     *                    only matters for outgoing messages that is smaller than maximum chunk size (64k),
+     *                    to optimize encoding hash tables.
      */
     public LzfEncoder(int totalLength) {
         this(false, totalLength);
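
With this hunk the constructor chain becomes LzfEncoder() -> LzfEncoder(false) -> LzfEncoder(false, MAX_CHUNK_LEN) -> the new three-argument constructor with compressThreshold = MIN_BLOCK_TO_COMPRESS. Since LZF never compressed blocks under 16 bytes anyway, the defaults keep their old behaviour; only the new overload opts into a higher threshold. A sketch (illustrative):

import io.netty.handler.codec.compression.LzfEncoder;

import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN;

public final class LzfEncoderDefaults {
    public static void main(String[] args) {
        // All four encoders behave identically after this change:
        // safeInstance=false, totalLength=MAX_CHUNK_LEN (0xFFFF),
        // compressThreshold=MIN_BLOCK_TO_COMPRESS (16).
        new LzfEncoder();
        new LzfEncoder(false);
        new LzfEncoder(false, MAX_CHUNK_LEN);
        new LzfEncoder(false, MAX_CHUNK_LEN, 16); // threshold now explicit
    }
}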
@@ -86,27 +112,36 @@ public class LzfEncoder extends MessageToByteEncoder<ByteBuf> {
     /**
      * Creates a new LZF encoder with specified settings.
      *
-     * @param safeInstance
-     *        If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK access methods,
-     *        and should work on all Java platforms and JVMs.
-     *        Otherwise encoder will try to use highly optimized {@link ChunkEncoder} implementation that uses
-     *        Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well).
-     * @param totalLength
-     *        Expected total length of content to compress; only matters for outgoing messages that is smaller
-     *        than maximum chunk size (64k), to optimize encoding hash tables.
+     * @param safeInstance      If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK
+     *                          access methods, and should work on all Java platforms and JVMs.
+     *                          Otherwise encoder will try to use highly optimized {@link ChunkEncoder}
+     *                          implementation that uses Sun JDK's {@link sun.misc.Unsafe}
+     *                          class (which may be included by other JDK's as well).
+     * @param totalLength       Expected total length of content to compress; only matters for outgoing messages
+     *                          that is smaller than maximum chunk size (64k), to optimize encoding hash tables.
+     * @param compressThreshold Compress threshold for LZF format. When the amount of input data is less than
+     *                          compressThreshold, we will construct an uncompressed output according
+     *                          to the LZF format.
      */
-    public LzfEncoder(boolean safeInstance, int totalLength) {
+    public LzfEncoder(boolean safeInstance, int totalLength, int compressThreshold) {
         super(false);
         if (totalLength < MIN_BLOCK_TO_COMPRESS || totalLength > MAX_CHUNK_LEN) {
             throw new IllegalArgumentException("totalLength: " + totalLength +
                     " (expected: " + MIN_BLOCK_TO_COMPRESS + '-' + MAX_CHUNK_LEN + ')');
         }
 
-        encoder = safeInstance ?
-                ChunkEncoderFactory.safeNonAllocatingInstance(totalLength)
-                : ChunkEncoderFactory.optimalNonAllocatingInstance(totalLength);
+        if (compressThreshold < MIN_BLOCK_TO_COMPRESS) {
+            // not a suitable value.
+            throw new IllegalArgumentException("compressThreshold:" + compressThreshold +
+                    " expected >=" + MIN_BLOCK_TO_COMPRESS);
+        }
+        this.compressThreshold = compressThreshold;
 
-        recycler = BufferRecycler.instance();
+        this.encoder = safeInstance ?
+                ChunkEncoderFactory.safeNonAllocatingInstance(totalLength)
+                : ChunkEncoderFactory.optimalNonAllocatingInstance(totalLength);
+
+        this.recycler = BufferRecycler.instance();
     }
 
     @Override
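
The new guard mirrors the existing totalLength check: a threshold below MIN_BLOCK_TO_COMPRESS would never take effect, since LZF does not compress blocks that small. The observable behaviour, sketched from the checks above (illustrative):

import io.netty.handler.codec.compression.LzfEncoder;

import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN;

public final class LzfEncoderValidationDemo {
    public static void main(String[] args) {
        new LzfEncoder(false, MAX_CHUNK_LEN, 16);      // OK: threshold == MIN_BLOCK_TO_COMPRESS
        new LzfEncoder(false, MAX_CHUNK_LEN, 1 << 20); // OK: high threshold, compress rarely
        try {
            new LzfEncoder(false, MAX_CHUNK_LEN, 15);  // below MIN_BLOCK_TO_COMPRESS
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage()); // "compressThreshold:15 expected >=16"
        }
    }
}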
@@ -128,8 +163,16 @@ public class LzfEncoder extends MessageToByteEncoder<ByteBuf> {
         out.ensureWritable(maxOutputLength);
         final byte[] output = out.array();
         final int outputPtr = out.arrayOffset() + out.writerIndex();
-        final int outputLength = LZFEncoder.appendEncoded(encoder,
-                input, inputPtr, length, output, outputPtr) - outputPtr;
+
+        final int outputLength;
+        if (length >= compressThreshold) {
+            // compress.
+            outputLength = encodeCompress(input, inputPtr, length, output, outputPtr);
+        } else {
+            // not compress.
+            outputLength = encodeNonCompress(input, inputPtr, length, output, outputPtr);
+        }
 
         out.writerIndex(out.writerIndex() + outputLength);
         in.skipBytes(length);
+
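
On the wire the two branches map to the two chunk types of the LZF format (linked in the class Javadoc): every chunk starts with the magic bytes 'Z', 'V' and a type byte, 0 for non-compressed and 1 for compressed, which is why LzfDecoder needs no configuration to read either. A sketch that peeks at the type byte (illustrative; the library itself may also fall back to a non-compressed chunk when compression would not shrink the data):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.compression.LzfEncoder;

import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN;

public final class LzfChunkTypeDemo {
    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel(
                new LzfEncoder(false, MAX_CHUNK_LEN, 1024)); // arbitrary threshold

        channel.writeOutbound(Unpooled.wrappedBuffer(new byte[100])); // < 1024
        ByteBuf small = channel.readOutbound();
        System.out.println(small.getByte(2)); // 0: non-compressed chunk
        small.release();

        channel.writeOutbound(Unpooled.wrappedBuffer(new byte[4096])); // >= 1024
        ByteBuf large = channel.readOutbound();
        System.out.println(large.getByte(2)); // 1: compressed chunk (zeros compress well)
        large.release();
    }
}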
@@ -138,6 +181,36 @@ public class LzfEncoder extends MessageToByteEncoder<ByteBuf> {
         }
     }
 
+    private int encodeCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
+        return LZFEncoder.appendEncoded(encoder,
+                input, inputPtr, length, output, outputPtr) - outputPtr;
+    }
+
+    private static int lzfEncodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
+        int left = length;
+        int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
+        outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr);
+        left -= chunkLen;
+        if (left < 1) {
+            return outputPtr;
+        }
+        inputPtr += chunkLen;
+        do {
+            chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
+            outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr);
+            inputPtr += chunkLen;
+            left -= chunkLen;
+        } while (left > 0);
+        return outputPtr;
+    }
+
+    /**
+     * Use lzf uncompressed format to encode a piece of input.
+     */
+    private static int encodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
+        return lzfEncodeNonCompress(input, inputPtr, length, output, outputPtr) - outputPtr;
+    }
+
     @Override
     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
         encoder.close();
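
lzfEncodeNonCompress mirrors the LZF library's own chunking loop but always emits non-compressed chunks: input is split into chunks of at most MAX_CHUNK_LEN (0xFFFF) bytes, each prefixed by its own header. A worked example of the expected overhead, assuming the 5-byte non-compressed chunk header ('Z', 'V', type byte, 2-byte length) from the LZF format description and an input length greater than zero:

public final class NonCompressedSizeDemo {
    private static final int MAX_CHUNK_LEN = 0xFFFF;        // LZFChunk.MAX_CHUNK_LEN
    private static final int HEADER_LEN_NOT_COMPRESSED = 5; // 'Z' 'V' type len16 (assumed)

    /** Expected output size of encodeNonCompress for a given input length (> 0). */
    static int nonCompressedOutputSize(int length) {
        int chunks = (length + MAX_CHUNK_LEN - 1) / MAX_CHUNK_LEN; // ceiling division
        return length + chunks * HEADER_LEN_NOT_COMPRESSED;
    }

    public static void main(String[] args) {
        System.out.println(nonCompressedOutputSize(100));    // 105: one chunk
        System.out.println(nonCompressedOutputSize(65535));  // 65540: exactly one chunk
        System.out.println(nonCompressedOutputSize(150000)); // 150015: three chunks
    }
}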
New test file, LengthAwareLzfIntegrationTest.java:

@@ -0,0 +1,28 @@
+/*
+ * Copyright 2020 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.compression;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+
+import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN;
+
+public class LengthAwareLzfIntegrationTest extends LzfIntegrationTest {
+
+    @Override
+    protected EmbeddedChannel createEncoder() {
+        return new EmbeddedChannel(new LzfEncoder(false, MAX_CHUNK_LEN, 2 * 1024 * 1024));
+    }
+}
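
The new test reuses the full LzfIntegrationTest suite with a 2 MiB threshold, far above any test payload, so every round trip exercises the non-compressed path against the unmodified LzfDecoder. A complementary, hypothetical check of the same property (not part of this commit; assumes JUnit 4 and the same test package):

package io.netty.handler.codec.compression;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;

import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN;
import static org.junit.Assert.assertEquals;

public class LzfNonCompressedRoundTripTest {

    @Test
    public void smallMessageRoundTripsUncompressed() {
        EmbeddedChannel encoder = new EmbeddedChannel(
                new LzfEncoder(false, MAX_CHUNK_LEN, 2 * 1024 * 1024));
        EmbeddedChannel decoder = new EmbeddedChannel(new LzfDecoder());

        byte[] data = new byte[1024];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) i;
        }
        encoder.writeOutbound(Unpooled.wrappedBuffer(data.clone()));

        ByteBuf encoded = encoder.readOutbound();
        assertEquals(0, encoded.getByte(2)); // block type 0 = non-compressed
        decoder.writeInbound(encoded);

        ByteBuf decoded = decoder.readInbound();
        assertEquals(Unpooled.wrappedBuffer(data), decoded);
        decoded.release();
    }
}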