From d7c977dd715396e2d25773da1c96f816f3338a08 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Sun, 3 Dec 2017 23:06:02 -0800 Subject: [PATCH] SslHandler aggregation prefer copy over CompositeByteBuf Motivation: SslHandler will do aggregation of writes by default in an attempt to improve goodput and reduce the number of discrete buffers which must be accumulated. However if aggregation is not possible then a CompositeByteBuf is used to accumulate multiple buffers. Using a CompositeByteBuf doesn't provide any of the benefits of better goodput and in the case of small + large writes (e.g. http/2 frame header + data) this can reduce the amount of data that can be passed to writev by about half. This has the impact of increasing latency as well as reducing goodput. Modifications: - SslHandler should prefer copying instead of using a CompositeByteBuf Result: Better goodput (and potentially improved latency) at the cost of copy operations. --- .../java/io/netty/handler/ssl/SslHandler.java | 2 +- .../AbstractCoalescingBufferQueue.java | 36 +++++++++++++++---- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 709030fd36..199bab2121 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -1882,7 +1882,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH return composite; } return attemptCopyToCumulation(cumulation, next, wrapDataSize) ? cumulation : - composeIntoComposite(alloc, cumulation, next); + copyAndCompose(alloc, cumulation, next); } @Override diff --git a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java index c15e9d5e39..552793ee92 100644 --- a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java +++ b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java @@ -17,16 +17,16 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.ArrayDeque; +import static io.netty.util.ReferenceCountUtil.safeRelease; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; +import static io.netty.util.internal.PlatformDependent.throwException; @UnstableApi public abstract class AbstractCoalescingBufferQueue { @@ -177,10 +177,10 @@ public abstract class AbstractCoalescingBufferQueue { entryBuffer = null; } } catch (Throwable cause) { - ReferenceCountUtil.safeRelease(entryBuffer); - ReferenceCountUtil.safeRelease(toReturn); + safeRelease(entryBuffer); + safeRelease(toReturn); aggregatePromise.setFailure(cause); - PlatformDependent.throwException(cause); + throwException(cause); } decrementReadableBytes(originalBytes - bytes); return toReturn; @@ -276,11 +276,33 @@ public abstract class AbstractCoalescingBufferQueue { composite.addComponent(true, next); } catch (Throwable cause) { composite.release(); - PlatformDependent.throwException(cause); + safeRelease(next); + throwException(cause); } return composite; } + /** + * Compose {@code cumulation} and {@code next} into a new {@link ByteBufAllocator#ioBuffer()}. + * @param alloc The allocator to use to allocate the new buffer. + * @param cumulation The current cumulation. + * @param next The next buffer. + * @return The result of {@code cumulation + next}. + */ + protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) { + ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes()); + try { + newCumulation.writeBytes(cumulation).writeBytes(next); + } catch (Throwable cause) { + newCumulation.release(); + safeRelease(next); + throwException(cause); + } + cumulation.release(); + next.release(); + return newCumulation; + } + /** * Calculate the first {@link ByteBuf} which will be used in subsequent calls to * {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}. @@ -313,7 +335,7 @@ public abstract class AbstractCoalescingBufferQueue { } try { if (entry instanceof ByteBuf) { - ReferenceCountUtil.safeRelease(entry); + safeRelease(entry); } else { ((ChannelFutureListener) entry).operationComplete(future); }