From 4db6c655349df5581332dac5059a67f087756158 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 17 Jul 2014 16:00:53 +0200 Subject: [PATCH] [#2647] Handle IOV_MAX in java code Motivation: The handling of IOV_MAX was done in JNI code base which makes stuff really complicated to maintain etc. Modifications: Move handling of IOV_MAX to java code to simplify stuff Result: Cleaner code. --- .../main/c/io_netty_channel_epoll_Native.c | 182 ++++++------------ .../main/c/io_netty_channel_epoll_Native.h | 1 + .../channel/epoll/EpollSocketChannel.java | 107 ++++++---- .../java/io/netty/channel/epoll/Native.java | 4 + 4 files changed, 134 insertions(+), 160 deletions(-) diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index 3e14d27e7d..ecccaeced7 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -711,141 +711,69 @@ jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec iov[], jint leng } JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) { - // Calculate maximal size of iov - // - // See https://github.com/netty/netty/issues/2647 - int iovLen = IOV_MAX < length ? IOV_MAX : length; - struct iovec iov[iovLen]; - jlong w = 0; - while (length > 0) { - int i; - int iovidx = 0; - int loop = IOV_MAX < length ? IOV_MAX : length; - int num = offset + loop; - for (i = offset; i < num; i++) { - jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); - jint pos; - // Get the current position using the (*env)->GetIntField if possible and fallback - // to slower (*env)->CallIntMethod(...) if needed - if (posFieldId == NULL) { - pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); - } else { - pos = (*env)->GetIntField(env, bufObj, posFieldId); - } - jint limit; - // Get the current limit using the (*env)->GetIntField if possible and fallback - // to slower (*env)->CallIntMethod(...) if needed - if (limitFieldId == NULL) { - limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); - } else { - limit = (*env)->GetIntField(env, bufObj, limitFieldId); - } - void *buffer = (*env)->GetDirectBufferAddress(env, bufObj); - if (buffer == NULL) { - throwRuntimeException(env, "Unable to access address of buffer"); - return -1; - } - iov[iovidx].iov_base = buffer + pos; - iov[iovidx].iov_len = (size_t) (limit - pos); - iovidx++; - - // Explicit delete local reference as otherwise the local references will only be released once the native method returns. - // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. - // - // See https://github.com/netty/netty/issues/2623 - (*env)->DeleteLocalRef(env, bufObj); + struct iovec iov[length]; + int iovidx = 0; + int i; + int num = offset + length; + for (i = offset; i < num; i++) { + jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); + jint pos; + // Get the current position using the (*env)->GetIntField if possible and fallback + // to slower (*env)->CallIntMethod(...) if needed + if (posFieldId == NULL) { + pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); + } else { + pos = (*env)->GetIntField(env, bufObj, posFieldId); } - jlong res = writev0(env, clazz, fd, iov, loop); - if (res <= 0) { - return res < 0 ? res : w; + jint limit; + // Get the current limit using the (*env)->GetIntField if possible and fallback + // to slower (*env)->CallIntMethod(...) if needed + if (limitFieldId == NULL) { + limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); + } else { + limit = (*env)->GetIntField(env, bufObj, limitFieldId); } - - w += res; - - // update the position of the written buffers - int written = res; - int a; - for (a = 0; a < loop; a++) { - int len = iov[a].iov_len; - jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset); - if (len > written) { - incrementPosition(env, bufObj, written); - // Explicit delete local reference as otherwise the local references will only be released once the native method returns. - // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. - // - // See https://github.com/netty/netty/issues/2623 - (*env)->DeleteLocalRef(env, bufObj); - - // incomplete write which means the channel is not writable anymore. Return now! - return w; - } else { - incrementPosition(env, bufObj, len); - // Explicit delete local reference as otherwise the local references will only be released once the native method returns. - // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. - // - // See https://github.com/netty/netty/issues/2623 - (*env)->DeleteLocalRef(env, bufObj); - written -= len; - } - + void *buffer = (*env)->GetDirectBufferAddress(env, bufObj); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return -1; } - offset += loop; - length -= loop; + iov[iovidx].iov_base = buffer + pos; + iov[iovidx].iov_len = (size_t) (limit - pos); + iovidx++; + + // Explicit delete local reference as otherwise the local references will only be released once the native method returns. + // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. + // + // See https://github.com/netty/netty/issues/2623 + (*env)->DeleteLocalRef(env, bufObj); } - return w; + return writev0(env, clazz, fd, iov, length); } JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length) { - // Calculate maximal size of iov - // - // See https://github.com/netty/netty/issues/2647 - int iovLen = IOV_MAX < length ? IOV_MAX : length; - struct iovec iov[iovLen]; - jlong w = 0; - while (length > 0) { - int i; - int iovidx = 0; - int loop = IOV_MAX < length ? IOV_MAX : length; - int num = offset + loop; - for (i = offset; i < num; i++) { - jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i); - jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId); - jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId); - void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId); + struct iovec iov[length]; + int iovidx = 0; + int i; + int num = offset + length; + for (i = offset; i < num; i++) { + jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i); + jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId); + jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId); + void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId); - iov[iovidx].iov_base = memoryAddress + readerIndex; - iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex); - iovidx++; + iov[iovidx].iov_base = memoryAddress + readerIndex; + iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex); + iovidx++; - // Explicit delete local reference as otherwise the local references will only be released once the native method returns. - // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. - // - // See https://github.com/netty/netty/issues/2623 - (*env)->DeleteLocalRef(env, addressEntry); - } - - jlong res = writev0(env, clazz, fd, iov, loop); - if (res <= 0) { - return res < 0 ? res : w; - } - - w += res; - offset += loop; - length -= loop; - - // update the position of the written buffers - int written = res; - int a; - for (a = 0; a < loop; a++) { - int len = iov[a].iov_len; - if (len > written) { - // incomplete write which means the channel is not writable anymore. Return now! - return w; - } - written -= len; - } + // Explicit delete local reference as otherwise the local references will only be released once the native method returns. + // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. + // + // See https://github.com/netty/netty/issues/2623 + (*env)->DeleteLocalRef(env, addressEntry); } - return w; + + return writev0(env, clazz, fd, iov, length); } jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) { @@ -1266,3 +1194,7 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEn throwRuntimeException(env, exceptionMessage("Error during uname(...): ", err)); return NULL; } + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz) { + return IOV_MAX; +} diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index c28d676d88..40f42f86bd 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -93,3 +93,4 @@ jint Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv *env, jclass claz jint Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv *env, jclass clazz, jint fd); jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index d9e63de16c..d7f86b5236 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -129,41 +129,51 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } private void writeBytesMultiple( - EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] nioBuffers) throws IOException { + EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] addresses) throws IOException { - int nioBufferCnt = in.addressCount(); + int addressCnt = in.addressCount(); long expectedWrittenBytes = in.addressSize(); - long localWrittenBytes = Native.writevAddresses(fd, nioBuffers, 0, nioBufferCnt); + boolean done = false; + boolean setEpollOut = false; + long writtenBytes = 0; + int offset = 0; + int end = offset + addressCnt; + int spinCount = config.getWriteSpinCount(); + loop: while (addressCnt > 0) { + for (int i = spinCount - 1; i >= 0; i --) { + int cnt = addressCnt > Native.IOV_MAX? Native.IOV_MAX : addressCnt; - if (localWrittenBytes < expectedWrittenBytes) { - setEpollOut(); + long localWrittenBytes = Native.writevAddresses(fd, addresses, offset, cnt); + if (localWrittenBytes == 0) { + setEpollOut = true; + break loop; + } + expectedWrittenBytes -= localWrittenBytes; + writtenBytes += localWrittenBytes; - // Did not write all buffers completely. - // Release the fully written buffers and update the indexes of the partially written buffer. - for (int i = msgCount; i > 0; i --) { - final ByteBuf buf = (ByteBuf) in.current(); - final int readerIndex = buf.readerIndex(); - final int readableBytes = buf.writerIndex() - readerIndex; - - if (readableBytes < localWrittenBytes) { - in.remove(); - localWrittenBytes -= readableBytes; - } else if (readableBytes > localWrittenBytes) { - - buf.readerIndex(readerIndex + (int) localWrittenBytes); - in.progress(localWrittenBytes); - break; - } else { // readable == writtenBytes - in.remove(); - break; + while (offset < end && localWrittenBytes > 0) { + AddressEntry address = addresses[offset]; + int readerIndex = address.readerIndex; + int bytes = address.writerIndex - readerIndex; + if (bytes > localWrittenBytes) { + address.readerIndex += (int) localWrittenBytes; + // incomplete write + break; + } else { + offset++; + addressCnt--; + localWrittenBytes -= bytes; + } } } - } else { - // Release all buffers - for (int i = msgCount; i > 0; i --) { - in.remove(); + + if (expectedWrittenBytes == 0) { + done = true; + break; } } + + updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut); } private void writeBytesMultiple( @@ -174,20 +184,48 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So boolean done = false; boolean setEpollOut = false; long writtenBytes = 0; + int offset = 0; + int end = offset + nioBufferCnt; + int spinCount = config.getWriteSpinCount(); + loop: while (nioBufferCnt > 0) { + for (int i = spinCount - 1; i >= 0; i --) { + int cnt = nioBufferCnt > Native.IOV_MAX? Native.IOV_MAX : nioBufferCnt; - for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { - long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt); - if (localWrittenBytes == 0) { - setEpollOut = true; - break; + long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt); + if (localWrittenBytes == 0) { + setEpollOut = true; + break loop; + } + expectedWrittenBytes -= localWrittenBytes; + writtenBytes += localWrittenBytes; + + while (offset < end && localWrittenBytes > 0) { + ByteBuffer buffer = nioBuffers[offset]; + int pos = buffer.position(); + int bytes = buffer.limit() - pos; + if (bytes > localWrittenBytes) { + buffer.position(pos + (int) localWrittenBytes); + // incomplete write + break; + } else { + offset++; + nioBufferCnt--; + localWrittenBytes -= bytes; + } + } } - expectedWrittenBytes -= localWrittenBytes; - writtenBytes += localWrittenBytes; + if (expectedWrittenBytes == 0) { done = true; break; } } + + updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut); + } + + private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount, + boolean done, boolean setEpollOut) { if (done) { // Release all buffers for (int i = msgCount; i > 0; i --) { @@ -224,7 +262,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So incompleteWrite(setEpollOut); } } - private void incompleteWrite(boolean setEpollOut) { // Did not write completely. if (setEpollOut) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 2e9cb439e5..240aafe586 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -52,6 +52,7 @@ final class Native { public static final int EPOLLOUT = 0x02; public static final int EPOLLACCEPT = 0x04; public static final int EPOLLRDHUP = 0x08; + public static int IOV_MAX = iovMax(); public static native int eventFd(); public static native void eventFdWrite(int fd, long value); @@ -222,6 +223,9 @@ final class Native { } public static native String kernelVersion(); + + private static native int iovMax(); + private Native() { // utility }