SslHandler aggregation prefer copy over CompositeByteBuf

Motivation:
SslHandler will do aggregation of writes by default in an attempt to improve goodput and reduce the number of discrete buffers which must be accumulated. However if aggregation is not possible then a CompositeByteBuf is used to accumulate multiple buffers. Using a CompositeByteBuf doesn't provide any of the benefits of better goodput and in the case of small + large writes (e.g. http/2 frame header + data) this can reduce the amount of data that can be passed to writev by about half. This has the impact of increasing latency as well as reducing goodput.

Modifications:
- SslHandler should prefer copying instead of using a CompositeByteBuf

Result:
Better goodput (and potentially improved latency) at the cost of copy operations.
This commit is contained in:
Scott Mitchell 2017-12-03 23:06:02 -08:00
parent ec368fea47
commit d7c977dd71
2 changed files with 30 additions and 8 deletions

View File

@ -1882,7 +1882,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
return composite; return composite;
} }
return attemptCopyToCumulation(cumulation, next, wrapDataSize) ? cumulation : return attemptCopyToCumulation(cumulation, next, wrapDataSize) ? cumulation :
composeIntoComposite(alloc, cumulation, next); copyAndCompose(alloc, cumulation, next);
} }
@Override @Override

View File

@ -17,16 +17,16 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import static io.netty.util.ReferenceCountUtil.safeRelease;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import static io.netty.util.internal.PlatformDependent.throwException;
@UnstableApi @UnstableApi
public abstract class AbstractCoalescingBufferQueue { public abstract class AbstractCoalescingBufferQueue {
@ -177,10 +177,10 @@ public abstract class AbstractCoalescingBufferQueue {
entryBuffer = null; entryBuffer = null;
} }
} catch (Throwable cause) { } catch (Throwable cause) {
ReferenceCountUtil.safeRelease(entryBuffer); safeRelease(entryBuffer);
ReferenceCountUtil.safeRelease(toReturn); safeRelease(toReturn);
aggregatePromise.setFailure(cause); aggregatePromise.setFailure(cause);
PlatformDependent.throwException(cause); throwException(cause);
} }
decrementReadableBytes(originalBytes - bytes); decrementReadableBytes(originalBytes - bytes);
return toReturn; return toReturn;
@ -276,11 +276,33 @@ public abstract class AbstractCoalescingBufferQueue {
composite.addComponent(true, next); composite.addComponent(true, next);
} catch (Throwable cause) { } catch (Throwable cause) {
composite.release(); composite.release();
PlatformDependent.throwException(cause); safeRelease(next);
throwException(cause);
} }
return composite; return composite;
} }
/**
* Compose {@code cumulation} and {@code next} into a new {@link ByteBufAllocator#ioBuffer()}.
* @param alloc The allocator to use to allocate the new buffer.
* @param cumulation The current cumulation.
* @param next The next buffer.
* @return The result of {@code cumulation + next}.
*/
protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes());
try {
newCumulation.writeBytes(cumulation).writeBytes(next);
} catch (Throwable cause) {
newCumulation.release();
safeRelease(next);
throwException(cause);
}
cumulation.release();
next.release();
return newCumulation;
}
/** /**
* Calculate the first {@link ByteBuf} which will be used in subsequent calls to * Calculate the first {@link ByteBuf} which will be used in subsequent calls to
* {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}. * {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}.
@ -313,7 +335,7 @@ public abstract class AbstractCoalescingBufferQueue {
} }
try { try {
if (entry instanceof ByteBuf) { if (entry instanceof ByteBuf) {
ReferenceCountUtil.safeRelease(entry); safeRelease(entry);
} else { } else {
((ChannelFutureListener) entry).operationComplete(future); ((ChannelFutureListener) entry).operationComplete(future);
} }