diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java index 89c21ef371..c84f71635d 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java @@ -46,7 +46,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest { } public void testGatheringWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testGatheringWrite0(sb, cb, false, true); + testGatheringWrite0(sb, cb, data, false, true); } @Test(timeout = 30000) @@ -55,7 +55,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest { } public void testGatheringWriteNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testGatheringWrite0(sb, cb, false, false); + testGatheringWrite0(sb, cb, data, false, false); } @Test(timeout = 30000) @@ -64,7 +64,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest { } public void testGatheringWriteWithCompositeNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testGatheringWrite0(sb, cb, true, false); + testGatheringWrite0(sb, cb, data, true, false); } @Test(timeout = 30000) @@ -73,11 +73,23 @@ public class SocketGatheringWriteTest extends AbstractSocketTest { } public void testGatheringWriteWithComposite(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testGatheringWrite0(sb, cb, true, true); + testGatheringWrite0(sb, cb, data, true, true); + } + + // Test for https://github.com/netty/netty/issues/2647 + @Test(timeout = 30000) + public void testGatheringWriteBig() throws Throwable { + run(); + } + + public void testGatheringWriteBig(ServerBootstrap sb, Bootstrap cb) throws Throwable { + byte[] bigData = new byte[1024 * 1024 * 50]; + random.nextBytes(bigData); + testGatheringWrite0(sb, cb, bigData, false, true); } private static void testGatheringWrite0( - ServerBootstrap sb, Bootstrap cb, boolean composite, boolean autoRead) throws Throwable { + ServerBootstrap sb, Bootstrap cb, byte[] data, boolean composite, boolean autoRead) throws Throwable { final TestHandler sh = new TestHandler(autoRead); final TestHandler ch = new TestHandler(autoRead); @@ -88,7 +100,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest { Channel cc = cb.connect().sync().channel(); for (int i = 0; i < data.length;) { - int length = Math.min(random.nextInt(1024 * 64), data.length - i); + int length = Math.min(random.nextInt(1024 * 8), data.length - i); ByteBuf buf = Unpooled.wrappedBuffer(data, i, length); if (composite && i % 2 == 0) { int split = buf.readableBytes() / 2; diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index c61183c23a..bef1cedc89 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -711,94 +711,141 @@ jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec iov[], jint leng } JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) { - struct iovec iov[length]; - int i; - int iovidx = 0; - for (i = offset; i < length; i++) { - jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); - jint pos; - // Get the current position using the (*env)->GetIntField if possible and fallback - // to slower (*env)->CallIntMethod(...) if needed - if (posFieldId == NULL) { - pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); - } else { - pos = (*env)->GetIntField(env, bufObj, posFieldId); - } - jint limit; - // Get the current limit using the (*env)->GetIntField if possible and fallback - // to slower (*env)->CallIntMethod(...) if needed - if (limitFieldId == NULL) { - limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); - } else { - limit = (*env)->GetIntField(env, bufObj, limitFieldId); - } - void *buffer = (*env)->GetDirectBufferAddress(env, bufObj); - if (buffer == NULL) { - throwRuntimeException(env, "Unable to access address of buffer"); - return -1; - } - iov[iovidx].iov_base = buffer + pos; - iov[iovidx].iov_len = (size_t) (limit - pos); - iovidx++; + // Calculate maximal size of iov + // + // See https://github.com/netty/netty/issues/2647 + int iovLen = IOV_MAX < length ? IOV_MAX : length; + struct iovec iov[iovLen]; + jlong w = 0; + while (length > 0) { + int i; + int iovidx = 0; + int loop = IOV_MAX < length ? IOV_MAX : length; + int num = offset + loop; + for (i = offset; i < num; i++) { + jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); + jint pos; + // Get the current position using the (*env)->GetIntField if possible and fallback + // to slower (*env)->CallIntMethod(...) if needed + if (posFieldId == NULL) { + pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); + } else { + pos = (*env)->GetIntField(env, bufObj, posFieldId); + } + jint limit; + // Get the current limit using the (*env)->GetIntField if possible and fallback + // to slower (*env)->CallIntMethod(...) if needed + if (limitFieldId == NULL) { + limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); + } else { + limit = (*env)->GetIntField(env, bufObj, limitFieldId); + } + void *buffer = (*env)->GetDirectBufferAddress(env, bufObj); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return -1; + } + iov[iovidx].iov_base = buffer + pos; + iov[iovidx].iov_len = (size_t) (limit - pos); + iovidx++; + + // Explicit delete local reference as otherwise the local references will only be released once the native method returns. + // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. + // + // See https://github.com/netty/netty/issues/2623 + (*env)->DeleteLocalRef(env, bufObj); + } + jlong res = writev0(env, clazz, fd, iov, loop); + if (res <= 0) { + return res < 0 ? res : w; + } + + w += res; + offset += loop; + length -= loop; + + // update the position of the written buffers + int written = res; + int a; + for (a = 0; a < loop; a++) { + int len = iov[a].iov_len; + jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset); + if (len > written) { + incrementPosition(env, bufObj, written); + // Explicit delete local reference as otherwise the local references will only be released once the native method returns. + // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. + // + // See https://github.com/netty/netty/issues/2623 + (*env)->DeleteLocalRef(env, bufObj); + + // incomplete write which means the channel is not writable anymore. Return now! + return w; + } else { + incrementPosition(env, bufObj, len); + // Explicit delete local reference as otherwise the local references will only be released once the native method returns. + // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. + // + // See https://github.com/netty/netty/issues/2623 + (*env)->DeleteLocalRef(env, bufObj); + written -= len; + } - // Explicit delete local reference as otherwise the local references will only be released once the native method returns. - // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. - // - // See https://github.com/netty/netty/issues/2623 - (*env)->DeleteLocalRef(env, bufObj); - } - jlong res = writev0(env, clazz, fd, iov, length); - if (res <= 0) { - return res; - } - - // update the position of the written buffers - int written = res; - int a; - for (a = 0; a < length; a++) { - int len = iov[a].iov_len; - jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset); - if (len >= written) { - incrementPosition(env, bufObj, written); - break; - } else { - incrementPosition(env, bufObj, len); - written -= len; } - // Explicit delete local reference as otherwise the local references will only be released once the native method returns. - // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. - // - // See https://github.com/netty/netty/issues/2623 - (*env)->DeleteLocalRef(env, bufObj); } - return res; + return w; } JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length) { - struct iovec iov[length]; - int i; - int iovidx = 0; - for (i = offset; i < length; i++) { - jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i); - jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId); - jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId); - void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId); + // Calculate maximal size of iov + // + // See https://github.com/netty/netty/issues/2647 + int iovLen = IOV_MAX < length ? IOV_MAX : length; + struct iovec iov[iovLen]; + jlong w = 0; + while (length > 0) { + int i; + int iovidx = 0; + int loop = IOV_MAX < length ? IOV_MAX : length; + int num = offset + loop; + for (i = offset; i < num; i++) { + jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i); + jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId); + jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId); + void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId); - iov[iovidx].iov_base = memoryAddress + readerIndex; - iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex); - iovidx++; + iov[iovidx].iov_base = memoryAddress + readerIndex; + iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex); + iovidx++; - // Explicit delete local reference as otherwise the local references will only be released once the native method returns. - // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. - // - // See https://github.com/netty/netty/issues/2623 - (*env)->DeleteLocalRef(env, addressEntry); - } - - jlong res = writev0(env, clazz, fd, iov, length); - if (res <= 0) { - return res; + // Explicit delete local reference as otherwise the local references will only be released once the native method returns. + // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. + // + // See https://github.com/netty/netty/issues/2623 + (*env)->DeleteLocalRef(env, addressEntry); + } + + jlong res = writev0(env, clazz, fd, iov, loop); + if (res <= 0) { + return res < 0 ? res : w; + } + + w += res; + offset += loop; + length -= loop; + + // update the position of the written buffers + int written = res; + int a; + for (a = 0; a < loop; a++) { + int len = iov[a].iov_len; + if (len > written) { + // incomplete write which means the channel is not writable anymore. Return now! + return w; + } + written -= len; + } } + return w; } jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) { diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index 558b1c5551..c28d676d88 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -14,7 +14,7 @@ * under the License. */ #include - +#include #define EPOLL_READ 0x01 #define EPOLL_WRITE 0x02 @@ -27,6 +27,12 @@ #define SO_REUSEPORT 15 #endif /* SO_REUSEPORT */ +// Define IOV_MAX if not found to limit the iov size on writev calls +// See https://github.com/netty/netty/issues/2647 +#ifndef IOV_MAX +#define IOV_MAX 1024 +#endif /* IOV_MAX */ + jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz); void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value); void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 11f8af80b0..20eb586b31 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -133,7 +133,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So int nioBufferCnt = in.addressCount(); long expectedWrittenBytes = in.addressSize(); - long localWrittenBytes = Native.writevAddresses(fd, nioBuffers, 0, nioBufferCnt); if (localWrittenBytes < expectedWrittenBytes) {