From 86e653e04fb452c92154e39cd7189615dc0ec323 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Mon, 8 May 2017 18:06:42 -0700 Subject: [PATCH] SslHandler aggregation of plaintext data on write Motivation: Each call to SSL_write may introduce about ~100 bytes of overhead. The OpenSslEngine (based upon OpenSSL) is not able to do gathering writes so this means each wrap operation will incur the ~100 byte overhead. This commit attempts to increase goodput by aggregating the plaintext in chunks of 2^14. If many small chunks are written this can increase goodput, decrease the amount of calls to SSL_write, and decrease overall encryption operations. Modifications: - Introduce SslHandlerCoalescingBufferQueue in SslHandler which will aggregate up to 2^14 chunks of plaintext by default - Introduce SslHandler#setWrapDataSize to control how much data should be aggregated for each write. Aggregation can be disabled by setting this value to <= 0. Result: Better goodput when using SslHandler and the OpenSslEngine. --- .../java/io/netty/buffer/ByteBufUtil.java | 11 + .../java/io/netty/handler/ssl/SslHandler.java | 142 ++++++++- .../io/netty/handler/ssl/SslHandlerTest.java | 27 +- .../AbstractCoalescingBufferQueue.java | 296 ++++++++++++++++++ .../netty/channel/CoalescingBufferQueue.java | 171 ++-------- .../DelegatingChannelPromiseNotifier.java | 226 +++++++++++++ 6 files changed, 699 insertions(+), 174 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java create mode 100644 transport/src/main/java/io/netty/channel/DelegatingChannelPromiseNotifier.java diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java b/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java index e39e32d95a..86898c686d 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java @@ -145,6 +145,17 @@ public final class ByteBufUtil { return StringUtil.decodeHexDump(hexDump, fromIndex, length); } + /** + * Used to determine if the return value of {@link ByteBuf#ensureWritable(int, boolean)} means that there is + * adequate space and a write operation will succeed. + * @param ensureWritableResult The return value from {@link ByteBuf#ensureWritable(int, boolean)}. + * @return {@code true} if {@code ensureWritableResult} means that there is adequate space and a write operation + * will succeed. + */ + public static boolean ensureWritableSuccess(int ensureWritableResult) { + return ensureWritableResult == 0 || ensureWritableResult == 2; + } + /** * Calculates the hash code of the specified buffer. This method is * useful when implementing a new buffer type. diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 6c29e5d4e4..8af8e1a48a 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.AbstractCoalescingBufferQueue; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelException; @@ -31,7 +32,6 @@ import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromiseNotifier; -import io.netty.channel.PendingWriteQueue; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.UnsupportedMessageTypeException; import io.netty.util.ReferenceCounted; @@ -43,6 +43,7 @@ import io.netty.util.concurrent.ImmediateExecutor; import io.netty.util.concurrent.Promise; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.ThrowableUtil; +import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -66,6 +67,14 @@ import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLEngineResult.Status; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSession; + +import static io.netty.buffer.ByteBufUtil.ensureWritableSuccess; import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength; /** @@ -180,6 +189,12 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH private static final ClosedChannelException CHANNEL_CLOSED = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), SslHandler.class, "channelInactive(...)"); + /** + * 2^14 which is the maximum sized plaintext chunk + * allowed by the TLS RFC. + */ + private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024; + private enum SslEngineType { TCNATIVE(true, COMPOSITE_CUMULATOR) { @Override @@ -344,8 +359,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH private boolean sentFirstMessage; private boolean flushedBeforeHandshake; private boolean readDuringHandshake; - private PendingWriteQueue pendingUnencryptedWrites; - + private final SslHandlerCoalescingBufferQueue pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(16); private Promise handshakePromise = new LazyChannelPromise(); private final LazyChannelPromise sslClosePromise = new LazyChannelPromise(); @@ -445,6 +459,31 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH this.handshakeTimeoutMillis = handshakeTimeoutMillis; } + /** + * Sets the number of bytes to pass to each {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call. + *

+ * This value will partition data which is passed to write + * {@link #write(ChannelHandlerContext, Object, ChannelPromise)}. The partitioning will work as follows: + *

+ *

+ * If the {@link SSLEngine} doesn't support a gather wrap operation (e.g. {@link SslProvider#OPENSSL}) then + * aggregating data before wrapping can help reduce the ratio between TLS overhead vs data payload which will lead + * to better goodput. Writing fixed chunks of data can also help target the underlying transport's (e.g. TCP) + * frame size. Under lossy/congested network conditions this may help the peer get full TLS packets earlier and + * be able to do work sooner, as opposed to waiting for the all the pieces of the TLS packet to arrive. + * @param wrapDataSize the number of bytes which will be passed to each + * {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call. + */ + @UnstableApi + public final void setWrapDataSize(int wrapDataSize) { + pendingUnencryptedWrites.wrapDataSize = wrapDataSize; + } + /** * @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()} */ @@ -610,7 +649,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { if (!pendingUnencryptedWrites.isEmpty()) { // Check if queue is not empty first because create a new ChannelException is expensive - pendingUnencryptedWrites.removeAndFailAll(new ChannelException("Pending write on removal of SslHandler")); + pendingUnencryptedWrites.releaseAndFailAll(ctx, + new ChannelException("Pending write on removal of SslHandler")); } if (engine instanceof ReferenceCounted) { ((ReferenceCounted) engine).release(); @@ -660,7 +700,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH promise.setFailure(new UnsupportedMessageTypeException(msg, ByteBuf.class)); return; } - pendingUnencryptedWrites.add(msg, promise); + pendingUnencryptedWrites.add((ByteBuf) msg, promise); } @Override @@ -669,7 +709,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH // created with startTLS flag turned on. if (startTls && !sentFirstMessage) { sentFirstMessage = true; - pendingUnencryptedWrites.removeAndWriteAll(); + pendingUnencryptedWrites.writeAndRemoveAll(ctx); forceFlush(ctx); return; } @@ -709,15 +749,18 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH ByteBufAllocator alloc = ctx.alloc(); boolean needUnwrap = false; try { + final int wrapDataSize = pendingUnencryptedWrites.wrapDataSize; // Only continue to loop if the handler was not removed in the meantime. // See https://github.com/netty/netty/issues/5860 while (!ctx.isRemoved()) { - Object msg = pendingUnencryptedWrites.current(); - if (msg == null) { + promise = ctx.newPromise(); + ByteBuf buf = wrapDataSize > 0 ? + pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) : + pendingUnencryptedWrites.removeFirst(promise); + if (buf == null) { break; } - ByteBuf buf = (ByteBuf) msg; if (out == null) { out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount()); } @@ -725,15 +768,18 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH SSLEngineResult result = wrap(alloc, engine, buf, out); if (result.getStatus() == Status.CLOSED) { + buf.release(); + promise.tryFailure(SSLENGINE_CLOSED); + promise = null; // SSLEngine has been closed already. // Any further write attempts should be denied. - pendingUnencryptedWrites.removeAndFailAll(SSLENGINE_CLOSED); + pendingUnencryptedWrites.releaseAndFailAll(ctx, SSLENGINE_CLOSED); return; } else { - if (!buf.isReadable()) { - promise = pendingUnencryptedWrites.remove(); + if (buf.isReadable()) { + pendingUnencryptedWrites.addFirst(buf); } else { - promise = null; + buf.release(); } switch (result.getHandshakeStatus()) { @@ -1424,7 +1470,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH notifyHandshakeFailure(cause); } finally { // Ensure we remove and fail all pending writes in all cases and so release memory quickly. - pendingUnencryptedWrites.removeAndFailAll(cause); + pendingUnencryptedWrites.releaseAndFailAll(ctx, cause); } } @@ -1485,7 +1531,6 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH @Override public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; - pendingUnencryptedWrites = new PendingWriteQueue(ctx); if (ctx.channel().isActive() && engine.getUseClientMode()) { // Begin the initial handshake. @@ -1740,6 +1785,73 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH return allocate(ctx, engineType.calculateWrapBufferCapacity(this, pendingBytes, numComponents)); } + /** + * Each call to SSL_write will introduce about ~100 bytes of overhead. This coalescing queue attempts to increase + * goodput by aggregating the plaintext in chunks of {@link #wrapDataSize}. If many small chunks are written + * this can increase goodput, decrease the amount of calls to SSL_write, and decrease overall encryption operations. + */ + private static final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue { + volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH; + + SslHandlerCoalescingBufferQueue(int initSize) { + super(initSize); + } + + @Override + protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) { + final int wrapDataSize = this.wrapDataSize; + if (cumulation instanceof CompositeByteBuf) { + CompositeByteBuf composite = (CompositeByteBuf) cumulation; + int numComponents = composite.numComponents(); + if (numComponents == 0 || + !attemptCopyToCumulation(composite.internalComponent(numComponents - 1), next, wrapDataSize)) { + composite.addComponent(true, next); + } + return composite; + } + if (attemptCopyToCumulation(cumulation, next, wrapDataSize)) { + return cumulation; + } + CompositeByteBuf composite = alloc.compositeDirectBuffer(size() + 2); + composite.addComponent(true, cumulation); + composite.addComponent(true, next); + return composite; + } + + @Override + protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) { + if (first instanceof CompositeByteBuf) { + CompositeByteBuf composite = (CompositeByteBuf) first; + first = allocator.directBuffer(composite.readableBytes()); + first.writeBytes(composite); + composite.release(); + } + return first; + } + + @Override + protected ByteBuf removeEmptyValue() { + return null; + } + + private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) { + final int inReadableBytes = next.readableBytes(); + final int cumulationCapacity = cumulation.capacity(); + if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes && + // Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize. + // Only copy if there is enough space available and the capacity is large enough, and attempt to + // resize if the capacity is small. + (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize || + cumulationCapacity < wrapDataSize && + ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) { + cumulation.writeBytes(next); + next.release(); + return true; + } + return false; + } + } + private final class LazyChannelPromise extends DefaultPromise { @Override diff --git a/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java b/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java index 7cfd050c21..84559a7640 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java @@ -583,13 +583,21 @@ public class SslHandlerTest { SslProvider clientProvider = providers[j]; if (isSupported(clientProvider)) { compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider, - true, true); + true, true, true); compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider, - true, false); + true, true, false); compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider, - false, true); + true, false, true); compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider, - false, false); + true, false, false); + compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider, + false, true, true); + compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider, + false, true, false); + compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider, + false, false, true); + compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider, + false, false, false); } } } @@ -598,6 +606,7 @@ public class SslHandlerTest { private static void compositeBufSizeEstimationGuaranteesSynchronousWrite( SslProvider serverProvider, SslProvider clientProvider, + final boolean serverDisableWrapSize, final boolean letHandlerCreateServerEngine, final boolean letHandlerCreateClientEngine) throws CertificateException, SSLException, ExecutionException, InterruptedException { SelfSignedCertificate ssc = new SelfSignedCertificate(); @@ -630,11 +639,13 @@ public class SslHandlerTest { .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { - if (letHandlerCreateServerEngine) { - ch.pipeline().addLast(sslServerCtx.newHandler(ch.alloc())); - } else { - ch.pipeline().addLast(new SslHandler(sslServerCtx.newEngine(ch.alloc()))); + final SslHandler handler = letHandlerCreateServerEngine + ? sslServerCtx.newHandler(ch.alloc()) + : new SslHandler(sslServerCtx.newEngine(ch.alloc())); + if (serverDisableWrapSize) { + handler.setWrapDataSize(-1); } + ch.pipeline().addLast(handler); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { diff --git a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java new file mode 100644 index 0000000000..b1b795c1be --- /dev/null +++ b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java @@ -0,0 +1,296 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.UnstableApi; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.util.ArrayDeque; + +import static io.netty.util.internal.ObjectUtil.checkNotNull; +import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; + +@UnstableApi +public abstract class AbstractCoalescingBufferQueue { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class); + private final ArrayDeque bufAndListenerPairs; + private int readableBytes; + + public AbstractCoalescingBufferQueue(int initSize) { + bufAndListenerPairs = new ArrayDeque(initSize); + } + + /** + * Add a buffer to the front of the queue. + */ + public final void addFirst(ByteBuf buf) { + incrementReadableBytes(buf.readableBytes()); + // Listener would be added here, but since it is null there is no need. The assumption is there is already a + // listener at the front of the queue, or there is a buffer at the front of the queue, which was spliced from + // buf via remove(). + bufAndListenerPairs.addFirst(buf); + } + + /** + * Add a buffer to the end of the queue. + */ + public final void add(ByteBuf buf) { + add(buf, (ChannelFutureListener) null); + } + + /** + * Add a buffer to the end of the queue and associate a promise with it that should be completed when + * all the buffers bytes have been consumed from the queue and written. + * @param buf to add to the tail of the queue + * @param promise to complete when all the bytes have been consumed and written, can be void. + */ + public final void add(ByteBuf buf, ChannelPromise promise) { + // buffers are added before promises so that we naturally 'consume' the entire buffer during removal + // before we complete it's promise. + add(buf, promise.isVoid() ? null : (ChannelFutureListener) new DelegatingChannelPromiseNotifier(promise)); + } + + /** + * Add a buffer to the end of the queue and associate a listener with it that should be completed when + * all the buffers bytes have been consumed from the queue and written. + * @param buf to add to the tail of the queue + * @param listener to notify when all the bytes have been consumed and written, can be {@code null}. + */ + public final void add(ByteBuf buf, ChannelFutureListener listener) { + // buffers are added before promises so that we naturally 'consume' the entire buffer during removal + // before we complete it's promise. + incrementReadableBytes(buf.readableBytes()); + bufAndListenerPairs.add(buf); + if (listener != null) { + bufAndListenerPairs.add(listener); + } + } + + /** + * Remove the first {@link ByteBuf} from the queue. + * @param aggregatePromise used to aggregate the promises and listeners for the returned buffer. + * @return the first {@link ByteBuf} from the queue. + */ + public final ByteBuf removeFirst(ChannelPromise aggregatePromise) { + Object entry = bufAndListenerPairs.poll(); + if (entry == null) { + return null; + } + assert entry instanceof ByteBuf; + ByteBuf result = (ByteBuf) entry; + + readableBytes -= result.readableBytes(); + assert readableBytes >= 0; + + entry = bufAndListenerPairs.peek(); + if (entry instanceof ChannelFutureListener) { + aggregatePromise.addListener((ChannelFutureListener) entry); + bufAndListenerPairs.poll(); + } + return result; + } + + /** + * Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are + * fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise} + * completes. + * + * @param alloc The allocator used if a new {@link ByteBuf} is generated during the aggregation process. + * @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater + * than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned. + * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers. + * @return a {@link ByteBuf} composed of the enqueued buffers. + */ + public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) { + checkPositiveOrZero(bytes, "bytes"); + checkNotNull(aggregatePromise, "aggregatePromise"); + + // Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer. + if (bufAndListenerPairs.isEmpty()) { + return removeEmptyValue(); + } + bytes = Math.min(bytes, readableBytes); + + ByteBuf toReturn = null; + int originalBytes = bytes; + for (;;) { + Object entry = bufAndListenerPairs.poll(); + if (entry == null) { + break; + } + if (entry instanceof ChannelFutureListener) { + aggregatePromise.addListener((ChannelFutureListener) entry); + continue; + } + ByteBuf entryBuffer = (ByteBuf) entry; + if (entryBuffer.readableBytes() > bytes) { + // Add the buffer back to the queue as we can't consume all of it. + bufAndListenerPairs.addFirst(entryBuffer); + if (bytes > 0) { + // Take a slice of what we can consume and retain it. + ByteBuf next = entryBuffer.readRetainedSlice(bytes); + toReturn = toReturn == null ? composeFirst(alloc, next) : compose(alloc, toReturn, next); + bytes = 0; + } + break; + } else { + bytes -= entryBuffer.readableBytes(); + toReturn = toReturn == null ? composeFirst(alloc, entryBuffer) : compose(alloc, toReturn, entryBuffer); + } + } + readableBytes -= originalBytes - bytes; + assert readableBytes >= 0; + return toReturn; + } + + /** + * The number of readable bytes. + */ + public final int readableBytes() { + return readableBytes; + } + + /** + * Are there pending buffers in the queue. + */ + public final boolean isEmpty() { + return bufAndListenerPairs.isEmpty(); + } + + /** + * Release all buffers in the queue and complete all listeners and promises. + */ + public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) { + releaseAndCompleteAll(invoker.newFailedFuture(cause)); + } + + /** + * Copy all pending entries in this queue into the destination queue. + * @param dest to copy pending buffers to. + */ + public final void copyTo(AbstractCoalescingBufferQueue dest) { + dest.bufAndListenerPairs.addAll(bufAndListenerPairs); + dest.readableBytes += readableBytes; + } + + /** + * Writes all remaining elements in this queue. + * @param ctx The context to write all elements to. + */ + public final void writeAndRemoveAll(ChannelHandlerContext ctx) { + readableBytes = 0; + Throwable pending = null; + ByteBuf previousBuf = null; + for (;;) { + Object entry = bufAndListenerPairs.poll(); + try { + if (entry == null) { + if (previousBuf != null) { + ctx.write(previousBuf, ctx.voidPromise()); + } + break; + } + + if (entry instanceof ByteBuf) { + if (previousBuf != null) { + ctx.write(previousBuf, ctx.voidPromise()); + } + previousBuf = (ByteBuf) entry; + } else if (entry instanceof ChannelPromise) { + ctx.write(previousBuf, (ChannelPromise) entry); + previousBuf = null; + } else { + ctx.write(previousBuf).addListener((ChannelFutureListener) entry); + previousBuf = null; + } + } catch (Throwable t) { + if (pending == null) { + pending = t; + } else { + logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t); + } + } + } + if (pending != null) { + throw new IllegalStateException(pending); + } + } + + /** + * Calculate the result of {@code current + next}. + */ + protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next); + + /** + * Calculate the first {@link ByteBuf} which will be used in subsequent calls to + * {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}. + */ + protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) { + return first; + } + + /** + * The value to return when {@link #remove(ByteBufAllocator, int, ChannelPromise)} is called but the queue is empty. + * @return the {@link ByteBuf} which represents an empty queue. + */ + protected abstract ByteBuf removeEmptyValue(); + + /** + * Get the number of elements in this queue added via one of the {@link #add(ByteBuf)} methods. + * @return the number of elements in this queue. + */ + protected final int size() { + return bufAndListenerPairs.size(); + } + + private void releaseAndCompleteAll(ChannelFuture future) { + readableBytes = 0; + Throwable pending = null; + for (;;) { + Object entry = bufAndListenerPairs.poll(); + if (entry == null) { + break; + } + try { + if (entry instanceof ByteBuf) { + ReferenceCountUtil.safeRelease(entry); + } else { + ((ChannelFutureListener) entry).operationComplete(future); + } + } catch (Throwable t) { + if (pending == null) { + pending = t; + } else { + logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t); + } + } + } + if (pending != null) { + throw new IllegalStateException(pending); + } + } + + private void incrementReadableBytes(int increment) { + int nextReadableBytes = readableBytes + increment; + if (nextReadableBytes < readableBytes) { + throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment); + } + readableBytes = nextReadableBytes; + } +} diff --git a/transport/src/main/java/io/netty/channel/CoalescingBufferQueue.java b/transport/src/main/java/io/netty/channel/CoalescingBufferQueue.java index 439d6a4053..8f26dbaf31 100644 --- a/transport/src/main/java/io/netty/channel/CoalescingBufferQueue.java +++ b/transport/src/main/java/io/netty/channel/CoalescingBufferQueue.java @@ -15,13 +15,11 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; -import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.ObjectUtil; -import java.util.ArrayDeque; - /** * A FIFO queue of bytes where producers add bytes by repeatedly adding {@link ByteBuf} and consumers take bytes in * arbitrary lengths. This allows producers to add lots of small buffers and the consumer to take all the bytes @@ -34,60 +32,16 @@ import java.util.ArrayDeque; *

This functionality is useful for aggregating or partitioning writes into fixed size buffers for framing protocols * such as HTTP2. */ -public final class CoalescingBufferQueue { - +public final class CoalescingBufferQueue extends AbstractCoalescingBufferQueue { private final Channel channel; - private final ArrayDeque bufAndListenerPairs; - private int readableBytes; public CoalescingBufferQueue(Channel channel) { this(channel, 4); } public CoalescingBufferQueue(Channel channel, int initSize) { + super(initSize); this.channel = ObjectUtil.checkNotNull(channel, "channel"); - bufAndListenerPairs = new ArrayDeque(initSize); - } - - /** - * Add a buffer to the end of the queue. - */ - public void add(ByteBuf buf) { - add(buf, (ChannelFutureListener) null); - } - - /** - * Add a buffer to the end of the queue and associate a promise with it that should be completed when - * all the buffers bytes have been consumed from the queue and written. - * @param buf to add to the tail of the queue - * @param promise to complete when all the bytes have been consumed and written, can be void. - */ - public void add(ByteBuf buf, ChannelPromise promise) { - // buffers are added before promises so that we naturally 'consume' the entire buffer during removal - // before we complete it's promise. - ObjectUtil.checkNotNull(promise, "promise"); - add(buf, promise.isVoid() ? null : new ChannelPromiseNotifier(promise)); - } - - /** - * Add a buffer to the end of the queue and associate a listener with it that should be completed when - * all the buffers bytes have been consumed from the queue and written. - * @param buf to add to the tail of the queue - * @param listener to notify when all the bytes have been consumed and written, can be {@code null}. - */ - public void add(ByteBuf buf, ChannelFutureListener listener) { - // buffers are added before promises so that we naturally 'consume' the entire buffer during removal - // before we complete it's promise. - ObjectUtil.checkNotNull(buf, "buf"); - if (readableBytes > Integer.MAX_VALUE - buf.readableBytes()) { - throw new IllegalStateException("buffer queue length overflow: " + readableBytes - + " + " + buf.readableBytes()); - } - bufAndListenerPairs.add(buf); - if (listener != null) { - bufAndListenerPairs.add(listener); - } - readableBytes += buf.readableBytes(); } /** @@ -101,118 +55,33 @@ public final class CoalescingBufferQueue { * @return a {@link ByteBuf} composed of the enqueued buffers. */ public ByteBuf remove(int bytes, ChannelPromise aggregatePromise) { - if (bytes < 0) { - throw new IllegalArgumentException("bytes (expected >= 0): " + bytes); - } - ObjectUtil.checkNotNull(aggregatePromise, "aggregatePromise"); - - // Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer. - if (bufAndListenerPairs.isEmpty()) { - return Unpooled.EMPTY_BUFFER; - } - bytes = Math.min(bytes, readableBytes); - - ByteBuf toReturn = null; - int originalBytes = bytes; - for (;;) { - Object entry = bufAndListenerPairs.poll(); - if (entry == null) { - break; - } - if (entry instanceof ChannelFutureListener) { - aggregatePromise.addListener((ChannelFutureListener) entry); - continue; - } - ByteBuf entryBuffer = (ByteBuf) entry; - if (entryBuffer.readableBytes() > bytes) { - // Add the buffer back to the queue as we can't consume all of it. - bufAndListenerPairs.addFirst(entryBuffer); - if (bytes > 0) { - // Take a slice of what we can consume and retain it. - toReturn = compose(toReturn, entryBuffer.readRetainedSlice(bytes)); - bytes = 0; - } - break; - } else { - toReturn = compose(toReturn, entryBuffer); - bytes -= entryBuffer.readableBytes(); - } - } - readableBytes -= originalBytes - bytes; - assert readableBytes >= 0; - return toReturn; - } - - /** - * Compose the current buffer with another. - */ - private ByteBuf compose(ByteBuf current, ByteBuf next) { - if (current == null) { - return next; - } - if (current instanceof CompositeByteBuf) { - CompositeByteBuf composite = (CompositeByteBuf) current; - composite.addComponent(true, next); - return composite; - } - // Create a composite buffer to accumulate this pair and potentially all the buffers - // in the queue. Using +2 as we have already dequeued current and next. - CompositeByteBuf composite = channel.alloc().compositeBuffer(bufAndListenerPairs.size() + 2); - composite.addComponent(true, current); - composite.addComponent(true, next); - return composite; - } - - /** - * The number of readable bytes. - */ - public int readableBytes() { - return readableBytes; - } - - /** - * Are there pending buffers in the queue. - */ - public boolean isEmpty() { - return bufAndListenerPairs.isEmpty(); + return remove(channel.alloc(), bytes, aggregatePromise); } /** * Release all buffers in the queue and complete all listeners and promises. */ public void releaseAndFailAll(Throwable cause) { - releaseAndCompleteAll(channel.newFailedFuture(cause)); + releaseAndFailAll(channel, cause); } - private void releaseAndCompleteAll(ChannelFuture future) { - readableBytes = 0; - Throwable pending = null; - for (;;) { - Object entry = bufAndListenerPairs.poll(); - if (entry == null) { - break; - } - try { - if (entry instanceof ByteBuf) { - ReferenceCountUtil.safeRelease(entry); - } else { - ((ChannelFutureListener) entry).operationComplete(future); - } - } catch (Throwable t) { - pending = t; - } - } - if (pending != null) { - throw new IllegalStateException(pending); + @Override + protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) { + if (cumulation instanceof CompositeByteBuf) { + CompositeByteBuf composite = (CompositeByteBuf) cumulation; + composite.addComponent(true, next); + return composite; } + // Create a composite buffer to accumulate this pair and potentially all the buffers + // in the queue. Using +2 as we have already dequeued current and next. + CompositeByteBuf composite = alloc.compositeBuffer(size() + 2); + composite.addComponent(true, cumulation); + composite.addComponent(true, next); + return composite; } - /** - * Copy all pending entries in this queue into the destination queue. - * @param dest to copy pending buffers to. - */ - public void copyTo(CoalescingBufferQueue dest) { - dest.bufAndListenerPairs.addAll(bufAndListenerPairs); - dest.readableBytes += readableBytes; + @Override + protected ByteBuf removeEmptyValue() { + return Unpooled.EMPTY_BUFFER; } } diff --git a/transport/src/main/java/io/netty/channel/DelegatingChannelPromiseNotifier.java b/transport/src/main/java/io/netty/channel/DelegatingChannelPromiseNotifier.java new file mode 100644 index 0000000000..39d1719b29 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/DelegatingChannelPromiseNotifier.java @@ -0,0 +1,226 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.internal.PromiseNotificationUtil; +import io.netty.util.internal.UnstableApi; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static io.netty.util.internal.ObjectUtil.checkNotNull; + +@UnstableApi +public final class DelegatingChannelPromiseNotifier implements ChannelPromise, ChannelFutureListener { + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(DelegatingChannelPromiseNotifier.class); + private final ChannelPromise delegate; + private final boolean logNotifyFailure; + + public DelegatingChannelPromiseNotifier(ChannelPromise delegate) { + this(delegate, true); + } + + public DelegatingChannelPromiseNotifier(ChannelPromise delegate, boolean logNotifyFailure) { + this.delegate = checkNotNull(delegate, "delegate"); + this.logNotifyFailure = logNotifyFailure; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + InternalLogger internalLogger = logNotifyFailure ? logger : null; + if (future.isSuccess()) { + Void result = future.get(); + PromiseNotificationUtil.trySuccess(delegate, result, internalLogger); + } else if (future.isCancelled()) { + PromiseNotificationUtil.tryCancel(delegate, internalLogger); + } else { + Throwable cause = future.cause(); + PromiseNotificationUtil.tryFailure(delegate, cause, internalLogger); + } + } + + @Override + public Channel channel() { + return delegate.channel(); + } + + @Override + public ChannelPromise setSuccess(Void result) { + delegate.setSuccess(result); + return this; + } + + @Override + public ChannelPromise setSuccess() { + delegate.setSuccess(); + return this; + } + + @Override + public boolean trySuccess() { + return delegate.trySuccess(); + } + + @Override + public boolean trySuccess(Void result) { + return delegate.trySuccess(result); + } + + @Override + public ChannelPromise setFailure(Throwable cause) { + delegate.setFailure(cause); + return this; + } + + @Override + public ChannelPromise addListener(GenericFutureListener> listener) { + delegate.addListener(listener); + return this; + } + + @Override + public ChannelPromise addListeners(GenericFutureListener>[] listeners) { + delegate.addListeners(listeners); + return this; + } + + @Override + public ChannelPromise removeListener(GenericFutureListener> listener) { + delegate.removeListener(listener); + return this; + } + + @Override + public ChannelPromise removeListeners(GenericFutureListener>[] listeners) { + delegate.removeListeners(listeners); + return this; + } + + @Override + public boolean tryFailure(Throwable cause) { + return delegate.tryFailure(cause); + } + + @Override + public boolean setUncancellable() { + return delegate.setUncancellable(); + } + + @Override + public ChannelPromise await() throws InterruptedException { + delegate.await(); + return this; + } + + @Override + public ChannelPromise awaitUninterruptibly() { + delegate.awaitUninterruptibly(); + return this; + } + + @Override + public boolean isVoid() { + return delegate.isVoid(); + } + + @Override + public ChannelPromise unvoid() { + return isVoid() ? new DelegatingChannelPromiseNotifier(delegate.unvoid()) : this; + } + + @Override + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.await(timeout, unit); + } + + @Override + public boolean await(long timeoutMillis) throws InterruptedException { + return delegate.await(timeoutMillis); + } + + @Override + public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { + return delegate.awaitUninterruptibly(timeout, unit); + } + + @Override + public boolean awaitUninterruptibly(long timeoutMillis) { + return delegate.awaitUninterruptibly(timeoutMillis); + } + + @Override + public Void getNow() { + return delegate.getNow(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } + + @Override + public ChannelPromise sync() throws InterruptedException { + delegate.sync(); + return this; + } + + @Override + public ChannelPromise syncUninterruptibly() { + delegate.syncUninterruptibly(); + return this; + } + + @Override + public boolean isSuccess() { + return delegate.isSuccess(); + } + + @Override + public boolean isCancellable() { + return delegate.isCancellable(); + } + + @Override + public Throwable cause() { + return delegate.cause(); + } +}