Flush LZ4FrameEncoder buffer when channel flush() is received.

Motivation:

LZ4FrameEncoder maintains an internal buffer of incoming data to compress, and only writes out compressed data once a size threshold is reached. LZ4FrameEncoder does not override the flush() method, so the only way to flush buffered data down the pipeline is to write more data or close the channel.

Modifications:

Override the flush() method to flush the buffered data on demand. Also override the allocateBuffer() method so we can size the output buffer more accurately (instead of potentially needing to reallocate via buffer.ensureWritable()).

Result:

Implementation works as described.
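
For illustration, a minimal usage sketch of the resulting behavior (the class name Lz4FlushExample and the 27-byte payload are arbitrary choices for this sketch, which mirrors the testFlush case added to Lz4FrameEncoderTest below):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.compression.Lz4FrameEncoder;

public final class Lz4FlushExample {
    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel(new Lz4FrameEncoder());

        // Write less than one block of data: the encoder only buffers it internally,
        // so nothing shows up in the outbound queue yet.
        ByteBuf payload = Unpooled.wrappedBuffer(new byte[27]);
        channel.write(payload);
        System.out.println("outbound after write(): " + channel.outboundMessages().size());

        // flush() now compresses the buffered bytes and writes them out as an LZ4 frame,
        // instead of waiting for more data or for the channel to be closed.
        channel.flush();
        System.out.println("outbound after flush(): " + channel.outboundMessages().size());

        channel.finish();
        channel.releaseOutbound();
    }
}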
Jason Brown 2016-10-14 15:44:32 -07:00 committed by Scott Mitchell
parent c590e3bd63
commit 3ea807e375
3 changed files with 275 additions and 42 deletions


@@ -153,4 +153,8 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
* @throws Exception is thrown if an error occurs
*/
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
protected boolean isPreferDirect() {
return preferDirect;
}
}


@@ -24,8 +24,10 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.ObjectUtil;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
@@ -54,6 +56,8 @@ import static io.netty.handler.codec.compression.Lz4Constants.*;
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*/
public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
static final int DEFAULT_MAX_ENCODE_SIZE = Integer.MAX_VALUE;
private final int blockSize;
/**
@@ -64,27 +68,22 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
/**
* Underlying checksum calculator in use.
*/
private Checksum checksum;
private ByteBufChecksum checksum;
/**
* Compression level of current LZ4 encoder (depends on {@link #compressedBlockSize}).
* Compression level of current LZ4 encoder (depends on {@link #blockSize}).
*/
private final int compressionLevel;
/**
* Inner byte buffer for outgoing data.
* Inner byte buffer for outgoing data. Its capacity will be {@link #blockSize}.
*/
private ByteBuf buffer;
/**
* Current length of buffered bytes in {@link #buffer}.
* Maximum size for any buffer to write encoded (compressed) data into.
*/
private int currentBlockLength;
/**
* Maximum size of compressed block with header.
*/
private final int compressedBlockSize;
private final int maxEncodeSize;
/**
* Indicates if the compressed stream has been finished.
@@ -131,6 +130,24 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
* @param checksum the {@link Checksum} instance to use to check data for integrity
*/
public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) {
this(factory, highCompressor, blockSize, checksum, DEFAULT_MAX_ENCODE_SIZE);
}
/**
* Creates a new customizable LZ4 encoder.
*
* @param factory user customizable {@link LZ4Factory} instance
* which may be JNI bindings to the original C implementation, a pure Java implementation
* or a Java implementation that uses the {@link sun.misc.Unsafe}
* @param highCompressor if {@code true} codec will use compressor which requires more memory
* and is slower but compresses more efficiently
* @param blockSize the maximum number of bytes to try to compress at once,
* must be >= 64 and <= 32 M
* @param checksum the {@link Checksum} instance to use to check data for integrity
* @param maxEncodeSize the maximum size for an encode (compressed) buffer
*/
public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize,
Checksum checksum, int maxEncodeSize) {
if (factory == null) {
throw new NullPointerException("factory");
}
@@ -139,13 +156,11 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
}
compressor = highCompressor ? factory.highCompressor() : factory.fastCompressor();
this.checksum = checksum;
this.checksum = ByteBufChecksum.wrapChecksum(checksum);
compressionLevel = compressionLevel(blockSize);
this.blockSize = blockSize;
currentBlockLength = 0;
compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize);
this.maxEncodeSize = ObjectUtil.checkPositive(maxEncodeSize, "maxEncodeSize");
finished = false;
}
@@ -162,6 +177,54 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
return compressionLevel;
}
@Override
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) {
return allocateBuffer(ctx, msg, preferDirect, true);
}
private ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect,
boolean allowEmptyReturn) {
int targetBufSize = 0;
int remaining = msg.readableBytes() + buffer.readableBytes();
// quick overflow check
if (remaining < 0) {
throw new EncoderException("too much data to allocate a buffer for compression");
}
while (remaining > 0) {
int curSize = Math.min(blockSize, remaining);
remaining -= curSize;
// calculate the total compressed size of the current block (including header) and add to the total
targetBufSize += compressor.maxCompressedLength(curSize) + HEADER_LENGTH;
}
// in addition to just the raw byte count, the headers (HEADER_LENGTH) per block (configured via
// #blockSize) also add to the targetBufSize; as the combination of those would never wrap around
// again to be >= 0, this is a good check for the overflow case.
if (targetBufSize > maxEncodeSize || 0 > targetBufSize) {
throw new EncoderException(String.format("requested encode buffer size (%d bytes) exceeds the maximum " +
"allowable size (%d bytes)", targetBufSize, maxEncodeSize));
}
if (allowEmptyReturn && targetBufSize < blockSize) {
return Unpooled.EMPTY_BUFFER;
}
if (preferDirect) {
return ctx.alloc().ioBuffer(targetBufSize, targetBufSize);
} else {
return ctx.alloc().heapBuffer(targetBufSize, targetBufSize);
}
}
/**
* {@inheritDoc}
*
* Encodes the input buffer into {@link #blockSize} chunks in the output buffer. Data is only compressed and
* written once we hit the {@link #blockSize}; else, it is copied into the backing {@link #buffer} to await
* more data.
*/
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
if (finished) {
@@ -169,48 +232,45 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
return;
}
int length = in.readableBytes();
final ByteBuf buffer = this.buffer;
final int blockSize = buffer.capacity();
while (currentBlockLength + length >= blockSize) {
final int tail = blockSize - currentBlockLength;
in.getBytes(in.readerIndex(), buffer, currentBlockLength, tail);
currentBlockLength = blockSize;
int length;
while ((length = in.readableBytes()) > 0) {
final int nextChunkSize = Math.min(length, buffer.writableBytes());
in.readBytes(buffer, nextChunkSize);
if (!buffer.isWritable()) {
flushBufferedData(out);
in.skipBytes(tail);
length -= tail;
}
in.readBytes(buffer, currentBlockLength, length);
currentBlockLength += length;
}
}
private void flushBufferedData(ByteBuf out) {
int currentBlockLength = this.currentBlockLength;
if (currentBlockLength == 0) {
int flushableBytes = buffer.readableBytes();
if (flushableBytes == 0) {
return;
}
checksum.reset();
checksum.update(buffer.array(), buffer.arrayOffset(), currentBlockLength);
checksum.update(buffer, buffer.readerIndex(), flushableBytes);
final int check = (int) checksum.getValue();
out.ensureWritable(compressedBlockSize);
final int bufSize = compressor.maxCompressedLength(flushableBytes) + HEADER_LENGTH;
out.ensureWritable(bufSize);
final int idx = out.writerIndex();
int compressedLength;
try {
ByteBuffer outNioBuffer = out.internalNioBuffer(idx + HEADER_LENGTH, out.writableBytes() - HEADER_LENGTH);
int pos = outNioBuffer.position();
// We always want to start at position 0 as we take care of reusing the buffer in the encode(...) loop.
compressor.compress(buffer.internalNioBuffer(0, currentBlockLength), outNioBuffer);
compressor.compress(buffer.internalNioBuffer(0, flushableBytes), outNioBuffer);
compressedLength = outNioBuffer.position() - pos;
} catch (LZ4Exception e) {
throw new CompressionException(e);
}
final int blockType;
if (compressedLength >= currentBlockLength) {
if (compressedLength >= flushableBytes) {
blockType = BLOCK_TYPE_NON_COMPRESSED;
compressedLength = currentBlockLength;
out.setBytes(idx + HEADER_LENGTH, buffer, 0, currentBlockLength);
compressedLength = flushableBytes;
out.setBytes(idx + HEADER_LENGTH, buffer, 0, flushableBytes);
} else {
blockType = BLOCK_TYPE_COMPRESSED;
}
@@ -218,12 +278,20 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
out.setLong(idx, MAGIC_NUMBER);
out.setByte(idx + TOKEN_OFFSET, (byte) (blockType | compressionLevel));
out.setIntLE(idx + COMPRESSED_LENGTH_OFFSET, compressedLength);
out.setIntLE(idx + DECOMPRESSED_LENGTH_OFFSET, currentBlockLength);
out.setIntLE(idx + DECOMPRESSED_LENGTH_OFFSET, flushableBytes);
out.setIntLE(idx + CHECKSUM_OFFSET, check);
out.writerIndex(idx + HEADER_LENGTH + compressedLength);
currentBlockLength = 0;
buffer.clear();
}
this.currentBlockLength = currentBlockLength;
@Override
public void flush(final ChannelHandlerContext ctx) throws Exception {
if (buffer != null && buffer.isReadable()) {
final ByteBuf buf = allocateBuffer(ctx, Unpooled.EMPTY_BUFFER, isPreferDirect(), false);
flushBufferedData(buf);
ctx.write(buf);
}
ctx.flush();
}
private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
@@ -235,7 +303,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
try {
final ByteBuf footer = ctx.alloc().heapBuffer(
compressor.maxCompressedLength(currentBlockLength) + HEADER_LENGTH);
compressor.maxCompressedLength(buffer.readableBytes()) + HEADER_LENGTH);
flushBufferedData(footer);
final int idx = footer.writerIndex();
@@ -330,10 +398,11 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
// Ensure we use a heap based ByteBuf.
buffer = Unpooled.wrappedBuffer(new byte[blockSize]);
buffer.clear();
}
@Override
@@ -341,4 +410,8 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
super.handlerRemoved(ctx);
cleanup();
}
final ByteBuf getBackingBuffer() {
return buffer;
}
}


@@ -15,17 +15,52 @@
*/
package io.netty.handler.codec.compression;
import java.io.InputStream;
import java.util.zip.Checksum;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.EncoderException;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.io.InputStream;
import static org.junit.Assert.*;
import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
public class Lz4FrameEncoderTest extends AbstractEncoderTest {
/**
* For the purposes of this test, if we pass this (very small) size of buffer into
* {@link Lz4FrameEncoder#allocateBuffer(ChannelHandlerContext, ByteBuf, boolean)}, we should get back
* an empty buffer.
*/
private static final int NONALLOCATABLE_SIZE = 1;
@Mock
private ChannelHandlerContext ctx;
/**
* A {@link ByteBuf} for mocking purposes, largely because it's difficult to allocate huge buffers.
*/
@Mock
private ByteBuf buffer;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
when(ctx.alloc()).thenReturn(ByteBufAllocator.DEFAULT);
}
@Override
public void initChannel() {
@@ -59,4 +94,125 @@ public class Lz4FrameEncoderTest extends AbstractEncoderTest {
return Unpooled.wrappedBuffer(decompressed);
}
@Test
public void testAllocateDirectBuffer() {
final int blockSize = 100;
testAllocateBuffer(blockSize, blockSize - 13, true);
testAllocateBuffer(blockSize, blockSize * 5, true);
testAllocateBuffer(blockSize, NONALLOCATABLE_SIZE, true);
}
@Test
public void testAllocateHeapBuffer() {
final int blockSize = 100;
testAllocateBuffer(blockSize, blockSize - 13, false);
testAllocateBuffer(blockSize, blockSize * 5, false);
testAllocateBuffer(blockSize, NONALLOCATABLE_SIZE, false);
}
private void testAllocateBuffer(int blockSize, int bufSize, boolean isDirect) {
// allocate the input buffer to the requested size (which may be smaller or larger than the blockSize)
ByteBuf in = ByteBufAllocator.DEFAULT.buffer(bufSize, bufSize);
in.writerIndex(in.capacity());
ByteBuf out = null;
try {
Lz4FrameEncoder encoder = newEncoder(blockSize, Lz4FrameEncoder.DEFAULT_MAX_ENCODE_SIZE);
out = encoder.allocateBuffer(ctx, in, isDirect);
Assert.assertNotNull(out);
if (NONALLOCATABLE_SIZE == bufSize) {
Assert.assertFalse(out.isWritable());
} else {
Assert.assertTrue(out.writableBytes() > 0);
Assert.assertEquals(isDirect, out.isDirect());
}
} finally {
in.release();
if (out != null) {
out.release();
}
}
}
@Test (expected = EncoderException.class)
public void testAllocateDirectBufferExceedMaxEncodeSize() {
final int maxEncodeSize = 1024;
Lz4FrameEncoder encoder = newEncoder(Lz4Constants.DEFAULT_BLOCK_SIZE, maxEncodeSize);
int inputBufferSize = maxEncodeSize * 10;
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(inputBufferSize, inputBufferSize);
try {
buf.writerIndex(inputBufferSize);
encoder.allocateBuffer(ctx, buf, false);
} finally {
buf.release();
}
}
private Lz4FrameEncoder newEncoder(int blockSize, int maxEncodeSize) {
Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum();
Lz4FrameEncoder encoder = new Lz4FrameEncoder(LZ4Factory.fastestInstance(), true,
blockSize,
checksum,
maxEncodeSize);
encoder.handlerAdded(ctx);
return encoder;
}
/**
* This test might be a bit invasive in terms of knowing what happens inside
* {@link Lz4FrameEncoder#allocateBuffer(ChannelHandlerContext, ByteBuf, boolean)}, but it is the safest way
* of testing the overflow conditions, as allocating the huge buffers fails in many CI environments.
*/
@Test (expected = EncoderException.class)
public void testAllocateOnHeapBufferOverflowsOutputSize() {
final int maxEncodeSize = Integer.MAX_VALUE;
Lz4FrameEncoder encoder = newEncoder(Lz4Constants.DEFAULT_BLOCK_SIZE, maxEncodeSize);
when(buffer.readableBytes()).thenReturn(maxEncodeSize);
buffer.writerIndex(maxEncodeSize);
encoder.allocateBuffer(ctx, buffer, false);
}
@Test
public void testFlush() {
Lz4FrameEncoder encoder = new Lz4FrameEncoder();
EmbeddedChannel channel = new EmbeddedChannel(encoder);
int size = 27;
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(size, size);
buf.writerIndex(size);
Assert.assertEquals(0, encoder.getBackingBuffer().readableBytes());
channel.write(buf);
Assert.assertTrue(channel.outboundMessages().isEmpty());
Assert.assertEquals(size, encoder.getBackingBuffer().readableBytes());
channel.flush();
Assert.assertTrue(channel.finish());
Assert.assertTrue(channel.releaseOutbound());
Assert.assertFalse(channel.releaseInbound());
}
@Test
public void testAllocatingAroundBlockSize() {
int blockSize = 100;
Lz4FrameEncoder encoder = newEncoder(blockSize, Lz4FrameEncoder.DEFAULT_MAX_ENCODE_SIZE);
EmbeddedChannel channel = new EmbeddedChannel(encoder);
int size = blockSize - 1;
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(size, size);
buf.writerIndex(size);
Assert.assertEquals(0, encoder.getBackingBuffer().readableBytes());
channel.write(buf);
Assert.assertEquals(size, encoder.getBackingBuffer().readableBytes());
int nextSize = size - 1;
buf = ByteBufAllocator.DEFAULT.buffer(nextSize, nextSize);
buf.writerIndex(nextSize);
channel.write(buf);
Assert.assertEquals(size + nextSize - blockSize, encoder.getBackingBuffer().readableBytes());
channel.flush();
Assert.assertEquals(0, encoder.getBackingBuffer().readableBytes());
Assert.assertTrue(channel.finish());
Assert.assertTrue(channel.releaseOutbound());
Assert.assertFalse(channel.releaseInbound());
}
}