SslHandler aggregation of plaintext data on write
Motivation: Each call to SSL_write may introduce about ~100 bytes of overhead. The OpenSslEngine (based upon OpenSSL) is not able to do gathering writes so this means each wrap operation will incur the ~100 byte overhead. This commit attempts to increase goodput by aggregating the plaintext in chunks of <a href="https://tools.ietf.org/html/rfc5246#section-6.2">2^14</a>. If many small chunks are written this can increase goodput, decrease the amount of calls to SSL_write, and decrease overall encryption operations. Modifications: - Introduce SslHandlerCoalescingBufferQueue in SslHandler which will aggregate up to 2^14 chunks of plaintext by default - Introduce SslHandler#setWrapDataSize to control how much data should be aggregated for each write. Aggregation can be disabled by setting this value to <= 0. Result: Better goodput when using SslHandler and the OpenSslEngine.
This commit is contained in:
parent
f7b3caeddc
commit
86e653e04f
@ -145,6 +145,17 @@ public final class ByteBufUtil {
|
||||
return StringUtil.decodeHexDump(hexDump, fromIndex, length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to determine if the return value of {@link ByteBuf#ensureWritable(int, boolean)} means that there is
|
||||
* adequate space and a write operation will succeed.
|
||||
* @param ensureWritableResult The return value from {@link ByteBuf#ensureWritable(int, boolean)}.
|
||||
* @return {@code true} if {@code ensureWritableResult} means that there is adequate space and a write operation
|
||||
* will succeed.
|
||||
*/
|
||||
public static boolean ensureWritableSuccess(int ensureWritableResult) {
|
||||
return ensureWritableResult == 0 || ensureWritableResult == 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the hash code of the specified buffer. This method is
|
||||
* useful when implementing a new buffer type.
|
||||
|
@ -20,6 +20,7 @@ import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.ByteBufUtil;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.AbstractCoalescingBufferQueue;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelException;
|
||||
@ -31,7 +32,6 @@ import io.netty.channel.ChannelOutboundHandler;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.ChannelPromiseNotifier;
|
||||
import io.netty.channel.PendingWriteQueue;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import io.netty.handler.codec.UnsupportedMessageTypeException;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
@ -43,6 +43,7 @@ import io.netty.util.concurrent.ImmediateExecutor;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.ThrowableUtil;
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
@ -66,6 +67,14 @@ import javax.net.ssl.SSLEngineResult.Status;
|
||||
import javax.net.ssl.SSLException;
|
||||
import javax.net.ssl.SSLSession;
|
||||
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import javax.net.ssl.SSLEngineResult;
|
||||
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
|
||||
import javax.net.ssl.SSLEngineResult.Status;
|
||||
import javax.net.ssl.SSLException;
|
||||
import javax.net.ssl.SSLSession;
|
||||
|
||||
import static io.netty.buffer.ByteBufUtil.ensureWritableSuccess;
|
||||
import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
|
||||
|
||||
/**
|
||||
@ -180,6 +189,12 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
private static final ClosedChannelException CHANNEL_CLOSED = ThrowableUtil.unknownStackTrace(
|
||||
new ClosedChannelException(), SslHandler.class, "channelInactive(...)");
|
||||
|
||||
/**
|
||||
* <a href="https://tools.ietf.org/html/rfc5246#section-6.2">2^14</a> which is the maximum sized plaintext chunk
|
||||
* allowed by the TLS RFC.
|
||||
*/
|
||||
private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
|
||||
|
||||
private enum SslEngineType {
|
||||
TCNATIVE(true, COMPOSITE_CUMULATOR) {
|
||||
@Override
|
||||
@ -344,8 +359,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
private boolean sentFirstMessage;
|
||||
private boolean flushedBeforeHandshake;
|
||||
private boolean readDuringHandshake;
|
||||
private PendingWriteQueue pendingUnencryptedWrites;
|
||||
|
||||
private final SslHandlerCoalescingBufferQueue pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(16);
|
||||
private Promise<Channel> handshakePromise = new LazyChannelPromise();
|
||||
private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
|
||||
|
||||
@ -445,6 +459,31 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
this.handshakeTimeoutMillis = handshakeTimeoutMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the number of bytes to pass to each {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
|
||||
* <p>
|
||||
* This value will partition data which is passed to write
|
||||
* {@link #write(ChannelHandlerContext, Object, ChannelPromise)}. The partitioning will work as follows:
|
||||
* <ul>
|
||||
* <li>If {@code wrapDataSize <= 0} then we will write each data chunk as is.</li>
|
||||
* <li>If {@code wrapDataSize > data size} then we will attempt to aggregate multiple data chunks together.</li>
|
||||
* <li>If {@code wrapDataSize > data size} Else if {@code wrapDataSize <= data size} then we will divide the data
|
||||
* into chunks of {@code wrapDataSize} when writing.</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* If the {@link SSLEngine} doesn't support a gather wrap operation (e.g. {@link SslProvider#OPENSSL}) then
|
||||
* aggregating data before wrapping can help reduce the ratio between TLS overhead vs data payload which will lead
|
||||
* to better goodput. Writing fixed chunks of data can also help target the underlying transport's (e.g. TCP)
|
||||
* frame size. Under lossy/congested network conditions this may help the peer get full TLS packets earlier and
|
||||
* be able to do work sooner, as opposed to waiting for the all the pieces of the TLS packet to arrive.
|
||||
* @param wrapDataSize the number of bytes which will be passed to each
|
||||
* {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
|
||||
*/
|
||||
@UnstableApi
|
||||
public final void setWrapDataSize(int wrapDataSize) {
|
||||
pendingUnencryptedWrites.wrapDataSize = wrapDataSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()}
|
||||
*/
|
||||
@ -610,7 +649,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
|
||||
if (!pendingUnencryptedWrites.isEmpty()) {
|
||||
// Check if queue is not empty first because create a new ChannelException is expensive
|
||||
pendingUnencryptedWrites.removeAndFailAll(new ChannelException("Pending write on removal of SslHandler"));
|
||||
pendingUnencryptedWrites.releaseAndFailAll(ctx,
|
||||
new ChannelException("Pending write on removal of SslHandler"));
|
||||
}
|
||||
if (engine instanceof ReferenceCounted) {
|
||||
((ReferenceCounted) engine).release();
|
||||
@ -660,7 +700,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
promise.setFailure(new UnsupportedMessageTypeException(msg, ByteBuf.class));
|
||||
return;
|
||||
}
|
||||
pendingUnencryptedWrites.add(msg, promise);
|
||||
pendingUnencryptedWrites.add((ByteBuf) msg, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -669,7 +709,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
// created with startTLS flag turned on.
|
||||
if (startTls && !sentFirstMessage) {
|
||||
sentFirstMessage = true;
|
||||
pendingUnencryptedWrites.removeAndWriteAll();
|
||||
pendingUnencryptedWrites.writeAndRemoveAll(ctx);
|
||||
forceFlush(ctx);
|
||||
return;
|
||||
}
|
||||
@ -709,15 +749,18 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
ByteBufAllocator alloc = ctx.alloc();
|
||||
boolean needUnwrap = false;
|
||||
try {
|
||||
final int wrapDataSize = pendingUnencryptedWrites.wrapDataSize;
|
||||
// Only continue to loop if the handler was not removed in the meantime.
|
||||
// See https://github.com/netty/netty/issues/5860
|
||||
while (!ctx.isRemoved()) {
|
||||
Object msg = pendingUnencryptedWrites.current();
|
||||
if (msg == null) {
|
||||
promise = ctx.newPromise();
|
||||
ByteBuf buf = wrapDataSize > 0 ?
|
||||
pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
|
||||
pendingUnencryptedWrites.removeFirst(promise);
|
||||
if (buf == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (out == null) {
|
||||
out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
|
||||
}
|
||||
@ -725,15 +768,18 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
SSLEngineResult result = wrap(alloc, engine, buf, out);
|
||||
|
||||
if (result.getStatus() == Status.CLOSED) {
|
||||
buf.release();
|
||||
promise.tryFailure(SSLENGINE_CLOSED);
|
||||
promise = null;
|
||||
// SSLEngine has been closed already.
|
||||
// Any further write attempts should be denied.
|
||||
pendingUnencryptedWrites.removeAndFailAll(SSLENGINE_CLOSED);
|
||||
pendingUnencryptedWrites.releaseAndFailAll(ctx, SSLENGINE_CLOSED);
|
||||
return;
|
||||
} else {
|
||||
if (!buf.isReadable()) {
|
||||
promise = pendingUnencryptedWrites.remove();
|
||||
if (buf.isReadable()) {
|
||||
pendingUnencryptedWrites.addFirst(buf);
|
||||
} else {
|
||||
promise = null;
|
||||
buf.release();
|
||||
}
|
||||
|
||||
switch (result.getHandshakeStatus()) {
|
||||
@ -1424,7 +1470,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
notifyHandshakeFailure(cause);
|
||||
} finally {
|
||||
// Ensure we remove and fail all pending writes in all cases and so release memory quickly.
|
||||
pendingUnencryptedWrites.removeAndFailAll(cause);
|
||||
pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1485,7 +1531,6 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
@Override
|
||||
public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
|
||||
this.ctx = ctx;
|
||||
pendingUnencryptedWrites = new PendingWriteQueue(ctx);
|
||||
|
||||
if (ctx.channel().isActive() && engine.getUseClientMode()) {
|
||||
// Begin the initial handshake.
|
||||
@ -1740,6 +1785,73 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
return allocate(ctx, engineType.calculateWrapBufferCapacity(this, pendingBytes, numComponents));
|
||||
}
|
||||
|
||||
/**
|
||||
* Each call to SSL_write will introduce about ~100 bytes of overhead. This coalescing queue attempts to increase
|
||||
* goodput by aggregating the plaintext in chunks of {@link #wrapDataSize}. If many small chunks are written
|
||||
* this can increase goodput, decrease the amount of calls to SSL_write, and decrease overall encryption operations.
|
||||
*/
|
||||
private static final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
|
||||
volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
|
||||
|
||||
SslHandlerCoalescingBufferQueue(int initSize) {
|
||||
super(initSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
|
||||
final int wrapDataSize = this.wrapDataSize;
|
||||
if (cumulation instanceof CompositeByteBuf) {
|
||||
CompositeByteBuf composite = (CompositeByteBuf) cumulation;
|
||||
int numComponents = composite.numComponents();
|
||||
if (numComponents == 0 ||
|
||||
!attemptCopyToCumulation(composite.internalComponent(numComponents - 1), next, wrapDataSize)) {
|
||||
composite.addComponent(true, next);
|
||||
}
|
||||
return composite;
|
||||
}
|
||||
if (attemptCopyToCumulation(cumulation, next, wrapDataSize)) {
|
||||
return cumulation;
|
||||
}
|
||||
CompositeByteBuf composite = alloc.compositeDirectBuffer(size() + 2);
|
||||
composite.addComponent(true, cumulation);
|
||||
composite.addComponent(true, next);
|
||||
return composite;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
|
||||
if (first instanceof CompositeByteBuf) {
|
||||
CompositeByteBuf composite = (CompositeByteBuf) first;
|
||||
first = allocator.directBuffer(composite.readableBytes());
|
||||
first.writeBytes(composite);
|
||||
composite.release();
|
||||
}
|
||||
return first;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ByteBuf removeEmptyValue() {
|
||||
return null;
|
||||
}
|
||||
|
||||
private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
|
||||
final int inReadableBytes = next.readableBytes();
|
||||
final int cumulationCapacity = cumulation.capacity();
|
||||
if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
|
||||
// Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
|
||||
// Only copy if there is enough space available and the capacity is large enough, and attempt to
|
||||
// resize if the capacity is small.
|
||||
(cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
|
||||
cumulationCapacity < wrapDataSize &&
|
||||
ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
|
||||
cumulation.writeBytes(next);
|
||||
next.release();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private final class LazyChannelPromise extends DefaultPromise<Channel> {
|
||||
|
||||
@Override
|
||||
|
@ -583,13 +583,21 @@ public class SslHandlerTest {
|
||||
SslProvider clientProvider = providers[j];
|
||||
if (isSupported(clientProvider)) {
|
||||
compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider,
|
||||
true, true);
|
||||
true, true, true);
|
||||
compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider,
|
||||
true, false);
|
||||
true, true, false);
|
||||
compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider,
|
||||
false, true);
|
||||
true, false, true);
|
||||
compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider,
|
||||
false, false);
|
||||
true, false, false);
|
||||
compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider,
|
||||
false, true, true);
|
||||
compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider,
|
||||
false, true, false);
|
||||
compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider,
|
||||
false, false, true);
|
||||
compositeBufSizeEstimationGuaranteesSynchronousWrite(serverProvider, clientProvider,
|
||||
false, false, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -598,6 +606,7 @@ public class SslHandlerTest {
|
||||
|
||||
private static void compositeBufSizeEstimationGuaranteesSynchronousWrite(
|
||||
SslProvider serverProvider, SslProvider clientProvider,
|
||||
final boolean serverDisableWrapSize,
|
||||
final boolean letHandlerCreateServerEngine, final boolean letHandlerCreateClientEngine)
|
||||
throws CertificateException, SSLException, ExecutionException, InterruptedException {
|
||||
SelfSignedCertificate ssc = new SelfSignedCertificate();
|
||||
@ -630,11 +639,13 @@ public class SslHandlerTest {
|
||||
.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
if (letHandlerCreateServerEngine) {
|
||||
ch.pipeline().addLast(sslServerCtx.newHandler(ch.alloc()));
|
||||
} else {
|
||||
ch.pipeline().addLast(new SslHandler(sslServerCtx.newEngine(ch.alloc())));
|
||||
final SslHandler handler = letHandlerCreateServerEngine
|
||||
? sslServerCtx.newHandler(ch.alloc())
|
||||
: new SslHandler(sslServerCtx.newEngine(ch.alloc()));
|
||||
if (serverDisableWrapSize) {
|
||||
handler.setWrapDataSize(-1);
|
||||
}
|
||||
ch.pipeline().addLast(handler);
|
||||
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||
|
@ -0,0 +1,296 @@
|
||||
/*
|
||||
* Copyright 2017 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
|
||||
|
||||
@UnstableApi
|
||||
public abstract class AbstractCoalescingBufferQueue {
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class);
|
||||
private final ArrayDeque<Object> bufAndListenerPairs;
|
||||
private int readableBytes;
|
||||
|
||||
public AbstractCoalescingBufferQueue(int initSize) {
|
||||
bufAndListenerPairs = new ArrayDeque<Object>(initSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a buffer to the front of the queue.
|
||||
*/
|
||||
public final void addFirst(ByteBuf buf) {
|
||||
incrementReadableBytes(buf.readableBytes());
|
||||
// Listener would be added here, but since it is null there is no need. The assumption is there is already a
|
||||
// listener at the front of the queue, or there is a buffer at the front of the queue, which was spliced from
|
||||
// buf via remove().
|
||||
bufAndListenerPairs.addFirst(buf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a buffer to the end of the queue.
|
||||
*/
|
||||
public final void add(ByteBuf buf) {
|
||||
add(buf, (ChannelFutureListener) null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a buffer to the end of the queue and associate a promise with it that should be completed when
|
||||
* all the buffers bytes have been consumed from the queue and written.
|
||||
* @param buf to add to the tail of the queue
|
||||
* @param promise to complete when all the bytes have been consumed and written, can be void.
|
||||
*/
|
||||
public final void add(ByteBuf buf, ChannelPromise promise) {
|
||||
// buffers are added before promises so that we naturally 'consume' the entire buffer during removal
|
||||
// before we complete it's promise.
|
||||
add(buf, promise.isVoid() ? null : (ChannelFutureListener) new DelegatingChannelPromiseNotifier(promise));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a buffer to the end of the queue and associate a listener with it that should be completed when
|
||||
* all the buffers bytes have been consumed from the queue and written.
|
||||
* @param buf to add to the tail of the queue
|
||||
* @param listener to notify when all the bytes have been consumed and written, can be {@code null}.
|
||||
*/
|
||||
public final void add(ByteBuf buf, ChannelFutureListener listener) {
|
||||
// buffers are added before promises so that we naturally 'consume' the entire buffer during removal
|
||||
// before we complete it's promise.
|
||||
incrementReadableBytes(buf.readableBytes());
|
||||
bufAndListenerPairs.add(buf);
|
||||
if (listener != null) {
|
||||
bufAndListenerPairs.add(listener);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the first {@link ByteBuf} from the queue.
|
||||
* @param aggregatePromise used to aggregate the promises and listeners for the returned buffer.
|
||||
* @return the first {@link ByteBuf} from the queue.
|
||||
*/
|
||||
public final ByteBuf removeFirst(ChannelPromise aggregatePromise) {
|
||||
Object entry = bufAndListenerPairs.poll();
|
||||
if (entry == null) {
|
||||
return null;
|
||||
}
|
||||
assert entry instanceof ByteBuf;
|
||||
ByteBuf result = (ByteBuf) entry;
|
||||
|
||||
readableBytes -= result.readableBytes();
|
||||
assert readableBytes >= 0;
|
||||
|
||||
entry = bufAndListenerPairs.peek();
|
||||
if (entry instanceof ChannelFutureListener) {
|
||||
aggregatePromise.addListener((ChannelFutureListener) entry);
|
||||
bufAndListenerPairs.poll();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are
|
||||
* fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise}
|
||||
* completes.
|
||||
*
|
||||
* @param alloc The allocator used if a new {@link ByteBuf} is generated during the aggregation process.
|
||||
* @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater
|
||||
* than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned.
|
||||
* @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
|
||||
* @return a {@link ByteBuf} composed of the enqueued buffers.
|
||||
*/
|
||||
public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) {
|
||||
checkPositiveOrZero(bytes, "bytes");
|
||||
checkNotNull(aggregatePromise, "aggregatePromise");
|
||||
|
||||
// Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer.
|
||||
if (bufAndListenerPairs.isEmpty()) {
|
||||
return removeEmptyValue();
|
||||
}
|
||||
bytes = Math.min(bytes, readableBytes);
|
||||
|
||||
ByteBuf toReturn = null;
|
||||
int originalBytes = bytes;
|
||||
for (;;) {
|
||||
Object entry = bufAndListenerPairs.poll();
|
||||
if (entry == null) {
|
||||
break;
|
||||
}
|
||||
if (entry instanceof ChannelFutureListener) {
|
||||
aggregatePromise.addListener((ChannelFutureListener) entry);
|
||||
continue;
|
||||
}
|
||||
ByteBuf entryBuffer = (ByteBuf) entry;
|
||||
if (entryBuffer.readableBytes() > bytes) {
|
||||
// Add the buffer back to the queue as we can't consume all of it.
|
||||
bufAndListenerPairs.addFirst(entryBuffer);
|
||||
if (bytes > 0) {
|
||||
// Take a slice of what we can consume and retain it.
|
||||
ByteBuf next = entryBuffer.readRetainedSlice(bytes);
|
||||
toReturn = toReturn == null ? composeFirst(alloc, next) : compose(alloc, toReturn, next);
|
||||
bytes = 0;
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
bytes -= entryBuffer.readableBytes();
|
||||
toReturn = toReturn == null ? composeFirst(alloc, entryBuffer) : compose(alloc, toReturn, entryBuffer);
|
||||
}
|
||||
}
|
||||
readableBytes -= originalBytes - bytes;
|
||||
assert readableBytes >= 0;
|
||||
return toReturn;
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of readable bytes.
|
||||
*/
|
||||
public final int readableBytes() {
|
||||
return readableBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Are there pending buffers in the queue.
|
||||
*/
|
||||
public final boolean isEmpty() {
|
||||
return bufAndListenerPairs.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Release all buffers in the queue and complete all listeners and promises.
|
||||
*/
|
||||
public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
|
||||
releaseAndCompleteAll(invoker.newFailedFuture(cause));
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy all pending entries in this queue into the destination queue.
|
||||
* @param dest to copy pending buffers to.
|
||||
*/
|
||||
public final void copyTo(AbstractCoalescingBufferQueue dest) {
|
||||
dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
|
||||
dest.readableBytes += readableBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes all remaining elements in this queue.
|
||||
* @param ctx The context to write all elements to.
|
||||
*/
|
||||
public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
|
||||
readableBytes = 0;
|
||||
Throwable pending = null;
|
||||
ByteBuf previousBuf = null;
|
||||
for (;;) {
|
||||
Object entry = bufAndListenerPairs.poll();
|
||||
try {
|
||||
if (entry == null) {
|
||||
if (previousBuf != null) {
|
||||
ctx.write(previousBuf, ctx.voidPromise());
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (entry instanceof ByteBuf) {
|
||||
if (previousBuf != null) {
|
||||
ctx.write(previousBuf, ctx.voidPromise());
|
||||
}
|
||||
previousBuf = (ByteBuf) entry;
|
||||
} else if (entry instanceof ChannelPromise) {
|
||||
ctx.write(previousBuf, (ChannelPromise) entry);
|
||||
previousBuf = null;
|
||||
} else {
|
||||
ctx.write(previousBuf).addListener((ChannelFutureListener) entry);
|
||||
previousBuf = null;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
if (pending == null) {
|
||||
pending = t;
|
||||
} else {
|
||||
logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pending != null) {
|
||||
throw new IllegalStateException(pending);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the result of {@code current + next}.
|
||||
*/
|
||||
protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next);
|
||||
|
||||
/**
|
||||
* Calculate the first {@link ByteBuf} which will be used in subsequent calls to
|
||||
* {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}.
|
||||
*/
|
||||
protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
|
||||
return first;
|
||||
}
|
||||
|
||||
/**
|
||||
* The value to return when {@link #remove(ByteBufAllocator, int, ChannelPromise)} is called but the queue is empty.
|
||||
* @return the {@link ByteBuf} which represents an empty queue.
|
||||
*/
|
||||
protected abstract ByteBuf removeEmptyValue();
|
||||
|
||||
/**
|
||||
* Get the number of elements in this queue added via one of the {@link #add(ByteBuf)} methods.
|
||||
* @return the number of elements in this queue.
|
||||
*/
|
||||
protected final int size() {
|
||||
return bufAndListenerPairs.size();
|
||||
}
|
||||
|
||||
private void releaseAndCompleteAll(ChannelFuture future) {
|
||||
readableBytes = 0;
|
||||
Throwable pending = null;
|
||||
for (;;) {
|
||||
Object entry = bufAndListenerPairs.poll();
|
||||
if (entry == null) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
if (entry instanceof ByteBuf) {
|
||||
ReferenceCountUtil.safeRelease(entry);
|
||||
} else {
|
||||
((ChannelFutureListener) entry).operationComplete(future);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
if (pending == null) {
|
||||
pending = t;
|
||||
} else {
|
||||
logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pending != null) {
|
||||
throw new IllegalStateException(pending);
|
||||
}
|
||||
}
|
||||
|
||||
private void incrementReadableBytes(int increment) {
|
||||
int nextReadableBytes = readableBytes + increment;
|
||||
if (nextReadableBytes < readableBytes) {
|
||||
throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
|
||||
}
|
||||
readableBytes = nextReadableBytes;
|
||||
}
|
||||
}
|
@ -15,13 +15,11 @@
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
|
||||
/**
|
||||
* A FIFO queue of bytes where producers add bytes by repeatedly adding {@link ByteBuf} and consumers take bytes in
|
||||
* arbitrary lengths. This allows producers to add lots of small buffers and the consumer to take all the bytes
|
||||
@ -34,60 +32,16 @@ import java.util.ArrayDeque;
|
||||
* <p>This functionality is useful for aggregating or partitioning writes into fixed size buffers for framing protocols
|
||||
* such as HTTP2.
|
||||
*/
|
||||
public final class CoalescingBufferQueue {
|
||||
|
||||
public final class CoalescingBufferQueue extends AbstractCoalescingBufferQueue {
|
||||
private final Channel channel;
|
||||
private final ArrayDeque<Object> bufAndListenerPairs;
|
||||
private int readableBytes;
|
||||
|
||||
public CoalescingBufferQueue(Channel channel) {
|
||||
this(channel, 4);
|
||||
}
|
||||
|
||||
public CoalescingBufferQueue(Channel channel, int initSize) {
|
||||
super(initSize);
|
||||
this.channel = ObjectUtil.checkNotNull(channel, "channel");
|
||||
bufAndListenerPairs = new ArrayDeque<Object>(initSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a buffer to the end of the queue.
|
||||
*/
|
||||
public void add(ByteBuf buf) {
|
||||
add(buf, (ChannelFutureListener) null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a buffer to the end of the queue and associate a promise with it that should be completed when
|
||||
* all the buffers bytes have been consumed from the queue and written.
|
||||
* @param buf to add to the tail of the queue
|
||||
* @param promise to complete when all the bytes have been consumed and written, can be void.
|
||||
*/
|
||||
public void add(ByteBuf buf, ChannelPromise promise) {
|
||||
// buffers are added before promises so that we naturally 'consume' the entire buffer during removal
|
||||
// before we complete it's promise.
|
||||
ObjectUtil.checkNotNull(promise, "promise");
|
||||
add(buf, promise.isVoid() ? null : new ChannelPromiseNotifier(promise));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a buffer to the end of the queue and associate a listener with it that should be completed when
|
||||
* all the buffers bytes have been consumed from the queue and written.
|
||||
* @param buf to add to the tail of the queue
|
||||
* @param listener to notify when all the bytes have been consumed and written, can be {@code null}.
|
||||
*/
|
||||
public void add(ByteBuf buf, ChannelFutureListener listener) {
|
||||
// buffers are added before promises so that we naturally 'consume' the entire buffer during removal
|
||||
// before we complete it's promise.
|
||||
ObjectUtil.checkNotNull(buf, "buf");
|
||||
if (readableBytes > Integer.MAX_VALUE - buf.readableBytes()) {
|
||||
throw new IllegalStateException("buffer queue length overflow: " + readableBytes
|
||||
+ " + " + buf.readableBytes());
|
||||
}
|
||||
bufAndListenerPairs.add(buf);
|
||||
if (listener != null) {
|
||||
bufAndListenerPairs.add(listener);
|
||||
}
|
||||
readableBytes += buf.readableBytes();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -101,118 +55,33 @@ public final class CoalescingBufferQueue {
|
||||
* @return a {@link ByteBuf} composed of the enqueued buffers.
|
||||
*/
|
||||
public ByteBuf remove(int bytes, ChannelPromise aggregatePromise) {
|
||||
if (bytes < 0) {
|
||||
throw new IllegalArgumentException("bytes (expected >= 0): " + bytes);
|
||||
}
|
||||
ObjectUtil.checkNotNull(aggregatePromise, "aggregatePromise");
|
||||
|
||||
// Use isEmpty rather than readableBytes==0 as we may have a promise associated with an empty buffer.
|
||||
if (bufAndListenerPairs.isEmpty()) {
|
||||
return Unpooled.EMPTY_BUFFER;
|
||||
}
|
||||
bytes = Math.min(bytes, readableBytes);
|
||||
|
||||
ByteBuf toReturn = null;
|
||||
int originalBytes = bytes;
|
||||
for (;;) {
|
||||
Object entry = bufAndListenerPairs.poll();
|
||||
if (entry == null) {
|
||||
break;
|
||||
}
|
||||
if (entry instanceof ChannelFutureListener) {
|
||||
aggregatePromise.addListener((ChannelFutureListener) entry);
|
||||
continue;
|
||||
}
|
||||
ByteBuf entryBuffer = (ByteBuf) entry;
|
||||
if (entryBuffer.readableBytes() > bytes) {
|
||||
// Add the buffer back to the queue as we can't consume all of it.
|
||||
bufAndListenerPairs.addFirst(entryBuffer);
|
||||
if (bytes > 0) {
|
||||
// Take a slice of what we can consume and retain it.
|
||||
toReturn = compose(toReturn, entryBuffer.readRetainedSlice(bytes));
|
||||
bytes = 0;
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
toReturn = compose(toReturn, entryBuffer);
|
||||
bytes -= entryBuffer.readableBytes();
|
||||
}
|
||||
}
|
||||
readableBytes -= originalBytes - bytes;
|
||||
assert readableBytes >= 0;
|
||||
return toReturn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compose the current buffer with another.
|
||||
*/
|
||||
private ByteBuf compose(ByteBuf current, ByteBuf next) {
|
||||
if (current == null) {
|
||||
return next;
|
||||
}
|
||||
if (current instanceof CompositeByteBuf) {
|
||||
CompositeByteBuf composite = (CompositeByteBuf) current;
|
||||
composite.addComponent(true, next);
|
||||
return composite;
|
||||
}
|
||||
// Create a composite buffer to accumulate this pair and potentially all the buffers
|
||||
// in the queue. Using +2 as we have already dequeued current and next.
|
||||
CompositeByteBuf composite = channel.alloc().compositeBuffer(bufAndListenerPairs.size() + 2);
|
||||
composite.addComponent(true, current);
|
||||
composite.addComponent(true, next);
|
||||
return composite;
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of readable bytes.
|
||||
*/
|
||||
public int readableBytes() {
|
||||
return readableBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Are there pending buffers in the queue.
|
||||
*/
|
||||
public boolean isEmpty() {
|
||||
return bufAndListenerPairs.isEmpty();
|
||||
return remove(channel.alloc(), bytes, aggregatePromise);
|
||||
}
|
||||
|
||||
/**
|
||||
* Release all buffers in the queue and complete all listeners and promises.
|
||||
*/
|
||||
public void releaseAndFailAll(Throwable cause) {
|
||||
releaseAndCompleteAll(channel.newFailedFuture(cause));
|
||||
releaseAndFailAll(channel, cause);
|
||||
}
|
||||
|
||||
private void releaseAndCompleteAll(ChannelFuture future) {
|
||||
readableBytes = 0;
|
||||
Throwable pending = null;
|
||||
for (;;) {
|
||||
Object entry = bufAndListenerPairs.poll();
|
||||
if (entry == null) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
if (entry instanceof ByteBuf) {
|
||||
ReferenceCountUtil.safeRelease(entry);
|
||||
} else {
|
||||
((ChannelFutureListener) entry).operationComplete(future);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
pending = t;
|
||||
}
|
||||
}
|
||||
if (pending != null) {
|
||||
throw new IllegalStateException(pending);
|
||||
@Override
|
||||
protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
|
||||
if (cumulation instanceof CompositeByteBuf) {
|
||||
CompositeByteBuf composite = (CompositeByteBuf) cumulation;
|
||||
composite.addComponent(true, next);
|
||||
return composite;
|
||||
}
|
||||
// Create a composite buffer to accumulate this pair and potentially all the buffers
|
||||
// in the queue. Using +2 as we have already dequeued current and next.
|
||||
CompositeByteBuf composite = alloc.compositeBuffer(size() + 2);
|
||||
composite.addComponent(true, cumulation);
|
||||
composite.addComponent(true, next);
|
||||
return composite;
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy all pending entries in this queue into the destination queue.
|
||||
* @param dest to copy pending buffers to.
|
||||
*/
|
||||
public void copyTo(CoalescingBufferQueue dest) {
|
||||
dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
|
||||
dest.readableBytes += readableBytes;
|
||||
@Override
|
||||
protected ByteBuf removeEmptyValue() {
|
||||
return Unpooled.EMPTY_BUFFER;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,226 @@
|
||||
/*
|
||||
* Copyright 2017 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import io.netty.util.internal.PromiseNotificationUtil;
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
|
||||
@UnstableApi
|
||||
public final class DelegatingChannelPromiseNotifier implements ChannelPromise, ChannelFutureListener {
|
||||
private static final InternalLogger logger =
|
||||
InternalLoggerFactory.getInstance(DelegatingChannelPromiseNotifier.class);
|
||||
private final ChannelPromise delegate;
|
||||
private final boolean logNotifyFailure;
|
||||
|
||||
public DelegatingChannelPromiseNotifier(ChannelPromise delegate) {
|
||||
this(delegate, true);
|
||||
}
|
||||
|
||||
public DelegatingChannelPromiseNotifier(ChannelPromise delegate, boolean logNotifyFailure) {
|
||||
this.delegate = checkNotNull(delegate, "delegate");
|
||||
this.logNotifyFailure = logNotifyFailure;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
InternalLogger internalLogger = logNotifyFailure ? logger : null;
|
||||
if (future.isSuccess()) {
|
||||
Void result = future.get();
|
||||
PromiseNotificationUtil.trySuccess(delegate, result, internalLogger);
|
||||
} else if (future.isCancelled()) {
|
||||
PromiseNotificationUtil.tryCancel(delegate, internalLogger);
|
||||
} else {
|
||||
Throwable cause = future.cause();
|
||||
PromiseNotificationUtil.tryFailure(delegate, cause, internalLogger);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Channel channel() {
|
||||
return delegate.channel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise setSuccess(Void result) {
|
||||
delegate.setSuccess(result);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise setSuccess() {
|
||||
delegate.setSuccess();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean trySuccess() {
|
||||
return delegate.trySuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean trySuccess(Void result) {
|
||||
return delegate.trySuccess(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise setFailure(Throwable cause) {
|
||||
delegate.setFailure(cause);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
|
||||
delegate.addListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>[] listeners) {
|
||||
delegate.addListeners(listeners);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
|
||||
delegate.removeListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>[] listeners) {
|
||||
delegate.removeListeners(listeners);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryFailure(Throwable cause) {
|
||||
return delegate.tryFailure(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean setUncancellable() {
|
||||
return delegate.setUncancellable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise await() throws InterruptedException {
|
||||
delegate.await();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise awaitUninterruptibly() {
|
||||
delegate.awaitUninterruptibly();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isVoid() {
|
||||
return delegate.isVoid();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise unvoid() {
|
||||
return isVoid() ? new DelegatingChannelPromiseNotifier(delegate.unvoid()) : this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
return delegate.await(timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean await(long timeoutMillis) throws InterruptedException {
|
||||
return delegate.await(timeoutMillis);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
|
||||
return delegate.awaitUninterruptibly(timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean awaitUninterruptibly(long timeoutMillis) {
|
||||
return delegate.awaitUninterruptibly(timeoutMillis);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void getNow() {
|
||||
return delegate.getNow();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
return delegate.cancel(mayInterruptIfRunning);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return delegate.isCancelled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return delegate.isDone();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void get() throws InterruptedException, ExecutionException {
|
||||
return delegate.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
return delegate.get(timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise sync() throws InterruptedException {
|
||||
delegate.sync();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise syncUninterruptibly() {
|
||||
delegate.syncUninterruptibly();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSuccess() {
|
||||
return delegate.isSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancellable() {
|
||||
return delegate.isCancellable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Throwable cause() {
|
||||
return delegate.cause();
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user