diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index 2d261c83d5..d86eb49b02 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -80,6 +80,8 @@ public final class PlatformDependent { private static final int BIT_MODE = bitMode0(); + private static final int ADDRESS_SIZE = addressSize0(); + static { if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED); @@ -173,6 +175,22 @@ public final class PlatformDependent { return BIT_MODE; } + /** + * Return the address size of the OS. + * 4 (for 32 bits systems ) and 8 (for 64 bits systems). + */ + public static int addressSize() { + return ADDRESS_SIZE; + } + + public static long allocateMemory(long size) { + return PlatformDependent0.allocateMemory(size); + } + + public static void freeMemory(long address) { + PlatformDependent0.freeMemory(address); + } + /** * Raises an exception bypassing compiler checks for checked exceptions. */ @@ -815,6 +833,13 @@ public final class PlatformDependent { } } + private static int addressSize0() { + if (!hasUnsafe()) { + return -1; + } + return PlatformDependent0.addressSize(); + } + private PlatformDependent() { // only static method supported } diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java index 39414f3c6f..d2398275a2 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java @@ -365,6 +365,18 @@ final class PlatformDependent0 { } } + static int addressSize() { + return UNSAFE.addressSize(); + } + + static long allocateMemory(long size) { + return UNSAFE.allocateMemory(size); + } + + static void freeMemory(long address) { + UNSAFE.freeMemory(address); + } + private PlatformDependent0() { } diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index a3563c16f9..d6ed88d701 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -44,7 +44,7 @@ jfieldID limitFieldId = NULL; jfieldID fileChannelFieldId = NULL; jfieldID transferedFieldId = NULL; jfieldID fdFieldId = NULL; -jfieldID fileDescriptorFieldId = NULL; +jfieldID fileDescriptorFieldId = NULL;; jmethodID inetSocketAddrMethodId = NULL; jmethodID datagramSocketAddrMethodId = NULL; jclass runtimeExceptionClass = NULL; @@ -384,6 +384,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { return JNI_ERR; } socketType = socket_type(); + datagramSocketAddrMethodId = (*env)->GetMethodID(env, datagramSocketAddressClass, "", "(Ljava/lang/String;II)V"); if (datagramSocketAddrMethodId == NULL) { throwRuntimeException(env, "Unable to obtain constructor of DatagramSocketAddress"); @@ -671,6 +672,29 @@ JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_recvFromAddress(JNI return recvFrom0(env, fd, (void*) address, pos, limit); } +jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec * iov, jint length) { + ssize_t res; + int err; + do { + res = writev(fd, iov, length); + // keep on writing if it was interrupted + } while(res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + if (err == EAGAIN || err == EWOULDBLOCK) { + // network stack is saturated we will try again later + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while writev(...): ", err)); + return -1; + } + return (jlong) res; +} + JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) { struct iovec iov[length]; int iovidx = 0; @@ -709,26 +733,12 @@ JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, // See https://github.com/netty/netty/issues/2623 (*env)->DeleteLocalRef(env, bufObj); } - ssize_t res; - int err; - do { - res = writev(fd, iov, length); - // keep on writing if it was interrupted - } while(res == -1 && ((err = errno) == EINTR)); + return writev0(env, clazz, fd, iov, length); +} - if (res < 0) { - if (err == EAGAIN || err == EWOULDBLOCK) { - // network stack is saturated we will try again later - return 0; - } - if (err == EBADF) { - throwClosedChannelException(env); - return -1; - } - throwIOException(env, exceptionMessage("Error while writev(...): ", err)); - return -1; - } - return res; +JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length) { + struct iovec * iov = (struct iovec *) memoryAddress; + return writev0(env, clazz, fd, iov, length); } jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) { @@ -1156,4 +1166,4 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEn JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz) { return IOV_MAX; -} \ No newline at end of file +} diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index ef88890084..e964d0bb48 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -44,6 +44,7 @@ void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz, jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); jint Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit); jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length); +jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length); jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port); jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index f33cfb3efa..5215a8a00b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -32,6 +32,7 @@ import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import java.io.IOException; @@ -111,7 +112,28 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } boolean done = false; long writtenBytes = 0; - if (buf.nioBufferCount() == 1) { + if (buf.hasMemoryAddress()) { + long memoryAddress = buf.memoryAddress(); + int readerIndex = buf.readerIndex(); + int writerIndex = buf.writerIndex(); + for (;;) { + int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, writerIndex); + if (localFlushedAmount > 0) { + writtenBytes += localFlushedAmount; + if (writtenBytes == readableBytes) { + done = true; + break; + } + readerIndex += localFlushedAmount; + } else { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; + } + } + updateOutboundBuffer(in, writtenBytes); + return done; + } else if (buf.nioBufferCount() == 1) { int readerIndex = buf.readerIndex(); ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes()); for (;;) { @@ -131,91 +153,114 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So break; } } - updateOutboundBuffer(in, writtenBytes, 1, done); + updateOutboundBuffer(in, writtenBytes); return done; } else { ByteBuffer[] nioBuffers = buf.nioBuffers(); - return writeBytesMultiple(in, 1, nioBuffers, nioBuffers.length, readableBytes); + return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes); } } private boolean writeBytesMultiple( - ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers, + ChannelOutboundBuffer in, IovArray array) throws IOException { + boolean done = false; + long expectedWrittenBytes = array.size(); + int cnt = array.count(); + long writtenBytes = 0; + int offset = 0; + int end = offset + cnt; + for (;;) { + long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt); + if (localWrittenBytes == 0) { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; + } + expectedWrittenBytes -= localWrittenBytes; + writtenBytes += localWrittenBytes; + + if (expectedWrittenBytes == 0) { + // Written everything, just break out here (fast-path) + done = true; + break; + } + + do { + long bytes = array.processWritten(offset, localWrittenBytes); + if (bytes == -1) { + // incomplete write + break; + } else { + offset++; + cnt--; + localWrittenBytes -= bytes; + } + } while (offset < end && localWrittenBytes > 0); + } + + updateOutboundBuffer(in, writtenBytes); + return done; + } + + private boolean writeBytesMultiple( + ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes) throws IOException { boolean done = false; long writtenBytes = 0; int offset = 0; int end = offset + nioBufferCnt; - loop: while (nioBufferCnt > 0) { - for (;;) { - int cnt = nioBufferCnt > Native.IOV_MAX? Native.IOV_MAX : nioBufferCnt; - - long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt); - if (localWrittenBytes == 0) { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break loop; - } - expectedWrittenBytes -= localWrittenBytes; - writtenBytes += localWrittenBytes; - if (expectedWrittenBytes == 0) { - // Written everything, just break out here (fast-path) - done = true; - break loop; - } - do { - ByteBuffer buffer = nioBuffers[offset]; - int pos = buffer.position(); - int bytes = buffer.limit() - pos; - if (bytes > localWrittenBytes) { - buffer.position(pos + (int) localWrittenBytes); - // incomplete write - break; - } else { - offset++; - nioBufferCnt--; - localWrittenBytes -= bytes; - } - } while (offset < end && localWrittenBytes > 0); + for (;;) { + long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt); + if (localWrittenBytes == 0) { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; } + expectedWrittenBytes -= localWrittenBytes; + writtenBytes += localWrittenBytes; + + if (expectedWrittenBytes == 0) { + // Written everything, just break out here (fast-path) + done = true; + break; + } + do { + ByteBuffer buffer = nioBuffers[offset]; + int pos = buffer.position(); + int bytes = buffer.limit() - pos; + if (bytes > localWrittenBytes) { + buffer.position(pos + (int) localWrittenBytes); + // incomplete write + break; + } else { + offset++; + nioBufferCnt--; + localWrittenBytes -= bytes; + } + } while (offset < end && localWrittenBytes > 0); } - updateOutboundBuffer(in, writtenBytes, msgCount, done); + updateOutboundBuffer(in, writtenBytes); return done; } - private static void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount, - boolean done) { - if (done) { - // Release all buffers - for (int i = msgCount; i > 0; i --) { - final ByteBuf buf = (ByteBuf) in.current(); - in.progress(buf.readableBytes()); + private static void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes) { + for (;;) { + final ByteBuf buf = (ByteBuf) in.current(); + final int readerIndex = buf.readerIndex(); + final int readableBytes = buf.writerIndex() - readerIndex; + + if (readableBytes < writtenBytes) { + in.progress(readableBytes); in.remove(); - } - } else { - // Did not write all buffers completely. - // Release the fully written buffers and update the indexes of the partially written buffer. - - // Did not write all buffers completely. - // Release the fully written buffers and update the indexes of the partially written buffer. - for (int i = msgCount; i > 0; i --) { - final ByteBuf buf = (ByteBuf) in.current(); - final int readerIndex = buf.readerIndex(); - final int readableBytes = buf.writerIndex() - readerIndex; - - if (readableBytes < writtenBytes) { - in.progress(readableBytes); - in.remove(); - writtenBytes -= readableBytes; - } else if (readableBytes > writtenBytes) { - buf.readerIndex(readerIndex + (int) writtenBytes); - in.progress(writtenBytes); - break; - } else { // readable == writtenBytes - in.progress(readableBytes); - in.remove(); - break; - } + writtenBytes -= readableBytes; + } else if (readableBytes > writtenBytes) { + buf.readerIndex(readerIndex + (int) writtenBytes); + in.progress(writtenBytes); + break; + } else { // readable == writtenBytes + in.progress(readableBytes); + in.remove(); + break; } } } @@ -268,21 +313,38 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So // Do gathering write if: // * the outbound buffer contains more than one messages and // * they are all buffers rather than a file region. - if (msgCount > 1) { - // Ensure the pending writes are made of ByteBufs only. - ByteBuffer[] nioBuffers = in.nioBuffers(); - int nioBufferCount = in.nioBufferCount(); - if (nioBufferCount != 0) { - if (!writeBytesMultiple(in, msgCount, nioBuffers, nioBufferCount, in.nioBufferSize())) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. - break; - } + if (msgCount >= 1) { + if (PlatformDependent.hasUnsafe()) { + // this means we can cast to IovArray and write the IovArray directly. + IovArray array = IovArray.get(in); + int cnt = array.count(); + if (cnt > 1) { + if (!writeBytesMultiple(in, array)) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + break; + } - // We do not break the loop here even if the outbound buffer was flushed completely, - // because a user might have triggered another write and flush when we notify his or her - // listeners. - continue; + // We do not break the loop here even if the outbound buffer was flushed completely, + // because a user might have triggered another write and flush when we notify his or her + // listeners. + continue; + } + } else { + ByteBuffer[] buffers = in.nioBuffers(); + int cnt = in.nioBufferCount(); + if (cnt > 1) { + if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + break; + } + + // We do not break the loop here even if the outbound buffer was flushed completely, + // because a user might have triggered another write and flush when we notify his or her + // listeners. + continue; + } } } @@ -362,15 +424,15 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So public void write(Object msg, ChannelPromise promise) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; - if (!buf.isDirect()) { - // We can only handle direct buffers so we need to copy if a non direct is + if (PlatformDependent.hasUnsafe() && !buf.hasMemoryAddress()) { + // We can only handle buffers with memory address so we need to copy if a non direct is // passed to write. int readable = buf.readableBytes(); ByteBuf dst = alloc().directBuffer(readable); dst.writeBytes(buf, buf.readerIndex(), readable); - buf.release(); msg = dst; + assert dst.hasMemoryAddress(); } } super.write(msg, promise); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java new file mode 100644 index 0000000000..1e060ae7e1 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java @@ -0,0 +1,166 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.internal.PlatformDependent; + +/** + * Represent an array of struct array and so can be passed directly over via JNI without the need to do any more + * array copies. + * + * The buffers are written out directly into direct memory to match the struct iov. See also man writev. + * + *
+ * struct iovec {
+ *   void  *iov_base;
+ *   size_t iov_len;
+ * };
+ * 
+ * + * See also + * + * Efficient JNI programming IV: Wrapping native data objects. + */ +final class IovArray implements ChannelOutboundBuffer.FlushedMessageProcessor { + // Maximal number of struct iov entries that can be passed to writev(...) + private static final int IOV_MAX = Native.IOV_MAX; + // The size of an address which should be 8 for 64 bits and 4 for 32 bits. + private static final int ADDRESS_SIZE = PlatformDependent.addressSize(); + // The size of an struct iov entry in bytes. This is calculated as we have 2 entries each of the size of the + // address. + private static final int IOV_SIZE = 2 * ADDRESS_SIZE; + // The needed memory to hold up to IOV_MAX iov entries. + private static final int CAPACITY = IOV_MAX * IOV_SIZE; + + private static final FastThreadLocal ARRAY = new FastThreadLocal() { + @Override + protected IovArray initialValue() throws Exception { + return new IovArray(); + } + + @Override + protected void onRemoval(IovArray value) throws Exception { + // free the direct memory now + PlatformDependent.freeMemory(value.memoryAddress); + } + }; + + private final long memoryAddress; + private int count; + private long size; + + private IovArray() { + memoryAddress = PlatformDependent.allocateMemory(CAPACITY); + } + + /** + * Try to add the given {@link ByteBuf}. Returns {@code true} on success, + * {@code false} otherwise. + */ + private boolean add(ByteBuf buf) { + if (count == IOV_MAX) { + // No more room! + return false; + } + int len = buf.readableBytes(); + long addr = buf.memoryAddress(); + int offset = buf.readerIndex(); + + long baseOffset = memoryAddress(count++); + long lengthOffset = baseOffset + ADDRESS_SIZE; + if (ADDRESS_SIZE == 8) { + // 64bit + PlatformDependent.putLong(baseOffset, addr + offset); + PlatformDependent.putLong(lengthOffset, len); + } else { + assert ADDRESS_SIZE == 4; + PlatformDependent.putInt(baseOffset, (int) addr + offset); + PlatformDependent.putInt(lengthOffset, len); + } + size += len; + return true; + } + + /** + * Process the written iov entries. This will return the length of the iov entry on the given index if it is + * smaller then the given {@code written} value. Otherwise it returns {@code -1}. + */ + long processWritten(int index, long written) { + long baseOffset = memoryAddress(index); + long lengthOffset = baseOffset + ADDRESS_SIZE; + if (ADDRESS_SIZE == 8) { + // 64bit + long len = PlatformDependent.getLong(lengthOffset); + if (len > written) { + long offset = PlatformDependent.getLong(baseOffset); + PlatformDependent.putLong(baseOffset, offset + written); + PlatformDependent.putLong(lengthOffset, len - written); + return -1; + } + return len; + } else { + assert ADDRESS_SIZE == 4; + long len = PlatformDependent.getInt(lengthOffset); + if (len > written) { + int offset = PlatformDependent.getInt(baseOffset); + PlatformDependent.putInt(baseOffset, (int) (offset + written)); + PlatformDependent.putInt(lengthOffset, (int) (len - written)); + return -1; + } + return len; + } + } + + /** + * Returns the number if iov entries. + */ + int count() { + return count; + } + + /** + * Returns the size in bytes + */ + long size() { + return size; + } + + /** + * Returns the {@code memoryAddress} for the given {@code offset}. + */ + long memoryAddress(int offset) { + return memoryAddress + IOV_SIZE * offset; + } + + @Override + public boolean process(Object msg) throws Exception { + return msg instanceof ByteBuf && add((ByteBuf) msg); + } + + /** + * Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}. + */ + static IovArray get(ChannelOutboundBuffer buffer) throws Exception { + IovArray array = ARRAY.get(); + array.size = 0; + array.count = 0; + buffer.forEachFlushedMessage(array); + return array; + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index ff2b4123d9..33561ee884 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -69,6 +69,9 @@ final class Native { public static native int writeAddress(int fd, long address, int pos, int limit) throws IOException; public static native long writev(int fd, ByteBuffer[] buffers, int offset, int length) throws IOException; + public static native long writevAddresses(int fd, long memoryAddress, int length) + throws IOException; + public static native int read(int fd, ByteBuffer buf, int pos, int limit) throws IOException; public static native int readAddress(int fd, long address, int pos, int limit) throws IOException; diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index aa54e03131..5642f22655 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -374,7 +374,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha */ protected abstract class AbstractUnsafe implements Unsafe { - private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); + private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); private boolean inFlush0; @Override diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 932dba47ea..0eba61d14d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -417,7 +417,7 @@ public final class ChannelOutboundBuffer { } private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes, - ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) { + ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) { ByteBuf directBuf; if (alloc.isDirectBufferPooled()) { directBuf = alloc.directBuffer(readableBytes); @@ -564,6 +564,34 @@ public final class ChannelOutboundBuffer { return totalPendingSize; } + /** + * Call {@link FlushedMessageProcessor#process(Object)} foreach flushed message + * in this {@link ChannelOutboundBuffer} until {@link FlushedMessageProcessor#process(Object)} + * returns {@code false} or ther are no more flushed messages to process. + */ + public void forEachFlushedMessage(FlushedMessageProcessor processor) throws Exception { + if (processor == null) { + throw new NullPointerException("processor"); + } + Entry entry = flushedEntry; + while (entry != null) { + if (!entry.cancelled) { + if (!processor.process(entry.msg)) { + return; + } + } + entry = entry.next; + } + } + + public interface FlushedMessageProcessor { + /** + * Will be called for each flushed message until it either there are no more flushed messages or this + * method returns {@code false}. + */ + boolean process(Object msg) throws Exception; + } + static final class Entry { private static final Recycler RECYCLER = new Recycler() { @Override