Optimize gathering write in the epoll transport

Motivation:

While benchmarking the native transport, I noticed that gathering write
is not as fast as expected.  It was due to the fact that we have to do a
lot of array copies to put the buffer addresses into the iovec struct
array.

Modifications:

Introduce a new class called IovArray, which allows to fill buffers
directly into an off-heap array of iovec structs, so that it can be
passed over to JNI without any extra array copies.

Result:

Big performance improvement when doing gathering writes:

Before:

[nmaurer@xxx]~% wrk/wrk -H 'Host: localhost' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' -H 'Connection: keep-alive' -d 120 -c 256 -t 16 --pipeline 256  http://xxx:8080/plaintext
Running 2m test @ http://xxx:8080/plaintext
  16 threads and 256 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    23.44ms   16.37ms 259.57ms   91.77%
    Req/Sec   181.99k    31.69k  304.60k    78.12%
  346544071 requests in 2.00m, 46.48GB read
Requests/sec: 2887885.09
Transfer/sec:    396.59MB

After:

[nmaurer@xxx]~% wrk/wrk -H 'Host: localhost' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' -H 'Connection: keep-alive' -d 120 -c 256 -t 16 --pipeline 256  http://xxx:8080/plaintext
Running 2m test @ http://xxx:8080/plaintext
  16 threads and 256 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    21.93ms   16.33ms 305.73ms   92.34%
    Req/Sec   194.56k    33.75k  309.33k    77.04%
  369617503 requests in 2.00m, 49.57GB read
Requests/sec: 3080169.65
Transfer/sec:    423.00MB
This commit is contained in:
Norman Maurer 2014-07-22 22:27:50 +02:00 committed by Trustin Lee
parent 997d8c32d2
commit e282e504f1
9 changed files with 415 additions and 108 deletions

View File

@ -80,6 +80,8 @@ public final class PlatformDependent {
private static final int BIT_MODE = bitMode0(); private static final int BIT_MODE = bitMode0();
private static final int ADDRESS_SIZE = addressSize0();
static { static {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED); logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED);
@ -173,6 +175,22 @@ public final class PlatformDependent {
return BIT_MODE; return BIT_MODE;
} }
/**
* Return the address size of the OS.
* 4 (for 32 bits systems ) and 8 (for 64 bits systems).
*/
public static int addressSize() {
return ADDRESS_SIZE;
}
public static long allocateMemory(long size) {
return PlatformDependent0.allocateMemory(size);
}
public static void freeMemory(long address) {
PlatformDependent0.freeMemory(address);
}
/** /**
* Raises an exception bypassing compiler checks for checked exceptions. * Raises an exception bypassing compiler checks for checked exceptions.
*/ */
@ -815,6 +833,13 @@ public final class PlatformDependent {
} }
} }
private static int addressSize0() {
if (!hasUnsafe()) {
return -1;
}
return PlatformDependent0.addressSize();
}
private PlatformDependent() { private PlatformDependent() {
// only static method supported // only static method supported
} }

View File

@ -365,6 +365,18 @@ final class PlatformDependent0 {
} }
} }
static int addressSize() {
return UNSAFE.addressSize();
}
static long allocateMemory(long size) {
return UNSAFE.allocateMemory(size);
}
static void freeMemory(long address) {
UNSAFE.freeMemory(address);
}
private PlatformDependent0() { private PlatformDependent0() {
} }

View File

@ -44,7 +44,7 @@ jfieldID limitFieldId = NULL;
jfieldID fileChannelFieldId = NULL; jfieldID fileChannelFieldId = NULL;
jfieldID transferedFieldId = NULL; jfieldID transferedFieldId = NULL;
jfieldID fdFieldId = NULL; jfieldID fdFieldId = NULL;
jfieldID fileDescriptorFieldId = NULL; jfieldID fileDescriptorFieldId = NULL;;
jmethodID inetSocketAddrMethodId = NULL; jmethodID inetSocketAddrMethodId = NULL;
jmethodID datagramSocketAddrMethodId = NULL; jmethodID datagramSocketAddrMethodId = NULL;
jclass runtimeExceptionClass = NULL; jclass runtimeExceptionClass = NULL;
@ -384,6 +384,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
return JNI_ERR; return JNI_ERR;
} }
socketType = socket_type(); socketType = socket_type();
datagramSocketAddrMethodId = (*env)->GetMethodID(env, datagramSocketAddressClass, "<init>", "(Ljava/lang/String;II)V"); datagramSocketAddrMethodId = (*env)->GetMethodID(env, datagramSocketAddressClass, "<init>", "(Ljava/lang/String;II)V");
if (datagramSocketAddrMethodId == NULL) { if (datagramSocketAddrMethodId == NULL) {
throwRuntimeException(env, "Unable to obtain constructor of DatagramSocketAddress"); throwRuntimeException(env, "Unable to obtain constructor of DatagramSocketAddress");
@ -671,6 +672,29 @@ JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_recvFromAddress(JNI
return recvFrom0(env, fd, (void*) address, pos, limit); return recvFrom0(env, fd, (void*) address, pos, limit);
} }
jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec * iov, jint length) {
ssize_t res;
int err;
do {
res = writev(fd, iov, length);
// keep on writing if it was interrupted
} while(res == -1 && ((err = errno) == EINTR));
if (res < 0) {
if (err == EAGAIN || err == EWOULDBLOCK) {
// network stack is saturated we will try again later
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while writev(...): ", err));
return -1;
}
return (jlong) res;
}
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) { JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) {
struct iovec iov[length]; struct iovec iov[length];
int iovidx = 0; int iovidx = 0;
@ -709,26 +733,12 @@ JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env,
// See https://github.com/netty/netty/issues/2623 // See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj); (*env)->DeleteLocalRef(env, bufObj);
} }
ssize_t res; return writev0(env, clazz, fd, iov, length);
int err; }
do {
res = writev(fd, iov, length);
// keep on writing if it was interrupted
} while(res == -1 && ((err = errno) == EINTR));
if (res < 0) { JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length) {
if (err == EAGAIN || err == EWOULDBLOCK) { struct iovec * iov = (struct iovec *) memoryAddress;
// network stack is saturated we will try again later return writev0(env, clazz, fd, iov, length);
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while writev(...): ", err));
return -1;
}
return res;
} }
jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) { jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) {

View File

@ -44,6 +44,7 @@ void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz,
jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jint Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit); jint Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length); jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length);
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length);
jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port); jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port); jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);

View File

@ -32,6 +32,7 @@ import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.io.IOException; import java.io.IOException;
@ -111,7 +112,28 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} }
boolean done = false; boolean done = false;
long writtenBytes = 0; long writtenBytes = 0;
if (buf.nioBufferCount() == 1) { if (buf.hasMemoryAddress()) {
long memoryAddress = buf.memoryAddress();
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
for (;;) {
int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, writerIndex);
if (localFlushedAmount > 0) {
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
done = true;
break;
}
readerIndex += localFlushedAmount;
} else {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
}
updateOutboundBuffer(in, writtenBytes);
return done;
} else if (buf.nioBufferCount() == 1) {
int readerIndex = buf.readerIndex(); int readerIndex = buf.readerIndex();
ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes()); ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes());
for (;;) { for (;;) {
@ -131,91 +153,114 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break; break;
} }
} }
updateOutboundBuffer(in, writtenBytes, 1, done); updateOutboundBuffer(in, writtenBytes);
return done; return done;
} else { } else {
ByteBuffer[] nioBuffers = buf.nioBuffers(); ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, 1, nioBuffers, nioBuffers.length, readableBytes); return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes);
} }
} }
private boolean writeBytesMultiple( private boolean writeBytesMultiple(
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers, ChannelOutboundBuffer in, IovArray array) throws IOException {
boolean done = false;
long expectedWrittenBytes = array.size();
int cnt = array.count();
long writtenBytes = 0;
int offset = 0;
int end = offset + cnt;
for (;;) {
long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
long bytes = array.processWritten(offset, localWrittenBytes);
if (bytes == -1) {
// incomplete write
break;
} else {
offset++;
cnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
}
updateOutboundBuffer(in, writtenBytes);
return done;
}
private boolean writeBytesMultiple(
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
int nioBufferCnt, long expectedWrittenBytes) throws IOException { int nioBufferCnt, long expectedWrittenBytes) throws IOException {
boolean done = false; boolean done = false;
long writtenBytes = 0; long writtenBytes = 0;
int offset = 0; int offset = 0;
int end = offset + nioBufferCnt; int end = offset + nioBufferCnt;
loop: while (nioBufferCnt > 0) { for (;;) {
for (;;) { long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt);
int cnt = nioBufferCnt > Native.IOV_MAX? Native.IOV_MAX : nioBufferCnt; if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt); setEpollOut();
if (localWrittenBytes == 0) { break;
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break loop;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break loop;
}
do {
ByteBuffer buffer = nioBuffers[offset];
int pos = buffer.position();
int bytes = buffer.limit() - pos;
if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes);
// incomplete write
break;
} else {
offset++;
nioBufferCnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
} }
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
ByteBuffer buffer = nioBuffers[offset];
int pos = buffer.position();
int bytes = buffer.limit() - pos;
if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes);
// incomplete write
break;
} else {
offset++;
nioBufferCnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
} }
updateOutboundBuffer(in, writtenBytes, msgCount, done); updateOutboundBuffer(in, writtenBytes);
return done; return done;
} }
private static void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount, private static void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes) {
boolean done) { for (;;) {
if (done) { final ByteBuf buf = (ByteBuf) in.current();
// Release all buffers final int readerIndex = buf.readerIndex();
for (int i = msgCount; i > 0; i --) { final int readableBytes = buf.writerIndex() - readerIndex;
final ByteBuf buf = (ByteBuf) in.current();
in.progress(buf.readableBytes()); if (readableBytes < writtenBytes) {
in.progress(readableBytes);
in.remove(); in.remove();
} writtenBytes -= readableBytes;
} else { } else if (readableBytes > writtenBytes) {
// Did not write all buffers completely. buf.readerIndex(readerIndex + (int) writtenBytes);
// Release the fully written buffers and update the indexes of the partially written buffer. in.progress(writtenBytes);
break;
// Did not write all buffers completely. } else { // readable == writtenBytes
// Release the fully written buffers and update the indexes of the partially written buffer. in.progress(readableBytes);
for (int i = msgCount; i > 0; i --) { in.remove();
final ByteBuf buf = (ByteBuf) in.current(); break;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < writtenBytes) {
in.progress(readableBytes);
in.remove();
writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
in.progress(writtenBytes);
break;
} else { // readable == writtenBytes
in.progress(readableBytes);
in.remove();
break;
}
} }
} }
} }
@ -268,21 +313,38 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
// Do gathering write if: // Do gathering write if:
// * the outbound buffer contains more than one messages and // * the outbound buffer contains more than one messages and
// * they are all buffers rather than a file region. // * they are all buffers rather than a file region.
if (msgCount > 1) { if (msgCount >= 1) {
// Ensure the pending writes are made of ByteBufs only. if (PlatformDependent.hasUnsafe()) {
ByteBuffer[] nioBuffers = in.nioBuffers(); // this means we can cast to IovArray and write the IovArray directly.
int nioBufferCount = in.nioBufferCount(); IovArray array = IovArray.get(in);
if (nioBufferCount != 0) { int cnt = array.count();
if (!writeBytesMultiple(in, msgCount, nioBuffers, nioBufferCount, in.nioBufferSize())) { if (cnt > 1) {
// was not able to write everything so break here we will get notified later again once if (!writeBytesMultiple(in, array)) {
// the network stack can handle more writes. // was not able to write everything so break here we will get notified later again once
break; // the network stack can handle more writes.
} break;
}
// We do not break the loop here even if the outbound buffer was flushed completely, // We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her // because a user might have triggered another write and flush when we notify his or her
// listeners. // listeners.
continue; continue;
}
} else {
ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount();
if (cnt > 1) {
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
continue;
}
} }
} }
@ -362,15 +424,15 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
public void write(Object msg, ChannelPromise promise) { public void write(Object msg, ChannelPromise promise) {
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
if (!buf.isDirect()) { if (PlatformDependent.hasUnsafe() && !buf.hasMemoryAddress()) {
// We can only handle direct buffers so we need to copy if a non direct is // We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write. // passed to write.
int readable = buf.readableBytes(); int readable = buf.readableBytes();
ByteBuf dst = alloc().directBuffer(readable); ByteBuf dst = alloc().directBuffer(readable);
dst.writeBytes(buf, buf.readerIndex(), readable); dst.writeBytes(buf, buf.readerIndex(), readable);
buf.release(); buf.release();
msg = dst; msg = dst;
assert dst.hasMemoryAddress();
} }
} }
super.write(msg, promise); super.write(msg, promise);

View File

@ -0,0 +1,166 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
/**
* Represent an array of struct array and so can be passed directly over via JNI without the need to do any more
* array copies.
*
* The buffers are written out directly into direct memory to match the struct iov. See also <code>man writev</code>.
*
* <pre>
* struct iovec {
* void *iov_base;
* size_t iov_len;
* };
* </pre>
*
* See also
* <a href="http://rkennke.wordpress.com/2007/07/30/efficient-jni-programming-iv-wrapping-native-data-objects/">
* Efficient JNI programming IV: Wrapping native data objects</a>.
*/
final class IovArray implements ChannelOutboundBuffer.FlushedMessageProcessor {
// Maximal number of struct iov entries that can be passed to writev(...)
private static final int IOV_MAX = Native.IOV_MAX;
// The size of an address which should be 8 for 64 bits and 4 for 32 bits.
private static final int ADDRESS_SIZE = PlatformDependent.addressSize();
// The size of an struct iov entry in bytes. This is calculated as we have 2 entries each of the size of the
// address.
private static final int IOV_SIZE = 2 * ADDRESS_SIZE;
// The needed memory to hold up to IOV_MAX iov entries.
private static final int CAPACITY = IOV_MAX * IOV_SIZE;
private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() {
@Override
protected IovArray initialValue() throws Exception {
return new IovArray();
}
@Override
protected void onRemoval(IovArray value) throws Exception {
// free the direct memory now
PlatformDependent.freeMemory(value.memoryAddress);
}
};
private final long memoryAddress;
private int count;
private long size;
private IovArray() {
memoryAddress = PlatformDependent.allocateMemory(CAPACITY);
}
/**
* Try to add the given {@link ByteBuf}. Returns {@code true} on success,
* {@code false} otherwise.
*/
private boolean add(ByteBuf buf) {
if (count == IOV_MAX) {
// No more room!
return false;
}
int len = buf.readableBytes();
long addr = buf.memoryAddress();
int offset = buf.readerIndex();
long baseOffset = memoryAddress(count++);
long lengthOffset = baseOffset + ADDRESS_SIZE;
if (ADDRESS_SIZE == 8) {
// 64bit
PlatformDependent.putLong(baseOffset, addr + offset);
PlatformDependent.putLong(lengthOffset, len);
} else {
assert ADDRESS_SIZE == 4;
PlatformDependent.putInt(baseOffset, (int) addr + offset);
PlatformDependent.putInt(lengthOffset, len);
}
size += len;
return true;
}
/**
* Process the written iov entries. This will return the length of the iov entry on the given index if it is
* smaller then the given {@code written} value. Otherwise it returns {@code -1}.
*/
long processWritten(int index, long written) {
long baseOffset = memoryAddress(index);
long lengthOffset = baseOffset + ADDRESS_SIZE;
if (ADDRESS_SIZE == 8) {
// 64bit
long len = PlatformDependent.getLong(lengthOffset);
if (len > written) {
long offset = PlatformDependent.getLong(baseOffset);
PlatformDependent.putLong(baseOffset, offset + written);
PlatformDependent.putLong(lengthOffset, len - written);
return -1;
}
return len;
} else {
assert ADDRESS_SIZE == 4;
long len = PlatformDependent.getInt(lengthOffset);
if (len > written) {
int offset = PlatformDependent.getInt(baseOffset);
PlatformDependent.putInt(baseOffset, (int) (offset + written));
PlatformDependent.putInt(lengthOffset, (int) (len - written));
return -1;
}
return len;
}
}
/**
* Returns the number if iov entries.
*/
int count() {
return count;
}
/**
* Returns the size in bytes
*/
long size() {
return size;
}
/**
* Returns the {@code memoryAddress} for the given {@code offset}.
*/
long memoryAddress(int offset) {
return memoryAddress + IOV_SIZE * offset;
}
@Override
public boolean process(Object msg) throws Exception {
return msg instanceof ByteBuf && add((ByteBuf) msg);
}
/**
* Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}.
*/
static IovArray get(ChannelOutboundBuffer buffer) throws Exception {
IovArray array = ARRAY.get();
array.size = 0;
array.count = 0;
buffer.forEachFlushedMessage(array);
return array;
}
}

View File

@ -69,6 +69,9 @@ final class Native {
public static native int writeAddress(int fd, long address, int pos, int limit) throws IOException; public static native int writeAddress(int fd, long address, int pos, int limit) throws IOException;
public static native long writev(int fd, ByteBuffer[] buffers, int offset, int length) throws IOException; public static native long writev(int fd, ByteBuffer[] buffers, int offset, int length) throws IOException;
public static native long writevAddresses(int fd, long memoryAddress, int length)
throws IOException;
public static native int read(int fd, ByteBuffer buf, int pos, int limit) throws IOException; public static native int read(int fd, ByteBuffer buf, int pos, int limit) throws IOException;
public static native int readAddress(int fd, long address, int pos, int limit) throws IOException; public static native int readAddress(int fd, long address, int pos, int limit) throws IOException;

View File

@ -374,7 +374,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/ */
protected abstract class AbstractUnsafe implements Unsafe { protected abstract class AbstractUnsafe implements Unsafe {
private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private boolean inFlush0; private boolean inFlush0;
@Override @Override

View File

@ -417,7 +417,7 @@ public final class ChannelOutboundBuffer {
} }
private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes, private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes,
ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) { ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) {
ByteBuf directBuf; ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) { if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes); directBuf = alloc.directBuffer(readableBytes);
@ -564,6 +564,34 @@ public final class ChannelOutboundBuffer {
return totalPendingSize; return totalPendingSize;
} }
/**
* Call {@link FlushedMessageProcessor#process(Object)} foreach flushed message
* in this {@link ChannelOutboundBuffer} until {@link FlushedMessageProcessor#process(Object)}
* returns {@code false} or ther are no more flushed messages to process.
*/
public void forEachFlushedMessage(FlushedMessageProcessor processor) throws Exception {
if (processor == null) {
throw new NullPointerException("processor");
}
Entry entry = flushedEntry;
while (entry != null) {
if (!entry.cancelled) {
if (!processor.process(entry.msg)) {
return;
}
}
entry = entry.next;
}
}
public interface FlushedMessageProcessor {
/**
* Will be called for each flushed message until it either there are no more flushed messages or this
* method returns {@code false}.
*/
boolean process(Object msg) throws Exception;
}
static final class Entry { static final class Entry {
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() { private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
@Override @Override