diff --git a/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java index a9b09be8c8..1b94279d97 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java @@ -26,8 +26,6 @@ import java.util.List; import static com.ning.compress.lzf.LZFChunk.BYTE_Z; import static com.ning.compress.lzf.LZFChunk.BYTE_V; -import static com.ning.compress.lzf.LZFChunk.MAX_HEADER_LEN; -import static com.ning.compress.lzf.LZFChunk.HEADER_LEN_COMPRESSED; import static com.ning.compress.lzf.LZFChunk.HEADER_LEN_NOT_COMPRESSED; import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_NON_COMPRESSED; import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_COMPRESSED; @@ -40,39 +38,47 @@ import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_COMPRESSED; */ public class LzfDecoder extends ByteToMessageDecoder { /** - * A brief signature for content auto-detection. + * Current state of decompression. */ - private static final short SIGNATURE_OF_CHUNK = BYTE_Z << 8 | BYTE_V; + private enum State { + INIT_BLOCK, + INIT_ORIGINAL_LENGTH, + DECOMPRESS_DATA, + CORRUPTED + } + + private State currentState = State.INIT_BLOCK; /** - * Offset to the "Type" in chunk header. + * Magic number of LZF chunk. */ - private static final int TYPE_OFFSET = 2; - - /** - * Offset to the "ChunkLength" in chunk header. - */ - private static final int CHUNK_LENGTH_OFFSET = 3; - - /** - * Offset to the "OriginalLength" in chunk header. - */ - private static final int ORIGINAL_LENGTH_OFFSET = 5; + private static final short MAGIC_NUMBER = BYTE_Z << 8 | BYTE_V; /** * Underlying decoder in use. */ - private final ChunkDecoder decoder; + private ChunkDecoder decoder; /** * Object that handles details of buffer recycling. */ - private final BufferRecycler recycler; + private BufferRecycler recycler; /** - * Determines the state of flow. + * Length of current received chunk of data. */ - private boolean corrupted; + private int chunkLength; + + /** + * Original length of current received chunk of data. + * It is equal to {@link #chunkLength} for non compressed chunks. + */ + private int originalLength; + + /** + * Indicates is this chunk compressed or not. + */ + private boolean isCompressed; /** * Creates a new LZF decoder with the most optimal available methods for underlying data access. @@ -104,74 +110,101 @@ public class LzfDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { for (;;) { - if (corrupted) { - in.skipBytes(in.readableBytes()); - return; - } - - if (in.readableBytes() < HEADER_LEN_NOT_COMPRESSED) { - return; - } - final int idx = in.readerIndex(); - final int type = in.getByte(idx + TYPE_OFFSET); - final int chunkLength = in.getUnsignedShort(idx + CHUNK_LENGTH_OFFSET); - final int totalLength = (type == BLOCK_TYPE_NON_COMPRESSED ? - HEADER_LEN_NOT_COMPRESSED : MAX_HEADER_LEN) + chunkLength; - if (in.readableBytes() < totalLength) { - return; - } - try { - if (in.getUnsignedShort(idx) != SIGNATURE_OF_CHUNK) { - throw new DecompressionException("Unexpected signature of chunk"); - } - switch (type) { - case BLOCK_TYPE_NON_COMPRESSED: { - in.skipBytes(HEADER_LEN_NOT_COMPRESSED); - out.add(in.readBytes(chunkLength)); - break; - } - case BLOCK_TYPE_COMPRESSED: { - final int originalLength = in.getUnsignedShort(idx + ORIGINAL_LENGTH_OFFSET); - - final byte[] inputArray; - final int inPos; - if (in.hasArray()) { - inputArray = in.array(); - inPos = in.arrayOffset() + idx + HEADER_LEN_COMPRESSED; - } else { - inputArray = recycler.allocInputBuffer(chunkLength); - in.getBytes(idx + HEADER_LEN_COMPRESSED, inputArray, 0, chunkLength); - inPos = 0; + switch (currentState) { + case INIT_BLOCK: + if (in.readableBytes() < HEADER_LEN_NOT_COMPRESSED) { + return; + } + final int magic = in.readUnsignedShort(); + if (magic != MAGIC_NUMBER) { + throw new DecompressionException("unexpected block identifier"); } - ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength); - final byte[] outputArray = uncompressed.array(); - final int outPos = uncompressed.arrayOffset() + uncompressed.writerIndex(); + final int type = in.readByte(); + switch (type) { + case BLOCK_TYPE_NON_COMPRESSED: + isCompressed = false; + currentState = State.DECOMPRESS_DATA; + break; + case BLOCK_TYPE_COMPRESSED: + isCompressed = true; + currentState = State.INIT_ORIGINAL_LENGTH; + break; + default: + throw new DecompressionException(String.format( + "unknown type of chunk: %d (expected: %d or %d)", + type, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED)); + } + chunkLength = in.readUnsignedShort(); - boolean success = false; - try { - decoder.decodeChunk(inputArray, inPos, outputArray, outPos, outPos + originalLength); - uncompressed.writerIndex(uncompressed.writerIndex() + originalLength); - out.add(uncompressed); - in.skipBytes(totalLength); - success = true; - } finally { - if (!success) { - uncompressed.release(); + if (type != BLOCK_TYPE_COMPRESSED) { + break; + } + case INIT_ORIGINAL_LENGTH: + if (in.readableBytes() < 2) { + return; + } + originalLength = in.readUnsignedShort(); + + currentState = State.DECOMPRESS_DATA; + case DECOMPRESS_DATA: + final int chunkLength = this.chunkLength; + if (in.readableBytes() < chunkLength) { + return; + } + final int originalLength = this.originalLength; + + if (isCompressed) { + final int idx = in.readerIndex(); + + final byte[] inputArray; + final int inPos; + if (in.hasArray()) { + inputArray = in.array(); + inPos = in.arrayOffset() + idx; + } else { + inputArray = recycler.allocInputBuffer(chunkLength); + in.getBytes(idx, inputArray, 0, chunkLength); + inPos = 0; } + + ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength); + final byte[] outputArray = uncompressed.array(); + final int outPos = uncompressed.arrayOffset() + uncompressed.writerIndex(); + + boolean success = false; + try { + decoder.decodeChunk(inputArray, inPos, outputArray, outPos, outPos + originalLength); + uncompressed.writerIndex(uncompressed.writerIndex() + originalLength); + out.add(uncompressed); + in.skipBytes(chunkLength); + success = true; + } finally { + if (!success) { + uncompressed.release(); + } + } + + if (!in.hasArray()) { + recycler.releaseInputBuffer(inputArray); + } + } else { + out.add(in.readSlice(chunkLength).retain()); } - if (!in.hasArray()) { - recycler.releaseInputBuffer(inputArray); - } + currentState = State.INIT_BLOCK; break; - } + case CORRUPTED: + in.skipBytes(in.readableBytes()); + return; default: - throw new DecompressionException("Unknown type of chunk: " + type + " (expected: 0 or 1)"); + throw new IllegalStateException(); } } catch (Exception e) { - corrupted = true; + currentState = State.CORRUPTED; + decoder = null; + recycler = null; throw e; } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/LzfDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/LzfDecoderTest.java index e959cf02da..ee122639c1 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/LzfDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/LzfDecoderTest.java @@ -60,9 +60,9 @@ public class LzfDecoderTest { } @Test - public void testUnexpectedSignatureOfChunk() throws Exception { + public void testUnexpectedBlockIdentifier() throws Exception { expected.expect(DecompressionException.class); - expected.expectMessage("Unexpected signature of chunk"); + expected.expectMessage("unexpected block identifier"); ByteBuf in = Unpooled.buffer(); in.writeShort(0x1234); //random value @@ -75,7 +75,7 @@ public class LzfDecoderTest { @Test public void testUnknownTypeOfChunk() throws Exception { expected.expect(DecompressionException.class); - expected.expectMessage("Unknown type of chunk"); + expected.expectMessage("unknown type of chunk"); ByteBuf in = Unpooled.buffer(); in.writeByte(BYTE_Z);