From 319b7fa69ac5f4ae53216097184ac2eac42caaab Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sat, 9 Feb 2013 23:39:33 +0900 Subject: [PATCH] Fix SnappyFramedEncoder/Decoder / Fix Snappy preamble encoding / Add test for #1002 - The new test still fails due to a bug in Snappy.encode/decode() --- .../handler/codec/compression/Snappy.java | 31 ++--- .../compression/SnappyFramedDecoder.java | 106 ++++++++++-------- .../compression/SnappyFramedEncoder.java | 56 +++++---- .../compression/SnappyIntegrationTest.java | 52 +++++++-- 4 files changed, 148 insertions(+), 97 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Snappy.java b/codec/src/main/java/io/netty/handler/codec/compression/Snappy.java index f52840d6f6..c26c3fffff 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Snappy.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Snappy.java @@ -41,12 +41,13 @@ public class Snappy { public void encode(ByteBuf in, ByteBuf out, int length) { // Write the preamble length to the output buffer - int bytesToEncode = 1 + bitsToEncode(length - 1) / 7; - for (int i = 0; i < bytesToEncode; i++) { - if (i == bytesToEncode - 1) { - out.writeByte(length >> i * 7); + for (int i = 0;; i ++) { + int b = length >>> i * 7; + if ((b & 0xFFFFFF80) != 0) { + out.writeByte(b & 0x7f | 0x80); } else { - out.writeByte(0x80 | length >> i * 7); + out.writeByte(b); + break; } } @@ -301,10 +302,10 @@ public class Snappy { private static int readPreamble(ByteBuf in) { int length = 0; int byteIndex = 0; - while (in.readableBytes() > 0) { - int current = in.readByte() & 0x0ff; - length += current << byteIndex++ * 7; - if ((current & 0x80) != 0x80) { + while (in.isReadable()) { + int current = in.readUnsignedByte(); + length |= (current & 0x7f) << byteIndex++ * 7; + if ((current & 0x80) == 0) { return length; } @@ -334,18 +335,18 @@ public class Snappy { break; case 61: length = in.readUnsignedByte() - | (in.readUnsignedByte() << 8); + | in.readUnsignedByte() << 8; break; case 62: length = in.readUnsignedByte() - | (in.readUnsignedByte() << 8) - | (in.readUnsignedByte() << 16); + | in.readUnsignedByte() << 8 + | in.readUnsignedByte() << 16; break; case 64: length = in.readUnsignedByte() - | (in.readUnsignedByte() << 8) - | (in.readUnsignedByte() << 16) - | (in.readUnsignedByte() << 24); + | in.readUnsignedByte() << 8 + | in.readUnsignedByte() << 16 + | in.readUnsignedByte() << 24; break; default: length = tag >> 2 & 0x3F; diff --git a/codec/src/main/java/io/netty/handler/codec/compression/SnappyFramedDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/SnappyFramedDecoder.java index 444df5d5da..c027018cde 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/SnappyFramedDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/SnappyFramedDecoder.java @@ -43,8 +43,6 @@ public class SnappyFramedDecoder extends ByteToByteDecoder { private final Snappy snappy = new Snappy(); private final boolean validateChecksums; - private int chunkLength; - private ChunkType chunkType; private boolean started; private boolean corrupted; @@ -71,52 +69,38 @@ public class SnappyFramedDecoder extends ByteToByteDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { - if (!in.isReadable()) { - return; - } - if (corrupted) { in.skipBytes(in.readableBytes()); return; } try { - while (in.isReadable()) { - if (chunkLength == 0) { - if (in.readableBytes() < 3) { - // We need to be at least able to read the chunk type identifier (one byte), - // and the length of the chunk (2 bytes) in order to proceed - return; - } + int idx = in.readerIndex(); + final int inSize = in.writerIndex() - idx; + if (inSize < 4) { + // We need to be at least able to read the chunk type identifier (one byte), + // and the length of the chunk (3 bytes) in order to proceed + return; + } - byte type = in.readByte(); - chunkType = mapChunkType(type); - chunkLength = in.readUnsignedByte() | in.readUnsignedByte() << 8; + final int chunkTypeVal = in.getUnsignedByte(idx); + final ChunkType chunkType = mapChunkType((byte) chunkTypeVal); + final int chunkLength = in.getUnsignedByte(idx + 1) + | in.getUnsignedByte(idx + 2) << 8 + | in.getUnsignedByte(idx + 3) << 16; - // The spec mandates that reserved unskippable chunks must immediately - // return an error, as we must assume that we cannot decode the stream - // correctly - if (chunkType == ChunkType.RESERVED_UNSKIPPABLE) { - throw new CompressionException("Found reserved unskippable chunk type: " + type); - } - } - - if (chunkLength == 0 || in.readableBytes() < chunkLength) { - // Wait until the entire chunk is available, as it will prevent us from - // having to buffer the data here instead - return; - } - - int checksum; - - switch(chunkType) { + switch (chunkType) { case STREAM_IDENTIFIER: if (chunkLength != SNAPPY.length) { throw new CompressionException("Unexpected length of stream identifier: " + chunkLength); } + if (inSize < 4 + SNAPPY.length) { + break; + } + byte[] identifier = new byte[chunkLength]; - in.readBytes(identifier); + in.skipBytes(4).readBytes(identifier); if (!Arrays.equals(identifier, SNAPPY)) { throw new CompressionException("Unexpected stream identifier contents. Mismatched snappy " + @@ -124,38 +108,65 @@ public class SnappyFramedDecoder extends ByteToByteDecoder { } started = true; - break; case RESERVED_SKIPPABLE: if (!started) { throw new CompressionException("Received RESERVED_SKIPPABLE tag before STREAM_IDENTIFIER"); } - in.skipBytes(chunkLength); + + if (inSize < 4 + chunkLength) { + // TODO: Don't keep skippable bytes + return; + } + + in.skipBytes(4 + chunkLength); break; + case RESERVED_UNSKIPPABLE: + // The spec mandates that reserved unskippable chunks must immediately + // return an error, as we must assume that we cannot decode the stream + // correctly + throw new CompressionException( + "Found reserved unskippable chunk type: 0x" + Integer.toHexString(chunkTypeVal)); case UNCOMPRESSED_DATA: if (!started) { throw new CompressionException("Received UNCOMPRESSED_DATA tag before STREAM_IDENTIFIER"); } - checksum = in.readUnsignedByte() - | in.readUnsignedByte() << 8 - | in.readUnsignedByte() << 16 - | in.readUnsignedByte() << 24; + if (chunkLength > 65536 + 4) { + throw new CompressionException("Received UNCOMPRESSED_DATA larger than 65540 bytes"); + } + + if (inSize < 4 + chunkLength) { + return; + } + + in.skipBytes(4); if (validateChecksums) { - ByteBuf data = in.readBytes(chunkLength); + int checksum = in.readUnsignedByte() + | in.readUnsignedByte() << 8 + | in.readUnsignedByte() << 16 + | in.readUnsignedByte() << 24; + ByteBuf data = in.readSlice(chunkLength - 4); validateChecksum(data, checksum); out.writeBytes(data); } else { - in.readBytes(out, chunkLength); + in.skipBytes(4); + in.readBytes(out, chunkLength - 4); } break; case COMPRESSED_DATA: if (!started) { throw new CompressionException("Received COMPRESSED_DATA tag before STREAM_IDENTIFIER"); } - checksum = in.readUnsignedByte() - | in.readUnsignedByte() << 8 - | in.readUnsignedByte() << 16 - | in.readUnsignedByte() << 24; + + if (inSize < 4 + chunkLength) { + return; + } + + in.skipBytes(4); + int checksum = in.readUnsignedByte() + | in.readUnsignedByte() << 8 + | in.readUnsignedByte() << 16 + | in.readUnsignedByte() << 24; if (validateChecksums) { ByteBuf uncompressed = ctx.alloc().buffer(); snappy.decode(in, uncompressed, chunkLength); @@ -165,9 +176,6 @@ public class SnappyFramedDecoder extends ByteToByteDecoder { } snappy.reset(); break; - } - - chunkLength = 0; } } catch (Exception e) { corrupted = true; diff --git a/codec/src/main/java/io/netty/handler/codec/compression/SnappyFramedEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/SnappyFramedEncoder.java index 796936a49a..f6f23acd10 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/SnappyFramedEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/SnappyFramedEncoder.java @@ -15,12 +15,12 @@ */ package io.netty.handler.codec.compression; -import static io.netty.handler.codec.compression.SnappyChecksumUtil.calculateChecksum; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToByteEncoder; +import static io.netty.handler.codec.compression.SnappyChecksumUtil.*; + /** * Compresses a {@link ByteBuf} using the Snappy framing format. * @@ -39,7 +39,7 @@ public class SnappyFramedEncoder extends ByteToByteEncoder { * type 0xff, a length field of 0x6, and 'sNaPpY' in ASCII. */ private static final byte[] STREAM_START = { - -0x80, 0x06, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59 + -0x80, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59 }; private final Snappy snappy = new Snappy(); @@ -56,29 +56,43 @@ public class SnappyFramedEncoder extends ByteToByteEncoder { out.writeBytes(STREAM_START); } - final int chunkLength = in.readableBytes(); - if (chunkLength > MIN_COMPRESSIBLE_LENGTH) { - // If we have lots of available data, break it up into smaller chunks - int numberOfChunks = 1 + chunkLength / Short.MAX_VALUE; - for (int i = 0; i < numberOfChunks; i++) { - int subChunkLength = Math.min(Short.MAX_VALUE, chunkLength); - out.writeByte(0); - writeChunkLength(out, subChunkLength); - ByteBuf slice = in.slice(in.readerIndex(), subChunkLength); - calculateAndWriteChecksum(slice, out); - - snappy.encode(slice, out, subChunkLength); - - in.readerIndex(slice.readerIndex()); + int dataLength = in.readableBytes(); + if (dataLength > MIN_COMPRESSIBLE_LENGTH) { + for (;;) { + final int lengthIdx = out.writerIndex() + 1; + out.writeInt(0); + if (dataLength > 65536) { + ByteBuf slice = in.readSlice(65536); + calculateAndWriteChecksum(slice, out); + snappy.encode(slice, out, 65536); + setChunkLength(out, lengthIdx); + dataLength -= 65536; + } else { + ByteBuf slice = in.readSlice(dataLength); + calculateAndWriteChecksum(slice, out); + snappy.encode(slice, out, dataLength); + setChunkLength(out, lengthIdx); + break; + } } } else { out.writeByte(1); - writeChunkLength(out, chunkLength); + writeChunkLength(out, dataLength + 4); calculateAndWriteChecksum(in, out); - out.writeBytes(in, chunkLength); + out.writeBytes(in, dataLength); } } + private static void setChunkLength(ByteBuf out, int lengthIdx) { + int chunkLength = out.writerIndex() - lengthIdx - 3; + if (chunkLength >>> 24 != 0) { + throw new CompressionException("compressed data too large: " + chunkLength); + } + out.setByte(lengthIdx, chunkLength & 0xff); + out.setByte(lengthIdx + 1, chunkLength >>> 8 & 0xff); + out.setByte(lengthIdx + 2, chunkLength >>> 16 & 0xff); + } + /** * Writes the 2-byte chunk length to the output buffer. * @@ -86,8 +100,8 @@ public class SnappyFramedEncoder extends ByteToByteEncoder { * @param chunkLength The length to write */ private static void writeChunkLength(ByteBuf out, int chunkLength) { - out.writeByte(chunkLength & 0x0ff); - out.writeByte(chunkLength >> 8 & 0x0ff); + out.writeByte(chunkLength & 0xff); + out.writeByte(chunkLength >>> 8 & 0xff); } /** diff --git a/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java index 5017bf6216..2c30432230 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java @@ -19,26 +19,54 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedByteChannel; import io.netty.util.CharsetUtil; - -import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; +import static org.junit.Assert.*; + public class SnappyIntegrationTest { private final EmbeddedByteChannel channel = new EmbeddedByteChannel(new SnappyFramedEncoder(), new SnappyFramedDecoder()); - + @Test public void testEncoderDecoderIdentity() throws Exception { ByteBuf in = Unpooled.copiedBuffer( - "Netty has been designed carefully with the experiences " + - "earned from the implementation of a lot of protocols " + - "such as FTP, SMTP, HTTP, and various binary and " + - "text-based legacy protocols", CharsetUtil.US_ASCII - ); - channel.writeOutbound(in); - + "Netty has been designed carefully with the experiences earned from the implementation of a lot of " + + "protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols", + CharsetUtil.US_ASCII); + + channel.writeOutbound(in.copy()); channel.writeInbound(channel.readOutbound()); - - Assert.assertEquals(in, channel.readInbound()); + assertEquals(in, channel.readInbound()); + } + + @Test + @Ignore // FIXME: Make it pass. + public void testEncoderDecoderIdentity2() throws Exception { + // Data from https://github.com/netty/netty/issues/1002 + ByteBuf in = Unpooled.wrappedBuffer(new byte[] { + 11, 0, 0, 0, 0, 0, 16, 65, 96, 119, -22, 79, -43, 76, -75, -93, + 11, 104, 96, -99, 126, -98, 27, -36, 40, 117, -65, -3, -57, -83, -58, 7, + 114, -14, 68, -122, 124, 88, 118, 54, 45, -26, 117, 13, -45, -9, 60, -73, + -53, -44, 53, 68, -77, -71, 109, 43, -38, 59, 100, -12, -87, 44, -106, 123, + -107, 38, 13, -117, -23, -49, 29, 21, 26, 66, 1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 66, 0, -104, -49, + 16, -120, 22, 8, -52, -54, -102, -52, -119, -124, -92, -71, 101, -120, -52, -48, + 45, -26, -24, 26, 41, -13, 36, 64, -47, 15, -124, -7, -16, 91, 96, 0, + -93, -42, 101, 20, -74, 39, -124, 35, 43, -49, -21, -92, -20, -41, 79, 41, + 110, -105, 42, -96, 90, -9, -100, -22, -62, 91, 2, 35, 113, 117, -71, 66, + 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1 + }); + + channel.writeOutbound(in.copy()); + channel.writeInbound(channel.readOutbound()); + assertEquals(in, channel.readInbound()); } }