Eliminate memory copy in ByteToMessageDecoder whenever possible

Motivation:

Currently when there are bytes left in the cumulation buffer we do a byte copy to produce the input buffer for the decode method. This can put quite some overhead on the impl.

Modification:

- Use a CompositeByteBuf to eliminate the byte copy.
- Allow to specify if a CompositeBytebug should be used or not as some handlers can only act on one ByteBuffer in an efficient way (like SslHandler :( ).

Result:

Performance improvement as shown in the following benchmark.

Without this patch:
[xxx@xxx ~]$ ./wrk-benchmark
Running 5m test @ http://xxx:8080/plaintext
  16 threads and 256 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    20.19ms   38.34ms   1.02s    98.70%
    Req/Sec   241.10k    26.50k  303.45k    93.46%
  1153994119 requests in 5.00m, 155.84GB read
Requests/sec: 3846702.44
Transfer/sec:    531.93MB

With the patch:
[xxx@xxx ~]$ ./wrk-benchmark
Running 5m test @ http://xxx:8080/plaintext
  16 threads and 256 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    17.34ms   27.14ms 877.62ms   98.26%
    Req/Sec   252.55k    23.77k  329.50k    87.71%
  1209772221 requests in 5.00m, 163.37GB read
Requests/sec: 4032584.22
Transfer/sec:    557.64MB
This commit is contained in:
Norman Maurer 2014-11-21 21:10:47 +01:00 committed by Trustin Lee
parent 98731a51c8
commit 50af9b916c
2 changed files with 100 additions and 23 deletions

View File

@ -16,6 +16,8 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -61,13 +63,75 @@ import java.util.List;
* Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
* annotated with {@link @Sharable}.
* <p>
* Some methods such as {@link ByteBuf.readBytes(int)} will cause a memory leak if the returned buffer
* is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf.readSlice(int)}
* Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
* is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
* to avoid leaking memory.
*/
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
/**
* Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
*/
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
/**
* Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
* Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
* and the decoder implemention this may be slower then just use the {@link #MERGE_CUMULATOR}.
*/
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
if (cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
// use slice().retain() or duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
buffer.writeBytes(in);
in.release();
} else {
CompositeByteBuf composite;
if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation;
} else {
int readable = cumulation.readableBytes();
composite = alloc.compositeBuffer();
composite.addComponent(cumulation).writerIndex(readable);
}
composite.addComponent(in).writerIndex(composite.writerIndex() + in.readableBytes());
buffer = composite;
}
return buffer;
}
};
ByteBuf cumulation;
private Cumulator cumulator = MERGE_CUMULATOR;
private boolean singleDecode;
private boolean decodeWasNull;
private boolean first;
@ -96,6 +160,16 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
return singleDecode;
}
/**
* Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
*/
public void setCumulator(Cumulator cumulator) {
if (cumulator == null) {
throw new NullPointerException("cumulator");
}
this.cumulator = cumulator;
}
/**
* Returns the actual number of readable bytes in the internal cumulative
* buffer of this decoder. You usually do not need to rely on this value
@ -151,19 +225,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
if (first) {
cumulation = data;
} else {
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()
|| cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
expandCumulation(ctx, data.readableBytes());
}
cumulation.writeBytes(data);
data.release();
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
@ -188,13 +250,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
}
}
private void expandCumulation(ChannelHandlerContext ctx, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (cumulation != null && !first && cumulation.refCnt() == 1) {
@ -322,4 +377,24 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
decode(ctx, in, out);
}
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
}
/**
* Cumulate {@link ByteBuf}s.
*/
public interface Cumulator {
/**
* Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
* The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
* call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
*/
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
}

View File

@ -271,6 +271,9 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
*/
@Deprecated
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
// As SslEngine.unwrap(....) only works with one ByteBuffer we should not try to use a CompositeByteBuf
// at the first place to make it as fast as possible.
super(false);
if (engine == null) {
throw new NullPointerException("engine");
}
@ -477,7 +480,6 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
}
SSLEngineResult result = wrap(alloc, engine, buf, out);
if (!buf.isReadable()) {
promise = pendingUnencryptedWrites.remove();
} else {