Remove unnecessary loop and indentation in decompressors

Motivation:

Decompression handlers contain heavy use of switch-case statements. We
use compact indentation style for 'case' so that we utilize our screen
real-estate more efficiently.

Also, the following decompression handlers do not need to run a loop,
because ByteToMessageDecoder already runs a loop for them:

- FastLzFrameDecoder
- Lz4FrameDecoder
- LzfDecoder

Modifications:

- Fix indentations
- Do not wrap the decoding logic with a for loop when unnecessary
- Handle the case where a FastLz/Lzf frame contains no data properly so
  that the buffer does not leak and less garbage is produced.

Result:

- Efficiency
- Compact source code
- No buffer leak
This commit is contained in:
Trustin Lee 2015-01-12 00:13:15 +09:00
parent 79450e081b
commit ad4418cf9b
4 changed files with 531 additions and 522 deletions

View File

@ -81,232 +81,233 @@ public class Bzip2Decoder extends ByteToMessageDecoder {
if (!in.isReadable()) { if (!in.isReadable()) {
return; return;
} }
final Bzip2BitReader reader = this.reader; final Bzip2BitReader reader = this.reader;
reader.setByteBuf(in); reader.setByteBuf(in);
for (;;) { for (;;) {
switch (currentState) { switch (currentState) {
case INIT: case INIT:
if (in.readableBytes() < 4) { if (in.readableBytes() < 4) {
return; return;
} }
int magicNumber = in.readUnsignedMedium(); int magicNumber = in.readUnsignedMedium();
if (magicNumber != MAGIC_NUMBER) { if (magicNumber != MAGIC_NUMBER) {
throw new DecompressionException("Unexpected stream identifier contents. Mismatched bzip2 " + throw new DecompressionException("Unexpected stream identifier contents. Mismatched bzip2 " +
"protocol version?"); "protocol version?");
} }
int blockSize = in.readByte() - '0'; int blockSize = in.readByte() - '0';
if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) { if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) {
throw new DecompressionException("block size is invalid"); throw new DecompressionException("block size is invalid");
} }
this.blockSize = blockSize * BASE_BLOCK_SIZE; this.blockSize = blockSize * BASE_BLOCK_SIZE;
streamCRC = 0; streamCRC = 0;
currentState = State.INIT_BLOCK; currentState = State.INIT_BLOCK;
case INIT_BLOCK: case INIT_BLOCK:
if (!reader.hasReadableBytes(10)) { if (!reader.hasReadableBytes(10)) {
return; return;
}
// Get the block magic bytes.
final int magic1 = reader.readBits(24);
final int magic2 = reader.readBits(24);
if (magic1 == END_OF_STREAM_MAGIC_1 && magic2 == END_OF_STREAM_MAGIC_2) {
// End of stream was reached. Check the combined CRC.
final int storedCombinedCRC = reader.readInt();
if (storedCombinedCRC != streamCRC) {
throw new DecompressionException("stream CRC error");
} }
// Get the block magic bytes. currentState = State.EOF;
final int magic1 = reader.readBits(24); break;
final int magic2 = reader.readBits(24); }
if (magic1 == END_OF_STREAM_MAGIC_1 && magic2 == END_OF_STREAM_MAGIC_2) { if (magic1 != BLOCK_HEADER_MAGIC_1 || magic2 != BLOCK_HEADER_MAGIC_2) {
// End of stream was reached. Check the combined CRC. throw new DecompressionException("bad block header");
final int storedCombinedCRC = reader.readInt(); }
if (storedCombinedCRC != streamCRC) { blockCRC = reader.readInt();
throw new DecompressionException("stream CRC error"); currentState = State.INIT_BLOCK_PARAMS;
} case INIT_BLOCK_PARAMS:
currentState = State.EOF; if (!reader.hasReadableBits(25)) {
break; return;
} }
if (magic1 != BLOCK_HEADER_MAGIC_1 || magic2 != BLOCK_HEADER_MAGIC_2) { final boolean blockRandomised = reader.readBoolean();
throw new DecompressionException("bad block header"); final int bwtStartPointer = reader.readBits(24);
}
blockCRC = reader.readInt();
currentState = State.INIT_BLOCK_PARAMS;
case INIT_BLOCK_PARAMS:
if (!reader.hasReadableBits(25)) {
return;
}
final boolean blockRandomised = reader.readBoolean();
final int bwtStartPointer = reader.readBits(24);
blockDecompressor = new Bzip2BlockDecompressor(this.blockSize, blockCRC, blockDecompressor = new Bzip2BlockDecompressor(this.blockSize, blockCRC,
blockRandomised, bwtStartPointer, reader); blockRandomised, bwtStartPointer, reader);
currentState = State.RECEIVE_HUFFMAN_USED_MAP; currentState = State.RECEIVE_HUFFMAN_USED_MAP;
case RECEIVE_HUFFMAN_USED_MAP: case RECEIVE_HUFFMAN_USED_MAP:
if (!reader.hasReadableBits(16)) { if (!reader.hasReadableBits(16)) {
return; return;
} }
blockDecompressor.huffmanInUse16 = reader.readBits(16); blockDecompressor.huffmanInUse16 = reader.readBits(16);
currentState = State.RECEIVE_HUFFMAN_USED_BITMAPS; currentState = State.RECEIVE_HUFFMAN_USED_BITMAPS;
case RECEIVE_HUFFMAN_USED_BITMAPS: case RECEIVE_HUFFMAN_USED_BITMAPS:
Bzip2BlockDecompressor blockDecompressor = this.blockDecompressor; Bzip2BlockDecompressor blockDecompressor = this.blockDecompressor;
final int inUse16 = blockDecompressor.huffmanInUse16; final int inUse16 = blockDecompressor.huffmanInUse16;
final int bitNumber = Integer.bitCount(inUse16); final int bitNumber = Integer.bitCount(inUse16);
final byte[] huffmanSymbolMap = blockDecompressor.huffmanSymbolMap; final byte[] huffmanSymbolMap = blockDecompressor.huffmanSymbolMap;
if (!reader.hasReadableBits(bitNumber * HUFFMAN_SYMBOL_RANGE_SIZE + 3)) { if (!reader.hasReadableBits(bitNumber * HUFFMAN_SYMBOL_RANGE_SIZE + 3)) {
return; return;
} }
int huffmanSymbolCount = 0; int huffmanSymbolCount = 0;
if (bitNumber > 0) { if (bitNumber > 0) {
for (int i = 0; i < 16; i++) { for (int i = 0; i < 16; i++) {
if ((inUse16 & 1 << 15 >>> i) != 0) { if ((inUse16 & 1 << 15 >>> i) != 0) {
for (int j = 0, k = i << 4; j < HUFFMAN_SYMBOL_RANGE_SIZE; j++, k++) { for (int j = 0, k = i << 4; j < HUFFMAN_SYMBOL_RANGE_SIZE; j++, k++) {
if (reader.readBoolean()) { if (reader.readBoolean()) {
huffmanSymbolMap[huffmanSymbolCount++] = (byte) k; huffmanSymbolMap[huffmanSymbolCount++] = (byte) k;
}
} }
} }
} }
} }
blockDecompressor.huffmanEndOfBlockSymbol = huffmanSymbolCount + 1; }
blockDecompressor.huffmanEndOfBlockSymbol = huffmanSymbolCount + 1;
int totalTables = reader.readBits(3); int totalTables = reader.readBits(3);
if (totalTables < HUFFMAN_MINIMUM_TABLES || totalTables > HUFFMAN_MAXIMUM_TABLES) { if (totalTables < HUFFMAN_MINIMUM_TABLES || totalTables > HUFFMAN_MAXIMUM_TABLES) {
throw new DecompressionException("incorrect huffman groups number"); throw new DecompressionException("incorrect huffman groups number");
} }
int alphaSize = huffmanSymbolCount + 2; int alphaSize = huffmanSymbolCount + 2;
if (alphaSize > HUFFMAN_MAX_ALPHABET_SIZE) { if (alphaSize > HUFFMAN_MAX_ALPHABET_SIZE) {
throw new DecompressionException("incorrect alphabet size"); throw new DecompressionException("incorrect alphabet size");
} }
huffmanStageDecoder = new Bzip2HuffmanStageDecoder(reader, totalTables, alphaSize); huffmanStageDecoder = new Bzip2HuffmanStageDecoder(reader, totalTables, alphaSize);
currentState = State.RECEIVE_SELECTORS_NUMBER; currentState = State.RECEIVE_SELECTORS_NUMBER;
case RECEIVE_SELECTORS_NUMBER: case RECEIVE_SELECTORS_NUMBER:
if (!reader.hasReadableBits(15)) { if (!reader.hasReadableBits(15)) {
return;
}
int totalSelectors = reader.readBits(15);
if (totalSelectors < 1 || totalSelectors > MAX_SELECTORS) {
throw new DecompressionException("incorrect selectors number");
}
huffmanStageDecoder.selectors = new byte[totalSelectors];
currentState = State.RECEIVE_SELECTORS;
case RECEIVE_SELECTORS:
Bzip2HuffmanStageDecoder huffmanStageDecoder = this.huffmanStageDecoder;
byte[] selectors = huffmanStageDecoder.selectors;
totalSelectors = selectors.length;
final Bzip2MoveToFrontTable tableMtf = huffmanStageDecoder.tableMTF;
int currSelector;
// Get zero-terminated bit runs (0..62) of MTF'ed Huffman table. length = 1..6
for (currSelector = huffmanStageDecoder.currentSelector;
currSelector < totalSelectors; currSelector++) {
if (!reader.hasReadableBits(HUFFMAN_SELECTOR_LIST_MAX_LENGTH)) {
// Save state if end of current ByteBuf was reached
huffmanStageDecoder.currentSelector = currSelector;
return; return;
} }
int totalSelectors = reader.readBits(15); int index = 0;
if (totalSelectors < 1 || totalSelectors > MAX_SELECTORS) { while (reader.readBoolean()) {
throw new DecompressionException("incorrect selectors number"); index++;
} }
huffmanStageDecoder.selectors = new byte[totalSelectors]; selectors[currSelector] = tableMtf.indexToFront(index);
}
currentState = State.RECEIVE_SELECTORS; currentState = State.RECEIVE_HUFFMAN_LENGTH;
case RECEIVE_SELECTORS: case RECEIVE_HUFFMAN_LENGTH:
Bzip2HuffmanStageDecoder huffmanStageDecoder = this.huffmanStageDecoder; huffmanStageDecoder = this.huffmanStageDecoder;
byte[] selectors = huffmanStageDecoder.selectors; totalTables = huffmanStageDecoder.totalTables;
totalSelectors = selectors.length; final byte[][] codeLength = huffmanStageDecoder.tableCodeLengths;
final Bzip2MoveToFrontTable tableMtf = huffmanStageDecoder.tableMTF; alphaSize = huffmanStageDecoder.alphabetSize;
int currSelector; /* Now the coding tables */
// Get zero-terminated bit runs (0..62) of MTF'ed Huffman table. length = 1..6 int currGroup;
for (currSelector = huffmanStageDecoder.currentSelector; int currLength = huffmanStageDecoder.currentLength;
currSelector < totalSelectors; currSelector++) { int currAlpha = 0;
if (!reader.hasReadableBits(HUFFMAN_SELECTOR_LIST_MAX_LENGTH)) { boolean modifyLength = huffmanStageDecoder.modifyLength;
// Save state if end of current ByteBuf was reached boolean saveStateAndReturn = false;
huffmanStageDecoder.currentSelector = currSelector; loop: for (currGroup = huffmanStageDecoder.currentGroup; currGroup < totalTables; currGroup++) {
return; // start_huffman_length
} if (!reader.hasReadableBits(5)) {
int index = 0; saveStateAndReturn = true;
while (reader.readBoolean()) { break;
index++;
}
selectors[currSelector] = tableMtf.indexToFront(index);
} }
if (currLength < 0) {
currentState = State.RECEIVE_HUFFMAN_LENGTH; currLength = reader.readBits(5);
case RECEIVE_HUFFMAN_LENGTH: }
huffmanStageDecoder = this.huffmanStageDecoder; for (currAlpha = huffmanStageDecoder.currentAlpha; currAlpha < alphaSize; currAlpha++) {
totalTables = huffmanStageDecoder.totalTables; // delta_bit_length: 1..40
final byte[][] codeLength = huffmanStageDecoder.tableCodeLengths; if (!reader.isReadable()) {
alphaSize = huffmanStageDecoder.alphabetSize;
/* Now the coding tables */
int currGroup;
int currLength = huffmanStageDecoder.currentLength;
int currAlpha = 0;
boolean modifyLength = huffmanStageDecoder.modifyLength;
boolean saveStateAndReturn = false;
loop: for (currGroup = huffmanStageDecoder.currentGroup; currGroup < totalTables; currGroup++) {
// start_huffman_length
if (!reader.hasReadableBits(5)) {
saveStateAndReturn = true; saveStateAndReturn = true;
break; break loop;
} }
if (currLength < 0) { while (modifyLength || reader.readBoolean()) { // 0=>next symbol; 1=>alter length
currLength = reader.readBits(5); if (!reader.isReadable()) {
} modifyLength = true;
for (currAlpha = huffmanStageDecoder.currentAlpha; currAlpha < alphaSize; currAlpha++) { saveStateAndReturn = true;
// delta_bit_length: 1..40 break loop;
}
// 1=>decrement length; 0=>increment length
currLength += reader.readBoolean() ? -1 : 1;
modifyLength = false;
if (!reader.isReadable()) { if (!reader.isReadable()) {
saveStateAndReturn = true; saveStateAndReturn = true;
break loop; break loop;
} }
while (modifyLength || reader.readBoolean()) { // 0=>next symbol; 1=>alter length
if (!reader.isReadable()) {
modifyLength = true;
saveStateAndReturn = true;
break loop;
}
// 1=>decrement length; 0=>increment length
currLength += reader.readBoolean() ? -1 : 1;
modifyLength = false;
if (!reader.isReadable()) {
saveStateAndReturn = true;
break loop;
}
}
codeLength[currGroup][currAlpha] = (byte) currLength;
} }
currLength = -1; codeLength[currGroup][currAlpha] = (byte) currLength;
currAlpha = huffmanStageDecoder.currentAlpha = 0;
modifyLength = false;
} }
if (saveStateAndReturn) { currLength = -1;
// Save state if end of current ByteBuf was reached currAlpha = huffmanStageDecoder.currentAlpha = 0;
huffmanStageDecoder.currentGroup = currGroup; modifyLength = false;
huffmanStageDecoder.currentLength = currLength; }
huffmanStageDecoder.currentAlpha = currAlpha; if (saveStateAndReturn) {
huffmanStageDecoder.modifyLength = modifyLength; // Save state if end of current ByteBuf was reached
return; huffmanStageDecoder.currentGroup = currGroup;
} huffmanStageDecoder.currentLength = currLength;
huffmanStageDecoder.currentAlpha = currAlpha;
// Finally create the Huffman tables huffmanStageDecoder.modifyLength = modifyLength;
huffmanStageDecoder.createHuffmanDecodingTables();
currentState = State.DECODE_HUFFMAN_DATA;
case DECODE_HUFFMAN_DATA:
blockDecompressor = this.blockDecompressor;
final int oldReaderIndex = in.readerIndex();
final boolean decoded = blockDecompressor.decodeHuffmanData(this.huffmanStageDecoder);
if (!decoded) {
return;
}
// It used to avoid "Bzip2Decoder.decode() did not read anything but decoded a message" exception.
// Because previous operation may read only a few bits from Bzip2BitReader.bitBuffer and
// don't read incomming ByteBuf.
if (in.readerIndex() == oldReaderIndex && in.isReadable()) {
reader.refill();
}
final int blockLength = blockDecompressor.blockLength();
final ByteBuf uncompressed = ctx.alloc().buffer(blockLength);
boolean success = false;
try {
int uncByte;
while ((uncByte = blockDecompressor.read()) >= 0) {
uncompressed.writeByte(uncByte);
}
int currentBlockCRC = blockDecompressor.checkCRC();
streamCRC = (streamCRC << 1 | streamCRC >>> 31) ^ currentBlockCRC;
out.add(uncompressed);
success = true;
} finally {
if (!success) {
uncompressed.release();
}
}
currentState = State.INIT_BLOCK;
break;
case EOF:
in.skipBytes(in.readableBytes());
return; return;
default: }
throw new IllegalStateException();
// Finally create the Huffman tables
huffmanStageDecoder.createHuffmanDecodingTables();
currentState = State.DECODE_HUFFMAN_DATA;
case DECODE_HUFFMAN_DATA:
blockDecompressor = this.blockDecompressor;
final int oldReaderIndex = in.readerIndex();
final boolean decoded = blockDecompressor.decodeHuffmanData(this.huffmanStageDecoder);
if (!decoded) {
return;
}
// It used to avoid "Bzip2Decoder.decode() did not read anything but decoded a message" exception.
// Because previous operation may read only a few bits from Bzip2BitReader.bitBuffer and
// don't read incomming ByteBuf.
if (in.readerIndex() == oldReaderIndex && in.isReadable()) {
reader.refill();
}
final int blockLength = blockDecompressor.blockLength();
final ByteBuf uncompressed = ctx.alloc().buffer(blockLength);
boolean success = false;
try {
int uncByte;
while ((uncByte = blockDecompressor.read()) >= 0) {
uncompressed.writeByte(uncByte);
}
int currentBlockCRC = blockDecompressor.checkCRC();
streamCRC = (streamCRC << 1 | streamCRC >>> 31) ^ currentBlockCRC;
out.add(uncompressed);
success = true;
} finally {
if (!success) {
uncompressed.release();
}
}
currentState = State.INIT_BLOCK;
break;
case EOF:
in.skipBytes(in.readableBytes());
return;
default:
throw new IllegalStateException();
} }
} }
} }

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.internal.EmptyArrays;
import java.util.List; import java.util.List;
import java.util.zip.Adler32; import java.util.zip.Adler32;
@ -108,104 +109,115 @@ public class FastLzFrameDecoder extends ByteToMessageDecoder {
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
for (;;) { try {
try { switch (currentState) {
switch (currentState) { case INIT_BLOCK:
case INIT_BLOCK: if (in.readableBytes() < 4) {
if (in.readableBytes() < 4) { break;
return;
}
final int magic = in.readUnsignedMedium();
if (magic != MAGIC_NUMBER) {
throw new DecompressionException("unexpected block identifier");
}
final byte options = in.readByte();
isCompressed = (options & 0x01) == BLOCK_TYPE_COMPRESSED;
hasChecksum = (options & 0x10) == BLOCK_WITH_CHECKSUM;
currentState = State.INIT_BLOCK_PARAMS;
case INIT_BLOCK_PARAMS:
if (in.readableBytes() < 2 + (isCompressed ? 2 : 0) + (hasChecksum ? 4 : 0)) {
return;
}
currentChecksum = hasChecksum ? in.readInt() : 0;
chunkLength = in.readUnsignedShort();
originalLength = isCompressed ? in.readUnsignedShort() : chunkLength;
currentState = State.DECOMPRESS_DATA;
case DECOMPRESS_DATA:
final int chunkLength = this.chunkLength;
if (in.readableBytes() < chunkLength) {
return;
}
final int idx = in.readerIndex();
final int originalLength = this.originalLength;
ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength);
final byte[] output = uncompressed.array();
final int outputPtr = uncompressed.arrayOffset() + uncompressed.writerIndex();
boolean success = false;
try {
if (isCompressed) {
final byte[] input;
final int inputPtr;
if (in.hasArray()) {
input = in.array();
inputPtr = in.arrayOffset() + idx;
} else {
input = new byte[chunkLength];
in.getBytes(idx, input);
inputPtr = 0;
}
final int decompressedBytes = decompress(input, inputPtr, chunkLength,
output, outputPtr, originalLength);
if (originalLength != decompressedBytes) {
throw new DecompressionException(String.format(
"stream corrupted: originalLength(%d) and actual length(%d) mismatch",
originalLength, decompressedBytes));
}
} else {
in.getBytes(idx, output, outputPtr, chunkLength);
}
final Checksum checksum = this.checksum;
if (hasChecksum && checksum != null) {
checksum.reset();
checksum.update(output, outputPtr, originalLength);
final int checksumResult = (int) checksum.getValue();
if (checksumResult != currentChecksum) {
throw new DecompressionException(String.format(
"stream corrupted: mismatching checksum: %d (expected: %d)",
checksumResult, currentChecksum));
}
}
uncompressed.writerIndex(uncompressed.writerIndex() + originalLength);
out.add(uncompressed);
in.skipBytes(chunkLength);
currentState = State.INIT_BLOCK;
success = true;
} finally {
if (!success) {
uncompressed.release();
}
}
break;
case CORRUPTED:
in.skipBytes(in.readableBytes());
return;
default:
throw new IllegalStateException();
} }
} catch (Exception e) {
currentState = State.CORRUPTED; final int magic = in.readUnsignedMedium();
throw e; if (magic != MAGIC_NUMBER) {
throw new DecompressionException("unexpected block identifier");
}
final byte options = in.readByte();
isCompressed = (options & 0x01) == BLOCK_TYPE_COMPRESSED;
hasChecksum = (options & 0x10) == BLOCK_WITH_CHECKSUM;
currentState = State.INIT_BLOCK_PARAMS;
case INIT_BLOCK_PARAMS:
if (in.readableBytes() < 2 + (isCompressed ? 2 : 0) + (hasChecksum ? 4 : 0)) {
break;
}
currentChecksum = hasChecksum ? in.readInt() : 0;
chunkLength = in.readUnsignedShort();
originalLength = isCompressed ? in.readUnsignedShort() : chunkLength;
currentState = State.DECOMPRESS_DATA;
case DECOMPRESS_DATA:
final int chunkLength = this.chunkLength;
if (in.readableBytes() < chunkLength) {
break;
}
final int idx = in.readerIndex();
final int originalLength = this.originalLength;
final ByteBuf uncompressed;
final byte[] output;
final int outputPtr;
if (originalLength != 0) {
uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength);
output = uncompressed.array();
outputPtr = uncompressed.arrayOffset() + uncompressed.writerIndex();
} else {
uncompressed = null;
output = EmptyArrays.EMPTY_BYTES;
outputPtr = 0;
}
boolean success = false;
try {
if (isCompressed) {
final byte[] input;
final int inputPtr;
if (in.hasArray()) {
input = in.array();
inputPtr = in.arrayOffset() + idx;
} else {
input = new byte[chunkLength];
in.getBytes(idx, input);
inputPtr = 0;
}
final int decompressedBytes = decompress(input, inputPtr, chunkLength,
output, outputPtr, originalLength);
if (originalLength != decompressedBytes) {
throw new DecompressionException(String.format(
"stream corrupted: originalLength(%d) and actual length(%d) mismatch",
originalLength, decompressedBytes));
}
} else {
in.getBytes(idx, output, outputPtr, chunkLength);
}
final Checksum checksum = this.checksum;
if (hasChecksum && checksum != null) {
checksum.reset();
checksum.update(output, outputPtr, originalLength);
final int checksumResult = (int) checksum.getValue();
if (checksumResult != currentChecksum) {
throw new DecompressionException(String.format(
"stream corrupted: mismatching checksum: %d (expected: %d)",
checksumResult, currentChecksum));
}
}
if (uncompressed != null) {
uncompressed.writerIndex(uncompressed.writerIndex() + originalLength);
out.add(uncompressed);
}
in.skipBytes(chunkLength);
currentState = State.INIT_BLOCK;
success = true;
} finally {
if (!success) {
uncompressed.release();
}
}
break;
case CORRUPTED:
in.skipBytes(in.readableBytes());
break;
default:
throw new IllegalStateException();
} }
} catch (Exception e) {
currentState = State.CORRUPTED;
throw e;
} }
} }
} }

View File

@ -148,149 +148,147 @@ public class Lz4FrameDecoder extends ByteToMessageDecoder {
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
for (;;) { try {
try { switch (currentState) {
switch (currentState) { case INIT_BLOCK:
case INIT_BLOCK: if (in.readableBytes() < HEADER_LENGTH) {
if (in.readableBytes() < HEADER_LENGTH) { break;
return; }
} final long magic = in.readLong();
final long magic = in.readLong(); if (magic != MAGIC_NUMBER) {
if (magic != MAGIC_NUMBER) { throw new DecompressionException("unexpected block identifier");
throw new DecompressionException("unexpected block identifier"); }
final int token = in.readByte();
final int compressionLevel = (token & 0x0F) + COMPRESSION_LEVEL_BASE;
int blockType = token & 0xF0;
int compressedLength = Integer.reverseBytes(in.readInt());
if (compressedLength < 0 || compressedLength > MAX_BLOCK_SIZE) {
throw new DecompressionException(String.format(
"invalid compressedLength: %d (expected: 0-%d)",
compressedLength, MAX_BLOCK_SIZE));
}
int decompressedLength = Integer.reverseBytes(in.readInt());
final int maxDecompressedLength = 1 << compressionLevel;
if (decompressedLength < 0 || decompressedLength > maxDecompressedLength) {
throw new DecompressionException(String.format(
"invalid decompressedLength: %d (expected: 0-%d)",
decompressedLength, maxDecompressedLength));
}
if (decompressedLength == 0 && compressedLength != 0
|| decompressedLength != 0 && compressedLength == 0
|| blockType == BLOCK_TYPE_NON_COMPRESSED && decompressedLength != compressedLength) {
throw new DecompressionException(String.format(
"stream corrupted: compressedLength(%d) and decompressedLength(%d) mismatch",
compressedLength, decompressedLength));
}
int currentChecksum = Integer.reverseBytes(in.readInt());
if (decompressedLength == 0 && compressedLength == 0) {
if (currentChecksum != 0) {
throw new DecompressionException("stream corrupted: checksum error");
}
currentState = State.FINISHED;
decompressor = null;
checksum = null;
break;
}
this.blockType = blockType;
this.compressedLength = compressedLength;
this.decompressedLength = decompressedLength;
this.currentChecksum = currentChecksum;
currentState = State.DECOMPRESS_DATA;
case DECOMPRESS_DATA:
blockType = this.blockType;
compressedLength = this.compressedLength;
decompressedLength = this.decompressedLength;
currentChecksum = this.currentChecksum;
if (in.readableBytes() < compressedLength) {
break;
}
final int idx = in.readerIndex();
ByteBuf uncompressed = ctx.alloc().heapBuffer(decompressedLength, decompressedLength);
final byte[] dest = uncompressed.array();
final int destOff = uncompressed.arrayOffset() + uncompressed.writerIndex();
boolean success = false;
try {
switch (blockType) {
case BLOCK_TYPE_NON_COMPRESSED: {
in.getBytes(idx, dest, destOff, decompressedLength);
break;
}
case BLOCK_TYPE_COMPRESSED: {
final byte[] src;
final int srcOff;
if (in.hasArray()) {
src = in.array();
srcOff = in.arrayOffset() + idx;
} else {
src = new byte[compressedLength];
in.getBytes(idx, src);
srcOff = 0;
} }
final int token = in.readByte();
final int compressionLevel = (token & 0x0F) + COMPRESSION_LEVEL_BASE;
int blockType = token & 0xF0;
int compressedLength = Integer.reverseBytes(in.readInt());
if (compressedLength < 0 || compressedLength > MAX_BLOCK_SIZE) {
throw new DecompressionException(String.format(
"invalid compressedLength: %d (expected: 0-%d)",
compressedLength, MAX_BLOCK_SIZE));
}
int decompressedLength = Integer.reverseBytes(in.readInt());
final int maxDecompressedLength = 1 << compressionLevel;
if (decompressedLength < 0 || decompressedLength > maxDecompressedLength) {
throw new DecompressionException(String.format(
"invalid decompressedLength: %d (expected: 0-%d)",
decompressedLength, maxDecompressedLength));
}
if (decompressedLength == 0 && compressedLength != 0
|| decompressedLength != 0 && compressedLength == 0
|| blockType == BLOCK_TYPE_NON_COMPRESSED && decompressedLength != compressedLength) {
throw new DecompressionException(String.format(
"stream corrupted: compressedLength(%d) and decompressedLength(%d) mismatch",
compressedLength, decompressedLength));
}
int currentChecksum = Integer.reverseBytes(in.readInt());
if (decompressedLength == 0 && compressedLength == 0) {
if (currentChecksum != 0) {
throw new DecompressionException("stream corrupted: checksum error");
}
currentState = State.FINISHED;
decompressor = null;
checksum = null;
break;
}
this.blockType = blockType;
this.compressedLength = compressedLength;
this.decompressedLength = decompressedLength;
this.currentChecksum = currentChecksum;
currentState = State.DECOMPRESS_DATA;
case DECOMPRESS_DATA:
blockType = this.blockType;
compressedLength = this.compressedLength;
decompressedLength = this.decompressedLength;
currentChecksum = this.currentChecksum;
if (in.readableBytes() < compressedLength) {
return;
}
final int idx = in.readerIndex();
ByteBuf uncompressed = ctx.alloc().heapBuffer(decompressedLength, decompressedLength);
final byte[] dest = uncompressed.array();
final int destOff = uncompressed.arrayOffset() + uncompressed.writerIndex();
boolean success = false;
try { try {
switch (blockType) { final int readBytes = decompressor.decompress(src, srcOff,
case BLOCK_TYPE_NON_COMPRESSED: { dest, destOff, decompressedLength);
in.getBytes(idx, dest, destOff, decompressedLength); if (compressedLength != readBytes) {
break; throw new DecompressionException(String.format(
} "stream corrupted: compressedLength(%d) and actual length(%d) mismatch",
case BLOCK_TYPE_COMPRESSED: { compressedLength, readBytes));
final byte[] src;
final int srcOff;
if (in.hasArray()) {
src = in.array();
srcOff = in.arrayOffset() + idx;
} else {
src = new byte[compressedLength];
in.getBytes(idx, src);
srcOff = 0;
}
try {
final int readBytes = decompressor.decompress(src, srcOff,
dest, destOff, decompressedLength);
if (compressedLength != readBytes) {
throw new DecompressionException(String.format(
"stream corrupted: compressedLength(%d) and actual length(%d) mismatch",
compressedLength, readBytes));
}
} catch (LZ4Exception e) {
throw new DecompressionException(e);
}
break;
}
default:
throw new DecompressionException(String.format(
"unexpected blockType: %d (expected: %d or %d)",
blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
}
final Checksum checksum = this.checksum;
if (checksum != null) {
checksum.reset();
checksum.update(dest, destOff, decompressedLength);
final int checksumResult = (int) checksum.getValue();
if (checksumResult != currentChecksum) {
throw new DecompressionException(String.format(
"stream corrupted: mismatching checksum: %d (expected: %d)",
checksumResult, currentChecksum));
}
}
uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength);
out.add(uncompressed);
in.skipBytes(compressedLength);
currentState = State.INIT_BLOCK;
success = true;
} finally {
if (!success) {
uncompressed.release();
} }
} catch (LZ4Exception e) {
throw new DecompressionException(e);
} }
break; break;
case FINISHED: }
case CORRUPTED:
in.skipBytes(in.readableBytes());
return;
default: default:
throw new IllegalStateException(); throw new DecompressionException(String.format(
"unexpected blockType: %d (expected: %d or %d)",
blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
}
final Checksum checksum = this.checksum;
if (checksum != null) {
checksum.reset();
checksum.update(dest, destOff, decompressedLength);
final int checksumResult = (int) checksum.getValue();
if (checksumResult != currentChecksum) {
throw new DecompressionException(String.format(
"stream corrupted: mismatching checksum: %d (expected: %d)",
checksumResult, currentChecksum));
}
}
uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength);
out.add(uncompressed);
in.skipBytes(compressedLength);
currentState = State.INIT_BLOCK;
success = true;
} finally {
if (!success) {
uncompressed.release();
}
} }
} catch (Exception e) { break;
currentState = State.CORRUPTED; case FINISHED:
throw e; case CORRUPTED:
in.skipBytes(in.readableBytes());
break;
default:
throw new IllegalStateException();
} }
} catch (Exception e) {
currentState = State.CORRUPTED;
throw e;
} }
} }

View File

@ -109,104 +109,102 @@ public class LzfDecoder extends ByteToMessageDecoder {
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
for (;;) { try {
try { switch (currentState) {
switch (currentState) { case INIT_BLOCK:
case INIT_BLOCK: if (in.readableBytes() < HEADER_LEN_NOT_COMPRESSED) {
if (in.readableBytes() < HEADER_LEN_NOT_COMPRESSED) { break;
return;
}
final int magic = in.readUnsignedShort();
if (magic != MAGIC_NUMBER) {
throw new DecompressionException("unexpected block identifier");
}
final int type = in.readByte();
switch (type) {
case BLOCK_TYPE_NON_COMPRESSED:
isCompressed = false;
currentState = State.DECOMPRESS_DATA;
break;
case BLOCK_TYPE_COMPRESSED:
isCompressed = true;
currentState = State.INIT_ORIGINAL_LENGTH;
break;
default:
throw new DecompressionException(String.format(
"unknown type of chunk: %d (expected: %d or %d)",
type, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
}
chunkLength = in.readUnsignedShort();
if (type != BLOCK_TYPE_COMPRESSED) {
break;
}
case INIT_ORIGINAL_LENGTH:
if (in.readableBytes() < 2) {
return;
}
originalLength = in.readUnsignedShort();
currentState = State.DECOMPRESS_DATA;
case DECOMPRESS_DATA:
final int chunkLength = this.chunkLength;
if (in.readableBytes() < chunkLength) {
return;
}
final int originalLength = this.originalLength;
if (isCompressed) {
final int idx = in.readerIndex();
final byte[] inputArray;
final int inPos;
if (in.hasArray()) {
inputArray = in.array();
inPos = in.arrayOffset() + idx;
} else {
inputArray = recycler.allocInputBuffer(chunkLength);
in.getBytes(idx, inputArray, 0, chunkLength);
inPos = 0;
}
ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength);
final byte[] outputArray = uncompressed.array();
final int outPos = uncompressed.arrayOffset() + uncompressed.writerIndex();
boolean success = false;
try {
decoder.decodeChunk(inputArray, inPos, outputArray, outPos, outPos + originalLength);
uncompressed.writerIndex(uncompressed.writerIndex() + originalLength);
out.add(uncompressed);
in.skipBytes(chunkLength);
success = true;
} finally {
if (!success) {
uncompressed.release();
}
}
if (!in.hasArray()) {
recycler.releaseInputBuffer(inputArray);
}
} else {
out.add(in.readSlice(chunkLength).retain());
}
currentState = State.INIT_BLOCK;
break;
case CORRUPTED:
in.skipBytes(in.readableBytes());
return;
default:
throw new IllegalStateException();
} }
} catch (Exception e) { final int magic = in.readUnsignedShort();
currentState = State.CORRUPTED; if (magic != MAGIC_NUMBER) {
decoder = null; throw new DecompressionException("unexpected block identifier");
recycler = null; }
throw e;
final int type = in.readByte();
switch (type) {
case BLOCK_TYPE_NON_COMPRESSED:
isCompressed = false;
currentState = State.DECOMPRESS_DATA;
break;
case BLOCK_TYPE_COMPRESSED:
isCompressed = true;
currentState = State.INIT_ORIGINAL_LENGTH;
break;
default:
throw new DecompressionException(String.format(
"unknown type of chunk: %d (expected: %d or %d)",
type, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
}
chunkLength = in.readUnsignedShort();
if (type != BLOCK_TYPE_COMPRESSED) {
break;
}
case INIT_ORIGINAL_LENGTH:
if (in.readableBytes() < 2) {
break;
}
originalLength = in.readUnsignedShort();
currentState = State.DECOMPRESS_DATA;
case DECOMPRESS_DATA:
final int chunkLength = this.chunkLength;
if (in.readableBytes() < chunkLength) {
break;
}
final int originalLength = this.originalLength;
if (isCompressed) {
final int idx = in.readerIndex();
final byte[] inputArray;
final int inPos;
if (in.hasArray()) {
inputArray = in.array();
inPos = in.arrayOffset() + idx;
} else {
inputArray = recycler.allocInputBuffer(chunkLength);
in.getBytes(idx, inputArray, 0, chunkLength);
inPos = 0;
}
ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength);
final byte[] outputArray = uncompressed.array();
final int outPos = uncompressed.arrayOffset() + uncompressed.writerIndex();
boolean success = false;
try {
decoder.decodeChunk(inputArray, inPos, outputArray, outPos, outPos + originalLength);
uncompressed.writerIndex(uncompressed.writerIndex() + originalLength);
out.add(uncompressed);
in.skipBytes(chunkLength);
success = true;
} finally {
if (!success) {
uncompressed.release();
}
}
if (!in.hasArray()) {
recycler.releaseInputBuffer(inputArray);
}
} else if (chunkLength > 0) {
out.add(in.readSlice(chunkLength).retain());
}
currentState = State.INIT_BLOCK;
break;
case CORRUPTED:
in.skipBytes(in.readableBytes());
break;
default:
throw new IllegalStateException();
} }
} catch (Exception e) {
currentState = State.CORRUPTED;
decoder = null;
recycler = null;
throw e;
} }
} }
} }