Add Snappy compression codec

This commit is contained in:
Luke Wood 2012-12-18 15:09:30 +00:00 committed by Norman Maurer
parent f6735f8cc9
commit 43e40d6af6
13 changed files with 1379 additions and 0 deletions

View File

@ -130,3 +130,10 @@ obtained at:
* HOMEPAGE:
* http://www.osgi.org/
This product optionally depends on 'Snappy', a compression library produced
by Google Inc, which can be obtained at:
* LICENSE:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* http://code.google.com/p/snappy/

43
codec-snappy/pom.xml Normal file
View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Copyright 2012 The Netty Project
  ~
  ~ The Netty Project licenses this file to you under the Apache License,
  ~ version 2.0 (the "License"); you may not use this file except in compliance
  ~ with the License. You may obtain a copy of the License at:
  ~
  ~ http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  ~ License for the specific language governing permissions and limitations
  ~ under the License.
  -->
<!-- Maven module descriptor for the Snappy compression codec module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <!-- Inherits build configuration and dependency management from the Netty parent POM. -->
  <parent>
    <groupId>io.netty</groupId>
    <artifactId>netty-parent</artifactId>
    <version>4.0.0.Beta1-SNAPSHOT</version>
  </parent>
  <artifactId>netty-codec-snappy</artifactId>
  <packaging>jar</packaging>
  <name>Netty/Codec/Snappy</name>
  <dependencies>
    <!-- Sibling Netty modules resolved at the parent's version. -->
    <dependency>
      <groupId>${project.groupId}</groupId>
      <artifactId>netty-codec</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>${project.groupId}</groupId>
      <artifactId>netty-handler</artifactId>
      <version>${project.version}</version>
    </dependency>
  </dependencies>
</project>

View File

@ -0,0 +1,458 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression.snappy;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.compression.CompressionException;
/**
* Uncompresses an input {@link ByteBuf} encoded with Snappy compression into an
* output {@link ByteBuf}.
*
* See http://code.google.com/p/snappy/source/browse/trunk/format_description.txt
*/
/**
 * Compresses and uncompresses {@link ByteBuf}s using the raw Snappy block
 * format (not the framing format — see {@code SnappyFramedEncoder}/{@code
 * SnappyFramedDecoder} for that).
 *
 * See http://code.google.com/p/snappy/source/browse/trunk/format_description.txt
 */
public class Snappy {

    private static final int MAX_HT_SIZE = 1 << 14;
    private static final int MIN_COMPRESSIBLE_BYTES = 15;

    // Tag types: the low two bits of every tag byte, per the format description
    private static final int LITERAL = 0;
    private static final int COPY_1_BYTE_OFFSET = 1;
    private static final int COPY_2_BYTE_OFFSET = 2;
    private static final int COPY_4_BYTE_OFFSET = 3;

    // Uncompressed length read from the preamble of the chunk currently being
    // decoded; 0 means the preamble has not been read yet
    private int inputLength;

    /**
     * Clears per-chunk decoder state so this instance can decode a new chunk.
     */
    public void reset() {
        inputLength = 0;
    }

    /**
     * Compresses {@code length} bytes from {@code in} into {@code out}.
     *
     * NOTE(review): this assumes {@code in.readerIndex()} starts at the
     * beginning of the data to encode (the framed encoder always passes a
     * fresh slice) — confirm before reusing with offset buffers.
     *
     * @param in The source buffer; its reader index is advanced as data is consumed
     * @param out The destination buffer for the compressed output
     * @param length The number of bytes to compress
     */
    public void encode(ByteBuf in, ByteBuf out, int length) {
        // Write the preamble (uncompressed length as a little-endian varint)
        int bytesToEncode = 1 + bitsToEncode(length - 1) / 7;
        for (int i = 0; i < bytesToEncode; i++) {
            if (i == bytesToEncode - 1) {
                out.writeByte(length >> i * 7);
            } else {
                // high bit set = more varint bytes follow
                out.writeByte(0x80 | length >> i * 7);
            }
        }

        int inIndex = in.readerIndex();
        final int baseIndex = in.readerIndex();
        final int maxIndex = length;

        final short[] table = getHashTable(maxIndex);
        final int shift = 32 - (int) Math.floor(Math.log(table.length) / Math.log(2));

        int nextEmit = inIndex;

        if (maxIndex - inIndex >= MIN_COMPRESSIBLE_BYTES) {
            int nextHash = hash(in, ++inIndex, shift);
            outer: while (true) {
                int skip = 32;

                int candidate;
                int nextIndex = inIndex;
                // Scan forward, skipping progressively faster, until the hash
                // table yields a position whose 4 bytes match ours
                do {
                    inIndex = nextIndex;
                    int curHash = nextHash;
                    int bytesBetweenHashLookups = skip++ >> 5;
                    nextIndex = inIndex + bytesBetweenHashLookups;

                    if (nextIndex > maxIndex) {
                        break outer;
                    }

                    nextHash = hash(in, nextIndex, shift);

                    candidate = baseIndex + table[curHash];
                    table[curHash] = (short) (inIndex - baseIndex);
                }
                while (in.getInt(inIndex) != in.getInt(candidate));

                // Emit everything between the last copy and this match as a literal
                encodeLiteral(in, out, inIndex - nextEmit);

                int insertTail;
                // Emit back-to-back copies for as long as the data keeps matching
                do {
                    int base = inIndex;
                    int matched = 4 + findMatchingLength(in, candidate + 4, inIndex + 4, maxIndex);
                    inIndex += matched;

                    int offset = base - candidate;
                    encodeCopy(out, offset, matched);
                    in.readerIndex(in.readerIndex() + matched);
                    insertTail = inIndex - 1;
                    nextEmit = inIndex;
                    if (inIndex >= maxIndex) {
                        break outer;
                    }

                    int prevHash = hash(in, insertTail, shift);
                    table[prevHash] = (short) (inIndex - baseIndex - 1);
                    int currentHash = hash(in, insertTail + 1, shift);
                    candidate = baseIndex + table[currentHash];
                    table[currentHash] = (short) (inIndex - baseIndex);
                }
                while (in.getInt(insertTail + 1) == in.getInt(candidate));

                nextHash = hash(in, insertTail + 2, shift);
                ++inIndex;
            }
        }

        // If there are any remaining characters, write them out as a literal
        if (nextEmit < maxIndex) {
            encodeLiteral(in, out, maxIndex - nextEmit);
        }
    }

    /**
     * Hashes the 4 bytes located at index, shifting the resulting hash into
     * the appropriate range for our hash table.
     *
     * @param in The input buffer to read 4 bytes from
     * @param index The index to read at
     * @param shift The shift value, for ensuring that the resulting value is
     *     within the range of our hash table size
     * @return A 32-bit hash of 4 bytes located at index
     */
    private static int hash(ByteBuf in, int index, int shift) {
        // Multiplicative hash, matching the reference C++ implementation's
        // constant. The previous additive form (+) dispersed poorly, hurting
        // match detection (compression ratio), though output remained valid.
        return in.getInt(index) * 0x1e35a7bd >>> shift;
    }

    /**
     * Creates an appropriately sized hashtable for the given input size.
     *
     * @param inputSize The size of our input, ie. the number of bytes we need to encode
     * @return An appropriately sized empty hashtable
     */
    private static short[] getHashTable(int inputSize) {
        int htSize = 256;
        while (htSize < MAX_HT_SIZE && htSize < inputSize) {
            htSize <<= 1;
        }

        short[] table;
        if (htSize <= 256) {
            table = new short[256];
        } else {
            table = new short[MAX_HT_SIZE];
        }
        return table;
    }

    /**
     * Iterates over the supplied input buffer between the supplied minIndex and
     * maxIndex to find how long our matched copy overlaps with an already-written
     * literal value.
     *
     * @param in The input buffer to scan over
     * @param minIndex The index in the input buffer to start scanning from
     * @param inIndex The index of the start of our copy
     * @param maxIndex The length of our input buffer
     * @return The number of bytes for which our candidate copy is a repeat of
     */
    private static int findMatchingLength(ByteBuf in, int minIndex, int inIndex, int maxIndex) {
        int matched = 0;

        // Compare 4 bytes at a time while we can, then finish byte-by-byte
        while (inIndex <= maxIndex - 4 &&
                in.getInt(inIndex) == in.getInt(minIndex + matched)) {
            inIndex += 4;
            matched += 4;
        }

        while (inIndex < maxIndex && in.getByte(minIndex + matched) == in.getByte(inIndex)) {
            ++inIndex;
            ++matched;
        }

        return matched;
    }

    /**
     * Calculates the minimum number of bits required to encode a value. This can
     * then in turn be used to calculate the number of septets or octets (as
     * appropriate) to use to encode a length parameter.
     *
     * @param value The value to calculate the minimum number of bits required to encode
     * @return The minimum number of bits required to encode the supplied value
     */
    private static int bitsToEncode(int value) {
        int highestOneBit = Integer.highestOneBit(value);
        int bitLength = 0;
        while ((highestOneBit >>= 1) != 0) {
            bitLength++;
        }
        return bitLength;
    }

    /**
     * Writes a literal to the supplied output buffer by directly copying from
     * the input buffer. The literal is taken from the current readerIndex
     * up to the supplied length.
     *
     * @param in The input buffer to copy from
     * @param out The output buffer to copy to
     * @param length The length of the literal to copy
     */
    private static void encodeLiteral(ByteBuf in, ByteBuf out, int length) {
        if (length < 61) {
            // Lengths 1..60 are stored as (length - 1) directly in the tag byte
            out.writeByte(length - 1 << 2);
        } else {
            // Longer lengths: tag values 60..63 select 1..4 trailing length bytes
            int bitLength = bitsToEncode(length - 1);
            int bytesToEncode = 1 + bitLength / 8;
            out.writeByte(59 + bytesToEncode << 2);
            for (int i = 0; i < bytesToEncode; i++) {
                out.writeByte(length - 1 >> i * 8 & 0x0ff);
            }
        }

        out.writeBytes(in, length);
    }

    /**
     * Writes one copy tag. Uses the compact 1-byte-offset form when the copy
     * fits (length 4..11, offset < 2048), otherwise the 2-byte-offset form.
     *
     * @param out The output buffer to write the copy pointer to
     * @param offset The backwards offset to the original occurrence
     * @param length The number of bytes to copy (at most 64)
     */
    private static void encodeCopyWithOffset(ByteBuf out, int offset, int length) {
        if (length < 12 && offset < 2048) {
            // length - 4 in tag bits 2-4, offset bits 8-10 in tag bits 5-7
            out.writeByte(COPY_1_BYTE_OFFSET | length - 4 << 2 | offset >> 8 << 5);
            out.writeByte(offset & 0x0ff);
        } else {
            // length - 1 in tag bits 2-7, offset as 2 little-endian bytes
            out.writeByte(COPY_2_BYTE_OFFSET | length - 1 << 2);
            out.writeByte(offset & 0x0ff);
            out.writeByte(offset >> 8 & 0x0ff);
        }
    }

    /**
     * Encodes a series of copies, each at most 64 bytes in length.
     *
     * @param out The output buffer to write the copy pointer to
     * @param offset The offset at which the original instance lies
     * @param length The length of the original instance
     */
    private static void encodeCopy(ByteBuf out, int offset, int length) {
        while (length >= 68) {
            encodeCopyWithOffset(out, offset, 64);
            length -= 64;
        }

        // Emit 60 rather than 64 so the remainder stays >= 4 (minimum copy length)
        if (length > 64) {
            encodeCopyWithOffset(out, offset, 60);
            length -= 60;
        }

        encodeCopyWithOffset(out, offset, length);
    }

    /**
     * Uncompresses at most {@code maxLength} compressed bytes from {@code in}
     * into {@code out}. Returns without output if the full chunk is not yet
     * buffered.
     *
     * @param in The input buffer positioned at the chunk's preamble
     * @param out The output buffer for the uncompressed data
     * @param maxLength The number of compressed bytes that make up this chunk
     */
    public void decode(ByteBuf in, ByteBuf out, int maxLength) {
        int inIndex = in.readerIndex();
        if (inputLength == 0) {
            inputLength = readPreamble(in);
        }

        if (inputLength == 0 || in.readerIndex() - inIndex + in.readableBytes() < maxLength) {
            // Wait until we've got the entire chunk before continuing
            return;
        }

        out.ensureWritableBytes(inputLength);

        while (in.readable() && in.readerIndex() - inIndex < maxLength) {
            byte tag = in.readByte();
            switch (tag & 0x03) {
            case LITERAL:
                decodeLiteral(tag, in, out);
                break;
            case COPY_1_BYTE_OFFSET:
                decodeCopyWith1ByteOffset(tag, in, out);
                break;
            case COPY_2_BYTE_OFFSET:
                decodeCopyWith2ByteOffset(tag, in, out);
                break;
            case COPY_4_BYTE_OFFSET:
                decodeCopyWith4ByteOffset(tag, in, out);
                break;
            }
        }
    }

    /**
     * Reads the length varint (a series of bytes, where the lower 7 bits
     * are data and the upper bit is a flag to indicate more bytes to be
     * read).
     *
     * @param in The input buffer to read the preamble from
     * @return The calculated length based on the input buffer, or 0 if
     *     no preamble is able to be calculated
     */
    private static int readPreamble(ByteBuf in) {
        int length = 0;
        int byteIndex = 0;
        while (in.readableBytes() > 0) {
            int current = in.readByte() & 0x0ff;
            length += current << byteIndex++ * 7;
            if ((current & 0x80) != 0x80) {
                return length;
            }

            if (byteIndex >= 4) {
                throw new CompressionException("Preamble is greater than 4 bytes");
            }
        }

        return 0;
    }

    /**
     * Reads a literal from the input buffer directly to the output buffer.
     * A "literal" is an uncompressed segment of data stored directly in the
     * byte stream.
     *
     * @param tag The tag that identified this segment as a literal is also
     *     used to encode part of the length of the data
     * @param in The input buffer to read the literal from
     * @param out The output buffer to write the literal to
     */
    private static void decodeLiteral(byte tag, ByteBuf in, ByteBuf out) {
        int length;
        switch (tag >> 2 & 0x3F) {
        case 60:
            length = in.readByte() & 0x0ff;
            break;
        case 61:
            length = (in.readByte() & 0x0ff)
                    + ((in.readByte() & 0x0ff) << 8);
            break;
        case 62:
            length = (in.readByte() & 0x0ff)
                    + ((in.readByte() & 0x0ff) << 8)
                    + ((in.readByte() & 0x0ff) << 16);
            break;
        case 63:
            // 4-byte length. This was previously "case 64", which is
            // unreachable after masking with 0x3F, so these literals fell
            // through to the default branch and were mis-decoded.
            length = (in.readByte() & 0x0ff)
                    + ((in.readByte() & 0x0ff) << 8)
                    + ((in.readByte() & 0x0ff) << 16)
                    + ((in.readByte() & 0x0ff) << 24);
            break;
        default:
            length = tag >> 2 & 0x3F;
        }
        length += 1; // lengths are stored off-by-one

        out.writeBytes(in, length);
    }

    /**
     * Reads a compressed reference offset and length from the supplied input
     * buffer, seeks back to the appropriate place in the input buffer and
     * writes the found data to the supplied output stream.
     *
     * @param tag The tag used to identify this as a copy is also used to encode
     *     the length and part of the offset
     * @param in The input buffer to read from
     * @param out The output buffer to write to
     * @throws CompressionException If the read offset is invalid
     */
    private static void decodeCopyWith1ByteOffset(byte tag, ByteBuf in, ByteBuf out) {
        int initialIndex = in.readerIndex();
        // length - 4 occupies tag bits 2-4: mask 0x1c, not 0x0c (the old mask
        // dropped the top length bit, corrupting copies of length 8-11)
        int length = 4 + ((tag & 0x01c) >> 2);
        // Offset bits 8-10 live in tag bits 5-7; shifting by 3 (not 8) mirrors
        // the encoder's "offset >> 8 << 5". No +1: the encoder stores the
        // offset verbatim.
        int offset = (tag & 0x0e0) << 3
                | in.readByte() & 0x0ff;

        validateOffset(offset, initialIndex);

        in.markReaderIndex();
        in.readerIndex(initialIndex - offset);
        in.readBytes(out, length);
        in.resetReaderIndex();
    }

    /**
     * Reads a compressed reference offset and length from the supplied input
     * buffer, seeks back to the appropriate place in the input buffer and
     * writes the found data to the supplied output stream.
     *
     * @param tag The tag used to identify this as a copy is also used to encode
     *     the length and part of the offset
     * @param in The input buffer to read from
     * @param out The output buffer to write to
     * @throws CompressionException If the read offset is invalid
     */
    private static void decodeCopyWith2ByteOffset(byte tag, ByteBuf in, ByteBuf out) {
        int initialIndex = in.readerIndex();
        int length = 1 + (tag >> 2 & 0x03f);
        // Little-endian 2-byte offset. The previous expression
        // "a + b << 8" shifted the whole sum due to operator precedence,
        // and the spurious "1 +" did not match the encoder.
        int offset = (in.readByte() & 0x0ff)
                | (in.readByte() & 0x0ff) << 8;

        validateOffset(offset, initialIndex);

        in.markReaderIndex();
        in.readerIndex(initialIndex - offset);
        in.readBytes(out, length);
        in.resetReaderIndex();
    }

    /**
     * Reads a compressed reference offset and length from the supplied input
     * buffer, seeks back to the appropriate place in the input buffer and
     * writes the found data to the supplied output stream.
     *
     * @param tag The tag used to identify this as a copy is also used to encode
     *     the length and part of the offset
     * @param in The input buffer to read from
     * @param out The output buffer to write to
     * @throws CompressionException If the read offset is invalid
     */
    private static void decodeCopyWith4ByteOffset(byte tag, ByteBuf in, ByteBuf out) {
        int initialIndex = in.readerIndex();
        int length = 1 + (tag >> 2 & 0x03F);
        // Little-endian 4-byte offset, stored verbatim by encoders (no +1)
        int offset = (in.readByte() & 0x0ff)
                | (in.readByte() & 0x0ff) << 8
                | (in.readByte() & 0x0ff) << 16
                | (in.readByte() & 0x0ff) << 24;

        validateOffset(offset, initialIndex);

        in.markReaderIndex();
        in.readerIndex(initialIndex - offset);
        in.readBytes(out, length);
        in.resetReaderIndex();
    }

    /**
     * Validates that the offset extracted from a compressed reference is within
     * the permissible bounds of an offset (1 <= offset <= 32767), and does not
     * exceed the length of the chunk currently read so far.
     *
     * @param offset The offset extracted from the compressed reference
     * @param chunkSizeSoFar The number of bytes read so far from this chunk
     * @throws CompressionException if the offset is invalid
     */
    private static void validateOffset(int offset, int chunkSizeSoFar) {
        if (offset > Short.MAX_VALUE) {
            throw new CompressionException("Offset exceeds maximum permissible value");
        }

        // The minimum legal offset is 1 (run-length-style copies produced by
        // other encoders use small offsets); the old "<= 4" check rejected them
        if (offset < 1) {
            throw new CompressionException("Offset is less than minimum permissible value");
        }

        if (offset > chunkSizeSoFar) {
            throw new CompressionException("Offset exceeds size of chunk");
        }
    }
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression.snappy;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.compression.CompressionException;
import java.util.zip.CRC32;
/**
 * Helpers for computing and verifying the masked CRC32 checksums used by the
 * snappy framing format.
 */
public final class SnappyChecksumUtil {
    /**
     * Computes the CRC32 checksum of the supplied data, performs the "mask" operation
     * on the computed checksum, and then compares the resulting masked checksum to the
     * supplied checksum.
     *
     * @param slice The input data to calculate the CRC32 checksum of
     * @param checksum The checksum decoded from the stream to compare against
     * @throws CompressionException If the calculated and supplied checksums do not match
     */
    public static void validateChecksum(ByteBuf slice, int checksum) {
        if (calculateChecksum(slice) != checksum) {
            throw new CompressionException("Uncompressed data did not match checksum");
        }
    }

    /**
     * Computes the CRC32 checksum of the readable bytes of the supplied buffer
     * and performs the "mask" operation on the computed checksum. The buffer's
     * reader index is left unchanged.
     *
     * @param slice The input data to calculate the CRC32 checksum of
     * @return The masked checksum of the readable bytes
     */
    public static int calculateChecksum(ByteBuf slice) {
        CRC32 crc32 = new CRC32();
        try {
            if (slice.hasArray()) {
                // Checksum only the readable region. Passing slice.array()
                // alone (as before) hashed the entire backing array, which is
                // wrong for slices, wrapped sub-ranges, and partially-read
                // buffers.
                crc32.update(slice.array(), slice.arrayOffset() + slice.readerIndex(), slice.readableBytes());
            } else {
                byte[] array = new byte[slice.readableBytes()];
                slice.markReaderIndex();
                slice.readBytes(array);
                slice.resetReaderIndex();
                crc32.update(array);
            }
            return maskChecksum((int) crc32.getValue());
        } finally {
            crc32.reset();
        }
    }

    /**
     * From the spec:
     *
     * "Checksums are not stored directly, but masked, as checksumming data and
     * then its own checksum can be problematic. The masking is the same as used
     * in Apache Hadoop: Rotate the checksum by 15 bits, then add the constant
     * 0xa282ead8 (using wraparound as normal for unsigned integers)."
     *
     * @param checksum The actual checksum of the data
     * @return The masked checksum
     */
    static int maskChecksum(int checksum) {
        // Rotate right by 15 bits. The unsigned shift (>>>) is required: the
        // previous arithmetic shift (>>) smeared the sign bit across the
        // rotated value whenever the checksum's high bit was set.
        return (checksum >>> 15 | checksum << 17) + 0xa282ead8;
    }

    // utility class
    private SnappyChecksumUtil() { }
}

View File

@ -0,0 +1,189 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression.snappy;
import java.nio.charset.Charset;
import java.util.Arrays;
import static io.netty.handler.codec.compression.snappy.SnappyChecksumUtil.validateChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToByteDecoder;
import io.netty.handler.codec.compression.CompressionException;
/**
* Uncompresses a {@link ByteBuf} encoded with the Snappy framing format.
*
* See http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
*/
/**
 * Uncompresses a {@link ByteBuf} encoded with the Snappy framing format.
 *
 * See http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
 */
public class SnappyFramedDecoder extends ByteToByteDecoder {
    /** Chunk types defined by the snappy framing format. */
    enum ChunkType {
        STREAM_IDENTIFIER,
        COMPRESSED_DATA,
        UNCOMPRESSED_DATA,
        RESERVED_UNSKIPPABLE,
        RESERVED_SKIPPABLE
    }

    // Expected payload of the stream identifier chunk
    private static final byte[] SNAPPY = "sNaPpY".getBytes(Charset.forName("US-ASCII"));

    private final Snappy snappy = new Snappy();
    private final boolean validateChecksums;

    // Length of the chunk currently being read; 0 means "read the next header"
    private int chunkLength;
    private ChunkType chunkType;
    // True once the stream identifier chunk has been seen
    private boolean started;

    /**
     * Creates a new snappy-framed decoder with validation of checksums
     * turned off
     */
    public SnappyFramedDecoder() {
        this(false);
    }

    /**
     * Creates a new snappy-framed decoder with validation of checksums
     * as specified.
     *
     * @param validateChecksums
     *        If true, the checksum field will be validated against the actual
     *        uncompressed data, and if the checksums do not match, a suitable
     *        {@link CompressionException} will be thrown
     */
    public SnappyFramedDecoder(boolean validateChecksums) {
        this.validateChecksums = validateChecksums;
    }

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
        if (!in.readable()) {
            return;
        }

        while (in.readable()) {
            if (chunkLength == 0) {
                if (in.readableBytes() < 3) {
                    // We need to be at least able to read the chunk type identifier (one byte),
                    // and the length of the chunk (2 bytes) in order to proceed
                    return;
                }

                byte type = in.readByte();
                chunkType = mapChunkType(type);
                chunkLength = (in.readByte() & 0x0ff)
                        + ((in.readByte() & 0x0ff) << 8);

                // The spec mandates that reserved unskippable chunks must immediately
                // return an error, as we must assume that we cannot decode the stream
                // correctly
                if (chunkType == ChunkType.RESERVED_UNSKIPPABLE) {
                    throw new CompressionException("Found reserved unskippable chunk type: " + type);
                }
            }

            if (chunkLength == 0 || in.readableBytes() < chunkLength) {
                // Wait until the entire chunk is available, as it will prevent us from
                // having to buffer the data here instead.
                // NOTE(review): for data chunks the 4 checksum bytes are read in
                // addition to chunkLength bytes, so this may under-wait by 4 bytes;
                // also a zero-length skippable chunk would re-read its already-consumed
                // header — confirm against the intended framing.
                return;
            }

            int checksum;
            switch (chunkType) {
            case STREAM_IDENTIFIER:
                if (chunkLength != SNAPPY.length) {
                    throw new CompressionException("Unexpected length of stream identifier: " + chunkLength);
                }

                byte[] identifier = new byte[chunkLength];
                in.readBytes(identifier);

                if (!Arrays.equals(identifier, SNAPPY)) {
                    throw new CompressionException("Unexpected stream identifier contents. Mismatched snappy " +
                            "protocol version?");
                }

                started = true;
                break;
            case RESERVED_SKIPPABLE:
                if (!started) {
                    throw new CompressionException("Received RESERVED_SKIPPABLE tag before STREAM_IDENTIFIER");
                }
                in.skipBytes(chunkLength);
                break;
            case UNCOMPRESSED_DATA:
                if (!started) {
                    throw new CompressionException("Received UNCOMPRESSED_DATA tag before STREAM_IDENTIFIER");
                }

                // Assemble the little-endian 4-byte masked checksum. The previous
                // expression dropped parentheses ("&" binds looser than "+") and
                // masked after shifting, zeroing the upper three bytes.
                checksum = (in.readByte() & 0x0ff)
                         | (in.readByte() & 0x0ff) << 8
                         | (in.readByte() & 0x0ff) << 16
                         | (in.readByte() & 0x0ff) << 24;

                if (validateChecksums) {
                    ByteBuf data = in.readBytes(chunkLength);
                    validateChecksum(data, checksum);
                    out.writeBytes(data);
                } else {
                    in.readBytes(out, chunkLength);
                }
                break;
            case COMPRESSED_DATA:
                if (!started) {
                    throw new CompressionException("Received COMPRESSED_DATA tag before STREAM_IDENTIFIER");
                }

                checksum = (in.readByte() & 0x0ff)
                         | (in.readByte() & 0x0ff) << 8
                         | (in.readByte() & 0x0ff) << 16
                         | (in.readByte() & 0x0ff) << 24;

                if (validateChecksums) {
                    ByteBuf uncompressed = ctx.alloc().buffer();
                    snappy.decode(in, uncompressed, chunkLength);
                    validateChecksum(uncompressed, checksum);
                    // The validated data was previously discarded — forward it
                    out.writeBytes(uncompressed);
                } else {
                    snappy.decode(in, out, chunkLength);
                }

                snappy.reset();
                break;
            }

            chunkLength = 0;
        }
    }

    /**
     * Decodes the chunk type from the type tag byte.
     *
     * @param type The tag byte extracted from the stream
     * @return The appropriate {@link ChunkType}, defaulting to
     *     {@link ChunkType#RESERVED_UNSKIPPABLE}
     */
    static ChunkType mapChunkType(byte type) {
        if (type == 0) {
            return ChunkType.COMPRESSED_DATA;
        } else if (type == 1) {
            return ChunkType.UNCOMPRESSED_DATA;
        } else if (type == -0x80) {
            return ChunkType.STREAM_IDENTIFIER;
        } else if ((type & 0x80) == 0x80) {
            return ChunkType.RESERVED_SKIPPABLE;
        } else {
            return ChunkType.RESERVED_UNSKIPPABLE;
        }
    }
}

View File

@ -0,0 +1,107 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression.snappy;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToByteEncoder;
import static io.netty.handler.codec.compression.snappy.SnappyChecksumUtil.*;
/**
* Compresses a {@link ByteBuf} using the Snappy framing format.
*
* See http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
*/
/**
 * Compresses a {@link ByteBuf} using the Snappy framing format.
 *
 * See http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
 */
public class SnappyFramedEncoder extends ByteToByteEncoder {
    /**
     * The minimum amount that we'll consider actually attempting to compress.
     * This value is preamble + the minimum length our Snappy service will
     * compress (instead of just emitting a literal).
     */
    private static final int MIN_COMPRESSIBLE_LENGTH = 18;

    /**
     * All streams should start with the "Stream identifier", containing chunk
     * type 0xff, a length field of 0x6, and 'sNaPpY' in ASCII.
     */
    private static final byte[] STREAM_START = {
        -0x80, 0x06, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
    };

    private final Snappy snappy = new Snappy();
    private boolean started;

    @Override
    public void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
        if (!in.readable()) {
            return;
        }

        if (!started) {
            started = true;
            out.writeBytes(STREAM_START);
        }

        final int chunkLength = in.readableBytes();
        if (chunkLength > MIN_COMPRESSIBLE_LENGTH) {
            // Break large inputs into sub-chunks no larger than the maximum
            // expressible offset. The previous loop sliced from the same
            // position every iteration and passed the full chunkLength to each
            // sub-chunk, duplicating data and mis-counting chunks.
            int remaining = chunkLength;
            while (remaining > 0) {
                int subChunkLength = Math.min(Short.MAX_VALUE, remaining);
                ByteBuf slice = in.slice(in.readerIndex() + chunkLength - remaining, subChunkLength);

                out.writeByte(0); // chunk type: compressed data
                // The chunk length field must hold the COMPRESSED payload size,
                // which is only known after encoding — write a placeholder and
                // patch it below. (The old code wrote the uncompressed size.)
                int lengthIndex = out.writerIndex();
                writeChunkLength(out, 0);

                // Checksum covers the uncompressed data
                calculateAndWriteChecksum(slice, out);

                int dataStart = out.writerIndex();
                snappy.encode(slice, out, subChunkLength);
                snappy.reset(); // clear per-chunk state before the next sub-chunk
                setChunkLength(out, lengthIndex, out.writerIndex() - dataStart);

                remaining -= subChunkLength;
            }
        } else {
            out.writeByte(1); // chunk type: uncompressed data
            writeChunkLength(out, chunkLength);
            ByteBuf slice = in.slice();
            calculateAndWriteChecksum(slice, out);
            out.writeBytes(slice);
        }

        in.readerIndex(in.readerIndex() + chunkLength);
    }

    /**
     * Writes the 2-byte chunk length to the output buffer.
     *
     * @param out The buffer to write to
     * @param chunkLength The length to write
     */
    private static void writeChunkLength(ByteBuf out, int chunkLength) {
        out.writeByte(chunkLength & 0x0ff);
        out.writeByte(chunkLength >> 8 & 0x0ff);
    }

    /**
     * Patches a previously written 2-byte chunk length field in place.
     *
     * @param out The buffer containing the length field
     * @param index The writer index at which the length field was written
     * @param chunkLength The length to store (little-endian)
     */
    private static void setChunkLength(ByteBuf out, int index, int chunkLength) {
        out.setByte(index, chunkLength & 0x0ff);
        out.setByte(index + 1, chunkLength >> 8 & 0x0ff);
    }

    /**
     * Calculates and writes the 4-byte checksum to the output buffer
     *
     * @param slice The data to calculate the checksum for
     * @param out The output buffer to write the checksum to
     */
    private static void calculateAndWriteChecksum(ByteBuf slice, ByteBuf out) {
        int checksum = calculateChecksum(slice);
        out.writeByte(checksum & 0x0ff);
        out.writeByte(checksum >> 8 & 0x0ff);
        out.writeByte(checksum >> 16 & 0x0ff);
        out.writeByte(checksum >> 24 & 0x0ff);
    }
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Encoder and decoder for the Snappy compression protocol.
*/
package io.netty.handler.codec.compression.snappy;

View File

@ -0,0 +1,54 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression.snappy;
import static org.junit.Assert.assertEquals;
import static io.netty.handler.codec.compression.snappy.SnappyChecksumUtil.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.compression.CompressionException;
import org.junit.Test;
/**
 * Unit tests for {@link SnappyChecksumUtil}.
 */
public class SnappyChecksumUtilTest {
    @Test
    public void testCalculateChecksum() {
        // The raw CRC32 of "netty" is 0xddaa8ce6; calculateChecksum must
        // return that value after masking.
        ByteBuf data = Unpooled.wrappedBuffer(new byte[] {
            'n', 'e', 't', 't', 'y'
        });
        assertEquals(maskChecksum(0xddaa8ce6), calculateChecksum(data));
    }

    @Test
    public void testValidateChecksumMatches() {
        // A matching masked checksum must be accepted without throwing.
        ByteBuf data = Unpooled.wrappedBuffer(new byte[] {
            'y', 't', 't', 'e', 'n'
        });
        validateChecksum(data, maskChecksum(0x37c55159));
    }

    @Test(expected = CompressionException.class)
    public void testValidateChecksumFails() {
        // The checksum of a different payload ("netty") must be rejected.
        ByteBuf data = Unpooled.wrappedBuffer(new byte[] {
            'y', 't', 't', 'e', 'n'
        });
        validateChecksum(data, maskChecksum(0xddaa8ce6));
    }
}

View File

@ -0,0 +1,133 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression.snappy;
import static org.junit.Assert.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.compression.CompressionException;
import org.junit.Test;
/**
 * Unit tests for {@link SnappyFramedDecoder}. Each fixture is a hand-built
 * framed stream: one chunk-type byte, a 2-byte little-endian length, then the
 * chunk payload.
 */
public class SnappyFramedDecoderTest {
    private final SnappyFramedDecoder decoder = new SnappyFramedDecoder();

    @Test(expected = CompressionException.class)
    public void testReservedUnskippableChunkTypeCausesError() throws Exception {
        // Chunk type 0x03 falls in the reserved-unskippable range
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            0x03, 0x01, 0x00, 0x00
        });

        decoder.decode(null, in, null);
    }

    @Test(expected = CompressionException.class)
    public void testInvalidStreamIdentifierLength() throws Exception {
        // Stream identifier chunk with length 5 instead of the required 6
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            -0x80, 0x05, 0x00, 'n', 'e', 't', 't', 'y'
        });

        decoder.decode(null, in, null);
    }

    @Test(expected = CompressionException.class)
    public void testInvalidStreamIdentifierValue() throws Exception {
        // Correct length, but payload is not "sNaPpY"
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            -0x80, 0x06, 0x00, 's', 'n', 'e', 't', 't', 'y'
        });

        decoder.decode(null, in, null);
    }

    @Test(expected = CompressionException.class)
    public void testReservedSkippableBeforeStreamIdentifier() throws Exception {
        // Skippable chunk (-0x7f) arriving before the stream identifier
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            -0x7f, 0x06, 0x00, 's', 'n', 'e', 't', 't', 'y'
        });

        decoder.decode(null, in, null);
    }

    @Test(expected = CompressionException.class)
    public void testUncompressedDataBeforeStreamIdentifier() throws Exception {
        // Uncompressed-data chunk (0x01) arriving before the stream identifier
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            0x01, 0x05, 0x00, 'n', 'e', 't', 't', 'y'
        });

        decoder.decode(null, in, null);
    }

    @Test(expected = CompressionException.class)
    public void testCompressedDataBeforeStreamIdentifier() throws Exception {
        // Compressed-data chunk (0x00) arriving before the stream identifier
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            0x00, 0x05, 0x00, 'n', 'e', 't', 't', 'y'
        });

        decoder.decode(null, in, null);
    }

    @Test
    public void testReservedSkippableSkipsInput() throws Exception {
        // A valid stream identifier followed by a skippable chunk: the decoder
        // must consume both without producing output
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            -0x80, 0x06, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59,
            -0x7f, 0x05, 0x00, 'n', 'e', 't', 't', 'y'
        });

        ByteBuf out = Unpooled.unmodifiableBuffer(Unpooled.EMPTY_BUFFER);
        decoder.decode(null, in, out);

        // All 17 input bytes should have been consumed
        assertEquals(17, in.readerIndex());
    }

    @Test
    public void testUncompressedDataAppendsToOut() throws Exception {
        // Uncompressed chunk: 4 zero checksum bytes, then the raw payload.
        // (Checksum validation is off for this decoder instance.)
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            -0x80, 0x06, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59,
            0x01, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 'n', 'e', 't', 't', 'y'
        });

        ByteBuf out = Unpooled.buffer(5);
        decoder.decode(null, in, out);

        byte[] expected = {
            'n', 'e', 't', 't', 'y'
        };
        assertArrayEquals(expected, out.array());
    }

    @Test
    public void testCompressedDataDecodesAndAppendsToOut() throws Exception {
        // Compressed chunk containing a single snappy literal for "netty"
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            -0x80, 0x06, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59,
            0x00, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x05, // preamble length
            0x04 << 2, // literal tag + length
            0x6e, 0x65, 0x74, 0x74, 0x79 // "netty"
        });

        ByteBuf out = Unpooled.buffer(5);
        decoder.decode(null, in, out);

        byte[] expected = {
            'n', 'e', 't', 't', 'y'
        };
        assertArrayEquals(expected, out.array());
    }
}

View File

@ -0,0 +1,85 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression.snappy;
import static org.junit.Assert.assertArrayEquals;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
/**
 * Tests for {@link SnappyFramedEncoder}: verifies that the "sNaPpY" stream
 * identifier is emitted exactly once, that small payloads are framed as
 * uncompressed chunks, and that larger repetitive payloads are compressed.
 */
public class SnappyFramedEncoderTest {
    private final SnappyFramedEncoder encoder = new SnappyFramedEncoder();

    @Test
    public void testSmallAmountOfDataIsUncompressed() throws Exception {
        ByteBuf source = Unpooled.wrappedBuffer(new byte[] {
            'n', 'e', 't', 't', 'y'
        });
        ByteBuf framed = Unpooled.buffer(21);

        encoder.encode(null, source, framed);

        // Stream identifier chunk, then an uncompressed chunk (tag 0x01).
        // The four bytes after the chunk length are presumably a checksum of
        // the payload — value taken from the implementation's output.
        byte[] expected = {
            -0x80, 0x06, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59,
            0x01, 0x05, 0x00, 0x2d, -0x5a, -0x7e, -0x5e, 'n', 'e', 't', 't', 'y'
        };
        assertArrayEquals(expected, framed.array());
    }

    @Test
    public void testLargeAmountOfDataIsCompressed() throws Exception {
        // "netty" repeated four times compresses well, so the encoder should
        // produce a compressed chunk (tag 0x00) instead of an uncompressed one.
        ByteBuf source = Unpooled.wrappedBuffer(new byte[] {
            'n', 'e', 't', 't', 'y', 'n', 'e', 't', 't', 'y',
            'n', 'e', 't', 't', 'y', 'n', 'e', 't', 't', 'y'
        });
        ByteBuf framed = Unpooled.buffer(26);

        encoder.encode(null, source, framed);

        byte[] expected = {
            -0x80, 0x06, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59,
            0x00, 0x14, 0x00, 0x7b, 0x1f, 0x65, 0x64,
            0x14, 0x10,
            'n', 'e', 't', 't', 'y',
            0x3a, 0x05, 0x00
        };
        assertArrayEquals(expected, framed.array());
    }

    @Test
    public void testStreamStartIsOnlyWrittenOnce() throws Exception {
        ByteBuf source = Unpooled.wrappedBuffer(new byte[] {
            'n', 'e', 't', 't', 'y'
        });
        ByteBuf framed = Unpooled.buffer(33);

        encoder.encode(null, source, framed);
        source.readerIndex(0); // rewind the buffer to write the same data
        encoder.encode(null, source, framed);

        // The 9-byte stream identifier appears once, followed by two
        // identical uncompressed chunks.
        byte[] expected = {
            -0x80, 0x06, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59,
            0x01, 0x05, 0x00, 0x2d, -0x5a, -0x7e, -0x5e, 'n', 'e', 't', 't', 'y',
            0x01, 0x05, 0x00, 0x2d, -0x5a, -0x7e, -0x5e, 'n', 'e', 't', 't', 'y',
        };
        assertArrayEquals(expected, framed.array());
    }
}

View File

@ -0,0 +1,173 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression.snappy;
import static org.junit.Assert.assertArrayEquals;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.compression.CompressionException;
import org.junit.After;
import org.junit.Test;
/**
 * Tests for the raw {@link Snappy} block codec (no framing): literal and
 * copy decoding, preamble validation, and encoding of short vs. long inputs.
 */
public class SnappyTest {
    // The codec keeps state between calls, so reset it after each test.
    private final Snappy snappy = new Snappy();

    @After
    public void resetSnappy() {
        snappy.reset();
    }

    @Test
    public void testDecodeLiteral() throws Exception {
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            0x05, // preamble length
            0x04 << 2, // literal tag + length
            0x6e, 0x65, 0x74, 0x74, 0x79 // "netty"
        });
        ByteBuf out = Unpooled.buffer(5);
        snappy.decode(in, out, 7);

        // "netty"
        byte[] expected = {
            0x6e, 0x65, 0x74, 0x74, 0x79
        };
        assertArrayEquals("Literal was not decoded correctly", expected, out.array());
    }

    @Test
    public void testDecodeCopyWith1ByteOffset() throws Exception {
        // A literal "netty" followed by a copy element that re-reads the
        // 5 bytes starting 5 back — i.e. "netty" again.
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            0x01, // preamble length
            0x04 << 2, // literal tag + length
            0x6e, 0x65, 0x74, 0x74, 0x79, // "netty"
            0x05 << 2 | 0x01, // copy with 1-byte offset + length
            0x05 // offset
        });
        ByteBuf out = Unpooled.buffer(10);
        snappy.decode(in, out, 9);

        // "nettynetty" - we saved a whole byte :)
        byte[] expected = {
            0x6e, 0x65, 0x74, 0x74, 0x79, 0x6e, 0x65, 0x74, 0x74, 0x79
        };
        assertArrayEquals("Copy was not decoded correctly", expected, out.array());
    }

    @Test(expected = CompressionException.class)
    public void testDecodeCopyWithTinyOffset() throws Exception {
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            0x01, // preamble length
            0x04 << 2, // literal tag + length
            0x6e, 0x65, 0x74, 0x74, 0x79, // "netty"
            0x05 << 2 | 0x01, // copy with 1-byte offset + length
            0x03 // INVALID offset (< 4)
        });
        ByteBuf out = Unpooled.buffer(10);
        snappy.decode(in, out, 9);
    }

    @Test(expected = CompressionException.class)
    public void testDecodeCopyWithOffsetBeforeChunk() throws Exception {
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            0x01, // preamble length
            0x04 << 2, // literal tag + length
            0x6e, 0x65, 0x74, 0x74, 0x79, // "netty"
            0x05 << 2 | 0x01, // copy with 1-byte offset + length
            0x0b // INVALID offset (greater than chunk size)
        });
        ByteBuf out = Unpooled.buffer(10);
        snappy.decode(in, out, 9);
    }

    @Test(expected = CompressionException.class)
    public void testDecodeWithOverlyLongPreamble() throws Exception {
        // Five continuation bytes encode a varint preamble far larger than
        // any sane uncompressed length; the decoder must reject it.
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            -0x80, -0x80, -0x80, -0x80, 0x7f, // preamble length
            0x04 << 2, // literal tag + length
            0x6e, 0x65, 0x74, 0x74, 0x79, // "netty"
        });
        ByteBuf out = Unpooled.buffer(10);
        snappy.decode(in, out, 9);
    }

    @Test
    public void encodeShortTextIsLiteral() throws Exception {
        // Too short for any back-reference to pay off: the encoder should
        // emit a single literal element.
        ByteBuf in = Unpooled.wrappedBuffer(new byte[] {
            0x6e, 0x65, 0x74, 0x74, 0x79
        });
        ByteBuf out = Unpooled.buffer(7);
        snappy.encode(in, out, 5);

        byte[] expected = {
            0x05, // preamble length
            0x04 << 2, // literal tag + length
            0x6e, 0x65, 0x74, 0x74, 0x79 // "netty"
        };
        assertArrayEquals("Encoded literal was invalid", expected, out.array());
    }

    @Test
    public void encodeLongTextUsesCopy() throws Exception {
        ByteBuf in = Unpooled.wrappedBuffer(
            ("Netty has been designed carefully with the experiences " +
            "earned from the implementation of a lot of protocols " +
            "such as FTP, SMTP, HTTP, and various binary and " +
            "text-based legacy protocols").getBytes("US-ASCII")
        );
        ByteBuf out = Unpooled.buffer(180);
        snappy.encode(in, out, in.readableBytes());

        // The only compressibility in the above are the words "the ",
        // and "protocols", so this is a literal, followed by a copy
        // followed by another literal, followed by another copy
        byte[] expected = {
            -0x49, 0x01, // preamble length
            -0x10, 0x42, // literal tag + length

            // Literal
            0x4e, 0x65, 0x74, 0x74, 0x79, 0x20, 0x68, 0x61, 0x73, 0x20,
            0x62, 0x65, 0x65, 0x6e, 0x20, 0x64, 0x65, 0x73, 0x69, 0x67,
            0x6e, 0x65, 0x64, 0x20, 0x63, 0x61, 0x72, 0x65, 0x66, 0x75,
            0x6c, 0x6c, 0x79, 0x20, 0x77, 0x69, 0x74, 0x68, 0x20, 0x74,
            0x68, 0x65, 0x20, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x65,
            0x6e, 0x63, 0x65, 0x73, 0x20, 0x65, 0x61, 0x72, 0x6e, 0x65,
            0x64, 0x20, 0x66, 0x72, 0x6f, 0x6d, 0x20,

            // First copy (the)
            0x01, 0x1C, -0x10,

            // Next literal
            0x66, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74,
            0x61, 0x74, 0x69, 0x6f, 0x6e, 0x20, 0x6f, 0x66, 0x20, 0x61,
            0x20, 0x6c, 0x6f, 0x74, 0x20, 0x6f, 0x66, 0x20, 0x70, 0x72,
            0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x73, 0x20, 0x73, 0x75,
            0x63, 0x68, 0x20, 0x61, 0x73, 0x20, 0x46, 0x54, 0x50, 0x2c,
            0x20, 0x53, 0x4d, 0x54, 0x50, 0x2c, 0x20, 0x48, 0x54, 0x54,
            0x50, 0x2c, 0x20, 0x61, 0x6e, 0x64, 0x20, 0x76, 0x61, 0x72,
            0x69, 0x6f, 0x75, 0x73, 0x20, 0x62, 0x69, 0x6e, 0x61, 0x72,
            0x79, 0x20, 0x61, 0x6e, 0x64, 0x20, 0x74, 0x65, 0x78, 0x74,
            0x2d, 0x62, 0x61, 0x73, 0x65, 0x64, 0x20, 0x6c, 0x65, 0x67,
            0x61, 0x63, 0x79, 0x20,

            // Second copy (protocols)
            0x15, 0x4c
        };
        assertArrayEquals("Encoded result was incorrect", expected, out.array());
    }
}

View File

@ -0,0 +1,28 @@
Copyright 2011, Google Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -77,6 +77,7 @@
<module>buffer</module>
<module>codec</module>
<module>codec-http</module>
<module>codec-snappy</module>
<module>codec-socks</module>
<module>transport</module>
<module>handler</module>