More fixes to snappy: * Make Snappy.decode stateful instead of relying on the uncompressed length being equal to the compressed length * Correctly handle copies where offset < length * Take copies from the output buffer in decoding * Make the maximum encoded chunk size 32kB for compressed data

This commit is contained in:
Luke Wood 2013-02-11 13:20:51 +00:00 committed by Norman Maurer
parent 30bcc72b44
commit c1783cc8d3
5 changed files with 192 additions and 68 deletions

View File

@ -27,16 +27,29 @@ public class Snappy {
private static final int MAX_HT_SIZE = 1 << 14; private static final int MAX_HT_SIZE = 1 << 14;
private static final int MIN_COMPRESSIBLE_BYTES = 15; private static final int MIN_COMPRESSIBLE_BYTES = 15;
// used as a return value to indicate that we haven't yet read our full preamble
private static final int PREAMBLE_NOT_FULL = -1;
// constants for the tag types // constants for the tag types
private static final int LITERAL = 0; private static final int LITERAL = 0;
private static final int COPY_1_BYTE_OFFSET = 1; private static final int COPY_1_BYTE_OFFSET = 1;
private static final int COPY_2_BYTE_OFFSET = 2; private static final int COPY_2_BYTE_OFFSET = 2;
private static final int COPY_4_BYTE_OFFSET = 3; private static final int COPY_4_BYTE_OFFSET = 3;
private int inputLength; private State state = State.READY;
private byte tag;
private static enum State {
READY,
READING_PREAMBLE,
READING_TAG,
READING_LITERAL,
READING_COPY
}
public void reset() { public void reset() {
inputLength = 0; state = State.READY;
tag = 0;
} }
public void encode(ByteBuf in, ByteBuf out, int length) { public void encode(ByteBuf in, ByteBuf out, int length) {
@ -258,37 +271,78 @@ public class Snappy {
encodeCopyWithOffset(out, offset, length); encodeCopyWithOffset(out, offset, length);
} }
public void decode(ByteBuf in, ByteBuf out, int maxLength) { public void decode(ByteBuf in, ByteBuf out) {
int inIndex = in.readerIndex(); while (in.isReadable()) {
if (inputLength == 0) { switch (state) {
inputLength = readPreamble(in); case READY:
} state = State.READING_PREAMBLE;
case READING_PREAMBLE:
if (inputLength == 0 || in.readerIndex() - inIndex + in.readableBytes() < maxLength - 5) { int uncompressedLength = readPreamble(in);
// Wait until we've got the entire chunk before continuing if (uncompressedLength == PREAMBLE_NOT_FULL) {
// We've not yet read all of the preamble, so wait until we can
return; return;
} }
if (uncompressedLength == 0) {
out.ensureWritable(inputLength); // Should never happen, but it does mean we have nothing further to do
state = State.READY;
while (in.isReadable() && in.readerIndex() - inIndex < maxLength) { return;
byte tag = in.readByte(); }
out.ensureWritable(uncompressedLength);
state = State.READING_TAG;
case READING_TAG:
if (!in.isReadable()) {
return;
}
tag = in.readByte();
switch (tag & 0x03) { switch (tag & 0x03) {
case LITERAL: case LITERAL:
decodeLiteral(tag, in, out); state = State.READING_LITERAL;
break; break;
case COPY_1_BYTE_OFFSET: case COPY_1_BYTE_OFFSET:
decodeCopyWith1ByteOffset(tag, in, out); case COPY_2_BYTE_OFFSET:
case COPY_4_BYTE_OFFSET:
state = State.READING_COPY;
break;
}
break;
case READING_LITERAL:
if (decodeLiteral(tag, in, out)) {
state = State.READING_TAG;
} else {
// Need to wait for more data
return;
}
break;
case READING_COPY:
switch (tag & 0x03) {
case COPY_1_BYTE_OFFSET:
if (decodeCopyWith1ByteOffset(tag, in, out)) {
state = State.READING_TAG;
} else {
// Need to wait for more data
return;
}
break; break;
case COPY_2_BYTE_OFFSET: case COPY_2_BYTE_OFFSET:
decodeCopyWith2ByteOffset(tag, in, out); if (decodeCopyWith2ByteOffset(tag, in, out)) {
state = State.READING_TAG;
} else {
// Need to wait for more data
return;
}
break; break;
case COPY_4_BYTE_OFFSET: case COPY_4_BYTE_OFFSET:
decodeCopyWith4ByteOffset(tag, in, out); if (decodeCopyWith4ByteOffset(tag, in, out)) {
state = State.READING_TAG;
} else {
// Need to wait for more data
return;
}
break; break;
} }
} }
} }
}
/** /**
* Reads the length varint (a series of bytes, where the lower 7 bits * Reads the length varint (a series of bytes, where the lower 7 bits
@ -327,22 +381,35 @@ public class Snappy {
* @param in The input buffer to read the literal from * @param in The input buffer to read the literal from
* @param out The output buffer to write the literal to * @param out The output buffer to write the literal to
*/ */
private static void decodeLiteral(byte tag, ByteBuf in, ByteBuf out) { private static boolean decodeLiteral(byte tag, ByteBuf in, ByteBuf out) {
in.markReaderIndex();
int length; int length;
switch(tag >> 2 & 0x3F) { switch(tag >> 2 & 0x3F) {
case 60: case 60:
if (!in.isReadable()) {
return false;
}
length = in.readUnsignedByte(); length = in.readUnsignedByte();
break; break;
case 61: case 61:
if (in.readableBytes() < 2) {
return false;
}
length = in.readUnsignedByte() length = in.readUnsignedByte()
| in.readUnsignedByte() << 8; | in.readUnsignedByte() << 8;
break; break;
case 62: case 62:
if (in.readableBytes() < 3) {
return false;
}
length = in.readUnsignedByte() length = in.readUnsignedByte()
| in.readUnsignedByte() << 8 | in.readUnsignedByte() << 8
| in.readUnsignedByte() << 16; | in.readUnsignedByte() << 16;
break; break;
case 64: case 64:
if (in.readableBytes() < 4) {
return false;
}
length = in.readUnsignedByte() length = in.readUnsignedByte()
| in.readUnsignedByte() << 8 | in.readUnsignedByte() << 8
| in.readUnsignedByte() << 16 | in.readUnsignedByte() << 16
@ -353,7 +420,13 @@ public class Snappy {
} }
length += 1; length += 1;
if (in.readableBytes() < length) {
in.resetReaderIndex();
return false;
}
out.writeBytes(in, length); out.writeBytes(in, length);
return true;
} }
/** /**
@ -367,17 +440,35 @@ public class Snappy {
* @param out The output buffer to write to * @param out The output buffer to write to
* @throws CompressionException If the read offset is invalid * @throws CompressionException If the read offset is invalid
*/ */
private static void decodeCopyWith1ByteOffset(byte tag, ByteBuf in, ByteBuf out) { private static boolean decodeCopyWith1ByteOffset(byte tag, ByteBuf in, ByteBuf out) {
int initialIndex = in.readerIndex(); if (!in.isReadable()) {
return false;
}
int initialIndex = out.readableBytes();
int length = 4 + ((tag & 0x01c) >> 2); int length = 4 + ((tag & 0x01c) >> 2);
int offset = 1 + ((tag & 0x0e0) << 8 | in.readUnsignedByte()); int offset = (tag & 0x0e0) << 8 | in.readUnsignedByte();
validateOffset(offset, initialIndex); validateOffset(offset, initialIndex);
in.markReaderIndex(); out.markReaderIndex();
in.readerIndex(initialIndex - offset); if (offset < length) {
in.readBytes(out, length); int copies = length / offset;
in.resetReaderIndex(); for (; copies > 0; copies--) {
out.readerIndex(initialIndex - offset);
out.readBytes(out, offset);
}
if (length % offset != 0) {
out.readerIndex(initialIndex - offset);
out.readBytes(out, length % offset);
}
} else {
out.readerIndex(initialIndex - offset);
out.readBytes(out, length);
}
out.resetReaderIndex();
return true;
} }
/** /**
@ -391,17 +482,35 @@ public class Snappy {
* @param out The output buffer to write to * @param out The output buffer to write to
* @throws CompressionException If the read offset is invalid * @throws CompressionException If the read offset is invalid
*/ */
private static void decodeCopyWith2ByteOffset(byte tag, ByteBuf in, ByteBuf out) { private static boolean decodeCopyWith2ByteOffset(byte tag, ByteBuf in, ByteBuf out) {
int initialIndex = in.readerIndex(); if (in.readableBytes() < 2) {
return false;
}
int initialIndex = out.readableBytes();
int length = 1 + (tag >> 2 & 0x03f); int length = 1 + (tag >> 2 & 0x03f);
int offset = 1 + (in.readUnsignedByte() | in.readUnsignedByte() << 8); int offset = in.readUnsignedByte() | in.readUnsignedByte() << 8;
validateOffset(offset, initialIndex); validateOffset(offset, initialIndex);
in.markReaderIndex(); out.markReaderIndex();
in.readerIndex(initialIndex - offset); if (offset < length) {
in.readBytes(out, length); int copies = length / offset;
in.resetReaderIndex(); for (; copies > 0; copies--) {
out.readerIndex(initialIndex - offset);
out.readBytes(out, offset);
}
if (length % offset != 0) {
out.readerIndex(initialIndex - offset);
out.readBytes(out, length % offset);
}
} else {
out.readerIndex(initialIndex - offset);
out.readBytes(out, length);
}
out.resetReaderIndex();
return true;
} }
/** /**
@ -415,20 +524,38 @@ public class Snappy {
* @param out The output buffer to write to * @param out The output buffer to write to
* @throws CompressionException If the read offset is invalid * @throws CompressionException If the read offset is invalid
*/ */
private static void decodeCopyWith4ByteOffset(byte tag, ByteBuf in, ByteBuf out) { private static boolean decodeCopyWith4ByteOffset(byte tag, ByteBuf in, ByteBuf out) {
int initialIndex = in.readerIndex(); if (in.readableBytes() < 4) {
return false;
}
int initialIndex = out.readableBytes();
int length = 1 + (tag >> 2 & 0x03F); int length = 1 + (tag >> 2 & 0x03F);
int offset = 1 + (in.readUnsignedByte() | int offset = in.readUnsignedByte() |
in.readUnsignedByte() << 8 | in.readUnsignedByte() << 8 |
in.readUnsignedByte() << 16 | in.readUnsignedByte() << 16 |
in.readUnsignedByte() << 24); in.readUnsignedByte() << 24;
validateOffset(offset, initialIndex); validateOffset(offset, initialIndex);
in.markReaderIndex(); out.markReaderIndex();
in.readerIndex(initialIndex - offset); if (offset < length) {
in.readBytes(out, length); int copies = length / offset;
in.resetReaderIndex(); for (; copies > 0; copies--) {
out.readerIndex(initialIndex - offset);
out.readBytes(out, offset);
}
if (length % offset != 0) {
out.readerIndex(initialIndex - offset);
out.readBytes(out, length % offset);
}
} else {
out.readerIndex(initialIndex - offset);
out.readBytes(out, length);
}
out.resetReaderIndex();
return true;
} }
/** /**
@ -445,7 +572,7 @@ public class Snappy {
throw new CompressionException("Offset exceeds maximum permissible value"); throw new CompressionException("Offset exceeds maximum permissible value");
} }
if (offset <= 4) { if (offset <= 0) {
throw new CompressionException("Offset is less than minimum permissible value"); throw new CompressionException("Offset is less than minimum permissible value");
} }

View File

@ -170,11 +170,11 @@ public class SnappyFramedDecoder extends ByteToByteDecoder {
if (validateChecksums) { if (validateChecksums) {
// TODO: Optimize me. // TODO: Optimize me.
ByteBuf uncompressed = ctx.alloc().buffer(); ByteBuf uncompressed = ctx.alloc().buffer();
snappy.decode(in.readSlice(chunkLength - 4), uncompressed, chunkLength); snappy.decode(in.readSlice(chunkLength - 4), uncompressed);
validateChecksum(uncompressed, checksum); validateChecksum(uncompressed, checksum);
out.writeBytes(uncompressed); out.writeBytes(uncompressed);
} else { } else {
snappy.decode(in.readSlice(chunkLength - 4), out, chunkLength); snappy.decode(in.readSlice(chunkLength - 4), out);
} }
snappy.reset(); snappy.reset();
break; break;

View File

@ -61,12 +61,12 @@ public class SnappyFramedEncoder extends ByteToByteEncoder {
for (;;) { for (;;) {
final int lengthIdx = out.writerIndex() + 1; final int lengthIdx = out.writerIndex() + 1;
out.writeInt(0); out.writeInt(0);
if (dataLength > 65536) { if (dataLength > 32768) {
ByteBuf slice = in.readSlice(65536); ByteBuf slice = in.readSlice(32768);
calculateAndWriteChecksum(slice, out); calculateAndWriteChecksum(slice, out);
snappy.encode(slice, out, 65536); snappy.encode(slice, out, 32768);
setChunkLength(out, lengthIdx); setChunkLength(out, lengthIdx);
dataLength -= 65536; dataLength -= 32768;
} else { } else {
ByteBuf slice = in.readSlice(dataLength); ByteBuf slice = in.readSlice(dataLength);
calculateAndWriteChecksum(slice, out); calculateAndWriteChecksum(slice, out);

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedByteChannel; import io.netty.channel.embedded.EmbeddedByteChannel;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.util.Random; import java.util.Random;
@ -37,7 +36,6 @@ public class SnappyIntegrationTest {
} }
@Test @Test
@Ignore // FIXME: Make it pass.
public void test1002() throws Exception { public void test1002() throws Exception {
// Data from https://github.com/netty/netty/issues/1002 // Data from https://github.com/netty/netty/issues/1002
testIdentity(wrappedBuffer(new byte[] { testIdentity(wrappedBuffer(new byte[] {
@ -63,7 +61,6 @@ public class SnappyIntegrationTest {
} }
@Test @Test
@Ignore // FIXME: Make it pass.
public void testRandom() throws Exception { public void testRandom() throws Exception {
byte[] data = new byte[16 * 1048576]; byte[] data = new byte[16 * 1048576];
new Random().nextBytes(data); new Random().nextBytes(data);

View File

@ -38,7 +38,7 @@ public class SnappyTest {
0x6e, 0x65, 0x74, 0x74, 0x79 // "netty" 0x6e, 0x65, 0x74, 0x74, 0x79 // "netty"
}); });
ByteBuf out = Unpooled.buffer(5); ByteBuf out = Unpooled.buffer(5);
snappy.decode(in, out, 7); snappy.decode(in, out);
// "netty" // "netty"
ByteBuf expected = Unpooled.wrappedBuffer(new byte[] { ByteBuf expected = Unpooled.wrappedBuffer(new byte[] {
@ -57,7 +57,7 @@ public class SnappyTest {
0x05 // offset 0x05 // offset
}); });
ByteBuf out = Unpooled.buffer(10); ByteBuf out = Unpooled.buffer(10);
snappy.decode(in, out, 10); snappy.decode(in, out);
// "nettynetty" - we saved a whole byte :) // "nettynetty" - we saved a whole byte :)
ByteBuf expected = Unpooled.wrappedBuffer(new byte[] { ByteBuf expected = Unpooled.wrappedBuffer(new byte[] {
@ -69,14 +69,14 @@ public class SnappyTest {
@Test(expected = CompressionException.class) @Test(expected = CompressionException.class)
public void testDecodeCopyWithTinyOffset() throws Exception { public void testDecodeCopyWithTinyOffset() throws Exception {
ByteBuf in = Unpooled.wrappedBuffer(new byte[] { ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
0x0a, // preamble length 0x0b, // preamble length
0x04 << 2, // literal tag + length 0x04 << 2, // literal tag + length
0x6e, 0x65, 0x74, 0x74, 0x79, // "netty" 0x6e, 0x65, 0x74, 0x74, 0x79, // "netty"
0x05 << 2 | 0x01, // copy with 1-byte offset + length 0x05 << 2 | 0x01, // copy with 1-byte offset + length
0x03 // INVALID offset (< 4) 0x00 // INVALID offset (< 1)
}); });
ByteBuf out = Unpooled.buffer(10); ByteBuf out = Unpooled.buffer(10);
snappy.decode(in, out, 9); snappy.decode(in, out);
} }
@Test(expected = CompressionException.class) @Test(expected = CompressionException.class)
@ -89,7 +89,7 @@ public class SnappyTest {
0x0b // INVALID offset (greater than chunk size) 0x0b // INVALID offset (greater than chunk size)
}); });
ByteBuf out = Unpooled.buffer(10); ByteBuf out = Unpooled.buffer(10);
snappy.decode(in, out, 9); snappy.decode(in, out);
} }
@Test(expected = CompressionException.class) @Test(expected = CompressionException.class)
@ -100,7 +100,7 @@ public class SnappyTest {
0x6e, 0x65, 0x74, 0x74, 0x79, // "netty" 0x6e, 0x65, 0x74, 0x74, 0x79, // "netty"
}); });
ByteBuf out = Unpooled.buffer(10); ByteBuf out = Unpooled.buffer(10);
snappy.decode(in, out, 9); snappy.decode(in, out);
} }
@Test @Test