Implemented a Bzip2Encoder

Motivation:

Bzip2Encoder provides sending data compressed in bzip2 format.

Modifications:

Added classes:
- Bzip2Encoder
- Bzip2BitWriter
- Bzip2BlockCompressor
- Bzip2DivSufSort
- Bzip2HuffmanAllocator
- Bzip2HuffmanStageEncoder
- Bzip2MTFAndRLE2StageEncoder
- Bzip2EncoderTest

Modified classes:
- Bzip2Constants (splited BLOCK_HEADER_MAGIC and END_OF_STREAM_MAGIC)
- Bzip2Decoder (use splited magic numbers)

Added integration tests for Bzip2Encoder/Decoder

Result:

Implemented new encoder which can compress outgoing data in bzip2 format.
This commit is contained in:
Idel Pivnitskiy 2014-07-17 14:36:00 +04:00 committed by Norman Maurer
parent 3c6017a9b1
commit ed7240b597
16 changed files with 3920 additions and 31 deletions

View File

@ -89,6 +89,15 @@ and decompression library written by Matthew J. Francis. It can be obtained at:
* HOMEPAGE: * HOMEPAGE:
* https://code.google.com/p/jbzip2/ * https://code.google.com/p/jbzip2/
This product contains a modified portion of 'libdivsufsort', a C API library to construct
the suffix array and the Burrows-Wheeler transformed string for any input string of
a constant-size alphabet written by Yuta Mori. It can be obtained at:
* LICENSE:
* license/LICENSE.libdivsufsort.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/libdivsufsort/
This product optionally depends on 'JZlib', a re-implementation of zlib in This product optionally depends on 'JZlib', a re-implementation of zlib in
pure Java, which can be obtained at: pure Java, which can be obtained at:

View File

@ -0,0 +1,109 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
/**
* A bit writer that allows the writing of single bit booleans, unary numbers, bit strings
* of arbitrary length (up to 24 bits), and bit aligned 32-bit integers. A single byte at a
* time is written to the {@link ByteBuf} when sufficient bits have been accumulated.
*/
final class Bzip2BitWriter {
/**
* A buffer of bits waiting to be written to the output stream.
*/
private int bitBuffer;
/**
* The number of bits currently buffered in {@link #bitBuffer}.
*/
private int bitCount;
/**
* Writes up to 24 bits to the output {@link ByteBuf}.
* @param count The number of bits to write (maximum {@code 24}, because the {@link #bitBuffer}
* is {@code int} and it can store up to {@code 8} bits before calling)
* @param value The bits to write
*/
void writeBits(ByteBuf out, final int count, final int value) {
if (count < 0 || count > 24) {
throw new IllegalArgumentException("count: " + count + " (expected: 0-24)");
}
int bitCount = this.bitCount;
int bitBuffer = this.bitBuffer | (value << (32 - count)) >>> bitCount;
bitCount += count;
while (bitCount >= 8) {
out.writeByte(bitBuffer >>> 24);
bitBuffer <<= 8;
bitCount -= 8;
}
this.bitBuffer = bitBuffer;
this.bitCount = bitCount;
}
/**
* Writes a single bit to the output {@link ByteBuf}.
* @param value The bit to write
*/
void writeBoolean(ByteBuf out, final boolean value) {
int bitCount = this.bitCount + 1;
int bitBuffer = this.bitBuffer | (value ? 1 : 0) << (32 - bitCount);
if (bitCount == 8) {
out.writeByte(bitBuffer >>> 24);
bitBuffer = 0;
bitCount = 0;
}
this.bitBuffer = bitBuffer;
this.bitCount = bitCount;
}
/**
* Writes a zero-terminated unary number to the output {@link ByteBuf}.
* Example of the output for value = 6: {@code 1111110}
* @param value The number of {@code 1} to write
*/
void writeUnary(ByteBuf out, int value) {
if (value < 0) {
throw new IllegalArgumentException("value: " + value + " (expected 0 or more)");
}
while (value-- > 0) {
writeBoolean(out, true);
}
writeBoolean(out, false);
}
/**
* Writes an integer as 32 bits to the output {@link ByteBuf}.
* @param value The integer to write
*/
void writeInt(ByteBuf out, final int value) {
writeBits(out, 16, (value >>> 16) & 0xffff);
writeBits(out, 16, value & 0xffff);
}
/**
* Writes any remaining bits to the output {@link ByteBuf},
* zero padding to a whole byte as required.
*/
void flush(ByteBuf out) {
if (bitCount > 0) {
writeBits(out, 8 - bitCount, 0);
}
}
}

View File

@ -0,0 +1,294 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import static io.netty.handler.codec.compression.Bzip2Constants.*;
/**
* Compresses and writes a single Bzip2 block.<br><br>
*
* Block encoding consists of the following stages:<br>
* 1. Run-Length Encoding[1] - {@link #write(int)}<br>
* 2. Burrows Wheeler Transform - {@link #close(ByteBuf)} (through {@link Bzip2DivSufSort})<br>
* 3. Write block header - {@link #close(ByteBuf)}<br>
* 4. Move To Front Transform - {@link #close(ByteBuf)} (through {@link Bzip2HuffmanStageEncoder})<br>
* 5. Run-Length Encoding[2] - {@link #close(ByteBuf)} (through {@link Bzip2HuffmanStageEncoder})<br>
* 6. Create and write Huffman tables - {@link #close(ByteBuf)} (through {@link Bzip2HuffmanStageEncoder})<br>
* 7. Huffman encode and write data - {@link #close(ByteBuf)} (through {@link Bzip2HuffmanStageEncoder})
*/
final class Bzip2BlockCompressor {
/**
* A writer that provides bit-level writes.
*/
private final Bzip2BitWriter writer;
/**
* CRC builder for the block.
*/
private final Crc32 crc = new Crc32();
/**
* The RLE'd block data.
*/
private final byte[] block;
/**
* Current length of the data within the {@link #block} array.
*/
private int blockLength;
/**
* A limit beyond which new data will not be accepted into the block.
*/
private final int blockLengthLimit;
/**
* The values that are present within the RLE'd block data. For each index, {@code true} if that
* value is present within the data, otherwise {@code false}.
*/
private final boolean[] blockValuesPresent = new boolean[256];
/**
* The Burrows Wheeler Transformed block data.
*/
private final int[] bwtBlock;
/**
* The current RLE value being accumulated (undefined when {@link #rleLength} is 0).
*/
private int rleCurrentValue = -1;
/**
* The repeat count of the current RLE value.
*/
private int rleLength;
/**
* @param writer The {@link Bzip2BitWriter} which provides bit-level writes
* @param blockSize The declared block size in bytes. Up to this many bytes will be accepted
* into the block after Run-Length Encoding is applied
*/
Bzip2BlockCompressor(final Bzip2BitWriter writer, final int blockSize) {
this.writer = writer;
// One extra byte is added to allow for the block wrap applied in close()
block = new byte[blockSize + 1];
bwtBlock = new int[blockSize + 1];
blockLengthLimit = blockSize - 6; // 5 bytes for one RLE run plus one byte - see {@link #write(int)}
}
/**
* Write the Huffman symbol to output byte map.
*/
private void writeSymbolMap(ByteBuf out) {
Bzip2BitWriter writer = this.writer;
final boolean[] blockValuesPresent = this.blockValuesPresent;
final boolean[] condensedInUse = new boolean[16];
for (int i = 0; i < condensedInUse.length; i++) {
for (int j = 0, k = i << 4; j < 16; j++, k++) {
if (blockValuesPresent[k]) {
condensedInUse[i] = true;
}
}
}
for (int i = 0; i < condensedInUse.length; i++) {
writer.writeBoolean(out, condensedInUse[i]);
}
for (int i = 0; i < condensedInUse.length; i++) {
if (condensedInUse[i]) {
for (int j = 0, k = i << 4; j < 16; j++, k++) {
writer.writeBoolean(out, blockValuesPresent[k]);
}
}
}
}
/**
* Writes an RLE run to the block array, updating the block CRC and present values array as required.
* @param value The value to write
* @param runLength The run length of the value to write
*/
private void writeRun(final int value, int runLength) {
final int blockLength = this.blockLength;
final byte[] block = this.block;
blockValuesPresent[value] = true;
crc.updateCRC(value, runLength);
final byte byteValue = (byte) value;
switch (runLength) {
case 1:
block[blockLength] = byteValue;
this.blockLength = blockLength + 1;
break;
case 2:
block[blockLength] = byteValue;
block[blockLength + 1] = byteValue;
this.blockLength = blockLength + 2;
break;
case 3:
block[blockLength] = byteValue;
block[blockLength + 1] = byteValue;
block[blockLength + 2] = byteValue;
this.blockLength = blockLength + 3;
break;
default:
runLength -= 4;
blockValuesPresent[runLength] = true;
block[blockLength] = byteValue;
block[blockLength + 1] = byteValue;
block[blockLength + 2] = byteValue;
block[blockLength + 3] = byteValue;
block[blockLength + 4] = (byte) runLength;
this.blockLength = blockLength + 5;
break;
}
}
/**
* Writes a byte to the block, accumulating to an RLE run where possible.
* @param value The byte to write
* @return {@code true} if the byte was written, or {@code false} if the block is already full
*/
boolean write(final int value) {
if (blockLength > blockLengthLimit) {
return false;
}
final int rleCurrentValue = this.rleCurrentValue;
final int rleLength = this.rleLength;
if (rleLength == 0) {
this.rleCurrentValue = value;
this.rleLength = 1;
} else if (rleCurrentValue != value) {
// This path commits us to write 6 bytes - one RLE run (5 bytes) plus one extra
writeRun(rleCurrentValue & 0xff, rleLength);
this.rleCurrentValue = value;
this.rleLength = 1;
} else {
if (rleLength == 254) {
writeRun(rleCurrentValue & 0xff, 255);
this.rleLength = 0;
} else {
this.rleLength = rleLength + 1;
}
}
return true;
}
/**
* Writes an array to the block.
* @param data The array to write
* @param offset The offset within the input data to write from
* @param length The number of bytes of input data to write
* @return The actual number of input bytes written. May be less than the number requested, or
* zero if the block is already full
*/
int write(final byte[] data, int offset, int length) {
int written = 0;
while (length-- > 0) {
if (!write(data[offset++])) {
break;
}
written++;
}
return written;
}
/**
* Compresses and writes out the block.
*/
void close(ByteBuf out) {
// If an RLE run is in progress, write it out
if (rleLength > 0) {
writeRun(rleCurrentValue & 0xff, rleLength);
}
// Apply a one byte block wrap required by the BWT implementation
block[blockLength] = block[0];
// Perform the Burrows Wheeler Transform
Bzip2DivSufSort divSufSort = new Bzip2DivSufSort(block, bwtBlock, blockLength);
int bwtStartPointer = divSufSort.bwt();
Bzip2BitWriter writer = this.writer;
// Write out the block header
writer.writeBits(out, 24, BLOCK_HEADER_MAGIC_1);
writer.writeBits(out, 24, BLOCK_HEADER_MAGIC_2);
writer.writeInt(out, crc.getCRC());
writer.writeBoolean(out, false); // Randomised block flag. We never create randomised blocks
writer.writeBits(out, 24, bwtStartPointer);
// Write out the symbol map
writeSymbolMap(out);
// Perform the Move To Front Transform and Run-Length Encoding[2] stages
Bzip2MTFAndRLE2StageEncoder mtfEncoder = new Bzip2MTFAndRLE2StageEncoder(bwtBlock, blockLength,
blockValuesPresent);
mtfEncoder.encode();
// Perform the Huffman Encoding stage and write out the encoded data
Bzip2HuffmanStageEncoder huffmanEncoder = new Bzip2HuffmanStageEncoder(writer,
mtfEncoder.mtfBlock(),
mtfEncoder.mtfLength(),
mtfEncoder.mtfAlphabetSize(),
mtfEncoder.mtfSymbolFrequencies());
huffmanEncoder.encode(out);
}
/**
* Gets available size of the current block.
* @return Number of available bytes which can be written
*/
int availableSize() {
if (blockLength == 0) {
return blockLengthLimit + 2;
}
return blockLengthLimit - blockLength + 1;
}
/**
* Determines if the block is full and ready for compression.
* @return {@code true} if the block is full, otherwise {@code false}
*/
boolean isFull() {
return blockLength > blockLengthLimit;
}
/**
* Determines if any bytes have been written to the block.
* @return {@code true} if one or more bytes has been written to the block, otherwise {@code false}
*/
boolean isEmpty() {
return blockLength == 0 && rleLength == 0;
}
/**
* Gets the CRC of the completed block. Only valid after calling {@link #close(ByteBuf)}.
* @return The block's CRC
*/
int crc() {
return crc.getCRC();
}
}

View File

@ -16,7 +16,7 @@
package io.netty.handler.codec.compression; package io.netty.handler.codec.compression;
/** /**
* Constants for {@link Bzip2Decoder}. * Constants for both the {@link Bzip2Encoder} and the {@link Bzip2Decoder}.
*/ */
final class Bzip2Constants { final class Bzip2Constants {
@ -28,12 +28,14 @@ final class Bzip2Constants {
/** /**
* Block header magic number. Equals to BCD (pi). * Block header magic number. Equals to BCD (pi).
*/ */
static final long COMPRESSED_MAGIC = 0x314159265359L; static final int BLOCK_HEADER_MAGIC_1 = 0x314159;
static final int BLOCK_HEADER_MAGIC_2 = 0x265359;
/** /**
* End of stream magic number. Equals to BCD sqrt(pi). * End of stream magic number. Equals to BCD sqrt(pi).
*/ */
static final long END_OF_STREAM_MAGIC = 0x177245385090L; static final int END_OF_STREAM_MAGIC_1 = 0x177245;
static final int END_OF_STREAM_MAGIC_2 = 0x385090;
/** /**
* Base block size. * Base block size.

View File

@ -108,8 +108,9 @@ public class Bzip2Decoder extends ByteToMessageDecoder {
} }
Bzip2BitReader reader = this.reader; Bzip2BitReader reader = this.reader;
// Get the block magic bytes. // Get the block magic bytes.
final long magic = (long) reader.readBits(in, 24) << 24 | reader.readBits(in, 24); final int magic1 = reader.readBits(in, 24);
if (magic == END_OF_STREAM_MAGIC) { final int magic2 = reader.readBits(in, 24);
if (magic1 == END_OF_STREAM_MAGIC_1 && magic2 == END_OF_STREAM_MAGIC_2) {
// End of stream was reached. Check the combined CRC. // End of stream was reached. Check the combined CRC.
final int storedCombinedCRC = reader.readInt(in); final int storedCombinedCRC = reader.readInt(in);
if (storedCombinedCRC != streamCRC) { if (storedCombinedCRC != streamCRC) {
@ -118,7 +119,7 @@ public class Bzip2Decoder extends ByteToMessageDecoder {
currentState = State.EOF; currentState = State.EOF;
break; break;
} }
if (magic != COMPRESSED_MAGIC) { if (magic1 != BLOCK_HEADER_MAGIC_1 || magic2 != BLOCK_HEADER_MAGIC_2) {
throw new DecompressionException("bad block header"); throw new DecompressionException("bad block header");
} }
blockCRC = reader.readInt(in); blockCRC = reader.readInt(in);

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,263 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.compression.Bzip2Constants.*;
/**
* Compresses a {@link ByteBuf} using the Bzip2 algorithm.
*
* See <a href="http://en.wikipedia.org/wiki/Bzip2">Bzip2</a>.
*/
public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
/**
* Current state of stream.
*/
private enum State {
INIT,
INIT_BLOCK,
WRITE_DATA,
CLOSE_BLOCK,
EOF
}
private State currentState = State.INIT;
/**
* A writer that provides bit-level writes.
*/
private final Bzip2BitWriter writer = new Bzip2BitWriter();
/**
* The declared maximum block size of the stream (before final run-length decoding).
*/
private final int streamBlockSize;
/**
* The merged CRC of all blocks compressed so far.
*/
private int streamCRC;
/**
* The compressor for the current block.
*/
private Bzip2BlockCompressor blockCompressor;
/**
* (@code true} if the compressed stream has been finished, otherwise {@code false}.
*/
private volatile boolean finished;
/**
* Used to interact with its {@link ChannelPipeline} and other handlers.
*/
private volatile ChannelHandlerContext ctx;
/**
* Creates a new bzip2 encoder with the maximum (900,000 byte) block size.
*/
public Bzip2Encoder() {
this(MAX_BLOCK_SIZE);
}
/**
* Creates a new bzip2 encoder with the specified {@code blockSizeMultiplier}.
* @param blockSizeMultiplier
* The Bzip2 block size as a multiple of 100,000 bytes (minimum {@code 1}, maximum {@code 9}).
* Larger block sizes require more memory for both compression and decompression,
* but give better compression ratios. {@code 9} will usually be the best value to use.
*/
public Bzip2Encoder(final int blockSizeMultiplier) {
if (blockSizeMultiplier < MIN_BLOCK_SIZE || blockSizeMultiplier > MAX_BLOCK_SIZE) {
throw new IllegalArgumentException(
"blockSizeMultiplier: " + blockSizeMultiplier + " (expected: 1-9)");
}
streamBlockSize = blockSizeMultiplier * BASE_BLOCK_SIZE;
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
if (finished) {
out.writeBytes(in);
return;
}
for (;;) {
switch (currentState) {
case INIT:
out.ensureWritable(4);
out.writeMedium(MAGIC_NUMBER);
out.writeByte('0' + streamBlockSize / BASE_BLOCK_SIZE);
currentState = State.INIT_BLOCK;
case INIT_BLOCK:
blockCompressor = new Bzip2BlockCompressor(writer, streamBlockSize);
currentState = State.WRITE_DATA;
case WRITE_DATA:
if (!in.isReadable()) {
return;
}
Bzip2BlockCompressor blockCompressor = this.blockCompressor;
final int length = in.readableBytes() < blockCompressor.availableSize() ?
in.readableBytes() : blockCompressor.availableSize();
final int offset;
final byte[] array;
if (in.hasArray()) {
array = in.array();
offset = in.arrayOffset() + in.readerIndex();
} else {
array = new byte[length];
in.getBytes(in.readerIndex(), array);
offset = 0;
}
final int bytesWritten = blockCompressor.write(array, offset, length);
in.skipBytes(bytesWritten);
if (!blockCompressor.isFull()) {
if (in.isReadable()) {
break;
} else {
return;
}
}
currentState = State.CLOSE_BLOCK;
case CLOSE_BLOCK:
closeBlock(out);
currentState = State.INIT_BLOCK;
break;
default:
throw new IllegalStateException();
}
}
}
/**
* Close current block and update {@link #streamCRC}.
*/
private void closeBlock(ByteBuf out) {
final Bzip2BlockCompressor blockCompressor = this.blockCompressor;
if (!blockCompressor.isEmpty()) {
blockCompressor.close(out);
final int blockCRC = blockCompressor.crc();
streamCRC = (streamCRC << 1 | streamCRC >>> 31) ^ blockCRC;
}
}
/**
* Returns {@code true} if and only if the end of the compressed stream has been reached.
*/
public boolean isClosed() {
return finished;
}
/**
* Close this {@link Bzip2Encoder} and so finish the encoding.
*
* The returned {@link ChannelFuture} will be notified once the operation completes.
*/
public ChannelFuture close() {
return close(ctx().newPromise());
}
/**
* Close this {@link Bzip2Encoder} and so finish the encoding.
* The given {@link ChannelFuture} will be notified once the operation
* completes and will also be returned.
*/
public ChannelFuture close(final ChannelPromise promise) {
ChannelHandlerContext ctx = ctx();
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
return finishEncode(ctx, promise);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
ChannelFuture f = finishEncode(ctx(), promise);
f.addListener(new ChannelPromiseNotifier(promise));
}
});
return promise;
}
}
@Override
public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
ChannelFuture f = finishEncode(ctx, ctx.newPromise());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
ctx.close(promise);
}
});
if (!f.isDone()) {
// Ensure the channel is closed even if the write operation completes in time.
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.close(promise);
}
}, 10, TimeUnit.SECONDS); // FIXME: Magic number
}
}
private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
if (finished) {
promise.setSuccess();
return promise;
}
finished = true;
final ByteBuf footer = ctx.alloc().buffer();
closeBlock(footer);
final int streamCRC = this.streamCRC;
final Bzip2BitWriter writer = this.writer;
try {
writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_1);
writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_2);
writer.writeInt(footer, streamCRC);
writer.flush(footer);
} finally {
blockCompressor = null;
}
return ctx.writeAndFlush(footer, promise);
}
private ChannelHandlerContext ctx() {
ChannelHandlerContext ctx = this.ctx;
if (ctx == null) {
throw new IllegalStateException("not added to a pipeline");
}
return ctx;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
}

View File

@ -0,0 +1,183 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
/**
* An in-place, length restricted Canonical Huffman code length allocator.<br>
* Based on the algorithm proposed by R. L. Milidi'u, A. A. Pessoa and E. S. Laber in
* <a href="http://www-di.inf.puc-rio.br/~laber/public/spire98.ps">In-place Length-Restricted Prefix Coding</a>
* and incorporating additional ideas from the implementation of
* <a href="http://entropyware.info/shcodec/index.html">shcodec</a> by Simakov Alexander.
*/
final class Bzip2HuffmanAllocator {
/**
* @param array The code length array
* @param i The input position
* @param nodesToMove The number of internal nodes to be relocated
* @return The smallest {@code k} such that {@code nodesToMove <= k <= i} and
* {@code i <= (array[k] % array.length)}
*/
private static int first(final int[] array, int i, final int nodesToMove) {
final int length = array.length;
final int limit = i;
int k = array.length - 2;
while (i >= nodesToMove && array[i] % length > limit) {
k = i;
i -= limit - i + 1;
}
i = Math.max(nodesToMove - 1, i);
while (k > i + 1) {
int temp = i + k >> 1;
if (array[temp] % length > limit) {
k = temp;
} else {
i = temp;
}
}
return k;
}
/**
* Fills the code array with extended parent pointers.
* @param array The code length array
*/
private static void setExtendedParentPointers(final int[] array) {
final int length = array.length;
array[0] += array[1];
for (int headNode = 0, tailNode = 1, topNode = 2; tailNode < length - 1; tailNode++) {
int temp;
if (topNode >= length || array[headNode] < array[topNode]) {
temp = array[headNode];
array[headNode++] = tailNode;
} else {
temp = array[topNode++];
}
if (topNode >= length || (headNode < tailNode && array[headNode] < array[topNode])) {
temp += array[headNode];
array[headNode++] = tailNode + length;
} else {
temp += array[topNode++];
}
array[tailNode] = temp;
}
}
/**
* Finds the number of nodes to relocate in order to achieve a given code length limit.
* @param array The code length array
* @param maximumLength The maximum bit length for the generated codes
* @return The number of nodes to relocate
*/
private static int findNodesToRelocate(final int[] array, final int maximumLength) {
int currentNode = array.length - 2;
for (int currentDepth = 1; currentDepth < maximumLength - 1 && currentNode > 1; currentDepth++) {
currentNode = first(array, currentNode - 1, 0);
}
return currentNode;
}
/**
* A final allocation pass with no code length limit.
* @param array The code length array
*/
private static void allocateNodeLengths(final int[] array) {
int firstNode = array.length - 2;
int nextNode = array.length - 1;
for (int currentDepth = 1, availableNodes = 2; availableNodes > 0; currentDepth++) {
final int lastNode = firstNode;
firstNode = first(array, lastNode - 1, 0);
for (int i = availableNodes - (lastNode - firstNode); i > 0; i--) {
array[nextNode--] = currentDepth;
}
availableNodes = (lastNode - firstNode) << 1;
}
}
/**
* A final allocation pass that relocates nodes in order to achieve a maximum code length limit.
* @param array The code length array
* @param nodesToMove The number of internal nodes to be relocated
* @param insertDepth The depth at which to insert relocated nodes
*/
private static void allocateNodeLengthsWithRelocation(final int[] array,
final int nodesToMove, final int insertDepth) {
int firstNode = array.length - 2;
int nextNode = array.length - 1;
int currentDepth = insertDepth == 1 ? 2 : 1;
int nodesLeftToMove = insertDepth == 1 ? nodesToMove - 2 : nodesToMove;
for (int availableNodes = currentDepth << 1; availableNodes > 0; currentDepth++) {
final int lastNode = firstNode;
firstNode = firstNode <= nodesToMove ? firstNode : first(array, lastNode - 1, nodesToMove);
int offset = 0;
if (currentDepth >= insertDepth) {
offset = Math.min(nodesLeftToMove, 1 << (currentDepth - insertDepth));
} else if (currentDepth == insertDepth - 1) {
offset = 1;
if (array[firstNode] == lastNode) {
firstNode++;
}
}
for (int i = availableNodes - (lastNode - firstNode + offset); i > 0; i--) {
array[nextNode--] = currentDepth;
}
nodesLeftToMove -= offset;
availableNodes = (lastNode - firstNode + offset) << 1;
}
}
/**
* Allocates Canonical Huffman code lengths in place based on a sorted frequency array.
* @param array On input, a sorted array of symbol frequencies; On output, an array of Canonical
* Huffman code lengths
* @param maximumLength The maximum code length. Must be at least {@code ceil(log2(array.length))}
*/
static void allocateHuffmanCodeLengths(final int[] array, final int maximumLength) {
switch (array.length) {
case 2:
array[1] = 1;
case 1:
array[0] = 1;
return;
}
/* Pass 1 : Set extended parent pointers */
setExtendedParentPointers(array);
/* Pass 2 : Find number of nodes to relocate in order to achieve maximum code length */
int nodesToRelocate = findNodesToRelocate(array, maximumLength);
/* Pass 3 : Generate code lengths */
if (array[0] % array.length >= nodesToRelocate) {
allocateNodeLengths(array);
} else {
int insertDepth = maximumLength - (32 - Integer.numberOfLeadingZeros(nodesToRelocate - 1));
allocateNodeLengthsWithRelocation(array, nodesToRelocate, insertDepth);
}
}
private Bzip2HuffmanAllocator() { }
}

View File

@ -0,0 +1,374 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import java.util.Arrays;
import static io.netty.handler.codec.compression.Bzip2Constants.*;
/**
* An encoder for the Bzip2 Huffman encoding stage.
*/
final class Bzip2HuffmanStageEncoder {
/**
* Used in initial Huffman table generation.
*/
private static final int HUFFMAN_HIGH_SYMBOL_COST = 15;
/**
* The {@link Bzip2BitWriter} to which the Huffman tables and data is written.
*/
private final Bzip2BitWriter writer;
/**
* The output of the Move To Front Transform and Run Length Encoding[2] stages.
*/
private final char[] mtfBlock;
/**
* The actual number of values contained in the {@link #mtfBlock} array.
*/
private final int mtfLength;
/**
* The number of unique values in the {@link #mtfBlock} array.
*/
private final int mtfAlphabetSize;
/**
* The global frequencies of values within the {@link #mtfBlock} array.
*/
private final int[] mtfSymbolFrequencies;
/**
* The Canonical Huffman code lengths for each table.
*/
private final int[][] huffmanCodeLengths;
/**
* Merged code symbols for each table. The value at each position is ((code length << 24) | code).
*/
private final int[][] huffmanMergedCodeSymbols;
/**
* The selectors for each segment.
*/
private final byte[] selectors;
/**
* @param writer The {@link Bzip2BitWriter} which provides bit-level writes
* @param mtfBlock The MTF block data
* @param mtfLength The actual length of the MTF block
* @param mtfAlphabetSize The size of the MTF block's alphabet
* @param mtfSymbolFrequencies The frequencies the MTF block's symbols
*/
Bzip2HuffmanStageEncoder(final Bzip2BitWriter writer, final char[] mtfBlock,
final int mtfLength, final int mtfAlphabetSize, final int[] mtfSymbolFrequencies) {
this.writer = writer;
this.mtfBlock = mtfBlock;
this.mtfLength = mtfLength;
this.mtfAlphabetSize = mtfAlphabetSize;
this.mtfSymbolFrequencies = mtfSymbolFrequencies;
final int totalTables = selectTableCount(mtfLength);
huffmanCodeLengths = new int[totalTables][mtfAlphabetSize];
huffmanMergedCodeSymbols = new int[totalTables][mtfAlphabetSize];
selectors = new byte[(mtfLength + HUFFMAN_GROUP_RUN_LENGTH - 1) / HUFFMAN_GROUP_RUN_LENGTH];
}
/**
* Selects an appropriate table count for a given MTF length.
* @param mtfLength The length to select a table count for
* @return The selected table count
*/
private static int selectTableCount(final int mtfLength) {
if (mtfLength >= 2400) {
return 6;
}
if (mtfLength >= 1200) {
return 5;
}
if (mtfLength >= 600) {
return 4;
}
if (mtfLength >= 200) {
return 3;
}
return 2;
}
/**
* Generate a Huffman code length table for a given list of symbol frequencies.
* @param alphabetSize The total number of symbols
* @param symbolFrequencies The frequencies of the symbols
* @param codeLengths The array to which the generated code lengths should be written
*/
private static void generateHuffmanCodeLengths(final int alphabetSize,
final int[] symbolFrequencies, final int[] codeLengths) {
final int[] mergedFrequenciesAndIndices = new int[alphabetSize];
final int[] sortedFrequencies = new int[alphabetSize];
// The Huffman allocator needs its input symbol frequencies to be sorted, but we need to
// return code lengths in the same order as the corresponding frequencies are passed in.
// The symbol frequency and index are merged into a single array of
// integers - frequency in the high 23 bits, index in the low 9 bits.
// 2^23 = 8,388,608 which is higher than the maximum possible frequency for one symbol in a block
// 2^9 = 512 which is higher than the maximum possible alphabet size (== 258)
// Sorting this array simultaneously sorts the frequencies and
// leaves a lookup that can be used to cheaply invert the sort.
for (int i = 0; i < alphabetSize; i++) {
mergedFrequenciesAndIndices[i] = (symbolFrequencies[i] << 9) | i;
}
Arrays.sort(mergedFrequenciesAndIndices);
for (int i = 0; i < alphabetSize; i++) {
sortedFrequencies[i] = mergedFrequenciesAndIndices[i] >>> 9;
}
// Allocate code lengths - the allocation is in place,
// so the code lengths will be in the sortedFrequencies array afterwards
Bzip2HuffmanAllocator.allocateHuffmanCodeLengths(sortedFrequencies, HUFFMAN_ENCODE_MAX_CODE_LENGTH);
// Reverse the sort to place the code lengths in the same order as the symbols whose frequencies were passed in
for (int i = 0; i < alphabetSize; i++) {
codeLengths[mergedFrequenciesAndIndices[i] & 0x1ff] = sortedFrequencies[i];
}
}
/**
* Generate initial Huffman code length tables, giving each table a different low cost section
* of the alphabet that is roughly equal in overall cumulative frequency. Note that the initial
* tables are invalid for actual Huffman code generation, and only serve as the seed for later
* iterative optimisation in {@link #optimiseSelectorsAndHuffmanTables(boolean)}.
*/
private void generateHuffmanOptimisationSeeds() {
final int[][] huffmanCodeLengths = this.huffmanCodeLengths;
final int[] mtfSymbolFrequencies = this.mtfSymbolFrequencies;
final int mtfAlphabetSize = this.mtfAlphabetSize;
final int totalTables = huffmanCodeLengths.length;
int remainingLength = mtfLength;
int lowCostEnd = -1;
for (int i = 0; i < totalTables; i++) {
final int targetCumulativeFrequency = remainingLength / (totalTables - i);
final int lowCostStart = lowCostEnd + 1;
int actualCumulativeFrequency = 0;
while (actualCumulativeFrequency < targetCumulativeFrequency && lowCostEnd < mtfAlphabetSize - 1) {
actualCumulativeFrequency += mtfSymbolFrequencies[++lowCostEnd];
}
if (lowCostEnd > lowCostStart && i != 0 && i != totalTables - 1 && (totalTables - i & 1) == 0) {
actualCumulativeFrequency -= mtfSymbolFrequencies[lowCostEnd--];
}
final int[] tableCodeLengths = huffmanCodeLengths[i];
for (int j = 0; j < mtfAlphabetSize; j++) {
if (j < lowCostStart || j > lowCostEnd) {
tableCodeLengths[j] = HUFFMAN_HIGH_SYMBOL_COST;
}
}
remainingLength -= actualCumulativeFrequency;
}
}
/**
* Co-optimise the selector list and the alternative Huffman table code lengths. This method is
* called repeatedly in the hope that the total encoded size of the selectors, the Huffman code
* lengths and the block data encoded with them will converge towards a minimum.<br>
* If the data is highly incompressible, it is possible that the total encoded size will
* instead diverge (increase) slightly.<br>
* @param storeSelectors If {@code true}, write out the (final) chosen selectors
*/
private void optimiseSelectorsAndHuffmanTables(final boolean storeSelectors) {
final char[] mtfBlock = this.mtfBlock;
final byte[] selectors = this.selectors;
final int[][] huffmanCodeLengths = this.huffmanCodeLengths;
final int mtfLength = this.mtfLength;
final int mtfAlphabetSize = this.mtfAlphabetSize;
final int totalTables = huffmanCodeLengths.length;
final int[][] tableFrequencies = new int[totalTables][mtfAlphabetSize];
int selectorIndex = 0;
// Find the best table for each group of 50 block bytes based on the current Huffman code lengths
for (int groupStart = 0; groupStart < mtfLength;) {
final int groupEnd = Math.min(groupStart + HUFFMAN_GROUP_RUN_LENGTH, mtfLength) - 1;
// Calculate the cost of this group when encoded by each table
short[] cost = new short[totalTables];
for (int i = groupStart; i <= groupEnd; i++) {
final int value = mtfBlock[i];
for (int j = 0; j < totalTables; j++) {
cost[j] += huffmanCodeLengths[j][value];
}
}
// Find the table with the least cost for this group
byte bestTable = 0;
int bestCost = cost[0];
for (byte i = 1 ; i < totalTables; i++) {
final int tableCost = cost[i];
if (tableCost < bestCost) {
bestCost = tableCost;
bestTable = i;
}
}
// Accumulate symbol frequencies for the table chosen for this block
final int[] bestGroupFrequencies = tableFrequencies[bestTable];
for (int i = groupStart; i <= groupEnd; i++) {
bestGroupFrequencies[mtfBlock[i]]++;
}
// Store a selector indicating the table chosen for this block
if (storeSelectors) {
selectors[selectorIndex++] = bestTable;
}
groupStart = groupEnd + 1;
}
// Generate new Huffman code lengths based on the frequencies for each table accumulated in this iteration
for (int i = 0; i < totalTables; i++) {
generateHuffmanCodeLengths(mtfAlphabetSize, tableFrequencies[i], huffmanCodeLengths[i]);
}
}
/**
* Assigns Canonical Huffman codes based on the calculated lengths.
*/
private void assignHuffmanCodeSymbols() {
final int[][] huffmanMergedCodeSymbols = this.huffmanMergedCodeSymbols;
final int[][] huffmanCodeLengths = this.huffmanCodeLengths;
final int mtfAlphabetSize = this.mtfAlphabetSize;
final int totalTables = huffmanCodeLengths.length;
for (int i = 0; i < totalTables; i++) {
final int[] tableLengths = huffmanCodeLengths[i];
int minimumLength = 32;
int maximumLength = 0;
for (int j = 0; j < mtfAlphabetSize; j++) {
final int length = tableLengths[j];
if (length > maximumLength) {
maximumLength = length;
}
if (length < minimumLength) {
minimumLength = length;
}
}
int code = 0;
for (int j = minimumLength; j <= maximumLength; j++) {
for (int k = 0; k < mtfAlphabetSize; k++) {
if ((huffmanCodeLengths[i][k] & 0xff) == j) {
huffmanMergedCodeSymbols[i][k] = (j << 24) | code;
code++;
}
}
code <<= 1;
}
}
}
/**
* Write out the selector list and Huffman tables.
*/
private void writeSelectorsAndHuffmanTables(ByteBuf out) {
final Bzip2BitWriter writer = this.writer;
final byte[] selectors = this.selectors;
final int totalSelectors = selectors.length;
final int[][] huffmanCodeLengths = this.huffmanCodeLengths;
final int totalTables = huffmanCodeLengths.length;
final int mtfAlphabetSize = this.mtfAlphabetSize;
writer.writeBits(out, 3, totalTables);
writer.writeBits(out, 15, totalSelectors);
// Write the selectors
Bzip2MoveToFrontTable selectorMTF = new Bzip2MoveToFrontTable();
for (byte selector : selectors) {
writer.writeUnary(out, selectorMTF.valueToFront(selector));
}
// Write the Huffman tables
for (final int[] tableLengths : huffmanCodeLengths) {
int currentLength = tableLengths[0];
writer.writeBits(out, 5, currentLength);
for (int j = 0; j < mtfAlphabetSize; j++) {
final int codeLength = tableLengths[j];
final int value = currentLength < codeLength ? 2 : 3;
int delta = Math.abs(codeLength - currentLength);
while (delta-- > 0) {
writer.writeBits(out, 2, value);
}
writer.writeBoolean(out, false);
currentLength = codeLength;
}
}
}
/**
* Writes out the encoded block data.
*/
private void writeBlockData(ByteBuf out) {
final Bzip2BitWriter writer = this.writer;
final int[][] huffmanMergedCodeSymbols = this.huffmanMergedCodeSymbols;
final byte[] selectors = this.selectors;
final char[] mtf = mtfBlock;
final int mtfLength = this.mtfLength;
int selectorIndex = 0;
for (int mtfIndex = 0; mtfIndex < mtfLength;) {
final int groupEnd = Math.min(mtfIndex + HUFFMAN_GROUP_RUN_LENGTH, mtfLength) - 1;
final int[] tableMergedCodeSymbols = huffmanMergedCodeSymbols[selectors[selectorIndex++]];
while (mtfIndex <= groupEnd) {
final int mergedCodeSymbol = tableMergedCodeSymbols[mtf[mtfIndex++]];
writer.writeBits(out, mergedCodeSymbol >>> 24, mergedCodeSymbol);
}
}
}
/**
* Encodes and writes the block data.
*/
void encode(ByteBuf out) {
// Create optimised selector list and Huffman tables
generateHuffmanOptimisationSeeds();
for (int i = 3; i >= 0; i--) {
optimiseSelectorsAndHuffmanTables(i == 0);
}
assignHuffmanCodeSymbols();
// Write out the tables and the block data encoded with them
writeSelectorsAndHuffmanTables(out);
writeBlockData(out);
}
}

View File

@ -0,0 +1,183 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import static io.netty.handler.codec.compression.Bzip2Constants.*;
/**
* An encoder for the Bzip2 Move To Front Transform and Run-Length Encoding[2] stages.<br>
* Although conceptually these two stages are separate, it is computationally efficient to perform
* them in one pass.
*/
final class Bzip2MTFAndRLE2StageEncoder {
/**
* The Burrows-Wheeler transformed block.
*/
private final int[] bwtBlock;
/**
* Actual length of the data in the {@link #bwtBlock} array.
*/
private final int bwtLength;
/**
* At each position, {@code true} if the byte value with that index is present within the block,
* otherwise {@code false}.
*/
private final boolean[] bwtValuesPresent;
/**
* The output of the Move To Front Transform and Run-Length Encoding[2] stages.
*/
private final char[] mtfBlock;
/**
* The actual number of values contained in the {@link #mtfBlock} array.
*/
private int mtfLength;
/**
* The global frequencies of values within the {@link #mtfBlock} array.
*/
private final int[] mtfSymbolFrequencies = new int[HUFFMAN_MAX_ALPHABET_SIZE];
/**
* The encoded alphabet size.
*/
private int alphabetSize;
/**
* @param bwtBlock The Burrows Wheeler Transformed block data
* @param bwtLength The actual length of the BWT data
* @param bwtValuesPresent The values that are present within the BWT data. For each index,
* {@code true} if that value is present within the data, otherwise {@code false}
*/
Bzip2MTFAndRLE2StageEncoder(final int[] bwtBlock, final int bwtLength, final boolean[] bwtValuesPresent) {
this.bwtBlock = bwtBlock;
this.bwtLength = bwtLength;
this.bwtValuesPresent = bwtValuesPresent;
mtfBlock = new char[bwtLength + 1];
}
/**
* Performs the Move To Front transform and Run Length Encoding[1] stages.
*/
void encode() {
final int bwtLength = this.bwtLength;
final boolean[] bwtValuesPresent = this.bwtValuesPresent;
final int[] bwtBlock = this.bwtBlock;
final char[] mtfBlock = this.mtfBlock;
final int[] mtfSymbolFrequencies = this.mtfSymbolFrequencies;
final byte[] huffmanSymbolMap = new byte[256];
final Bzip2MoveToFrontTable symbolMTF = new Bzip2MoveToFrontTable();
int totalUniqueValues = 0;
for (int i = 0; i < huffmanSymbolMap.length; i++) {
if (bwtValuesPresent[i]) {
huffmanSymbolMap[i] = (byte) totalUniqueValues++;
}
}
final int endOfBlockSymbol = totalUniqueValues + 1;
int mtfIndex = 0;
int repeatCount = 0;
int totalRunAs = 0;
int totalRunBs = 0;
for (int i = 0; i < bwtLength; i++) {
// Move To Front
final int mtfPosition = symbolMTF.valueToFront(huffmanSymbolMap[bwtBlock[i] & 0xff]);
// Run Length Encode
if (mtfPosition == 0) {
repeatCount++;
} else {
if (repeatCount > 0) {
repeatCount--;
while (true) {
if ((repeatCount & 1) == 0) {
mtfBlock[mtfIndex++] = HUFFMAN_SYMBOL_RUNA;
totalRunAs++;
} else {
mtfBlock[mtfIndex++] = HUFFMAN_SYMBOL_RUNB;
totalRunBs++;
}
if (repeatCount <= 1) {
break;
}
repeatCount = (repeatCount - 2) >>> 1;
}
repeatCount = 0;
}
mtfBlock[mtfIndex++] = (char) (mtfPosition + 1);
mtfSymbolFrequencies[mtfPosition + 1]++;
}
}
if (repeatCount > 0) {
repeatCount--;
while (true) {
if ((repeatCount & 1) == 0) {
mtfBlock[mtfIndex++] = HUFFMAN_SYMBOL_RUNA;
totalRunAs++;
} else {
mtfBlock[mtfIndex++] = HUFFMAN_SYMBOL_RUNB;
totalRunBs++;
}
if (repeatCount <= 1) {
break;
}
repeatCount = (repeatCount - 2) >>> 1;
}
}
mtfBlock[mtfIndex] = (char) endOfBlockSymbol;
mtfSymbolFrequencies[endOfBlockSymbol]++;
mtfSymbolFrequencies[HUFFMAN_SYMBOL_RUNA] += totalRunAs;
mtfSymbolFrequencies[HUFFMAN_SYMBOL_RUNB] += totalRunBs;
mtfLength = mtfIndex + 1;
alphabetSize = endOfBlockSymbol + 1;
}
/**
* @return The encoded MTF block
*/
char[] mtfBlock() {
return mtfBlock;
}
/**
* @return The actual length of the MTF block
*/
int mtfLength() {
return mtfLength;
}
/**
* @return The size of the MTF block's alphabet
*/
int mtfAlphabetSize() {
return alphabetSize;
}
/**
* @return The frequencies of the MTF block's symbols
*/
int[] mtfSymbolFrequencies() {
return mtfSymbolFrequencies;
}
}

View File

@ -18,7 +18,7 @@ package io.netty.handler.codec.compression;
/** /**
* A 256 entry Move To Front transform. * A 256 entry Move To Front transform.
*/ */
class Bzip2MoveToFrontTable { final class Bzip2MoveToFrontTable {
/** /**
* The Move To Front list. * The Move To Front list.
*/ */

View File

@ -21,4 +21,4 @@
* <a href="http://code.google.com/p/snappy/">Snappy</a>. * <a href="http://code.google.com/p/snappy/">Snappy</a>.
*/ */
package io.netty.handler.codec.compression; package io.netty.handler.codec.compression;
// TODO Implement bzip2 and lzma handlers // TODO Implement lzma handler

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.compression; package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom; import io.netty.util.internal.ThreadLocalRandom;
@ -84,9 +85,9 @@ public class Bzip2DecoderTest {
ByteBuf in = Unpooled.buffer(); ByteBuf in = Unpooled.buffer();
in.writeMedium(MAGIC_NUMBER); in.writeMedium(MAGIC_NUMBER);
in.writeByte('1'); //block size in.writeByte('1'); //block size
in.writeInt(11111); //random value in.writeMedium(11); //incorrect block header
in.writeShort(111); //random value in.writeMedium(11); //incorrect block header
in.writeInt(111); //block CRC in.writeInt(11111); //block CRC
channel.writeInbound(in); channel.writeInbound(in);
} }
@ -99,8 +100,8 @@ public class Bzip2DecoderTest {
ByteBuf in = Unpooled.buffer(); ByteBuf in = Unpooled.buffer();
in.writeMedium(MAGIC_NUMBER); in.writeMedium(MAGIC_NUMBER);
in.writeByte('1'); //block size in.writeByte('1'); //block size
in.writeInt((int) (END_OF_STREAM_MAGIC >> 16)); in.writeMedium(END_OF_STREAM_MAGIC_1);
in.writeShort((int) END_OF_STREAM_MAGIC); in.writeMedium(END_OF_STREAM_MAGIC_2);
in.writeInt(1); //wrong storedCombinedCRC in.writeInt(1); //wrong storedCombinedCRC
channel.writeInbound(in); channel.writeInbound(in);
@ -181,6 +182,22 @@ public class Bzip2DecoderTest {
channel.writeInbound(in); channel.writeInbound(in);
} }
@Test
public void testStartPointerInvalid() throws Exception {
expected.expect(DecompressionException.class);
expected.expectMessage("start pointer invalid");
final byte[] data = { 0x42, 0x5A, 0x68, 0x37, 0x31, 0x41, 0x59, 0x26, 0x53,
0x59, 0x77, 0x7B, (byte) 0xCA, (byte) 0xC0, (byte) 0xFF, 0x00,
0x00, 0x05, (byte) 0x80, 0x00, 0x01, 0x02, 0x00, 0x04,
0x20, 0x20, 0x00, 0x30, (byte) 0xCD, 0x34, 0x19, (byte) 0xA6,
(byte) 0x89, (byte) 0x99, (byte) 0xC5, (byte) 0xDC, (byte) 0x91,
0x4E, 0x14, 0x24, 0x1D, (byte) 0xDE, (byte) 0xF2, (byte) 0xB0, 0x00 };
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeInbound(in);
}
private static void testDecompression(final byte[] data) throws Exception { private static void testDecompression(final byte[] data) throws Exception {
for (int blockSize = MIN_BLOCK_SIZE; blockSize <= MAX_BLOCK_SIZE; blockSize++) { for (int blockSize = MIN_BLOCK_SIZE; blockSize <= MAX_BLOCK_SIZE; blockSize++) {
final EmbeddedChannel channel = new EmbeddedChannel(new Bzip2Decoder()); final EmbeddedChannel channel = new EmbeddedChannel(new Bzip2Decoder());
@ -193,17 +210,13 @@ public class Bzip2DecoderTest {
ByteBuf compressed = Unpooled.wrappedBuffer(os.toByteArray()); ByteBuf compressed = Unpooled.wrappedBuffer(os.toByteArray());
channel.writeInbound(compressed); channel.writeInbound(compressed);
ByteBuf uncompressed = Unpooled.buffer(); ByteBuf uncompressed = readUncompressed(channel);
ByteBuf msg; ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
while ((msg = channel.readInbound()) != null) {
uncompressed.writeBytes(msg);
msg.release();
}
final byte[] result = new byte[uncompressed.readableBytes()];
uncompressed.readBytes(result);
uncompressed.release();
assertArrayEquals(data, result); assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
} }
} }
@ -219,10 +232,12 @@ public class Bzip2DecoderTest {
@Test @Test
public void testDecompressionOfBatchedFlowOfData() throws Exception { public void testDecompressionOfBatchedFlowOfData() throws Exception {
final byte[] data = BYTES_LARGE;
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
BZip2CompressorOutputStream bZip2Os = new BZip2CompressorOutputStream(os, BZip2CompressorOutputStream bZip2Os = new BZip2CompressorOutputStream(os,
rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1)); rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1));
bZip2Os.write(BYTES_LARGE); bZip2Os.write(data);
bZip2Os.close(); bZip2Os.close();
final byte[] compressedArray = os.toByteArray(); final byte[] compressedArray = os.toByteArray();
@ -236,16 +251,23 @@ public class Bzip2DecoderTest {
ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written);
channel.writeInbound(compressed); channel.writeInbound(compressed);
ByteBuf uncompressed = Unpooled.buffer(); ByteBuf uncompressed = readUncompressed(channel);
ByteBuf dataBuf = Unpooled.wrappedBuffer(data);
assertEquals(dataBuf, uncompressed);
uncompressed.release();
dataBuf.release();
}
private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception {
CompositeByteBuf uncompressed = Unpooled.compositeBuffer();
ByteBuf msg; ByteBuf msg;
while ((msg = channel.readInbound()) != null) { while ((msg = channel.readInbound()) != null) {
uncompressed.writeBytes(msg); uncompressed.addComponent(msg);
msg.release(); uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes());
} }
final byte[] result = new byte[uncompressed.readableBytes()];
uncompressed.readBytes(result);
uncompressed.release();
assertArrayEquals(BYTES_LARGE, result); return uncompressed;
} }
} }

View File

@ -0,0 +1,134 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.ThreadLocalRandom;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import static io.netty.handler.codec.compression.Bzip2Constants.*;
import static org.junit.Assert.*;
public class Bzip2EncoderTest {
private static final ThreadLocalRandom rand;
private static final byte[] BYTES_SMALL = new byte[256];
private static final byte[] BYTES_LARGE = new byte[MAX_BLOCK_SIZE * BASE_BLOCK_SIZE * 2];
static {
rand = ThreadLocalRandom.current();
rand.nextBytes(BYTES_SMALL);
rand.nextBytes(BYTES_LARGE);
}
@Test
public void testStreamInitialization() throws Exception {
final EmbeddedChannel channel = new EmbeddedChannel(new Bzip2Encoder());
ByteBuf in = Unpooled.wrappedBuffer("test".getBytes());
channel.writeOutbound(in);
ByteBuf out = channel.readOutbound();
assertEquals(MAGIC_NUMBER, out.readMedium());
assertEquals(9 + '0', out.readByte());
out.release();
channel.finish();
}
private static void testCompression(final byte[] data) throws Exception {
for (int blockSize = MIN_BLOCK_SIZE; blockSize <= MAX_BLOCK_SIZE; blockSize++) {
final EmbeddedChannel channel = new EmbeddedChannel(new Bzip2Encoder(blockSize));
ByteBuf in = Unpooled.wrappedBuffer(data);
channel.writeOutbound(in);
channel.finish();
byte[] uncompressed = uncompress(channel, data.length);
assertArrayEquals(data, uncompressed);
}
}
@Test
public void testCompressionOfSmallChunkOfData() throws Exception {
testCompression(BYTES_SMALL);
}
@Test
public void testCompressionOfLargeChunkOfData() throws Exception {
testCompression(BYTES_LARGE);
}
@Test
public void testCompressionOfBatchedFlowOfData() throws Exception {
final EmbeddedChannel channel = new EmbeddedChannel(new Bzip2Encoder(
rand.nextInt(MIN_BLOCK_SIZE, MAX_BLOCK_SIZE + 1)));
int written = 0, length = rand.nextInt(100);
while (written + length < BYTES_LARGE.length) {
ByteBuf in = Unpooled.wrappedBuffer(BYTES_LARGE, written, length);
channel.writeOutbound(in);
written += length;
length = rand.nextInt(100);
}
ByteBuf in = Unpooled.wrappedBuffer(BYTES_LARGE, written, BYTES_LARGE.length - written);
channel.writeOutbound(in);
channel.finish();
byte[] uncompressed = uncompress(channel, BYTES_LARGE.length);
assertArrayEquals(BYTES_LARGE, uncompressed);
}
private static byte[] uncompress(EmbeddedChannel channel, int length) throws Exception {
CompositeByteBuf out = Unpooled.compositeBuffer();
ByteBuf msg;
while ((msg = channel.readOutbound()) != null) {
out.addComponent(msg);
out.writerIndex(out.writerIndex() + msg.readableBytes());
}
byte[] compressed = new byte[out.readableBytes()];
out.readBytes(compressed);
out.release();
ByteArrayInputStream is = new ByteArrayInputStream(compressed);
BZip2CompressorInputStream bZip2Is = new BZip2CompressorInputStream(is);
byte[] uncompressed = new byte[length];
int remaining = length;
while (remaining > 0) {
int read = bZip2Is.read(uncompressed, length - remaining, remaining);
if (read > 0) {
remaining -= read;
} else {
break;
}
}
assertEquals(-1, bZip2Is.read());
return uncompressed;
}
}

View File

@ -0,0 +1,178 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.Test;
import java.util.Arrays;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
public class Bzip2IntegrationTest {
private static final ThreadLocalRandom rand = ThreadLocalRandom.current();
public static final byte[] EMPTY = new byte[0];
@Test
public void testEmpty() throws Exception {
testIdentity(EMPTY);
}
@Test
public void testOneByte() throws Exception {
testIdentity(new byte[] { 'A' });
}
@Test
public void testTwoBytes() throws Exception {
testIdentity(new byte[] { 'B', 'A' });
}
@Test
public void testRegular() throws Exception {
byte[] data = ("Netty is a NIO client server framework which enables quick and easy development " +
"of network applications such as protocol servers and clients.").getBytes();
testIdentity(data);
}
@Test
public void test3Tables() throws Exception {
byte[] data = new byte[500];
rand.nextBytes(data);
testIdentity(data);
}
@Test
public void test4Tables() throws Exception {
byte[] data = new byte[1100];
rand.nextBytes(data);
testIdentity(data);
}
@Test
public void test5Tables() throws Exception {
byte[] data = new byte[2300];
rand.nextBytes(data);
testIdentity(data);
}
@Test
public void testLargeRandom() throws Exception {
byte[] data = new byte[1048576];
rand.nextBytes(data);
testIdentity(data);
}
@Test
public void testPartRandom() throws Exception {
byte[] data = new byte[12345];
rand.nextBytes(data);
for (int i = 0; i < 1024; i++) {
data[i] = 123;
}
testIdentity(data);
}
@Test
public void testCompressible() throws Exception {
byte[] data = new byte[10000];
for (int i = 0; i < data.length; i++) {
data[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt();
}
testIdentity(data);
}
@Test
public void testLongBlank() throws Exception {
byte[] data = new byte[100000];
testIdentity(data);
}
@Test
public void testLongSame() throws Exception {
byte[] data = new byte[100000];
Arrays.fill(data, (byte) 123);
testIdentity(data);
}
@Test
public void testSequential() throws Exception {
byte[] data = new byte[49];
for (int i = 0; i < data.length; i++) {
data[i] = (byte) i;
}
testIdentity(data);
}
private static void testIdentity(byte[] data) {
ByteBuf in = Unpooled.wrappedBuffer(data);
EmbeddedChannel encoder = new EmbeddedChannel(new Bzip2Encoder());
EmbeddedChannel decoder = new EmbeddedChannel(new Bzip2Decoder());
try {
ByteBuf msg;
encoder.writeOutbound(in.copy());
encoder.finish();
CompositeByteBuf compressed = Unpooled.compositeBuffer();
while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
}
assertThat(compressed, is(notNullValue()));
assertThat(compressed, is(not(in)));
decoder.writeInbound(compressed.retain());
assertFalse(compressed.isReadable());
CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
}
assertEquals(in, decompressed);
compressed.release();
decompressed.release();
in.release();
} finally {
encoder.close();
decoder.close();
for (;;) {
Object msg = encoder.readOutbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
for (;;) {
Object msg = decoder.readInbound();
if (msg == null) {
break;
}
ReferenceCountUtil.release(msg);
}
}
}
}

View File

@ -0,0 +1,22 @@
Copyright (c) 2003-2008 Yuta Mori All Rights Reserved.
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.