diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index cbc9a1d310..921edb322a 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -663,104 +663,63 @@ void incrementPosition(JNIEnv * env, jobject bufObj, int written) { } JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) { - // Calculate maximal size of iov - // - // See https://github.com/netty/netty/issues/2647 - int iovLen = IOV_MAX < length ? IOV_MAX : length; - struct iovec iov[iovLen]; - jlong w = 0; - while (length > 0) { - int i; - int iovidx = 0; - int loop = IOV_MAX < length ? IOV_MAX : length; - int num = offset + loop; - for (i = offset; i < num; i++) { - jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); - jint pos; - // Get the current position using the (*env)->GetIntField if possible and fallback - // to slower (*env)->CallIntMethod(...) if needed - if (posFieldId == NULL) { - pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); - } else { - pos = (*env)->GetIntField(env, bufObj, posFieldId); - } - jint limit; - // Get the current limit using the (*env)->GetIntField if possible and fallback - // to slower (*env)->CallIntMethod(...) if needed - if (limitFieldId == NULL) { - limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); - } else { - limit = (*env)->GetIntField(env, bufObj, limitFieldId); - } - void *buffer = (*env)->GetDirectBufferAddress(env, bufObj); - if (buffer == NULL) { - throwRuntimeException(env, "Unable to access address of buffer"); - return -1; - } - iov[iovidx].iov_base = buffer + pos; - iov[iovidx].iov_len = (size_t) (limit - pos); - iovidx++; - - // Explicit delete local reference as otherwise the local references will only be released once the native method returns. - // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. - // - // See https://github.com/netty/netty/issues/2623 - (*env)->DeleteLocalRef(env, bufObj); + struct iovec iov[length]; + int iovidx = 0; + int i; + int num = offset + length; + for (i = offset; i < num; i++) { + jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); + jint pos; + // Get the current position using the (*env)->GetIntField if possible and fallback + // to slower (*env)->CallIntMethod(...) if needed + if (posFieldId == NULL) { + pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); + } else { + pos = (*env)->GetIntField(env, bufObj, posFieldId); } - ssize_t res; - int err; - do { - res = writev(fd, iov, loop); - // keep on writing if it was interrupted - } while(res == -1 && ((err = errno) == EINTR)); - - if (res < 0) { - if (err == EAGAIN || err == EWOULDBLOCK) { - // network stack is saturated we will try again later - return w; - } - if (err == EBADF) { - throwClosedChannelException(env); - return -1; - } - throwIOException(env, exceptionMessage("Error while writev(...): ", err)); + jint limit; + // Get the current limit using the (*env)->GetIntField if possible and fallback + // to slower (*env)->CallIntMethod(...) if needed + if (limitFieldId == NULL) { + limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); + } else { + limit = (*env)->GetIntField(env, bufObj, limitFieldId); + } + void *buffer = (*env)->GetDirectBufferAddress(env, bufObj); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); return -1; } + iov[iovidx].iov_base = buffer + pos; + iov[iovidx].iov_len = (size_t) (limit - pos); + iovidx++; - w += res; - - // update the position of the written buffers - int written = res; - int a; - for (a = 0; a < loop; a++) { - int len = iov[a].iov_len; - jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset); - if (len > written) { - incrementPosition(env, bufObj, written); - // Explicit delete local reference as otherwise the local references will only be released once the native method returns. - // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. - // - // See https://github.com/netty/netty/issues/2623 - (*env)->DeleteLocalRef(env, bufObj); - - // incomplete write which means the channel is not writable anymore. Return now! - return w; - } else { - incrementPosition(env, bufObj, len); - // Explicit delete local reference as otherwise the local references will only be released once the native method returns. - // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. - // - // See https://github.com/netty/netty/issues/2623 - (*env)->DeleteLocalRef(env, bufObj); - written -= len; - } - - } - - offset += loop; - length -= loop; + // Explicit delete local reference as otherwise the local references will only be released once the native method returns. + // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. + // + // See https://github.com/netty/netty/issues/2623 + (*env)->DeleteLocalRef(env, bufObj); } - return w; + ssize_t res; + int err; + do { + res = writev(fd, iov, length); + // keep on writing if it was interrupted + } while(res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + if (err == EAGAIN || err == EWOULDBLOCK) { + // network stack is saturated we will try again later + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while writev(...): ", err)); + return -1; + } + return res; } jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) { @@ -1181,3 +1140,7 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEn throwRuntimeException(env, exceptionMessage("Error during uname(...): ", err)); return NULL; } + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz) { + return IOV_MAX; +} \ No newline at end of file diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index 830e880e71..ef88890084 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -92,3 +92,4 @@ jint Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv *env, jclass claz jint Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv *env, jclass clazz, jint fd); jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 4ed522858a..daa578b9da 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -128,20 +128,43 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So boolean done = false; boolean setEpollOut = false; long writtenBytes = 0; + int offset = 0; + int end = offset + nioBufferCnt; + int spinCount = config.getWriteSpinCount(); + loop: while (nioBufferCnt > 0) { + for (int i = spinCount - 1; i >= 0; i --) { + int cnt = nioBufferCnt > Native.IOV_MAX? Native.IOV_MAX : nioBufferCnt; - for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { - long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt); - if (localWrittenBytes == 0) { - setEpollOut = true; - break; + long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt); + if (localWrittenBytes == 0) { + setEpollOut = true; + break loop; + } + expectedWrittenBytes -= localWrittenBytes; + writtenBytes += localWrittenBytes; + + while (offset < end && localWrittenBytes > 0) { + ByteBuffer buffer = nioBuffers[offset]; + int pos = buffer.position(); + int bytes = buffer.limit() - pos; + if (bytes > localWrittenBytes) { + buffer.position(pos + (int) localWrittenBytes); + // incomplete write + break; + } else { + offset++; + nioBufferCnt--; + localWrittenBytes -= bytes; + } + } } - expectedWrittenBytes -= localWrittenBytes; - writtenBytes += localWrittenBytes; + if (expectedWrittenBytes == 0) { done = true; break; } } + if (done) { // Release all buffers for (int i = msgCount; i > 0; i --) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 4463d291bd..ff2b4123d9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -51,6 +51,7 @@ final class Native { public static final int EPOLLOUT = 0x02; public static final int EPOLLACCEPT = 0x04; public static final int EPOLLRDHUP = 0x08; + public static int IOV_MAX = iovMax(); public static native int eventFd(); public static native void eventFdWrite(int fd, long value); @@ -218,6 +219,9 @@ final class Native { } public static native String kernelVersion(); + + private static native int iovMax(); + private Native() { // utility }