Implemented LZF compression codec
Motivation: LZF compression codec provides sending and receiving data encoded by very fast LZF algorithm. Modifications: - Added Compress-LZF library which implements LZF algorithm - Implemented LzfEncoder which extends MessageToByteEncoder and provides compression of outgoing messages - Added tests to verify the LzfEncoder and how it can compress data for the next uncompression using the original library - Implemented LzfDecoder which extends ByteToMessageDecoder and provides uncompression of incoming messages - Added tests to verify the LzfDecoder and how it can uncompress data after compression using the original library - Added integration tests for LzfEncoder/Decoder Result: Full LZF compression codec which can compress/uncompress data using LZF algorithm.
This commit is contained in:
@ -97,6 +97,14 @@ pure Java, which can be obtained at:
This product optionally depends on 'Compress-LZF', a Java library for encoding and
decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
* license/LICENSE.compress-lzf.txt (Apache License 2.0)
This product optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:
@ -49,6 +49,11 @@
<!-- Test dependencies for jboss marshalling encoder/decoder -->
@ -0,0 +1,179 @@
* Copyright 2014 The Netty Project
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
package io.netty.handler.codec.compression;
import com.ning.compress.BufferRecycler;
import com.ning.compress.lzf.ChunkDecoder;
import com.ning.compress.lzf.util.ChunkDecoderFactory;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
import static com.ning.compress.lzf.LZFChunk.BYTE_Z;
import static com.ning.compress.lzf.LZFChunk.BYTE_V;
import static com.ning.compress.lzf.LZFChunk.MAX_HEADER_LEN;
import static com.ning.compress.lzf.LZFChunk.HEADER_LEN_COMPRESSED;
import static com.ning.compress.lzf.LZFChunk.HEADER_LEN_NOT_COMPRESSED;
import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_NON_COMPRESSED;
import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_COMPRESSED;
* Uncompresses a {@link ByteBuf} encoded with the LZF format.
* See original <a href="">LZF package</a>
* and <a href="">LZF format</a> for full description.
public class LzfDecoder extends ByteToMessageDecoder {
* A brief signature for content auto-detection.
private static final short SIGNATURE_OF_CHUNK = BYTE_Z << 8 | BYTE_V;
* Offset to the "Type" in chunk header.
private static final int TYPE_OFFSET = 2;
* Offset to the "ChunkLength" in chunk header.
private static final int CHUNK_LENGTH_OFFSET = 3;
* Offset to the "OriginalLength" in chunk header.
private static final int ORIGINAL_LENGTH_OFFSET = 5;
* Underlying decoder in use.
private final ChunkDecoder decoder;
* Object that handles details of buffer recycling.
private final BufferRecycler recycler;
* Determines the state of flow.
private boolean corrupted;
* Creates a new LZF decoder with the most optimal available methods for underlying data access.
* It will "unsafe" instance if one can be used on current JVM.
* It should be safe to call this constructor as implementations are dynamically loaded; however, on some
* non-standard platforms it may be necessary to use {@link #LzfDecoder(boolean)} with {@code true} param.
public LzfDecoder() {
* Creates a new LZF decoder with specified decoding instance.
* @param safeInstance
* If {@code true} decoder will use {@link ChunkDecoder} that only uses standard JDK access methods,
* and should work on all Java platforms and JVMs.
* Otherwise decoder will try to use highly optimized {@link ChunkDecoder} implementation that uses
* Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well).
public LzfDecoder(boolean safeInstance) {
decoder = safeInstance ?
: ChunkDecoderFactory.optimalInstance();
recycler = BufferRecycler.instance();
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
for (;;) {
if (corrupted) {
if (in.readableBytes() < HEADER_LEN_NOT_COMPRESSED) {
final int idx = in.readerIndex();
final int type = in.getByte(idx + TYPE_OFFSET);
final int chunkLength = in.getUnsignedShort(idx + CHUNK_LENGTH_OFFSET);
final int totalLength = (type == BLOCK_TYPE_NON_COMPRESSED ?
if (in.readableBytes() < totalLength) {
try {
if (in.getUnsignedShort(idx) != SIGNATURE_OF_CHUNK) {
throw new DecompressionException("Unexpected signature of chunk");
switch (type) {
final int originalLength = in.getUnsignedShort(idx + ORIGINAL_LENGTH_OFFSET);
final byte[] inputArray;
final int inPos;
if (in.hasArray()) {
inputArray = in.array();
inPos = in.arrayOffset() + idx + HEADER_LEN_COMPRESSED;
} else {
inputArray = recycler.allocInputBuffer(chunkLength);
in.getBytes(idx + HEADER_LEN_COMPRESSED, inputArray, 0, chunkLength);
inPos = 0;
ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength);
final byte[] outputArray = uncompressed.array();
final int outPos = uncompressed.arrayOffset();
boolean success = false;
try {
decoder.decodeChunk(inputArray, inPos, outputArray, outPos, outPos + originalLength);
uncompressed.writerIndex(uncompressed.writerIndex() + originalLength);
success = true;
} finally {
if (!success) {
if (!in.hasArray()) {
throw new DecompressionException("Unknown type of chunk: " + type + " (expected: 0 or 1)");
} catch (Exception e) {
corrupted = true;
throw e;
@ -0,0 +1,140 @@
* Copyright 2014 The Netty Project
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
package io.netty.handler.codec.compression;
import com.ning.compress.BufferRecycler;
import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFEncoder;
import com.ning.compress.lzf.util.ChunkEncoderFactory;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.MessageToByteEncoder;
import static com.ning.compress.lzf.LZFChunk.*;
* Compresses a {@link ByteBuf} using the LZF format.
* See original <a href="">LZF package</a>
* and <a href="">LZF format</a> for full description.
public class LzfEncoder extends MessageToByteEncoder<ByteBuf> {
* Minimum block size ready for compression. Blocks with length
* less than {@link #MIN_BLOCK_TO_COMPRESS} will write as uncompressed.
private static final int MIN_BLOCK_TO_COMPRESS = 16;
* Underlying decoder in use.
private final ChunkEncoder encoder;
* Object that handles details of buffer recycling.
private final BufferRecycler recycler;
* Creates a new LZF encoder with the most optimal available methods for underlying data access.
* It will "unsafe" instance if one can be used on current JVM.
* It should be safe to call this constructor as implementations are dynamically loaded; however, on some
* non-standard platforms it may be necessary to use {@link #LzfEncoder(boolean)} with {@code true} param.
public LzfEncoder() {
this(false, MAX_CHUNK_LEN);
* Creates a new LZF encoder with specified encoding instance.
* @param safeInstance
* If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK access methods,
* and should work on all Java platforms and JVMs.
* Otherwise encoder will try to use highly optimized {@link ChunkEncoder} implementation that uses
* Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well).
public LzfEncoder(boolean safeInstance) {
this(safeInstance, MAX_CHUNK_LEN);
* Creates a new LZF encoder with specified total length of encoded chunk. You can configure it to encode
* your data flow more efficient if you know the avarage size of messages that you send.
* @param totalLength
* Expected total length of content to compress; only matters for outgoing messages that is smaller
* than maximum chunk size (64k), to optimize encoding hash tables.
public LzfEncoder(int totalLength) {
this(false, totalLength);
* Creates a new LZF encoder with specified settings.
* @param safeInstance
* If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK access methods,
* and should work on all Java platforms and JVMs.
* Otherwise encoder will try to use highly optimized {@link ChunkEncoder} implementation that uses
* Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well).
* @param totalLength
* Expected total length of content to compress; only matters for outgoing messages that is smaller
* than maximum chunk size (64k), to optimize encoding hash tables.
public LzfEncoder(boolean safeInstance, int totalLength) {
if (totalLength < MIN_BLOCK_TO_COMPRESS || totalLength > MAX_CHUNK_LEN) {
throw new IllegalArgumentException("totalLength: " + totalLength +
" (expected: " + MIN_BLOCK_TO_COMPRESS + '-' + MAX_CHUNK_LEN + ')');
encoder = safeInstance ?
: ChunkEncoderFactory.optimalNonAllocatingInstance(totalLength);
recycler = BufferRecycler.instance();
protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
final int length = in.readableBytes();
final int idx = in.readerIndex();
final byte[] input;
final int inputPtr;
if (in.hasArray()) {
input = in.array();
inputPtr = in.arrayOffset() + idx;
} else {
input = recycler.allocInputBuffer(length);
in.getBytes(idx, input, 0, length);
inputPtr = 0;
final int maxOutputLength = LZFEncoder.estimateMaxWorkspaceSize(length);
final byte[] output = out.array();
final int outputPtr = out.arrayOffset() + out.writerIndex();
final int outputLength = LZFEncoder.appendEncoded(encoder,
input, inputPtr, length, output, outputPtr) - outputPtr;
out.writerIndex(out.writerIndex() + outputLength);
if (!in.hasArray()) {
@ -0,0 +1,148 @@
* Copyright 2014 The Netty Project
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
package io.netty.handler.codec.compression;
import com.ning.compress.lzf.LZFEncoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static com.ning.compress.lzf.LZFChunk.BYTE_Z;
import static com.ning.compress.lzf.LZFChunk.BYTE_V;
import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_NON_COMPRESSED;
import static org.junit.Assert.*;
public class LzfDecoderTest {
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
public ExpectedException expected = ExpectedException.none();
private EmbeddedChannel channel;
public void initChannel() {
channel = new EmbeddedChannel(new LzfDecoder());
public void testUnexpectedSignatureOfChunk() throws Exception {
expected.expectMessage("Unexpected signature of chunk");
ByteBuf in = Unpooled.buffer();
in.writeShort(0x1234); //random value
public void testUnknownTypeOfChunk() throws Exception {
expected.expectMessage("Unknown type of chunk");
ByteBuf in = Unpooled.buffer();
in.writeByte(0xFF); //random value
private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception {
byte[] compressedArray = LZFEncoder.encode(data);
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
public void testDecompressionOfSmallChunkOfData() throws Exception {
testDecompression(channel, BYTES_SMALL);
public void testDecompressionOfLargeChunkOfData() throws Exception {
testDecompression(channel, BYTES_LARGE);
public void testDecompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
byte[] compressedArray = LZFEncoder.encode(data);
int written = 0, length = rand.nextInt(100);
while (written + length < compressedArray.length) {
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length);
written += length;
length = rand.nextInt(100);
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception {
CompositeByteBuf uncompressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readInbound()) != null) {
uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes());
return uncompressed;
@ -0,0 +1,108 @@
* Copyright 2014 The Netty Project
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
package io.netty.handler.codec.compression;
import com.ning.compress.lzf.LZFDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
public class LzfEncoderTest {
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
private EmbeddedChannel channel;
public void initChannel() {
channel = new EmbeddedChannel(new LzfEncoder());
private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception {
ByteBuf in = Unpooled.wrappedBuffer(data);
final byte[] uncompressed = uncompress(channel);
assertArrayEquals(data, uncompressed);
public void testCompressionOfSmallChunkOfData() throws Exception {
testCompression(channel, BYTES_SMALL);
public void testCompressionOfLargeChunkOfData() throws Exception {
testCompression(channel, BYTES_LARGE);
public void testCompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
int written = 0, length = rand.nextInt(100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
written += length;
length = rand.nextInt(100);
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
final byte[] uncompressed = uncompress(channel);
assertArrayEquals(data, uncompressed);
private static byte[] uncompress(EmbeddedChannel channel) throws Exception {
CompositeByteBuf out = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
out.writerIndex(out.writerIndex() + msg.readableBytes());
byte[] compressed = new byte[out.readableBytes()];
return LZFDecoder.decode(compressed);
@ -0,0 +1,156 @@
* Copyright 2014 The Netty Project
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.Test;
import java.util.Arrays;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
public class LzfIntegrationTest {
private static final ThreadLocalRandom rand = ThreadLocalRandom.current();
public static final byte[] EMPTY = new byte[0];
public void testEmpty() throws Exception {
public void testOneByte() throws Exception {
testIdentity(new byte[] { 'A' });
public void testTwoBytes() throws Exception {
testIdentity(new byte[] { 'B', 'A' });
public void testRegular() throws Exception {
byte[] data = ("Netty is a NIO client server framework which enables quick and easy development " +
"of network applications such as protocol servers and clients.").getBytes();
public void testLargeRandom() throws Exception {
byte[] data = new byte[1048576];
public void testPartRandom() throws Exception {
byte[] data = new byte[12345];
for (int i = 0; i < 1024; i++) {
data[i] = 123;
public void testCompressible() throws Exception {
byte[] data = new byte[10000];
for (int i = 0; i < data.length; i++) {
data[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
public void testLongBlank() throws Exception {
byte[] data = new byte[100000];
public void testLongSame() throws Exception {
byte[] data = new byte[100000];
Arrays.fill(data, (byte) 123);
public void testSequential() throws Exception {
byte[] data = new byte[49];
for (int i = 0; i < data.length; i++) {
data[i] = (byte) i;
private static void testIdentity(byte[] data) {
ByteBuf in = Unpooled.wrappedBuffer(data);
EmbeddedChannel encoder = new EmbeddedChannel(new LzfEncoder());
EmbeddedChannel decoder = new EmbeddedChannel(new LzfDecoder());
try {
ByteBuf msg;
CompositeByteBuf compressed = Unpooled.compositeBuffer();
while ((msg = encoder.readOutbound()) != null) {
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
assertThat(compressed, is(notNullValue()));
CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
assertEquals(in, decompressed);
} finally {
for (;;) {
Object msg = encoder.readOutbound();
if (msg == null) {
for (;;) {
Object msg = decoder.readInbound();
if (msg == null) {
Normal file
Normal file
@ -0,0 +1,11 @@
Copyright 2009-2010 Ning, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
Reference in New Issue
Block a user