Eliminate memory copy in ByteToMessageDecoder whenever possible
Motivation: Currently when there are bytes left in the cumulation buffer we do a byte copy to produce the input buffer for the decode method. This can put quite some overhead on the impl. Modification: - Use a CompositeByteBuf to eliminate the byte copy. - Allow to specify if a CompositeBytebug should be used or not as some handlers can only act on one ByteBuffer in an efficient way (like SslHandler :( ). Result: Performance improvement as shown in the following benchmark. Without this patch: [xxx@xxx ~]$ ./wrk-benchmark Running 5m test @ http://xxx:8080/plaintext 16 threads and 256 connections Thread Stats Avg Stdev Max +/- Stdev Latency 20.19ms 38.34ms 1.02s 98.70% Req/Sec 241.10k 26.50k 303.45k 93.46% 1153994119 requests in 5.00m, 155.84GB read Requests/sec: 3846702.44 Transfer/sec: 531.93MB With the patch: [xxx@xxx ~]$ ./wrk-benchmark Running 5m test @ http://xxx:8080/plaintext 16 threads and 256 connections Thread Stats Avg Stdev Max +/- Stdev Latency 17.34ms 27.14ms 877.62ms 98.26% Req/Sec 252.55k 23.77k 329.50k 87.71% 1209772221 requests in 5.00m, 163.37GB read Requests/sec: 4032584.22 Transfer/sec: 557.64MB
This commit is contained in:
parent
065bb8c440
commit
feacf128ca
@ -16,6 +16,8 @@
|
|||||||
package io.netty.handler.codec;
|
package io.netty.handler.codec;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
|
import io.netty.buffer.CompositeByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
@ -61,13 +63,75 @@ import java.util.List;
|
|||||||
* Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
|
* Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
|
||||||
* annotated with {@link @Sharable}.
|
* annotated with {@link @Sharable}.
|
||||||
* <p>
|
* <p>
|
||||||
* Some methods such as {@link ByteBuf.readBytes(int)} will cause a memory leak if the returned buffer
|
* Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
|
||||||
* is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf.readSlice(int)}
|
* is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
|
||||||
* to avoid leaking memory.
|
* to avoid leaking memory.
|
||||||
*/
|
*/
|
||||||
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
|
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
|
||||||
|
*/
|
||||||
|
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
|
||||||
|
@Override
|
||||||
|
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
|
||||||
|
ByteBuf buffer;
|
||||||
|
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|
||||||
|
|| cumulation.refCnt() > 1) {
|
||||||
|
// Expand cumulation (by replace it) when either there is not more room in the buffer
|
||||||
|
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
|
||||||
|
// duplicate().retain().
|
||||||
|
//
|
||||||
|
// See:
|
||||||
|
// - https://github.com/netty/netty/issues/2327
|
||||||
|
// - https://github.com/netty/netty/issues/1764
|
||||||
|
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
|
||||||
|
} else {
|
||||||
|
buffer = cumulation;
|
||||||
|
}
|
||||||
|
buffer.writeBytes(in);
|
||||||
|
in.release();
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
|
||||||
|
* Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
|
||||||
|
* and the decoder implemention this may be slower then just use the {@link #MERGE_CUMULATOR}.
|
||||||
|
*/
|
||||||
|
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
|
||||||
|
@Override
|
||||||
|
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
|
||||||
|
ByteBuf buffer;
|
||||||
|
if (cumulation.refCnt() > 1) {
|
||||||
|
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
|
||||||
|
// use slice().retain() or duplicate().retain().
|
||||||
|
//
|
||||||
|
// See:
|
||||||
|
// - https://github.com/netty/netty/issues/2327
|
||||||
|
// - https://github.com/netty/netty/issues/1764
|
||||||
|
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
|
||||||
|
buffer.writeBytes(in);
|
||||||
|
in.release();
|
||||||
|
} else {
|
||||||
|
CompositeByteBuf composite;
|
||||||
|
if (cumulation instanceof CompositeByteBuf) {
|
||||||
|
composite = (CompositeByteBuf) cumulation;
|
||||||
|
} else {
|
||||||
|
int readable = cumulation.readableBytes();
|
||||||
|
composite = alloc.compositeBuffer();
|
||||||
|
composite.addComponent(cumulation).writerIndex(readable);
|
||||||
|
}
|
||||||
|
composite.addComponent(in).writerIndex(composite.writerIndex() + in.readableBytes());
|
||||||
|
buffer = composite;
|
||||||
|
}
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
ByteBuf cumulation;
|
ByteBuf cumulation;
|
||||||
|
private Cumulator cumulator = MERGE_CUMULATOR;
|
||||||
private boolean singleDecode;
|
private boolean singleDecode;
|
||||||
private boolean decodeWasNull;
|
private boolean decodeWasNull;
|
||||||
private boolean first;
|
private boolean first;
|
||||||
@ -96,6 +160,16 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
return singleDecode;
|
return singleDecode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
|
||||||
|
*/
|
||||||
|
public void setCumulator(Cumulator cumulator) {
|
||||||
|
if (cumulator == null) {
|
||||||
|
throw new NullPointerException("cumulator");
|
||||||
|
}
|
||||||
|
this.cumulator = cumulator;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the actual number of readable bytes in the internal cumulative
|
* Returns the actual number of readable bytes in the internal cumulative
|
||||||
* buffer of this decoder. You usually do not need to rely on this value
|
* buffer of this decoder. You usually do not need to rely on this value
|
||||||
@ -151,19 +225,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
if (first) {
|
if (first) {
|
||||||
cumulation = data;
|
cumulation = data;
|
||||||
} else {
|
} else {
|
||||||
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()
|
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
|
||||||
|| cumulation.refCnt() > 1) {
|
|
||||||
// Expand cumulation (by replace it) when either there is not more room in the buffer
|
|
||||||
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
|
|
||||||
// duplicate().retain().
|
|
||||||
//
|
|
||||||
// See:
|
|
||||||
// - https://github.com/netty/netty/issues/2327
|
|
||||||
// - https://github.com/netty/netty/issues/1764
|
|
||||||
expandCumulation(ctx, data.readableBytes());
|
|
||||||
}
|
|
||||||
cumulation.writeBytes(data);
|
|
||||||
data.release();
|
|
||||||
}
|
}
|
||||||
callDecode(ctx, cumulation, out);
|
callDecode(ctx, cumulation, out);
|
||||||
} catch (DecoderException e) {
|
} catch (DecoderException e) {
|
||||||
@ -188,13 +250,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void expandCumulation(ChannelHandlerContext ctx, int readable) {
|
|
||||||
ByteBuf oldCumulation = cumulation;
|
|
||||||
cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable);
|
|
||||||
cumulation.writeBytes(oldCumulation);
|
|
||||||
oldCumulation.release();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
if (cumulation != null && !first && cumulation.refCnt() == 1) {
|
if (cumulation != null && !first && cumulation.refCnt() == 1) {
|
||||||
@ -322,4 +377,24 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||||
decode(ctx, in, out);
|
decode(ctx, in, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
|
||||||
|
ByteBuf oldCumulation = cumulation;
|
||||||
|
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
|
||||||
|
cumulation.writeBytes(oldCumulation);
|
||||||
|
oldCumulation.release();
|
||||||
|
return cumulation;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cumulate {@link ByteBuf}s.
|
||||||
|
*/
|
||||||
|
public interface Cumulator {
|
||||||
|
/**
|
||||||
|
* Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
|
||||||
|
* The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
|
||||||
|
* call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
|
||||||
|
*/
|
||||||
|
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -271,6 +271,9 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
|
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
|
||||||
|
// As SslEngine.unwrap(....) only works with one ByteBuffer we should not try to use a CompositeByteBuf
|
||||||
|
// at the first place to make it as fast as possible.
|
||||||
|
super(false);
|
||||||
if (engine == null) {
|
if (engine == null) {
|
||||||
throw new NullPointerException("engine");
|
throw new NullPointerException("engine");
|
||||||
}
|
}
|
||||||
@ -477,7 +480,6 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
}
|
}
|
||||||
|
|
||||||
SSLEngineResult result = wrap(alloc, engine, buf, out);
|
SSLEngineResult result = wrap(alloc, engine, buf, out);
|
||||||
|
|
||||||
if (!buf.isReadable()) {
|
if (!buf.isReadable()) {
|
||||||
promise = pendingUnencryptedWrites.remove();
|
promise = pendingUnencryptedWrites.remove();
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
Reference in New Issue
Block a user