Lz4FrameEncoder should prefer direct buffers for its output.

Motivation:

We should prefer direct buffers for the output of Lz4FrameEncoder as this is what is needed for writing to the socket.

Modification:

Use direct buffers for the output

Result:

Less memory copies needed.
This commit is contained in:
Norman Maurer 2016-07-12 14:29:46 +02:00
parent bb3c4a43d8
commit fa84e86f78

View File

@ -17,6 +17,7 @@
package io.netty.handler.codec.compression; package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -30,6 +31,7 @@ import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory; import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory; import net.jpountz.xxhash.XXHashFactory;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.zip.Checksum; import java.util.zip.Checksum;
@ -52,6 +54,8 @@ import static io.netty.handler.codec.compression.Lz4Constants.*;
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*/ */
public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> { public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
private final int blockSize;
/** /**
* Underlying compressor in use. * Underlying compressor in use.
*/ */
@ -70,7 +74,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
/** /**
* Inner byte buffer for outgoing data. * Inner byte buffer for outgoing data.
*/ */
private byte[] buffer; private ByteBuf buffer;
/** /**
* Current length of buffered bytes in {@link #buffer}. * Current length of buffered bytes in {@link #buffer}.
@ -117,7 +121,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
/** /**
* Creates a new customizable LZ4 encoder. * Creates a new customizable LZ4 encoder.
* *
* @param factory user customizable {@link net.jpountz.lz4.LZ4Factory} instance * @param factory user customizable {@link LZ4Factory} instance
* which may be JNI bindings to the original C implementation, a pure Java implementation * which may be JNI bindings to the original C implementation, a pure Java implementation
* or a Java implementation that uses the {@link sun.misc.Unsafe} * or a Java implementation that uses the {@link sun.misc.Unsafe}
* @param highCompressor if {@code true} codec will use compressor which requires more memory * @param highCompressor if {@code true} codec will use compressor which requires more memory
@ -127,7 +131,6 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
* @param checksum the {@link Checksum} instance to use to check data for integrity * @param checksum the {@link Checksum} instance to use to check data for integrity
*/ */
public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) { public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) {
super(false);
if (factory == null) { if (factory == null) {
throw new NullPointerException("factory"); throw new NullPointerException("factory");
} }
@ -139,7 +142,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
this.checksum = checksum; this.checksum = checksum;
compressionLevel = compressionLevel(blockSize); compressionLevel = compressionLevel(blockSize);
buffer = new byte[blockSize]; this.blockSize = blockSize;
currentBlockLength = 0; currentBlockLength = 0;
compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize); compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize);
@ -168,8 +171,8 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
int length = in.readableBytes(); int length = in.readableBytes();
final byte[] buffer = this.buffer; final ByteBuf buffer = this.buffer;
final int blockSize = buffer.length; final int blockSize = buffer.capacity();
while (currentBlockLength + length >= blockSize) { while (currentBlockLength + length >= blockSize) {
final int tail = blockSize - currentBlockLength; final int tail = blockSize - currentBlockLength;
in.getBytes(in.readerIndex(), buffer, currentBlockLength, tail); in.getBytes(in.readerIndex(), buffer, currentBlockLength, tail);
@ -188,16 +191,18 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
return; return;
} }
checksum.reset(); checksum.reset();
checksum.update(buffer, 0, currentBlockLength); checksum.update(buffer.array(), buffer.arrayOffset(), currentBlockLength);
final int check = (int) checksum.getValue(); final int check = (int) checksum.getValue();
out.ensureWritable(compressedBlockSize); out.ensureWritable(compressedBlockSize);
final int idx = out.writerIndex(); final int idx = out.writerIndex();
final byte[] dest = out.array();
final int destOff = out.arrayOffset() + idx;
int compressedLength; int compressedLength;
try { try {
compressedLength = compressor.compress(buffer, 0, currentBlockLength, dest, destOff + HEADER_LENGTH); ByteBuffer outNioBuffer = out.internalNioBuffer(idx + HEADER_LENGTH, out.writableBytes() - HEADER_LENGTH);
int pos = outNioBuffer.position();
// We always want to start at position 0 as we take care of reusing the buffer in the encode(...) loop.
compressor.compress(buffer.internalNioBuffer(0, currentBlockLength), outNioBuffer);
compressedLength = outNioBuffer.position() - pos;
} catch (LZ4Exception e) { } catch (LZ4Exception e) {
throw new CompressionException(e); throw new CompressionException(e);
} }
@ -205,16 +210,16 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
if (compressedLength >= currentBlockLength) { if (compressedLength >= currentBlockLength) {
blockType = BLOCK_TYPE_NON_COMPRESSED; blockType = BLOCK_TYPE_NON_COMPRESSED;
compressedLength = currentBlockLength; compressedLength = currentBlockLength;
System.arraycopy(buffer, 0, dest, destOff + HEADER_LENGTH, currentBlockLength); out.setBytes(idx + HEADER_LENGTH, buffer, 0, currentBlockLength);
} else { } else {
blockType = BLOCK_TYPE_COMPRESSED; blockType = BLOCK_TYPE_COMPRESSED;
} }
out.setLong(idx, MAGIC_NUMBER); out.setLong(idx, MAGIC_NUMBER);
dest[destOff + TOKEN_OFFSET] = (byte) (blockType | compressionLevel); out.setByte(idx + TOKEN_OFFSET, (byte) (blockType | compressionLevel));
writeIntLE(compressedLength, dest, destOff + COMPRESSED_LENGTH_OFFSET); out.setIntLE(idx + COMPRESSED_LENGTH_OFFSET, compressedLength);
writeIntLE(currentBlockLength, dest, destOff + DECOMPRESSED_LENGTH_OFFSET); out.setIntLE(idx + DECOMPRESSED_LENGTH_OFFSET, currentBlockLength);
writeIntLE(check, dest, destOff + CHECKSUM_OFFSET); out.setIntLE(idx + CHECKSUM_OFFSET, check);
out.writerIndex(idx + HEADER_LENGTH + compressedLength); out.writerIndex(idx + HEADER_LENGTH + compressedLength);
currentBlockLength = 0; currentBlockLength = 0;
@ -228,35 +233,33 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
} }
finished = true; finished = true;
final ByteBuf footer = ctx.alloc().heapBuffer( try {
compressor.maxCompressedLength(currentBlockLength) + HEADER_LENGTH); final ByteBuf footer = ctx.alloc().heapBuffer(
flushBufferedData(footer); compressor.maxCompressedLength(currentBlockLength) + HEADER_LENGTH);
flushBufferedData(footer);
final int idx = footer.writerIndex(); final int idx = footer.writerIndex();
final byte[] dest = footer.array(); footer.setLong(idx, MAGIC_NUMBER);
final int destOff = footer.arrayOffset() + idx; footer.setByte(idx + TOKEN_OFFSET, (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel));
footer.setLong(idx, MAGIC_NUMBER); footer.setInt(idx + COMPRESSED_LENGTH_OFFSET, 0);
dest[destOff + TOKEN_OFFSET] = (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel); footer.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, 0);
writeIntLE(0, dest, destOff + COMPRESSED_LENGTH_OFFSET); footer.setInt(idx + CHECKSUM_OFFSET, 0);
writeIntLE(0, dest, destOff + DECOMPRESSED_LENGTH_OFFSET);
writeIntLE(0, dest, destOff + CHECKSUM_OFFSET);
footer.writerIndex(idx + HEADER_LENGTH);
compressor = null; footer.writerIndex(idx + HEADER_LENGTH);
checksum = null;
buffer = null;
return ctx.writeAndFlush(footer, promise); return ctx.writeAndFlush(footer, promise);
} finally {
cleanup();
}
} }
/** private void cleanup() {
* Writes {@code int} value into the byte buffer with little-endian format. compressor = null;
*/ checksum = null;
private static void writeIntLE(int i, byte[] buf, int off) { if (buffer != null) {
buf[off++] = (byte) i; buffer.release();
buf[off++] = (byte) (i >>> 8); buffer = null;
buf[off++] = (byte) (i >>> 16); }
buf[off] = (byte) (i >>> 24);
} }
/** /**
@ -329,5 +332,13 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx; this.ctx = ctx;
// Ensure we use a heap based ByteBuf.
buffer = Unpooled.wrappedBuffer(new byte[blockSize]);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
cleanup();
} }
} }