Implemented LZ4 compression codec

Motivation:

LZ4 compression codec provides sending and receiving data encoded by very fast LZ4 algorithm.

Modifications:

- Added `lz4` library which implements LZ4 algorithm.
- Implemented Lz4FramedEncoder which extends MessageToByteEncoder and provides compression of outgoing messages.
- Added tests to verify the Lz4FramedEncoder and how it can compress data for the next uncompression using the original library.
- Implemented Lz4FramedDecoder which extends ByteToMessageDecoder and provides uncompression of incoming messages.
- Added tests to verify the Lz4FramedDecoder and how it can uncompress data after compression using the original library.
- Added integration tests for Lz4FramedEncoder/Decoder.

Result:

Full LZ4 compression codec which can compress/uncompress data using LZ4 algorithm.
This commit is contained in:
Idel Pivnitskiy 2014-08-05 23:48:23 +04:00 committed by Trustin Lee
parent d0b5fb9548
commit c8841bc9de
10 changed files with 1322 additions and 0 deletions

View File

@ -114,6 +114,14 @@ decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
* HOMEPAGE: * HOMEPAGE:
* https://github.com/ning/compress * https://github.com/ning/compress
This product optionally depends on 'lz4', a LZ4 Java compression
and decompression library written by Adrien Grand. It can be obtained at:
* LICENSE:
* license/LICENSE.lz4.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jpountz/lz4-java
This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
and decompression library written by William Kinney. It can be obtained at: and decompression library written by William Kinney. It can be obtained at:

View File

@ -54,6 +54,11 @@
<artifactId>compress-lzf</artifactId> <artifactId>compress-lzf</artifactId>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<optional>true</optional>
</dependency>
<!-- Test dependencies for jboss marshalling encoder/decoder --> <!-- Test dependencies for jboss marshalling encoder/decoder -->
<dependency> <dependency>

View File

@ -0,0 +1,72 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
final class Lz4Constants {
/**
* Magic number of LZ4 block.
*/
static final long MAGIC_NUMBER = (long) 'L' << 56 |
(long) 'Z' << 48 |
(long) '4' << 40 |
(long) 'B' << 32 |
'l' << 24 |
'o' << 16 |
'c' << 8 |
'k';
/**
* Full length of LZ4 block header.
*/
static final int HEADER_LENGTH = 8 + // magic number
1 + // token
4 + // compressed length
4 + // decompressed length
4; // checksum
/**
* Offsets of header's parts.
*/
static final int TOKEN_OFFSET = 8;
static final int COMPRESSED_LENGTH_OFFSET = TOKEN_OFFSET + 1;
static final int DECOMPRESSED_LENGTH_OFFSET = COMPRESSED_LENGTH_OFFSET + 4;
static final int CHECKSUM_OFFSET = DECOMPRESSED_LENGTH_OFFSET + 4;
/**
* Base value for compression level.
*/
static final int COMPRESSION_LEVEL_BASE = 10;
/**
* LZ4 block sizes.
*/
static final int MIN_BLOCK_SIZE = 64;
static final int MAX_BLOCK_SIZE = 1 << COMPRESSION_LEVEL_BASE + 0x0F; // 32 M
static final int DEFAULT_BLOCK_SIZE = 1 << 16; // 64 KB
/**
* LZ4 block types.
*/
static final int BLOCK_TYPE_NON_COMPRESSED = 0x10;
static final int BLOCK_TYPE_COMPRESSED = 0x20;
/**
* Default seed value for xxhash.
*/
static final int DEFAULT_SEED = 0x9747b28c;
private Lz4Constants() { }
}

View File

@ -0,0 +1,304 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import net.jpountz.xxhash.XXHashFactory;
import java.util.List;
import java.util.zip.Checksum;
import static io.netty.handler.codec.compression.Lz4Constants.*;
/**
* Uncompresses a {@link ByteBuf} encoded with the LZ4 format.
*
* See original <a href="http://code.google.com/p/lz4/">LZ4 website</a>
* and <a href="http://fastcompression.blogspot.ru/2011/05/lz4-explained.html">LZ4 block format</a>
* for full description.
*
* Since the original LZ4 block format does not contains size of compressed block and size of original data
* this encoder uses format like <a href="https://github.com/idelpivnitskiy/lz4-java">LZ4 Java</a> library
* written by Adrien Grand and approved by Yann Collet (author of original LZ4 library).
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
* * Magic * Token * Compressed * Decompressed * Checksum * + * LZ4 compressed *
* * * * length * length * * * block *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*/
public class Lz4FrameDecoder extends ByteToMessageDecoder {
/**
* Current state of stream.
*/
private enum State {
INIT_BLOCK,
DECOMPRESS_DATA,
FINISHED,
CORRUPTED
}
private State currentState = State.INIT_BLOCK;
/**
* Underlying decompressor in use.
*/
private LZ4FastDecompressor decompressor;
/**
* Underlying checksum calculator in use.
*/
private Checksum checksum;
/**
* Type of current block.
*/
private int blockType;
/**
* Compressed length of current incoming block.
*/
private int compressedLength;
/**
* Decompressed length of current incoming block.
*/
private int decompressedLength;
/**
* Checksum value of current incoming block.
*/
private int currentChecksum;
/**
* Creates the fastest LZ4 decoder.
*
* Note that by default, validation of the checksum header in each chunk is
* DISABLED for performance improvements. If performance is less of an issue,
* or if you would prefer the safety that checksum validation brings, please
* use the {@link #Lz4FrameDecoder(boolean)} constructor with the argument
* set to {@code true}.
*/
public Lz4FrameDecoder() {
this(false);
}
/**
* Creates a LZ4 decoder with fastest decoder instance available on your machine.
*
* @param validateChecksums if {@code true}, the checksum field will be validated against the actual
* uncompressed data, and if the checksums do not match, a suitable
* {@link DecompressionException} will be thrown
*/
public Lz4FrameDecoder(boolean validateChecksums) {
this(LZ4Factory.fastestInstance(), validateChecksums);
}
/**
* Creates a new LZ4 decoder with customizable implementation.
*
* @param factory user customizable {@link net.jpountz.lz4.LZ4Factory} instance
* which may be JNI bindings to the original C implementation, a pure Java implementation
* or a Java implementation that uses the {@link sun.misc.Unsafe}
* @param validateChecksums if {@code true}, the checksum field will be validated against the actual
* uncompressed data, and if the checksums do not match, a suitable
* {@link DecompressionException} will be thrown. In this case encoder will use
* xxhash hashing for Java, based on Yann Collet's work available at
* <a href="http://code.google.com/p/xxhash/">Google Code</a>.
*/
public Lz4FrameDecoder(LZ4Factory factory, boolean validateChecksums) {
this(factory, validateChecksums ?
XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum()
: null);
}
/**
* Creates a new customizable LZ4 decoder.
*
* @param factory user customizable {@link net.jpountz.lz4.LZ4Factory} instance
* which may be JNI bindings to the original C implementation, a pure Java implementation
* or a Java implementation that uses the {@link sun.misc.Unsafe}
* @param checksum the {@link Checksum} instance to use to check data for integrity.
* You may set {@code null} if you do not want to validate checksum of each block
*/
public Lz4FrameDecoder(LZ4Factory factory, Checksum checksum) {
if (factory == null) {
throw new NullPointerException("factory");
}
decompressor = factory.fastDecompressor();
this.checksum = checksum;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
for (;;) {
try {
switch (currentState) {
case INIT_BLOCK:
if (in.readableBytes() < HEADER_LENGTH) {
return;
}
final long magic = in.readLong();
if (magic != MAGIC_NUMBER) {
throw new DecompressionException("unexpected block identifier");
}
final int token = in.readByte();
final int compressionLevel = (token & 0x0F) + COMPRESSION_LEVEL_BASE;
int blockType = token & 0xF0;
int compressedLength = Integer.reverseBytes(in.readInt());
if (compressedLength < 0 || compressedLength > MAX_BLOCK_SIZE) {
throw new DecompressionException(String.format(
"invalid compressedLength: %d (expected: 0-%d)",
compressedLength, MAX_BLOCK_SIZE));
}
int decompressedLength = Integer.reverseBytes(in.readInt());
final int maxDecompressedLength = 1 << compressionLevel;
if (decompressedLength < 0 || decompressedLength > maxDecompressedLength) {
throw new DecompressionException(String.format(
"invalid decompressedLength: %d (expected: 0-%d)",
decompressedLength, maxDecompressedLength));
}
if (decompressedLength == 0 && compressedLength != 0
|| decompressedLength != 0 && compressedLength == 0
|| blockType == BLOCK_TYPE_NON_COMPRESSED && decompressedLength != compressedLength) {
throw new DecompressionException(String.format(
"stream corrupted: compressedLength(%d) and decompressedLength(%d) mismatch",
compressedLength, decompressedLength));
}
int currentChecksum = Integer.reverseBytes(in.readInt());
if (decompressedLength == 0 && compressedLength == 0) {
if (currentChecksum != 0) {
throw new DecompressionException("stream corrupted: checksum error");
}
currentState = State.FINISHED;
decompressor = null;
checksum = null;
break;
}
this.blockType = blockType;
this.compressedLength = compressedLength;
this.decompressedLength = decompressedLength;
this.currentChecksum = currentChecksum;
currentState = State.DECOMPRESS_DATA;
case DECOMPRESS_DATA:
blockType = this.blockType;
compressedLength = this.compressedLength;
decompressedLength = this.decompressedLength;
currentChecksum = this.currentChecksum;
if (in.readableBytes() < compressedLength) {
return;
}
final int idx = in.readerIndex();
ByteBuf uncompressed = ctx.alloc().heapBuffer(decompressedLength, decompressedLength);
final byte[] dest = uncompressed.array();
final int destOff = uncompressed.arrayOffset() + uncompressed.writerIndex();
boolean success = false;
try {
switch (blockType) {
case BLOCK_TYPE_NON_COMPRESSED: {
in.getBytes(idx, dest, destOff, decompressedLength);
break;
}
case BLOCK_TYPE_COMPRESSED: {
final byte[] src;
final int srcOff;
if (in.hasArray()) {
src = in.array();
srcOff = in.arrayOffset() + idx;
} else {
src = new byte[compressedLength];
in.getBytes(idx, src);
srcOff = 0;
}
try {
final int readBytes = decompressor.decompress(src, srcOff,
dest, destOff, decompressedLength);
if (compressedLength != readBytes) {
throw new DecompressionException(String.format(
"stream corrupted: compressedLength(%d) and actual length(%d) mismatch",
compressedLength, readBytes));
}
} catch (LZ4Exception e) {
throw new DecompressionException(e);
}
break;
}
default:
throw new DecompressionException(String.format(
"unexpected blockType: %d (expected: %d or %d)",
blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
}
final Checksum checksum = this.checksum;
if (checksum != null) {
checksum.reset();
checksum.update(dest, destOff, decompressedLength);
final int checksumResult = (int) checksum.getValue();
if (checksumResult != currentChecksum) {
throw new DecompressionException(String.format(
"stream corrupted: mismatching checksum: %d (expected: %d)",
checksumResult, currentChecksum));
}
}
uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength);
out.add(uncompressed);
in.skipBytes(compressedLength);
currentState = State.INIT_BLOCK;
success = true;
} finally {
if (!success) {
uncompressed.release();
}
}
break;
case FINISHED:
case CORRUPTED:
in.skipBytes(in.readableBytes());
return;
default:
throw new IllegalStateException();
}
} catch (Exception e) {
currentState = State.CORRUPTED;
throw e;
}
}
}
/**
* Returns {@code true} if and only if the end of the compressed stream
* has been reached.
*/
public boolean isClosed() {
return currentState == State.FINISHED;
}
}

View File

@ -0,0 +1,333 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.concurrent.EventExecutor;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
import java.util.concurrent.TimeUnit;
import java.util.zip.Checksum;
import static io.netty.handler.codec.compression.Lz4Constants.*;
/**
* Compresses a {@link ByteBuf} using the LZ4 format.
*
* See original <a href="http://code.google.com/p/lz4/">LZ4 website</a>
* and <a href="http://fastcompression.blogspot.ru/2011/05/lz4-explained.html">LZ4 block format</a>
* for full description.
*
* Since the original LZ4 block format does not contains size of compressed block and size of original data
* this encoder uses format like <a href="https://github.com/idelpivnitskiy/lz4-java">LZ4 Java</a> library
* written by Adrien Grand and approved by Yann Collet (author of original LZ4 library).
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
* * Magic * Token * Compressed * Decompressed * Checksum * + * LZ4 compressed *
* * * * length * length * * * block *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*/
public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
/**
* Underlying compressor in use.
*/
private LZ4Compressor compressor;
/**
* Underlying checksum calculator in use.
*/
private Checksum checksum;
/**
* Compression level of current LZ4 encoder (depends on {@link #compressedBlockSize}).
*/
private final int compressionLevel;
/**
* Inner byte buffer for outgoing data.
*/
private byte[] buffer;
/**
* Current length of buffered bytes in {@link #buffer}.
*/
private int currentBlockLength;
/**
* Maximum size of compressed block with header.
*/
private final int compressedBlockSize;
/**
* Indicates if the compressed stream has been finished.
*/
private volatile boolean finished;
/**
* Used to interact with its {@link ChannelPipeline} and other handlers.
*/
private volatile ChannelHandlerContext ctx;
/**
* Creates the fastest LZ4 encoder with default block size (64 KB)
* and xxhash hashing for Java, based on Yann Collet's work available at
* <a href="http://code.google.com/p/xxhash/">Google Code</a>.
*/
public Lz4FrameEncoder() {
this(false);
}
/**
* Creates a new LZ4 encoder with hight or fast compression, default block size (64 KB)
* and xxhash hashing for Java, based on Yann Collet's work available at
* <a href="http://code.google.com/p/xxhash/">Google Code</a>.
*
* @param highCompressor if {@code true} codec will use compressor which requires more memory
* and is slower but compresses more efficiently
*/
public Lz4FrameEncoder(boolean highCompressor) {
this(LZ4Factory.fastestInstance(), highCompressor, DEFAULT_BLOCK_SIZE,
XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum());
}
/**
* Creates a new customizable LZ4 encoder.
*
* @param factory user customizable {@link net.jpountz.lz4.LZ4Factory} instance
* which may be JNI bindings to the original C implementation, a pure Java implementation
* or a Java implementation that uses the {@link sun.misc.Unsafe}
* @param highCompressor if {@code true} codec will use compressor which requires more memory
* and is slower but compresses more efficiently
* @param blockSize the maximum number of bytes to try to compress at once,
* must be >= 64 and <= 32 M
* @param checksum the {@link Checksum} instance to use to check data for integrity
*/
public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) {
super(false);
if (factory == null) {
throw new NullPointerException("factory");
}
if (checksum == null) {
throw new NullPointerException("checksum");
}
compressor = highCompressor ? factory.highCompressor() : factory.fastCompressor();
this.checksum = checksum;
compressionLevel = compressionLevel(blockSize);
buffer = new byte[blockSize];
currentBlockLength = 0;
compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize);
finished = false;
}
/**
* Calculates compression level on the basis of block size.
*/
private static int compressionLevel(int blockSize) {
if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) {
throw new IllegalArgumentException(String.format(
"blockSize: %d (expected: %d-%d)", blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE));
}
int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2
compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
return compressionLevel;
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
if (finished) {
out.writeBytes(in);
return;
}
int length = in.readableBytes();
final byte[] buffer = this.buffer;
final int blockSize = buffer.length;
while (currentBlockLength + length >= blockSize) {
final int tail = blockSize - currentBlockLength;
in.getBytes(in.readerIndex(), buffer, currentBlockLength, tail);
currentBlockLength = blockSize;
flushBufferedData(out);
in.skipBytes(tail);
length -= tail;
}
in.readBytes(buffer, currentBlockLength, length);
currentBlockLength += length;
}
private void flushBufferedData(ByteBuf out) {
int currentBlockLength = this.currentBlockLength;
if (currentBlockLength == 0) {
return;
}
checksum.reset();
checksum.update(buffer, 0, currentBlockLength);
final int check = (int) checksum.getValue();
out.ensureWritable(compressedBlockSize);
final int idx = out.writerIndex();
final byte[] dest = out.array();
final int destOff = out.arrayOffset() + idx;
int compressedLength;
try {
compressedLength = compressor.compress(buffer, 0, currentBlockLength, dest, destOff + HEADER_LENGTH);
} catch (LZ4Exception e) {
throw new CompressionException(e);
}
final int blockType;
if (compressedLength >= currentBlockLength) {
blockType = BLOCK_TYPE_NON_COMPRESSED;
compressedLength = currentBlockLength;
System.arraycopy(buffer, 0, dest, destOff + HEADER_LENGTH, currentBlockLength);
} else {
blockType = BLOCK_TYPE_COMPRESSED;
}
out.setLong(idx, MAGIC_NUMBER);
dest[destOff + TOKEN_OFFSET] = (byte) (blockType | compressionLevel);
writeIntLE(compressedLength, dest, destOff + COMPRESSED_LENGTH_OFFSET);
writeIntLE(currentBlockLength, dest, destOff + DECOMPRESSED_LENGTH_OFFSET);
writeIntLE(check, dest, destOff + CHECKSUM_OFFSET);
out.writerIndex(idx + HEADER_LENGTH + compressedLength);
currentBlockLength = 0;
this.currentBlockLength = currentBlockLength;
}
private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
if (finished) {
promise.setSuccess();
return promise;
}
finished = true;
final ByteBuf footer = ctx.alloc().heapBuffer(
compressor.maxCompressedLength(currentBlockLength) + HEADER_LENGTH);
flushBufferedData(footer);
final int idx = footer.writerIndex();
final byte[] dest = footer.array();
final int destOff = footer.arrayOffset() + idx;
footer.setLong(idx, MAGIC_NUMBER);
dest[destOff + TOKEN_OFFSET] = (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel);
writeIntLE(0, dest, destOff + COMPRESSED_LENGTH_OFFSET);
writeIntLE(0, dest, destOff + DECOMPRESSED_LENGTH_OFFSET);
writeIntLE(0, dest, destOff + CHECKSUM_OFFSET);
footer.writerIndex(idx + HEADER_LENGTH);
compressor = null;
checksum = null;
buffer = null;
return ctx.writeAndFlush(footer, promise);
}
/**
* Writes {@code int} value into the byte buffer with little-endian format.
*/
private static void writeIntLE(int i, byte[] buf, int off) {
buf[off++] = (byte) i;
buf[off++] = (byte) (i >>> 8);
buf[off++] = (byte) (i >>> 16);
buf[off] = (byte) (i >>> 24);
}
/**
* Returns {@code true} if and only if the compressed stream has been finished.
*/
public boolean isClosed() {
return finished;
}
/**
* Close this {@link Lz4FrameEncoder} and so finish the encoding.
*
* The returned {@link ChannelFuture} will be notified once the operation completes.
*/
public ChannelFuture close() {
return close(ctx().newPromise());
}
/**
* Close this {@link Lz4FrameEncoder} and so finish the encoding.
* The given {@link ChannelFuture} will be notified once the operation
* completes and will also be returned.
*/
public ChannelFuture close(final ChannelPromise promise) {
ChannelHandlerContext ctx = ctx();
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
return finishEncode(ctx, promise);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
ChannelFuture f = finishEncode(ctx(), promise);
f.addListener(new ChannelPromiseNotifier(promise));
}
});
return promise;
}
}
@Override
public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
ChannelFuture f = finishEncode(ctx, ctx.newPromise());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
ctx.close(promise);
}
});
if (!f.isDone()) {
// Ensure the channel is closed even if the write operation completes in time.
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.close(promise);
}
}, 10, TimeUnit.SECONDS); // FIXME: Magic number
}
}
private ChannelHandlerContext ctx() {
ChannelHandlerContext ctx = this.ctx;
if (ctx == null) {
throw new IllegalStateException("not added to a pipeline");
}
return ctx;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
}

View File

@ -0,0 +1,239 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import net.jpountz.lz4.LZ4BlockOutputStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import static io.netty.handler.codec.compression.Lz4Constants.*;
import static org.junit.Assert.*;
public class Lz4FrameDecoderTest {
private static final byte[] DATA = { 0x4C, 0x5A, 0x34, 0x42, 0x6C, 0x6F, 0x63, 0x6B, // magic bytes
0x16, // token
0x05, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, // compr. and decompr. length
(byte) 0x86, (byte) 0xE4, 0x79, 0x0F, // checksum
0x4E, 0x65, 0x74, 0x74, 0x79, // data
0x4C, 0x5A, 0x34, 0x42, 0x6C, 0x6F, 0x63, 0x6B, // magic bytes
0x16, // token
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // last empty block
0x00, 0x00, 0x00, 0x00 };
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
}
@Rule
public ExpectedException expected = ExpectedException.none();
private EmbeddedChannel channel;
@Before
public void initChannel() {
channel = new EmbeddedChannel(new Lz4FrameDecoder(true));
}
@Test
public void testUnexpectedBlockIdentifier() throws Exception {
expected.expect(DecompressionException.class);
expected.expectMessage("unexpected block identifier");
final byte[] data = Arrays.copyOf(DATA, DATA.length);
data[1] = 0x00;
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeInbound(in);
}
@Test
public void testInvalidCompressedLength() throws Exception {
expected.expect(DecompressionException.class);
expected.expectMessage("invalid compressedLength");
final byte[] data = Arrays.copyOf(DATA, DATA.length);
data[12] = (byte) 0xFF;
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeInbound(in);
}
@Test
public void testInvalidDecompressedLength() throws Exception {
expected.expect(DecompressionException.class);
expected.expectMessage("invalid decompressedLength");
final byte[] data = Arrays.copyOf(DATA, DATA.length);
data[16] = (byte) 0xFF;
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeInbound(in);
}
@Test
public void testDecompressedAndCompressedLengthMismatch() throws Exception {
expected.expect(DecompressionException.class);
expected.expectMessage("mismatch");
final byte[] data = Arrays.copyOf(DATA, DATA.length);
data[13] = 0x01;
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeInbound(in);
}
@Test
public void testUnexpectedBlockType() throws Exception {
expected.expect(DecompressionException.class);
expected.expectMessage("unexpected blockType");
final byte[] data = Arrays.copyOf(DATA, DATA.length);
data[8] = 0x36;
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeInbound(in);
}
@Test
public void testMismatchingChecksum() throws Exception {
expected.expect(DecompressionException.class);
expected.expectMessage("mismatching checksum");
final byte[] data = Arrays.copyOf(DATA, DATA.length);
data[17] = 0x01;
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeInbound(in);
}
@Test
public void testChecksumErrorOfLastBlock() throws Exception {
expected.expect(DecompressionException.class);
expected.expectMessage("checksum error");
final byte[] data = Arrays.copyOf(DATA, DATA.length);
data[44] = 0x01;
ByteBuf in = Unpooled.wrappedBuffer(data);
try {
channel.writeInbound(in);
} finally {
for (;;) {
ByteBuf inflated = channel.readInbound();
if (inflated == null) {
break;
}
inflated.release();
}
channel.finish();
}
}
private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception {
ByteArrayOutputStream os = new ByteArrayOutputStream();
LZ4BlockOutputStream lz4Os = new LZ4BlockOutputStream(os, randomBlockSize());
lz4Os.write(data);
lz4Os.close();
ByteBuf compressed = Unpooled.wrappedBuffer(os.toByteArray());
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
@Test
public void testDecompressionOfSmallChunkOfData() throws Exception {
testDecompression(channel, BYTES_SMALL);
}
@Test
public void testDecompressionOfLargeChunkOfData() throws Exception {
testDecompression(channel, BYTES_LARGE);
}
@Test
public void testDecompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
ByteArrayOutputStream os = new ByteArrayOutputStream();
LZ4BlockOutputStream lz4Os = new LZ4BlockOutputStream(os, randomBlockSize());
lz4Os.write(data);
lz4Os.close();
final byte[] compressedArray = os.toByteArray();
int written = 0, length = rand.nextInt(100);
while (written + length < compressedArray.length) {
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length);
channel.writeInbound(compressed);
written += length;
length = rand.nextInt(100);
}
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
channel.writeInbound(compressed);
ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception {
CompositeByteBuf uncompressed = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readInbound()) != null) {
uncompressed.addComponent(msg);
uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes());
}
return uncompressed;
}
private static int randomBlockSize() {
return rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1);
}
}

View File

@ -0,0 +1,123 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import net.jpountz.lz4.LZ4BlockInputStream;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import static org.junit.Assert.*;
public class Lz4FrameEncoderTest {
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[256000];
static {
rand = ThreadLocalRandom.current();
//fill arrays with compressible data
for (int i = 0; i < BYTES_SMALL.length; i++) {
BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
for (int i = 0; i < BYTES_LARGE.length; i++) {
BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
}
private EmbeddedChannel channel;
@Before
public void initChannel() {
channel = new EmbeddedChannel(new Lz4FrameEncoder());
}
private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception {
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeOutbound(in);
channel.finish();
final byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
}
@Test
public void testCompressionOfSmallChunkOfData() throws Exception {
testCompression(channel, BYTES_SMALL);
}
@Test
public void testCompressionOfLargeChunkOfData() throws Exception {
testCompression(channel, BYTES_LARGE);
}
@Test
public void testCompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
int written = 0, length = rand.nextInt(1, 100);
while (written + length < data.length) {
ByteBuf in = Unpooled.wrappedBuffer(data, written, length);
channel.writeOutbound(in);
written += length;
length = rand.nextInt(1, 100);
}
ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written);
channel.writeOutbound(in);
channel.finish();
final byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
}
private static byte[] uncompress(EmbeddedChannel channel, int originalLength) throws Exception {
CompositeByteBuf out = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
out.addComponent(msg);
out.writerIndex(out.writerIndex() + msg.readableBytes());
}
byte[] compressed = new byte[out.readableBytes()];
out.readBytes(compressed);
out.release();
ByteArrayInputStream is = new ByteArrayInputStream(compressed);
LZ4BlockInputStream lz4Is = new LZ4BlockInputStream(is);
byte[] uncompressed = new byte[originalLength];
int remaining = originalLength;
while (remaining > 0) {
int read = lz4Is.read(uncompressed, originalLength - remaining, remaining);
if (read > 0) {
remaining -= read;
} else {
break;
}
}
return uncompressed;
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.channel.embedded.EmbeddedChannel;
public class Lz4FrameIntegrationTest extends IntegrationTest {
@Override
protected EmbeddedChannel createEncoderEmbeddedChannel() {
return new EmbeddedChannel(new Lz4FrameEncoder());
}
@Override
protected EmbeddedChannel createDecoderEmbeddedChannel() {
return new EmbeddedChannel(new Lz4FrameDecoder());
}
}

202
license/LICENSE.lz4.txt Normal file
View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -425,6 +425,11 @@
<artifactId>compress-lzf</artifactId> <artifactId>compress-lzf</artifactId>
<version>1.0.1</version> <version>1.0.1</version>
</dependency> </dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.2.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.rxtx</groupId> <groupId>org.rxtx</groupId>