From fa84e86f78206a4ef035c1e58c9ea2d13d3a1b05 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 12 Jul 2016 14:29:46 +0200 Subject: [PATCH] Lz4FrameEncoder should prefer direct buffers for its output. Motivation: We should prefer direct buffers for the output of Lz4FrameEncoder as this is what is needed for writing to the socket. Modification: Use direct buffers for the output Result: Less memory copies needed. --- .../codec/compression/Lz4FrameEncoder.java | 89 +++++++++++-------- 1 file changed, 50 insertions(+), 39 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java index 72e5f20bbb..85a9ec8bb1 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.compression; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -30,6 +31,7 @@ import net.jpountz.lz4.LZ4Exception; import net.jpountz.lz4.LZ4Factory; import net.jpountz.xxhash.XXHashFactory; +import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; import java.util.zip.Checksum; @@ -52,6 +54,8 @@ import static io.netty.handler.codec.compression.Lz4Constants.*; * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ public class Lz4FrameEncoder extends MessageToByteEncoder { + private final int blockSize; + /** * Underlying compressor in use. */ @@ -70,7 +74,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { /** * Inner byte buffer for outgoing data. */ - private byte[] buffer; + private ByteBuf buffer; /** * Current length of buffered bytes in {@link #buffer}. @@ -117,7 +121,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { /** * Creates a new customizable LZ4 encoder. * - * @param factory user customizable {@link net.jpountz.lz4.LZ4Factory} instance + * @param factory user customizable {@link LZ4Factory} instance * which may be JNI bindings to the original C implementation, a pure Java implementation * or a Java implementation that uses the {@link sun.misc.Unsafe} * @param highCompressor if {@code true} codec will use compressor which requires more memory @@ -127,7 +131,6 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { * @param checksum the {@link Checksum} instance to use to check data for integrity */ public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) { - super(false); if (factory == null) { throw new NullPointerException("factory"); } @@ -139,7 +142,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { this.checksum = checksum; compressionLevel = compressionLevel(blockSize); - buffer = new byte[blockSize]; + this.blockSize = blockSize; currentBlockLength = 0; compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize); @@ -168,8 +171,8 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { int length = in.readableBytes(); - final byte[] buffer = this.buffer; - final int blockSize = buffer.length; + final ByteBuf buffer = this.buffer; + final int blockSize = buffer.capacity(); while (currentBlockLength + length >= blockSize) { final int tail = blockSize - currentBlockLength; in.getBytes(in.readerIndex(), buffer, currentBlockLength, tail); @@ -188,16 +191,18 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { return; } checksum.reset(); - checksum.update(buffer, 0, currentBlockLength); + checksum.update(buffer.array(), buffer.arrayOffset(), currentBlockLength); final int check = (int) checksum.getValue(); out.ensureWritable(compressedBlockSize); final int idx = out.writerIndex(); - final byte[] dest = out.array(); - final int destOff = out.arrayOffset() + idx; int compressedLength; try { - compressedLength = compressor.compress(buffer, 0, currentBlockLength, dest, destOff + HEADER_LENGTH); + ByteBuffer outNioBuffer = out.internalNioBuffer(idx + HEADER_LENGTH, out.writableBytes() - HEADER_LENGTH); + int pos = outNioBuffer.position(); + // We always want to start at position 0 as we take care of reusing the buffer in the encode(...) loop. + compressor.compress(buffer.internalNioBuffer(0, currentBlockLength), outNioBuffer); + compressedLength = outNioBuffer.position() - pos; } catch (LZ4Exception e) { throw new CompressionException(e); } @@ -205,16 +210,16 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { if (compressedLength >= currentBlockLength) { blockType = BLOCK_TYPE_NON_COMPRESSED; compressedLength = currentBlockLength; - System.arraycopy(buffer, 0, dest, destOff + HEADER_LENGTH, currentBlockLength); + out.setBytes(idx + HEADER_LENGTH, buffer, 0, currentBlockLength); } else { blockType = BLOCK_TYPE_COMPRESSED; } out.setLong(idx, MAGIC_NUMBER); - dest[destOff + TOKEN_OFFSET] = (byte) (blockType | compressionLevel); - writeIntLE(compressedLength, dest, destOff + COMPRESSED_LENGTH_OFFSET); - writeIntLE(currentBlockLength, dest, destOff + DECOMPRESSED_LENGTH_OFFSET); - writeIntLE(check, dest, destOff + CHECKSUM_OFFSET); + out.setByte(idx + TOKEN_OFFSET, (byte) (blockType | compressionLevel)); + out.setIntLE(idx + COMPRESSED_LENGTH_OFFSET, compressedLength); + out.setIntLE(idx + DECOMPRESSED_LENGTH_OFFSET, currentBlockLength); + out.setIntLE(idx + CHECKSUM_OFFSET, check); out.writerIndex(idx + HEADER_LENGTH + compressedLength); currentBlockLength = 0; @@ -228,35 +233,33 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { } finished = true; - final ByteBuf footer = ctx.alloc().heapBuffer( - compressor.maxCompressedLength(currentBlockLength) + HEADER_LENGTH); - flushBufferedData(footer); + try { + final ByteBuf footer = ctx.alloc().heapBuffer( + compressor.maxCompressedLength(currentBlockLength) + HEADER_LENGTH); + flushBufferedData(footer); - final int idx = footer.writerIndex(); - final byte[] dest = footer.array(); - final int destOff = footer.arrayOffset() + idx; - footer.setLong(idx, MAGIC_NUMBER); - dest[destOff + TOKEN_OFFSET] = (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel); - writeIntLE(0, dest, destOff + COMPRESSED_LENGTH_OFFSET); - writeIntLE(0, dest, destOff + DECOMPRESSED_LENGTH_OFFSET); - writeIntLE(0, dest, destOff + CHECKSUM_OFFSET); - footer.writerIndex(idx + HEADER_LENGTH); + final int idx = footer.writerIndex(); + footer.setLong(idx, MAGIC_NUMBER); + footer.setByte(idx + TOKEN_OFFSET, (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel)); + footer.setInt(idx + COMPRESSED_LENGTH_OFFSET, 0); + footer.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, 0); + footer.setInt(idx + CHECKSUM_OFFSET, 0); - compressor = null; - checksum = null; - buffer = null; + footer.writerIndex(idx + HEADER_LENGTH); - return ctx.writeAndFlush(footer, promise); + return ctx.writeAndFlush(footer, promise); + } finally { + cleanup(); + } } - /** - * Writes {@code int} value into the byte buffer with little-endian format. - */ - private static void writeIntLE(int i, byte[] buf, int off) { - buf[off++] = (byte) i; - buf[off++] = (byte) (i >>> 8); - buf[off++] = (byte) (i >>> 16); - buf[off] = (byte) (i >>> 24); + private void cleanup() { + compressor = null; + checksum = null; + if (buffer != null) { + buffer.release(); + buffer = null; + } } /** @@ -329,5 +332,13 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; + // Ensure we use a heap based ByteBuf. + buffer = Unpooled.wrappedBuffer(new byte[blockSize]); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + super.handlerRemoved(ctx); + cleanup(); } }