Merge SnappyChecksumUtil into Snappy and make calculateChecksum() public / Overall clean up

This commit is contained in:
Trustin Lee 2013-05-09 15:23:56 +09:00
parent c406647bb2
commit 80f4c0b334
6 changed files with 142 additions and 182 deletions

View File

@ -15,8 +15,11 @@
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.BufUtil;
import io.netty.buffer.ByteBuf;
import java.util.zip.CRC32;
/**
* Uncompresses an input {@link ByteBuf} encoded with Snappy compression into an
* output {@link ByteBuf}.
@ -24,6 +27,7 @@ import io.netty.buffer.ByteBuf;
* See http://code.google.com/p/snappy/source/browse/trunk/format_description.txt
*/
public class Snappy {
private static final int MAX_HT_SIZE = 1 << 14;
private static final int MIN_COMPRESSIBLE_BYTES = 15;
@ -41,7 +45,7 @@ public class Snappy {
private byte tag;
private int written;
private static enum State {
private enum State {
READY,
READING_PREAMBLE,
READING_TAG,
@ -390,11 +394,10 @@ public class Snappy {
* byte stream.
*
* @param tag The tag that identified this segment as a literal is also
* used to encode part of the length of the data
* used to encode part of the length of the data
* @param in The input buffer to read the literal from
* @param out The output buffer to write the literal to
* @return The number of bytes appended to the output buffer, or -1 to indicate
* "try again later"
* @return The number of bytes appended to the output buffer, or -1 to indicate "try again later"
*/
private static int decodeLiteral(byte tag, ByteBuf in, ByteBuf out) {
in.markReaderIndex();
@ -410,25 +413,19 @@ public class Snappy {
if (in.readableBytes() < 2) {
return NOT_ENOUGH_INPUT;
}
length = in.readUnsignedByte()
| in.readUnsignedByte() << 8;
length = BufUtil.swapShort(in.readShort());
break;
case 62:
if (in.readableBytes() < 3) {
return NOT_ENOUGH_INPUT;
}
length = in.readUnsignedByte()
| in.readUnsignedByte() << 8
| in.readUnsignedByte() << 16;
length = BufUtil.swapMedium(in.readUnsignedMedium());
break;
case 64:
if (in.readableBytes() < 4) {
return NOT_ENOUGH_INPUT;
}
length = in.readUnsignedByte()
| in.readUnsignedByte() << 8
| in.readUnsignedByte() << 16
| in.readUnsignedByte() << 24;
length = BufUtil.swapInt(in.readInt());
break;
default:
length = tag >> 2 & 0x3F;
@ -508,7 +505,7 @@ public class Snappy {
int initialIndex = out.writerIndex();
int length = 1 + (tag >> 2 & 0x03f);
int offset = in.readUnsignedByte() | in.readUnsignedByte() << 8;
int offset = BufUtil.swapShort(in.readShort());
validateOffset(offset, writtenSoFar);
@ -552,10 +549,7 @@ public class Snappy {
int initialIndex = out.writerIndex();
int length = 1 + (tag >> 2 & 0x03F);
int offset = in.readUnsignedByte() |
in.readUnsignedByte() << 8 |
in.readUnsignedByte() << 16 |
in.readUnsignedByte() << 24;
int offset = BufUtil.swapInt(in.readInt());
validateOffset(offset, writtenSoFar);
@ -601,4 +595,83 @@ public class Snappy {
throw new CompressionException("Offset exceeds size of chunk");
}
}
/**
* Computes the CRC32 checksum of the supplied data and performs the "mask" operation
* on the computed checksum
*
* @param data The input data to calculate the CRC32 checksum of
*/
public static int calculateChecksum(ByteBuf data) {
return calculateChecksum(data, data.readerIndex(), data.readableBytes());
}
/**
* Computes the CRC32 checksum of the supplied data and performs the "mask" operation
* on the computed checksum
*
* @param data The input data to calculate the CRC32 checksum of
*/
public static int calculateChecksum(ByteBuf data, int offset, int length) {
CRC32 crc32 = new CRC32();
try {
if (data.hasArray()) {
crc32.update(data.array(), data.arrayOffset() + offset, length);
} else {
byte[] array = new byte[length];
data.getBytes(offset, array);
crc32.update(array);
}
return maskChecksum((int) crc32.getValue());
} finally {
crc32.reset();
}
}
/**
* Computes the CRC32 checksum of the supplied data, performs the "mask" operation
* on the computed checksum, and then compares the resulting masked checksum to the
* supplied checksum.
*
* @param expectedChecksum The checksum decoded from the stream to compare against
* @param data The input data to calculate the CRC32 checksum of
* @throws CompressionException If the calculated and supplied checksums do not match
*/
static void validateChecksum(int expectedChecksum, ByteBuf data) {
validateChecksum(expectedChecksum, data, data.readerIndex(), data.readableBytes());
}
/**
* Computes the CRC32 checksum of the supplied data, performs the "mask" operation
* on the computed checksum, and then compares the resulting masked checksum to the
* supplied checksum.
*
* @param expectedChecksum The checksum decoded from the stream to compare against
* @param data The input data to calculate the CRC32 checksum of
* @throws CompressionException If the calculated and supplied checksums do not match
*/
static void validateChecksum(int expectedChecksum, ByteBuf data, int offset, int length) {
final int actualChecksum = calculateChecksum(data, offset, length);
if (actualChecksum != expectedChecksum) {
throw new CompressionException(
"mismatching checksum: " + Integer.toHexString(actualChecksum) +
" (expected: " + Integer.toHexString(expectedChecksum) + ')');
}
}
/**
* From the spec:
*
* "Checksums are not stored directly, but masked, as checksumming data and
* then its own checksum can be problematic. The masking is the same as used
* in Apache Hadoop: Rotate the checksum by 15 bits, then add the constant
* 0xa282ead8 (using wraparound as normal for unsigned integers)."
*
* @param checksum The actual checksum of the data
* @return The masked checksum
*/
static int maskChecksum(int checksum) {
return (checksum >> 15 | checksum << 17) + 0xa282ead8;
}
}

View File

@ -1,76 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import java.util.zip.CRC32;
final class SnappyChecksumUtil {
/**
* Computes the CRC32 checksum of the supplied data, performs the "mask" operation
* on the computed checksum, and then compares the resulting masked checksum to the
* supplied checksum.
*
* @param slice The input data to calculate the CRC32 checksum of
* @param checksum The checksum decoded from the stream to compare against
* @throws CompressionException If the calculated and supplied checksums do not match
*/
static void validateChecksum(ByteBuf slice, int checksum) {
if (calculateChecksum(slice) != checksum) {
throw new CompressionException("Uncompressed data did not match checksum");
}
}
/**
* Computes the CRC32 checksum of the supplied data and performs the "mask" operation
* on the computed checksum
*
* @param slice The input data to calculate the CRC32 checksum of
*/
static int calculateChecksum(ByteBuf slice) {
CRC32 crc32 = new CRC32();
try {
byte[] array = new byte[slice.readableBytes()];
slice.markReaderIndex();
slice.readBytes(array);
slice.resetReaderIndex();
crc32.update(array);
return maskChecksum((int) crc32.getValue());
} finally {
crc32.reset();
}
}
/**
* From the spec:
*
* "Checksums are not stored directly, but masked, as checksumming data and
* then its own checksum can be problematic. The masking is the same as used
* in Apache Hadoop: Rotate the checksum by 15 bits, then add the constant
* 0xa282ead8 (using wraparound as normal for unsigned integers)."
*
* @param checksum The actual checksum of the data
* @return The masked checksum
*/
static int maskChecksum(int checksum) {
return (checksum >> 15 | checksum << 17) + 0xa282ead8;
}
// utility class
private SnappyChecksumUtil() { }
}

View File

@ -15,14 +15,14 @@
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.BufUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToByteDecoder;
import java.nio.charset.Charset;
import java.util.Arrays;
import static io.netty.handler.codec.compression.SnappyChecksumUtil.*;
import static io.netty.handler.codec.compression.Snappy.*;
/**
* Uncompresses a {@link ByteBuf} encoded with the Snappy framing format.
@ -38,7 +38,7 @@ public class SnappyFramedDecoder extends ByteToByteDecoder {
RESERVED_SKIPPABLE
}
private static final byte[] SNAPPY = "sNaPpY".getBytes(Charset.forName("US-ASCII"));
private static final byte[] SNAPPY = { 's', 'N', 'a', 'P', 'p', 'Y' };
private final Snappy snappy = new Snappy();
private final boolean validateChecksums;
@ -85,9 +85,7 @@ public class SnappyFramedDecoder extends ByteToByteDecoder {
final int chunkTypeVal = in.getUnsignedByte(idx);
final ChunkType chunkType = mapChunkType((byte) chunkTypeVal);
final int chunkLength = in.getUnsignedByte(idx + 1)
| in.getUnsignedByte(idx + 2) << 8
| in.getUnsignedByte(idx + 3) << 16;
final int chunkLength = BufUtil.swapMedium(in.getUnsignedMedium(idx + 1));
switch (chunkType) {
case STREAM_IDENTIFIER:
@ -141,17 +139,12 @@ public class SnappyFramedDecoder extends ByteToByteDecoder {
in.skipBytes(4);
if (validateChecksums) {
int checksum = in.readUnsignedByte()
| in.readUnsignedByte() << 8
| in.readUnsignedByte() << 16
| in.readUnsignedByte() << 24;
ByteBuf data = in.readSlice(chunkLength - 4);
validateChecksum(data, checksum);
out.writeBytes(data);
int checksum = BufUtil.swapInt(in.readInt());
validateChecksum(checksum, in, in.readerIndex(), chunkLength - 4);
} else {
in.skipBytes(4);
in.readBytes(out, chunkLength - 4);
}
out.writeBytes(in, chunkLength - 4);
break;
case COMPRESSED_DATA:
if (!started) {
@ -163,16 +156,18 @@ public class SnappyFramedDecoder extends ByteToByteDecoder {
}
in.skipBytes(4);
int checksum = in.readUnsignedByte()
| in.readUnsignedByte() << 8
| in.readUnsignedByte() << 16
| in.readUnsignedByte() << 24;
int checksum = BufUtil.swapInt(in.readInt());
if (validateChecksums) {
// TODO: Optimize me.
ByteBuf uncompressed = ctx.alloc().buffer();
snappy.decode(in.readSlice(chunkLength - 4), uncompressed);
validateChecksum(uncompressed, checksum);
out.writeBytes(uncompressed);
int oldWriterIndex = in.writerIndex();
int uncompressedStart = out.writerIndex();
try {
in.writerIndex(in.readerIndex() + chunkLength - 4);
snappy.decode(in, out);
} finally {
in.writerIndex(oldWriterIndex);
}
int uncompressedLength = in.writerIndex() - uncompressedStart;
validateChecksum(checksum, out, uncompressedStart, uncompressedLength);
} else {
snappy.decode(in.readSlice(chunkLength - 4), out);
}
@ -189,8 +184,7 @@ public class SnappyFramedDecoder extends ByteToByteDecoder {
* Decodes the chunk type from the type tag byte.
*
* @param type The tag byte extracted from the stream
* @return The appropriate {@link ChunkType}, defaulting to
* {@link ChunkType#RESERVED_UNSKIPPABLE}
* @return The appropriate {@link ChunkType}, defaulting to {@link ChunkType#RESERVED_UNSKIPPABLE}
*/
static ChunkType mapChunkType(byte type) {
if (type == 0) {

View File

@ -15,11 +15,12 @@
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.BufUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToByteEncoder;
import static io.netty.handler.codec.compression.SnappyChecksumUtil.*;
import static io.netty.handler.codec.compression.Snappy.*;
/**
* Compresses a {@link ByteBuf} using the Snappy framing format.
@ -98,9 +99,7 @@ public class SnappyFramedEncoder extends ByteToByteEncoder {
if (chunkLength >>> 24 != 0) {
throw new CompressionException("compressed data too large: " + chunkLength);
}
out.setByte(lengthIdx, chunkLength & 0xff);
out.setByte(lengthIdx + 1, chunkLength >>> 8 & 0xff);
out.setByte(lengthIdx + 2, chunkLength >>> 16 & 0xff);
out.setMedium(lengthIdx, BufUtil.swapMedium(chunkLength));
}
/**
@ -110,9 +109,7 @@ public class SnappyFramedEncoder extends ByteToByteEncoder {
* @param chunkLength The length to write
*/
private static void writeChunkLength(ByteBuf out, int chunkLength) {
out.writeByte(chunkLength & 0xff);
out.writeByte(chunkLength >>> 8 & 0xff);
out.writeByte(chunkLength >>> 16 & 0xff);
out.writeMedium(BufUtil.swapMedium(chunkLength));
}
/**
@ -122,10 +119,6 @@ public class SnappyFramedEncoder extends ByteToByteEncoder {
* @param out The output buffer to write the checksum to
*/
private static void calculateAndWriteChecksum(ByteBuf slice, ByteBuf out) {
int checksum = calculateChecksum(slice);
out.writeByte(checksum & 0x0ff);
out.writeByte(checksum >>> 8 & 0x0ff);
out.writeByte(checksum >>> 16 & 0x0ff);
out.writeByte(checksum >>> 24);
out.writeInt(BufUtil.swapInt(calculateChecksum(slice)));
}
}

View File

@ -1,51 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
import static io.netty.handler.codec.compression.SnappyChecksumUtil.*;
import static org.junit.Assert.*;
public class SnappyChecksumUtilTest {
@Test
public void testCalculateChecksum() {
ByteBuf input = Unpooled.wrappedBuffer(new byte[] {
'n', 'e', 't', 't', 'y'
});
assertEquals(maskChecksum(0xddaa8ce6), calculateChecksum(input));
}
@Test
public void testValidateChecksumMatches() {
ByteBuf input = Unpooled.wrappedBuffer(new byte[] {
'y', 't', 't', 'e', 'n'
});
validateChecksum(input, maskChecksum(0x37c55159));
}
@Test(expected = CompressionException.class)
public void testValidateChecksumFails() {
ByteBuf input = Unpooled.wrappedBuffer(new byte[] {
'y', 't', 't', 'e', 'n'
});
validateChecksum(input, maskChecksum(0xddaa8ce6));
}
}

View File

@ -20,6 +20,7 @@ import io.netty.buffer.Unpooled;
import org.junit.After;
import org.junit.Test;
import static io.netty.handler.codec.compression.Snappy.*;
import static org.junit.Assert.*;
public class SnappyTest {
@ -168,4 +169,30 @@ public class SnappyTest {
assertEquals("Encoded result was incorrect", expected, out);
}
@Test
public void testCalculateChecksum() {
ByteBuf input = Unpooled.wrappedBuffer(new byte[] {
'n', 'e', 't', 't', 'y'
});
assertEquals(maskChecksum(0xddaa8ce6), calculateChecksum(input));
}
@Test
public void testValidateChecksumMatches() {
ByteBuf input = Unpooled.wrappedBuffer(new byte[] {
'y', 't', 't', 'e', 'n'
});
validateChecksum(maskChecksum(0x37c55159), input);
}
@Test(expected = CompressionException.class)
public void testValidateChecksumFails() {
ByteBuf input = Unpooled.wrappedBuffer(new byte[] {
'y', 't', 't', 'e', 'n'
});
validateChecksum(maskChecksum(0xddaa8ce6), input);
}
}