From a0c466a27673a02c86f364cc78a500050bd91383 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Tue, 12 Aug 2014 03:00:56 +0400 Subject: [PATCH] Implemented FastLZ compression codec Motivation: FastLZ compression codec provides sending and receiving data encoded by fast FastLZ algorithm using block mode. Modifications: - Added part of `jfastlz` library which implements FastLZ algorithm. See FastLz class. - Implemented FastLzFramedEncoder which extends MessageToByteEncoder and provides compression of outgoing messages. - Implemented FastLzFramedDecoder which extends ByteToMessageDecoder and provides uncompression of incoming messages. - Added integration tests for `FastLzFramedEncoder/Decoder`. Result: Full FastLZ compression codec which can compress/uncompress data using FastLZ algorithm. --- NOTICE.txt | 8 + .../handler/codec/compression/FastLz.java | 575 ++++++++++++++++++ .../compression/FastLzFramedDecoder.java | 211 +++++++ .../compression/FastLzFramedEncoder.java | 186 ++++++ .../compression/FastLzIntegrationTest.java | 136 +++++ license/LICENSE.jfastlz.txt | 24 + 6 files changed, 1140 insertions(+) create mode 100644 codec/src/main/java/io/netty/handler/codec/compression/FastLz.java create mode 100644 codec/src/main/java/io/netty/handler/codec/compression/FastLzFramedDecoder.java create mode 100644 codec/src/main/java/io/netty/handler/codec/compression/FastLzFramedEncoder.java create mode 100644 codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java create mode 100644 license/LICENSE.jfastlz.txt diff --git a/NOTICE.txt b/NOTICE.txt index 74637c84a7..356556498d 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -114,6 +114,14 @@ decoding data in LZF format, written by Tatu Saloranta. It can be obtained at: * HOMEPAGE: * https://github.com/ning/compress +This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression +and decompression library written by William Kinney. It can be obtained at: + + * LICENSE: + * license/LICENSE.jfastlz.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jfastlz/ + This product optionally depends on 'Protocol Buffers', Google's data interchange format, which can be obtained at: diff --git a/codec/src/main/java/io/netty/handler/codec/compression/FastLz.java b/codec/src/main/java/io/netty/handler/codec/compression/FastLz.java new file mode 100644 index 0000000000..754ddfb0d9 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/compression/FastLz.java @@ -0,0 +1,575 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +/** + * Core of FastLZ compression algorithm. + * + * This class provides methods for compression and decompression of buffers and saves + * constants which use by {@link FastLzFramedEncoder} and {@link FastLzFramedDecoder}. + * + * This is refactored code of jfastlz + * library written by William Kinney. 
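+ *
+ * For reference, each block written by {@link FastLzFramedEncoder} (and expected by
+ * {@link FastLzFramedDecoder}) consists of the 3-byte magic 'F','L','Z', a 1-byte options
+ * field (block type {@link #BLOCK_TYPE_NON_COMPRESSED} or {@link #BLOCK_TYPE_COMPRESSED}
+ * plus the {@link #BLOCK_WITH_CHECKSUM} flag), an optional 4-byte checksum of the
+ * uncompressed data, a 2-byte compressed length followed by a 2-byte original length for
+ * compressed blocks (a single 2-byte length otherwise), and finally the chunk data.
+ * A single chunk carries at most {@link #MAX_CHUNK_LENGTH} bytes of input.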
+ */ +final class FastLz { + + private static final int MAX_DISTANCE = 8191; + private static final int MAX_FARDISTANCE = 65535 + MAX_DISTANCE - 1; + + private static final int HASH_LOG = 13; + private static final int HASH_SIZE = 1 << HASH_LOG; // 8192 + private static final int HASH_MASK = HASH_SIZE - 1; + + private static final int MAX_COPY = 32; + private static final int MAX_LEN = 256 + 8; + + private static final int MIN_RECOMENDED_LENGTH_FOR_LEVEL_2 = 1024 * 64; + + static final int MAGIC_NUMBER = 'F' << 16 | 'L' << 8 | 'Z'; + + static final byte BLOCK_TYPE_NON_COMPRESSED = 0x00; + static final byte BLOCK_TYPE_COMPRESSED = 0x01; + static final byte BLOCK_WITHOUT_CHECKSUM = 0x00; + static final byte BLOCK_WITH_CHECKSUM = 0x10; + + static final int OPTIONS_OFFSET = 3; + static final int CHECKSUM_OFFSET = 4; + + static final int MAX_CHUNK_LENGTH = 0xFFFF; + + /** + * Do not call {@link #compress(byte[], int, int, byte[], int, int)} for input buffers + * which length less than this value. + */ + static final int MIN_LENGTH_TO_COMPRESSION = 32; + + /** + * In this case {@link #compress(byte[], int, int, byte[], int, int)} will choose level + * automatically depending on the length of the input buffer. If length less than + * {@link #MIN_RECOMENDED_LENGTH_FOR_LEVEL_2} {@link #LEVEL_1} will be choosen, + * otherwise {@link #LEVEL_2}. + */ + static final int LEVEL_AUTO = 0; + + /** + * Level 1 is the fastest compression and generally useful for short data. + */ + static final int LEVEL_1 = 1; + + /** + * Level 2 is slightly slower but it gives better compression ratio. + */ + static final int LEVEL_2 = 2; + + /** + * The output buffer must be at least 6% larger than the input buffer and can not be smaller than 66 bytes. + * @param inputLength length of input buffer + * @return Maximum output buffer length + */ + static int calculateOutputBufferLength(int inputLength) { + final int outputLength = (int) (inputLength * 1.06); + return Math.max(outputLength, 66); + } + + /** + * Compress a block of data in the input buffer and returns the size of compressed block. + * The size of input buffer is specified by length. The minimum input buffer size is 32. + * + * If the input is not compressible, the return value might be larger than length (input buffer size). + */ + static int compress(final byte[] input, final int inOffset, final int inLength, + final byte[] output, final int outOffset, final int proposedLevel) { + final int level; + if (proposedLevel == LEVEL_AUTO) { + level = inLength < MIN_RECOMENDED_LENGTH_FOR_LEVEL_2 ? 
LEVEL_1 : LEVEL_2; + } else { + level = proposedLevel; + } + + int ip = 0; + int ipBound = ip + inLength - 2; + int ipLimit = ip + inLength - 12; + + int op = 0; + + // const flzuint8* htab[HASH_SIZE]; + int[] htab = new int[HASH_SIZE]; + // const flzuint8** hslot; + int hslot; + // flzuint32 hval; + // int OK b/c address starting from 0 + int hval; + // flzuint32 copy; + // int OK b/c address starting from 0 + int copy; + + /* sanity check */ + if (inLength < 4) { + if (inLength != 0) { + // *op++ = length-1; + output[outOffset + op++] = (byte) (inLength - 1); + ipBound++; + while (ip <= ipBound) { + output[outOffset + op++] = input[inOffset + ip++]; + } + return inLength + 1; + } + // else + return 0; + } + + /* initializes hash table */ + // for (hslot = htab; hslot < htab + HASH_SIZE; hslot++) + for (hslot = 0; hslot < HASH_SIZE; hslot++) { + //*hslot = ip; + htab[hslot] = ip; + } + + /* we start with literal copy */ + copy = 2; + output[outOffset + op++] = MAX_COPY - 1; + output[outOffset + op++] = input[inOffset + ip++]; + output[outOffset + op++] = input[inOffset + ip++]; + + /* main loop */ + while (ip < ipLimit) { + int ref = 0; + + long distance = 0; + + /* minimum match length */ + // flzuint32 len = 3; + // int OK b/c len is 0 and octal based + int len = 3; + + /* comparison starting-point */ + int anchor = ip; + + boolean matchLabel = false; + + /* check for a run */ + if (level == LEVEL_2) { + //if(ip[0] == ip[-1] && FASTLZ_READU16(ip-1)==FASTLZ_READU16(ip+1)) + if (input[inOffset + ip] == input[inOffset + ip - 1] && + readU16(input, inOffset + ip - 1) == readU16(input, inOffset + ip + 1)) { + distance = 1; + ip += 3; + ref = anchor - 1 + 3; + + /* + * goto match; + */ + matchLabel = true; + } + } + if (!matchLabel) { + /* find potential match */ + // HASH_FUNCTION(hval,ip); + hval = hashFunction(input, inOffset + ip); + // hslot = htab + hval; + hslot = hval; + // ref = htab[hval]; + ref = htab[hval]; + + /* calculate distance to the match */ + distance = anchor - ref; + + /* update hash table */ + //*hslot = anchor; + htab[hslot] = anchor; + + /* is this a match? check the first 3 bytes */ + if (distance == 0 + || (level == LEVEL_1 ? 
distance >= MAX_DISTANCE : distance >= MAX_FARDISTANCE) + || input[inOffset + ref++] != input[inOffset + ip++] + || input[inOffset + ref++] != input[inOffset + ip++] + || input[inOffset + ref++] != input[inOffset + ip++]) { + /* + * goto literal; + */ + output[outOffset + op++] = input[inOffset + anchor++]; + ip = anchor; + copy++; + if (copy == MAX_COPY) { + copy = 0; + output[outOffset + op++] = MAX_COPY - 1; + } + continue; + } + + if (level == LEVEL_2) { + /* far, needs at least 5-byte match */ + if (distance >= MAX_DISTANCE) { + if (input[inOffset + ip++] != input[inOffset + ref++] + || input[inOffset + ip++] != input[inOffset + ref++]) { + /* + * goto literal; + */ + output[outOffset + op++] = input[inOffset + anchor++]; + ip = anchor; + copy++; + if (copy == MAX_COPY) { + copy = 0; + output[outOffset + op++] = MAX_COPY - 1; + } + continue; + } + len += 2; + } + } + } // end if(!matchLabel) + /* + * match: + */ + /* last matched byte */ + ip = anchor + len; + + /* distance is biased */ + distance--; + + if (distance == 0) { + /* zero distance means a run */ + //flzuint8 x = ip[-1]; + byte x = input[inOffset + ip - 1]; + while (ip < ipBound) { + if (input[inOffset + ref++] != x) { + break; + } else { + ip++; + } + } + } else { + for (;;) { + /* safe because the outer check against ip limit */ + if (input[inOffset + ref++] != input[inOffset + ip++]) { + break; + } + if (input[inOffset + ref++] != input[inOffset + ip++]) { + break; + } + if (input[inOffset + ref++] != input[inOffset + ip++]) { + break; + } + if (input[inOffset + ref++] != input[inOffset + ip++]) { + break; + } + if (input[inOffset + ref++] != input[inOffset + ip++]) { + break; + } + if (input[inOffset + ref++] != input[inOffset + ip++]) { + break; + } + if (input[inOffset + ref++] != input[inOffset + ip++]) { + break; + } + if (input[inOffset + ref++] != input[inOffset + ip++]) { + break; + } + while (ip < ipBound) { + if (input[inOffset + ref++] != input[inOffset + ip++]) { + break; + } + } + break; + } + } + + /* if we have copied something, adjust the copy count */ + if (copy != 0) { + /* copy is biased, '0' means 1 byte copy */ + // *(op-copy-1) = copy-1; + output[outOffset + op - copy - 1] = (byte) (copy - 1); + } else { + /* back, to overwrite the copy count */ + op--; + } + + /* reset literal counter */ + copy = 0; + + /* length is biased, '1' means a match of 3 bytes */ + ip -= 3; + len = ip - anchor; + + /* encode the match */ + if (level == LEVEL_2) { + if (distance < MAX_DISTANCE) { + if (len < 7) { + output[outOffset + op++] = (byte) ((len << 5) + (distance >>> 8)); + output[outOffset + op++] = (byte) (distance & 255); + } else { + output[outOffset + op++] = (byte) ((7 << 5) + (distance >>> 8)); + for (len -= 7; len >= 255; len -= 255) { + output[outOffset + op++] = (byte) 255; + } + output[outOffset + op++] = (byte) len; + output[outOffset + op++] = (byte) (distance & 255); + } + } else { + /* far away, but not yet in the another galaxy... 
*/ + if (len < 7) { + distance -= MAX_DISTANCE; + output[outOffset + op++] = (byte) ((len << 5) + 31); + output[outOffset + op++] = (byte) 255; + output[outOffset + op++] = (byte) (distance >>> 8); + output[outOffset + op++] = (byte) (distance & 255); + } else { + distance -= MAX_DISTANCE; + output[outOffset + op++] = (byte) ((7 << 5) + 31); + for (len -= 7; len >= 255; len -= 255) { + output[outOffset + op++] = (byte) 255; + } + output[outOffset + op++] = (byte) len; + output[outOffset + op++] = (byte) 255; + output[outOffset + op++] = (byte) (distance >>> 8); + output[outOffset + op++] = (byte) (distance & 255); + } + } + } else { + if (len > MAX_LEN - 2) { + while (len > MAX_LEN - 2) { + output[outOffset + op++] = (byte) ((7 << 5) + (distance >>> 8)); + output[outOffset + op++] = (byte) (MAX_LEN - 2 - 7 - 2); + output[outOffset + op++] = (byte) (distance & 255); + len -= MAX_LEN - 2; + } + } + + if (len < 7) { + output[outOffset + op++] = (byte) ((len << 5) + (distance >>> 8)); + output[outOffset + op++] = (byte) (distance & 255); + } else { + output[outOffset + op++] = (byte) ((7 << 5) + (distance >>> 8)); + output[outOffset + op++] = (byte) (len - 7); + output[outOffset + op++] = (byte) (distance & 255); + } + } + + /* update the hash at match boundary */ + //HASH_FUNCTION(hval,ip); + hval = hashFunction(input, inOffset + ip); + htab[hval] = ip++; + + //HASH_FUNCTION(hval,ip); + hval = hashFunction(input, inOffset + ip); + htab[hval] = ip++; + + /* assuming literal copy */ + output[outOffset + op++] = MAX_COPY - 1; + + continue; + + // Moved to be inline, with a 'continue' + /* + * literal: + * + output[outOffset + op++] = input[inOffset + anchor++]; + ip = anchor; + copy++; + if(copy == MAX_COPY){ + copy = 0; + output[outOffset + op++] = MAX_COPY-1; + } + */ + } + + /* left-over as literal copy */ + ipBound++; + while (ip <= ipBound) { + output[outOffset + op++] = input[inOffset + ip++]; + copy++; + if (copy == MAX_COPY) { + copy = 0; + output[outOffset + op++] = MAX_COPY - 1; + } + } + + /* if we have copied something, adjust the copy length */ + if (copy != 0) { + //*(op-copy-1) = copy-1; + output[outOffset + op - copy - 1] = (byte) (copy - 1); + } else { + op--; + } + + if (level == LEVEL_2) { + /* marker for fastlz2 */ + output[outOffset] |= 1 << 5; + } + + return op; + } + + /** + * Decompress a block of compressed data and returns the size of the decompressed block. + * If error occurs, e.g. the compressed data is corrupted or the output buffer is not large + * enough, then 0 (zero) will be returned instead. + * + * Decompression is memory safe and guaranteed not to write the output buffer + * more than what is specified in outLength. 
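+ *
+ * The compression level is recovered from the top three bits of the first byte of the
+ * compressed block, so the same method handles both {@link #LEVEL_1} and {@link #LEVEL_2}
+ * data. On success the number of bytes written to the output buffer is returned.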
+ */ + static int decompress(final byte[] input, final int inOffset, final int inLength, + final byte[] output, final int outOffset, final int outLength) { + //int level = ((*(const flzuint8*)input) >> 5) + 1; + final int level = (input[inOffset] >> 5) + 1; + if (level != LEVEL_1 && level != LEVEL_2) { + throw new DecompressionException(String.format( + "invalid level: %d (expected: %d or %d)", level, LEVEL_1, LEVEL_2 + )); + } + + // const flzuint8* ip = (const flzuint8*) input; + int ip = 0; + // flzuint8* op = (flzuint8*) output; + int op = 0; + // flzuint32 ctrl = (*ip++) & 31; + long ctrl = input[inOffset + ip++] & 31; + + int loop = 1; + do { + // const flzuint8* ref = op; + int ref = op; + // flzuint32 len = ctrl >> 5; + long len = ctrl >> 5; + // flzuint32 ofs = (ctrl & 31) << 8; + long ofs = (ctrl & 31) << 8; + + if (ctrl >= 32) { + len--; + // ref -= ofs; + ref -= ofs; + + int code; + if (len == 6) { + if (level == LEVEL_1) { + // len += *ip++; + len += input[inOffset + ip++] & 0xFF; + } else { + do { + code = input[inOffset + ip++] & 0xFF; + len += code; + } while (code == 255); + } + } + if (level == LEVEL_1) { + // ref -= *ip++; + ref -= input[inOffset + ip++] & 0xFF; + } else { + code = input[inOffset + ip++] & 0xFF; + ref -= code; + + /* match from 16-bit distance */ + // if(FASTLZ_UNEXPECT_CONDITIONAL(code==255)) + // if(FASTLZ_EXPECT_CONDITIONAL(ofs==(31 << 8))) + if (code == 255 && ofs == 31 << 8) { + ofs = (input[inOffset + ip++] & 0xFF) << 8; + ofs += input[inOffset + ip++] & 0xFF; + + ref = (int) (op - ofs - MAX_DISTANCE); + } + } + + // if the output index + length of block(?) + 3(?) is over the output limit? + if (op + len + 3 > outLength) { + return 0; + } + + // if (FASTLZ_UNEXPECT_CONDITIONAL(ref-1 < (flzuint8 *)output)) + // if the address space of ref-1 is < the address of output? + // if we are still at the beginning of the output address? + if (ref - 1 < 0) { + return 0; + } + + if (ip < inLength) { + ctrl = input[inOffset + ip++] & 0xFF; + } else { + loop = 0; + } + + if (ref == op) { + /* optimize copy for a run */ + // flzuint8 b = ref[-1]; + byte b = output[outOffset + ref - 1]; + output[outOffset + op++] = b; + output[outOffset + op++] = b; + output[outOffset + op++] = b; + while (len != 0) { + output[outOffset + op++] = b; + --len; + } + } else { + /* copy from reference */ + ref--; + + // *op++ = *ref++; + output[outOffset + op++] = output[outOffset + ref++]; + output[outOffset + op++] = output[outOffset + ref++]; + output[outOffset + op++] = output[outOffset + ref++]; + + while (len != 0) { + output[outOffset + op++] = output[outOffset + ref++]; + --len; + } + } + } else { + ctrl++; + + if (op + ctrl > outLength) { + return 0; + } + if (ip + ctrl > inLength) { + return 0; + } + + //*op++ = *ip++; + output[outOffset + op++] = input[inOffset + ip++]; + + for (--ctrl; ctrl != 0; ctrl--) { + // *op++ = *ip++; + output[outOffset + op++] = input[inOffset + ip++]; + } + + loop = ip < inLength ? 
1 : 0; + if (loop != 0) { + // ctrl = *ip++; + ctrl = input[inOffset + ip++] & 0xFF; + } + } + + // while(FASTLZ_EXPECT_CONDITIONAL(loop)); + } while (loop != 0); + + // return op - (flzuint8*)output; + return op; + } + + private static int hashFunction(byte[] p, int offset) { + int v = readU16(p, offset); + v ^= readU16(p, offset + 1) ^ v >> 16 - HASH_LOG; + v &= HASH_MASK; + return v; + } + + private static int readU16(byte[] data, int offset) { + if (offset + 1 >= data.length) { + return data[offset] & 0xff; + } + return (data[offset + 1] & 0xff) << 8 | data[offset] & 0xff; + } + + private FastLz() { } +} diff --git a/codec/src/main/java/io/netty/handler/codec/compression/FastLzFramedDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFramedDecoder.java new file mode 100644 index 0000000000..1b7379ce26 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFramedDecoder.java @@ -0,0 +1,211 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; +import java.util.zip.Adler32; +import java.util.zip.Checksum; + +import static io.netty.handler.codec.compression.FastLz.*; + +/** + * Uncompresses a {@link ByteBuf} encoded with the Bzip2 format. + * + * See FastLZ format. + */ +public class FastLzFramedDecoder extends ByteToMessageDecoder { + /** + * Current state of decompression. + */ + private enum State { + INIT_BLOCK, + INIT_BLOCK_PARAMS, + DECOMPRESS_DATA, + CORRUPTED + } + + private State currentState = State.INIT_BLOCK; + + /** + * Underlying checksum calculator in use. + */ + private final Checksum checksum; + + /** + * Length of current received chunk of data. + */ + private int chunkLength; + + /** + * Original of current received chunk of data. + * It is equal to {@link #chunkLength} for non compressed chunks. + */ + private int originalLength; + + /** + * Indicates is this chunk compressed or not. + */ + private boolean isCompressed; + + /** + * Indicates is this chunk has checksum or not. + */ + private boolean hasChecksum; + + /** + * Chechsum value of current received chunk of data which has checksum. + */ + private int currentChecksum; + + /** + * Creates the fastest FastLZ decoder without checksum calculation. + */ + public FastLzFramedDecoder() { + this(false); + } + + /** + * Creates a FastLZ decoder with calculation of checksums as specified. + * + * @param validateChecksums + * If true, the checksum field will be validated against the actual + * uncompressed data, and if the checksums do not match, a suitable + * {@link DecompressionException} will be thrown. + * Note, that in this case decoder will use {@link java.util.zip.Adler32} + * as a default checksum calculator. 
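+ *            A checksum can only be verified for blocks that actually carry one, i.e. blocks
+ *            written by a {@link FastLzFramedEncoder} that was created with checksum support.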
+ */ + public FastLzFramedDecoder(boolean validateChecksums) { + this(validateChecksums ? new Adler32() : null); + } + + /** + * Creates a FastLZ decoder with specified checksum calculator. + * + * @param checksum + * the {@link Checksum} instance to use to check data for integrity. + * You may set {@code null} if you do not want to validate checksum of each block. + */ + public FastLzFramedDecoder(Checksum checksum) { + this.checksum = checksum; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + for (;;) { + try { + switch (currentState) { + case INIT_BLOCK: + if (in.readableBytes() < 4) { + return; + } + + final int magic = in.readUnsignedMedium(); + if (magic != MAGIC_NUMBER) { + throw new DecompressionException("unexpected block identifier"); + } + + final byte options = in.readByte(); + isCompressed = (options & 0x01) == BLOCK_TYPE_COMPRESSED; + hasChecksum = (options & 0x10) == BLOCK_WITH_CHECKSUM; + + currentState = State.INIT_BLOCK_PARAMS; + case INIT_BLOCK_PARAMS: + if (in.readableBytes() < 2 + (isCompressed ? 2 : 0) + (hasChecksum ? 4 : 0)) { + return; + } + currentChecksum = hasChecksum ? in.readInt() : 0; + chunkLength = in.readUnsignedShort(); + originalLength = isCompressed ? in.readUnsignedShort() : chunkLength; + + currentState = State.DECOMPRESS_DATA; + case DECOMPRESS_DATA: + final int chunkLength = this.chunkLength; + if (in.readableBytes() < chunkLength) { + return; + } + + final int idx = in.readerIndex(); + final int originalLength = this.originalLength; + + ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength); + final byte[] output = uncompressed.array(); + final int outputPtr = uncompressed.arrayOffset() + uncompressed.writerIndex(); + + boolean success = false; + try { + if (isCompressed) { + final byte[] input; + final int inputPtr; + if (in.hasArray()) { + input = in.array(); + inputPtr = in.arrayOffset() + idx; + } else { + input = new byte[chunkLength]; + in.getBytes(idx, input); + inputPtr = 0; + } + + final int decompressedBytes = decompress(input, inputPtr, chunkLength, + output, outputPtr, originalLength); + if (originalLength != decompressedBytes) { + throw new DecompressionException(String.format( + "stream corrupted: originalLength(%d) and actual length(%d) mismatch", + originalLength, decompressedBytes)); + } + } else { + in.getBytes(idx, output, outputPtr, chunkLength); + } + + final Checksum checksum = this.checksum; + if (hasChecksum && checksum != null) { + checksum.reset(); + checksum.update(output, outputPtr, originalLength); + final int checksumResult = (int) checksum.getValue(); + if (checksumResult != currentChecksum) { + throw new DecompressionException(String.format( + "stream corrupted: mismatching checksum: %d (expected: %d)", + checksumResult, currentChecksum)); + } + } + uncompressed.writerIndex(uncompressed.writerIndex() + originalLength); + out.add(uncompressed); + in.skipBytes(chunkLength); + + currentState = State.INIT_BLOCK; + success = true; + } finally { + if (!success) { + uncompressed.release(); + } + } + break; + case CORRUPTED: + in.skipBytes(in.readableBytes()); + return; + default: + throw new IllegalStateException(); + } + } catch (Exception e) { + currentState = State.CORRUPTED; + throw e; + } + } + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/compression/FastLzFramedEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFramedEncoder.java new file mode 100644 index 0000000000..a750d28bf2 --- /dev/null 
+++ b/codec/src/main/java/io/netty/handler/codec/compression/FastLzFramedEncoder.java @@ -0,0 +1,186 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +import java.util.zip.Adler32; +import java.util.zip.Checksum; + +import static io.netty.handler.codec.compression.FastLz.*; + +/** + * Compresses a {@link ByteBuf} using the FastLZ algorithm. + * + * See FastLZ format. + */ +public class FastLzFramedEncoder extends MessageToByteEncoder { + /** + * Compression level. + */ + private final int level; + + /** + * Underlying checksum calculator in use. + */ + private final Checksum checksum; + + /** + * Creates a FastLZ encoder without checksum calculator and with auto detection of compression level. + */ + public FastLzFramedEncoder() { + this(LEVEL_AUTO, null); + } + + /** + * Creates a FastLZ encoder with specified compression level and without checksum calculator. + * + * @param level supports only these values: + * 0 - Encoder will choose level automatically depending on the length of the input buffer. + * 1 - Level 1 is the fastest compression and generally useful for short data. + * 2 - Level 2 is slightly slower but it gives better compression ratio. + */ + public FastLzFramedEncoder(int level) { + this(level, null); + } + + /** + * Creates a FastLZ encoder with auto detection of compression + * level and calculation of checksums as specified. + * + * @param validateChecksums + * If true, the checksum of each block will be calculated and this value + * will be added to the header of block. + * By default {@link FastLzFramedEncoder} uses {@link java.util.zip.Adler32} + * for checksum calculation. + */ + public FastLzFramedEncoder(boolean validateChecksums) { + this(LEVEL_AUTO, validateChecksums ? new Adler32() : null); + } + + /** + * Creates a FastLZ encoder with specified compression level and checksum calculator. + * + * @param level supports only these values: + * 0 - Encoder will choose level automatically depending on the length of the input buffer. + * 1 - Level 1 is the fastest compression and generally useful for short data. + * 2 - Level 2 is slightly slower but it gives better compression ratio. + * @param checksum + * the {@link Checksum} instance to use to check data for integrity. + * You may set {@code null} if you don't want to validate checksum of each block. 
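+ *                  For example, a pair that checksums blocks with CRC-32 instead of
+ *                  {@link Adler32} could be created as follows (purely illustrative):
+ *                  <pre>
+ *                  new FastLzFramedEncoder(2, new java.util.zip.CRC32());
+ *                  new FastLzFramedDecoder(new java.util.zip.CRC32());
+ *                  </pre>
+ *                  Both peers must use the same {@link Checksum} implementation.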
+ */ + public FastLzFramedEncoder(int level, Checksum checksum) { + super(false); + if (level != LEVEL_AUTO && level != LEVEL_1 && level != LEVEL_2) { + throw new IllegalArgumentException(String.format( + "level: %d (expected: %d or %d or %d)", level, LEVEL_AUTO, LEVEL_1, LEVEL_2)); + } + this.level = level; + this.checksum = checksum; + } + + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + final Checksum checksum = this.checksum; + + for (;;) { + if (!in.isReadable()) { + return; + } + final int idx = in.readerIndex(); + final int length = Math.min(in.readableBytes(), MAX_CHUNK_LENGTH); + + final int outputIdx = out.writerIndex(); + out.setMedium(outputIdx, MAGIC_NUMBER); + int outputOffset = outputIdx + CHECKSUM_OFFSET + (checksum != null ? 4 : 0); + + final byte blockType; + final int chunkLength; + if (length < MIN_LENGTH_TO_COMPRESSION) { + blockType = BLOCK_TYPE_NON_COMPRESSED; + + out.ensureWritable(outputOffset + 2 + length); + final byte[] output = out.array(); + final int outputPtr = out.arrayOffset() + outputOffset + 2; + + if (checksum != null) { + final byte[] input; + final int inputPtr; + if (in.hasArray()) { + input = in.array(); + inputPtr = in.arrayOffset() + idx; + } else { + input = new byte[length]; + in.getBytes(idx, input); + inputPtr = 0; + } + + checksum.reset(); + checksum.update(input, inputPtr, length); + out.setInt(outputIdx + CHECKSUM_OFFSET, (int) checksum.getValue()); + + System.arraycopy(input, inputPtr, output, outputPtr, length); + } else { + in.getBytes(idx, output, outputPtr, length); + } + chunkLength = length; + } else { + // try to compress + final byte[] input; + final int inputPtr; + if (in.hasArray()) { + input = in.array(); + inputPtr = in.arrayOffset() + idx; + } else { + input = new byte[length]; + in.getBytes(idx, input); + inputPtr = 0; + } + + if (checksum != null) { + checksum.reset(); + checksum.update(input, inputPtr, length); + out.setInt(outputIdx + CHECKSUM_OFFSET, (int) checksum.getValue()); + } + + final int maxOutputLength = calculateOutputBufferLength(length); + out.ensureWritable(outputOffset + 4 + maxOutputLength); + final byte[] output = out.array(); + final int outputPtr = out.arrayOffset() + outputOffset + 4; + final int compressedLength = compress(input, inputPtr, length, output, outputPtr, level); + if (compressedLength < length) { + blockType = BLOCK_TYPE_COMPRESSED; + chunkLength = compressedLength; + + out.setShort(outputOffset, chunkLength); + outputOffset += 2; + } else { + blockType = BLOCK_TYPE_NON_COMPRESSED; + System.arraycopy(input, inputPtr, output, outputPtr - 2, length); + chunkLength = length; + } + } + out.setShort(outputOffset, length); + + out.setByte(outputIdx + OPTIONS_OFFSET, + blockType | (checksum != null ? BLOCK_WITH_CHECKSUM : BLOCK_WITHOUT_CHECKSUM)); + out.writerIndex(outputOffset + 2 + chunkLength); + in.skipBytes(length); + } + } +} diff --git a/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java new file mode 100644 index 0000000000..0df309372a --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/compression/FastLzIntegrationTest.java @@ -0,0 +1,136 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.ReferenceCountUtil; + +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + +public class FastLzIntegrationTest extends IntegrationTest { + + public static class TestWithChecksum extends IntegrationTest { + + @Override + protected EmbeddedChannel createEncoderEmbeddedChannel() { + return new EmbeddedChannel(new FastLzFramedEncoder(true)); + } + + @Override + protected EmbeddedChannel createDecoderEmbeddedChannel() { + return new EmbeddedChannel(new FastLzFramedDecoder(true)); + } + } + + public static class TestRandomChecksum extends IntegrationTest { + + @Override + protected EmbeddedChannel createEncoderEmbeddedChannel() { + return new EmbeddedChannel(new FastLzFramedEncoder(rand.nextBoolean())); + } + + @Override + protected EmbeddedChannel createDecoderEmbeddedChannel() { + return new EmbeddedChannel(new FastLzFramedDecoder(rand.nextBoolean())); + } + } + + @Override + protected EmbeddedChannel createEncoderEmbeddedChannel() { + return new EmbeddedChannel(new FastLzFramedEncoder(rand.nextBoolean())); + } + + @Override + protected EmbeddedChannel createDecoderEmbeddedChannel() { + return new EmbeddedChannel(new FastLzFramedDecoder(rand.nextBoolean())); + } + + @Override // test batched flow of data + protected void testIdentity(final byte[] data) { + final ByteBuf original = Unpooled.wrappedBuffer(data); + final EmbeddedChannel encoder = createEncoderEmbeddedChannel(); + final EmbeddedChannel decoder = createDecoderEmbeddedChannel(); + + try { + int written = 0, length = rand.nextInt(100); + while (written + length < data.length) { + ByteBuf in = Unpooled.wrappedBuffer(data, written, length); + encoder.writeOutbound(in); + written += length; + length = rand.nextInt(100); + } + ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); + encoder.writeOutbound(in); + encoder.finish(); + + ByteBuf msg; + final CompositeByteBuf compressed = Unpooled.compositeBuffer(); + while ((msg = encoder.readOutbound()) != null) { + compressed.addComponent(msg); + compressed.writerIndex(compressed.writerIndex() + msg.readableBytes()); + } + assertThat(compressed, is(notNullValue())); + + final byte[] compressedArray = new byte[compressed.readableBytes()]; + compressed.readBytes(compressedArray); + written = 0; + length = rand.nextInt(100); + while (written + length < compressedArray.length) { + in = Unpooled.wrappedBuffer(compressedArray, written, length); + decoder.writeInbound(in); + written += length; + length = rand.nextInt(100); + } + in = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); + decoder.writeInbound(in); + + assertFalse(compressed.isReadable()); + final CompositeByteBuf decompressed = Unpooled.compositeBuffer(); + while ((msg = decoder.readInbound()) != null) { + decompressed.addComponent(msg); + decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes()); 
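+                // CompositeByteBuf.addComponent() does not advance the writer index,
+                // so it is moved forward manually after each appended component.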
+ } + assertEquals(original, decompressed); + + compressed.release(); + decompressed.release(); + original.release(); + } finally { + encoder.close(); + decoder.close(); + + for (;;) { + Object msg = encoder.readOutbound(); + if (msg == null) { + break; + } + ReferenceCountUtil.release(msg); + } + + for (;;) { + Object msg = decoder.readInbound(); + if (msg == null) { + break; + } + ReferenceCountUtil.release(msg); + } + } + } +} diff --git a/license/LICENSE.jfastlz.txt b/license/LICENSE.jfastlz.txt new file mode 100644 index 0000000000..6f27e141f6 --- /dev/null +++ b/license/LICENSE.jfastlz.txt @@ -0,0 +1,24 @@ +The MIT License + +Copyright (c) 2009 William Kinney + +Permission is hereby granted, free of charge, to any person +obtaining a copy of this software and associated documentation +files (the "Software"), to deal in the Software without +restriction, including without limitation the rights to use, +copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +OTHER DEALINGS IN THE SOFTWARE.